210
218
osutils.set_or_unset_env(var, value)
221
def _clear__type_equality_funcs(test):
222
"""Cleanup bound methods stored on TestCase instances
224
Clear the dict breaking a few (mostly) harmless cycles in the affected
225
unittests released with Python 2.6 and initial Python 2.7 versions.
227
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
228
shipped in Oneiric, an object with no clear method was used, hence the
229
extra complications, see bug 809048 for details.
231
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
232
if type_equality_funcs is not None:
233
tef_clear = getattr(type_equality_funcs, "clear", None)
234
if tef_clear is None:
235
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
236
if tef_instance_dict is not None:
237
tef_clear = tef_instance_dict.clear
238
if tef_clear is not None:
213
242
class ExtendedTestResult(testtools.TextTestResult):
214
243
"""Accepts, reports and accumulates the results of running tests.
970
1001
def setUp(self):
971
1002
super(TestCase, self).setUp()
1004
# At this point we're still accessing the config files in $BZR_HOME (as
1005
# set by the user running selftest).
1006
timeout = config.GlobalStack().get('selftest.timeout')
1008
timeout_fixture = fixtures.TimeoutFixture(timeout)
1009
timeout_fixture.setUp()
1010
self.addCleanup(timeout_fixture.cleanUp)
972
1012
for feature in getattr(self, '_test_needs_features', []):
973
1013
self.requireFeature(feature)
974
1014
self._cleanEnvironment()
1016
if bzrlib.global_state is not None:
1017
self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
1018
config.CommandLineStore())
975
1020
self._silenceUI()
976
1021
self._startLogFile()
977
1022
self._benchcalls = []
984
1029
# between tests. We should get rid of this altogether: bug 656694. --
986
1031
self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
987
# Isolate config option expansion until its default value for bzrlib is
988
# settled on or the FIXME associated with _get_expand_default_value
989
# is addressed -- vila 20110219
990
self.overrideAttr(config, '_expand_default_value', None)
991
1032
self._log_files = set()
1033
# Each key in the ``_counters`` dict holds a value for a different
1034
# counter. When the test ends, addDetail() should be used to output the
1035
# counter values. This happens in install_counter_hook().
1037
if 'config_stats' in selftest_debug_flags:
1038
self._install_config_stats_hooks()
1039
# Do not use i18n for tests (unless the test reverses this)
993
1042
def debug(self):
994
1043
# debug a frame up.
996
pdb.Pdb().set_trace(sys._getframe().f_back)
1045
# The sys preserved stdin/stdout should allow blackbox tests debugging
1046
pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1047
).set_trace(sys._getframe().f_back)
998
1049
def discardDetail(self, name):
999
1050
"""Extend the addDetail, getDetails api so we can remove a detail.
1011
1062
if name in details:
1012
1063
del details[name]
1065
def install_counter_hook(self, hooks, name, counter_name=None):
1066
"""Install a counting hook.
1068
Any hook can be counted as long as it doesn't need to return a value.
1070
:param hooks: Where the hook should be installed.
1072
:param name: The hook name that will be counted.
1074
:param counter_name: The counter identifier in ``_counters``, defaults
1077
_counters = self._counters # Avoid closing over self
1078
if counter_name is None:
1080
if _counters.has_key(counter_name):
1081
raise AssertionError('%s is already used as a counter name'
1083
_counters[counter_name] = 0
1084
self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1085
lambda: ['%d' % (_counters[counter_name],)]))
1086
def increment_counter(*args, **kwargs):
1087
_counters[counter_name] += 1
1088
label = 'count %s calls' % (counter_name,)
1089
hooks.install_named_hook(name, increment_counter, label)
1090
self.addCleanup(hooks.uninstall_named_hook, name, label)
1092
def _install_config_stats_hooks(self):
1093
"""Install config hooks to count hook calls.
1096
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1097
self.install_counter_hook(config.ConfigHooks, hook_name,
1098
'config.%s' % (hook_name,))
1100
# The OldConfigHooks are private and need special handling to protect
1101
# against recursive tests (tests that run other tests), so we just do
1102
# manually what registering them into _builtin_known_hooks will provide
1104
self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1105
for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1106
self.install_counter_hook(config.OldConfigHooks, hook_name,
1107
'old_config.%s' % (hook_name,))
1014
1109
def _clear_debug_flags(self):
1015
1110
"""Prevent externally set debug flags affecting tests.
1070
1165
# break some locks on purpose and should be taken into account by
1071
1166
# considering that breaking a lock is just a dirty way of releasing it.
1072
1167
if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1073
message = ('Different number of acquired and '
1074
'released or broken locks. (%s, %s + %s)' %
1075
(acquired_locks, released_locks, broken_locks))
1169
'Different number of acquired and '
1170
'released or broken locks.\n'
1174
(acquired_locks, released_locks, broken_locks))
1076
1175
if not self._lock_check_thorough:
1077
1176
# Rather than fail, just warn
1078
1177
print "Broken test %s: %s" % (self, message)
1683
1781
:returns: The actual attr value.
1685
value = getattr(obj, attr_name)
1686
1783
# The actual value is captured by the call below
1687
self.addCleanup(setattr, obj, attr_name, value)
1784
value = getattr(obj, attr_name, _unitialized_attr)
1785
if value is _unitialized_attr:
1786
# When the test completes, the attribute should not exist, but if
1787
# we aren't setting a value, we don't need to do anything.
1788
if new is not _unitialized_attr:
1789
self.addCleanup(delattr, obj, attr_name)
1791
self.addCleanup(setattr, obj, attr_name, value)
1688
1792
if new is not _unitialized_attr:
1689
1793
setattr(obj, attr_name, new)
1703
1807
self.addCleanup(osutils.set_or_unset_env, name, value)
1810
def recordCalls(self, obj, attr_name):
1811
"""Monkeypatch in a wrapper that will record calls.
1813
The monkeypatch is automatically removed when the test concludes.
1815
:param obj: The namespace holding the reference to be replaced;
1816
typically a module, class, or object.
1817
:param attr_name: A string for the name of the attribute to
1819
:returns: A list that will be extended with one item every time the
1820
function is called, with a tuple of (args, kwargs).
1824
def decorator(*args, **kwargs):
1825
calls.append((args, kwargs))
1826
return orig(*args, **kwargs)
1827
orig = self.overrideAttr(obj, attr_name, decorator)
1706
1830
def _cleanEnvironment(self):
1707
1831
for name, value in isolated_environ.iteritems():
1708
1832
self.overrideEnv(name, value)
1715
1839
self._preserved_lazy_hooks.clear()
1717
1841
def knownFailure(self, reason):
1718
"""This test has failed for some known reason."""
1719
raise KnownFailure(reason)
1842
"""Declare that this test fails for a known reason
1844
Tests that are known to fail should generally be using expectedFailure
1845
with an appropriate reverse assertion if a change could cause the test
1846
to start passing. Conversely if the test has no immediate prospect of
1847
succeeding then using skip is more suitable.
1849
When this method is called while an exception is being handled, that
1850
traceback will be used, otherwise a new exception will be thrown to
1851
provide one but won't be reported.
1853
self._add_reason(reason)
1855
exc_info = sys.exc_info()
1856
if exc_info != (None, None, None):
1857
self._report_traceback(exc_info)
1860
raise self.failureException(reason)
1861
except self.failureException:
1862
exc_info = sys.exc_info()
1863
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1864
raise testtools.testcase._ExpectedFailure(exc_info)
1721
1868
def _suppress_log(self):
1722
1869
"""Remove the log info from details."""
2271
2425
class TestCaseWithMemoryTransport(TestCase):
2272
2426
"""Common test class for tests that do not need disk resources.
2274
Tests that need disk resources should derive from TestCaseWithTransport.
2428
Tests that need disk resources should derive from TestCaseInTempDir
2429
or TestCaseWithTransport.
2276
2431
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2278
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2433
For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2279
2434
a directory which does not exist. This serves to help ensure test isolation
2280
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2281
must exist. However, TestCaseWithMemoryTransport does not offer local
2282
file defaults for the transport in tests, nor does it obey the command line
2435
is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2436
must exist. However, TestCaseWithMemoryTransport does not offer local file
2437
defaults for the transport in tests, nor does it obey the command line
2283
2438
override, so tests that accidentally write to the common directory should
2286
:cvar TEST_ROOT: Directory containing all temporary directories, plus
2287
a .bzr directory that stops us ascending higher into the filesystem.
2441
:cvar TEST_ROOT: Directory containing all temporary directories, plus a
2442
``.bzr`` directory that stops us ascending higher into the filesystem.
2290
2445
TEST_ROOT = None
2300
2455
self.transport_readonly_server = None
2301
2456
self.__vfs_server = None
2459
super(TestCaseWithMemoryTransport, self).setUp()
2461
def _add_disconnect_cleanup(transport):
2462
"""Schedule disconnection of given transport at test cleanup
2464
This needs to happen for all connected transports or leaks occur.
2466
Note reconnections may mean we call disconnect multiple times per
2467
transport which is suboptimal but seems harmless.
2469
self.addCleanup(transport.disconnect)
2471
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2472
_add_disconnect_cleanup, None)
2474
self._make_test_root()
2475
self.addCleanup(os.chdir, os.getcwdu())
2476
self.makeAndChdirToTestDir()
2477
self.overrideEnvironmentForTesting()
2478
self.__readonly_server = None
2479
self.__server = None
2480
self.reduceLockdirTimeout()
2481
# Each test may use its own config files even if the local config files
2482
# don't actually exist. They'll rightly fail if they try to create them
2484
self.overrideAttr(config, '_shared_stores', {})
2303
2486
def get_transport(self, relpath=None):
2304
2487
"""Return a writeable transport.
2449
2633
root = TestCaseWithMemoryTransport.TEST_ROOT
2450
bzrdir.BzrDir.create_standalone_workingtree(root)
2635
# Make sure we get a readable and accessible home for .bzr.log
2636
# and/or config files, and not fallback to weird defaults (see
2637
# http://pad.lv/825027).
2638
self.assertIs(None, os.environ.get('BZR_HOME', None))
2639
os.environ['BZR_HOME'] = root
2640
wt = controldir.ControlDir.create_standalone_workingtree(root)
2641
del os.environ['BZR_HOME']
2642
except Exception, e:
2643
self.fail("Fail to initialize the safety net: %r\n" % (e,))
2644
# Hack for speed: remember the raw bytes of the dirstate file so that
2645
# we don't need to re-open the wt to check it hasn't changed.
2646
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2647
wt.control_transport.get_bytes('dirstate'))
2452
2649
def _check_safety_net(self):
2453
2650
"""Check that the safety .bzr directory has not been touched.
2497
2694
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2498
2695
self.permit_dir(self.test_dir)
2500
def make_branch(self, relpath, format=None):
2697
def make_branch(self, relpath, format=None, name=None):
2501
2698
"""Create a branch on the transport at relpath."""
2502
2699
repo = self.make_repository(relpath, format=format)
2503
return repo.bzrdir.create_branch()
2700
return repo.bzrdir.create_branch(append_revisions_only=False,
2703
def get_default_format(self):
2706
def resolve_format(self, format):
2707
"""Resolve an object to a ControlDir format object.
2709
The initial format object can either already be
2710
a ControlDirFormat, None (for the default format),
2711
or a string with the name of the control dir format.
2713
:param format: Object to resolve
2714
:return: A ControlDirFormat instance
2717
format = self.get_default_format()
2718
if isinstance(format, basestring):
2719
format = controldir.format_registry.make_bzrdir(format)
2505
2722
def make_bzrdir(self, relpath, format=None):
2510
2727
t = _mod_transport.get_transport(maybe_a_url)
2511
2728
if len(segments) > 1 and segments[-1] not in ('', '.'):
2512
2729
t.ensure_base()
2515
if isinstance(format, basestring):
2516
format = bzrdir.format_registry.make_bzrdir(format)
2730
format = self.resolve_format(format)
2517
2731
return format.initialize_on_transport(t)
2518
2732
except errors.UninitializableFormat:
2519
2733
raise TestSkipped("Format %s is not initializable." % format)
2521
def make_repository(self, relpath, shared=False, format=None):
2735
def make_repository(self, relpath, shared=None, format=None):
2522
2736
"""Create a repository on our default transport at relpath.
2524
2738
Note that relpath must be a relative path, not a full url.
2555
2769
self.overrideEnv('HOME', test_home_dir)
2556
2770
self.overrideEnv('BZR_HOME', test_home_dir)
2559
super(TestCaseWithMemoryTransport, self).setUp()
2560
# Ensure that ConnectedTransport doesn't leak sockets
2561
def get_transport_with_cleanup(*args, **kwargs):
2562
t = orig_get_transport(*args, **kwargs)
2563
if isinstance(t, _mod_transport.ConnectedTransport):
2564
self.addCleanup(t.disconnect)
2567
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2568
get_transport_with_cleanup)
2569
self._make_test_root()
2570
self.addCleanup(os.chdir, os.getcwdu())
2571
self.makeAndChdirToTestDir()
2572
self.overrideEnvironmentForTesting()
2573
self.__readonly_server = None
2574
self.__server = None
2575
self.reduceLockdirTimeout()
2577
2772
def setup_smart_server_with_call_log(self):
2578
2773
"""Sets up a smart server as the transport server with a call log."""
2579
2774
self.transport_server = test_server.SmartTCPServer_for_testing
2775
self.hpss_connections = []
2580
2776
self.hpss_calls = []
2581
2777
import traceback
2582
2778
# Skip the current stack down to the caller of
3199
3414
class TestDecorator(TestUtil.TestSuite):
3200
3415
"""A decorator for TestCase/TestSuite objects.
3202
Usually, subclasses should override __iter__(used when flattening test
3203
suites), which we do to filter, reorder, parallelise and so on, run() and
3417
Contains rather than flattening suite passed on construction
3207
def __init__(self, suite):
3208
TestUtil.TestSuite.__init__(self)
3211
def countTestCases(self):
3214
cases += test.countTestCases()
3221
def run(self, result):
3222
# Use iteration on self, not self._tests, to allow subclasses to hook
3225
if result.shouldStop:
3420
def __init__(self, suite=None):
3421
super(TestDecorator, self).__init__()
3422
if suite is not None:
3425
# Don't need subclass run method with suite emptying
3426
run = unittest.TestSuite.run
3231
3429
class CountingDecorator(TestDecorator):
3242
3440
"""A decorator which excludes tests matching an exclude pattern."""
3244
3442
def __init__(self, suite, exclude_pattern):
3245
TestDecorator.__init__(self, suite)
3246
self.exclude_pattern = exclude_pattern
3247
self.excluded = False
3251
return iter(self._tests)
3252
self.excluded = True
3253
suite = exclude_tests_by_re(self, self.exclude_pattern)
3255
self.addTests(suite)
3256
return iter(self._tests)
3443
super(ExcludeDecorator, self).__init__(
3444
exclude_tests_by_re(suite, exclude_pattern))
3259
3447
class FilterTestsDecorator(TestDecorator):
3260
3448
"""A decorator which filters tests to those matching a pattern."""
3262
3450
def __init__(self, suite, pattern):
3263
TestDecorator.__init__(self, suite)
3264
self.pattern = pattern
3265
self.filtered = False
3269
return iter(self._tests)
3270
self.filtered = True
3271
suite = filter_suite_by_re(self, self.pattern)
3273
self.addTests(suite)
3274
return iter(self._tests)
3451
super(FilterTestsDecorator, self).__init__(
3452
filter_suite_by_re(suite, pattern))
3277
3455
class RandomDecorator(TestDecorator):
3278
3456
"""A decorator which randomises the order of its tests."""
3280
3458
def __init__(self, suite, random_seed, stream):
3281
TestDecorator.__init__(self, suite)
3282
self.random_seed = random_seed
3283
self.randomised = False
3284
self.stream = stream
3288
return iter(self._tests)
3289
self.randomised = True
3290
self.stream.write("Randomizing test order using seed %s\n\n" %
3291
(self.actual_seed()))
3459
random_seed = self.actual_seed(random_seed)
3460
stream.write("Randomizing test order using seed %s\n\n" %
3292
3462
# Initialise the random number generator.
3293
random.seed(self.actual_seed())
3294
suite = randomize_suite(self)
3296
self.addTests(suite)
3297
return iter(self._tests)
3463
random.seed(random_seed)
3464
super(RandomDecorator, self).__init__(randomize_suite(suite))
3299
def actual_seed(self):
3300
if self.random_seed == "now":
3467
def actual_seed(seed):
3301
3469
# We convert the seed to a long to make it reusable across
3302
3470
# invocations (because the user can reenter it).
3303
self.random_seed = long(time.time())
3471
return long(time.time())
3305
3473
# Convert the seed to a long if we can
3307
self.random_seed = long(self.random_seed)
3476
except (TypeError, ValueError):
3310
return self.random_seed
3313
3481
class TestFirstDecorator(TestDecorator):
3314
3482
"""A decorator which moves named tests to the front."""
3316
3484
def __init__(self, suite, pattern):
3317
TestDecorator.__init__(self, suite)
3318
self.pattern = pattern
3319
self.filtered = False
3323
return iter(self._tests)
3324
self.filtered = True
3325
suites = split_suite_by_re(self, self.pattern)
3327
self.addTests(suites)
3328
return iter(self._tests)
3485
super(TestFirstDecorator, self).__init__()
3486
self.addTests(split_suite_by_re(suite, pattern))
3331
3489
def partition_tests(suite, count):
3376
3534
ProtocolTestCase.run(self, result)
3378
os.waitpid(self.pid, 0)
3536
pid, status = os.waitpid(self.pid, 0)
3537
# GZ 2011-10-18: If status is nonzero, should report to the result
3538
# that something went wrong.
3380
3540
test_blocks = partition_tests(suite, concurrency)
3541
# Clear the tests from the original suite so it doesn't keep them alive
3542
suite._tests[:] = []
3381
3543
for process_tests in test_blocks:
3382
process_suite = TestUtil.TestSuite()
3383
process_suite.addTests(process_tests)
3544
process_suite = TestUtil.TestSuite(process_tests)
3545
# Also clear each split list so new suite has only reference
3546
process_tests[:] = []
3384
3547
c2pread, c2pwrite = os.pipe()
3385
3548
pid = os.fork()
3387
workaround_zealous_crypto_random()
3551
stream = os.fdopen(c2pwrite, 'wb', 1)
3552
workaround_zealous_crypto_random()
3389
3553
os.close(c2pread)
3390
3554
# Leave stderr and stdout open so we can see test noise
3391
3555
# Close stdin so that the child goes away if it decides to
3392
3556
# read from stdin (otherwise it's a roulette to see what
3393
3557
# child actually gets keystrokes for pdb etc).
3394
3558
sys.stdin.close()
3396
stream = os.fdopen(c2pwrite, 'wb', 1)
3397
3559
subunit_result = AutoTimingTestResultDecorator(
3398
TestProtocolClient(stream))
3560
SubUnitBzrProtocolClient(stream))
3399
3561
process_suite.run(subunit_result)
3563
# Try and report traceback on stream, but exit with error even
3564
# if stream couldn't be created or something else goes wrong.
3565
# The traceback is formatted to a string and written in one go
3566
# to avoid interleaving lines from multiple failing children.
3568
stream.write(traceback.format_exc())
3403
3573
os.close(c2pwrite)
3404
3574
stream = os.fdopen(c2pread, 'rb', 1)
3819
3991
'bzrlib.tests.test_email_message',
3820
3992
'bzrlib.tests.test_eol_filters',
3821
3993
'bzrlib.tests.test_errors',
3994
'bzrlib.tests.test_estimate_compressed_size',
3822
3995
'bzrlib.tests.test_export',
3823
3996
'bzrlib.tests.test_export_pot',
3824
3997
'bzrlib.tests.test_extract',
3998
'bzrlib.tests.test_features',
3825
3999
'bzrlib.tests.test_fetch',
3826
4000
'bzrlib.tests.test_fixtures',
3827
4001
'bzrlib.tests.test_fifo_cache',
3828
4002
'bzrlib.tests.test_filters',
4003
'bzrlib.tests.test_filter_tree',
3829
4004
'bzrlib.tests.test_ftp_transport',
3830
4005
'bzrlib.tests.test_foreign',
3831
4006
'bzrlib.tests.test_generate_docs',
4268
4448
% (os.path.basename(dirname), printable_e))
4271
class Feature(object):
4272
"""An operating system Feature."""
4275
self._available = None
4277
def available(self):
4278
"""Is the feature available?
4280
:return: True if the feature is available.
4282
if self._available is None:
4283
self._available = self._probe()
4284
return self._available
4287
"""Implement this method in concrete features.
4289
:return: True if the feature is available.
4291
raise NotImplementedError
4294
if getattr(self, 'feature_name', None):
4295
return self.feature_name()
4296
return self.__class__.__name__
4299
class _SymlinkFeature(Feature):
4302
return osutils.has_symlinks()
4304
def feature_name(self):
4307
SymlinkFeature = _SymlinkFeature()
4310
class _HardlinkFeature(Feature):
4313
return osutils.has_hardlinks()
4315
def feature_name(self):
4318
HardlinkFeature = _HardlinkFeature()
4321
class _OsFifoFeature(Feature):
4324
return getattr(os, 'mkfifo', None)
4326
def feature_name(self):
4327
return 'filesystem fifos'
4329
OsFifoFeature = _OsFifoFeature()
4332
class _UnicodeFilenameFeature(Feature):
4333
"""Does the filesystem support Unicode filenames?"""
4337
# Check for character combinations unlikely to be covered by any
4338
# single non-unicode encoding. We use the characters
4339
# - greek small letter alpha (U+03B1) and
4340
# - braille pattern dots-123456 (U+283F).
4341
os.stat(u'\u03b1\u283f')
4342
except UnicodeEncodeError:
4344
except (IOError, OSError):
4345
# The filesystem allows the Unicode filename but the file doesn't
4349
# The filesystem allows the Unicode filename and the file exists,
4353
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4356
class _CompatabilityThunkFeature(Feature):
4357
"""This feature is just a thunk to another feature.
4359
It issues a deprecation warning if it is accessed, to let you know that you
4360
should really use a different feature.
4363
def __init__(self, dep_version, module, name,
4364
replacement_name, replacement_module=None):
4365
super(_CompatabilityThunkFeature, self).__init__()
4366
self._module = module
4367
if replacement_module is None:
4368
replacement_module = module
4369
self._replacement_module = replacement_module
4371
self._replacement_name = replacement_name
4372
self._dep_version = dep_version
4373
self._feature = None
4376
if self._feature is None:
4377
depr_msg = self._dep_version % ('%s.%s'
4378
% (self._module, self._name))
4379
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4380
self._replacement_name)
4381
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4382
# Import the new feature and use it as a replacement for the
4384
self._feature = pyutils.get_named_object(
4385
self._replacement_module, self._replacement_name)
4389
return self._feature._probe()
4392
class ModuleAvailableFeature(Feature):
4393
"""This is a feature that describes a module we want to be available.
4395
Declare the name of the module in __init__(), and then after probing, the
4396
module will be available as 'self.module'.
4398
:ivar module: The module if it is available, else None.
4401
def __init__(self, module_name):
4402
super(ModuleAvailableFeature, self).__init__()
4403
self.module_name = module_name
4407
self._module = __import__(self.module_name, {}, {}, [''])
4414
if self.available(): # Make sure the probe has been done
4418
def feature_name(self):
4419
return self.module_name
4422
4451
def probe_unicode_in_user_encoding():
4423
4452
"""Try to encode several unicode strings to use in unicode-aware tests.
4424
4453
Return first successful match.
4455
class _HTTPSServerFeature(Feature):
4456
"""Some tests want an https Server, check if one is available.
4458
Right now, the only way this is available is under python2.6 which provides
4469
def feature_name(self):
4470
return 'HTTPSServer'
4473
HTTPSServerFeature = _HTTPSServerFeature()
4476
class _UnicodeFilename(Feature):
4477
"""Does the filesystem support Unicode filenames?"""
4482
except UnicodeEncodeError:
4484
except (IOError, OSError):
4485
# The filesystem allows the Unicode filename but the file doesn't
4489
# The filesystem allows the Unicode filename and the file exists,
4493
UnicodeFilename = _UnicodeFilename()
4496
class _ByteStringNamedFilesystem(Feature):
4497
"""Is the filesystem based on bytes?"""
4500
if os.name == "posix":
4504
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4507
class _UTF8Filesystem(Feature):
4508
"""Is the filesystem UTF-8?"""
4511
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4515
UTF8Filesystem = _UTF8Filesystem()
4518
class _BreakinFeature(Feature):
4519
"""Does this platform support the breakin feature?"""
4522
from bzrlib import breakin
4523
if breakin.determine_signal() is None:
4525
if sys.platform == 'win32':
4526
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4527
# We trigger SIGBREAK via a Console api so we need ctypes to
4528
# access the function
4535
def feature_name(self):
4536
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4539
BreakinFeature = _BreakinFeature()
4542
class _CaseInsCasePresFilenameFeature(Feature):
4543
"""Is the file-system case insensitive, but case-preserving?"""
4546
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4548
# first check truly case-preserving for created files, then check
4549
# case insensitive when opening existing files.
4550
name = osutils.normpath(name)
4551
base, rel = osutils.split(name)
4552
found_rel = osutils.canonical_relpath(base, name)
4553
return (found_rel == rel
4554
and os.path.isfile(name.upper())
4555
and os.path.isfile(name.lower()))
4560
def feature_name(self):
4561
return "case-insensitive case-preserving filesystem"
4563
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4566
class _CaseInsensitiveFilesystemFeature(Feature):
4567
"""Check if underlying filesystem is case-insensitive but *not* case
4570
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4571
# more likely to be case preserving, so this case is rare.
4574
if CaseInsCasePresFilenameFeature.available():
4577
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4578
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4579
TestCaseWithMemoryTransport.TEST_ROOT = root
4581
root = TestCaseWithMemoryTransport.TEST_ROOT
4582
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4584
name_a = osutils.pathjoin(tdir, 'a')
4585
name_A = osutils.pathjoin(tdir, 'A')
4587
result = osutils.isdir(name_A)
4588
_rmtree_temp_dir(tdir)
4591
def feature_name(self):
4592
return 'case-insensitive filesystem'
4594
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4597
class _CaseSensitiveFilesystemFeature(Feature):
4600
if CaseInsCasePresFilenameFeature.available():
4602
elif CaseInsensitiveFilesystemFeature.available():
4607
def feature_name(self):
4608
return 'case-sensitive filesystem'
4610
# new coding style is for feature instances to be lowercase
4611
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4614
4484
# Only define SubUnitBzrRunner if subunit is available.
4616
4486
from subunit import TestProtocolClient
4617
4487
from subunit.test_results import AutoTimingTestResultDecorator
4618
4488
class SubUnitBzrProtocolClient(TestProtocolClient):
4490
def stopTest(self, test):
4491
super(SubUnitBzrProtocolClient, self).stopTest(test)
4492
_clear__type_equality_funcs(test)
4620
4494
def addSuccess(self, test, details=None):
4621
4495
# The subunit client always includes the details in the subunit
4622
4496
# stream, but we don't want to include it in ours.
4634
4508
except ImportError:
4637
class _PosixPermissionsFeature(Feature):
4641
# create temporary file and check if specified perms are maintained.
4644
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4645
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4648
os.chmod(name, write_perms)
4650
read_perms = os.stat(name).st_mode & 0777
4652
return (write_perms == read_perms)
4654
return (os.name == 'posix') and has_perms()
4656
def feature_name(self):
4657
return 'POSIX permissions support'
4659
posix_permissions_feature = _PosixPermissionsFeature()
4512
# API compatibility for old plugins; see bug 892622.
4515
'HTTPServerFeature',
4516
'ModuleAvailableFeature',
4517
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4518
'OsFifoFeature', 'UnicodeFilenameFeature',
4519
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4520
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4521
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4522
'posix_permissions_feature',
4524
globals()[name] = _CompatabilityThunkFeature(
4525
symbol_versioning.deprecated_in((2, 5, 0)),
4526
'bzrlib.tests', name,
4527
name, 'bzrlib.tests.features')
4530
for (old_name, new_name) in [
4531
('UnicodeFilename', 'UnicodeFilenameFeature'),
4533
globals()[name] = _CompatabilityThunkFeature(
4534
symbol_versioning.deprecated_in((2, 5, 0)),
4535
'bzrlib.tests', old_name,
4536
new_name, 'bzrlib.tests.features')