55
50
# nb: check this before importing anything else from within it
56
51
_testtools_version = getattr(testtools, '__version__', ())
57
if _testtools_version < (0, 9, 2):
58
raise ImportError("need at least testtools 0.9.2: %s is %r"
52
if _testtools_version < (0, 9, 5):
53
raise ImportError("need at least testtools 0.9.5: %s is %r"
59
54
% (testtools.__file__, _testtools_version))
60
55
from testtools import content
62
58
from bzrlib import (
62
commands as _mod_commands,
71
plugin as _mod_plugin,
78
transport as _mod_transport,
80
import bzrlib.commands
81
import bzrlib.timestamp
83
import bzrlib.inventory
84
import bzrlib.iterablefile
87
82
import bzrlib.lsprof
88
83
except ImportError:
89
84
# lsprof not available
91
from bzrlib.merge import merge_inner
94
from bzrlib.smart import client, request, server
96
from bzrlib import symbol_versioning
86
from bzrlib.smart import client, request
87
from bzrlib.transport import (
97
91
from bzrlib.symbol_versioning import (
99
92
deprecated_function,
105
from bzrlib.transport import (
110
import bzrlib.transport
111
from bzrlib.trace import mutter, note
112
95
from bzrlib.tests import (
116
from bzrlib.tests.http_server import HttpServer
117
from bzrlib.tests.TestUtil import (
121
from bzrlib.tests.treeshape import build_tree_contents
122
100
from bzrlib.ui import NullProgressView
123
101
from bzrlib.ui.text import TextUIFactory
124
import bzrlib.version_info_formats.format_custom
125
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
127
103
# Mark this python module as being part of the implementation
128
104
# of unittest: this gives us better tracebacks where the last
140
116
SUBUNIT_SEEK_SET = 0
141
117
SUBUNIT_SEEK_CUR = 1
144
class ExtendedTestResult(unittest._TextTestResult):
119
# These are intentionally brought into this namespace. That way plugins, etc
120
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
121
TestSuite = TestUtil.TestSuite
122
TestLoader = TestUtil.TestLoader
124
# Tests should run in a clean and clearly defined environment. The goal is to
125
# keep them isolated from the running environment as mush as possible. The test
126
# framework ensures the variables defined below are set (or deleted if the
127
# value is None) before a test is run and reset to their original value after
128
# the test is run. Generally if some code depends on an environment variable,
129
# the tests should start without this variable in the environment. There are a
130
# few exceptions but you shouldn't violate this rule lightly.
134
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
135
# tests do check our impls match APPDATA
136
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
140
'BZREMAIL': None, # may still be present in the environment
141
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
142
'BZR_PROGRESS_BAR': None,
143
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
144
# as a base class instead of TestCaseInTempDir. Tests inheriting from
145
# TestCase should not use disk resources, BZR_LOG is one.
146
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
147
'BZR_PLUGIN_PATH': None,
148
'BZR_DISABLE_PLUGINS': None,
149
'BZR_PLUGINS_AT': None,
150
'BZR_CONCURRENCY': None,
151
# Make sure that any text ui tests are consistent regardless of
152
# the environment the test case is run in; you may want tests that
153
# test other combinations. 'dumb' is a reasonable guess for tests
154
# going to a pipe or a StringIO.
160
'SSH_AUTH_SOCK': None,
170
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
171
# least. If you do (care), please update this comment
175
'BZR_REMOTE_PATH': None,
176
# Generally speaking, we don't want apport reporting on crashes in
177
# the test envirnoment unless we're specifically testing apport,
178
# so that it doesn't leak into the real system environment. We
179
# use an env var so it propagates to subprocesses.
180
'APPORT_DISABLE': '1',
184
def override_os_environ(test, env=None):
185
"""Modify os.environ keeping a copy.
187
:param test: A test instance
189
:param env: A dict containing variable definitions to be installed
192
env = isolated_environ
193
test._original_os_environ = dict([(var, value)
194
for var, value in os.environ.iteritems()])
195
for var, value in env.iteritems():
196
osutils.set_or_unset_env(var, value)
197
if var not in test._original_os_environ:
198
# The var is new, add it with a value of None, so
199
# restore_os_environ will delete it
200
test._original_os_environ[var] = None
203
def restore_os_environ(test):
204
"""Restore os.environ to its original state.
206
:param test: A test instance previously passed to override_os_environ.
208
for var, value in test._original_os_environ.iteritems():
209
# Restore the original value (or delete it if the value has been set to
210
# None in override_os_environ).
211
osutils.set_or_unset_env(var, value)
214
class ExtendedTestResult(testtools.TextTestResult):
145
215
"""Accepts, reports and accumulates the results of running tests.
147
217
Compared to the unittest version this class adds support for
196
266
self._overall_start_time = time.time()
197
267
self._strict = strict
268
self._first_thread_leaker_id = None
269
self._tests_leaking_threads_count = 0
270
self._traceback_from_test = None
199
272
def stopTestRun(self):
200
273
run = self.testsRun
201
274
actionTaken = "Ran"
202
275
stopTime = time.time()
203
276
timeTaken = stopTime - self.startTime
205
self.stream.writeln(self.separator2)
206
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
277
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
278
# the parent class method is similar have to duplicate
279
self._show_list('ERROR', self.errors)
280
self._show_list('FAIL', self.failures)
281
self.stream.write(self.sep2)
282
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
207
283
run, run != 1 and "s" or "", timeTaken))
208
self.stream.writeln()
209
284
if not self.wasSuccessful():
210
285
self.stream.write("FAILED (")
211
286
failed, errored = map(len, (self.failures, self.errors))
218
293
if failed or errored: self.stream.write(", ")
219
294
self.stream.write("known_failure_count=%d" %
220
295
self.known_failure_count)
221
self.stream.writeln(")")
296
self.stream.write(")\n")
223
298
if self.known_failure_count:
224
self.stream.writeln("OK (known_failures=%d)" %
299
self.stream.write("OK (known_failures=%d)\n" %
225
300
self.known_failure_count)
227
self.stream.writeln("OK")
302
self.stream.write("OK\n")
228
303
if self.skip_count > 0:
229
304
skipped = self.skip_count
230
self.stream.writeln('%d test%s skipped' %
305
self.stream.write('%d test%s skipped\n' %
231
306
(skipped, skipped != 1 and "s" or ""))
232
307
if self.unsupported:
233
308
for feature, count in sorted(self.unsupported.items()):
234
self.stream.writeln("Missing feature '%s' skipped %d tests." %
309
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
235
310
(feature, count))
237
312
ok = self.wasStrictlySuccessful()
239
314
ok = self.wasSuccessful()
240
if TestCase._first_thread_leaker_id:
315
if self._first_thread_leaker_id:
241
316
self.stream.write(
242
317
'%s is leaking threads among %d leaking tests.\n' % (
243
TestCase._first_thread_leaker_id,
244
TestCase._leaking_threads_tests))
318
self._first_thread_leaker_id,
319
self._tests_leaking_threads_count))
245
320
# We don't report the main thread as an active one.
246
321
self.stream.write(
247
322
'%d non-main threads were left active in the end.\n'
248
% (TestCase._active_threads - 1))
323
% (len(self._active_threads) - 1))
250
325
def getDescription(self, test):
276
352
def _shortened_test_description(self, test):
278
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
354
what = re.sub(r'^bzrlib\.tests\.', '', what)
357
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
358
# multiple times in a row, because the handler is added for
359
# each test but the container list is shared between cases.
360
# See lp:498869 lp:625574 and lp:637725 for background.
361
def _record_traceback_from_test(self, exc_info):
362
"""Store the traceback from passed exc_info tuple till"""
363
self._traceback_from_test = exc_info[2]
281
365
def startTest(self, test):
282
unittest.TestResult.startTest(self, test)
366
super(ExtendedTestResult, self).startTest(test)
283
367
if self.count == 0:
284
368
self.startTests()
285
370
self.report_test_start(test)
286
371
test.number = self.count
287
372
self._recordTestStartTime()
373
# Make testtools cases give us the real traceback on failure
374
addOnException = getattr(test, "addOnException", None)
375
if addOnException is not None:
376
addOnException(self._record_traceback_from_test)
377
# Only check for thread leaks on bzrlib derived test cases
378
if isinstance(test, TestCase):
379
test.addCleanup(self._check_leaked_threads, test)
381
def stopTest(self, test):
382
super(ExtendedTestResult, self).stopTest(test)
383
# Manually break cycles, means touching various private things but hey
384
getDetails = getattr(test, "getDetails", None)
385
if getDetails is not None:
387
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
388
if type_equality_funcs is not None:
389
type_equality_funcs.clear()
390
self._traceback_from_test = None
289
392
def startTests(self):
291
if getattr(sys, 'frozen', None) is None:
292
bzr_path = osutils.realpath(sys.argv[0])
294
bzr_path = sys.executable
296
'bzr selftest: %s\n' % (bzr_path,))
299
bzrlib.__path__[0],))
301
' bzr-%s python-%s %s\n' % (
302
bzrlib.version_string,
303
bzrlib._format_version_tuple(sys.version_info),
304
platform.platform(aliased=1),
306
self.stream.write('\n')
393
self.report_tests_starting()
394
self._active_threads = threading.enumerate()
396
def _check_leaked_threads(self, test):
397
"""See if any threads have leaked since last call
399
A sample of live threads is stored in the _active_threads attribute,
400
when this method runs it compares the current live threads and any not
401
in the previous sample are treated as having leaked.
403
now_active_threads = set(threading.enumerate())
404
threads_leaked = now_active_threads.difference(self._active_threads)
406
self._report_thread_leak(test, threads_leaked, now_active_threads)
407
self._tests_leaking_threads_count += 1
408
if self._first_thread_leaker_id is None:
409
self._first_thread_leaker_id = test.id()
410
self._active_threads = now_active_threads
308
412
def _recordTestStartTime(self):
309
413
"""Record that a test has started."""
310
self._start_time = time.time()
312
def _cleanupLogFile(self, test):
313
# We can only do this if we have one of our TestCases, not if
315
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
316
if setKeepLogfile is not None:
414
self._start_datetime = self._now()
319
416
def addError(self, test, err):
320
417
"""Tell result that test finished with an error.
356
451
self._formatTime(benchmark_time),
358
453
self.report_success(test)
359
self._cleanupLogFile(test)
360
unittest.TestResult.addSuccess(self, test)
454
super(ExtendedTestResult, self).addSuccess(test)
361
455
test._log_contents = ''
363
457
def addExpectedFailure(self, test, err):
364
458
self.known_failure_count += 1
365
459
self.report_known_failure(test, err)
461
def addUnexpectedSuccess(self, test, details=None):
462
"""Tell result the test unexpectedly passed, counting as a failure
464
When the minimum version of testtools required becomes 0.9.8 this
465
can be updated to use the new handling there.
467
super(ExtendedTestResult, self).addFailure(test, details=details)
468
self.failure_count += 1
469
self.report_unexpected_success(test,
470
"".join(details["reason"].iter_text()))
367
474
def addNotSupported(self, test, feature):
368
475
"""The test will not be run because of a missing feature.
401
509
raise errors.BzrError("Unknown whence %r" % whence)
403
def report_cleaning_up(self):
511
def report_tests_starting(self):
512
"""Display information before the test run begins"""
513
if getattr(sys, 'frozen', None) is None:
514
bzr_path = osutils.realpath(sys.argv[0])
516
bzr_path = sys.executable
518
'bzr selftest: %s\n' % (bzr_path,))
521
bzrlib.__path__[0],))
523
' bzr-%s python-%s %s\n' % (
524
bzrlib.version_string,
525
bzrlib._format_version_tuple(sys.version_info),
526
platform.platform(aliased=1),
528
self.stream.write('\n')
530
def report_test_start(self, test):
531
"""Display information on the test just about to be run"""
533
def _report_thread_leak(self, test, leaked_threads, active_threads):
534
"""Display information on a test that leaked one or more threads"""
535
# GZ 2010-09-09: A leak summary reported separately from the general
536
# thread debugging would be nice. Tests under subunit
537
# need something not using stream, perhaps adding a
538
# testtools details object would be fitting.
539
if 'threads' in selftest_debug_flags:
540
self.stream.write('%s is leaking, active is now %d\n' %
541
(test.id(), len(active_threads)))
406
543
def startTestRun(self):
407
544
self.startTime = time.time()
551
685
return '%s%s' % (indent, err[1])
553
687
def report_error(self, test, err):
554
self.stream.writeln('ERROR %s\n%s'
688
self.stream.write('ERROR %s\n%s\n'
555
689
% (self._testTimeString(test),
556
690
self._error_summary(err)))
558
692
def report_failure(self, test, err):
559
self.stream.writeln(' FAIL %s\n%s'
693
self.stream.write(' FAIL %s\n%s\n'
560
694
% (self._testTimeString(test),
561
695
self._error_summary(err)))
563
697
def report_known_failure(self, test, err):
564
self.stream.writeln('XFAIL %s\n%s'
698
self.stream.write('XFAIL %s\n%s\n'
565
699
% (self._testTimeString(test),
566
700
self._error_summary(err)))
702
def report_unexpected_success(self, test, reason):
703
self.stream.write(' FAIL %s\n%s: %s\n'
704
% (self._testTimeString(test),
705
"Unexpected success. Should have failed",
568
708
def report_success(self, test):
569
self.stream.writeln(' OK %s' % self._testTimeString(test))
709
self.stream.write(' OK %s\n' % self._testTimeString(test))
570
710
for bench_called, stats in getattr(test, '_benchcalls', []):
571
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
711
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
572
712
stats.pprint(file=self.stream)
573
713
# flush the stream so that we get smooth output. This verbose mode is
574
714
# used to show the output in PQM.
575
715
self.stream.flush()
577
717
def report_skip(self, test, reason):
578
self.stream.writeln(' SKIP %s\n%s'
718
self.stream.write(' SKIP %s\n%s\n'
579
719
% (self._testTimeString(test), reason))
581
721
def report_not_applicable(self, test, reason):
582
self.stream.writeln(' N/A %s\n %s'
722
self.stream.write(' N/A %s\n %s\n'
583
723
% (self._testTimeString(test), reason))
585
725
def report_unsupported(self, test, feature):
586
726
"""test cannot be run because feature is missing."""
587
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
727
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
588
728
%(self._testTimeString(test), feature))
789
951
routine, and to build and check bzr trees.
791
953
In addition to the usual method of overriding tearDown(), this class also
792
allows subclasses to register functions into the _cleanups list, which is
954
allows subclasses to register cleanup functions via addCleanup, which are
793
955
run in order as the object is torn down. It's less likely this will be
794
956
accidentally overlooked.
797
_active_threads = None
798
_leaking_threads_tests = 0
799
_first_thread_leaker_id = None
800
_log_file_name = None
801
960
# record lsprof data when performing benchmark calls.
802
961
_gather_lsprof_in_benchmarks = False
804
963
def __init__(self, methodName='testMethod'):
805
964
super(TestCase, self).__init__(methodName)
807
965
self._directory_isolation = True
808
966
self.exception_handlers.insert(0,
809
967
(UnavailableFeature, self._do_unsupported_or_skip))
827
981
self._track_transports()
828
982
self._track_locks()
829
983
self._clear_debug_flags()
830
TestCase._active_threads = threading.activeCount()
831
self.addCleanup(self._check_leaked_threads)
984
# Isolate global verbosity level, to make sure it's reproducible
985
# between tests. We should get rid of this altogether: bug 656694. --
987
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
988
# Isolate config option expansion until its default value for bzrlib is
989
# settled on or a the FIXME associated with _get_expand_default_value
990
# is addressed -- vila 20110219
991
self.overrideAttr(config, '_expand_default_value', None)
992
self._log_files = set()
993
# Each key in the ``_counters`` dict holds a value for a different
994
# counter. When the test ends, addDetail() should be used to output the
995
# counter values. This happens in install_counter_hook().
997
if 'config_stats' in selftest_debug_flags:
998
self._install_config_stats_hooks()
833
1000
def debug(self):
834
1001
# debug a frame up.
836
1003
pdb.Pdb().set_trace(sys._getframe().f_back)
838
def _check_leaked_threads(self):
839
active = threading.activeCount()
840
leaked_threads = active - TestCase._active_threads
841
TestCase._active_threads = active
842
# If some tests make the number of threads *decrease*, we'll consider
843
# that they are just observing old threads dieing, not agressively kill
844
# random threads. So we don't report these tests as leaking. The risk
845
# is that we have false positives that way (the test see 2 threads
846
# going away but leak one) but it seems less likely than the actual
847
# false positives (the test see threads going away and does not leak).
848
if leaked_threads > 0:
849
TestCase._leaking_threads_tests += 1
850
if TestCase._first_thread_leaker_id is None:
851
TestCase._first_thread_leaker_id = self.id()
1005
def discardDetail(self, name):
1006
"""Extend the addDetail, getDetails api so we can remove a detail.
1008
eg. bzr always adds the 'log' detail at startup, but we don't want to
1009
include it for skipped, xfail, etc tests.
1011
It is safe to call this for a detail that doesn't exist, in case this
1012
gets called multiple times.
1014
# We cheat. details is stored in __details which means we shouldn't
1015
# touch it. but getDetails() returns the dict directly, so we can
1017
details = self.getDetails()
1021
def install_counter_hook(self, hooks, name, counter_name=None):
1022
"""Install a counting hook.
1024
Any hook can be counted as long as it doesn't need to return a value.
1026
:param hooks: Where the hook should be installed.
1028
:param name: The hook name that will be counted.
1030
:param counter_name: The counter identifier in ``_counters``, defaults
1033
_counters = self._counters # Avoid closing over self
1034
if counter_name is None:
1036
if _counters.has_key(counter_name):
1037
raise AssertionError('%s is already used as a counter name'
1039
_counters[counter_name] = 0
1040
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1041
lambda: ['%d' % (_counters[counter_name],)]))
1042
def increment_counter(*args, **kwargs):
1043
_counters[counter_name] += 1
1044
label = 'count %s calls' % (counter_name,)
1045
hooks.install_named_hook(name, increment_counter, label)
1046
self.addCleanup(hooks.uninstall_named_hook, name, label)
1048
def _install_config_stats_hooks(self):
1049
"""Install config hooks to count hook calls.
1052
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1053
self.install_counter_hook(config.ConfigHooks, hook_name,
1054
'config.%s' % (hook_name,))
1056
# The OldConfigHooks are private and need special handling to protect
1057
# against recursive tests (tests that run other tests), so we just do
1058
# manually what registering them into _builtin_known_hooks will provide
1060
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1061
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1062
self.install_counter_hook(config.OldConfigHooks, hook_name,
1063
'old_config.%s' % (hook_name,))
853
1065
def _clear_debug_flags(self):
854
1066
"""Prevent externally set debug flags affecting tests.
866
1078
def _clear_hooks(self):
867
1079
# prevent hooks affecting tests
1080
known_hooks = hooks.known_hooks
868
1081
self._preserved_hooks = {}
869
for key, factory in hooks.known_hooks.items():
870
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
871
current_hooks = hooks.known_hooks_key_to_object(key)
1082
for key, (parent, name) in known_hooks.iter_parent_objects():
1083
current_hooks = getattr(parent, name)
872
1084
self._preserved_hooks[parent] = (name, current_hooks)
1085
self._preserved_lazy_hooks = hooks._lazy_hooks
1086
hooks._lazy_hooks = {}
873
1087
self.addCleanup(self._restoreHooks)
874
for key, factory in hooks.known_hooks.items():
875
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
1088
for key, (parent, name) in known_hooks.iter_parent_objects():
1089
factory = known_hooks.get(key)
876
1090
setattr(parent, name, factory())
877
1091
# this hook should always be installed
878
1092
request._install_hook()
1135
1353
'st_mtime did not match')
1136
1354
self.assertEqual(expected.st_ctime, actual.st_ctime,
1137
1355
'st_ctime did not match')
1138
if sys.platform != 'win32':
1356
if sys.platform == 'win32':
1139
1357
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1140
1358
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1141
# odd. Regardless we shouldn't actually try to assert anything
1142
# about their values
1359
# odd. We just force it to always be 0 to avoid any problems.
1360
self.assertEqual(0, expected.st_dev)
1361
self.assertEqual(0, actual.st_dev)
1362
self.assertEqual(0, expected.st_ino)
1363
self.assertEqual(0, actual.st_ino)
1143
1365
self.assertEqual(expected.st_dev, actual.st_dev,
1144
1366
'st_dev did not match')
1145
1367
self.assertEqual(expected.st_ino, actual.st_ino,
1322
1547
self.assertEqual(expected_docstring, obj.__doc__)
1549
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1324
1550
def failUnlessExists(self, path):
1551
return self.assertPathExists(path)
1553
def assertPathExists(self, path):
1325
1554
"""Fail unless path or paths, which may be abs or relative, exist."""
1326
1555
if not isinstance(path, basestring):
1328
self.failUnlessExists(p)
1557
self.assertPathExists(p)
1330
self.failUnless(osutils.lexists(path),path+" does not exist")
1559
self.assertTrue(osutils.lexists(path),
1560
path + " does not exist")
1562
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1332
1563
def failIfExists(self, path):
1564
return self.assertPathDoesNotExist(path)
1566
def assertPathDoesNotExist(self, path):
1333
1567
"""Fail if path or paths, which may be abs or relative, exist."""
1334
1568
if not isinstance(path, basestring):
1336
self.failIfExists(p)
1570
self.assertPathDoesNotExist(p)
1338
self.failIf(osutils.lexists(path),path+" exists")
1572
self.assertFalse(osutils.lexists(path),
1340
1575
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1341
1576
"""A helper for callDeprecated and applyDeprecated.
1456
1692
The file is removed as the test is torn down.
1458
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1459
self._log_file = os.fdopen(fileno, 'w+')
1460
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1461
self._log_file_name = name
1694
pseudo_log_file = StringIO()
1695
def _get_log_contents_for_weird_testtools_api():
1696
return [pseudo_log_file.getvalue().decode(
1697
"utf-8", "replace").encode("utf-8")]
1698
self.addDetail("log", content.Content(content.ContentType("text",
1699
"plain", {"charset": "utf8"}),
1700
_get_log_contents_for_weird_testtools_api))
1701
self._log_file = pseudo_log_file
1702
self._log_memento = trace.push_log_file(self._log_file)
1462
1703
self.addCleanup(self._finishLogFile)
1464
1705
def _finishLogFile(self):
1465
1706
"""Finished with the log file.
1467
Close the file and delete it, unless setKeepLogfile was called.
1708
Close the file and delete it.
1469
if bzrlib.trace._trace_file:
1710
if trace._trace_file:
1470
1711
# flush the log file, to get all content
1471
bzrlib.trace._trace_file.flush()
1472
bzrlib.trace.pop_log_file(self._log_memento)
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1712
trace._trace_file.flush()
1713
trace.pop_log_file(self._log_memento)
1476
1715
def thisFailsStrictLockCheck(self):
1477
1716
"""It is known that this test would fail with -Dstrict_locks.
1513
1747
setattr(obj, attr_name, new)
1750
def overrideEnv(self, name, new):
1751
"""Set an environment variable, and reset it after the test.
1753
:param name: The environment variable name.
1755
:param new: The value to set the variable to. If None, the
1756
variable is deleted from the environment.
1758
:returns: The actual variable value.
1760
value = osutils.set_or_unset_env(name, new)
1761
self.addCleanup(osutils.set_or_unset_env, name, value)
1764
def recordCalls(self, obj, attr_name):
1765
"""Monkeypatch in a wrapper that will record calls.
1767
The monkeypatch is automatically removed when the test concludes.
1769
:param obj: The namespace holding the reference to be replaced;
1770
typically a module, class, or object.
1771
:param attr_name: A string for the name of the attribute to
1773
:returns: A list that will be extended with one item every time the
1774
function is called, with a tuple of (args, kwargs).
1778
def decorator(*args, **kwargs):
1779
calls.append((args, kwargs))
1780
return orig(*args, **kwargs)
1781
orig = self.overrideAttr(obj, attr_name, decorator)
1516
1784
def _cleanEnvironment(self):
1518
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1519
'HOME': os.getcwd(),
1520
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1521
# tests do check our impls match APPDATA
1522
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1526
'BZREMAIL': None, # may still be present in the environment
1528
'BZR_PROGRESS_BAR': None,
1530
'BZR_PLUGIN_PATH': None,
1531
'BZR_DISABLE_PLUGINS': None,
1532
'BZR_PLUGINS_AT': None,
1533
'BZR_CONCURRENCY': None,
1534
# Make sure that any text ui tests are consistent regardless of
1535
# the environment the test case is run in; you may want tests that
1536
# test other combinations. 'dumb' is a reasonable guess for tests
1537
# going to a pipe or a StringIO.
1541
'BZR_COLUMNS': '80',
1543
'SSH_AUTH_SOCK': None,
1547
'https_proxy': None,
1548
'HTTPS_PROXY': None,
1553
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1554
# least. If you do (care), please update this comment
1558
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test envirnoment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
self.addCleanup(self._restoreEnvironment)
1567
for name, value in new_env.iteritems():
1568
self._captureVar(name, value)
1570
def _captureVar(self, name, newvalue):
1571
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1574
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1576
osutils.set_or_unset_env(name, value)
1785
for name, value in isolated_environ.iteritems():
1786
self.overrideEnv(name, value)
1578
1788
def _restoreHooks(self):
1579
1789
for klass, (name, hooks) in self._preserved_hooks.items():
1580
1790
setattr(klass, name, hooks)
1791
self._preserved_hooks.clear()
1792
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1793
self._preserved_lazy_hooks.clear()
1582
1795
def knownFailure(self, reason):
1583
1796
"""This test has failed for some known reason."""
1584
1797
raise KnownFailure(reason)
1799
def _suppress_log(self):
1800
"""Remove the log info from details."""
1801
self.discardDetail('log')
1586
1803
def _do_skip(self, result, reason):
1804
self._suppress_log()
1587
1805
addSkip = getattr(result, 'addSkip', None)
1588
1806
if not callable(addSkip):
1589
1807
result.addSuccess(result)
1612
1832
self._do_skip(result, reason)
1835
def _report_skip(self, result, err):
1836
"""Override the default _report_skip.
1838
We want to strip the 'log' detail. If we waint until _do_skip, it has
1839
already been formatted into the 'reason' string, and we can't pull it
1842
self._suppress_log()
1843
super(TestCase, self)._report_skip(self, result, err)
1846
def _report_expected_failure(self, result, err):
1849
See _report_skip for motivation.
1851
self._suppress_log()
1852
super(TestCase, self)._report_expected_failure(self, result, err)
1615
1855
def _do_unsupported_or_skip(self, result, e):
1616
1856
reason = e.args[0]
1857
self._suppress_log()
1617
1858
addNotSupported = getattr(result, 'addNotSupported', None)
1618
1859
if addNotSupported is not None:
1619
1860
result.addNotSupported(self, reason)
1645
1886
self._benchtime += time.time() - start
1647
1888
def log(self, *args):
1650
def _get_log(self, keep_log_file=False):
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1656
:param keep_log_file: When True, if the log is still a file on disk
1657
leave it as a file on disk. When False, if the log is still a file
1658
on disk, the log file is deleted and the log preserved as
1660
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1668
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
if self._log_file_name is not None:
1674
logfile = open(self._log_file_name)
1676
log_contents = logfile.read()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
self._log_contents = log_contents
1717
os.remove(self._log_file_name)
1719
if sys.platform == 'win32' and e.errno == errno.EACCES:
1720
sys.stderr.write(('Unable to delete log file '
1721
' %r\n' % self._log_file_name))
1724
self._log_file_name = None
1727
return "No log file content and no log file name."
1729
1891
def get_log(self):
1730
1892
"""Get a unicode string containing the log from bzrlib.trace.
1945
2108
variables. A value of None will unset the env variable.
1946
2109
The values must be strings. The change will only occur in the
1947
2110
child, so you don't need to fix the environment after running.
1948
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
2111
:param skip_if_plan_to_signal: raise TestSkipped when true and system
2112
doesn't support signalling subprocesses.
1950
2113
:param allow_plugins: If False (default) pass --no-plugins to bzr.
2114
:param stderr: file to use for the subprocess's stderr. Valid values
2115
are those valid for the stderr argument of `subprocess.Popen`.
2116
Default value is ``subprocess.PIPE``.
1952
2118
:returns: Popen object for the started process.
1954
2120
if skip_if_plan_to_signal:
1955
if not getattr(os, 'kill', None):
1956
raise TestSkipped("os.kill not available.")
2121
if os.name != "posix":
2122
raise TestSkipped("Sending signals not supported")
1958
2124
if env_changes is None:
1959
2125
env_changes = {}
2168
def _add_subprocess_log(self, log_file_path):
2169
if len(self._log_files) == 0:
2170
# Register an addCleanup func. We do this on the first call to
2171
# _add_subprocess_log rather than in TestCase.setUp so that this
2172
# addCleanup is registered after any cleanups for tempdirs that
2173
# subclasses might create, which will probably remove the log file
2175
self.addCleanup(self._subprocess_log_cleanup)
2176
# self._log_files is a set, so if a log file is reused we won't grab it
2178
self._log_files.add(log_file_path)
2180
def _subprocess_log_cleanup(self):
2181
for count, log_file_path in enumerate(self._log_files):
2182
# We use buffer_now=True to avoid holding the file open beyond
2183
# the life of this function, which might interfere with e.g.
2184
# cleaning tempdirs on Windows.
2185
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2186
#detail_content = content.content_from_file(
2187
# log_file_path, buffer_now=True)
2188
with open(log_file_path, 'rb') as log_file:
2189
log_file_bytes = log_file.read()
2190
detail_content = content.Content(content.ContentType("text",
2191
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2192
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
1997
2195
def _popen(self, *args, **kwargs):
1998
2196
"""Place a call to Popen.
2000
2198
Allows tests to override this method to intercept the calls made to
2001
2199
Popen for introspection.
2003
return Popen(*args, **kwargs)
2201
return subprocess.Popen(*args, **kwargs)
2005
2203
def get_source_path(self):
2006
2204
"""Return the path of the directory containing bzrlib."""
2036
2234
if retcode is not None and retcode != process.returncode:
2037
2235
if process_args is None:
2038
2236
process_args = "(unknown args)"
2039
mutter('Output of bzr %s:\n%s', process_args, out)
2040
mutter('Error for bzr %s:\n%s', process_args, err)
2237
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2238
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2041
2239
self.fail('Command bzr %s failed with retcode %s != %s'
2042
2240
% (process_args, retcode, process.returncode))
2043
2241
return [out, err]
2045
def check_inventory_shape(self, inv, shape):
2046
"""Compare an inventory to a list of expected names.
2243
def check_tree_shape(self, tree, shape):
2244
"""Compare a tree to a list of expected names.
2048
2246
Fail if they are not precisely equal.
2051
2249
shape = list(shape) # copy
2052
for path, ie in inv.entries():
2250
for path, ie in tree.iter_entries_by_dir():
2053
2251
name = path.replace('\\', '/')
2054
2252
if ie.kind == 'directory':
2055
2253
name = name + '/'
2255
pass # ignore root entry
2057
2257
shape.remove(name)
2059
2259
extras.append(name)
2149
2349
class TestCaseWithMemoryTransport(TestCase):
2150
2350
"""Common test class for tests that do not need disk resources.
2152
Tests that need disk resources should derive from TestCaseWithTransport.
2352
Tests that need disk resources should derive from TestCaseInTempDir
2353
orTestCaseWithTransport.
2154
2355
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2156
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2357
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2157
2358
a directory which does not exist. This serves to help ensure test isolation
2158
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2159
must exist. However, TestCaseWithMemoryTransport does not offer local
2160
file defaults for the transport in tests, nor does it obey the command line
2359
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2360
must exist. However, TestCaseWithMemoryTransport does not offer local file
2361
defaults for the transport in tests, nor does it obey the command line
2161
2362
override, so tests that accidentally write to the common directory should
2164
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2165
a .bzr directory that stops us ascending higher into the filesystem.
2365
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2366
``.bzr`` directory that stops us ascending higher into the filesystem.
2168
2369
TEST_ROOT = None
2427
2635
test_home_dir = self.test_home_dir
2428
2636
if isinstance(test_home_dir, unicode):
2429
2637
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2430
os.environ['HOME'] = test_home_dir
2431
os.environ['BZR_HOME'] = test_home_dir
2638
self.overrideEnv('HOME', test_home_dir)
2639
self.overrideEnv('BZR_HOME', test_home_dir)
2433
2641
def setUp(self):
2434
2642
super(TestCaseWithMemoryTransport, self).setUp()
2643
# Ensure that ConnectedTransport doesn't leak sockets
2644
def get_transport_from_url_with_cleanup(*args, **kwargs):
2645
t = orig_get_transport_from_url(*args, **kwargs)
2646
if isinstance(t, _mod_transport.ConnectedTransport):
2647
self.addCleanup(t.disconnect)
2650
orig_get_transport_from_url = self.overrideAttr(
2651
_mod_transport, 'get_transport_from_url',
2652
get_transport_from_url_with_cleanup)
2435
2653
self._make_test_root()
2436
2654
self.addCleanup(os.chdir, os.getcwdu())
2437
2655
self.makeAndChdirToTestDir()
3191
3421
def partition_tests(suite, count):
3192
3422
"""Partition suite into count lists of tests."""
3194
tests = list(iter_suite_tests(suite))
3195
tests_per_process = int(math.ceil(float(len(tests)) / count))
3196
for block in range(count):
3197
low_test = block * tests_per_process
3198
high_test = low_test + tests_per_process
3199
process_tests = tests[low_test:high_test]
3200
result.append(process_tests)
3423
# This just assigns tests in a round-robin fashion. On one hand this
3424
# splits up blocks of related tests that might run faster if they shared
3425
# resources, but on the other it avoids assigning blocks of slow tests to
3426
# just one partition. So the slowest partition shouldn't be much slower
3428
partitions = [list() for i in range(count)]
3429
tests = iter_suite_tests(suite)
3430
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3431
partition.append(test)
3204
3435
def workaround_zealous_crypto_random():
3325
class ForwardingResult(unittest.TestResult):
3327
def __init__(self, target):
3328
unittest.TestResult.__init__(self)
3329
self.result = target
3331
def startTest(self, test):
3332
self.result.startTest(test)
3334
def stopTest(self, test):
3335
self.result.stopTest(test)
3337
def startTestRun(self):
3338
self.result.startTestRun()
3340
def stopTestRun(self):
3341
self.result.stopTestRun()
3343
def addSkip(self, test, reason):
3344
self.result.addSkip(test, reason)
3346
def addSuccess(self, test):
3347
self.result.addSuccess(test)
3349
def addError(self, test, err):
3350
self.result.addError(test, err)
3352
def addFailure(self, test, err):
3353
self.result.addFailure(test, err)
3354
ForwardingResult = testtools.ExtendedToOriginalDecorator
3357
class ProfileResult(ForwardingResult):
3558
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3358
3559
"""Generate profiling data for all activity between start and success.
3360
3561
The profile data is appended to the test's _benchcalls attribute and can
3648
3859
'bzrlib.tests.per_repository',
3649
3860
'bzrlib.tests.per_repository_chk',
3650
3861
'bzrlib.tests.per_repository_reference',
3862
'bzrlib.tests.per_repository_vf',
3651
3863
'bzrlib.tests.per_uifactory',
3652
3864
'bzrlib.tests.per_versionedfile',
3653
3865
'bzrlib.tests.per_workingtree',
3654
3866
'bzrlib.tests.test__annotator',
3655
3867
'bzrlib.tests.test__bencode',
3868
'bzrlib.tests.test__btree_serializer',
3656
3869
'bzrlib.tests.test__chk_map',
3657
3870
'bzrlib.tests.test__dirstate_helpers',
3658
3871
'bzrlib.tests.test__groupcompress',
3686
3899
'bzrlib.tests.test_commit_merge',
3687
3900
'bzrlib.tests.test_config',
3688
3901
'bzrlib.tests.test_conflicts',
3902
'bzrlib.tests.test_controldir',
3689
3903
'bzrlib.tests.test_counted_lock',
3690
3904
'bzrlib.tests.test_crash',
3691
3905
'bzrlib.tests.test_decorators',
3692
3906
'bzrlib.tests.test_delta',
3693
3907
'bzrlib.tests.test_debug',
3694
'bzrlib.tests.test_deprecated_graph',
3695
3908
'bzrlib.tests.test_diff',
3696
3909
'bzrlib.tests.test_directory_service',
3697
3910
'bzrlib.tests.test_dirstate',
3699
3912
'bzrlib.tests.test_eol_filters',
3700
3913
'bzrlib.tests.test_errors',
3701
3914
'bzrlib.tests.test_export',
3915
'bzrlib.tests.test_export_pot',
3702
3916
'bzrlib.tests.test_extract',
3917
'bzrlib.tests.test_features',
3703
3918
'bzrlib.tests.test_fetch',
3919
'bzrlib.tests.test_fixtures',
3704
3920
'bzrlib.tests.test_fifo_cache',
3705
3921
'bzrlib.tests.test_filters',
3922
'bzrlib.tests.test_filter_tree',
3706
3923
'bzrlib.tests.test_ftp_transport',
3707
3924
'bzrlib.tests.test_foreign',
3708
3925
'bzrlib.tests.test_generate_docs',
3734
3953
'bzrlib.tests.test_lru_cache',
3735
3954
'bzrlib.tests.test_lsprof',
3736
3955
'bzrlib.tests.test_mail_client',
3956
'bzrlib.tests.test_matchers',
3737
3957
'bzrlib.tests.test_memorytree',
3738
3958
'bzrlib.tests.test_merge',
3739
3959
'bzrlib.tests.test_merge3',
3740
3960
'bzrlib.tests.test_merge_core',
3741
3961
'bzrlib.tests.test_merge_directive',
3962
'bzrlib.tests.test_mergetools',
3742
3963
'bzrlib.tests.test_missing',
3743
3964
'bzrlib.tests.test_msgeditor',
3744
3965
'bzrlib.tests.test_multiparent',
3937
4167
# Some tests mentioned in the list are not in the test suite. The
3938
4168
# list may be out of date, report to the tester.
3939
4169
for id in not_found:
3940
bzrlib.trace.warning('"%s" not found in the test suite', id)
4170
trace.warning('"%s" not found in the test suite', id)
3941
4171
for id in duplicates:
3942
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
4172
trace.warning('"%s" is used as an id by several tests', id)
3947
def multiply_scenarios(scenarios_left, scenarios_right):
4177
def multiply_scenarios(*scenarios):
4178
"""Multiply two or more iterables of scenarios.
4180
It is safe to pass scenario generators or iterators.
4182
:returns: A list of compound scenarios: the cross-product of all
4183
scenarios, with the names concatenated and the parameters
4186
return reduce(_multiply_two_scenarios, map(list, scenarios))
4189
def _multiply_two_scenarios(scenarios_left, scenarios_right):
3948
4190
"""Multiply two sets of scenarios.
3950
4192
:returns: the cartesian product of the two sets of scenarios, that is
4101
4356
if test_id != None:
4102
4357
ui.ui_factory.clear_term()
4103
4358
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4359
# Ugly, but the last thing we want here is fail, so bear with it.
4360
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4361
).encode('ascii', 'replace')
4104
4362
sys.stderr.write('Unable to remove testing dir %s\n%s'
4105
% (os.path.basename(dirname), e))
4108
class Feature(object):
4109
"""An operating system Feature."""
4112
self._available = None
4114
def available(self):
4115
"""Is the feature available?
4117
:return: True if the feature is available.
4119
if self._available is None:
4120
self._available = self._probe()
4121
return self._available
4124
"""Implement this method in concrete features.
4126
:return: True if the feature is available.
4128
raise NotImplementedError
4131
if getattr(self, 'feature_name', None):
4132
return self.feature_name()
4133
return self.__class__.__name__
4136
class _SymlinkFeature(Feature):
4139
return osutils.has_symlinks()
4141
def feature_name(self):
4144
SymlinkFeature = _SymlinkFeature()
4147
class _HardlinkFeature(Feature):
4150
return osutils.has_hardlinks()
4152
def feature_name(self):
4155
HardlinkFeature = _HardlinkFeature()
4158
class _OsFifoFeature(Feature):
4161
return getattr(os, 'mkfifo', None)
4163
def feature_name(self):
4164
return 'filesystem fifos'
4166
OsFifoFeature = _OsFifoFeature()
4169
class _UnicodeFilenameFeature(Feature):
4170
"""Does the filesystem support Unicode filenames?"""
4174
# Check for character combinations unlikely to be covered by any
4175
# single non-unicode encoding. We use the characters
4176
# - greek small letter alpha (U+03B1) and
4177
# - braille pattern dots-123456 (U+283F).
4178
os.stat(u'\u03b1\u283f')
4179
except UnicodeEncodeError:
4181
except (IOError, OSError):
4182
# The filesystem allows the Unicode filename but the file doesn't
4186
# The filesystem allows the Unicode filename and the file exists,
4190
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4193
class _CompatabilityThunkFeature(Feature):
4194
"""This feature is just a thunk to another feature.
4196
It issues a deprecation warning if it is accessed, to let you know that you
4197
should really use a different feature.
4200
def __init__(self, dep_version, module, name,
4201
replacement_name, replacement_module=None):
4202
super(_CompatabilityThunkFeature, self).__init__()
4203
self._module = module
4204
if replacement_module is None:
4205
replacement_module = module
4206
self._replacement_module = replacement_module
4208
self._replacement_name = replacement_name
4209
self._dep_version = dep_version
4210
self._feature = None
4213
if self._feature is None:
4214
depr_msg = self._dep_version % ('%s.%s'
4215
% (self._module, self._name))
4216
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4217
self._replacement_name)
4218
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4219
# Import the new feature and use it as a replacement for the
4221
mod = __import__(self._replacement_module, {}, {},
4222
[self._replacement_name])
4223
self._feature = getattr(mod, self._replacement_name)
4227
return self._feature._probe()
4230
class ModuleAvailableFeature(Feature):
4231
"""This is a feature than describes a module we want to be available.
4233
Declare the name of the module in __init__(), and then after probing, the
4234
module will be available as 'self.module'.
4236
:ivar module: The module if it is available, else None.
4239
def __init__(self, module_name):
4240
super(ModuleAvailableFeature, self).__init__()
4241
self.module_name = module_name
4245
self._module = __import__(self.module_name, {}, {}, [''])
4252
if self.available(): # Make sure the probe has been done
4256
def feature_name(self):
4257
return self.module_name
4260
# This is kept here for compatibility, it is recommended to use
4261
# 'bzrlib.tests.feature.paramiko' instead
4262
ParamikoFeature = _CompatabilityThunkFeature(
4263
deprecated_in((2,1,0)),
4264
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4363
% (os.path.basename(dirname), printable_e))
4267
4366
def probe_unicode_in_user_encoding():
4300
class _HTTPSServerFeature(Feature):
4301
"""Some tests want an https Server, check if one is available.
4303
Right now, the only way this is available is under python2.6 which provides
4314
def feature_name(self):
4315
return 'HTTPSServer'
4318
HTTPSServerFeature = _HTTPSServerFeature()
4321
class _UnicodeFilename(Feature):
4322
"""Does the filesystem support Unicode filenames?"""
4327
except UnicodeEncodeError:
4329
except (IOError, OSError):
4330
# The filesystem allows the Unicode filename but the file doesn't
4334
# The filesystem allows the Unicode filename and the file exists,
4338
UnicodeFilename = _UnicodeFilename()
4341
class _UTF8Filesystem(Feature):
4342
"""Is the filesystem UTF-8?"""
4345
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4349
UTF8Filesystem = _UTF8Filesystem()
4352
class _BreakinFeature(Feature):
4353
"""Does this platform support the breakin feature?"""
4356
from bzrlib import breakin
4357
if breakin.determine_signal() is None:
4359
if sys.platform == 'win32':
4360
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4361
# We trigger SIGBREAK via a Console api so we need ctypes to
4362
# access the function
4369
def feature_name(self):
4370
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4373
BreakinFeature = _BreakinFeature()
4376
class _CaseInsCasePresFilenameFeature(Feature):
4377
"""Is the file-system case insensitive, but case-preserving?"""
4380
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4382
# first check truly case-preserving for created files, then check
4383
# case insensitive when opening existing files.
4384
name = osutils.normpath(name)
4385
base, rel = osutils.split(name)
4386
found_rel = osutils.canonical_relpath(base, name)
4387
return (found_rel == rel
4388
and os.path.isfile(name.upper())
4389
and os.path.isfile(name.lower()))
4394
def feature_name(self):
4395
return "case-insensitive case-preserving filesystem"
4397
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4400
class _CaseInsensitiveFilesystemFeature(Feature):
4401
"""Check if underlying filesystem is case-insensitive but *not* case
4404
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4405
# more likely to be case preserving, so this case is rare.
4408
if CaseInsCasePresFilenameFeature.available():
4411
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4412
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4413
TestCaseWithMemoryTransport.TEST_ROOT = root
4415
root = TestCaseWithMemoryTransport.TEST_ROOT
4416
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4418
name_a = osutils.pathjoin(tdir, 'a')
4419
name_A = osutils.pathjoin(tdir, 'A')
4421
result = osutils.isdir(name_A)
4422
_rmtree_temp_dir(tdir)
4425
def feature_name(self):
4426
return 'case-insensitive filesystem'
4428
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4431
class _CaseSensitiveFilesystemFeature(Feature):
4434
if CaseInsCasePresFilenameFeature.available():
4436
elif CaseInsensitiveFilesystemFeature.available():
4441
def feature_name(self):
4442
return 'case-sensitive filesystem'
4444
# new coding style is for feature instances to be lowercase
4445
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4448
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4449
SubUnitFeature = _CompatabilityThunkFeature(
4450
deprecated_in((2,1,0)),
4451
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4452
4399
# Only define SubUnitBzrRunner if subunit is available.
4454
4401
from subunit import TestProtocolClient
4455
4402
from subunit.test_results import AutoTimingTestResultDecorator
4403
class SubUnitBzrProtocolClient(TestProtocolClient):
4405
def addSuccess(self, test, details=None):
4406
# The subunit client always includes the details in the subunit
4407
# stream, but we don't want to include it in ours.
4408
if details is not None and 'log' in details:
4410
return super(SubUnitBzrProtocolClient, self).addSuccess(
4456
4413
class SubUnitBzrRunner(TextTestRunner):
4457
4414
def run(self, test):
4458
4415
result = AutoTimingTestResultDecorator(
4459
TestProtocolClient(self.stream))
4416
SubUnitBzrProtocolClient(self.stream))
4460
4417
test.run(result)
4462
4419
except ImportError:
4465
class _PosixPermissionsFeature(Feature):
4469
# create temporary file and check if specified perms are maintained.
4472
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4473
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4476
os.chmod(name, write_perms)
4478
read_perms = os.stat(name).st_mode & 0777
4480
return (write_perms == read_perms)
4482
return (os.name == 'posix') and has_perms()
4484
def feature_name(self):
4485
return 'POSIX permissions support'
4487
posix_permissions_feature = _PosixPermissionsFeature()
4423
@deprecated_function(deprecated_in((2, 5, 0)))
4424
def ModuleAvailableFeature(name):
4425
from bzrlib.tests import features
4426
return features.ModuleAvailableFeature(name)