~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-06-28 19:12:10 UTC
  • mfrom: (5967.7.4 rm-magic-methods)
  • Revision ID: pqm@pqm.ubuntu.com-20110628191210-bwblsxn26kyu3swl
(mbp) remove __contains__ methods from inventory and dict (Martin Pool)

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
 
19
# TODO: Perhaps there should be an API to find out if bzr running under the
 
20
# test suite -- some plugins might want to avoid making intrusive changes if
 
21
# this is the case.  However, we want behaviour under to test to diverge as
 
22
# little as possible, so this should be used rarely if it's added at all.
 
23
# (Suggestion from j-a-meinel, 2005-11-24)
 
24
 
19
25
# NOTE: Some classes in here use camelCaseNaming() rather than
20
26
# underscore_naming().  That's for consistency with unittest; it's not the
21
27
# general style of bzrlib.  Please continue that consistency when adding e.g.
88
94
    memory,
89
95
    pathfilter,
90
96
    )
91
 
from bzrlib.symbol_versioning import (
92
 
    deprecated_function,
93
 
    deprecated_in,
94
 
    )
95
97
from bzrlib.tests import (
96
98
    test_server,
97
99
    TestUtil,
1161
1163
 
1162
1164
    def permit_dir(self, name):
1163
1165
        """Permit a directory to be used by this test. See permit_url."""
1164
 
        name_transport = _mod_transport.get_transport_from_path(name)
 
1166
        name_transport = _mod_transport.get_transport(name)
1165
1167
        self.permit_url(name)
1166
1168
        self.permit_url(name_transport.base)
1167
1169
 
1246
1248
        self.addCleanup(transport_server.stop_server)
1247
1249
        # Obtain a real transport because if the server supplies a password, it
1248
1250
        # will be hidden from the base on the client side.
1249
 
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1251
        t = _mod_transport.get_transport(transport_server.get_url())
1250
1252
        # Some transport servers effectively chroot the backing transport;
1251
1253
        # others like SFTPServer don't - users of the transport can walk up the
1252
1254
        # transport to read the entire backing transport. This wouldn't matter
1728
1730
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1729
1731
        """Overrides an object attribute restoring it after the test.
1730
1732
 
1731
 
        :note: This should be used with discretion; you should think about
1732
 
        whether it's better to make the code testable without monkey-patching.
1733
 
 
1734
1733
        :param obj: The object that will be mutated.
1735
1734
 
1736
1735
        :param attr_name: The attribute name we want to preserve/override in
1761
1760
        self.addCleanup(osutils.set_or_unset_env, name, value)
1762
1761
        return value
1763
1762
 
1764
 
    def recordCalls(self, obj, attr_name):
1765
 
        """Monkeypatch in a wrapper that will record calls.
1766
 
 
1767
 
        The monkeypatch is automatically removed when the test concludes.
1768
 
 
1769
 
        :param obj: The namespace holding the reference to be replaced;
1770
 
            typically a module, class, or object.
1771
 
        :param attr_name: A string for the name of the attribute to 
1772
 
            patch.
1773
 
        :returns: A list that will be extended with one item every time the
1774
 
            function is called, with a tuple of (args, kwargs).
1775
 
        """
1776
 
        calls = []
1777
 
 
1778
 
        def decorator(*args, **kwargs):
1779
 
            calls.append((args, kwargs))
1780
 
            return orig(*args, **kwargs)
1781
 
        orig = self.overrideAttr(obj, attr_name, decorator)
1782
 
        return calls
1783
 
 
1784
1763
    def _cleanEnvironment(self):
1785
1764
        for name, value in isolated_environ.iteritems():
1786
1765
            self.overrideEnv(name, value)
1793
1772
        self._preserved_lazy_hooks.clear()
1794
1773
 
1795
1774
    def knownFailure(self, reason):
1796
 
        """Declare that this test fails for a known reason
1797
 
 
1798
 
        Tests that are known to fail should generally be using expectedFailure
1799
 
        with an appropriate reverse assertion if a change could cause the test
1800
 
        to start passing. Conversely if the test has no immediate prospect of
1801
 
        succeeding then using skip is more suitable.
1802
 
 
1803
 
        When this method is called while an exception is being handled, that
1804
 
        traceback will be used, otherwise a new exception will be thrown to
1805
 
        provide one but won't be reported.
1806
 
        """
1807
 
        self._add_reason(reason)
1808
 
        try:
1809
 
            exc_info = sys.exc_info()
1810
 
            if exc_info != (None, None, None):
1811
 
                self._report_traceback(exc_info)
1812
 
            else:
1813
 
                try:
1814
 
                    raise self.failureException(reason)
1815
 
                except self.failureException:
1816
 
                    exc_info = sys.exc_info()
1817
 
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1818
 
            raise testtools.testcase._ExpectedFailure(exc_info)
1819
 
        finally:
1820
 
            del exc_info
 
1775
        """This test has failed for some known reason."""
 
1776
        raise KnownFailure(reason)
1821
1777
 
1822
1778
    def _suppress_log(self):
1823
1779
        """Remove the log info from details."""
2410
2366
 
2411
2367
        :param relpath: a path relative to the base url.
2412
2368
        """
2413
 
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2369
        t = _mod_transport.get_transport(self.get_url(relpath))
2414
2370
        self.assertFalse(t.is_readonly())
2415
2371
        return t
2416
2372
 
2549
2505
        real branch.
2550
2506
        """
2551
2507
        root = TestCaseWithMemoryTransport.TEST_ROOT
2552
 
        wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2553
 
        # Hack for speed: remember the raw bytes of the dirstate file so that
2554
 
        # we don't need to re-open the wt to check it hasn't changed.
2555
 
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2556
 
            wt.control_transport.get_bytes('dirstate'))
 
2508
        bzrdir.BzrDir.create_standalone_workingtree(root)
2557
2509
 
2558
2510
    def _check_safety_net(self):
2559
2511
        """Check that the safety .bzr directory have not been touched.
2562
2514
        propagating. This method ensures than a test did not leaked.
2563
2515
        """
2564
2516
        root = TestCaseWithMemoryTransport.TEST_ROOT
2565
 
        t = _mod_transport.get_transport(root)
2566
 
        self.permit_url(t.base)
2567
 
        if (t.get_bytes('.bzr/checkout/dirstate') != 
2568
 
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2517
        self.permit_url(_mod_transport.get_transport(root).base)
 
2518
        wt = workingtree.WorkingTree.open(root)
 
2519
        last_rev = wt.last_revision()
 
2520
        if last_rev != 'null:':
2569
2521
            # The current test have modified the /bzr directory, we need to
2570
2522
            # recreate a new one or all the followng tests will fail.
2571
2523
            # If you need to inspect its content uncomment the following line
2641
2593
            backing_server = self.get_server()
2642
2594
        smart_server = test_server.SmartTCPServer_for_testing()
2643
2595
        self.start_server(smart_server, backing_server)
2644
 
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2596
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2645
2597
                                                   ).clone(path)
2646
2598
        return remote_transport
2647
2599
 
2664
2616
    def setUp(self):
2665
2617
        super(TestCaseWithMemoryTransport, self).setUp()
2666
2618
        # Ensure that ConnectedTransport doesn't leak sockets
2667
 
        def get_transport_from_url_with_cleanup(*args, **kwargs):
2668
 
            t = orig_get_transport_from_url(*args, **kwargs)
 
2619
        def get_transport_with_cleanup(*args, **kwargs):
 
2620
            t = orig_get_transport(*args, **kwargs)
2669
2621
            if isinstance(t, _mod_transport.ConnectedTransport):
2670
2622
                self.addCleanup(t.disconnect)
2671
2623
            return t
2672
2624
 
2673
 
        orig_get_transport_from_url = self.overrideAttr(
2674
 
            _mod_transport, 'get_transport_from_url',
2675
 
            get_transport_from_url_with_cleanup)
 
2625
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
 
2626
                                               get_transport_with_cleanup)
2676
2627
        self._make_test_root()
2677
2628
        self.addCleanup(os.chdir, os.getcwdu())
2678
2629
        self.makeAndChdirToTestDir()
3937
3888
        'bzrlib.tests.test_export',
3938
3889
        'bzrlib.tests.test_export_pot',
3939
3890
        'bzrlib.tests.test_extract',
3940
 
        'bzrlib.tests.test_features',
3941
3891
        'bzrlib.tests.test_fetch',
3942
3892
        'bzrlib.tests.test_fixtures',
3943
3893
        'bzrlib.tests.test_fifo_cache',
3944
3894
        'bzrlib.tests.test_filters',
3945
 
        'bzrlib.tests.test_filter_tree',
3946
3895
        'bzrlib.tests.test_ftp_transport',
3947
3896
        'bzrlib.tests.test_foreign',
3948
3897
        'bzrlib.tests.test_generate_docs',
4339
4288
        the module is available.
4340
4289
    """
4341
4290
 
4342
 
    from bzrlib.tests.features import ModuleAvailableFeature
4343
4291
    py_module = pyutils.get_named_object(py_module_name)
4344
4292
    scenarios = [
4345
4293
        ('python', {'module': py_module}),
4386
4334
                         % (os.path.basename(dirname), printable_e))
4387
4335
 
4388
4336
 
 
4337
class Feature(object):
 
4338
    """An operating system Feature."""
 
4339
 
 
4340
    def __init__(self):
 
4341
        self._available = None
 
4342
 
 
4343
    def available(self):
 
4344
        """Is the feature available?
 
4345
 
 
4346
        :return: True if the feature is available.
 
4347
        """
 
4348
        if self._available is None:
 
4349
            self._available = self._probe()
 
4350
        return self._available
 
4351
 
 
4352
    def _probe(self):
 
4353
        """Implement this method in concrete features.
 
4354
 
 
4355
        :return: True if the feature is available.
 
4356
        """
 
4357
        raise NotImplementedError
 
4358
 
 
4359
    def __str__(self):
 
4360
        if getattr(self, 'feature_name', None):
 
4361
            return self.feature_name()
 
4362
        return self.__class__.__name__
 
4363
 
 
4364
 
 
4365
class _SymlinkFeature(Feature):
 
4366
 
 
4367
    def _probe(self):
 
4368
        return osutils.has_symlinks()
 
4369
 
 
4370
    def feature_name(self):
 
4371
        return 'symlinks'
 
4372
 
 
4373
SymlinkFeature = _SymlinkFeature()
 
4374
 
 
4375
 
 
4376
class _HardlinkFeature(Feature):
 
4377
 
 
4378
    def _probe(self):
 
4379
        return osutils.has_hardlinks()
 
4380
 
 
4381
    def feature_name(self):
 
4382
        return 'hardlinks'
 
4383
 
 
4384
HardlinkFeature = _HardlinkFeature()
 
4385
 
 
4386
 
 
4387
class _OsFifoFeature(Feature):
 
4388
 
 
4389
    def _probe(self):
 
4390
        return getattr(os, 'mkfifo', None)
 
4391
 
 
4392
    def feature_name(self):
 
4393
        return 'filesystem fifos'
 
4394
 
 
4395
OsFifoFeature = _OsFifoFeature()
 
4396
 
 
4397
 
 
4398
class _UnicodeFilenameFeature(Feature):
 
4399
    """Does the filesystem support Unicode filenames?"""
 
4400
 
 
4401
    def _probe(self):
 
4402
        try:
 
4403
            # Check for character combinations unlikely to be covered by any
 
4404
            # single non-unicode encoding. We use the characters
 
4405
            # - greek small letter alpha (U+03B1) and
 
4406
            # - braille pattern dots-123456 (U+283F).
 
4407
            os.stat(u'\u03b1\u283f')
 
4408
        except UnicodeEncodeError:
 
4409
            return False
 
4410
        except (IOError, OSError):
 
4411
            # The filesystem allows the Unicode filename but the file doesn't
 
4412
            # exist.
 
4413
            return True
 
4414
        else:
 
4415
            # The filesystem allows the Unicode filename and the file exists,
 
4416
            # for some reason.
 
4417
            return True
 
4418
 
 
4419
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4420
 
 
4421
 
 
4422
class _CompatabilityThunkFeature(Feature):
 
4423
    """This feature is just a thunk to another feature.
 
4424
 
 
4425
    It issues a deprecation warning if it is accessed, to let you know that you
 
4426
    should really use a different feature.
 
4427
    """
 
4428
 
 
4429
    def __init__(self, dep_version, module, name,
 
4430
                 replacement_name, replacement_module=None):
 
4431
        super(_CompatabilityThunkFeature, self).__init__()
 
4432
        self._module = module
 
4433
        if replacement_module is None:
 
4434
            replacement_module = module
 
4435
        self._replacement_module = replacement_module
 
4436
        self._name = name
 
4437
        self._replacement_name = replacement_name
 
4438
        self._dep_version = dep_version
 
4439
        self._feature = None
 
4440
 
 
4441
    def _ensure(self):
 
4442
        if self._feature is None:
 
4443
            depr_msg = self._dep_version % ('%s.%s'
 
4444
                                            % (self._module, self._name))
 
4445
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4446
                                               self._replacement_name)
 
4447
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4448
            # Import the new feature and use it as a replacement for the
 
4449
            # deprecated one.
 
4450
            self._feature = pyutils.get_named_object(
 
4451
                self._replacement_module, self._replacement_name)
 
4452
 
 
4453
    def _probe(self):
 
4454
        self._ensure()
 
4455
        return self._feature._probe()
 
4456
 
 
4457
 
 
4458
class ModuleAvailableFeature(Feature):
 
4459
    """This is a feature than describes a module we want to be available.
 
4460
 
 
4461
    Declare the name of the module in __init__(), and then after probing, the
 
4462
    module will be available as 'self.module'.
 
4463
 
 
4464
    :ivar module: The module if it is available, else None.
 
4465
    """
 
4466
 
 
4467
    def __init__(self, module_name):
 
4468
        super(ModuleAvailableFeature, self).__init__()
 
4469
        self.module_name = module_name
 
4470
 
 
4471
    def _probe(self):
 
4472
        try:
 
4473
            self._module = __import__(self.module_name, {}, {}, [''])
 
4474
            return True
 
4475
        except ImportError:
 
4476
            return False
 
4477
 
 
4478
    @property
 
4479
    def module(self):
 
4480
        if self.available(): # Make sure the probe has been done
 
4481
            return self._module
 
4482
        return None
 
4483
 
 
4484
    def feature_name(self):
 
4485
        return self.module_name
 
4486
 
 
4487
 
4389
4488
def probe_unicode_in_user_encoding():
4390
4489
    """Try to encode several unicode strings to use in unicode-aware tests.
4391
4490
    Return first successfull match.
4419
4518
    return None
4420
4519
 
4421
4520
 
 
4521
class _HTTPSServerFeature(Feature):
 
4522
    """Some tests want an https Server, check if one is available.
 
4523
 
 
4524
    Right now, the only way this is available is under python2.6 which provides
 
4525
    an ssl module.
 
4526
    """
 
4527
 
 
4528
    def _probe(self):
 
4529
        try:
 
4530
            import ssl
 
4531
            return True
 
4532
        except ImportError:
 
4533
            return False
 
4534
 
 
4535
    def feature_name(self):
 
4536
        return 'HTTPSServer'
 
4537
 
 
4538
 
 
4539
HTTPSServerFeature = _HTTPSServerFeature()
 
4540
 
 
4541
 
 
4542
class _UnicodeFilename(Feature):
 
4543
    """Does the filesystem support Unicode filenames?"""
 
4544
 
 
4545
    def _probe(self):
 
4546
        try:
 
4547
            os.stat(u'\u03b1')
 
4548
        except UnicodeEncodeError:
 
4549
            return False
 
4550
        except (IOError, OSError):
 
4551
            # The filesystem allows the Unicode filename but the file doesn't
 
4552
            # exist.
 
4553
            return True
 
4554
        else:
 
4555
            # The filesystem allows the Unicode filename and the file exists,
 
4556
            # for some reason.
 
4557
            return True
 
4558
 
 
4559
UnicodeFilename = _UnicodeFilename()
 
4560
 
 
4561
 
 
4562
class _ByteStringNamedFilesystem(Feature):
 
4563
    """Is the filesystem based on bytes?"""
 
4564
 
 
4565
    def _probe(self):
 
4566
        if os.name == "posix":
 
4567
            return True
 
4568
        return False
 
4569
 
 
4570
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
 
4571
 
 
4572
 
 
4573
class _UTF8Filesystem(Feature):
 
4574
    """Is the filesystem UTF-8?"""
 
4575
 
 
4576
    def _probe(self):
 
4577
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4578
            return True
 
4579
        return False
 
4580
 
 
4581
UTF8Filesystem = _UTF8Filesystem()
 
4582
 
 
4583
 
 
4584
class _BreakinFeature(Feature):
 
4585
    """Does this platform support the breakin feature?"""
 
4586
 
 
4587
    def _probe(self):
 
4588
        from bzrlib import breakin
 
4589
        if breakin.determine_signal() is None:
 
4590
            return False
 
4591
        if sys.platform == 'win32':
 
4592
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4593
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4594
            # access the function
 
4595
            try:
 
4596
                import ctypes
 
4597
            except OSError:
 
4598
                return False
 
4599
        return True
 
4600
 
 
4601
    def feature_name(self):
 
4602
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4603
 
 
4604
 
 
4605
BreakinFeature = _BreakinFeature()
 
4606
 
 
4607
 
 
4608
class _CaseInsCasePresFilenameFeature(Feature):
 
4609
    """Is the file-system case insensitive, but case-preserving?"""
 
4610
 
 
4611
    def _probe(self):
 
4612
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4613
        try:
 
4614
            # first check truly case-preserving for created files, then check
 
4615
            # case insensitive when opening existing files.
 
4616
            name = osutils.normpath(name)
 
4617
            base, rel = osutils.split(name)
 
4618
            found_rel = osutils.canonical_relpath(base, name)
 
4619
            return (found_rel == rel
 
4620
                    and os.path.isfile(name.upper())
 
4621
                    and os.path.isfile(name.lower()))
 
4622
        finally:
 
4623
            os.close(fileno)
 
4624
            os.remove(name)
 
4625
 
 
4626
    def feature_name(self):
 
4627
        return "case-insensitive case-preserving filesystem"
 
4628
 
 
4629
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4630
 
 
4631
 
 
4632
class _CaseInsensitiveFilesystemFeature(Feature):
 
4633
    """Check if underlying filesystem is case-insensitive but *not* case
 
4634
    preserving.
 
4635
    """
 
4636
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4637
    # more likely to be case preserving, so this case is rare.
 
4638
 
 
4639
    def _probe(self):
 
4640
        if CaseInsCasePresFilenameFeature.available():
 
4641
            return False
 
4642
 
 
4643
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4644
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4645
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4646
        else:
 
4647
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4648
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4649
            dir=root)
 
4650
        name_a = osutils.pathjoin(tdir, 'a')
 
4651
        name_A = osutils.pathjoin(tdir, 'A')
 
4652
        os.mkdir(name_a)
 
4653
        result = osutils.isdir(name_A)
 
4654
        _rmtree_temp_dir(tdir)
 
4655
        return result
 
4656
 
 
4657
    def feature_name(self):
 
4658
        return 'case-insensitive filesystem'
 
4659
 
 
4660
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4661
 
 
4662
 
 
4663
class _CaseSensitiveFilesystemFeature(Feature):
 
4664
 
 
4665
    def _probe(self):
 
4666
        if CaseInsCasePresFilenameFeature.available():
 
4667
            return False
 
4668
        elif CaseInsensitiveFilesystemFeature.available():
 
4669
            return False
 
4670
        else:
 
4671
            return True
 
4672
 
 
4673
    def feature_name(self):
 
4674
        return 'case-sensitive filesystem'
 
4675
 
 
4676
# new coding style is for feature instances to be lowercase
 
4677
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
 
4678
 
 
4679
 
4422
4680
# Only define SubUnitBzrRunner if subunit is available.
4423
4681
try:
4424
4682
    from subunit import TestProtocolClient
4442
4700
except ImportError:
4443
4701
    pass
4444
4702
 
4445
 
 
4446
 
@deprecated_function(deprecated_in((2, 5, 0)))
4447
 
def ModuleAvailableFeature(name):
4448
 
    from bzrlib.tests import features
4449
 
    return features.ModuleAvailableFeature(name)
4450
 
    
 
4703
class _PosixPermissionsFeature(Feature):
 
4704
 
 
4705
    def _probe(self):
 
4706
        def has_perms():
 
4707
            # create temporary file and check if specified perms are maintained.
 
4708
            import tempfile
 
4709
 
 
4710
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
 
4711
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
 
4712
            fd, name = f
 
4713
            os.close(fd)
 
4714
            os.chmod(name, write_perms)
 
4715
 
 
4716
            read_perms = os.stat(name).st_mode & 0777
 
4717
            os.unlink(name)
 
4718
            return (write_perms == read_perms)
 
4719
 
 
4720
        return (os.name == 'posix') and has_perms()
 
4721
 
 
4722
    def feature_name(self):
 
4723
        return 'POSIX permissions support'
 
4724
 
 
4725
posix_permissions_feature = _PosixPermissionsFeature()