~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Vincent Ladeuil
  • Date: 2011-05-31 10:01:15 UTC
  • mto: (5993.1.1 trunk)
  • mto: This revision was merged to the branch mainline in revision 5994.
  • Revision ID: v.ladeuil+lp@free.fr-20110531100115-bkea029bq5oki70l
Put the '\n' back into the formats and fix tests accordingly (reducing code duplication).

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
 
19
# TODO: Perhaps there should be an API to find out if bzr running under the
 
20
# test suite -- some plugins might want to avoid making intrusive changes if
 
21
# this is the case.  However, we want behaviour under to test to diverge as
 
22
# little as possible, so this should be used rarely if it's added at all.
 
23
# (Suggestion from j-a-meinel, 2005-11-24)
 
24
 
19
25
# NOTE: Some classes in here use camelCaseNaming() rather than
20
26
# underscore_naming().  That's for consistency with unittest; it's not the
21
27
# general style of bzrlib.  Please continue that consistency when adding e.g.
88
94
    memory,
89
95
    pathfilter,
90
96
    )
91
 
from bzrlib.symbol_versioning import (
92
 
    deprecated_function,
93
 
    deprecated_in,
94
 
    )
95
97
from bzrlib.tests import (
96
98
    test_server,
97
99
    TestUtil,
140
142
    'BZREMAIL': None, # may still be present in the environment
141
143
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
142
144
    'BZR_PROGRESS_BAR': None,
143
 
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
144
 
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
145
 
    # TestCase should not use disk resources, BZR_LOG is one.
146
 
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
145
    'BZR_LOG': None,
147
146
    'BZR_PLUGIN_PATH': None,
148
147
    'BZR_DISABLE_PLUGINS': None,
149
148
    'BZR_PLUGINS_AT': None,
929
928
 
930
929
    The method is really a factory and users are expected to use it as such.
931
930
    """
932
 
 
 
931
    
933
932
    kwargs['setUp'] = isolated_doctest_setUp
934
933
    kwargs['tearDown'] = isolated_doctest_tearDown
935
934
    return doctest.DocTestSuite(*args, **kwargs)
989
988
        # settled on or a the FIXME associated with _get_expand_default_value
990
989
        # is addressed -- vila 20110219
991
990
        self.overrideAttr(config, '_expand_default_value', None)
992
 
        self._log_files = set()
993
 
        # Each key in the ``_counters`` dict holds a value for a different
994
 
        # counter. When the test ends, addDetail() should be used to output the
995
 
        # counter values. This happens in install_counter_hook().
996
 
        self._counters = {}
997
 
        if 'config_stats' in selftest_debug_flags:
998
 
            self._install_config_stats_hooks()
999
991
 
1000
992
    def debug(self):
1001
993
        # debug a frame up.
1018
1010
        if name in details:
1019
1011
            del details[name]
1020
1012
 
1021
 
    def install_counter_hook(self, hooks, name, counter_name=None):
1022
 
        """Install a counting hook.
1023
 
 
1024
 
        Any hook can be counted as long as it doesn't need to return a value.
1025
 
 
1026
 
        :param hooks: Where the hook should be installed.
1027
 
 
1028
 
        :param name: The hook name that will be counted.
1029
 
 
1030
 
        :param counter_name: The counter identifier in ``_counters``, defaults
1031
 
            to ``name``.
1032
 
        """
1033
 
        _counters = self._counters # Avoid closing over self
1034
 
        if counter_name is None:
1035
 
            counter_name = name
1036
 
        if _counters.has_key(counter_name):
1037
 
            raise AssertionError('%s is already used as a counter name'
1038
 
                                  % (counter_name,))
1039
 
        _counters[counter_name] = 0
1040
 
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1041
 
            lambda: ['%d' % (_counters[counter_name],)]))
1042
 
        def increment_counter(*args, **kwargs):
1043
 
            _counters[counter_name] += 1
1044
 
        label = 'count %s calls' % (counter_name,)
1045
 
        hooks.install_named_hook(name, increment_counter, label)
1046
 
        self.addCleanup(hooks.uninstall_named_hook, name, label)
1047
 
 
1048
 
    def _install_config_stats_hooks(self):
1049
 
        """Install config hooks to count hook calls.
1050
 
 
1051
 
        """
1052
 
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1053
 
            self.install_counter_hook(config.ConfigHooks, hook_name,
1054
 
                                       'config.%s' % (hook_name,))
1055
 
 
1056
 
        # The OldConfigHooks are private and need special handling to protect
1057
 
        # against recursive tests (tests that run other tests), so we just do
1058
 
        # manually what registering them into _builtin_known_hooks will provide
1059
 
        # us.
1060
 
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1061
 
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1062
 
            self.install_counter_hook(config.OldConfigHooks, hook_name,
1063
 
                                      'old_config.%s' % (hook_name,))
1064
 
 
1065
1013
    def _clear_debug_flags(self):
1066
1014
        """Prevent externally set debug flags affecting tests.
1067
1015
 
1121
1069
        # break some locks on purpose and should be taken into account by
1122
1070
        # considering that breaking a lock is just a dirty way of releasing it.
1123
1071
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1124
 
            message = (
1125
 
                'Different number of acquired and '
1126
 
                'released or broken locks.\n'
1127
 
                'acquired=%s\n'
1128
 
                'released=%s\n'
1129
 
                'broken=%s\n' %
1130
 
                (acquired_locks, released_locks, broken_locks))
 
1072
            message = ('Different number of acquired and '
 
1073
                       'released or broken locks. (%s, %s + %s)' %
 
1074
                       (acquired_locks, released_locks, broken_locks))
1131
1075
            if not self._lock_check_thorough:
1132
1076
                # Rather than fail, just warn
1133
1077
                print "Broken test %s: %s" % (self, message)
1161
1105
 
1162
1106
    def permit_dir(self, name):
1163
1107
        """Permit a directory to be used by this test. See permit_url."""
1164
 
        name_transport = _mod_transport.get_transport_from_path(name)
 
1108
        name_transport = _mod_transport.get_transport(name)
1165
1109
        self.permit_url(name)
1166
1110
        self.permit_url(name_transport.base)
1167
1111
 
1246
1190
        self.addCleanup(transport_server.stop_server)
1247
1191
        # Obtain a real transport because if the server supplies a password, it
1248
1192
        # will be hidden from the base on the client side.
1249
 
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1193
        t = _mod_transport.get_transport(transport_server.get_url())
1250
1194
        # Some transport servers effectively chroot the backing transport;
1251
1195
        # others like SFTPServer don't - users of the transport can walk up the
1252
1196
        # transport to read the entire backing transport. This wouldn't matter
1602
1546
        not other callers that go direct to the warning module.
1603
1547
 
1604
1548
        To test that a deprecated method raises an error, do something like
1605
 
        this (remember that both assertRaises and applyDeprecated delays *args
1606
 
        and **kwargs passing)::
 
1549
        this::
1607
1550
 
1608
1551
            self.assertRaises(errors.ReservedId,
1609
1552
                self.applyDeprecated,
1694
1637
        pseudo_log_file = StringIO()
1695
1638
        def _get_log_contents_for_weird_testtools_api():
1696
1639
            return [pseudo_log_file.getvalue().decode(
1697
 
                "utf-8", "replace").encode("utf-8")]
 
1640
                "utf-8", "replace").encode("utf-8")]          
1698
1641
        self.addDetail("log", content.Content(content.ContentType("text",
1699
1642
            "plain", {"charset": "utf8"}),
1700
1643
            _get_log_contents_for_weird_testtools_api))
1705
1648
    def _finishLogFile(self):
1706
1649
        """Finished with the log file.
1707
1650
 
1708
 
        Close the file and delete it.
 
1651
        Close the file and delete it, unless setKeepLogfile was called.
1709
1652
        """
1710
1653
        if trace._trace_file:
1711
1654
            # flush the log file, to get all content
1728
1671
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1729
1672
        """Overrides an object attribute restoring it after the test.
1730
1673
 
1731
 
        :note: This should be used with discretion; you should think about
1732
 
        whether it's better to make the code testable without monkey-patching.
1733
 
 
1734
1674
        :param obj: The object that will be mutated.
1735
1675
 
1736
1676
        :param attr_name: The attribute name we want to preserve/override in
1761
1701
        self.addCleanup(osutils.set_or_unset_env, name, value)
1762
1702
        return value
1763
1703
 
1764
 
    def recordCalls(self, obj, attr_name):
1765
 
        """Monkeypatch in a wrapper that will record calls.
1766
 
 
1767
 
        The monkeypatch is automatically removed when the test concludes.
1768
 
 
1769
 
        :param obj: The namespace holding the reference to be replaced;
1770
 
            typically a module, class, or object.
1771
 
        :param attr_name: A string for the name of the attribute to 
1772
 
            patch.
1773
 
        :returns: A list that will be extended with one item every time the
1774
 
            function is called, with a tuple of (args, kwargs).
1775
 
        """
1776
 
        calls = []
1777
 
 
1778
 
        def decorator(*args, **kwargs):
1779
 
            calls.append((args, kwargs))
1780
 
            return orig(*args, **kwargs)
1781
 
        orig = self.overrideAttr(obj, attr_name, decorator)
1782
 
        return calls
1783
 
 
1784
1704
    def _cleanEnvironment(self):
1785
1705
        for name, value in isolated_environ.iteritems():
1786
1706
            self.overrideEnv(name, value)
1793
1713
        self._preserved_lazy_hooks.clear()
1794
1714
 
1795
1715
    def knownFailure(self, reason):
1796
 
        """Declare that this test fails for a known reason
1797
 
 
1798
 
        Tests that are known to fail should generally be using expectedFailure
1799
 
        with an appropriate reverse assertion if a change could cause the test
1800
 
        to start passing. Conversely if the test has no immediate prospect of
1801
 
        succeeding then using skip is more suitable.
1802
 
 
1803
 
        When this method is called while an exception is being handled, that
1804
 
        traceback will be used, otherwise a new exception will be thrown to
1805
 
        provide one but won't be reported.
1806
 
        """
1807
 
        self._add_reason(reason)
1808
 
        try:
1809
 
            exc_info = sys.exc_info()
1810
 
            if exc_info != (None, None, None):
1811
 
                self._report_traceback(exc_info)
1812
 
            else:
1813
 
                try:
1814
 
                    raise self.failureException(reason)
1815
 
                except self.failureException:
1816
 
                    exc_info = sys.exc_info()
1817
 
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1818
 
            raise testtools.testcase._ExpectedFailure(exc_info)
1819
 
        finally:
1820
 
            del exc_info
 
1716
        """This test has failed for some known reason."""
 
1717
        raise KnownFailure(reason)
1821
1718
 
1822
1719
    def _suppress_log(self):
1823
1720
        """Remove the log info from details."""
2168
2065
            # so we will avoid using it on all platforms, just to
2169
2066
            # make sure the code path is used, and we don't break on win32
2170
2067
            cleanup_environment()
2171
 
            # Include the subprocess's log file in the test details, in case
2172
 
            # the test fails due to an error in the subprocess.
2173
 
            self._add_subprocess_log(trace._get_bzr_log_filename())
2174
2068
            command = [sys.executable]
2175
2069
            # frozen executables don't need the path to bzr
2176
2070
            if getattr(sys, "frozen", None) is None:
2188
2082
 
2189
2083
        return process
2190
2084
 
2191
 
    def _add_subprocess_log(self, log_file_path):
2192
 
        if len(self._log_files) == 0:
2193
 
            # Register an addCleanup func.  We do this on the first call to
2194
 
            # _add_subprocess_log rather than in TestCase.setUp so that this
2195
 
            # addCleanup is registered after any cleanups for tempdirs that
2196
 
            # subclasses might create, which will probably remove the log file
2197
 
            # we want to read.
2198
 
            self.addCleanup(self._subprocess_log_cleanup)
2199
 
        # self._log_files is a set, so if a log file is reused we won't grab it
2200
 
        # twice.
2201
 
        self._log_files.add(log_file_path)
2202
 
 
2203
 
    def _subprocess_log_cleanup(self):
2204
 
        for count, log_file_path in enumerate(self._log_files):
2205
 
            # We use buffer_now=True to avoid holding the file open beyond
2206
 
            # the life of this function, which might interfere with e.g.
2207
 
            # cleaning tempdirs on Windows.
2208
 
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2209
 
            #detail_content = content.content_from_file(
2210
 
            #    log_file_path, buffer_now=True)
2211
 
            with open(log_file_path, 'rb') as log_file:
2212
 
                log_file_bytes = log_file.read()
2213
 
            detail_content = content.Content(content.ContentType("text",
2214
 
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2215
 
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2216
 
                detail_content)
2217
 
 
2218
2085
    def _popen(self, *args, **kwargs):
2219
2086
        """Place a call to Popen.
2220
2087
 
2372
2239
class TestCaseWithMemoryTransport(TestCase):
2373
2240
    """Common test class for tests that do not need disk resources.
2374
2241
 
2375
 
    Tests that need disk resources should derive from TestCaseInTempDir
2376
 
    orTestCaseWithTransport.
 
2242
    Tests that need disk resources should derive from TestCaseWithTransport.
2377
2243
 
2378
2244
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2379
2245
 
2380
 
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2246
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2381
2247
    a directory which does not exist. This serves to help ensure test isolation
2382
 
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2383
 
    must exist. However, TestCaseWithMemoryTransport does not offer local file
2384
 
    defaults for the transport in tests, nor does it obey the command line
 
2248
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2249
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2250
    file defaults for the transport in tests, nor does it obey the command line
2385
2251
    override, so tests that accidentally write to the common directory should
2386
2252
    be rare.
2387
2253
 
2388
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
2389
 
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2254
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2255
    a .bzr directory that stops us ascending higher into the filesystem.
2390
2256
    """
2391
2257
 
2392
2258
    TEST_ROOT = None
2410
2276
 
2411
2277
        :param relpath: a path relative to the base url.
2412
2278
        """
2413
 
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2279
        t = _mod_transport.get_transport(self.get_url(relpath))
2414
2280
        self.assertFalse(t.is_readonly())
2415
2281
        return t
2416
2282
 
2549
2415
        real branch.
2550
2416
        """
2551
2417
        root = TestCaseWithMemoryTransport.TEST_ROOT
2552
 
        wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2553
 
        # Hack for speed: remember the raw bytes of the dirstate file so that
2554
 
        # we don't need to re-open the wt to check it hasn't changed.
2555
 
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2556
 
            wt.control_transport.get_bytes('dirstate'))
 
2418
        bzrdir.BzrDir.create_standalone_workingtree(root)
2557
2419
 
2558
2420
    def _check_safety_net(self):
2559
2421
        """Check that the safety .bzr directory have not been touched.
2562
2424
        propagating. This method ensures than a test did not leaked.
2563
2425
        """
2564
2426
        root = TestCaseWithMemoryTransport.TEST_ROOT
2565
 
        t = _mod_transport.get_transport(root)
2566
 
        self.permit_url(t.base)
2567
 
        if (t.get_bytes('.bzr/checkout/dirstate') != 
2568
 
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2427
        self.permit_url(_mod_transport.get_transport(root).base)
 
2428
        wt = workingtree.WorkingTree.open(root)
 
2429
        last_rev = wt.last_revision()
 
2430
        if last_rev != 'null:':
2569
2431
            # The current test have modified the /bzr directory, we need to
2570
2432
            # recreate a new one or all the followng tests will fail.
2571
2433
            # If you need to inspect its content uncomment the following line
2641
2503
            backing_server = self.get_server()
2642
2504
        smart_server = test_server.SmartTCPServer_for_testing()
2643
2505
        self.start_server(smart_server, backing_server)
2644
 
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2506
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2645
2507
                                                   ).clone(path)
2646
2508
        return remote_transport
2647
2509
 
2664
2526
    def setUp(self):
2665
2527
        super(TestCaseWithMemoryTransport, self).setUp()
2666
2528
        # Ensure that ConnectedTransport doesn't leak sockets
2667
 
        def get_transport_from_url_with_cleanup(*args, **kwargs):
2668
 
            t = orig_get_transport_from_url(*args, **kwargs)
 
2529
        def get_transport_with_cleanup(*args, **kwargs):
 
2530
            t = orig_get_transport(*args, **kwargs)
2669
2531
            if isinstance(t, _mod_transport.ConnectedTransport):
2670
2532
                self.addCleanup(t.disconnect)
2671
2533
            return t
2672
2534
 
2673
 
        orig_get_transport_from_url = self.overrideAttr(
2674
 
            _mod_transport, 'get_transport_from_url',
2675
 
            get_transport_from_url_with_cleanup)
 
2535
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
 
2536
                                               get_transport_with_cleanup)
2676
2537
        self._make_test_root()
2677
2538
        self.addCleanup(os.chdir, os.getcwdu())
2678
2539
        self.makeAndChdirToTestDir()
2721
2582
 
2722
2583
    OVERRIDE_PYTHON = 'python'
2723
2584
 
2724
 
    def setUp(self):
2725
 
        super(TestCaseInTempDir, self).setUp()
2726
 
        # Remove the protection set in isolated_environ, we have a proper
2727
 
        # access to disk resources now.
2728
 
        self.overrideEnv('BZR_LOG', None)
2729
 
 
2730
2585
    def check_file_contents(self, filename, expect):
2731
2586
        self.log("check contents of file %s" % filename)
2732
2587
        f = file(filename)
3623
3478
#                           with proper exclusion rules.
3624
3479
#   -Ethreads               Will display thread ident at creation/join time to
3625
3480
#                           help track thread leaks
3626
 
 
3627
 
#   -Econfig_stats          Will collect statistics using addDetail
3628
3481
selftest_debug_flags = set()
3629
3482
 
3630
3483
 
3937
3790
        'bzrlib.tests.test_export',
3938
3791
        'bzrlib.tests.test_export_pot',
3939
3792
        'bzrlib.tests.test_extract',
3940
 
        'bzrlib.tests.test_features',
3941
3793
        'bzrlib.tests.test_fetch',
3942
3794
        'bzrlib.tests.test_fixtures',
3943
3795
        'bzrlib.tests.test_fifo_cache',
3944
3796
        'bzrlib.tests.test_filters',
3945
 
        'bzrlib.tests.test_filter_tree',
3946
3797
        'bzrlib.tests.test_ftp_transport',
3947
3798
        'bzrlib.tests.test_foreign',
3948
3799
        'bzrlib.tests.test_generate_docs',
4339
4190
        the module is available.
4340
4191
    """
4341
4192
 
4342
 
    from bzrlib.tests.features import ModuleAvailableFeature
4343
4193
    py_module = pyutils.get_named_object(py_module_name)
4344
4194
    scenarios = [
4345
4195
        ('python', {'module': py_module}),
4386
4236
                         % (os.path.basename(dirname), printable_e))
4387
4237
 
4388
4238
 
 
4239
class Feature(object):
 
4240
    """An operating system Feature."""
 
4241
 
 
4242
    def __init__(self):
 
4243
        self._available = None
 
4244
 
 
4245
    def available(self):
 
4246
        """Is the feature available?
 
4247
 
 
4248
        :return: True if the feature is available.
 
4249
        """
 
4250
        if self._available is None:
 
4251
            self._available = self._probe()
 
4252
        return self._available
 
4253
 
 
4254
    def _probe(self):
 
4255
        """Implement this method in concrete features.
 
4256
 
 
4257
        :return: True if the feature is available.
 
4258
        """
 
4259
        raise NotImplementedError
 
4260
 
 
4261
    def __str__(self):
 
4262
        if getattr(self, 'feature_name', None):
 
4263
            return self.feature_name()
 
4264
        return self.__class__.__name__
 
4265
 
 
4266
 
 
4267
class _SymlinkFeature(Feature):
 
4268
 
 
4269
    def _probe(self):
 
4270
        return osutils.has_symlinks()
 
4271
 
 
4272
    def feature_name(self):
 
4273
        return 'symlinks'
 
4274
 
 
4275
SymlinkFeature = _SymlinkFeature()
 
4276
 
 
4277
 
 
4278
class _HardlinkFeature(Feature):
 
4279
 
 
4280
    def _probe(self):
 
4281
        return osutils.has_hardlinks()
 
4282
 
 
4283
    def feature_name(self):
 
4284
        return 'hardlinks'
 
4285
 
 
4286
HardlinkFeature = _HardlinkFeature()
 
4287
 
 
4288
 
 
4289
class _OsFifoFeature(Feature):
 
4290
 
 
4291
    def _probe(self):
 
4292
        return getattr(os, 'mkfifo', None)
 
4293
 
 
4294
    def feature_name(self):
 
4295
        return 'filesystem fifos'
 
4296
 
 
4297
OsFifoFeature = _OsFifoFeature()
 
4298
 
 
4299
 
 
4300
class _UnicodeFilenameFeature(Feature):
 
4301
    """Does the filesystem support Unicode filenames?"""
 
4302
 
 
4303
    def _probe(self):
 
4304
        try:
 
4305
            # Check for character combinations unlikely to be covered by any
 
4306
            # single non-unicode encoding. We use the characters
 
4307
            # - greek small letter alpha (U+03B1) and
 
4308
            # - braille pattern dots-123456 (U+283F).
 
4309
            os.stat(u'\u03b1\u283f')
 
4310
        except UnicodeEncodeError:
 
4311
            return False
 
4312
        except (IOError, OSError):
 
4313
            # The filesystem allows the Unicode filename but the file doesn't
 
4314
            # exist.
 
4315
            return True
 
4316
        else:
 
4317
            # The filesystem allows the Unicode filename and the file exists,
 
4318
            # for some reason.
 
4319
            return True
 
4320
 
 
4321
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4322
 
 
4323
 
 
4324
class _CompatabilityThunkFeature(Feature):
 
4325
    """This feature is just a thunk to another feature.
 
4326
 
 
4327
    It issues a deprecation warning if it is accessed, to let you know that you
 
4328
    should really use a different feature.
 
4329
    """
 
4330
 
 
4331
    def __init__(self, dep_version, module, name,
 
4332
                 replacement_name, replacement_module=None):
 
4333
        super(_CompatabilityThunkFeature, self).__init__()
 
4334
        self._module = module
 
4335
        if replacement_module is None:
 
4336
            replacement_module = module
 
4337
        self._replacement_module = replacement_module
 
4338
        self._name = name
 
4339
        self._replacement_name = replacement_name
 
4340
        self._dep_version = dep_version
 
4341
        self._feature = None
 
4342
 
 
4343
    def _ensure(self):
 
4344
        if self._feature is None:
 
4345
            depr_msg = self._dep_version % ('%s.%s'
 
4346
                                            % (self._module, self._name))
 
4347
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4348
                                               self._replacement_name)
 
4349
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4350
            # Import the new feature and use it as a replacement for the
 
4351
            # deprecated one.
 
4352
            self._feature = pyutils.get_named_object(
 
4353
                self._replacement_module, self._replacement_name)
 
4354
 
 
4355
    def _probe(self):
 
4356
        self._ensure()
 
4357
        return self._feature._probe()
 
4358
 
 
4359
 
 
4360
class ModuleAvailableFeature(Feature):
 
4361
    """This is a feature than describes a module we want to be available.
 
4362
 
 
4363
    Declare the name of the module in __init__(), and then after probing, the
 
4364
    module will be available as 'self.module'.
 
4365
 
 
4366
    :ivar module: The module if it is available, else None.
 
4367
    """
 
4368
 
 
4369
    def __init__(self, module_name):
 
4370
        super(ModuleAvailableFeature, self).__init__()
 
4371
        self.module_name = module_name
 
4372
 
 
4373
    def _probe(self):
 
4374
        try:
 
4375
            self._module = __import__(self.module_name, {}, {}, [''])
 
4376
            return True
 
4377
        except ImportError:
 
4378
            return False
 
4379
 
 
4380
    @property
 
4381
    def module(self):
 
4382
        if self.available(): # Make sure the probe has been done
 
4383
            return self._module
 
4384
        return None
 
4385
 
 
4386
    def feature_name(self):
 
4387
        return self.module_name
 
4388
 
 
4389
 
4389
4390
def probe_unicode_in_user_encoding():
4390
4391
    """Try to encode several unicode strings to use in unicode-aware tests.
4391
4392
    Return first successfull match.
4419
4420
    return None
4420
4421
 
4421
4422
 
 
4423
class _HTTPSServerFeature(Feature):
 
4424
    """Some tests want an https Server, check if one is available.
 
4425
 
 
4426
    Right now, the only way this is available is under python2.6 which provides
 
4427
    an ssl module.
 
4428
    """
 
4429
 
 
4430
    def _probe(self):
 
4431
        try:
 
4432
            import ssl
 
4433
            return True
 
4434
        except ImportError:
 
4435
            return False
 
4436
 
 
4437
    def feature_name(self):
 
4438
        return 'HTTPSServer'
 
4439
 
 
4440
 
 
4441
HTTPSServerFeature = _HTTPSServerFeature()
 
4442
 
 
4443
 
 
4444
class _UnicodeFilename(Feature):
 
4445
    """Does the filesystem support Unicode filenames?"""
 
4446
 
 
4447
    def _probe(self):
 
4448
        try:
 
4449
            os.stat(u'\u03b1')
 
4450
        except UnicodeEncodeError:
 
4451
            return False
 
4452
        except (IOError, OSError):
 
4453
            # The filesystem allows the Unicode filename but the file doesn't
 
4454
            # exist.
 
4455
            return True
 
4456
        else:
 
4457
            # The filesystem allows the Unicode filename and the file exists,
 
4458
            # for some reason.
 
4459
            return True
 
4460
 
 
4461
UnicodeFilename = _UnicodeFilename()
 
4462
 
 
4463
 
 
4464
class _ByteStringNamedFilesystem(Feature):
 
4465
    """Is the filesystem based on bytes?"""
 
4466
 
 
4467
    def _probe(self):
 
4468
        if os.name == "posix":
 
4469
            return True
 
4470
        return False
 
4471
 
 
4472
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
 
4473
 
 
4474
 
 
4475
class _UTF8Filesystem(Feature):
 
4476
    """Is the filesystem UTF-8?"""
 
4477
 
 
4478
    def _probe(self):
 
4479
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4480
            return True
 
4481
        return False
 
4482
 
 
4483
UTF8Filesystem = _UTF8Filesystem()
 
4484
 
 
4485
 
 
4486
class _BreakinFeature(Feature):
 
4487
    """Does this platform support the breakin feature?"""
 
4488
 
 
4489
    def _probe(self):
 
4490
        from bzrlib import breakin
 
4491
        if breakin.determine_signal() is None:
 
4492
            return False
 
4493
        if sys.platform == 'win32':
 
4494
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4495
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4496
            # access the function
 
4497
            try:
 
4498
                import ctypes
 
4499
            except OSError:
 
4500
                return False
 
4501
        return True
 
4502
 
 
4503
    def feature_name(self):
 
4504
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4505
 
 
4506
 
 
4507
BreakinFeature = _BreakinFeature()
 
4508
 
 
4509
 
 
4510
class _CaseInsCasePresFilenameFeature(Feature):
 
4511
    """Is the file-system case insensitive, but case-preserving?"""
 
4512
 
 
4513
    def _probe(self):
 
4514
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4515
        try:
 
4516
            # first check truly case-preserving for created files, then check
 
4517
            # case insensitive when opening existing files.
 
4518
            name = osutils.normpath(name)
 
4519
            base, rel = osutils.split(name)
 
4520
            found_rel = osutils.canonical_relpath(base, name)
 
4521
            return (found_rel == rel
 
4522
                    and os.path.isfile(name.upper())
 
4523
                    and os.path.isfile(name.lower()))
 
4524
        finally:
 
4525
            os.close(fileno)
 
4526
            os.remove(name)
 
4527
 
 
4528
    def feature_name(self):
 
4529
        return "case-insensitive case-preserving filesystem"
 
4530
 
 
4531
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4532
 
 
4533
 
 
4534
class _CaseInsensitiveFilesystemFeature(Feature):
 
4535
    """Check if underlying filesystem is case-insensitive but *not* case
 
4536
    preserving.
 
4537
    """
 
4538
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4539
    # more likely to be case preserving, so this case is rare.
 
4540
 
 
4541
    def _probe(self):
 
4542
        if CaseInsCasePresFilenameFeature.available():
 
4543
            return False
 
4544
 
 
4545
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4546
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4547
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4548
        else:
 
4549
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4550
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4551
            dir=root)
 
4552
        name_a = osutils.pathjoin(tdir, 'a')
 
4553
        name_A = osutils.pathjoin(tdir, 'A')
 
4554
        os.mkdir(name_a)
 
4555
        result = osutils.isdir(name_A)
 
4556
        _rmtree_temp_dir(tdir)
 
4557
        return result
 
4558
 
 
4559
    def feature_name(self):
 
4560
        return 'case-insensitive filesystem'
 
4561
 
 
4562
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4563
 
 
4564
 
 
4565
class _CaseSensitiveFilesystemFeature(Feature):
 
4566
 
 
4567
    def _probe(self):
 
4568
        if CaseInsCasePresFilenameFeature.available():
 
4569
            return False
 
4570
        elif CaseInsensitiveFilesystemFeature.available():
 
4571
            return False
 
4572
        else:
 
4573
            return True
 
4574
 
 
4575
    def feature_name(self):
 
4576
        return 'case-sensitive filesystem'
 
4577
 
 
4578
# new coding style is for feature instances to be lowercase
 
4579
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
 
4580
 
 
4581
 
4422
4582
# Only define SubUnitBzrRunner if subunit is available.
4423
4583
try:
4424
4584
    from subunit import TestProtocolClient
4442
4602
except ImportError:
4443
4603
    pass
4444
4604
 
4445
 
 
4446
 
@deprecated_function(deprecated_in((2, 5, 0)))
4447
 
def ModuleAvailableFeature(name):
4448
 
    from bzrlib.tests import features
4449
 
    return features.ModuleAvailableFeature(name)
4450
 
    
 
4605
class _PosixPermissionsFeature(Feature):
 
4606
 
 
4607
    def _probe(self):
 
4608
        def has_perms():
 
4609
            # create temporary file and check if specified perms are maintained.
 
4610
            import tempfile
 
4611
 
 
4612
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
 
4613
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
 
4614
            fd, name = f
 
4615
            os.close(fd)
 
4616
            os.chmod(name, write_perms)
 
4617
 
 
4618
            read_perms = os.stat(name).st_mode & 0777
 
4619
            os.unlink(name)
 
4620
            return (write_perms == read_perms)
 
4621
 
 
4622
        return (os.name == 'posix') and has_perms()
 
4623
 
 
4624
    def feature_name(self):
 
4625
        return 'POSIX permissions support'
 
4626
 
 
4627
posix_permissions_feature = _PosixPermissionsFeature()