213
219
osutils.set_or_unset_env(var, value)
222
def _clear__type_equality_funcs(test):
223
"""Cleanup bound methods stored on TestCase instances
225
Clear the dict breaking a few (mostly) harmless cycles in the affected
226
unittests released with Python 2.6 and initial Python 2.7 versions.
228
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
229
shipped in Oneiric, an object with no clear method was used, hence the
230
extra complications, see bug 809048 for details.
232
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
233
if type_equality_funcs is not None:
234
tef_clear = getattr(type_equality_funcs, "clear", None)
235
if tef_clear is None:
236
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
237
if tef_instance_dict is not None:
238
tef_clear = tef_instance_dict.clear
239
if tef_clear is not None:
216
243
class ExtendedTestResult(testtools.TextTestResult):
217
244
"""Accepts, reports and accumulates the results of running tests.
333
360
return float(''.join(details['benchtime'].iter_bytes()))
334
361
return getattr(testCase, "_benchtime", None)
363
def _delta_to_float(self, a_timedelta, precision):
364
# This calls ceiling to ensure that the most pessimistic view of time
365
# taken is shown (rather than leaving it to the Python %f operator
366
# to decide whether to round/floor/ceiling. This was added when we
367
# had pyp3 test failures that suggest a floor was happening.
368
shift = 10 ** precision
369
return math.ceil((a_timedelta.days * 86400.0 + a_timedelta.seconds +
370
a_timedelta.microseconds / 1000000.0) * shift) / shift
336
372
def _elapsedTestTimeString(self):
337
373
"""Return a time string for the overall time the current test has taken."""
338
374
return self._formatTime(self._delta_to_float(
339
self._now() - self._start_datetime))
375
self._now() - self._start_datetime, 3))
341
377
def _testTimeString(self, testCase):
342
378
benchmark_time = self._extractBenchmarkTime(testCase)
973
1011
def setUp(self):
974
1012
super(TestCase, self).setUp()
1014
# At this point we're still accessing the config files in $BZR_HOME (as
1015
# set by the user running selftest).
1016
timeout = config.GlobalStack().get('selftest.timeout')
1018
timeout_fixture = fixtures.TimeoutFixture(timeout)
1019
timeout_fixture.setUp()
1020
self.addCleanup(timeout_fixture.cleanUp)
975
1022
for feature in getattr(self, '_test_needs_features', []):
976
1023
self.requireFeature(feature)
977
1024
self._cleanEnvironment()
1026
if bzrlib.global_state is not None:
1027
self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
1028
config.CommandLineStore())
978
1030
self._silenceUI()
979
1031
self._startLogFile()
980
1032
self._benchcalls = []
1740
1793
:returns: The actual attr value.
1742
value = getattr(obj, attr_name)
1743
1795
# The actual value is captured by the call below
1744
self.addCleanup(setattr, obj, attr_name, value)
1796
value = getattr(obj, attr_name, _unitialized_attr)
1797
if value is _unitialized_attr:
1798
# When the test completes, the attribute should not exist, but if
1799
# we aren't setting a value, we don't need to do anything.
1800
if new is not _unitialized_attr:
1801
self.addCleanup(delattr, obj, attr_name)
1803
self.addCleanup(setattr, obj, attr_name, value)
1745
1804
if new is not _unitialized_attr:
1746
1805
setattr(obj, attr_name, new)
1760
1819
self.addCleanup(osutils.set_or_unset_env, name, value)
1822
def recordCalls(self, obj, attr_name):
1823
"""Monkeypatch in a wrapper that will record calls.
1825
The monkeypatch is automatically removed when the test concludes.
1827
:param obj: The namespace holding the reference to be replaced;
1828
typically a module, class, or object.
1829
:param attr_name: A string for the name of the attribute to
1831
:returns: A list that will be extended with one item every time the
1832
function is called, with a tuple of (args, kwargs).
1836
def decorator(*args, **kwargs):
1837
calls.append((args, kwargs))
1838
return orig(*args, **kwargs)
1839
orig = self.overrideAttr(obj, attr_name, decorator)
1763
1842
def _cleanEnvironment(self):
1764
1843
for name, value in isolated_environ.iteritems():
1765
1844
self.overrideEnv(name, value)
1772
1851
self._preserved_lazy_hooks.clear()
1774
1853
def knownFailure(self, reason):
1775
"""This test has failed for some known reason."""
1776
raise KnownFailure(reason)
1854
"""Declare that this test fails for a known reason
1856
Tests that are known to fail should generally be using expectedFailure
1857
with an appropriate reverse assertion if a change could cause the test
1858
to start passing. Conversely if the test has no immediate prospect of
1859
succeeding then using skip is more suitable.
1861
When this method is called while an exception is being handled, that
1862
traceback will be used, otherwise a new exception will be thrown to
1863
provide one but won't be reported.
1865
self._add_reason(reason)
1867
exc_info = sys.exc_info()
1868
if exc_info != (None, None, None):
1869
self._report_traceback(exc_info)
1872
raise self.failureException(reason)
1873
except self.failureException:
1874
exc_info = sys.exc_info()
1875
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1876
raise testtools.testcase._ExpectedFailure(exc_info)
1778
1880
def _suppress_log(self):
1779
1881
"""Remove the log info from details."""
2358
2467
self.transport_readonly_server = None
2359
2468
self.__vfs_server = None
2471
super(TestCaseWithMemoryTransport, self).setUp()
2473
def _add_disconnect_cleanup(transport):
2474
"""Schedule disconnection of given transport at test cleanup
2476
This needs to happen for all connected transports or leaks occur.
2478
Note reconnections may mean we call disconnect multiple times per
2479
transport which is suboptimal but seems harmless.
2481
self.addCleanup(transport.disconnect)
2483
_mod_transport.Transport.hooks.install_named_hook('post_connect',
2484
_add_disconnect_cleanup, None)
2486
self._make_test_root()
2487
self.addCleanup(os.chdir, os.getcwdu())
2488
self.makeAndChdirToTestDir()
2489
self.overrideEnvironmentForTesting()
2490
self.__readonly_server = None
2491
self.__server = None
2492
self.reduceLockdirTimeout()
2493
# Each test may use its own config files even if the local config files
2494
# don't actually exist. They'll rightly fail if they try to create them
2496
self.overrideAttr(config, '_shared_stores', {})
2361
2498
def get_transport(self, relpath=None):
2362
2499
"""Return a writeable transport.
2507
2645
root = TestCaseWithMemoryTransport.TEST_ROOT
2508
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2647
# Make sure we get a readable and accessible home for .bzr.log
2648
# and/or config files, and not fallback to weird defaults (see
2649
# http://pad.lv/825027).
2650
self.assertIs(None, os.environ.get('BZR_HOME', None))
2651
os.environ['BZR_HOME'] = root
2652
wt = controldir.ControlDir.create_standalone_workingtree(root)
2653
del os.environ['BZR_HOME']
2654
except Exception, e:
2655
self.fail("Fail to initialize the safety net: %r\n" % (e,))
2509
2656
# Hack for speed: remember the raw bytes of the dirstate file so that
2510
2657
# we don't need to re-open the wt to check it hasn't changed.
2511
2658
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2559
2706
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2560
2707
self.permit_dir(self.test_dir)
2562
def make_branch(self, relpath, format=None):
2709
def make_branch(self, relpath, format=None, name=None):
2563
2710
"""Create a branch on the transport at relpath."""
2564
2711
repo = self.make_repository(relpath, format=format)
2565
return repo.bzrdir.create_branch()
2712
return repo.bzrdir.create_branch(append_revisions_only=False,
2715
def get_default_format(self):
2718
def resolve_format(self, format):
2719
"""Resolve an object to a ControlDir format object.
2721
The initial format object can either already be
2722
a ControlDirFormat, None (for the default format),
2723
or a string with the name of the control dir format.
2725
:param format: Object to resolve
2726
:return A ControlDirFormat instance
2729
format = self.get_default_format()
2730
if isinstance(format, basestring):
2731
format = controldir.format_registry.make_bzrdir(format)
2567
2734
def make_bzrdir(self, relpath, format=None):
2572
2739
t = _mod_transport.get_transport(maybe_a_url)
2573
2740
if len(segments) > 1 and segments[-1] not in ('', '.'):
2574
2741
t.ensure_base()
2577
if isinstance(format, basestring):
2578
format = bzrdir.format_registry.make_bzrdir(format)
2742
format = self.resolve_format(format)
2579
2743
return format.initialize_on_transport(t)
2580
2744
except errors.UninitializableFormat:
2581
2745
raise TestSkipped("Format %s is not initializable." % format)
2583
def make_repository(self, relpath, shared=False, format=None):
2747
def make_repository(self, relpath, shared=None, format=None):
2584
2748
"""Create a repository on our default transport at relpath.
2586
2750
Note that relpath must be a relative path, not a full url.
2617
2781
self.overrideEnv('HOME', test_home_dir)
2618
2782
self.overrideEnv('BZR_HOME', test_home_dir)
2621
super(TestCaseWithMemoryTransport, self).setUp()
2622
# Ensure that ConnectedTransport doesn't leak sockets
2623
def get_transport_with_cleanup(*args, **kwargs):
2624
t = orig_get_transport(*args, **kwargs)
2625
if isinstance(t, _mod_transport.ConnectedTransport):
2626
self.addCleanup(t.disconnect)
2629
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2630
get_transport_with_cleanup)
2631
self._make_test_root()
2632
self.addCleanup(os.chdir, os.getcwdu())
2633
self.makeAndChdirToTestDir()
2634
self.overrideEnvironmentForTesting()
2635
self.__readonly_server = None
2636
self.__server = None
2637
self.reduceLockdirTimeout()
2639
2784
def setup_smart_server_with_call_log(self):
2640
2785
"""Sets up a smart server as the transport server with a call log."""
2641
2786
self.transport_server = test_server.SmartTCPServer_for_testing
2787
self.hpss_connections = []
2642
2788
self.hpss_calls = []
2643
2789
import traceback
2644
2790
# Skip the current stack down to the caller of
3267
3426
class TestDecorator(TestUtil.TestSuite):
3268
3427
"""A decorator for TestCase/TestSuite objects.
3270
Usually, subclasses should override __iter__(used when flattening test
3271
suites), which we do to filter, reorder, parallelise and so on, run() and
3429
Contains rather than flattening suite passed on construction
3275
def __init__(self, suite):
3276
TestUtil.TestSuite.__init__(self)
3279
def countTestCases(self):
3282
cases += test.countTestCases()
3289
def run(self, result):
3290
# Use iteration on self, not self._tests, to allow subclasses to hook
3293
if result.shouldStop:
3432
def __init__(self, suite=None):
3433
super(TestDecorator, self).__init__()
3434
if suite is not None:
3437
# Don't need subclass run method with suite emptying
3438
run = unittest.TestSuite.run
3299
3441
class CountingDecorator(TestDecorator):
3310
3452
"""A decorator which excludes test matching an exclude pattern."""
3312
3454
def __init__(self, suite, exclude_pattern):
3313
TestDecorator.__init__(self, suite)
3314
self.exclude_pattern = exclude_pattern
3315
self.excluded = False
3319
return iter(self._tests)
3320
self.excluded = True
3321
suite = exclude_tests_by_re(self, self.exclude_pattern)
3323
self.addTests(suite)
3324
return iter(self._tests)
3455
super(ExcludeDecorator, self).__init__(
3456
exclude_tests_by_re(suite, exclude_pattern))
3327
3459
class FilterTestsDecorator(TestDecorator):
3328
3460
"""A decorator which filters tests to those matching a pattern."""
3330
3462
def __init__(self, suite, pattern):
3331
TestDecorator.__init__(self, suite)
3332
self.pattern = pattern
3333
self.filtered = False
3337
return iter(self._tests)
3338
self.filtered = True
3339
suite = filter_suite_by_re(self, self.pattern)
3341
self.addTests(suite)
3342
return iter(self._tests)
3463
super(FilterTestsDecorator, self).__init__(
3464
filter_suite_by_re(suite, pattern))
3345
3467
class RandomDecorator(TestDecorator):
3346
3468
"""A decorator which randomises the order of its tests."""
3348
3470
def __init__(self, suite, random_seed, stream):
3349
TestDecorator.__init__(self, suite)
3350
self.random_seed = random_seed
3351
self.randomised = False
3352
self.stream = stream
3356
return iter(self._tests)
3357
self.randomised = True
3358
self.stream.write("Randomizing test order using seed %s\n\n" %
3359
(self.actual_seed()))
3471
random_seed = self.actual_seed(random_seed)
3472
stream.write("Randomizing test order using seed %s\n\n" %
3360
3474
# Initialise the random number generator.
3361
random.seed(self.actual_seed())
3362
suite = randomize_suite(self)
3364
self.addTests(suite)
3365
return iter(self._tests)
3475
random.seed(random_seed)
3476
super(RandomDecorator, self).__init__(randomize_suite(suite))
3367
def actual_seed(self):
3368
if self.random_seed == "now":
3479
def actual_seed(seed):
3369
3481
# We convert the seed to a long to make it reuseable across
3370
3482
# invocations (because the user can reenter it).
3371
self.random_seed = long(time.time())
3483
return long(time.time())
3373
3485
# Convert the seed to a long if we can
3375
self.random_seed = long(self.random_seed)
3488
except (TypeError, ValueError):
3378
return self.random_seed
3381
3493
class TestFirstDecorator(TestDecorator):
3382
3494
"""A decorator which moves named tests to the front."""
3384
3496
def __init__(self, suite, pattern):
3385
TestDecorator.__init__(self, suite)
3386
self.pattern = pattern
3387
self.filtered = False
3391
return iter(self._tests)
3392
self.filtered = True
3393
suites = split_suite_by_re(self, self.pattern)
3395
self.addTests(suites)
3396
return iter(self._tests)
3497
super(TestFirstDecorator, self).__init__()
3498
self.addTests(split_suite_by_re(suite, pattern))
3399
3501
def partition_tests(suite, count):
3444
3546
ProtocolTestCase.run(self, result)
3446
os.waitpid(self.pid, 0)
3548
pid, status = os.waitpid(self.pid, 0)
3549
# GZ 2011-10-18: If status is nonzero, should report to the result
3550
# that something went wrong.
3448
3552
test_blocks = partition_tests(suite, concurrency)
3553
# Clear the tests from the original suite so it doesn't keep them alive
3554
suite._tests[:] = []
3449
3555
for process_tests in test_blocks:
3450
process_suite = TestUtil.TestSuite()
3451
process_suite.addTests(process_tests)
3556
process_suite = TestUtil.TestSuite(process_tests)
3557
# Also clear each split list so new suite has only reference
3558
process_tests[:] = []
3452
3559
c2pread, c2pwrite = os.pipe()
3453
3560
pid = os.fork()
3455
workaround_zealous_crypto_random()
3563
stream = os.fdopen(c2pwrite, 'wb', 1)
3564
workaround_zealous_crypto_random()
3457
3565
os.close(c2pread)
3458
3566
# Leave stderr and stdout open so we can see test noise
3459
3567
# Close stdin so that the child goes away if it decides to
3460
3568
# read from stdin (otherwise its a roulette to see what
3461
3569
# child actually gets keystrokes for pdb etc).
3462
3570
sys.stdin.close()
3464
stream = os.fdopen(c2pwrite, 'wb', 1)
3465
3571
subunit_result = AutoTimingTestResultDecorator(
3466
TestProtocolClient(stream))
3572
SubUnitBzrProtocolClient(stream))
3467
3573
process_suite.run(subunit_result)
3575
# Try and report traceback on stream, but exit with error even
3576
# if stream couldn't be created or something else goes wrong.
3577
# The traceback is formatted to a string and written in one go
3578
# to avoid interleaving lines from multiple failing children.
3580
stream.write(traceback.format_exc())
3471
3585
os.close(c2pwrite)
3472
3586
stream = os.fdopen(c2pread, 'rb', 1)
3889
4003
'bzrlib.tests.test_email_message',
3890
4004
'bzrlib.tests.test_eol_filters',
3891
4005
'bzrlib.tests.test_errors',
4006
'bzrlib.tests.test_estimate_compressed_size',
3892
4007
'bzrlib.tests.test_export',
3893
4008
'bzrlib.tests.test_export_pot',
3894
4009
'bzrlib.tests.test_extract',
4010
'bzrlib.tests.test_features',
3895
4011
'bzrlib.tests.test_fetch',
3896
4012
'bzrlib.tests.test_fixtures',
3897
4013
'bzrlib.tests.test_fifo_cache',
3898
4014
'bzrlib.tests.test_filters',
4015
'bzrlib.tests.test_filter_tree',
3899
4016
'bzrlib.tests.test_ftp_transport',
3900
4017
'bzrlib.tests.test_foreign',
3901
4018
'bzrlib.tests.test_generate_docs',
4338
4460
% (os.path.basename(dirname), printable_e))
4341
class Feature(object):
4342
"""An operating system Feature."""
4345
self._available = None
4347
def available(self):
4348
"""Is the feature available?
4350
:return: True if the feature is available.
4352
if self._available is None:
4353
self._available = self._probe()
4354
return self._available
4357
"""Implement this method in concrete features.
4359
:return: True if the feature is available.
4361
raise NotImplementedError
4364
if getattr(self, 'feature_name', None):
4365
return self.feature_name()
4366
return self.__class__.__name__
4369
class _SymlinkFeature(Feature):
4372
return osutils.has_symlinks()
4374
def feature_name(self):
4377
SymlinkFeature = _SymlinkFeature()
4380
class _HardlinkFeature(Feature):
4383
return osutils.has_hardlinks()
4385
def feature_name(self):
4388
HardlinkFeature = _HardlinkFeature()
4391
class _OsFifoFeature(Feature):
4394
return getattr(os, 'mkfifo', None)
4396
def feature_name(self):
4397
return 'filesystem fifos'
4399
OsFifoFeature = _OsFifoFeature()
4402
class _UnicodeFilenameFeature(Feature):
4403
"""Does the filesystem support Unicode filenames?"""
4407
# Check for character combinations unlikely to be covered by any
4408
# single non-unicode encoding. We use the characters
4409
# - greek small letter alpha (U+03B1) and
4410
# - braille pattern dots-123456 (U+283F).
4411
os.stat(u'\u03b1\u283f')
4412
except UnicodeEncodeError:
4414
except (IOError, OSError):
4415
# The filesystem allows the Unicode filename but the file doesn't
4419
# The filesystem allows the Unicode filename and the file exists,
4423
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4426
class _CompatabilityThunkFeature(Feature):
4427
"""This feature is just a thunk to another feature.
4429
It issues a deprecation warning if it is accessed, to let you know that you
4430
should really use a different feature.
4433
def __init__(self, dep_version, module, name,
4434
replacement_name, replacement_module=None):
4435
super(_CompatabilityThunkFeature, self).__init__()
4436
self._module = module
4437
if replacement_module is None:
4438
replacement_module = module
4439
self._replacement_module = replacement_module
4441
self._replacement_name = replacement_name
4442
self._dep_version = dep_version
4443
self._feature = None
4446
if self._feature is None:
4447
depr_msg = self._dep_version % ('%s.%s'
4448
% (self._module, self._name))
4449
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4450
self._replacement_name)
4451
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4452
# Import the new feature and use it as a replacement for the
4454
self._feature = pyutils.get_named_object(
4455
self._replacement_module, self._replacement_name)
4459
return self._feature._probe()
4462
class ModuleAvailableFeature(Feature):
4463
"""This is a feature than describes a module we want to be available.
4465
Declare the name of the module in __init__(), and then after probing, the
4466
module will be available as 'self.module'.
4468
:ivar module: The module if it is available, else None.
4471
def __init__(self, module_name):
4472
super(ModuleAvailableFeature, self).__init__()
4473
self.module_name = module_name
4477
self._module = __import__(self.module_name, {}, {}, [''])
4484
if self.available(): # Make sure the probe has been done
4488
def feature_name(self):
4489
return self.module_name
4492
4463
def probe_unicode_in_user_encoding():
4493
4464
"""Try to encode several unicode strings to use in unicode-aware tests.
4494
4465
Return first successfull match.
4525
class _HTTPSServerFeature(Feature):
4526
"""Some tests want an https Server, check if one is available.
4528
Right now, the only way this is available is under python2.6 which provides
4539
def feature_name(self):
4540
return 'HTTPSServer'
4543
HTTPSServerFeature = _HTTPSServerFeature()
4546
class _UnicodeFilename(Feature):
4547
"""Does the filesystem support Unicode filenames?"""
4552
except UnicodeEncodeError:
4554
except (IOError, OSError):
4555
# The filesystem allows the Unicode filename but the file doesn't
4559
# The filesystem allows the Unicode filename and the file exists,
4563
UnicodeFilename = _UnicodeFilename()
4566
class _ByteStringNamedFilesystem(Feature):
4567
"""Is the filesystem based on bytes?"""
4570
if os.name == "posix":
4574
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4577
class _UTF8Filesystem(Feature):
4578
"""Is the filesystem UTF-8?"""
4581
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4585
UTF8Filesystem = _UTF8Filesystem()
4588
class _BreakinFeature(Feature):
4589
"""Does this platform support the breakin feature?"""
4592
from bzrlib import breakin
4593
if breakin.determine_signal() is None:
4595
if sys.platform == 'win32':
4596
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4597
# We trigger SIGBREAK via a Console api so we need ctypes to
4598
# access the function
4605
def feature_name(self):
4606
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4609
BreakinFeature = _BreakinFeature()
4612
class _CaseInsCasePresFilenameFeature(Feature):
4613
"""Is the file-system case insensitive, but case-preserving?"""
4616
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4618
# first check truly case-preserving for created files, then check
4619
# case insensitive when opening existing files.
4620
name = osutils.normpath(name)
4621
base, rel = osutils.split(name)
4622
found_rel = osutils.canonical_relpath(base, name)
4623
return (found_rel == rel
4624
and os.path.isfile(name.upper())
4625
and os.path.isfile(name.lower()))
4630
def feature_name(self):
4631
return "case-insensitive case-preserving filesystem"
4633
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4636
class _CaseInsensitiveFilesystemFeature(Feature):
4637
"""Check if underlying filesystem is case-insensitive but *not* case
4640
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4641
# more likely to be case preserving, so this case is rare.
4644
if CaseInsCasePresFilenameFeature.available():
4647
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4648
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4649
TestCaseWithMemoryTransport.TEST_ROOT = root
4651
root = TestCaseWithMemoryTransport.TEST_ROOT
4652
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4654
name_a = osutils.pathjoin(tdir, 'a')
4655
name_A = osutils.pathjoin(tdir, 'A')
4657
result = osutils.isdir(name_A)
4658
_rmtree_temp_dir(tdir)
4661
def feature_name(self):
4662
return 'case-insensitive filesystem'
4664
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4667
class _CaseSensitiveFilesystemFeature(Feature):
4670
if CaseInsCasePresFilenameFeature.available():
4672
elif CaseInsensitiveFilesystemFeature.available():
4677
def feature_name(self):
4678
return 'case-sensitive filesystem'
4680
# new coding style is for feature instances to be lowercase
4681
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4684
4496
# Only define SubUnitBzrRunner if subunit is available.
4686
4498
from subunit import TestProtocolClient
4687
4499
from subunit.test_results import AutoTimingTestResultDecorator
4688
4500
class SubUnitBzrProtocolClient(TestProtocolClient):
4502
def stopTest(self, test):
4503
super(SubUnitBzrProtocolClient, self).stopTest(test)
4504
_clear__type_equality_funcs(test)
4690
4506
def addSuccess(self, test, details=None):
4691
4507
# The subunit client always includes the details in the subunit
4692
4508
# stream, but we don't want to include it in ours.
4704
4520
except ImportError:
4707
class _PosixPermissionsFeature(Feature):
4711
# create temporary file and check if specified perms are maintained.
4714
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4715
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4718
os.chmod(name, write_perms)
4720
read_perms = os.stat(name).st_mode & 0777
4722
return (write_perms == read_perms)
4724
return (os.name == 'posix') and has_perms()
4726
def feature_name(self):
4727
return 'POSIX permissions support'
4729
posix_permissions_feature = _PosixPermissionsFeature()
4524
# API compatibility for old plugins; see bug 892622.
4527
'HTTPServerFeature',
4528
'ModuleAvailableFeature',
4529
'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
4530
'OsFifoFeature', 'UnicodeFilenameFeature',
4531
'ByteStringNamedFilesystem', 'UTF8Filesystem',
4532
'BreakinFeature', 'CaseInsCasePresFilenameFeature',
4533
'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
4534
'posix_permissions_feature',
4536
globals()[name] = _CompatabilityThunkFeature(
4537
symbol_versioning.deprecated_in((2, 5, 0)),
4538
'bzrlib.tests', name,
4539
name, 'bzrlib.tests.features')
4542
for (old_name, new_name) in [
4543
('UnicodeFilename', 'UnicodeFilenameFeature'),
4545
globals()[name] = _CompatabilityThunkFeature(
4546
symbol_versioning.deprecated_in((2, 5, 0)),
4547
'bzrlib.tests', old_name,
4548
new_name, 'bzrlib.tests.features')