50
55
# nb: check this before importing anything else from within it
51
56
_testtools_version = getattr(testtools, '__version__', ())
52
if _testtools_version < (0, 9, 5):
53
raise ImportError("need at least testtools 0.9.5: %s is %r"
57
if _testtools_version < (0, 9, 2):
58
raise ImportError("need at least testtools 0.9.2: %s is %r"
54
59
% (testtools.__file__, _testtools_version))
55
60
from testtools import content
58
62
from bzrlib import (
62
commands as _mod_commands,
72
plugin as _mod_plugin,
79
transport as _mod_transport,
80
import bzrlib.commands
81
import bzrlib.timestamp
83
import bzrlib.inventory
84
import bzrlib.iterablefile
83
87
import bzrlib.lsprof
84
88
except ImportError:
85
89
# lsprof not available
87
from bzrlib.smart import client, request
91
from bzrlib.merge import merge_inner
94
from bzrlib.smart import client, request, server
96
from bzrlib import symbol_versioning
97
from bzrlib.symbol_versioning import (
88
105
from bzrlib.transport import (
92
from bzrlib.symbol_versioning import (
110
import bzrlib.transport
111
from bzrlib.trace import mutter, note
96
112
from bzrlib.tests import (
116
from bzrlib.tests.http_server import HttpServer
117
from bzrlib.tests.TestUtil import (
121
from bzrlib.tests.treeshape import build_tree_contents
101
122
from bzrlib.ui import NullProgressView
102
123
from bzrlib.ui.text import TextUIFactory
124
import bzrlib.version_info_formats.format_custom
125
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
104
127
# Mark this python module as being part of the implementation
105
128
# of unittest: this gives us better tracebacks where the last
117
140
SUBUNIT_SEEK_SET = 0
118
141
SUBUNIT_SEEK_CUR = 1
120
# These are intentionally brought into this namespace. That way plugins, etc
121
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
122
TestSuite = TestUtil.TestSuite
123
TestLoader = TestUtil.TestLoader
125
# Tests should run in a clean and clearly defined environment. The goal is to
126
# keep them isolated from the running environment as mush as possible. The test
127
# framework ensures the variables defined below are set (or deleted if the
128
# value is None) before a test is run and reset to their original value after
129
# the test is run. Generally if some code depends on an environment variable,
130
# the tests should start without this variable in the environment. There are a
131
# few exceptions but you shouldn't violate this rule lightly.
135
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
136
# tests do check our impls match APPDATA
137
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
141
'BZREMAIL': None, # may still be present in the environment
142
'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
143
'BZR_PROGRESS_BAR': None,
144
# This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
145
# as a base class instead of TestCaseInTempDir. Tests inheriting from
146
# TestCase should not use disk resources, BZR_LOG is one.
147
'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
148
'BZR_PLUGIN_PATH': None,
149
'BZR_DISABLE_PLUGINS': None,
150
'BZR_PLUGINS_AT': None,
151
'BZR_CONCURRENCY': None,
152
# Make sure that any text ui tests are consistent regardless of
153
# the environment the test case is run in; you may want tests that
154
# test other combinations. 'dumb' is a reasonable guess for tests
155
# going to a pipe or a StringIO.
161
'SSH_AUTH_SOCK': None,
171
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
172
# least. If you do (care), please update this comment
176
'BZR_REMOTE_PATH': None,
177
# Generally speaking, we don't want apport reporting on crashes in
178
# the test envirnoment unless we're specifically testing apport,
179
# so that it doesn't leak into the real system environment. We
180
# use an env var so it propagates to subprocesses.
181
'APPORT_DISABLE': '1',
185
def override_os_environ(test, env=None):
186
"""Modify os.environ keeping a copy.
188
:param test: A test instance
190
:param env: A dict containing variable definitions to be installed
193
env = isolated_environ
194
test._original_os_environ = dict([(var, value)
195
for var, value in os.environ.iteritems()])
196
for var, value in env.iteritems():
197
osutils.set_or_unset_env(var, value)
198
if var not in test._original_os_environ:
199
# The var is new, add it with a value of None, so
200
# restore_os_environ will delete it
201
test._original_os_environ[var] = None
204
def restore_os_environ(test):
205
"""Restore os.environ to its original state.
207
:param test: A test instance previously passed to override_os_environ.
209
for var, value in test._original_os_environ.iteritems():
210
# Restore the original value (or delete it if the value has been set to
211
# None in override_os_environ).
212
osutils.set_or_unset_env(var, value)
215
class ExtendedTestResult(testtools.TextTestResult):
144
class ExtendedTestResult(unittest._TextTestResult):
216
145
"""Accepts, reports and accumulates the results of running tests.
218
147
Compared to the unittest version this class adds support for
267
196
self._overall_start_time = time.time()
268
197
self._strict = strict
269
self._first_thread_leaker_id = None
270
self._tests_leaking_threads_count = 0
271
self._traceback_from_test = None
273
199
def stopTestRun(self):
274
200
run = self.testsRun
275
201
actionTaken = "Ran"
276
202
stopTime = time.time()
277
203
timeTaken = stopTime - self.startTime
278
# GZ 2010-07-19: Seems testtools has no printErrors method, and though
279
# the parent class method is similar have to duplicate
280
self._show_list('ERROR', self.errors)
281
self._show_list('FAIL', self.failures)
282
self.stream.write(self.sep2)
283
self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
205
self.stream.writeln(self.separator2)
206
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
284
207
run, run != 1 and "s" or "", timeTaken))
208
self.stream.writeln()
285
209
if not self.wasSuccessful():
286
210
self.stream.write("FAILED (")
287
211
failed, errored = map(len, (self.failures, self.errors))
294
218
if failed or errored: self.stream.write(", ")
295
219
self.stream.write("known_failure_count=%d" %
296
220
self.known_failure_count)
297
self.stream.write(")\n")
221
self.stream.writeln(")")
299
223
if self.known_failure_count:
300
self.stream.write("OK (known_failures=%d)\n" %
224
self.stream.writeln("OK (known_failures=%d)" %
301
225
self.known_failure_count)
303
self.stream.write("OK\n")
227
self.stream.writeln("OK")
304
228
if self.skip_count > 0:
305
229
skipped = self.skip_count
306
self.stream.write('%d test%s skipped\n' %
230
self.stream.writeln('%d test%s skipped' %
307
231
(skipped, skipped != 1 and "s" or ""))
308
232
if self.unsupported:
309
233
for feature, count in sorted(self.unsupported.items()):
310
self.stream.write("Missing feature '%s' skipped %d tests.\n" %
234
self.stream.writeln("Missing feature '%s' skipped %d tests." %
311
235
(feature, count))
313
237
ok = self.wasStrictlySuccessful()
315
239
ok = self.wasSuccessful()
316
if self._first_thread_leaker_id:
240
if TestCase._first_thread_leaker_id:
317
241
self.stream.write(
318
242
'%s is leaking threads among %d leaking tests.\n' % (
319
self._first_thread_leaker_id,
320
self._tests_leaking_threads_count))
243
TestCase._first_thread_leaker_id,
244
TestCase._leaking_threads_tests))
321
245
# We don't report the main thread as an active one.
322
246
self.stream.write(
323
247
'%d non-main threads were left active in the end.\n'
324
% (len(self._active_threads) - 1))
248
% (TestCase._active_threads - 1))
326
250
def getDescription(self, test):
353
276
def _shortened_test_description(self, test):
355
what = re.sub(r'^bzrlib\.tests\.', '', what)
278
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
358
# GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
359
# multiple times in a row, because the handler is added for
360
# each test but the container list is shared between cases.
361
# See lp:498869 lp:625574 and lp:637725 for background.
362
def _record_traceback_from_test(self, exc_info):
363
"""Store the traceback from passed exc_info tuple till"""
364
self._traceback_from_test = exc_info[2]
366
281
def startTest(self, test):
367
super(ExtendedTestResult, self).startTest(test)
282
unittest.TestResult.startTest(self, test)
368
283
if self.count == 0:
369
284
self.startTests()
371
285
self.report_test_start(test)
372
286
test.number = self.count
373
287
self._recordTestStartTime()
374
# Make testtools cases give us the real traceback on failure
375
addOnException = getattr(test, "addOnException", None)
376
if addOnException is not None:
377
addOnException(self._record_traceback_from_test)
378
# Only check for thread leaks on bzrlib derived test cases
379
if isinstance(test, TestCase):
380
test.addCleanup(self._check_leaked_threads, test)
382
def stopTest(self, test):
383
super(ExtendedTestResult, self).stopTest(test)
384
# Manually break cycles, means touching various private things but hey
385
getDetails = getattr(test, "getDetails", None)
386
if getDetails is not None:
388
# Clear _type_equality_funcs to try to stop TestCase instances
389
# from wasting memory. 'clear' is not available in all Python
390
# versions (bug 809048)
391
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
392
if type_equality_funcs is not None:
393
tef_clear = getattr(type_equality_funcs, "clear", None)
394
if tef_clear is None:
395
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
396
if tef_instance_dict is not None:
397
tef_clear = tef_instance_dict.clear
398
if tef_clear is not None:
400
self._traceback_from_test = None
402
289
def startTests(self):
403
self.report_tests_starting()
404
self._active_threads = threading.enumerate()
406
def _check_leaked_threads(self, test):
407
"""See if any threads have leaked since last call
409
A sample of live threads is stored in the _active_threads attribute,
410
when this method runs it compares the current live threads and any not
411
in the previous sample are treated as having leaked.
413
now_active_threads = set(threading.enumerate())
414
threads_leaked = now_active_threads.difference(self._active_threads)
416
self._report_thread_leak(test, threads_leaked, now_active_threads)
417
self._tests_leaking_threads_count += 1
418
if self._first_thread_leaker_id is None:
419
self._first_thread_leaker_id = test.id()
420
self._active_threads = now_active_threads
291
if getattr(sys, 'frozen', None) is None:
292
bzr_path = osutils.realpath(sys.argv[0])
294
bzr_path = sys.executable
296
'bzr selftest: %s\n' % (bzr_path,))
299
bzrlib.__path__[0],))
301
' bzr-%s python-%s %s\n' % (
302
bzrlib.version_string,
303
bzrlib._format_version_tuple(sys.version_info),
304
platform.platform(aliased=1),
306
self.stream.write('\n')
422
308
def _recordTestStartTime(self):
423
309
"""Record that a test has started."""
424
self._start_datetime = self._now()
310
self._start_time = time.time()
312
def _cleanupLogFile(self, test):
313
# We can only do this if we have one of our TestCases, not if
315
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
316
if setKeepLogfile is not None:
426
319
def addError(self, test, err):
427
320
"""Tell result that test finished with an error.
461
356
self._formatTime(benchmark_time),
463
358
self.report_success(test)
464
super(ExtendedTestResult, self).addSuccess(test)
359
self._cleanupLogFile(test)
360
unittest.TestResult.addSuccess(self, test)
465
361
test._log_contents = ''
467
363
def addExpectedFailure(self, test, err):
468
364
self.known_failure_count += 1
469
365
self.report_known_failure(test, err)
471
def addUnexpectedSuccess(self, test, details=None):
472
"""Tell result the test unexpectedly passed, counting as a failure
474
When the minimum version of testtools required becomes 0.9.8 this
475
can be updated to use the new handling there.
477
super(ExtendedTestResult, self).addFailure(test, details=details)
478
self.failure_count += 1
479
self.report_unexpected_success(test,
480
"".join(details["reason"].iter_text()))
484
367
def addNotSupported(self, test, feature):
485
368
"""The test will not be run because of a missing feature.
519
401
raise errors.BzrError("Unknown whence %r" % whence)
521
def report_tests_starting(self):
522
"""Display information before the test run begins"""
523
if getattr(sys, 'frozen', None) is None:
524
bzr_path = osutils.realpath(sys.argv[0])
526
bzr_path = sys.executable
528
'bzr selftest: %s\n' % (bzr_path,))
531
bzrlib.__path__[0],))
533
' bzr-%s python-%s %s\n' % (
534
bzrlib.version_string,
535
bzrlib._format_version_tuple(sys.version_info),
536
platform.platform(aliased=1),
538
self.stream.write('\n')
540
def report_test_start(self, test):
541
"""Display information on the test just about to be run"""
543
def _report_thread_leak(self, test, leaked_threads, active_threads):
544
"""Display information on a test that leaked one or more threads"""
545
# GZ 2010-09-09: A leak summary reported separately from the general
546
# thread debugging would be nice. Tests under subunit
547
# need something not using stream, perhaps adding a
548
# testtools details object would be fitting.
549
if 'threads' in selftest_debug_flags:
550
self.stream.write('%s is leaking, active is now %d\n' %
551
(test.id(), len(active_threads)))
403
def report_cleaning_up(self):
553
406
def startTestRun(self):
554
407
self.startTime = time.time()
695
551
return '%s%s' % (indent, err[1])
697
553
def report_error(self, test, err):
698
self.stream.write('ERROR %s\n%s\n'
554
self.stream.writeln('ERROR %s\n%s'
699
555
% (self._testTimeString(test),
700
556
self._error_summary(err)))
702
558
def report_failure(self, test, err):
703
self.stream.write(' FAIL %s\n%s\n'
559
self.stream.writeln(' FAIL %s\n%s'
704
560
% (self._testTimeString(test),
705
561
self._error_summary(err)))
707
563
def report_known_failure(self, test, err):
708
self.stream.write('XFAIL %s\n%s\n'
564
self.stream.writeln('XFAIL %s\n%s'
709
565
% (self._testTimeString(test),
710
566
self._error_summary(err)))
712
def report_unexpected_success(self, test, reason):
713
self.stream.write(' FAIL %s\n%s: %s\n'
714
% (self._testTimeString(test),
715
"Unexpected success. Should have failed",
718
568
def report_success(self, test):
719
self.stream.write(' OK %s\n' % self._testTimeString(test))
569
self.stream.writeln(' OK %s' % self._testTimeString(test))
720
570
for bench_called, stats in getattr(test, '_benchcalls', []):
721
self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
571
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
722
572
stats.pprint(file=self.stream)
723
573
# flush the stream so that we get smooth output. This verbose mode is
724
574
# used to show the output in PQM.
725
575
self.stream.flush()
727
577
def report_skip(self, test, reason):
728
self.stream.write(' SKIP %s\n%s\n'
578
self.stream.writeln(' SKIP %s\n%s'
729
579
% (self._testTimeString(test), reason))
731
581
def report_not_applicable(self, test, reason):
732
self.stream.write(' N/A %s\n %s\n'
582
self.stream.writeln(' N/A %s\n %s'
733
583
% (self._testTimeString(test), reason))
735
585
def report_unsupported(self, test, feature):
736
586
"""test cannot be run because feature is missing."""
737
self.stream.write("NODEP %s\n The feature '%s' is not available.\n"
587
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
738
588
%(self._testTimeString(test), feature))
961
789
routine, and to build and check bzr trees.
963
791
In addition to the usual method of overriding tearDown(), this class also
964
allows subclasses to register cleanup functions via addCleanup, which are
792
allows subclasses to register functions into the _cleanups list, which is
965
793
run in order as the object is torn down. It's less likely this will be
966
794
accidentally overlooked.
797
_active_threads = None
798
_leaking_threads_tests = 0
799
_first_thread_leaker_id = None
800
_log_file_name = None
970
801
# record lsprof data when performing benchmark calls.
971
802
_gather_lsprof_in_benchmarks = False
973
804
def __init__(self, methodName='testMethod'):
974
805
super(TestCase, self).__init__(methodName)
975
807
self._directory_isolation = True
976
808
self.exception_handlers.insert(0,
977
809
(UnavailableFeature, self._do_unsupported_or_skip))
991
827
self._track_transports()
992
828
self._track_locks()
993
829
self._clear_debug_flags()
994
# Isolate global verbosity level, to make sure it's reproducible
995
# between tests. We should get rid of this altogether: bug 656694. --
997
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
998
# Isolate config option expansion until its default value for bzrlib is
999
# settled on or a the FIXME associated with _get_expand_default_value
1000
# is addressed -- vila 20110219
1001
self.overrideAttr(config, '_expand_default_value', None)
1002
self._log_files = set()
1003
# Each key in the ``_counters`` dict holds a value for a different
1004
# counter. When the test ends, addDetail() should be used to output the
1005
# counter values. This happens in install_counter_hook().
1007
if 'config_stats' in selftest_debug_flags:
1008
self._install_config_stats_hooks()
1009
# Do not use i18n for tests (unless the test reverses this)
830
TestCase._active_threads = threading.activeCount()
831
self.addCleanup(self._check_leaked_threads)
1012
833
def debug(self):
1013
834
# debug a frame up.
1015
# The sys preserved stdin/stdout should allow blackbox tests debugging
1016
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1017
).set_trace(sys._getframe().f_back)
1019
def discardDetail(self, name):
1020
"""Extend the addDetail, getDetails api so we can remove a detail.
1022
eg. bzr always adds the 'log' detail at startup, but we don't want to
1023
include it for skipped, xfail, etc tests.
1025
It is safe to call this for a detail that doesn't exist, in case this
1026
gets called multiple times.
1028
# We cheat. details is stored in __details which means we shouldn't
1029
# touch it. but getDetails() returns the dict directly, so we can
1031
details = self.getDetails()
1035
def install_counter_hook(self, hooks, name, counter_name=None):
1036
"""Install a counting hook.
1038
Any hook can be counted as long as it doesn't need to return a value.
1040
:param hooks: Where the hook should be installed.
1042
:param name: The hook name that will be counted.
1044
:param counter_name: The counter identifier in ``_counters``, defaults
1047
_counters = self._counters # Avoid closing over self
1048
if counter_name is None:
1050
if _counters.has_key(counter_name):
1051
raise AssertionError('%s is already used as a counter name'
1053
_counters[counter_name] = 0
1054
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1055
lambda: ['%d' % (_counters[counter_name],)]))
1056
def increment_counter(*args, **kwargs):
1057
_counters[counter_name] += 1
1058
label = 'count %s calls' % (counter_name,)
1059
hooks.install_named_hook(name, increment_counter, label)
1060
self.addCleanup(hooks.uninstall_named_hook, name, label)
1062
def _install_config_stats_hooks(self):
1063
"""Install config hooks to count hook calls.
1066
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1067
self.install_counter_hook(config.ConfigHooks, hook_name,
1068
'config.%s' % (hook_name,))
1070
# The OldConfigHooks are private and need special handling to protect
1071
# against recursive tests (tests that run other tests), so we just do
1072
# manually what registering them into _builtin_known_hooks will provide
1074
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1075
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1076
self.install_counter_hook(config.OldConfigHooks, hook_name,
1077
'old_config.%s' % (hook_name,))
836
pdb.Pdb().set_trace(sys._getframe().f_back)
838
def _check_leaked_threads(self):
839
active = threading.activeCount()
840
leaked_threads = active - TestCase._active_threads
841
TestCase._active_threads = active
842
# If some tests make the number of threads *decrease*, we'll consider
843
# that they are just observing old threads dieing, not agressively kill
844
# random threads. So we don't report these tests as leaking. The risk
845
# is that we have false positives that way (the test see 2 threads
846
# going away but leak one) but it seems less likely than the actual
847
# false positives (the test see threads going away and does not leak).
848
if leaked_threads > 0:
849
TestCase._leaking_threads_tests += 1
850
if TestCase._first_thread_leaker_id is None:
851
TestCase._first_thread_leaker_id = self.id()
1079
853
def _clear_debug_flags(self):
1080
854
"""Prevent externally set debug flags affecting tests.
1092
866
def _clear_hooks(self):
1093
867
# prevent hooks affecting tests
1094
known_hooks = hooks.known_hooks
1095
868
self._preserved_hooks = {}
1096
for key, (parent, name) in known_hooks.iter_parent_objects():
1097
current_hooks = getattr(parent, name)
869
for key, factory in hooks.known_hooks.items():
870
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
871
current_hooks = hooks.known_hooks_key_to_object(key)
1098
872
self._preserved_hooks[parent] = (name, current_hooks)
1099
self._preserved_lazy_hooks = hooks._lazy_hooks
1100
hooks._lazy_hooks = {}
1101
873
self.addCleanup(self._restoreHooks)
1102
for key, (parent, name) in known_hooks.iter_parent_objects():
1103
factory = known_hooks.get(key)
874
for key, factory in hooks.known_hooks.items():
875
parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
1104
876
setattr(parent, name, factory())
1105
877
# this hook should always be installed
1106
878
request._install_hook()
1367
1135
'st_mtime did not match')
1368
1136
self.assertEqual(expected.st_ctime, actual.st_ctime,
1369
1137
'st_ctime did not match')
1370
if sys.platform == 'win32':
1138
if sys.platform != 'win32':
1371
1139
# On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1372
1140
# is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1373
# odd. We just force it to always be 0 to avoid any problems.
1374
self.assertEqual(0, expected.st_dev)
1375
self.assertEqual(0, actual.st_dev)
1376
self.assertEqual(0, expected.st_ino)
1377
self.assertEqual(0, actual.st_ino)
1141
# odd. Regardless we shouldn't actually try to assert anything
1142
# about their values
1379
1143
self.assertEqual(expected.st_dev, actual.st_dev,
1380
1144
'st_dev did not match')
1381
1145
self.assertEqual(expected.st_ino, actual.st_ino,
1561
1322
self.assertEqual(expected_docstring, obj.__doc__)
1563
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1564
1324
def failUnlessExists(self, path):
1565
return self.assertPathExists(path)
1567
def assertPathExists(self, path):
1568
1325
"""Fail unless path or paths, which may be abs or relative, exist."""
1569
1326
if not isinstance(path, basestring):
1571
self.assertPathExists(p)
1328
self.failUnlessExists(p)
1573
self.assertTrue(osutils.lexists(path),
1574
path + " does not exist")
1330
self.failUnless(osutils.lexists(path),path+" does not exist")
1576
@symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1577
1332
def failIfExists(self, path):
1578
return self.assertPathDoesNotExist(path)
1580
def assertPathDoesNotExist(self, path):
1581
1333
"""Fail if path or paths, which may be abs or relative, exist."""
1582
1334
if not isinstance(path, basestring):
1584
self.assertPathDoesNotExist(p)
1336
self.failIfExists(p)
1586
self.assertFalse(osutils.lexists(path),
1338
self.failIf(osutils.lexists(path),path+" exists")
1589
1340
def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1590
1341
"""A helper for callDeprecated and applyDeprecated.
1706
1456
The file is removed as the test is torn down.
1708
pseudo_log_file = StringIO()
1709
def _get_log_contents_for_weird_testtools_api():
1710
return [pseudo_log_file.getvalue().decode(
1711
"utf-8", "replace").encode("utf-8")]
1712
self.addDetail("log", content.Content(content.ContentType("text",
1713
"plain", {"charset": "utf8"}),
1714
_get_log_contents_for_weird_testtools_api))
1715
self._log_file = pseudo_log_file
1716
self._log_memento = trace.push_log_file(self._log_file)
1458
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1459
self._log_file = os.fdopen(fileno, 'w+')
1460
self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1461
self._log_file_name = name
1717
1462
self.addCleanup(self._finishLogFile)
1719
1464
def _finishLogFile(self):
1720
1465
"""Finished with the log file.
1722
Close the file and delete it.
1467
Close the file and delete it, unless setKeepLogfile was called.
1724
if trace._trace_file:
1469
if bzrlib.trace._trace_file:
1725
1470
# flush the log file, to get all content
1726
trace._trace_file.flush()
1727
trace.pop_log_file(self._log_memento)
1471
bzrlib.trace._trace_file.flush()
1472
bzrlib.trace.pop_log_file(self._log_memento)
1473
# Cache the log result and delete the file on disk
1474
self._get_log(False)
1729
1476
def thisFailsStrictLockCheck(self):
1730
1477
"""It is known that this test would fail with -Dstrict_locks.
1761
1513
setattr(obj, attr_name, new)
1764
def overrideEnv(self, name, new):
1765
"""Set an environment variable, and reset it after the test.
1767
:param name: The environment variable name.
1769
:param new: The value to set the variable to. If None, the
1770
variable is deleted from the environment.
1772
:returns: The actual variable value.
1774
value = osutils.set_or_unset_env(name, new)
1775
self.addCleanup(osutils.set_or_unset_env, name, value)
1778
def recordCalls(self, obj, attr_name):
1779
"""Monkeypatch in a wrapper that will record calls.
1781
The monkeypatch is automatically removed when the test concludes.
1783
:param obj: The namespace holding the reference to be replaced;
1784
typically a module, class, or object.
1785
:param attr_name: A string for the name of the attribute to
1787
:returns: A list that will be extended with one item every time the
1788
function is called, with a tuple of (args, kwargs).
1792
def decorator(*args, **kwargs):
1793
calls.append((args, kwargs))
1794
return orig(*args, **kwargs)
1795
orig = self.overrideAttr(obj, attr_name, decorator)
1798
1516
def _cleanEnvironment(self):
1799
for name, value in isolated_environ.iteritems():
1800
self.overrideEnv(name, value)
1518
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1519
'HOME': os.getcwd(),
1520
# bzr now uses the Win32 API and doesn't rely on APPDATA, but the
1521
# tests do check our impls match APPDATA
1522
'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1526
'BZREMAIL': None, # may still be present in the environment
1528
'BZR_PROGRESS_BAR': None,
1530
'BZR_PLUGIN_PATH': None,
1531
'BZR_DISABLE_PLUGINS': None,
1532
'BZR_PLUGINS_AT': None,
1533
'BZR_CONCURRENCY': None,
1534
# Make sure that any text ui tests are consistent regardless of
1535
# the environment the test case is run in; you may want tests that
1536
# test other combinations. 'dumb' is a reasonable guess for tests
1537
# going to a pipe or a StringIO.
1541
'BZR_COLUMNS': '80',
1543
'SSH_AUTH_SOCK': None,
1547
'https_proxy': None,
1548
'HTTPS_PROXY': None,
1553
# Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1554
# least. If you do (care), please update this comment
1558
'BZR_REMOTE_PATH': None,
1559
# Generally speaking, we don't want apport reporting on crashes in
1560
# the test envirnoment unless we're specifically testing apport,
1561
# so that it doesn't leak into the real system environment. We
1562
# use an env var so it propagates to subprocesses.
1563
'APPORT_DISABLE': '1',
1566
self.addCleanup(self._restoreEnvironment)
1567
for name, value in new_env.iteritems():
1568
self._captureVar(name, value)
1570
def _captureVar(self, name, newvalue):
1571
"""Set an environment variable, and reset it when finished."""
1572
self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1574
def _restoreEnvironment(self):
1575
for name, value in self._old_env.iteritems():
1576
osutils.set_or_unset_env(name, value)
1802
1578
def _restoreHooks(self):
1803
1579
for klass, (name, hooks) in self._preserved_hooks.items():
1804
1580
setattr(klass, name, hooks)
1805
self._preserved_hooks.clear()
1806
bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1807
self._preserved_lazy_hooks.clear()
1809
1582
def knownFailure(self, reason):
1810
"""Declare that this test fails for a known reason
1812
Tests that are known to fail should generally be using expectedFailure
1813
with an appropriate reverse assertion if a change could cause the test
1814
to start passing. Conversely if the test has no immediate prospect of
1815
succeeding then using skip is more suitable.
1817
When this method is called while an exception is being handled, that
1818
traceback will be used, otherwise a new exception will be thrown to
1819
provide one but won't be reported.
1821
self._add_reason(reason)
1823
exc_info = sys.exc_info()
1824
if exc_info != (None, None, None):
1825
self._report_traceback(exc_info)
1828
raise self.failureException(reason)
1829
except self.failureException:
1830
exc_info = sys.exc_info()
1831
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1832
raise testtools.testcase._ExpectedFailure(exc_info)
1836
def _suppress_log(self):
1837
"""Remove the log info from details."""
1838
self.discardDetail('log')
1583
"""This test has failed for some known reason."""
1584
raise KnownFailure(reason)
1840
1586
def _do_skip(self, result, reason):
1841
self._suppress_log()
1842
1587
addSkip = getattr(result, 'addSkip', None)
1843
1588
if not callable(addSkip):
1844
1589
result.addSuccess(result)
1869
1612
self._do_skip(result, reason)
1872
def _report_skip(self, result, err):
1873
"""Override the default _report_skip.
1875
We want to strip the 'log' detail. If we waint until _do_skip, it has
1876
already been formatted into the 'reason' string, and we can't pull it
1879
self._suppress_log()
1880
super(TestCase, self)._report_skip(self, result, err)
1883
def _report_expected_failure(self, result, err):
1886
See _report_skip for motivation.
1888
self._suppress_log()
1889
super(TestCase, self)._report_expected_failure(self, result, err)
1892
1615
def _do_unsupported_or_skip(self, result, e):
1893
1616
reason = e.args[0]
1894
self._suppress_log()
1895
1617
addNotSupported = getattr(result, 'addNotSupported', None)
1896
1618
if addNotSupported is not None:
1897
1619
result.addNotSupported(self, reason)
1923
1645
self._benchtime += time.time() - start
1925
1647
def log(self, *args):
1650
def _get_log(self, keep_log_file=False):
1651
"""Internal helper to get the log from bzrlib.trace for this test.
1653
Please use self.getDetails, or self.get_log to access this in test case
1656
:param keep_log_file: When True, if the log is still a file on disk
1657
leave it as a file on disk. When False, if the log is still a file
1658
on disk, the log file is deleted and the log preserved as
1660
:return: A string containing the log.
1662
if self._log_contents is not None:
1664
self._log_contents.decode('utf8')
1665
except UnicodeDecodeError:
1666
unicodestr = self._log_contents.decode('utf8', 'replace')
1667
self._log_contents = unicodestr.encode('utf8')
1668
return self._log_contents
1670
if bzrlib.trace._trace_file:
1671
# flush the log file, to get all content
1672
bzrlib.trace._trace_file.flush()
1673
if self._log_file_name is not None:
1674
logfile = open(self._log_file_name)
1676
log_contents = logfile.read()
1680
log_contents.decode('utf8')
1681
except UnicodeDecodeError:
1682
unicodestr = log_contents.decode('utf8', 'replace')
1683
log_contents = unicodestr.encode('utf8')
1684
if not keep_log_file:
1686
max_close_attempts = 100
1687
first_close_error = None
1688
while close_attempts < max_close_attempts:
1691
self._log_file.close()
1692
except IOError, ioe:
1693
if ioe.errno is None:
1694
# No errno implies 'close() called during
1695
# concurrent operation on the same file object', so
1696
# retry. Probably a thread is trying to write to
1698
if first_close_error is None:
1699
first_close_error = ioe
1704
if close_attempts > 1:
1706
'Unable to close log file on first attempt, '
1707
'will retry: %s\n' % (first_close_error,))
1708
if close_attempts == max_close_attempts:
1710
'Unable to close log file after %d attempts.\n'
1711
% (max_close_attempts,))
1712
self._log_file = None
1713
# Permit multiple calls to get_log until we clean it up in
1715
self._log_contents = log_contents
1717
os.remove(self._log_file_name)
1719
if sys.platform == 'win32' and e.errno == errno.EACCES:
1720
sys.stderr.write(('Unable to delete log file '
1721
' %r\n' % self._log_file_name))
1724
self._log_file_name = None
1727
return "No log file content and no log file name."
1928
1729
def get_log(self):
1929
1730
"""Get a unicode string containing the log from bzrlib.trace.
2205
def _add_subprocess_log(self, log_file_path):
2206
if len(self._log_files) == 0:
2207
# Register an addCleanup func. We do this on the first call to
2208
# _add_subprocess_log rather than in TestCase.setUp so that this
2209
# addCleanup is registered after any cleanups for tempdirs that
2210
# subclasses might create, which will probably remove the log file
2212
self.addCleanup(self._subprocess_log_cleanup)
2213
# self._log_files is a set, so if a log file is reused we won't grab it
2215
self._log_files.add(log_file_path)
2217
def _subprocess_log_cleanup(self):
2218
for count, log_file_path in enumerate(self._log_files):
2219
# We use buffer_now=True to avoid holding the file open beyond
2220
# the life of this function, which might interfere with e.g.
2221
# cleaning tempdirs on Windows.
2222
# XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2223
#detail_content = content.content_from_file(
2224
# log_file_path, buffer_now=True)
2225
with open(log_file_path, 'rb') as log_file:
2226
log_file_bytes = log_file.read()
2227
detail_content = content.Content(content.ContentType("text",
2228
"plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2229
self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2232
1997
def _popen(self, *args, **kwargs):
2233
1998
"""Place a call to Popen.
2235
2000
Allows tests to override this method to intercept the calls made to
2236
2001
Popen for introspection.
2238
return subprocess.Popen(*args, **kwargs)
2003
return Popen(*args, **kwargs)
2240
2005
def get_source_path(self):
2241
2006
"""Return the path of the directory containing bzrlib."""
2271
2036
if retcode is not None and retcode != process.returncode:
2272
2037
if process_args is None:
2273
2038
process_args = "(unknown args)"
2274
trace.mutter('Output of bzr %s:\n%s', process_args, out)
2275
trace.mutter('Error for bzr %s:\n%s', process_args, err)
2039
mutter('Output of bzr %s:\n%s', process_args, out)
2040
mutter('Error for bzr %s:\n%s', process_args, err)
2276
2041
self.fail('Command bzr %s failed with retcode %s != %s'
2277
2042
% (process_args, retcode, process.returncode))
2278
2043
return [out, err]
2280
def check_tree_shape(self, tree, shape):
2281
"""Compare a tree to a list of expected names.
2045
def check_inventory_shape(self, inv, shape):
2046
"""Compare an inventory to a list of expected names.
2283
2048
Fail if they are not precisely equal.
2286
2051
shape = list(shape) # copy
2287
for path, ie in tree.iter_entries_by_dir():
2052
for path, ie in inv.entries():
2288
2053
name = path.replace('\\', '/')
2289
2054
if ie.kind == 'directory':
2290
2055
name = name + '/'
2292
pass # ignore root entry
2294
2057
shape.remove(name)
2296
2059
extras.append(name)
2386
2149
class TestCaseWithMemoryTransport(TestCase):
2387
2150
"""Common test class for tests that do not need disk resources.
2389
Tests that need disk resources should derive from TestCaseInTempDir
2390
orTestCaseWithTransport.
2152
Tests that need disk resources should derive from TestCaseWithTransport.
2392
2154
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2394
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2156
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2395
2157
a directory which does not exist. This serves to help ensure test isolation
2396
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2397
must exist. However, TestCaseWithMemoryTransport does not offer local file
2398
defaults for the transport in tests, nor does it obey the command line
2158
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2159
must exist. However, TestCaseWithMemoryTransport does not offer local
2160
file defaults for the transport in tests, nor does it obey the command line
2399
2161
override, so tests that accidentally write to the common directory should
2402
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2403
``.bzr`` directory that stops us ascending higher into the filesystem.
2164
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2165
a .bzr directory that stops us ascending higher into the filesystem.
2406
2168
TEST_ROOT = None
2630
2378
def make_branch(self, relpath, format=None):
2631
2379
"""Create a branch on the transport at relpath."""
2632
2380
repo = self.make_repository(relpath, format=format)
2633
return repo.bzrdir.create_branch(append_revisions_only=False)
2635
def resolve_format(self, format):
2636
"""Resolve an object to a ControlDir format object.
2638
The initial format object can either already be
2639
a ControlDirFormat, None (for the default format),
2640
or a string with the name of the control dir format.
2642
:param format: Object to resolve
2643
:return A ControlDirFormat instance
2647
if isinstance(format, basestring):
2648
format = bzrdir.format_registry.make_bzrdir(format)
2651
def resolve_format(self, format):
2652
"""Resolve an object to a ControlDir format object.
2654
The initial format object can either already be
2655
a ControlDirFormat, None (for the default format),
2656
or a string with the name of the control dir format.
2658
:param format: Object to resolve
2659
:return A ControlDirFormat instance
2663
if isinstance(format, basestring):
2664
format = bzrdir.format_registry.make_bzrdir(format)
2381
return repo.bzrdir.create_branch()
2667
2383
def make_bzrdir(self, relpath, format=None):
2669
2385
# might be a relative or absolute path
2670
2386
maybe_a_url = self.get_url(relpath)
2671
2387
segments = maybe_a_url.rsplit('/', 1)
2672
t = _mod_transport.get_transport(maybe_a_url)
2388
t = get_transport(maybe_a_url)
2673
2389
if len(segments) > 1 and segments[-1] not in ('', '.'):
2674
2390
t.ensure_base()
2675
format = self.resolve_format(format)
2393
if isinstance(format, basestring):
2394
format = bzrdir.format_registry.make_bzrdir(format)
2676
2395
return format.initialize_on_transport(t)
2677
2396
except errors.UninitializableFormat:
2678
2397
raise TestSkipped("Format %s is not initializable." % format)
2680
def make_repository(self, relpath, shared=None, format=None):
2399
def make_repository(self, relpath, shared=False, format=None):
2681
2400
"""Create a repository on our default transport at relpath.
2683
2402
Note that relpath must be a relative path, not a full url.
2711
2427
test_home_dir = self.test_home_dir
2712
2428
if isinstance(test_home_dir, unicode):
2713
2429
test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2714
self.overrideEnv('HOME', test_home_dir)
2715
self.overrideEnv('BZR_HOME', test_home_dir)
2430
os.environ['HOME'] = test_home_dir
2431
os.environ['BZR_HOME'] = test_home_dir
2717
2433
def setUp(self):
2718
2434
super(TestCaseWithMemoryTransport, self).setUp()
2719
# Ensure that ConnectedTransport doesn't leak sockets
2720
def get_transport_from_url_with_cleanup(*args, **kwargs):
2721
t = orig_get_transport_from_url(*args, **kwargs)
2722
if isinstance(t, _mod_transport.ConnectedTransport):
2723
self.addCleanup(t.disconnect)
2726
orig_get_transport_from_url = self.overrideAttr(
2727
_mod_transport, 'get_transport_from_url',
2728
get_transport_from_url_with_cleanup)
2729
2435
self._make_test_root()
2730
2436
self.addCleanup(os.chdir, os.getcwdu())
2731
2437
self.makeAndChdirToTestDir()
3497
3191
def partition_tests(suite, count):
3498
3192
"""Partition suite into count lists of tests."""
3499
# This just assigns tests in a round-robin fashion. On one hand this
3500
# splits up blocks of related tests that might run faster if they shared
3501
# resources, but on the other it avoids assigning blocks of slow tests to
3502
# just one partition. So the slowest partition shouldn't be much slower
3504
partitions = [list() for i in range(count)]
3505
tests = iter_suite_tests(suite)
3506
for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3507
partition.append(test)
3194
tests = list(iter_suite_tests(suite))
3195
tests_per_process = int(math.ceil(float(len(tests)) / count))
3196
for block in range(count):
3197
low_test = block * tests_per_process
3198
high_test = low_test + tests_per_process
3199
process_tests = tests[low_test:high_test]
3200
result.append(process_tests)
3511
3204
def workaround_zealous_crypto_random():
3634
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3325
class ForwardingResult(unittest.TestResult):
3327
def __init__(self, target):
3328
unittest.TestResult.__init__(self)
3329
self.result = target
3331
def startTest(self, test):
3332
self.result.startTest(test)
3334
def stopTest(self, test):
3335
self.result.stopTest(test)
3337
def startTestRun(self):
3338
self.result.startTestRun()
3340
def stopTestRun(self):
3341
self.result.stopTestRun()
3343
def addSkip(self, test, reason):
3344
self.result.addSkip(test, reason)
3346
def addSuccess(self, test):
3347
self.result.addSuccess(test)
3349
def addError(self, test, err):
3350
self.result.addError(test, err)
3352
def addFailure(self, test, err):
3353
self.result.addFailure(test, err)
3354
ForwardingResult = testtools.ExtendedToOriginalDecorator
3357
class ProfileResult(ForwardingResult):
3635
3358
"""Generate profiling data for all activity between start and success.
3637
3360
The profile data is appended to the test's _benchcalls attribute and can
3935
3648
'bzrlib.tests.per_repository',
3936
3649
'bzrlib.tests.per_repository_chk',
3937
3650
'bzrlib.tests.per_repository_reference',
3938
'bzrlib.tests.per_repository_vf',
3939
3651
'bzrlib.tests.per_uifactory',
3940
3652
'bzrlib.tests.per_versionedfile',
3941
3653
'bzrlib.tests.per_workingtree',
3942
3654
'bzrlib.tests.test__annotator',
3943
3655
'bzrlib.tests.test__bencode',
3944
'bzrlib.tests.test__btree_serializer',
3945
3656
'bzrlib.tests.test__chk_map',
3946
3657
'bzrlib.tests.test__dirstate_helpers',
3947
3658
'bzrlib.tests.test__groupcompress',
3975
3686
'bzrlib.tests.test_commit_merge',
3976
3687
'bzrlib.tests.test_config',
3977
3688
'bzrlib.tests.test_conflicts',
3978
'bzrlib.tests.test_controldir',
3979
3689
'bzrlib.tests.test_counted_lock',
3980
3690
'bzrlib.tests.test_crash',
3981
3691
'bzrlib.tests.test_decorators',
3982
3692
'bzrlib.tests.test_delta',
3983
3693
'bzrlib.tests.test_debug',
3694
'bzrlib.tests.test_deprecated_graph',
3984
3695
'bzrlib.tests.test_diff',
3985
3696
'bzrlib.tests.test_directory_service',
3986
3697
'bzrlib.tests.test_dirstate',
3987
3698
'bzrlib.tests.test_email_message',
3988
3699
'bzrlib.tests.test_eol_filters',
3989
3700
'bzrlib.tests.test_errors',
3990
'bzrlib.tests.test_estimate_compressed_size',
3991
3701
'bzrlib.tests.test_export',
3992
'bzrlib.tests.test_export_pot',
3993
3702
'bzrlib.tests.test_extract',
3994
'bzrlib.tests.test_features',
3995
3703
'bzrlib.tests.test_fetch',
3996
'bzrlib.tests.test_fixtures',
3997
3704
'bzrlib.tests.test_fifo_cache',
3998
3705
'bzrlib.tests.test_filters',
3999
'bzrlib.tests.test_filter_tree',
4000
3706
'bzrlib.tests.test_ftp_transport',
4001
3707
'bzrlib.tests.test_foreign',
4002
3708
'bzrlib.tests.test_generate_docs',
4030
3734
'bzrlib.tests.test_lru_cache',
4031
3735
'bzrlib.tests.test_lsprof',
4032
3736
'bzrlib.tests.test_mail_client',
4033
'bzrlib.tests.test_matchers',
4034
3737
'bzrlib.tests.test_memorytree',
4035
3738
'bzrlib.tests.test_merge',
4036
3739
'bzrlib.tests.test_merge3',
4037
3740
'bzrlib.tests.test_merge_core',
4038
3741
'bzrlib.tests.test_merge_directive',
4039
'bzrlib.tests.test_mergetools',
4040
3742
'bzrlib.tests.test_missing',
4041
3743
'bzrlib.tests.test_msgeditor',
4042
3744
'bzrlib.tests.test_multiparent',
4245
3937
# Some tests mentioned in the list are not in the test suite. The
4246
3938
# list may be out of date, report to the tester.
4247
3939
for id in not_found:
4248
trace.warning('"%s" not found in the test suite', id)
3940
bzrlib.trace.warning('"%s" not found in the test suite', id)
4249
3941
for id in duplicates:
4250
trace.warning('"%s" is used as an id by several tests', id)
3942
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
4255
def multiply_scenarios(*scenarios):
4256
"""Multiply two or more iterables of scenarios.
4258
It is safe to pass scenario generators or iterators.
4260
:returns: A list of compound scenarios: the cross-product of all
4261
scenarios, with the names concatenated and the parameters
4264
return reduce(_multiply_two_scenarios, map(list, scenarios))
4267
def _multiply_two_scenarios(scenarios_left, scenarios_right):
3947
def multiply_scenarios(scenarios_left, scenarios_right):
4268
3948
"""Multiply two sets of scenarios.
4270
3950
:returns: the cartesian product of the two sets of scenarios, that is
4434
4101
if test_id != None:
4435
4102
ui.ui_factory.clear_term()
4436
4103
sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4437
# Ugly, but the last thing we want here is fail, so bear with it.
4438
printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4439
).encode('ascii', 'replace')
4440
4104
sys.stderr.write('Unable to remove testing dir %s\n%s'
4441
% (os.path.basename(dirname), printable_e))
4105
% (os.path.basename(dirname), e))
4108
class Feature(object):
4109
"""An operating system Feature."""
4112
self._available = None
4114
def available(self):
4115
"""Is the feature available?
4117
:return: True if the feature is available.
4119
if self._available is None:
4120
self._available = self._probe()
4121
return self._available
4124
"""Implement this method in concrete features.
4126
:return: True if the feature is available.
4128
raise NotImplementedError
4131
if getattr(self, 'feature_name', None):
4132
return self.feature_name()
4133
return self.__class__.__name__
4136
class _SymlinkFeature(Feature):
4139
return osutils.has_symlinks()
4141
def feature_name(self):
4144
SymlinkFeature = _SymlinkFeature()
4147
class _HardlinkFeature(Feature):
4150
return osutils.has_hardlinks()
4152
def feature_name(self):
4155
HardlinkFeature = _HardlinkFeature()
4158
class _OsFifoFeature(Feature):
4161
return getattr(os, 'mkfifo', None)
4163
def feature_name(self):
4164
return 'filesystem fifos'
4166
OsFifoFeature = _OsFifoFeature()
4169
class _UnicodeFilenameFeature(Feature):
4170
"""Does the filesystem support Unicode filenames?"""
4174
# Check for character combinations unlikely to be covered by any
4175
# single non-unicode encoding. We use the characters
4176
# - greek small letter alpha (U+03B1) and
4177
# - braille pattern dots-123456 (U+283F).
4178
os.stat(u'\u03b1\u283f')
4179
except UnicodeEncodeError:
4181
except (IOError, OSError):
4182
# The filesystem allows the Unicode filename but the file doesn't
4186
# The filesystem allows the Unicode filename and the file exists,
4190
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4193
class _CompatabilityThunkFeature(Feature):
4194
"""This feature is just a thunk to another feature.
4196
It issues a deprecation warning if it is accessed, to let you know that you
4197
should really use a different feature.
4200
def __init__(self, dep_version, module, name,
4201
replacement_name, replacement_module=None):
4202
super(_CompatabilityThunkFeature, self).__init__()
4203
self._module = module
4204
if replacement_module is None:
4205
replacement_module = module
4206
self._replacement_module = replacement_module
4208
self._replacement_name = replacement_name
4209
self._dep_version = dep_version
4210
self._feature = None
4213
if self._feature is None:
4214
depr_msg = self._dep_version % ('%s.%s'
4215
% (self._module, self._name))
4216
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4217
self._replacement_name)
4218
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4219
# Import the new feature and use it as a replacement for the
4221
mod = __import__(self._replacement_module, {}, {},
4222
[self._replacement_name])
4223
self._feature = getattr(mod, self._replacement_name)
4227
return self._feature._probe()
4230
class ModuleAvailableFeature(Feature):
4231
"""This is a feature than describes a module we want to be available.
4233
Declare the name of the module in __init__(), and then after probing, the
4234
module will be available as 'self.module'.
4236
:ivar module: The module if it is available, else None.
4239
def __init__(self, module_name):
4240
super(ModuleAvailableFeature, self).__init__()
4241
self.module_name = module_name
4245
self._module = __import__(self.module_name, {}, {}, [''])
4252
if self.available(): # Make sure the probe has been done
4256
def feature_name(self):
4257
return self.module_name
4260
# This is kept here for compatibility, it is recommended to use
4261
# 'bzrlib.tests.feature.paramiko' instead
4262
ParamikoFeature = _CompatabilityThunkFeature(
4263
deprecated_in((2,1,0)),
4264
'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4444
4267
def probe_unicode_in_user_encoding():
4300
class _HTTPSServerFeature(Feature):
4301
"""Some tests want an https Server, check if one is available.
4303
Right now, the only way this is available is under python2.6 which provides
4314
def feature_name(self):
4315
return 'HTTPSServer'
4318
HTTPSServerFeature = _HTTPSServerFeature()
4321
class _UnicodeFilename(Feature):
4322
"""Does the filesystem support Unicode filenames?"""
4327
except UnicodeEncodeError:
4329
except (IOError, OSError):
4330
# The filesystem allows the Unicode filename but the file doesn't
4334
# The filesystem allows the Unicode filename and the file exists,
4338
UnicodeFilename = _UnicodeFilename()
4341
class _UTF8Filesystem(Feature):
4342
"""Is the filesystem UTF-8?"""
4345
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4349
UTF8Filesystem = _UTF8Filesystem()
4352
class _BreakinFeature(Feature):
4353
"""Does this platform support the breakin feature?"""
4356
from bzrlib import breakin
4357
if breakin.determine_signal() is None:
4359
if sys.platform == 'win32':
4360
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4361
# We trigger SIGBREAK via a Console api so we need ctypes to
4362
# access the function
4369
def feature_name(self):
4370
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4373
BreakinFeature = _BreakinFeature()
4376
class _CaseInsCasePresFilenameFeature(Feature):
4377
"""Is the file-system case insensitive, but case-preserving?"""
4380
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4382
# first check truly case-preserving for created files, then check
4383
# case insensitive when opening existing files.
4384
name = osutils.normpath(name)
4385
base, rel = osutils.split(name)
4386
found_rel = osutils.canonical_relpath(base, name)
4387
return (found_rel == rel
4388
and os.path.isfile(name.upper())
4389
and os.path.isfile(name.lower()))
4394
def feature_name(self):
4395
return "case-insensitive case-preserving filesystem"
4397
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4400
class _CaseInsensitiveFilesystemFeature(Feature):
4401
"""Check if underlying filesystem is case-insensitive but *not* case
4404
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4405
# more likely to be case preserving, so this case is rare.
4408
if CaseInsCasePresFilenameFeature.available():
4411
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4412
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4413
TestCaseWithMemoryTransport.TEST_ROOT = root
4415
root = TestCaseWithMemoryTransport.TEST_ROOT
4416
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4418
name_a = osutils.pathjoin(tdir, 'a')
4419
name_A = osutils.pathjoin(tdir, 'A')
4421
result = osutils.isdir(name_A)
4422
_rmtree_temp_dir(tdir)
4425
def feature_name(self):
4426
return 'case-insensitive filesystem'
4428
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4431
class _CaseSensitiveFilesystemFeature(Feature):
4434
if CaseInsCasePresFilenameFeature.available():
4436
elif CaseInsensitiveFilesystemFeature.available():
4441
def feature_name(self):
4442
return 'case-sensitive filesystem'
4444
# new coding style is for feature instances to be lowercase
4445
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4448
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4449
SubUnitFeature = _CompatabilityThunkFeature(
4450
deprecated_in((2,1,0)),
4451
'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4477
4452
# Only define SubUnitBzrRunner if subunit is available.
4479
4454
from subunit import TestProtocolClient
4480
4455
from subunit.test_results import AutoTimingTestResultDecorator
4481
class SubUnitBzrProtocolClient(TestProtocolClient):
4483
def addSuccess(self, test, details=None):
4484
# The subunit client always includes the details in the subunit
4485
# stream, but we don't want to include it in ours.
4486
if details is not None and 'log' in details:
4488
return super(SubUnitBzrProtocolClient, self).addSuccess(
4491
4456
class SubUnitBzrRunner(TextTestRunner):
4492
4457
def run(self, test):
4493
4458
result = AutoTimingTestResultDecorator(
4494
SubUnitBzrProtocolClient(self.stream))
4459
TestProtocolClient(self.stream))
4495
4460
test.run(result)
4497
4462
except ImportError:
4501
@deprecated_function(deprecated_in((2, 5, 0)))
4502
def ModuleAvailableFeature(name):
4503
from bzrlib.tests import features
4504
return features.ModuleAvailableFeature(name)
4465
class _PosixPermissionsFeature(Feature):
4469
# create temporary file and check if specified perms are maintained.
4472
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4473
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4476
os.chmod(name, write_perms)
4478
read_perms = os.stat(name).st_mode & 0777
4480
return (write_perms == read_perms)
4482
return (os.name == 'posix') and has_perms()
4484
def feature_name(self):
4485
return 'POSIX permissions support'
4487
posix_permissions_feature = _PosixPermissionsFeature()