212
213
osutils.set_or_unset_env(var, value)
215
def _clear__type_equality_funcs(test):
216
"""Cleanup bound methods stored on TestCase instances
218
Clear the dict breaking a few (mostly) harmless cycles in the affected
219
unittests released with Python 2.6 and initial Python 2.7 versions.
221
For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
222
shipped in Oneiric, an object with no clear method was used, hence the
223
extra complications, see bug 809048 for details.
225
type_equality_funcs = getattr(test, "_type_equality_funcs", None)
226
if type_equality_funcs is not None:
227
tef_clear = getattr(type_equality_funcs, "clear", None)
228
if tef_clear is None:
229
tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
230
if tef_instance_dict is not None:
231
tef_clear = tef_instance_dict.clear
232
if tef_clear is not None:
236
216
class ExtendedTestResult(testtools.TextTestResult):
237
217
"""Accepts, reports and accumulates the results of running tests.
1792
1760
self.addCleanup(osutils.set_or_unset_env, name, value)
1795
def recordCalls(self, obj, attr_name):
1796
"""Monkeypatch in a wrapper that will record calls.
1798
The monkeypatch is automatically removed when the test concludes.
1800
:param obj: The namespace holding the reference to be replaced;
1801
typically a module, class, or object.
1802
:param attr_name: A string for the name of the attribute to
1804
:returns: A list that will be extended with one item every time the
1805
function is called, with a tuple of (args, kwargs).
1809
def decorator(*args, **kwargs):
1810
calls.append((args, kwargs))
1811
return orig(*args, **kwargs)
1812
orig = self.overrideAttr(obj, attr_name, decorator)
1815
1763
def _cleanEnvironment(self):
1816
1764
for name, value in isolated_environ.iteritems():
1817
1765
self.overrideEnv(name, value)
1824
1772
self._preserved_lazy_hooks.clear()
1826
1774
def knownFailure(self, reason):
1827
"""Declare that this test fails for a known reason
1829
Tests that are known to fail should generally be using expectedFailure
1830
with an appropriate reverse assertion if a change could cause the test
1831
to start passing. Conversely if the test has no immediate prospect of
1832
succeeding then using skip is more suitable.
1834
When this method is called while an exception is being handled, that
1835
traceback will be used, otherwise a new exception will be thrown to
1836
provide one but won't be reported.
1838
self._add_reason(reason)
1840
exc_info = sys.exc_info()
1841
if exc_info != (None, None, None):
1842
self._report_traceback(exc_info)
1845
raise self.failureException(reason)
1846
except self.failureException:
1847
exc_info = sys.exc_info()
1848
# GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1849
raise testtools.testcase._ExpectedFailure(exc_info)
1775
"""This test has failed for some known reason."""
1776
raise KnownFailure(reason)
1853
1778
def _suppress_log(self):
1854
1779
"""Remove the log info from details."""
2583
2507
root = TestCaseWithMemoryTransport.TEST_ROOT
2584
# Make sure we get a readable and accessible home for .bzr.log
2585
# and/or config files, and not fallback to weird defaults (see
2586
# http://pad.lv/825027).
2587
self.assertIs(None, os.environ.get('BZR_HOME', None))
2588
os.environ['BZR_HOME'] = root
2589
wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2590
del os.environ['BZR_HOME']
2591
# Hack for speed: remember the raw bytes of the dirstate file so that
2592
# we don't need to re-open the wt to check it hasn't changed.
2593
TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2594
wt.control_transport.get_bytes('dirstate'))
2508
bzrdir.BzrDir.create_standalone_workingtree(root)
2596
2510
def _check_safety_net(self):
2597
2511
"""Check that the safety .bzr directory has not been touched.
2644
2558
def make_branch(self, relpath, format=None):
2645
2559
"""Create a branch on the transport at relpath."""
2646
2560
repo = self.make_repository(relpath, format=format)
2647
return repo.bzrdir.create_branch(append_revisions_only=False)
2649
def get_default_format(self):
2652
def resolve_format(self, format):
2653
"""Resolve an object to a ControlDir format object.
2655
The initial format object can either already be
2656
a ControlDirFormat, None (for the default format),
2657
or a string with the name of the control dir format.
2659
:param format: Object to resolve
2660
:return: A ControlDirFormat instance
2663
format = self.get_default_format()
2664
if isinstance(format, basestring):
2665
format = bzrdir.format_registry.make_bzrdir(format)
2561
return repo.bzrdir.create_branch()
2668
2563
def make_bzrdir(self, relpath, format=None):
2673
2568
t = _mod_transport.get_transport(maybe_a_url)
2674
2569
if len(segments) > 1 and segments[-1] not in ('', '.'):
2675
2570
t.ensure_base()
2676
format = self.resolve_format(format)
2573
if isinstance(format, basestring):
2574
format = bzrdir.format_registry.make_bzrdir(format)
2677
2575
return format.initialize_on_transport(t)
2678
2576
except errors.UninitializableFormat:
2679
2577
raise TestSkipped("Format %s is not initializable." % format)
2681
def make_repository(self, relpath, shared=None, format=None):
2579
def make_repository(self, relpath, shared=False, format=None):
2682
2580
"""Create a repository on our default transport at relpath.
2684
2582
Note that relpath must be a relative path, not a full url.
2718
2616
def setUp(self):
2719
2617
super(TestCaseWithMemoryTransport, self).setUp()
2720
2618
# Ensure that ConnectedTransport doesn't leak sockets
2721
def get_transport_from_url_with_cleanup(*args, **kwargs):
2722
t = orig_get_transport_from_url(*args, **kwargs)
2619
def get_transport_with_cleanup(*args, **kwargs):
2620
t = orig_get_transport(*args, **kwargs)
2723
2621
if isinstance(t, _mod_transport.ConnectedTransport):
2724
2622
self.addCleanup(t.disconnect)
2727
orig_get_transport_from_url = self.overrideAttr(
2728
_mod_transport, 'get_transport_from_url',
2729
get_transport_from_url_with_cleanup)
2625
orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2626
get_transport_with_cleanup)
2730
2627
self._make_test_root()
2731
2628
self.addCleanup(os.chdir, os.getcwdu())
2732
2629
self.makeAndChdirToTestDir()
3373
3263
class TestDecorator(TestUtil.TestSuite):
3374
3264
"""A decorator for TestCase/TestSuite objects.
3376
Contains rather than flattening suite passed on construction
3266
Usually, subclasses should override __iter__ (used when flattening test
3267
suites), which we do to filter, reorder, parallelise and so on, run() and
3379
def __init__(self, suite=None):
3380
super(TestDecorator, self).__init__()
3381
if suite is not None:
3384
# Don't need subclass run method with suite emptying
3385
run = unittest.TestSuite.run
3271
def __init__(self, suite):
3272
TestUtil.TestSuite.__init__(self)
3275
def countTestCases(self):
3278
cases += test.countTestCases()
3285
def run(self, result):
3286
# Use iteration on self, not self._tests, to allow subclasses to hook
3289
if result.shouldStop:
3388
3295
class CountingDecorator(TestDecorator):
3399
3306
"""A decorator which excludes test matching an exclude pattern."""
3401
3308
def __init__(self, suite, exclude_pattern):
3402
super(ExcludeDecorator, self).__init__(
3403
exclude_tests_by_re(suite, exclude_pattern))
3309
TestDecorator.__init__(self, suite)
3310
self.exclude_pattern = exclude_pattern
3311
self.excluded = False
3315
return iter(self._tests)
3316
self.excluded = True
3317
suite = exclude_tests_by_re(self, self.exclude_pattern)
3319
self.addTests(suite)
3320
return iter(self._tests)
3406
3323
class FilterTestsDecorator(TestDecorator):
3407
3324
"""A decorator which filters tests to those matching a pattern."""
3409
3326
def __init__(self, suite, pattern):
3410
super(FilterTestsDecorator, self).__init__(
3411
filter_suite_by_re(suite, pattern))
3327
TestDecorator.__init__(self, suite)
3328
self.pattern = pattern
3329
self.filtered = False
3333
return iter(self._tests)
3334
self.filtered = True
3335
suite = filter_suite_by_re(self, self.pattern)
3337
self.addTests(suite)
3338
return iter(self._tests)
3414
3341
class RandomDecorator(TestDecorator):
3415
3342
"""A decorator which randomises the order of its tests."""
3417
3344
def __init__(self, suite, random_seed, stream):
3418
random_seed = self.actual_seed(random_seed)
3419
stream.write("Randomizing test order using seed %s\n\n" %
3345
TestDecorator.__init__(self, suite)
3346
self.random_seed = random_seed
3347
self.randomised = False
3348
self.stream = stream
3352
return iter(self._tests)
3353
self.randomised = True
3354
self.stream.write("Randomizing test order using seed %s\n\n" %
3355
(self.actual_seed()))
3421
3356
# Initialise the random number generator.
3422
random.seed(random_seed)
3423
super(RandomDecorator, self).__init__(randomize_suite(suite))
3357
random.seed(self.actual_seed())
3358
suite = randomize_suite(self)
3360
self.addTests(suite)
3361
return iter(self._tests)
3426
def actual_seed(seed):
3363
def actual_seed(self):
3364
if self.random_seed == "now":
3428
3365
# We convert the seed to a long to make it reusable across
3429
3366
# invocations (because the user can reenter it).
3430
return long(time.time())
3367
self.random_seed = long(time.time())
3432
3369
# Convert the seed to a long if we can
3435
except (TypeError, ValueError):
3371
self.random_seed = long(self.random_seed)
3374
return self.random_seed
3440
3377
class TestFirstDecorator(TestDecorator):
3441
3378
"""A decorator which moves named tests to the front."""
3443
3380
def __init__(self, suite, pattern):
3444
super(TestFirstDecorator, self).__init__()
3445
self.addTests(split_suite_by_re(suite, pattern))
3381
TestDecorator.__init__(self, suite)
3382
self.pattern = pattern
3383
self.filtered = False
3387
return iter(self._tests)
3388
self.filtered = True
3389
suites = split_suite_by_re(self, self.pattern)
3391
self.addTests(suites)
3392
return iter(self._tests)
3448
3395
def partition_tests(suite, count):
3945
3885
'bzrlib.tests.test_email_message',
3946
3886
'bzrlib.tests.test_eol_filters',
3947
3887
'bzrlib.tests.test_errors',
3948
'bzrlib.tests.test_estimate_compressed_size',
3949
3888
'bzrlib.tests.test_export',
3950
3889
'bzrlib.tests.test_export_pot',
3951
3890
'bzrlib.tests.test_extract',
3952
'bzrlib.tests.test_features',
3953
3891
'bzrlib.tests.test_fetch',
3954
3892
'bzrlib.tests.test_fixtures',
3955
3893
'bzrlib.tests.test_fifo_cache',
3956
3894
'bzrlib.tests.test_filters',
3957
'bzrlib.tests.test_filter_tree',
3958
3895
'bzrlib.tests.test_ftp_transport',
3959
3896
'bzrlib.tests.test_foreign',
3960
3897
'bzrlib.tests.test_generate_docs',
4399
4334
% (os.path.basename(dirname), printable_e))
4337
class Feature(object):
4338
"""An operating system Feature."""
4341
self._available = None
4343
def available(self):
4344
"""Is the feature available?
4346
:return: True if the feature is available.
4348
if self._available is None:
4349
self._available = self._probe()
4350
return self._available
4353
"""Implement this method in concrete features.
4355
:return: True if the feature is available.
4357
raise NotImplementedError
4360
if getattr(self, 'feature_name', None):
4361
return self.feature_name()
4362
return self.__class__.__name__
4365
class _SymlinkFeature(Feature):
4368
return osutils.has_symlinks()
4370
def feature_name(self):
4373
SymlinkFeature = _SymlinkFeature()
4376
class _HardlinkFeature(Feature):
4379
return osutils.has_hardlinks()
4381
def feature_name(self):
4384
HardlinkFeature = _HardlinkFeature()
4387
class _OsFifoFeature(Feature):
4390
return getattr(os, 'mkfifo', None)
4392
def feature_name(self):
4393
return 'filesystem fifos'
4395
OsFifoFeature = _OsFifoFeature()
4398
class _UnicodeFilenameFeature(Feature):
4399
"""Does the filesystem support Unicode filenames?"""
4403
# Check for character combinations unlikely to be covered by any
4404
# single non-unicode encoding. We use the characters
4405
# - greek small letter alpha (U+03B1) and
4406
# - braille pattern dots-123456 (U+283F).
4407
os.stat(u'\u03b1\u283f')
4408
except UnicodeEncodeError:
4410
except (IOError, OSError):
4411
# The filesystem allows the Unicode filename but the file doesn't
4415
# The filesystem allows the Unicode filename and the file exists,
4419
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4422
class _CompatabilityThunkFeature(Feature):
4423
"""This feature is just a thunk to another feature.
4425
It issues a deprecation warning if it is accessed, to let you know that you
4426
should really use a different feature.
4429
def __init__(self, dep_version, module, name,
4430
replacement_name, replacement_module=None):
4431
super(_CompatabilityThunkFeature, self).__init__()
4432
self._module = module
4433
if replacement_module is None:
4434
replacement_module = module
4435
self._replacement_module = replacement_module
4437
self._replacement_name = replacement_name
4438
self._dep_version = dep_version
4439
self._feature = None
4442
if self._feature is None:
4443
depr_msg = self._dep_version % ('%s.%s'
4444
% (self._module, self._name))
4445
use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4446
self._replacement_name)
4447
symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4448
# Import the new feature and use it as a replacement for the
4450
self._feature = pyutils.get_named_object(
4451
self._replacement_module, self._replacement_name)
4455
return self._feature._probe()
4458
class ModuleAvailableFeature(Feature):
4459
"""This is a feature that describes a module we want to be available.
4461
Declare the name of the module in __init__(), and then after probing, the
4462
module will be available as 'self.module'.
4464
:ivar module: The module if it is available, else None.
4467
def __init__(self, module_name):
4468
super(ModuleAvailableFeature, self).__init__()
4469
self.module_name = module_name
4473
self._module = __import__(self.module_name, {}, {}, [''])
4480
if self.available(): # Make sure the probe has been done
4484
def feature_name(self):
4485
return self.module_name
4402
4488
def probe_unicode_in_user_encoding():
4403
4489
"""Try to encode several unicode strings to use in unicode-aware tests.
4404
4490
Return first successful match.
4521
class _HTTPSServerFeature(Feature):
4522
"""Some tests want an https Server, check if one is available.
4524
Right now, the only way this is available is under python2.6 which provides
4535
def feature_name(self):
4536
return 'HTTPSServer'
4539
HTTPSServerFeature = _HTTPSServerFeature()
4542
class _UnicodeFilename(Feature):
4543
"""Does the filesystem support Unicode filenames?"""
4548
except UnicodeEncodeError:
4550
except (IOError, OSError):
4551
# The filesystem allows the Unicode filename but the file doesn't
4555
# The filesystem allows the Unicode filename and the file exists,
4559
UnicodeFilename = _UnicodeFilename()
4562
class _ByteStringNamedFilesystem(Feature):
4563
"""Is the filesystem based on bytes?"""
4566
if os.name == "posix":
4570
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4573
class _UTF8Filesystem(Feature):
4574
"""Is the filesystem UTF-8?"""
4577
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4581
UTF8Filesystem = _UTF8Filesystem()
4584
class _BreakinFeature(Feature):
4585
"""Does this platform support the breakin feature?"""
4588
from bzrlib import breakin
4589
if breakin.determine_signal() is None:
4591
if sys.platform == 'win32':
4592
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4593
# We trigger SIGBREAK via a Console api so we need ctypes to
4594
# access the function
4601
def feature_name(self):
4602
return "SIGQUIT or SIGBREAK w/ctypes on win32"
4605
BreakinFeature = _BreakinFeature()
4608
class _CaseInsCasePresFilenameFeature(Feature):
4609
"""Is the file-system case insensitive, but case-preserving?"""
4612
fileno, name = tempfile.mkstemp(prefix='MixedCase')
4614
# first check truly case-preserving for created files, then check
4615
# case insensitive when opening existing files.
4616
name = osutils.normpath(name)
4617
base, rel = osutils.split(name)
4618
found_rel = osutils.canonical_relpath(base, name)
4619
return (found_rel == rel
4620
and os.path.isfile(name.upper())
4621
and os.path.isfile(name.lower()))
4626
def feature_name(self):
4627
return "case-insensitive case-preserving filesystem"
4629
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4632
class _CaseInsensitiveFilesystemFeature(Feature):
4633
"""Check if underlying filesystem is case-insensitive but *not* case
4636
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4637
# more likely to be case preserving, so this case is rare.
4640
if CaseInsCasePresFilenameFeature.available():
4643
if TestCaseWithMemoryTransport.TEST_ROOT is None:
4644
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4645
TestCaseWithMemoryTransport.TEST_ROOT = root
4647
root = TestCaseWithMemoryTransport.TEST_ROOT
4648
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4650
name_a = osutils.pathjoin(tdir, 'a')
4651
name_A = osutils.pathjoin(tdir, 'A')
4653
result = osutils.isdir(name_A)
4654
_rmtree_temp_dir(tdir)
4657
def feature_name(self):
4658
return 'case-insensitive filesystem'
4660
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4663
class _CaseSensitiveFilesystemFeature(Feature):
4666
if CaseInsCasePresFilenameFeature.available():
4668
elif CaseInsensitiveFilesystemFeature.available():
4673
def feature_name(self):
4674
return 'case-sensitive filesystem'
4676
# new coding style is for feature instances to be lowercase
4677
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4435
4680
# Only define SubUnitBzrRunner if subunit is available.
4437
4682
from subunit import TestProtocolClient
4438
4683
from subunit.test_results import AutoTimingTestResultDecorator
4439
4684
class SubUnitBzrProtocolClient(TestProtocolClient):
4441
def stopTest(self, test):
4442
super(SubUnitBzrProtocolClient, self).stopTest(test)
4443
_clear__type_equality_funcs(test)
4445
4686
def addSuccess(self, test, details=None):
4446
4687
# The subunit client always includes the details in the subunit
4447
4688
# stream, but we don't want to include it in ours.
4459
4700
except ImportError:
4463
@deprecated_function(deprecated_in((2, 5, 0)))
4464
def ModuleAvailableFeature(name):
4465
from bzrlib.tests import features
4466
return features.ModuleAvailableFeature(name)
4703
class _PosixPermissionsFeature(Feature):
4707
# create temporary file and check if specified perms are maintained.
4710
write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4711
f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4714
os.chmod(name, write_perms)
4716
read_perms = os.stat(name).st_mode & 0777
4718
return (write_perms == read_perms)
4720
return (os.name == 'posix') and has_perms()
4722
def feature_name(self):
4723
return 'POSIX permissions support'
4725
posix_permissions_feature = _PosixPermissionsFeature()