~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

(gz) Remove bzrlib/util/elementtree/ package (Martin Packman)

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
19
 
# TODO: Perhaps there should be an API to find out if bzr running under the
20
 
# test suite -- some plugins might want to avoid making intrusive changes if
21
 
# this is the case.  However, we want behaviour under to test to diverge as
22
 
# little as possible, so this should be used rarely if it's added at all.
23
 
# (Suggestion from j-a-meinel, 2005-11-24)
 
19
from __future__ import absolute_import
24
20
 
25
21
# NOTE: Some classes in here use camelCaseNaming() rather than
26
22
# underscore_naming().  That's for consistency with unittest; it's not the
42
38
import random
43
39
import re
44
40
import shlex
 
41
import site
45
42
import stat
46
43
import subprocess
47
44
import sys
67
64
    chk_map,
68
65
    commands as _mod_commands,
69
66
    config,
 
67
    i18n,
70
68
    debug,
71
69
    errors,
72
70
    hooks,
94
92
    memory,
95
93
    pathfilter,
96
94
    )
 
95
from bzrlib.symbol_versioning import (
 
96
    deprecated_function,
 
97
    deprecated_in,
 
98
    )
97
99
from bzrlib.tests import (
 
100
    fixtures,
98
101
    test_server,
99
102
    TestUtil,
100
103
    treeshape,
101
104
    )
102
105
from bzrlib.ui import NullProgressView
103
106
from bzrlib.ui.text import TextUIFactory
 
107
from bzrlib.tests.features import _CompatabilityThunkFeature
104
108
 
105
109
# Mark this python module as being part of the implementation
106
110
# of unittest: this gives us better tracebacks where the last
142
146
    'BZREMAIL': None, # may still be present in the environment
143
147
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
144
148
    'BZR_PROGRESS_BAR': None,
145
 
    'BZR_LOG': None,
 
149
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
 
150
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
151
    # TestCase should not use disk resources, BZR_LOG is one.
 
152
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
146
153
    'BZR_PLUGIN_PATH': None,
147
154
    'BZR_DISABLE_PLUGINS': None,
148
155
    'BZR_PLUGINS_AT': None,
210
217
        osutils.set_or_unset_env(var, value)
211
218
 
212
219
 
 
220
def _clear__type_equality_funcs(test):
 
221
    """Cleanup bound methods stored on TestCase instances
 
222
 
 
223
    Clear the dict breaking a few (mostly) harmless cycles in the affected
 
224
    unittests released with Python 2.6 and initial Python 2.7 versions.
 
225
 
 
226
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
 
227
    shipped in Oneiric, an object with no clear method was used, hence the
 
228
    extra complications, see bug 809048 for details.
 
229
    """
 
230
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
231
    if type_equality_funcs is not None:
 
232
        tef_clear = getattr(type_equality_funcs, "clear", None)
 
233
        if tef_clear is None:
 
234
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
235
            if tef_instance_dict is not None:
 
236
                tef_clear = tef_instance_dict.clear
 
237
        if tef_clear is not None:
 
238
            tef_clear()
 
239
 
 
240
 
213
241
class ExtendedTestResult(testtools.TextTestResult):
214
242
    """Accepts, reports and accumulates the results of running tests.
215
243
 
383
411
        getDetails = getattr(test, "getDetails", None)
384
412
        if getDetails is not None:
385
413
            getDetails().clear()
386
 
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
387
 
        if type_equality_funcs is not None:
388
 
            type_equality_funcs.clear()
 
414
        _clear__type_equality_funcs(test)
389
415
        self._traceback_from_test = None
390
416
 
391
417
    def startTests(self):
492
518
        self.not_applicable_count += 1
493
519
        self.report_not_applicable(test, reason)
494
520
 
 
521
    def _count_stored_tests(self):
 
522
        """Count of tests instances kept alive due to not succeeding"""
 
523
        return self.error_count + self.failure_count + self.known_failure_count
 
524
 
495
525
    def _post_mortem(self, tb=None):
496
526
        """Start a PDB post mortem session."""
497
527
        if os.environ.get('BZR_TEST_PDB', None):
928
958
 
929
959
    The method is really a factory and users are expected to use it as such.
930
960
    """
931
 
    
 
961
 
932
962
    kwargs['setUp'] = isolated_doctest_setUp
933
963
    kwargs['tearDown'] = isolated_doctest_tearDown
934
964
    return doctest.DocTestSuite(*args, **kwargs)
969
999
 
970
1000
    def setUp(self):
971
1001
        super(TestCase, self).setUp()
 
1002
 
 
1003
        timeout = config.GlobalStack().get('selftest.timeout')
 
1004
        if timeout:
 
1005
            timeout_fixture = fixtures.TimeoutFixture(timeout)
 
1006
            timeout_fixture.setUp()
 
1007
            self.addCleanup(timeout_fixture.cleanUp)
 
1008
 
972
1009
        for feature in getattr(self, '_test_needs_features', []):
973
1010
            self.requireFeature(feature)
974
1011
        self._cleanEnvironment()
 
1012
 
 
1013
        if bzrlib.global_state is not None:
 
1014
            self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
 
1015
                              config.CommandLineStore())
 
1016
 
975
1017
        self._silenceUI()
976
1018
        self._startLogFile()
977
1019
        self._benchcalls = []
989
1031
        # is addressed -- vila 20110219
990
1032
        self.overrideAttr(config, '_expand_default_value', None)
991
1033
        self._log_files = set()
 
1034
        # Each key in the ``_counters`` dict holds a value for a different
 
1035
        # counter. When the test ends, addDetail() should be used to output the
 
1036
        # counter values. This happens in install_counter_hook().
 
1037
        self._counters = {}
 
1038
        if 'config_stats' in selftest_debug_flags:
 
1039
            self._install_config_stats_hooks()
 
1040
        # Do not use i18n for tests (unless the test reverses this)
 
1041
        i18n.disable_i18n()
992
1042
 
993
1043
    def debug(self):
994
1044
        # debug a frame up.
995
1045
        import pdb
996
 
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
1046
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
1047
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
1048
                ).set_trace(sys._getframe().f_back)
997
1049
 
998
1050
    def discardDetail(self, name):
999
1051
        """Extend the addDetail, getDetails api so we can remove a detail.
1011
1063
        if name in details:
1012
1064
            del details[name]
1013
1065
 
 
1066
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1067
        """Install a counting hook.
 
1068
 
 
1069
        Any hook can be counted as long as it doesn't need to return a value.
 
1070
 
 
1071
        :param hooks: Where the hook should be installed.
 
1072
 
 
1073
        :param name: The hook name that will be counted.
 
1074
 
 
1075
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1076
            to ``name``.
 
1077
        """
 
1078
        _counters = self._counters # Avoid closing over self
 
1079
        if counter_name is None:
 
1080
            counter_name = name
 
1081
        if _counters.has_key(counter_name):
 
1082
            raise AssertionError('%s is already used as a counter name'
 
1083
                                  % (counter_name,))
 
1084
        _counters[counter_name] = 0
 
1085
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1086
            lambda: ['%d' % (_counters[counter_name],)]))
 
1087
        def increment_counter(*args, **kwargs):
 
1088
            _counters[counter_name] += 1
 
1089
        label = 'count %s calls' % (counter_name,)
 
1090
        hooks.install_named_hook(name, increment_counter, label)
 
1091
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1092
 
 
1093
    def _install_config_stats_hooks(self):
 
1094
        """Install config hooks to count hook calls.
 
1095
 
 
1096
        """
 
1097
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1098
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1099
                                       'config.%s' % (hook_name,))
 
1100
 
 
1101
        # The OldConfigHooks are private and need special handling to protect
 
1102
        # against recursive tests (tests that run other tests), so we just do
 
1103
        # manually what registering them into _builtin_known_hooks will provide
 
1104
        # us.
 
1105
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1106
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1107
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1108
                                      'old_config.%s' % (hook_name,))
 
1109
 
1014
1110
    def _clear_debug_flags(self):
1015
1111
        """Prevent externally set debug flags affecting tests.
1016
1112
 
1070
1166
        # break some locks on purpose and should be taken into account by
1071
1167
        # considering that breaking a lock is just a dirty way of releasing it.
1072
1168
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1073
 
            message = ('Different number of acquired and '
1074
 
                       'released or broken locks. (%s, %s + %s)' %
1075
 
                       (acquired_locks, released_locks, broken_locks))
 
1169
            message = (
 
1170
                'Different number of acquired and '
 
1171
                'released or broken locks.\n'
 
1172
                'acquired=%s\n'
 
1173
                'released=%s\n'
 
1174
                'broken=%s\n' %
 
1175
                (acquired_locks, released_locks, broken_locks))
1076
1176
            if not self._lock_check_thorough:
1077
1177
                # Rather than fail, just warn
1078
1178
                print "Broken test %s: %s" % (self, message)
1106
1206
 
1107
1207
    def permit_dir(self, name):
1108
1208
        """Permit a directory to be used by this test. See permit_url."""
1109
 
        name_transport = _mod_transport.get_transport(name)
 
1209
        name_transport = _mod_transport.get_transport_from_path(name)
1110
1210
        self.permit_url(name)
1111
1211
        self.permit_url(name_transport.base)
1112
1212
 
1191
1291
        self.addCleanup(transport_server.stop_server)
1192
1292
        # Obtain a real transport because if the server supplies a password, it
1193
1293
        # will be hidden from the base on the client side.
1194
 
        t = _mod_transport.get_transport(transport_server.get_url())
 
1294
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
1195
1295
        # Some transport servers effectively chroot the backing transport;
1196
1296
        # others like SFTPServer don't - users of the transport can walk up the
1197
1297
        # transport to read the entire backing transport. This wouldn't matter
1632
1732
        return result
1633
1733
 
1634
1734
    def _startLogFile(self):
1635
 
        """Send bzr and test log messages to a temporary file.
1636
 
 
1637
 
        The file is removed as the test is torn down.
1638
 
        """
 
1735
        """Setup a in-memory target for bzr and testcase log messages"""
1639
1736
        pseudo_log_file = StringIO()
1640
1737
        def _get_log_contents_for_weird_testtools_api():
1641
1738
            return [pseudo_log_file.getvalue().decode(
1648
1745
        self.addCleanup(self._finishLogFile)
1649
1746
 
1650
1747
    def _finishLogFile(self):
1651
 
        """Finished with the log file.
1652
 
 
1653
 
        Close the file and delete it, unless setKeepLogfile was called.
1654
 
        """
 
1748
        """Flush and dereference the in-memory log for this testcase"""
1655
1749
        if trace._trace_file:
1656
1750
            # flush the log file, to get all content
1657
1751
            trace._trace_file.flush()
1658
1752
        trace.pop_log_file(self._log_memento)
 
1753
        # The logging module now tracks references for cleanup so discard ours
 
1754
        del self._log_memento
1659
1755
 
1660
1756
    def thisFailsStrictLockCheck(self):
1661
1757
        """It is known that this test would fail with -Dstrict_locks.
1673
1769
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1674
1770
        """Overrides an object attribute restoring it after the test.
1675
1771
 
 
1772
        :note: This should be used with discretion; you should think about
 
1773
        whether it's better to make the code testable without monkey-patching.
 
1774
 
1676
1775
        :param obj: The object that will be mutated.
1677
1776
 
1678
1777
        :param attr_name: The attribute name we want to preserve/override in
1703
1802
        self.addCleanup(osutils.set_or_unset_env, name, value)
1704
1803
        return value
1705
1804
 
 
1805
    def recordCalls(self, obj, attr_name):
 
1806
        """Monkeypatch in a wrapper that will record calls.
 
1807
 
 
1808
        The monkeypatch is automatically removed when the test concludes.
 
1809
 
 
1810
        :param obj: The namespace holding the reference to be replaced;
 
1811
            typically a module, class, or object.
 
1812
        :param attr_name: A string for the name of the attribute to 
 
1813
            patch.
 
1814
        :returns: A list that will be extended with one item every time the
 
1815
            function is called, with a tuple of (args, kwargs).
 
1816
        """
 
1817
        calls = []
 
1818
 
 
1819
        def decorator(*args, **kwargs):
 
1820
            calls.append((args, kwargs))
 
1821
            return orig(*args, **kwargs)
 
1822
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1823
        return calls
 
1824
 
1706
1825
    def _cleanEnvironment(self):
1707
1826
        for name, value in isolated_environ.iteritems():
1708
1827
            self.overrideEnv(name, value)
1715
1834
        self._preserved_lazy_hooks.clear()
1716
1835
 
1717
1836
    def knownFailure(self, reason):
1718
 
        """This test has failed for some known reason."""
1719
 
        raise KnownFailure(reason)
 
1837
        """Declare that this test fails for a known reason
 
1838
 
 
1839
        Tests that are known to fail should generally be using expectedFailure
 
1840
        with an appropriate reverse assertion if a change could cause the test
 
1841
        to start passing. Conversely if the test has no immediate prospect of
 
1842
        succeeding then using skip is more suitable.
 
1843
 
 
1844
        When this method is called while an exception is being handled, that
 
1845
        traceback will be used, otherwise a new exception will be thrown to
 
1846
        provide one but won't be reported.
 
1847
        """
 
1848
        self._add_reason(reason)
 
1849
        try:
 
1850
            exc_info = sys.exc_info()
 
1851
            if exc_info != (None, None, None):
 
1852
                self._report_traceback(exc_info)
 
1853
            else:
 
1854
                try:
 
1855
                    raise self.failureException(reason)
 
1856
                except self.failureException:
 
1857
                    exc_info = sys.exc_info()
 
1858
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1859
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1860
        finally:
 
1861
            del exc_info
1720
1862
 
1721
1863
    def _suppress_log(self):
1722
1864
        """Remove the log info from details."""
1850
1992
 
1851
1993
        self.log('run bzr: %r', args)
1852
1994
        # FIXME: don't call into logging here
1853
 
        handler = logging.StreamHandler(stderr)
1854
 
        handler.setLevel(logging.INFO)
 
1995
        handler = trace.EncodedStreamHandler(stderr, errors="replace",
 
1996
            level=logging.INFO)
1855
1997
        logger = logging.getLogger('')
1856
1998
        logger.addHandler(handler)
1857
1999
        old_ui_factory = ui.ui_factory
2045
2187
 
2046
2188
        if env_changes is None:
2047
2189
            env_changes = {}
 
2190
        # Because $HOME is set to a tempdir for the context of a test, modules
 
2191
        # installed in the user dir will not be found unless $PYTHONUSERBASE
 
2192
        # gets set to the computed directory of this parent process.
 
2193
        if site.USER_BASE is not None:
 
2194
            env_changes["PYTHONUSERBASE"] = site.USER_BASE
2048
2195
        old_env = {}
2049
2196
 
2050
2197
        def cleanup_environment():
2241
2388
        from bzrlib.smart import request
2242
2389
        request_handlers = request.request_handlers
2243
2390
        orig_method = request_handlers.get(verb)
 
2391
        orig_info = request_handlers.get_info(verb)
2244
2392
        request_handlers.remove(verb)
2245
 
        self.addCleanup(request_handlers.register, verb, orig_method)
 
2393
        self.addCleanup(request_handlers.register, verb, orig_method,
 
2394
            info=orig_info)
2246
2395
 
2247
2396
 
2248
2397
class CapturedCall(object):
2271
2420
class TestCaseWithMemoryTransport(TestCase):
2272
2421
    """Common test class for tests that do not need disk resources.
2273
2422
 
2274
 
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2423
    Tests that need disk resources should derive from TestCaseInTempDir
 
2424
    orTestCaseWithTransport.
2275
2425
 
2276
2426
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2277
2427
 
2278
 
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2428
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2279
2429
    a directory which does not exist. This serves to help ensure test isolation
2280
 
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2281
 
    must exist. However, TestCaseWithMemoryTransport does not offer local
2282
 
    file defaults for the transport in tests, nor does it obey the command line
 
2430
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2431
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2432
    defaults for the transport in tests, nor does it obey the command line
2283
2433
    override, so tests that accidentally write to the common directory should
2284
2434
    be rare.
2285
2435
 
2286
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
2287
 
    a .bzr directory that stops us ascending higher into the filesystem.
 
2436
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2437
        ``.bzr`` directory that stops us ascending higher into the filesystem.
2288
2438
    """
2289
2439
 
2290
2440
    TEST_ROOT = None
2308
2458
 
2309
2459
        :param relpath: a path relative to the base url.
2310
2460
        """
2311
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2461
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
2312
2462
        self.assertFalse(t.is_readonly())
2313
2463
        return t
2314
2464
 
2320
2470
 
2321
2471
        :param relpath: a path relative to the base url.
2322
2472
        """
2323
 
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
2473
        t = _mod_transport.get_transport_from_url(
 
2474
            self.get_readonly_url(relpath))
2324
2475
        self.assertTrue(t.is_readonly())
2325
2476
        return t
2326
2477
 
2447
2598
        real branch.
2448
2599
        """
2449
2600
        root = TestCaseWithMemoryTransport.TEST_ROOT
2450
 
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2601
        # Make sure we get a readable and accessible home for .bzr.log
 
2602
        # and/or config files, and not fallback to weird defaults (see
 
2603
        # http://pad.lv/825027).
 
2604
        self.assertIs(None, os.environ.get('BZR_HOME', None))
 
2605
        os.environ['BZR_HOME'] = root
 
2606
        wt = bzrdir.BzrDir.create_standalone_workingtree(root)
 
2607
        del os.environ['BZR_HOME']
 
2608
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2609
        # we don't need to re-open the wt to check it hasn't changed.
 
2610
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2611
            wt.control_transport.get_bytes('dirstate'))
2451
2612
 
2452
2613
    def _check_safety_net(self):
2453
2614
        """Check that the safety .bzr directory have not been touched.
2456
2617
        propagating. This method ensures than a test did not leaked.
2457
2618
        """
2458
2619
        root = TestCaseWithMemoryTransport.TEST_ROOT
2459
 
        self.permit_url(_mod_transport.get_transport(root).base)
2460
 
        wt = workingtree.WorkingTree.open(root)
2461
 
        last_rev = wt.last_revision()
2462
 
        if last_rev != 'null:':
 
2620
        t = _mod_transport.get_transport_from_path(root)
 
2621
        self.permit_url(t.base)
 
2622
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2623
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
2463
2624
            # The current test have modified the /bzr directory, we need to
2464
2625
            # recreate a new one or all the followng tests will fail.
2465
2626
            # If you need to inspect its content uncomment the following line
2500
2661
    def make_branch(self, relpath, format=None):
2501
2662
        """Create a branch on the transport at relpath."""
2502
2663
        repo = self.make_repository(relpath, format=format)
2503
 
        return repo.bzrdir.create_branch()
 
2664
        return repo.bzrdir.create_branch(append_revisions_only=False)
 
2665
 
 
2666
    def get_default_format(self):
 
2667
        return 'default'
 
2668
 
 
2669
    def resolve_format(self, format):
 
2670
        """Resolve an object to a ControlDir format object.
 
2671
 
 
2672
        The initial format object can either already be
 
2673
        a ControlDirFormat, None (for the default format),
 
2674
        or a string with the name of the control dir format.
 
2675
 
 
2676
        :param format: Object to resolve
 
2677
        :return A ControlDirFormat instance
 
2678
        """
 
2679
        if format is None:
 
2680
            format = self.get_default_format()
 
2681
        if isinstance(format, basestring):
 
2682
            format = bzrdir.format_registry.make_bzrdir(format)
 
2683
        return format
2504
2684
 
2505
2685
    def make_bzrdir(self, relpath, format=None):
2506
2686
        try:
2510
2690
            t = _mod_transport.get_transport(maybe_a_url)
2511
2691
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2512
2692
                t.ensure_base()
2513
 
            if format is None:
2514
 
                format = 'default'
2515
 
            if isinstance(format, basestring):
2516
 
                format = bzrdir.format_registry.make_bzrdir(format)
 
2693
            format = self.resolve_format(format)
2517
2694
            return format.initialize_on_transport(t)
2518
2695
        except errors.UninitializableFormat:
2519
2696
            raise TestSkipped("Format %s is not initializable." % format)
2520
2697
 
2521
 
    def make_repository(self, relpath, shared=False, format=None):
 
2698
    def make_repository(self, relpath, shared=None, format=None):
2522
2699
        """Create a repository on our default transport at relpath.
2523
2700
 
2524
2701
        Note that relpath must be a relative path, not a full url.
2535
2712
            backing_server = self.get_server()
2536
2713
        smart_server = test_server.SmartTCPServer_for_testing()
2537
2714
        self.start_server(smart_server, backing_server)
2538
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
 
2715
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
2539
2716
                                                   ).clone(path)
2540
2717
        return remote_transport
2541
2718
 
2557
2734
 
2558
2735
    def setUp(self):
2559
2736
        super(TestCaseWithMemoryTransport, self).setUp()
2560
 
        # Ensure that ConnectedTransport doesn't leak sockets
2561
 
        def get_transport_with_cleanup(*args, **kwargs):
2562
 
            t = orig_get_transport(*args, **kwargs)
2563
 
            if isinstance(t, _mod_transport.ConnectedTransport):
2564
 
                self.addCleanup(t.disconnect)
2565
 
            return t
2566
 
 
2567
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2568
 
                                               get_transport_with_cleanup)
 
2737
 
 
2738
        def _add_disconnect_cleanup(transport):
 
2739
            """Schedule disconnection of given transport at test cleanup
 
2740
 
 
2741
            This needs to happen for all connected transports or leaks occur.
 
2742
 
 
2743
            Note reconnections may mean we call disconnect multiple times per
 
2744
            transport which is suboptimal but seems harmless.
 
2745
            """
 
2746
            self.addCleanup(transport.disconnect)
 
2747
 
 
2748
        _mod_transport.Transport.hooks.install_named_hook('post_connect',
 
2749
            _add_disconnect_cleanup, None)
 
2750
 
2569
2751
        self._make_test_root()
2570
2752
        self.addCleanup(os.chdir, os.getcwdu())
2571
2753
        self.makeAndChdirToTestDir()
2577
2759
    def setup_smart_server_with_call_log(self):
2578
2760
        """Sets up a smart server as the transport server with a call log."""
2579
2761
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2762
        self.hpss_connections = []
2580
2763
        self.hpss_calls = []
2581
2764
        import traceback
2582
2765
        # Skip the current stack down to the caller of
2585
2768
        def capture_hpss_call(params):
2586
2769
            self.hpss_calls.append(
2587
2770
                CapturedCall(params, prefix_length))
 
2771
        def capture_connect(transport):
 
2772
            self.hpss_connections.append(transport)
2588
2773
        client._SmartClient.hooks.install_named_hook(
2589
2774
            'call', capture_hpss_call, None)
 
2775
        _mod_transport.Transport.hooks.install_named_hook(
 
2776
            'post_connect', capture_connect, None)
2590
2777
 
2591
2778
    def reset_smart_call_log(self):
2592
2779
        self.hpss_calls = []
 
2780
        self.hpss_connections = []
2593
2781
 
2594
2782
 
2595
2783
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2614
2802
 
2615
2803
    OVERRIDE_PYTHON = 'python'
2616
2804
 
 
2805
    def setUp(self):
 
2806
        super(TestCaseInTempDir, self).setUp()
 
2807
        # Remove the protection set in isolated_environ, we have a proper
 
2808
        # access to disk resources now.
 
2809
        self.overrideEnv('BZR_LOG', None)
 
2810
 
2617
2811
    def check_file_contents(self, filename, expect):
2618
2812
        self.log("check contents of file %s" % filename)
2619
2813
        f = file(filename)
2700
2894
                "a list or a tuple. Got %r instead" % (shape,))
2701
2895
        # It's OK to just create them using forward slashes on windows.
2702
2896
        if transport is None or transport.is_readonly():
2703
 
            transport = _mod_transport.get_transport(".")
 
2897
            transport = _mod_transport.get_transport_from_path(".")
2704
2898
        for name in shape:
2705
2899
            self.assertIsInstance(name, basestring)
2706
2900
            if name[-1] == '/':
2791
2985
        # this obviously requires a format that supports branch references
2792
2986
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2793
2987
        # RBC 20060208
 
2988
        format = self.resolve_format(format=format)
 
2989
        if not format.supports_workingtrees:
 
2990
            b = self.make_branch(relpath+'.branch', format=format)
 
2991
            return b.create_checkout(relpath, lightweight=True)
2794
2992
        b = self.make_branch(relpath, format=format)
2795
2993
        try:
2796
2994
            return b.bzrdir.create_workingtree()
3095
3293
                            result_decorators=result_decorators,
3096
3294
                            )
3097
3295
    runner.stop_on_failure=stop_on_failure
 
3296
    if isinstance(suite, unittest.TestSuite):
 
3297
        # Empty out _tests list of passed suite and populate new TestSuite
 
3298
        suite._tests[:], suite = [], TestSuite(suite)
3098
3299
    # built in decorator factories:
3099
3300
    decorators = [
3100
3301
        random_order(random_seed, runner),
3198
3399
 
3199
3400
class TestDecorator(TestUtil.TestSuite):
3200
3401
    """A decorator for TestCase/TestSuite objects.
3201
 
    
3202
 
    Usually, subclasses should override __iter__(used when flattening test
3203
 
    suites), which we do to filter, reorder, parallelise and so on, run() and
3204
 
    debug().
 
3402
 
 
3403
    Contains rather than flattening suite passed on construction
3205
3404
    """
3206
3405
 
3207
 
    def __init__(self, suite):
3208
 
        TestUtil.TestSuite.__init__(self)
3209
 
        self.addTest(suite)
3210
 
 
3211
 
    def countTestCases(self):
3212
 
        cases = 0
3213
 
        for test in self:
3214
 
            cases += test.countTestCases()
3215
 
        return cases
3216
 
 
3217
 
    def debug(self):
3218
 
        for test in self:
3219
 
            test.debug()
3220
 
 
3221
 
    def run(self, result):
3222
 
        # Use iteration on self, not self._tests, to allow subclasses to hook
3223
 
        # into __iter__.
3224
 
        for test in self:
3225
 
            if result.shouldStop:
3226
 
                break
3227
 
            test.run(result)
3228
 
        return result
 
3406
    def __init__(self, suite=None):
 
3407
        super(TestDecorator, self).__init__()
 
3408
        if suite is not None:
 
3409
            self.addTest(suite)
 
3410
 
 
3411
    # Don't need subclass run method with suite emptying
 
3412
    run = unittest.TestSuite.run
3229
3413
 
3230
3414
 
3231
3415
class CountingDecorator(TestDecorator):
3242
3426
    """A decorator which excludes test matching an exclude pattern."""
3243
3427
 
3244
3428
    def __init__(self, suite, exclude_pattern):
3245
 
        TestDecorator.__init__(self, suite)
3246
 
        self.exclude_pattern = exclude_pattern
3247
 
        self.excluded = False
3248
 
 
3249
 
    def __iter__(self):
3250
 
        if self.excluded:
3251
 
            return iter(self._tests)
3252
 
        self.excluded = True
3253
 
        suite = exclude_tests_by_re(self, self.exclude_pattern)
3254
 
        del self._tests[:]
3255
 
        self.addTests(suite)
3256
 
        return iter(self._tests)
 
3429
        super(ExcludeDecorator, self).__init__(
 
3430
            exclude_tests_by_re(suite, exclude_pattern))
3257
3431
 
3258
3432
 
3259
3433
class FilterTestsDecorator(TestDecorator):
3260
3434
    """A decorator which filters tests to those matching a pattern."""
3261
3435
 
3262
3436
    def __init__(self, suite, pattern):
3263
 
        TestDecorator.__init__(self, suite)
3264
 
        self.pattern = pattern
3265
 
        self.filtered = False
3266
 
 
3267
 
    def __iter__(self):
3268
 
        if self.filtered:
3269
 
            return iter(self._tests)
3270
 
        self.filtered = True
3271
 
        suite = filter_suite_by_re(self, self.pattern)
3272
 
        del self._tests[:]
3273
 
        self.addTests(suite)
3274
 
        return iter(self._tests)
 
3437
        super(FilterTestsDecorator, self).__init__(
 
3438
            filter_suite_by_re(suite, pattern))
3275
3439
 
3276
3440
 
3277
3441
class RandomDecorator(TestDecorator):
3278
3442
    """A decorator which randomises the order of its tests."""
3279
3443
 
3280
3444
    def __init__(self, suite, random_seed, stream):
3281
 
        TestDecorator.__init__(self, suite)
3282
 
        self.random_seed = random_seed
3283
 
        self.randomised = False
3284
 
        self.stream = stream
3285
 
 
3286
 
    def __iter__(self):
3287
 
        if self.randomised:
3288
 
            return iter(self._tests)
3289
 
        self.randomised = True
3290
 
        self.stream.write("Randomizing test order using seed %s\n\n" %
3291
 
            (self.actual_seed()))
 
3445
        random_seed = self.actual_seed(random_seed)
 
3446
        stream.write("Randomizing test order using seed %s\n\n" %
 
3447
            (random_seed,))
3292
3448
        # Initialise the random number generator.
3293
 
        random.seed(self.actual_seed())
3294
 
        suite = randomize_suite(self)
3295
 
        del self._tests[:]
3296
 
        self.addTests(suite)
3297
 
        return iter(self._tests)
 
3449
        random.seed(random_seed)
 
3450
        super(RandomDecorator, self).__init__(randomize_suite(suite))
3298
3451
 
3299
 
    def actual_seed(self):
3300
 
        if self.random_seed == "now":
 
3452
    @staticmethod
 
3453
    def actual_seed(seed):
 
3454
        if seed == "now":
3301
3455
            # We convert the seed to a long to make it reuseable across
3302
3456
            # invocations (because the user can reenter it).
3303
 
            self.random_seed = long(time.time())
 
3457
            return long(time.time())
3304
3458
        else:
3305
3459
            # Convert the seed to a long if we can
3306
3460
            try:
3307
 
                self.random_seed = long(self.random_seed)
3308
 
            except:
 
3461
                return long(seed)
 
3462
            except (TypeError, ValueError):
3309
3463
                pass
3310
 
        return self.random_seed
 
3464
        return seed
3311
3465
 
3312
3466
 
3313
3467
class TestFirstDecorator(TestDecorator):
3314
3468
    """A decorator which moves named tests to the front."""
3315
3469
 
3316
3470
    def __init__(self, suite, pattern):
3317
 
        TestDecorator.__init__(self, suite)
3318
 
        self.pattern = pattern
3319
 
        self.filtered = False
3320
 
 
3321
 
    def __iter__(self):
3322
 
        if self.filtered:
3323
 
            return iter(self._tests)
3324
 
        self.filtered = True
3325
 
        suites = split_suite_by_re(self, self.pattern)
3326
 
        del self._tests[:]
3327
 
        self.addTests(suites)
3328
 
        return iter(self._tests)
 
3471
        super(TestFirstDecorator, self).__init__()
 
3472
        self.addTests(split_suite_by_re(suite, pattern))
3329
3473
 
3330
3474
 
3331
3475
def partition_tests(suite, count):
3363
3507
    """
3364
3508
    concurrency = osutils.local_concurrency()
3365
3509
    result = []
3366
 
    from subunit import TestProtocolClient, ProtocolTestCase
 
3510
    from subunit import ProtocolTestCase
3367
3511
    from subunit.test_results import AutoTimingTestResultDecorator
3368
3512
    class TestInOtherProcess(ProtocolTestCase):
3369
3513
        # Should be in subunit, I think. RBC.
3375
3519
            try:
3376
3520
                ProtocolTestCase.run(self, result)
3377
3521
            finally:
3378
 
                os.waitpid(self.pid, 0)
 
3522
                pid, status = os.waitpid(self.pid, 0)
 
3523
            # GZ 2011-10-18: If status is nonzero, should report to the result
 
3524
            #                that something went wrong.
3379
3525
 
3380
3526
    test_blocks = partition_tests(suite, concurrency)
 
3527
    # Clear the tests from the original suite so it doesn't keep them alive
 
3528
    suite._tests[:] = []
3381
3529
    for process_tests in test_blocks:
3382
 
        process_suite = TestUtil.TestSuite()
3383
 
        process_suite.addTests(process_tests)
 
3530
        process_suite = TestUtil.TestSuite(process_tests)
 
3531
        # Also clear each split list so new suite has only reference
 
3532
        process_tests[:] = []
3384
3533
        c2pread, c2pwrite = os.pipe()
3385
3534
        pid = os.fork()
3386
3535
        if pid == 0:
3387
 
            workaround_zealous_crypto_random()
3388
3536
            try:
 
3537
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3538
                workaround_zealous_crypto_random()
3389
3539
                os.close(c2pread)
3390
3540
                # Leave stderr and stdout open so we can see test noise
3391
3541
                # Close stdin so that the child goes away if it decides to
3392
3542
                # read from stdin (otherwise its a roulette to see what
3393
3543
                # child actually gets keystrokes for pdb etc).
3394
3544
                sys.stdin.close()
3395
 
                sys.stdin = None
3396
 
                stream = os.fdopen(c2pwrite, 'wb', 1)
3397
3545
                subunit_result = AutoTimingTestResultDecorator(
3398
 
                    TestProtocolClient(stream))
 
3546
                    SubUnitBzrProtocolClient(stream))
3399
3547
                process_suite.run(subunit_result)
3400
 
            finally:
3401
 
                os._exit(0)
 
3548
            except:
 
3549
                # Try and report traceback on stream, but exit with error even
 
3550
                # if stream couldn't be created or something else goes wrong.
 
3551
                # The traceback is formatted to a string and written in one go
 
3552
                # to avoid interleaving lines from multiple failing children.
 
3553
                try:
 
3554
                    stream.write(traceback.format_exc())
 
3555
                finally:
 
3556
                    os._exit(1)
 
3557
            os._exit(0)
3402
3558
        else:
3403
3559
            os.close(c2pwrite)
3404
3560
            stream = os.fdopen(c2pread, 'rb', 1)
3510
3666
#                           with proper exclusion rules.
3511
3667
#   -Ethreads               Will display thread ident at creation/join time to
3512
3668
#                           help track thread leaks
 
3669
#   -Euncollected_cases     Display the identity of any test cases that weren't
 
3670
#                           deallocated after being completed.
 
3671
#   -Econfig_stats          Will collect statistics using addDetail
3513
3672
selftest_debug_flags = set()
3514
3673
 
3515
3674
 
3819
3978
        'bzrlib.tests.test_email_message',
3820
3979
        'bzrlib.tests.test_eol_filters',
3821
3980
        'bzrlib.tests.test_errors',
 
3981
        'bzrlib.tests.test_estimate_compressed_size',
3822
3982
        'bzrlib.tests.test_export',
3823
3983
        'bzrlib.tests.test_export_pot',
3824
3984
        'bzrlib.tests.test_extract',
 
3985
        'bzrlib.tests.test_features',
3825
3986
        'bzrlib.tests.test_fetch',
3826
3987
        'bzrlib.tests.test_fixtures',
3827
3988
        'bzrlib.tests.test_fifo_cache',
3828
3989
        'bzrlib.tests.test_filters',
 
3990
        'bzrlib.tests.test_filter_tree',
3829
3991
        'bzrlib.tests.test_ftp_transport',
3830
3992
        'bzrlib.tests.test_foreign',
3831
3993
        'bzrlib.tests.test_generate_docs',
3906
4068
        'bzrlib.tests.test_smart',
3907
4069
        'bzrlib.tests.test_smart_add',
3908
4070
        'bzrlib.tests.test_smart_request',
 
4071
        'bzrlib.tests.test_smart_signals',
3909
4072
        'bzrlib.tests.test_smart_transport',
3910
4073
        'bzrlib.tests.test_smtp_connection',
3911
4074
        'bzrlib.tests.test_source',
3942
4105
        'bzrlib.tests.test_version',
3943
4106
        'bzrlib.tests.test_version_info',
3944
4107
        'bzrlib.tests.test_versionedfile',
 
4108
        'bzrlib.tests.test_vf_search',
3945
4109
        'bzrlib.tests.test_weave',
3946
4110
        'bzrlib.tests.test_whitebox',
3947
4111
        'bzrlib.tests.test_win32utils',
4222
4386
        the module is available.
4223
4387
    """
4224
4388
 
 
4389
    from bzrlib.tests.features import ModuleAvailableFeature
4225
4390
    py_module = pyutils.get_named_object(py_module_name)
4226
4391
    scenarios = [
4227
4392
        ('python', {'module': py_module}),
4268
4433
                         % (os.path.basename(dirname), printable_e))
4269
4434
 
4270
4435
 
4271
 
class Feature(object):
4272
 
    """An operating system Feature."""
4273
 
 
4274
 
    def __init__(self):
4275
 
        self._available = None
4276
 
 
4277
 
    def available(self):
4278
 
        """Is the feature available?
4279
 
 
4280
 
        :return: True if the feature is available.
4281
 
        """
4282
 
        if self._available is None:
4283
 
            self._available = self._probe()
4284
 
        return self._available
4285
 
 
4286
 
    def _probe(self):
4287
 
        """Implement this method in concrete features.
4288
 
 
4289
 
        :return: True if the feature is available.
4290
 
        """
4291
 
        raise NotImplementedError
4292
 
 
4293
 
    def __str__(self):
4294
 
        if getattr(self, 'feature_name', None):
4295
 
            return self.feature_name()
4296
 
        return self.__class__.__name__
4297
 
 
4298
 
 
4299
 
class _SymlinkFeature(Feature):
4300
 
 
4301
 
    def _probe(self):
4302
 
        return osutils.has_symlinks()
4303
 
 
4304
 
    def feature_name(self):
4305
 
        return 'symlinks'
4306
 
 
4307
 
SymlinkFeature = _SymlinkFeature()
4308
 
 
4309
 
 
4310
 
class _HardlinkFeature(Feature):
4311
 
 
4312
 
    def _probe(self):
4313
 
        return osutils.has_hardlinks()
4314
 
 
4315
 
    def feature_name(self):
4316
 
        return 'hardlinks'
4317
 
 
4318
 
HardlinkFeature = _HardlinkFeature()
4319
 
 
4320
 
 
4321
 
class _OsFifoFeature(Feature):
4322
 
 
4323
 
    def _probe(self):
4324
 
        return getattr(os, 'mkfifo', None)
4325
 
 
4326
 
    def feature_name(self):
4327
 
        return 'filesystem fifos'
4328
 
 
4329
 
OsFifoFeature = _OsFifoFeature()
4330
 
 
4331
 
 
4332
 
class _UnicodeFilenameFeature(Feature):
4333
 
    """Does the filesystem support Unicode filenames?"""
4334
 
 
4335
 
    def _probe(self):
4336
 
        try:
4337
 
            # Check for character combinations unlikely to be covered by any
4338
 
            # single non-unicode encoding. We use the characters
4339
 
            # - greek small letter alpha (U+03B1) and
4340
 
            # - braille pattern dots-123456 (U+283F).
4341
 
            os.stat(u'\u03b1\u283f')
4342
 
        except UnicodeEncodeError:
4343
 
            return False
4344
 
        except (IOError, OSError):
4345
 
            # The filesystem allows the Unicode filename but the file doesn't
4346
 
            # exist.
4347
 
            return True
4348
 
        else:
4349
 
            # The filesystem allows the Unicode filename and the file exists,
4350
 
            # for some reason.
4351
 
            return True
4352
 
 
4353
 
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4354
 
 
4355
 
 
4356
 
class _CompatabilityThunkFeature(Feature):
4357
 
    """This feature is just a thunk to another feature.
4358
 
 
4359
 
    It issues a deprecation warning if it is accessed, to let you know that you
4360
 
    should really use a different feature.
4361
 
    """
4362
 
 
4363
 
    def __init__(self, dep_version, module, name,
4364
 
                 replacement_name, replacement_module=None):
4365
 
        super(_CompatabilityThunkFeature, self).__init__()
4366
 
        self._module = module
4367
 
        if replacement_module is None:
4368
 
            replacement_module = module
4369
 
        self._replacement_module = replacement_module
4370
 
        self._name = name
4371
 
        self._replacement_name = replacement_name
4372
 
        self._dep_version = dep_version
4373
 
        self._feature = None
4374
 
 
4375
 
    def _ensure(self):
4376
 
        if self._feature is None:
4377
 
            depr_msg = self._dep_version % ('%s.%s'
4378
 
                                            % (self._module, self._name))
4379
 
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4380
 
                                               self._replacement_name)
4381
 
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4382
 
            # Import the new feature and use it as a replacement for the
4383
 
            # deprecated one.
4384
 
            self._feature = pyutils.get_named_object(
4385
 
                self._replacement_module, self._replacement_name)
4386
 
 
4387
 
    def _probe(self):
4388
 
        self._ensure()
4389
 
        return self._feature._probe()
4390
 
 
4391
 
 
4392
 
class ModuleAvailableFeature(Feature):
4393
 
    """This is a feature than describes a module we want to be available.
4394
 
 
4395
 
    Declare the name of the module in __init__(), and then after probing, the
4396
 
    module will be available as 'self.module'.
4397
 
 
4398
 
    :ivar module: The module if it is available, else None.
4399
 
    """
4400
 
 
4401
 
    def __init__(self, module_name):
4402
 
        super(ModuleAvailableFeature, self).__init__()
4403
 
        self.module_name = module_name
4404
 
 
4405
 
    def _probe(self):
4406
 
        try:
4407
 
            self._module = __import__(self.module_name, {}, {}, [''])
4408
 
            return True
4409
 
        except ImportError:
4410
 
            return False
4411
 
 
4412
 
    @property
4413
 
    def module(self):
4414
 
        if self.available(): # Make sure the probe has been done
4415
 
            return self._module
4416
 
        return None
4417
 
 
4418
 
    def feature_name(self):
4419
 
        return self.module_name
4420
 
 
4421
 
 
4422
4436
def probe_unicode_in_user_encoding():
4423
4437
    """Try to encode several unicode strings to use in unicode-aware tests.
4424
4438
    Return first successfull match.
4452
4466
    return None
4453
4467
 
4454
4468
 
4455
 
class _HTTPSServerFeature(Feature):
4456
 
    """Some tests want an https Server, check if one is available.
4457
 
 
4458
 
    Right now, the only way this is available is under python2.6 which provides
4459
 
    an ssl module.
4460
 
    """
4461
 
 
4462
 
    def _probe(self):
4463
 
        try:
4464
 
            import ssl
4465
 
            return True
4466
 
        except ImportError:
4467
 
            return False
4468
 
 
4469
 
    def feature_name(self):
4470
 
        return 'HTTPSServer'
4471
 
 
4472
 
 
4473
 
HTTPSServerFeature = _HTTPSServerFeature()
4474
 
 
4475
 
 
4476
 
class _UnicodeFilename(Feature):
4477
 
    """Does the filesystem support Unicode filenames?"""
4478
 
 
4479
 
    def _probe(self):
4480
 
        try:
4481
 
            os.stat(u'\u03b1')
4482
 
        except UnicodeEncodeError:
4483
 
            return False
4484
 
        except (IOError, OSError):
4485
 
            # The filesystem allows the Unicode filename but the file doesn't
4486
 
            # exist.
4487
 
            return True
4488
 
        else:
4489
 
            # The filesystem allows the Unicode filename and the file exists,
4490
 
            # for some reason.
4491
 
            return True
4492
 
 
4493
 
UnicodeFilename = _UnicodeFilename()
4494
 
 
4495
 
 
4496
 
class _ByteStringNamedFilesystem(Feature):
4497
 
    """Is the filesystem based on bytes?"""
4498
 
 
4499
 
    def _probe(self):
4500
 
        if os.name == "posix":
4501
 
            return True
4502
 
        return False
4503
 
 
4504
 
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4505
 
 
4506
 
 
4507
 
class _UTF8Filesystem(Feature):
4508
 
    """Is the filesystem UTF-8?"""
4509
 
 
4510
 
    def _probe(self):
4511
 
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4512
 
            return True
4513
 
        return False
4514
 
 
4515
 
UTF8Filesystem = _UTF8Filesystem()
4516
 
 
4517
 
 
4518
 
class _BreakinFeature(Feature):
4519
 
    """Does this platform support the breakin feature?"""
4520
 
 
4521
 
    def _probe(self):
4522
 
        from bzrlib import breakin
4523
 
        if breakin.determine_signal() is None:
4524
 
            return False
4525
 
        if sys.platform == 'win32':
4526
 
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4527
 
            # We trigger SIGBREAK via a Console api so we need ctypes to
4528
 
            # access the function
4529
 
            try:
4530
 
                import ctypes
4531
 
            except OSError:
4532
 
                return False
4533
 
        return True
4534
 
 
4535
 
    def feature_name(self):
4536
 
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
4537
 
 
4538
 
 
4539
 
BreakinFeature = _BreakinFeature()
4540
 
 
4541
 
 
4542
 
class _CaseInsCasePresFilenameFeature(Feature):
4543
 
    """Is the file-system case insensitive, but case-preserving?"""
4544
 
 
4545
 
    def _probe(self):
4546
 
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
4547
 
        try:
4548
 
            # first check truly case-preserving for created files, then check
4549
 
            # case insensitive when opening existing files.
4550
 
            name = osutils.normpath(name)
4551
 
            base, rel = osutils.split(name)
4552
 
            found_rel = osutils.canonical_relpath(base, name)
4553
 
            return (found_rel == rel
4554
 
                    and os.path.isfile(name.upper())
4555
 
                    and os.path.isfile(name.lower()))
4556
 
        finally:
4557
 
            os.close(fileno)
4558
 
            os.remove(name)
4559
 
 
4560
 
    def feature_name(self):
4561
 
        return "case-insensitive case-preserving filesystem"
4562
 
 
4563
 
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4564
 
 
4565
 
 
4566
 
class _CaseInsensitiveFilesystemFeature(Feature):
4567
 
    """Check if underlying filesystem is case-insensitive but *not* case
4568
 
    preserving.
4569
 
    """
4570
 
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4571
 
    # more likely to be case preserving, so this case is rare.
4572
 
 
4573
 
    def _probe(self):
4574
 
        if CaseInsCasePresFilenameFeature.available():
4575
 
            return False
4576
 
 
4577
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
4578
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4579
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
4580
 
        else:
4581
 
            root = TestCaseWithMemoryTransport.TEST_ROOT
4582
 
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4583
 
            dir=root)
4584
 
        name_a = osutils.pathjoin(tdir, 'a')
4585
 
        name_A = osutils.pathjoin(tdir, 'A')
4586
 
        os.mkdir(name_a)
4587
 
        result = osutils.isdir(name_A)
4588
 
        _rmtree_temp_dir(tdir)
4589
 
        return result
4590
 
 
4591
 
    def feature_name(self):
4592
 
        return 'case-insensitive filesystem'
4593
 
 
4594
 
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4595
 
 
4596
 
 
4597
 
class _CaseSensitiveFilesystemFeature(Feature):
4598
 
 
4599
 
    def _probe(self):
4600
 
        if CaseInsCasePresFilenameFeature.available():
4601
 
            return False
4602
 
        elif CaseInsensitiveFilesystemFeature.available():
4603
 
            return False
4604
 
        else:
4605
 
            return True
4606
 
 
4607
 
    def feature_name(self):
4608
 
        return 'case-sensitive filesystem'
4609
 
 
4610
 
# new coding style is for feature instances to be lowercase
4611
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4612
 
 
4613
 
 
4614
4469
# Only define SubUnitBzrRunner if subunit is available.
4615
4470
try:
4616
4471
    from subunit import TestProtocolClient
4617
4472
    from subunit.test_results import AutoTimingTestResultDecorator
4618
4473
    class SubUnitBzrProtocolClient(TestProtocolClient):
4619
4474
 
 
4475
        def stopTest(self, test):
 
4476
            super(SubUnitBzrProtocolClient, self).stopTest(test)
 
4477
            _clear__type_equality_funcs(test)
 
4478
 
4620
4479
        def addSuccess(self, test, details=None):
4621
4480
            # The subunit client always includes the details in the subunit
4622
4481
            # stream, but we don't want to include it in ours.
4634
4493
except ImportError:
4635
4494
    pass
4636
4495
 
4637
 
class _PosixPermissionsFeature(Feature):
4638
 
 
4639
 
    def _probe(self):
4640
 
        def has_perms():
4641
 
            # create temporary file and check if specified perms are maintained.
4642
 
            import tempfile
4643
 
 
4644
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4645
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4646
 
            fd, name = f
4647
 
            os.close(fd)
4648
 
            os.chmod(name, write_perms)
4649
 
 
4650
 
            read_perms = os.stat(name).st_mode & 0777
4651
 
            os.unlink(name)
4652
 
            return (write_perms == read_perms)
4653
 
 
4654
 
        return (os.name == 'posix') and has_perms()
4655
 
 
4656
 
    def feature_name(self):
4657
 
        return 'POSIX permissions support'
4658
 
 
4659
 
posix_permissions_feature = _PosixPermissionsFeature()
 
4496
 
 
4497
# API compatibility for old plugins; see bug 892622.
 
4498
for name in [
 
4499
    'Feature',
 
4500
    'HTTPServerFeature', 
 
4501
    'ModuleAvailableFeature',
 
4502
    'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
 
4503
    'OsFifoFeature', 'UnicodeFilenameFeature',
 
4504
    'ByteStringNamedFilesystem', 'UTF8Filesystem',
 
4505
    'BreakinFeature', 'CaseInsCasePresFilenameFeature',
 
4506
    'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
 
4507
    'posix_permissions_feature',
 
4508
    ]:
 
4509
    globals()[name] = _CompatabilityThunkFeature(
 
4510
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4511
        'bzrlib.tests', name,
 
4512
        name, 'bzrlib.tests.features')
 
4513
 
 
4514
 
 
4515
for (old_name, new_name) in [
 
4516
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
 
4517
    ]:
 
4518
    globals()[name] = _CompatabilityThunkFeature(
 
4519
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4520
        'bzrlib.tests', old_name,
 
4521
        new_name, 'bzrlib.tests.features')