~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

Merge bzr.dev to unify _type_equality_funcs workarounds

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
19
 
# TODO: Perhaps there should be an API to find out if bzr running under the
20
 
# test suite -- some plugins might want to avoid making intrusive changes if
21
 
# this is the case.  However, we want behaviour under to test to diverge as
22
 
# little as possible, so this should be used rarely if it's added at all.
23
 
# (Suggestion from j-a-meinel, 2005-11-24)
24
 
 
25
19
# NOTE: Some classes in here use camelCaseNaming() rather than
26
20
# underscore_naming().  That's for consistency with unittest; it's not the
27
21
# general style of bzrlib.  Please continue that consistency when adding e.g.
67
61
    chk_map,
68
62
    commands as _mod_commands,
69
63
    config,
 
64
    i18n,
70
65
    debug,
71
66
    errors,
72
67
    hooks,
94
89
    memory,
95
90
    pathfilter,
96
91
    )
 
92
from bzrlib.symbol_versioning import (
 
93
    deprecated_function,
 
94
    deprecated_in,
 
95
    )
97
96
from bzrlib.tests import (
98
97
    test_server,
99
98
    TestUtil,
142
141
    'BZREMAIL': None, # may still be present in the environment
143
142
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
144
143
    'BZR_PROGRESS_BAR': None,
145
 
    'BZR_LOG': None,
 
144
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
 
145
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
146
    # TestCase should not use disk resources, BZR_LOG is one.
 
147
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
146
148
    'BZR_PLUGIN_PATH': None,
147
149
    'BZR_DISABLE_PLUGINS': None,
148
150
    'BZR_PLUGINS_AT': None,
383
385
        getDetails = getattr(test, "getDetails", None)
384
386
        if getDetails is not None:
385
387
            getDetails().clear()
 
388
        # Clear _type_equality_funcs to try to stop TestCase instances
 
389
        # from wasting memory. 'clear' is not available in all Python
 
390
        # versions (bug 809048)
386
391
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
387
392
        if type_equality_funcs is not None:
388
 
            type_equality_funcs.clear()
 
393
            tef_clear = getattr(type_equality_funcs, "clear", None)
 
394
            if tef_clear is None:
 
395
                tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
396
                if tef_instance_dict is not None:
 
397
                    tef_clear = tef_instance_dict.clear
 
398
            if tef_clear is not None:
 
399
                tef_clear()
389
400
        self._traceback_from_test = None
390
401
 
391
402
    def startTests(self):
932
943
 
933
944
    The method is really a factory and users are expected to use it as such.
934
945
    """
935
 
    
 
946
 
936
947
    kwargs['setUp'] = isolated_doctest_setUp
937
948
    kwargs['tearDown'] = isolated_doctest_tearDown
938
949
    return doctest.DocTestSuite(*args, **kwargs)
992
1003
        # settled on or a the FIXME associated with _get_expand_default_value
993
1004
        # is addressed -- vila 20110219
994
1005
        self.overrideAttr(config, '_expand_default_value', None)
 
1006
        self._log_files = set()
 
1007
        # Each key in the ``_counters`` dict holds a value for a different
 
1008
        # counter. When the test ends, addDetail() should be used to output the
 
1009
        # counter values. This happens in install_counter_hook().
 
1010
        self._counters = {}
 
1011
        if 'config_stats' in selftest_debug_flags:
 
1012
            self._install_config_stats_hooks()
 
1013
        # Do not use i18n for tests (unless the test reverses this)
 
1014
        i18n.disable_i18n()
995
1015
 
996
1016
    def debug(self):
997
1017
        # debug a frame up.
998
1018
        import pdb
999
 
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
1019
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
1020
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
1021
                ).set_trace(sys._getframe().f_back)
1000
1022
 
1001
1023
    def discardDetail(self, name):
1002
1024
        """Extend the addDetail, getDetails api so we can remove a detail.
1014
1036
        if name in details:
1015
1037
            del details[name]
1016
1038
 
 
1039
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1040
        """Install a counting hook.
 
1041
 
 
1042
        Any hook can be counted as long as it doesn't need to return a value.
 
1043
 
 
1044
        :param hooks: Where the hook should be installed.
 
1045
 
 
1046
        :param name: The hook name that will be counted.
 
1047
 
 
1048
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1049
            to ``name``.
 
1050
        """
 
1051
        _counters = self._counters # Avoid closing over self
 
1052
        if counter_name is None:
 
1053
            counter_name = name
 
1054
        if _counters.has_key(counter_name):
 
1055
            raise AssertionError('%s is already used as a counter name'
 
1056
                                  % (counter_name,))
 
1057
        _counters[counter_name] = 0
 
1058
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1059
            lambda: ['%d' % (_counters[counter_name],)]))
 
1060
        def increment_counter(*args, **kwargs):
 
1061
            _counters[counter_name] += 1
 
1062
        label = 'count %s calls' % (counter_name,)
 
1063
        hooks.install_named_hook(name, increment_counter, label)
 
1064
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1065
 
 
1066
    def _install_config_stats_hooks(self):
 
1067
        """Install config hooks to count hook calls.
 
1068
 
 
1069
        """
 
1070
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1071
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1072
                                       'config.%s' % (hook_name,))
 
1073
 
 
1074
        # The OldConfigHooks are private and need special handling to protect
 
1075
        # against recursive tests (tests that run other tests), so we just do
 
1076
        # manually what registering them into _builtin_known_hooks will provide
 
1077
        # us.
 
1078
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1079
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1080
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1081
                                      'old_config.%s' % (hook_name,))
 
1082
 
1017
1083
    def _clear_debug_flags(self):
1018
1084
        """Prevent externally set debug flags affecting tests.
1019
1085
 
1073
1139
        # break some locks on purpose and should be taken into account by
1074
1140
        # considering that breaking a lock is just a dirty way of releasing it.
1075
1141
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1076
 
            message = ('Different number of acquired and '
1077
 
                       'released or broken locks. (%s, %s + %s)' %
1078
 
                       (acquired_locks, released_locks, broken_locks))
 
1142
            message = (
 
1143
                'Different number of acquired and '
 
1144
                'released or broken locks.\n'
 
1145
                'acquired=%s\n'
 
1146
                'released=%s\n'
 
1147
                'broken=%s\n' %
 
1148
                (acquired_locks, released_locks, broken_locks))
1079
1149
            if not self._lock_check_thorough:
1080
1150
                # Rather than fail, just warn
1081
1151
                print "Broken test %s: %s" % (self, message)
1109
1179
 
1110
1180
    def permit_dir(self, name):
1111
1181
        """Permit a directory to be used by this test. See permit_url."""
1112
 
        name_transport = _mod_transport.get_transport(name)
 
1182
        name_transport = _mod_transport.get_transport_from_path(name)
1113
1183
        self.permit_url(name)
1114
1184
        self.permit_url(name_transport.base)
1115
1185
 
1194
1264
        self.addCleanup(transport_server.stop_server)
1195
1265
        # Obtain a real transport because if the server supplies a password, it
1196
1266
        # will be hidden from the base on the client side.
1197
 
        t = _mod_transport.get_transport(transport_server.get_url())
 
1267
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
1198
1268
        # Some transport servers effectively chroot the backing transport;
1199
1269
        # others like SFTPServer don't - users of the transport can walk up the
1200
1270
        # transport to read the entire backing transport. This wouldn't matter
1550
1620
        not other callers that go direct to the warning module.
1551
1621
 
1552
1622
        To test that a deprecated method raises an error, do something like
1553
 
        this::
 
1623
        this (remember that both assertRaises and applyDeprecated delays *args
 
1624
        and **kwargs passing)::
1554
1625
 
1555
1626
            self.assertRaises(errors.ReservedId,
1556
1627
                self.applyDeprecated,
1641
1712
        pseudo_log_file = StringIO()
1642
1713
        def _get_log_contents_for_weird_testtools_api():
1643
1714
            return [pseudo_log_file.getvalue().decode(
1644
 
                "utf-8", "replace").encode("utf-8")]          
 
1715
                "utf-8", "replace").encode("utf-8")]
1645
1716
        self.addDetail("log", content.Content(content.ContentType("text",
1646
1717
            "plain", {"charset": "utf8"}),
1647
1718
            _get_log_contents_for_weird_testtools_api))
1652
1723
    def _finishLogFile(self):
1653
1724
        """Finished with the log file.
1654
1725
 
1655
 
        Close the file and delete it, unless setKeepLogfile was called.
 
1726
        Close the file and delete it.
1656
1727
        """
1657
1728
        if trace._trace_file:
1658
1729
            # flush the log file, to get all content
1675
1746
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1676
1747
        """Overrides an object attribute restoring it after the test.
1677
1748
 
 
1749
        :note: This should be used with discretion; you should think about
 
1750
        whether it's better to make the code testable without monkey-patching.
 
1751
 
1678
1752
        :param obj: The object that will be mutated.
1679
1753
 
1680
1754
        :param attr_name: The attribute name we want to preserve/override in
1705
1779
        self.addCleanup(osutils.set_or_unset_env, name, value)
1706
1780
        return value
1707
1781
 
 
1782
    def recordCalls(self, obj, attr_name):
 
1783
        """Monkeypatch in a wrapper that will record calls.
 
1784
 
 
1785
        The monkeypatch is automatically removed when the test concludes.
 
1786
 
 
1787
        :param obj: The namespace holding the reference to be replaced;
 
1788
            typically a module, class, or object.
 
1789
        :param attr_name: A string for the name of the attribute to 
 
1790
            patch.
 
1791
        :returns: A list that will be extended with one item every time the
 
1792
            function is called, with a tuple of (args, kwargs).
 
1793
        """
 
1794
        calls = []
 
1795
 
 
1796
        def decorator(*args, **kwargs):
 
1797
            calls.append((args, kwargs))
 
1798
            return orig(*args, **kwargs)
 
1799
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1800
        return calls
 
1801
 
1708
1802
    def _cleanEnvironment(self):
1709
1803
        for name, value in isolated_environ.iteritems():
1710
1804
            self.overrideEnv(name, value)
1717
1811
        self._preserved_lazy_hooks.clear()
1718
1812
 
1719
1813
    def knownFailure(self, reason):
1720
 
        """This test has failed for some known reason."""
1721
 
        raise KnownFailure(reason)
 
1814
        """Declare that this test fails for a known reason
 
1815
 
 
1816
        Tests that are known to fail should generally be using expectedFailure
 
1817
        with an appropriate reverse assertion if a change could cause the test
 
1818
        to start passing. Conversely if the test has no immediate prospect of
 
1819
        succeeding then using skip is more suitable.
 
1820
 
 
1821
        When this method is called while an exception is being handled, that
 
1822
        traceback will be used, otherwise a new exception will be thrown to
 
1823
        provide one but won't be reported.
 
1824
        """
 
1825
        self._add_reason(reason)
 
1826
        try:
 
1827
            exc_info = sys.exc_info()
 
1828
            if exc_info != (None, None, None):
 
1829
                self._report_traceback(exc_info)
 
1830
            else:
 
1831
                try:
 
1832
                    raise self.failureException(reason)
 
1833
                except self.failureException:
 
1834
                    exc_info = sys.exc_info()
 
1835
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1836
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1837
        finally:
 
1838
            del exc_info
1722
1839
 
1723
1840
    def _suppress_log(self):
1724
1841
        """Remove the log info from details."""
2017
2134
    def start_bzr_subprocess(self, process_args, env_changes=None,
2018
2135
                             skip_if_plan_to_signal=False,
2019
2136
                             working_dir=None,
2020
 
                             allow_plugins=False):
 
2137
                             allow_plugins=False, stderr=subprocess.PIPE):
2021
2138
        """Start bzr in a subprocess for testing.
2022
2139
 
2023
2140
        This starts a new Python interpreter and runs bzr in there.
2035
2152
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
2036
2153
            doesn't support signalling subprocesses.
2037
2154
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
2155
        :param stderr: file to use for the subprocess's stderr.  Valid values
 
2156
            are those valid for the stderr argument of `subprocess.Popen`.
 
2157
            Default value is ``subprocess.PIPE``.
2038
2158
 
2039
2159
        :returns: Popen object for the started process.
2040
2160
        """
2066
2186
            # so we will avoid using it on all platforms, just to
2067
2187
            # make sure the code path is used, and we don't break on win32
2068
2188
            cleanup_environment()
 
2189
            # Include the subprocess's log file in the test details, in case
 
2190
            # the test fails due to an error in the subprocess.
 
2191
            self._add_subprocess_log(trace._get_bzr_log_filename())
2069
2192
            command = [sys.executable]
2070
2193
            # frozen executables don't need the path to bzr
2071
2194
            if getattr(sys, "frozen", None) is None:
2075
2198
            command.extend(process_args)
2076
2199
            process = self._popen(command, stdin=subprocess.PIPE,
2077
2200
                                  stdout=subprocess.PIPE,
2078
 
                                  stderr=subprocess.PIPE)
 
2201
                                  stderr=stderr)
2079
2202
        finally:
2080
2203
            restore_environment()
2081
2204
            if cwd is not None:
2083
2206
 
2084
2207
        return process
2085
2208
 
 
2209
    def _add_subprocess_log(self, log_file_path):
 
2210
        if len(self._log_files) == 0:
 
2211
            # Register an addCleanup func.  We do this on the first call to
 
2212
            # _add_subprocess_log rather than in TestCase.setUp so that this
 
2213
            # addCleanup is registered after any cleanups for tempdirs that
 
2214
            # subclasses might create, which will probably remove the log file
 
2215
            # we want to read.
 
2216
            self.addCleanup(self._subprocess_log_cleanup)
 
2217
        # self._log_files is a set, so if a log file is reused we won't grab it
 
2218
        # twice.
 
2219
        self._log_files.add(log_file_path)
 
2220
 
 
2221
    def _subprocess_log_cleanup(self):
 
2222
        for count, log_file_path in enumerate(self._log_files):
 
2223
            # We use buffer_now=True to avoid holding the file open beyond
 
2224
            # the life of this function, which might interfere with e.g.
 
2225
            # cleaning tempdirs on Windows.
 
2226
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
 
2227
            #detail_content = content.content_from_file(
 
2228
            #    log_file_path, buffer_now=True)
 
2229
            with open(log_file_path, 'rb') as log_file:
 
2230
                log_file_bytes = log_file.read()
 
2231
            detail_content = content.Content(content.ContentType("text",
 
2232
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
 
2233
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
 
2234
                detail_content)
 
2235
 
2086
2236
    def _popen(self, *args, **kwargs):
2087
2237
        """Place a call to Popen.
2088
2238
 
2240
2390
class TestCaseWithMemoryTransport(TestCase):
2241
2391
    """Common test class for tests that do not need disk resources.
2242
2392
 
2243
 
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2393
    Tests that need disk resources should derive from TestCaseInTempDir
 
2394
    orTestCaseWithTransport.
2244
2395
 
2245
2396
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2246
2397
 
2247
 
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2398
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2248
2399
    a directory which does not exist. This serves to help ensure test isolation
2249
 
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2250
 
    must exist. However, TestCaseWithMemoryTransport does not offer local
2251
 
    file defaults for the transport in tests, nor does it obey the command line
 
2400
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2401
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2402
    defaults for the transport in tests, nor does it obey the command line
2252
2403
    override, so tests that accidentally write to the common directory should
2253
2404
    be rare.
2254
2405
 
2255
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
2256
 
    a .bzr directory that stops us ascending higher into the filesystem.
 
2406
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2407
        ``.bzr`` directory that stops us ascending higher into the filesystem.
2257
2408
    """
2258
2409
 
2259
2410
    TEST_ROOT = None
2277
2428
 
2278
2429
        :param relpath: a path relative to the base url.
2279
2430
        """
2280
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2431
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
2281
2432
        self.assertFalse(t.is_readonly())
2282
2433
        return t
2283
2434
 
2289
2440
 
2290
2441
        :param relpath: a path relative to the base url.
2291
2442
        """
2292
 
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
2443
        t = _mod_transport.get_transport_from_url(
 
2444
            self.get_readonly_url(relpath))
2293
2445
        self.assertTrue(t.is_readonly())
2294
2446
        return t
2295
2447
 
2416
2568
        real branch.
2417
2569
        """
2418
2570
        root = TestCaseWithMemoryTransport.TEST_ROOT
2419
 
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2571
        try:
 
2572
            # Make sure we get a readable and accessible home for .bzr.log
 
2573
            # and/or config files, and not fallback to weird defaults (see
 
2574
            # http://pad.lv/825027).
 
2575
            self.assertIs(None, os.environ.get('BZR_HOME', None))
 
2576
            os.environ['BZR_HOME'] = root
 
2577
            wt = bzrdir.BzrDir.create_standalone_workingtree(root)
 
2578
            del os.environ['BZR_HOME']
 
2579
        except Exception, e:
 
2580
            self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
 
2581
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2582
        # we don't need to re-open the wt to check it hasn't changed.
 
2583
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2584
            wt.control_transport.get_bytes('dirstate'))
2420
2585
 
2421
2586
    def _check_safety_net(self):
2422
2587
        """Check that the safety .bzr directory have not been touched.
2425
2590
        propagating. This method ensures than a test did not leaked.
2426
2591
        """
2427
2592
        root = TestCaseWithMemoryTransport.TEST_ROOT
2428
 
        self.permit_url(_mod_transport.get_transport(root).base)
2429
 
        wt = workingtree.WorkingTree.open(root)
2430
 
        last_rev = wt.last_revision()
2431
 
        if last_rev != 'null:':
 
2593
        t = _mod_transport.get_transport_from_path(root)
 
2594
        self.permit_url(t.base)
 
2595
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2596
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
2432
2597
            # The current test have modified the /bzr directory, we need to
2433
2598
            # recreate a new one or all the followng tests will fail.
2434
2599
            # If you need to inspect its content uncomment the following line
2469
2634
    def make_branch(self, relpath, format=None):
2470
2635
        """Create a branch on the transport at relpath."""
2471
2636
        repo = self.make_repository(relpath, format=format)
2472
 
        return repo.bzrdir.create_branch()
 
2637
        return repo.bzrdir.create_branch(append_revisions_only=False)
 
2638
 
 
2639
    def resolve_format(self, format):
 
2640
        """Resolve an object to a ControlDir format object.
 
2641
 
 
2642
        The initial format object can either already be
 
2643
        a ControlDirFormat, None (for the default format),
 
2644
        or a string with the name of the control dir format.
 
2645
 
 
2646
        :param format: Object to resolve
 
2647
        :return A ControlDirFormat instance
 
2648
        """
 
2649
        if format is None:
 
2650
            format = 'default'
 
2651
        if isinstance(format, basestring):
 
2652
            format = bzrdir.format_registry.make_bzrdir(format)
 
2653
        return format
 
2654
 
 
2655
    def resolve_format(self, format):
 
2656
        """Resolve an object to a ControlDir format object.
 
2657
 
 
2658
        The initial format object can either already be
 
2659
        a ControlDirFormat, None (for the default format),
 
2660
        or a string with the name of the control dir format.
 
2661
 
 
2662
        :param format: Object to resolve
 
2663
        :return A ControlDirFormat instance
 
2664
        """
 
2665
        if format is None:
 
2666
            format = 'default'
 
2667
        if isinstance(format, basestring):
 
2668
            format = bzrdir.format_registry.make_bzrdir(format)
 
2669
        return format
2473
2670
 
2474
2671
    def make_bzrdir(self, relpath, format=None):
2475
2672
        try:
2479
2676
            t = _mod_transport.get_transport(maybe_a_url)
2480
2677
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2481
2678
                t.ensure_base()
2482
 
            if format is None:
2483
 
                format = 'default'
2484
 
            if isinstance(format, basestring):
2485
 
                format = bzrdir.format_registry.make_bzrdir(format)
 
2679
            format = self.resolve_format(format)
2486
2680
            return format.initialize_on_transport(t)
2487
2681
        except errors.UninitializableFormat:
2488
2682
            raise TestSkipped("Format %s is not initializable." % format)
2504
2698
            backing_server = self.get_server()
2505
2699
        smart_server = test_server.SmartTCPServer_for_testing()
2506
2700
        self.start_server(smart_server, backing_server)
2507
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
 
2701
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
2508
2702
                                                   ).clone(path)
2509
2703
        return remote_transport
2510
2704
 
2527
2721
    def setUp(self):
2528
2722
        super(TestCaseWithMemoryTransport, self).setUp()
2529
2723
        # Ensure that ConnectedTransport doesn't leak sockets
2530
 
        def get_transport_with_cleanup(*args, **kwargs):
2531
 
            t = orig_get_transport(*args, **kwargs)
 
2724
        def get_transport_from_url_with_cleanup(*args, **kwargs):
 
2725
            t = orig_get_transport_from_url(*args, **kwargs)
2532
2726
            if isinstance(t, _mod_transport.ConnectedTransport):
2533
2727
                self.addCleanup(t.disconnect)
2534
2728
            return t
2535
2729
 
2536
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2537
 
                                               get_transport_with_cleanup)
 
2730
        orig_get_transport_from_url = self.overrideAttr(
 
2731
            _mod_transport, 'get_transport_from_url',
 
2732
            get_transport_from_url_with_cleanup)
2538
2733
        self._make_test_root()
2539
2734
        self.addCleanup(os.chdir, os.getcwdu())
2540
2735
        self.makeAndChdirToTestDir()
2583
2778
 
2584
2779
    OVERRIDE_PYTHON = 'python'
2585
2780
 
 
2781
    def setUp(self):
 
2782
        super(TestCaseInTempDir, self).setUp()
 
2783
        # Remove the protection set in isolated_environ, we have a proper
 
2784
        # access to disk resources now.
 
2785
        self.overrideEnv('BZR_LOG', None)
 
2786
 
2586
2787
    def check_file_contents(self, filename, expect):
2587
2788
        self.log("check contents of file %s" % filename)
2588
2789
        f = file(filename)
2669
2870
                "a list or a tuple. Got %r instead" % (shape,))
2670
2871
        # It's OK to just create them using forward slashes on windows.
2671
2872
        if transport is None or transport.is_readonly():
2672
 
            transport = _mod_transport.get_transport(".")
 
2873
            transport = _mod_transport.get_transport_from_path(".")
2673
2874
        for name in shape:
2674
2875
            self.assertIsInstance(name, basestring)
2675
2876
            if name[-1] == '/':
3433
3634
#                           help track thread leaks
3434
3635
#   -Euncollected_cases     Display the identity of any test cases that weren't
3435
3636
#                           deallocated after being completed.
 
3637
#   -Econfig_stats          Will collect statistics using addDetail
3436
3638
selftest_debug_flags = set()
3437
3639
 
3438
3640
 
3742
3944
        'bzrlib.tests.test_email_message',
3743
3945
        'bzrlib.tests.test_eol_filters',
3744
3946
        'bzrlib.tests.test_errors',
 
3947
        'bzrlib.tests.test_estimate_compressed_size',
3745
3948
        'bzrlib.tests.test_export',
3746
3949
        'bzrlib.tests.test_export_pot',
3747
3950
        'bzrlib.tests.test_extract',
 
3951
        'bzrlib.tests.test_features',
3748
3952
        'bzrlib.tests.test_fetch',
3749
3953
        'bzrlib.tests.test_fixtures',
3750
3954
        'bzrlib.tests.test_fifo_cache',
3751
3955
        'bzrlib.tests.test_filters',
 
3956
        'bzrlib.tests.test_filter_tree',
3752
3957
        'bzrlib.tests.test_ftp_transport',
3753
3958
        'bzrlib.tests.test_foreign',
3754
3959
        'bzrlib.tests.test_generate_docs',
3763
3968
        'bzrlib.tests.test_http',
3764
3969
        'bzrlib.tests.test_http_response',
3765
3970
        'bzrlib.tests.test_https_ca_bundle',
 
3971
        'bzrlib.tests.test_i18n',
3766
3972
        'bzrlib.tests.test_identitymap',
3767
3973
        'bzrlib.tests.test_ignores',
3768
3974
        'bzrlib.tests.test_index',
4144
4350
        the module is available.
4145
4351
    """
4146
4352
 
 
4353
    from bzrlib.tests.features import ModuleAvailableFeature
4147
4354
    py_module = pyutils.get_named_object(py_module_name)
4148
4355
    scenarios = [
4149
4356
        ('python', {'module': py_module}),
4190
4397
                         % (os.path.basename(dirname), printable_e))
4191
4398
 
4192
4399
 
4193
 
class Feature(object):
4194
 
    """An operating system Feature."""
4195
 
 
4196
 
    def __init__(self):
4197
 
        self._available = None
4198
 
 
4199
 
    def available(self):
4200
 
        """Is the feature available?
4201
 
 
4202
 
        :return: True if the feature is available.
4203
 
        """
4204
 
        if self._available is None:
4205
 
            self._available = self._probe()
4206
 
        return self._available
4207
 
 
4208
 
    def _probe(self):
4209
 
        """Implement this method in concrete features.
4210
 
 
4211
 
        :return: True if the feature is available.
4212
 
        """
4213
 
        raise NotImplementedError
4214
 
 
4215
 
    def __str__(self):
4216
 
        if getattr(self, 'feature_name', None):
4217
 
            return self.feature_name()
4218
 
        return self.__class__.__name__
4219
 
 
4220
 
 
4221
 
class _SymlinkFeature(Feature):
4222
 
 
4223
 
    def _probe(self):
4224
 
        return osutils.has_symlinks()
4225
 
 
4226
 
    def feature_name(self):
4227
 
        return 'symlinks'
4228
 
 
4229
 
SymlinkFeature = _SymlinkFeature()
4230
 
 
4231
 
 
4232
 
class _HardlinkFeature(Feature):
4233
 
 
4234
 
    def _probe(self):
4235
 
        return osutils.has_hardlinks()
4236
 
 
4237
 
    def feature_name(self):
4238
 
        return 'hardlinks'
4239
 
 
4240
 
HardlinkFeature = _HardlinkFeature()
4241
 
 
4242
 
 
4243
 
class _OsFifoFeature(Feature):
4244
 
 
4245
 
    def _probe(self):
4246
 
        return getattr(os, 'mkfifo', None)
4247
 
 
4248
 
    def feature_name(self):
4249
 
        return 'filesystem fifos'
4250
 
 
4251
 
OsFifoFeature = _OsFifoFeature()
4252
 
 
4253
 
 
4254
 
class _UnicodeFilenameFeature(Feature):
4255
 
    """Does the filesystem support Unicode filenames?"""
4256
 
 
4257
 
    def _probe(self):
4258
 
        try:
4259
 
            # Check for character combinations unlikely to be covered by any
4260
 
            # single non-unicode encoding. We use the characters
4261
 
            # - greek small letter alpha (U+03B1) and
4262
 
            # - braille pattern dots-123456 (U+283F).
4263
 
            os.stat(u'\u03b1\u283f')
4264
 
        except UnicodeEncodeError:
4265
 
            return False
4266
 
        except (IOError, OSError):
4267
 
            # The filesystem allows the Unicode filename but the file doesn't
4268
 
            # exist.
4269
 
            return True
4270
 
        else:
4271
 
            # The filesystem allows the Unicode filename and the file exists,
4272
 
            # for some reason.
4273
 
            return True
4274
 
 
4275
 
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4276
 
 
4277
 
 
4278
 
class _CompatabilityThunkFeature(Feature):
4279
 
    """This feature is just a thunk to another feature.
4280
 
 
4281
 
    It issues a deprecation warning if it is accessed, to let you know that you
4282
 
    should really use a different feature.
4283
 
    """
4284
 
 
4285
 
    def __init__(self, dep_version, module, name,
4286
 
                 replacement_name, replacement_module=None):
4287
 
        super(_CompatabilityThunkFeature, self).__init__()
4288
 
        self._module = module
4289
 
        if replacement_module is None:
4290
 
            replacement_module = module
4291
 
        self._replacement_module = replacement_module
4292
 
        self._name = name
4293
 
        self._replacement_name = replacement_name
4294
 
        self._dep_version = dep_version
4295
 
        self._feature = None
4296
 
 
4297
 
    def _ensure(self):
4298
 
        if self._feature is None:
4299
 
            depr_msg = self._dep_version % ('%s.%s'
4300
 
                                            % (self._module, self._name))
4301
 
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4302
 
                                               self._replacement_name)
4303
 
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4304
 
            # Import the new feature and use it as a replacement for the
4305
 
            # deprecated one.
4306
 
            self._feature = pyutils.get_named_object(
4307
 
                self._replacement_module, self._replacement_name)
4308
 
 
4309
 
    def _probe(self):
4310
 
        self._ensure()
4311
 
        return self._feature._probe()
4312
 
 
4313
 
 
4314
 
class ModuleAvailableFeature(Feature):
4315
 
    """This is a feature than describes a module we want to be available.
4316
 
 
4317
 
    Declare the name of the module in __init__(), and then after probing, the
4318
 
    module will be available as 'self.module'.
4319
 
 
4320
 
    :ivar module: The module if it is available, else None.
4321
 
    """
4322
 
 
4323
 
    def __init__(self, module_name):
4324
 
        super(ModuleAvailableFeature, self).__init__()
4325
 
        self.module_name = module_name
4326
 
 
4327
 
    def _probe(self):
4328
 
        try:
4329
 
            self._module = __import__(self.module_name, {}, {}, [''])
4330
 
            return True
4331
 
        except ImportError:
4332
 
            return False
4333
 
 
4334
 
    @property
4335
 
    def module(self):
4336
 
        if self.available(): # Make sure the probe has been done
4337
 
            return self._module
4338
 
        return None
4339
 
 
4340
 
    def feature_name(self):
4341
 
        return self.module_name
4342
 
 
4343
 
 
4344
4400
def probe_unicode_in_user_encoding():
4345
4401
    """Try to encode several unicode strings to use in unicode-aware tests.
4346
4402
    Return first successfull match.
4374
4430
    return None
4375
4431
 
4376
4432
 
4377
 
class _HTTPSServerFeature(Feature):
4378
 
    """Some tests want an https Server, check if one is available.
4379
 
 
4380
 
    Right now, the only way this is available is under python2.6 which provides
4381
 
    an ssl module.
4382
 
    """
4383
 
 
4384
 
    def _probe(self):
4385
 
        try:
4386
 
            import ssl
4387
 
            return True
4388
 
        except ImportError:
4389
 
            return False
4390
 
 
4391
 
    def feature_name(self):
4392
 
        return 'HTTPSServer'
4393
 
 
4394
 
 
4395
 
HTTPSServerFeature = _HTTPSServerFeature()
4396
 
 
4397
 
 
4398
 
class _UnicodeFilename(Feature):
4399
 
    """Does the filesystem support Unicode filenames?"""
4400
 
 
4401
 
    def _probe(self):
4402
 
        try:
4403
 
            os.stat(u'\u03b1')
4404
 
        except UnicodeEncodeError:
4405
 
            return False
4406
 
        except (IOError, OSError):
4407
 
            # The filesystem allows the Unicode filename but the file doesn't
4408
 
            # exist.
4409
 
            return True
4410
 
        else:
4411
 
            # The filesystem allows the Unicode filename and the file exists,
4412
 
            # for some reason.
4413
 
            return True
4414
 
 
4415
 
UnicodeFilename = _UnicodeFilename()
4416
 
 
4417
 
 
4418
 
class _ByteStringNamedFilesystem(Feature):
4419
 
    """Is the filesystem based on bytes?"""
4420
 
 
4421
 
    def _probe(self):
4422
 
        if os.name == "posix":
4423
 
            return True
4424
 
        return False
4425
 
 
4426
 
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4427
 
 
4428
 
 
4429
 
class _UTF8Filesystem(Feature):
4430
 
    """Is the filesystem UTF-8?"""
4431
 
 
4432
 
    def _probe(self):
4433
 
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4434
 
            return True
4435
 
        return False
4436
 
 
4437
 
UTF8Filesystem = _UTF8Filesystem()
4438
 
 
4439
 
 
4440
 
class _BreakinFeature(Feature):
4441
 
    """Does this platform support the breakin feature?"""
4442
 
 
4443
 
    def _probe(self):
4444
 
        from bzrlib import breakin
4445
 
        if breakin.determine_signal() is None:
4446
 
            return False
4447
 
        if sys.platform == 'win32':
4448
 
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4449
 
            # We trigger SIGBREAK via a Console api so we need ctypes to
4450
 
            # access the function
4451
 
            try:
4452
 
                import ctypes
4453
 
            except OSError:
4454
 
                return False
4455
 
        return True
4456
 
 
4457
 
    def feature_name(self):
4458
 
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
4459
 
 
4460
 
 
4461
 
BreakinFeature = _BreakinFeature()
4462
 
 
4463
 
 
4464
 
class _CaseInsCasePresFilenameFeature(Feature):
4465
 
    """Is the file-system case insensitive, but case-preserving?"""
4466
 
 
4467
 
    def _probe(self):
4468
 
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
4469
 
        try:
4470
 
            # first check truly case-preserving for created files, then check
4471
 
            # case insensitive when opening existing files.
4472
 
            name = osutils.normpath(name)
4473
 
            base, rel = osutils.split(name)
4474
 
            found_rel = osutils.canonical_relpath(base, name)
4475
 
            return (found_rel == rel
4476
 
                    and os.path.isfile(name.upper())
4477
 
                    and os.path.isfile(name.lower()))
4478
 
        finally:
4479
 
            os.close(fileno)
4480
 
            os.remove(name)
4481
 
 
4482
 
    def feature_name(self):
4483
 
        return "case-insensitive case-preserving filesystem"
4484
 
 
4485
 
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4486
 
 
4487
 
 
4488
 
class _CaseInsensitiveFilesystemFeature(Feature):
4489
 
    """Check if underlying filesystem is case-insensitive but *not* case
4490
 
    preserving.
4491
 
    """
4492
 
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4493
 
    # more likely to be case preserving, so this case is rare.
4494
 
 
4495
 
    def _probe(self):
4496
 
        if CaseInsCasePresFilenameFeature.available():
4497
 
            return False
4498
 
 
4499
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
4500
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4501
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
4502
 
        else:
4503
 
            root = TestCaseWithMemoryTransport.TEST_ROOT
4504
 
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4505
 
            dir=root)
4506
 
        name_a = osutils.pathjoin(tdir, 'a')
4507
 
        name_A = osutils.pathjoin(tdir, 'A')
4508
 
        os.mkdir(name_a)
4509
 
        result = osutils.isdir(name_A)
4510
 
        _rmtree_temp_dir(tdir)
4511
 
        return result
4512
 
 
4513
 
    def feature_name(self):
4514
 
        return 'case-insensitive filesystem'
4515
 
 
4516
 
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4517
 
 
4518
 
 
4519
 
class _CaseSensitiveFilesystemFeature(Feature):
4520
 
 
4521
 
    def _probe(self):
4522
 
        if CaseInsCasePresFilenameFeature.available():
4523
 
            return False
4524
 
        elif CaseInsensitiveFilesystemFeature.available():
4525
 
            return False
4526
 
        else:
4527
 
            return True
4528
 
 
4529
 
    def feature_name(self):
4530
 
        return 'case-sensitive filesystem'
4531
 
 
4532
 
# new coding style is for feature instances to be lowercase
4533
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4534
 
 
4535
 
 
4536
4433
# Only define SubUnitBzrRunner if subunit is available.
4537
4434
try:
4538
4435
    from subunit import TestProtocolClient
4564
4461
except ImportError:
4565
4462
    pass
4566
4463
 
4567
 
class _PosixPermissionsFeature(Feature):
4568
 
 
4569
 
    def _probe(self):
4570
 
        def has_perms():
4571
 
            # create temporary file and check if specified perms are maintained.
4572
 
            import tempfile
4573
 
 
4574
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4575
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4576
 
            fd, name = f
4577
 
            os.close(fd)
4578
 
            os.chmod(name, write_perms)
4579
 
 
4580
 
            read_perms = os.stat(name).st_mode & 0777
4581
 
            os.unlink(name)
4582
 
            return (write_perms == read_perms)
4583
 
 
4584
 
        return (os.name == 'posix') and has_perms()
4585
 
 
4586
 
    def feature_name(self):
4587
 
        return 'POSIX permissions support'
4588
 
 
4589
 
posix_permissions_feature = _PosixPermissionsFeature()
 
4464
 
 
4465
@deprecated_function(deprecated_in((2, 5, 0)))
 
4466
def ModuleAvailableFeature(name):
 
4467
    from bzrlib.tests import features
 
4468
    return features.ModuleAvailableFeature(name)
 
4469