~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-08-09 17:04:46 UTC
  • mfrom: (6055.1.3 822571-bzr-home-unicode)
  • Revision ID: pqm@pqm.ubuntu.com-20110809170446-f1wc1a8fhgnxi4cn
(vila) Decode BZR_HOME with fs encoding to allow unicode homes. (Vincent
 Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
19
 
# TODO: Perhaps there should be an API to find out if bzr running under the
20
 
# test suite -- some plugins might want to avoid making intrusive changes if
21
 
# this is the case.  However, we want behaviour under to test to diverge as
22
 
# little as possible, so this should be used rarely if it's added at all.
23
 
# (Suggestion from j-a-meinel, 2005-11-24)
24
 
 
25
19
# NOTE: Some classes in here use camelCaseNaming() rather than
26
20
# underscore_naming().  That's for consistency with unittest; it's not the
27
21
# general style of bzrlib.  Please continue that consistency when adding e.g.
94
88
    memory,
95
89
    pathfilter,
96
90
    )
 
91
from bzrlib.symbol_versioning import (
 
92
    deprecated_function,
 
93
    deprecated_in,
 
94
    )
97
95
from bzrlib.tests import (
98
96
    test_server,
99
97
    TestUtil,
142
140
    'BZREMAIL': None, # may still be present in the environment
143
141
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
144
142
    'BZR_PROGRESS_BAR': None,
145
 
    'BZR_LOG': None,
 
143
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
 
144
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
145
    # TestCase should not use disk resources, BZR_LOG is one.
 
146
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
146
147
    'BZR_PLUGIN_PATH': None,
147
148
    'BZR_DISABLE_PLUGINS': None,
148
149
    'BZR_PLUGINS_AT': None,
377
378
        if isinstance(test, TestCase):
378
379
            test.addCleanup(self._check_leaked_threads, test)
379
380
 
 
381
    def stopTest(self, test):
 
382
        super(ExtendedTestResult, self).stopTest(test)
 
383
        # Manually break cycles, means touching various private things but hey
 
384
        getDetails = getattr(test, "getDetails", None)
 
385
        if getDetails is not None:
 
386
            getDetails().clear()
 
387
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
388
        if type_equality_funcs is not None:
 
389
            type_equality_funcs.clear()
 
390
        self._traceback_from_test = None
 
391
 
380
392
    def startTests(self):
381
393
        self.report_tests_starting()
382
394
        self._active_threads = threading.enumerate()
383
395
 
384
 
    def stopTest(self, test):
385
 
        self._traceback_from_test = None
386
 
 
387
396
    def _check_leaked_threads(self, test):
388
397
        """See if any threads have leaked since last call
389
398
 
920
929
 
921
930
    The method is really a factory and users are expected to use it as such.
922
931
    """
923
 
    
 
932
 
924
933
    kwargs['setUp'] = isolated_doctest_setUp
925
934
    kwargs['tearDown'] = isolated_doctest_tearDown
926
935
    return doctest.DocTestSuite(*args, **kwargs)
963
972
        super(TestCase, self).setUp()
964
973
        for feature in getattr(self, '_test_needs_features', []):
965
974
            self.requireFeature(feature)
966
 
        self._log_contents = None
967
 
        self.addDetail("log", content.Content(content.ContentType("text",
968
 
            "plain", {"charset": "utf8"}),
969
 
            lambda:[self._get_log(keep_log_file=True)]))
970
975
        self._cleanEnvironment()
971
976
        self._silenceUI()
972
977
        self._startLogFile()
984
989
        # settled on or a the FIXME associated with _get_expand_default_value
985
990
        # is addressed -- vila 20110219
986
991
        self.overrideAttr(config, '_expand_default_value', None)
 
992
        self._log_files = set()
 
993
        # Each key in the ``_counters`` dict holds a value for a different
 
994
        # counter. When the test ends, addDetail() should be used to output the
 
995
        # counter values. This happens in install_counter_hook().
 
996
        self._counters = {}
 
997
        if 'config_stats' in selftest_debug_flags:
 
998
            self._install_config_stats_hooks()
987
999
 
988
1000
    def debug(self):
989
1001
        # debug a frame up.
1006
1018
        if name in details:
1007
1019
            del details[name]
1008
1020
 
 
1021
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1022
        """Install a counting hook.
 
1023
 
 
1024
        Any hook can be counted as long as it doesn't need to return a value.
 
1025
 
 
1026
        :param hooks: Where the hook should be installed.
 
1027
 
 
1028
        :param name: The hook name that will be counted.
 
1029
 
 
1030
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1031
            to ``name``.
 
1032
        """
 
1033
        _counters = self._counters # Avoid closing over self
 
1034
        if counter_name is None:
 
1035
            counter_name = name
 
1036
        if _counters.has_key(counter_name):
 
1037
            raise AssertionError('%s is already used as a counter name'
 
1038
                                  % (counter_name,))
 
1039
        _counters[counter_name] = 0
 
1040
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1041
            lambda: ['%d' % (_counters[counter_name],)]))
 
1042
        def increment_counter(*args, **kwargs):
 
1043
            _counters[counter_name] += 1
 
1044
        label = 'count %s calls' % (counter_name,)
 
1045
        hooks.install_named_hook(name, increment_counter, label)
 
1046
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1047
 
 
1048
    def _install_config_stats_hooks(self):
 
1049
        """Install config hooks to count hook calls.
 
1050
 
 
1051
        """
 
1052
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1053
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1054
                                       'config.%s' % (hook_name,))
 
1055
 
 
1056
        # The OldConfigHooks are private and need special handling to protect
 
1057
        # against recursive tests (tests that run other tests), so we just do
 
1058
        # manually what registering them into _builtin_known_hooks will provide
 
1059
        # us.
 
1060
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1061
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1062
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1063
                                      'old_config.%s' % (hook_name,))
 
1064
 
1009
1065
    def _clear_debug_flags(self):
1010
1066
        """Prevent externally set debug flags affecting tests.
1011
1067
 
1065
1121
        # break some locks on purpose and should be taken into account by
1066
1122
        # considering that breaking a lock is just a dirty way of releasing it.
1067
1123
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1068
 
            message = ('Different number of acquired and '
1069
 
                       'released or broken locks. (%s, %s + %s)' %
1070
 
                       (acquired_locks, released_locks, broken_locks))
 
1124
            message = (
 
1125
                'Different number of acquired and '
 
1126
                'released or broken locks.\n'
 
1127
                'acquired=%s\n'
 
1128
                'released=%s\n'
 
1129
                'broken=%s\n' %
 
1130
                (acquired_locks, released_locks, broken_locks))
1071
1131
            if not self._lock_check_thorough:
1072
1132
                # Rather than fail, just warn
1073
1133
                print "Broken test %s: %s" % (self, message)
1101
1161
 
1102
1162
    def permit_dir(self, name):
1103
1163
        """Permit a directory to be used by this test. See permit_url."""
1104
 
        name_transport = _mod_transport.get_transport(name)
 
1164
        name_transport = _mod_transport.get_transport_from_path(name)
1105
1165
        self.permit_url(name)
1106
1166
        self.permit_url(name_transport.base)
1107
1167
 
1186
1246
        self.addCleanup(transport_server.stop_server)
1187
1247
        # Obtain a real transport because if the server supplies a password, it
1188
1248
        # will be hidden from the base on the client side.
1189
 
        t = _mod_transport.get_transport(transport_server.get_url())
 
1249
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
1190
1250
        # Some transport servers effectively chroot the backing transport;
1191
1251
        # others like SFTPServer don't - users of the transport can walk up the
1192
1252
        # transport to read the entire backing transport. This wouldn't matter
1323
1383
        try:
1324
1384
            def capture():
1325
1385
                orig_log_exception_quietly()
1326
 
                captured.append(sys.exc_info())
 
1386
                captured.append(sys.exc_info()[1])
1327
1387
            trace.log_exception_quietly = capture
1328
1388
            func(*args, **kwargs)
1329
1389
        finally:
1330
1390
            trace.log_exception_quietly = orig_log_exception_quietly
1331
1391
        self.assertLength(1, captured)
1332
 
        err = captured[0][1]
 
1392
        err = captured[0]
1333
1393
        self.assertIsInstance(err, exception_class)
1334
1394
        return err
1335
1395
 
1542
1602
        not other callers that go direct to the warning module.
1543
1603
 
1544
1604
        To test that a deprecated method raises an error, do something like
1545
 
        this::
 
1605
        this (remember that both assertRaises and applyDeprecated delays *args
 
1606
        and **kwargs passing)::
1546
1607
 
1547
1608
            self.assertRaises(errors.ReservedId,
1548
1609
                self.applyDeprecated,
1630
1691
 
1631
1692
        The file is removed as the test is torn down.
1632
1693
        """
1633
 
        self._log_file = StringIO()
 
1694
        pseudo_log_file = StringIO()
 
1695
        def _get_log_contents_for_weird_testtools_api():
 
1696
            return [pseudo_log_file.getvalue().decode(
 
1697
                "utf-8", "replace").encode("utf-8")]
 
1698
        self.addDetail("log", content.Content(content.ContentType("text",
 
1699
            "plain", {"charset": "utf8"}),
 
1700
            _get_log_contents_for_weird_testtools_api))
 
1701
        self._log_file = pseudo_log_file
1634
1702
        self._log_memento = trace.push_log_file(self._log_file)
1635
1703
        self.addCleanup(self._finishLogFile)
1636
1704
 
1637
1705
    def _finishLogFile(self):
1638
1706
        """Finished with the log file.
1639
1707
 
1640
 
        Close the file and delete it, unless setKeepLogfile was called.
 
1708
        Close the file and delete it.
1641
1709
        """
1642
1710
        if trace._trace_file:
1643
1711
            # flush the log file, to get all content
1644
1712
            trace._trace_file.flush()
1645
1713
        trace.pop_log_file(self._log_memento)
1646
 
        # Cache the log result and delete the file on disk
1647
 
        self._get_log(False)
1648
1714
 
1649
1715
    def thisFailsStrictLockCheck(self):
1650
1716
        """It is known that this test would fail with -Dstrict_locks.
1662
1728
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1663
1729
        """Overrides an object attribute restoring it after the test.
1664
1730
 
 
1731
        :note: This should be used with discretion; you should think about
 
1732
        whether it's better to make the code testable without monkey-patching.
 
1733
 
1665
1734
        :param obj: The object that will be mutated.
1666
1735
 
1667
1736
        :param attr_name: The attribute name we want to preserve/override in
1692
1761
        self.addCleanup(osutils.set_or_unset_env, name, value)
1693
1762
        return value
1694
1763
 
 
1764
    def recordCalls(self, obj, attr_name):
 
1765
        """Monkeypatch in a wrapper that will record calls.
 
1766
 
 
1767
        The monkeypatch is automatically removed when the test concludes.
 
1768
 
 
1769
        :param obj: The namespace holding the reference to be replaced;
 
1770
            typically a module, class, or object.
 
1771
        :param attr_name: A string for the name of the attribute to 
 
1772
            patch.
 
1773
        :returns: A list that will be extended with one item every time the
 
1774
            function is called, with a tuple of (args, kwargs).
 
1775
        """
 
1776
        calls = []
 
1777
 
 
1778
        def decorator(*args, **kwargs):
 
1779
            calls.append((args, kwargs))
 
1780
            return orig(*args, **kwargs)
 
1781
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1782
        return calls
 
1783
 
1695
1784
    def _cleanEnvironment(self):
1696
1785
        for name, value in isolated_environ.iteritems():
1697
1786
            self.overrideEnv(name, value)
1699
1788
    def _restoreHooks(self):
1700
1789
        for klass, (name, hooks) in self._preserved_hooks.items():
1701
1790
            setattr(klass, name, hooks)
1702
 
        hooks._lazy_hooks = self._preserved_lazy_hooks
 
1791
        self._preserved_hooks.clear()
 
1792
        bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
 
1793
        self._preserved_lazy_hooks.clear()
1703
1794
 
1704
1795
    def knownFailure(self, reason):
1705
 
        """This test has failed for some known reason."""
1706
 
        raise KnownFailure(reason)
 
1796
        """Declare that this test fails for a known reason
 
1797
 
 
1798
        Tests that are known to fail should generally be using expectedFailure
 
1799
        with an appropriate reverse assertion if a change could cause the test
 
1800
        to start passing. Conversely if the test has no immediate prospect of
 
1801
        succeeding then using skip is more suitable.
 
1802
 
 
1803
        When this method is called while an exception is being handled, that
 
1804
        traceback will be used, otherwise a new exception will be thrown to
 
1805
        provide one but won't be reported.
 
1806
        """
 
1807
        self._add_reason(reason)
 
1808
        try:
 
1809
            exc_info = sys.exc_info()
 
1810
            if exc_info != (None, None, None):
 
1811
                self._report_traceback(exc_info)
 
1812
            else:
 
1813
                try:
 
1814
                    raise self.failureException(reason)
 
1815
                except self.failureException:
 
1816
                    exc_info = sys.exc_info()
 
1817
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1818
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1819
        finally:
 
1820
            del exc_info
1707
1821
 
1708
1822
    def _suppress_log(self):
1709
1823
        """Remove the log info from details."""
1797
1911
    def log(self, *args):
1798
1912
        trace.mutter(*args)
1799
1913
 
1800
 
    def _get_log(self, keep_log_file=False):
1801
 
        """Internal helper to get the log from bzrlib.trace for this test.
1802
 
 
1803
 
        Please use self.getDetails, or self.get_log to access this in test case
1804
 
        code.
1805
 
 
1806
 
        :param keep_log_file: When True, if the log is still a file on disk
1807
 
            leave it as a file on disk. When False, if the log is still a file
1808
 
            on disk, the log file is deleted and the log preserved as
1809
 
            self._log_contents.
1810
 
        :return: A string containing the log.
1811
 
        """
1812
 
        if self._log_contents is not None:
1813
 
            try:
1814
 
                self._log_contents.decode('utf8')
1815
 
            except UnicodeDecodeError:
1816
 
                unicodestr = self._log_contents.decode('utf8', 'replace')
1817
 
                self._log_contents = unicodestr.encode('utf8')
1818
 
            return self._log_contents
1819
 
        if self._log_file is not None:
1820
 
            log_contents = self._log_file.getvalue()
1821
 
            try:
1822
 
                log_contents.decode('utf8')
1823
 
            except UnicodeDecodeError:
1824
 
                unicodestr = log_contents.decode('utf8', 'replace')
1825
 
                log_contents = unicodestr.encode('utf8')
1826
 
            if not keep_log_file:
1827
 
                self._log_file = None
1828
 
                # Permit multiple calls to get_log until we clean it up in
1829
 
                # finishLogFile
1830
 
                self._log_contents = log_contents
1831
 
            return log_contents
1832
 
        else:
1833
 
            return "No log file content."
1834
 
 
1835
1914
    def get_log(self):
1836
1915
        """Get a unicode string containing the log from bzrlib.trace.
1837
1916
 
2037
2116
    def start_bzr_subprocess(self, process_args, env_changes=None,
2038
2117
                             skip_if_plan_to_signal=False,
2039
2118
                             working_dir=None,
2040
 
                             allow_plugins=False):
 
2119
                             allow_plugins=False, stderr=subprocess.PIPE):
2041
2120
        """Start bzr in a subprocess for testing.
2042
2121
 
2043
2122
        This starts a new Python interpreter and runs bzr in there.
2055
2134
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
2056
2135
            doesn't support signalling subprocesses.
2057
2136
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
2137
        :param stderr: file to use for the subprocess's stderr.  Valid values
 
2138
            are those valid for the stderr argument of `subprocess.Popen`.
 
2139
            Default value is ``subprocess.PIPE``.
2058
2140
 
2059
2141
        :returns: Popen object for the started process.
2060
2142
        """
2086
2168
            # so we will avoid using it on all platforms, just to
2087
2169
            # make sure the code path is used, and we don't break on win32
2088
2170
            cleanup_environment()
 
2171
            # Include the subprocess's log file in the test details, in case
 
2172
            # the test fails due to an error in the subprocess.
 
2173
            self._add_subprocess_log(trace._get_bzr_log_filename())
2089
2174
            command = [sys.executable]
2090
2175
            # frozen executables don't need the path to bzr
2091
2176
            if getattr(sys, "frozen", None) is None:
2095
2180
            command.extend(process_args)
2096
2181
            process = self._popen(command, stdin=subprocess.PIPE,
2097
2182
                                  stdout=subprocess.PIPE,
2098
 
                                  stderr=subprocess.PIPE)
 
2183
                                  stderr=stderr)
2099
2184
        finally:
2100
2185
            restore_environment()
2101
2186
            if cwd is not None:
2103
2188
 
2104
2189
        return process
2105
2190
 
 
2191
    def _add_subprocess_log(self, log_file_path):
 
2192
        if len(self._log_files) == 0:
 
2193
            # Register an addCleanup func.  We do this on the first call to
 
2194
            # _add_subprocess_log rather than in TestCase.setUp so that this
 
2195
            # addCleanup is registered after any cleanups for tempdirs that
 
2196
            # subclasses might create, which will probably remove the log file
 
2197
            # we want to read.
 
2198
            self.addCleanup(self._subprocess_log_cleanup)
 
2199
        # self._log_files is a set, so if a log file is reused we won't grab it
 
2200
        # twice.
 
2201
        self._log_files.add(log_file_path)
 
2202
 
 
2203
    def _subprocess_log_cleanup(self):
 
2204
        for count, log_file_path in enumerate(self._log_files):
 
2205
            # We use buffer_now=True to avoid holding the file open beyond
 
2206
            # the life of this function, which might interfere with e.g.
 
2207
            # cleaning tempdirs on Windows.
 
2208
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
 
2209
            #detail_content = content.content_from_file(
 
2210
            #    log_file_path, buffer_now=True)
 
2211
            with open(log_file_path, 'rb') as log_file:
 
2212
                log_file_bytes = log_file.read()
 
2213
            detail_content = content.Content(content.ContentType("text",
 
2214
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
 
2215
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
 
2216
                detail_content)
 
2217
 
2106
2218
    def _popen(self, *args, **kwargs):
2107
2219
        """Place a call to Popen.
2108
2220
 
2260
2372
class TestCaseWithMemoryTransport(TestCase):
2261
2373
    """Common test class for tests that do not need disk resources.
2262
2374
 
2263
 
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2375
    Tests that need disk resources should derive from TestCaseInTempDir
 
2376
    orTestCaseWithTransport.
2264
2377
 
2265
2378
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2266
2379
 
2267
 
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2380
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2268
2381
    a directory which does not exist. This serves to help ensure test isolation
2269
 
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2270
 
    must exist. However, TestCaseWithMemoryTransport does not offer local
2271
 
    file defaults for the transport in tests, nor does it obey the command line
 
2382
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2383
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2384
    defaults for the transport in tests, nor does it obey the command line
2272
2385
    override, so tests that accidentally write to the common directory should
2273
2386
    be rare.
2274
2387
 
2275
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
2276
 
    a .bzr directory that stops us ascending higher into the filesystem.
 
2388
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2389
        ``.bzr`` directory that stops us ascending higher into the filesystem.
2277
2390
    """
2278
2391
 
2279
2392
    TEST_ROOT = None
2297
2410
 
2298
2411
        :param relpath: a path relative to the base url.
2299
2412
        """
2300
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2413
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
2301
2414
        self.assertFalse(t.is_readonly())
2302
2415
        return t
2303
2416
 
2436
2549
        real branch.
2437
2550
        """
2438
2551
        root = TestCaseWithMemoryTransport.TEST_ROOT
2439
 
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2552
        wt = bzrdir.BzrDir.create_standalone_workingtree(root)
 
2553
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2554
        # we don't need to re-open the wt to check it hasn't changed.
 
2555
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2556
            wt.control_transport.get_bytes('dirstate'))
2440
2557
 
2441
2558
    def _check_safety_net(self):
2442
2559
        """Check that the safety .bzr directory have not been touched.
2445
2562
        propagating. This method ensures than a test did not leaked.
2446
2563
        """
2447
2564
        root = TestCaseWithMemoryTransport.TEST_ROOT
2448
 
        self.permit_url(_mod_transport.get_transport(root).base)
2449
 
        wt = workingtree.WorkingTree.open(root)
2450
 
        last_rev = wt.last_revision()
2451
 
        if last_rev != 'null:':
 
2565
        t = _mod_transport.get_transport(root)
 
2566
        self.permit_url(t.base)
 
2567
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2568
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
2452
2569
            # The current test have modified the /bzr directory, we need to
2453
2570
            # recreate a new one or all the followng tests will fail.
2454
2571
            # If you need to inspect its content uncomment the following line
2524
2641
            backing_server = self.get_server()
2525
2642
        smart_server = test_server.SmartTCPServer_for_testing()
2526
2643
        self.start_server(smart_server, backing_server)
2527
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
 
2644
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
2528
2645
                                                   ).clone(path)
2529
2646
        return remote_transport
2530
2647
 
2547
2664
    def setUp(self):
2548
2665
        super(TestCaseWithMemoryTransport, self).setUp()
2549
2666
        # Ensure that ConnectedTransport doesn't leak sockets
2550
 
        def get_transport_with_cleanup(*args, **kwargs):
2551
 
            t = orig_get_transport(*args, **kwargs)
 
2667
        def get_transport_from_url_with_cleanup(*args, **kwargs):
 
2668
            t = orig_get_transport_from_url(*args, **kwargs)
2552
2669
            if isinstance(t, _mod_transport.ConnectedTransport):
2553
2670
                self.addCleanup(t.disconnect)
2554
2671
            return t
2555
2672
 
2556
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2557
 
                                               get_transport_with_cleanup)
 
2673
        orig_get_transport_from_url = self.overrideAttr(
 
2674
            _mod_transport, 'get_transport_from_url',
 
2675
            get_transport_from_url_with_cleanup)
2558
2676
        self._make_test_root()
2559
2677
        self.addCleanup(os.chdir, os.getcwdu())
2560
2678
        self.makeAndChdirToTestDir()
2603
2721
 
2604
2722
    OVERRIDE_PYTHON = 'python'
2605
2723
 
 
2724
    def setUp(self):
 
2725
        super(TestCaseInTempDir, self).setUp()
 
2726
        # Remove the protection set in isolated_environ, we have a proper
 
2727
        # access to disk resources now.
 
2728
        self.overrideEnv('BZR_LOG', None)
 
2729
 
2606
2730
    def check_file_contents(self, filename, expect):
2607
2731
        self.log("check contents of file %s" % filename)
2608
2732
        f = file(filename)
3499
3623
#                           with proper exclusion rules.
3500
3624
#   -Ethreads               Will display thread ident at creation/join time to
3501
3625
#                           help track thread leaks
 
3626
 
 
3627
#   -Econfig_stats          Will collect statistics using addDetail
3502
3628
selftest_debug_flags = set()
3503
3629
 
3504
3630
 
3811
3937
        'bzrlib.tests.test_export',
3812
3938
        'bzrlib.tests.test_export_pot',
3813
3939
        'bzrlib.tests.test_extract',
 
3940
        'bzrlib.tests.test_features',
3814
3941
        'bzrlib.tests.test_fetch',
3815
3942
        'bzrlib.tests.test_fixtures',
3816
3943
        'bzrlib.tests.test_fifo_cache',
3817
3944
        'bzrlib.tests.test_filters',
 
3945
        'bzrlib.tests.test_filter_tree',
3818
3946
        'bzrlib.tests.test_ftp_transport',
3819
3947
        'bzrlib.tests.test_foreign',
3820
3948
        'bzrlib.tests.test_generate_docs',
3829
3957
        'bzrlib.tests.test_http',
3830
3958
        'bzrlib.tests.test_http_response',
3831
3959
        'bzrlib.tests.test_https_ca_bundle',
 
3960
        'bzrlib.tests.test_i18n',
3832
3961
        'bzrlib.tests.test_identitymap',
3833
3962
        'bzrlib.tests.test_ignores',
3834
3963
        'bzrlib.tests.test_index',
4210
4339
        the module is available.
4211
4340
    """
4212
4341
 
 
4342
    from bzrlib.tests.features import ModuleAvailableFeature
4213
4343
    py_module = pyutils.get_named_object(py_module_name)
4214
4344
    scenarios = [
4215
4345
        ('python', {'module': py_module}),
4256
4386
                         % (os.path.basename(dirname), printable_e))
4257
4387
 
4258
4388
 
4259
 
class Feature(object):
4260
 
    """An operating system Feature."""
4261
 
 
4262
 
    def __init__(self):
4263
 
        self._available = None
4264
 
 
4265
 
    def available(self):
4266
 
        """Is the feature available?
4267
 
 
4268
 
        :return: True if the feature is available.
4269
 
        """
4270
 
        if self._available is None:
4271
 
            self._available = self._probe()
4272
 
        return self._available
4273
 
 
4274
 
    def _probe(self):
4275
 
        """Implement this method in concrete features.
4276
 
 
4277
 
        :return: True if the feature is available.
4278
 
        """
4279
 
        raise NotImplementedError
4280
 
 
4281
 
    def __str__(self):
4282
 
        if getattr(self, 'feature_name', None):
4283
 
            return self.feature_name()
4284
 
        return self.__class__.__name__
4285
 
 
4286
 
 
4287
 
class _SymlinkFeature(Feature):
4288
 
 
4289
 
    def _probe(self):
4290
 
        return osutils.has_symlinks()
4291
 
 
4292
 
    def feature_name(self):
4293
 
        return 'symlinks'
4294
 
 
4295
 
SymlinkFeature = _SymlinkFeature()
4296
 
 
4297
 
 
4298
 
class _HardlinkFeature(Feature):
4299
 
 
4300
 
    def _probe(self):
4301
 
        return osutils.has_hardlinks()
4302
 
 
4303
 
    def feature_name(self):
4304
 
        return 'hardlinks'
4305
 
 
4306
 
HardlinkFeature = _HardlinkFeature()
4307
 
 
4308
 
 
4309
 
class _OsFifoFeature(Feature):
4310
 
 
4311
 
    def _probe(self):
4312
 
        return getattr(os, 'mkfifo', None)
4313
 
 
4314
 
    def feature_name(self):
4315
 
        return 'filesystem fifos'
4316
 
 
4317
 
OsFifoFeature = _OsFifoFeature()
4318
 
 
4319
 
 
4320
 
class _UnicodeFilenameFeature(Feature):
4321
 
    """Does the filesystem support Unicode filenames?"""
4322
 
 
4323
 
    def _probe(self):
4324
 
        try:
4325
 
            # Check for character combinations unlikely to be covered by any
4326
 
            # single non-unicode encoding. We use the characters
4327
 
            # - greek small letter alpha (U+03B1) and
4328
 
            # - braille pattern dots-123456 (U+283F).
4329
 
            os.stat(u'\u03b1\u283f')
4330
 
        except UnicodeEncodeError:
4331
 
            return False
4332
 
        except (IOError, OSError):
4333
 
            # The filesystem allows the Unicode filename but the file doesn't
4334
 
            # exist.
4335
 
            return True
4336
 
        else:
4337
 
            # The filesystem allows the Unicode filename and the file exists,
4338
 
            # for some reason.
4339
 
            return True
4340
 
 
4341
 
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4342
 
 
4343
 
 
4344
 
class _CompatabilityThunkFeature(Feature):
4345
 
    """This feature is just a thunk to another feature.
4346
 
 
4347
 
    It issues a deprecation warning if it is accessed, to let you know that you
4348
 
    should really use a different feature.
4349
 
    """
4350
 
 
4351
 
    def __init__(self, dep_version, module, name,
4352
 
                 replacement_name, replacement_module=None):
4353
 
        super(_CompatabilityThunkFeature, self).__init__()
4354
 
        self._module = module
4355
 
        if replacement_module is None:
4356
 
            replacement_module = module
4357
 
        self._replacement_module = replacement_module
4358
 
        self._name = name
4359
 
        self._replacement_name = replacement_name
4360
 
        self._dep_version = dep_version
4361
 
        self._feature = None
4362
 
 
4363
 
    def _ensure(self):
4364
 
        if self._feature is None:
4365
 
            depr_msg = self._dep_version % ('%s.%s'
4366
 
                                            % (self._module, self._name))
4367
 
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4368
 
                                               self._replacement_name)
4369
 
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4370
 
            # Import the new feature and use it as a replacement for the
4371
 
            # deprecated one.
4372
 
            self._feature = pyutils.get_named_object(
4373
 
                self._replacement_module, self._replacement_name)
4374
 
 
4375
 
    def _probe(self):
4376
 
        self._ensure()
4377
 
        return self._feature._probe()
4378
 
 
4379
 
 
4380
 
class ModuleAvailableFeature(Feature):
4381
 
    """This is a feature than describes a module we want to be available.
4382
 
 
4383
 
    Declare the name of the module in __init__(), and then after probing, the
4384
 
    module will be available as 'self.module'.
4385
 
 
4386
 
    :ivar module: The module if it is available, else None.
4387
 
    """
4388
 
 
4389
 
    def __init__(self, module_name):
4390
 
        super(ModuleAvailableFeature, self).__init__()
4391
 
        self.module_name = module_name
4392
 
 
4393
 
    def _probe(self):
4394
 
        try:
4395
 
            self._module = __import__(self.module_name, {}, {}, [''])
4396
 
            return True
4397
 
        except ImportError:
4398
 
            return False
4399
 
 
4400
 
    @property
4401
 
    def module(self):
4402
 
        if self.available(): # Make sure the probe has been done
4403
 
            return self._module
4404
 
        return None
4405
 
 
4406
 
    def feature_name(self):
4407
 
        return self.module_name
4408
 
 
4409
 
 
4410
4389
def probe_unicode_in_user_encoding():
4411
4390
    """Try to encode several unicode strings to use in unicode-aware tests.
4412
4391
    Return first successfull match.
4440
4419
    return None
4441
4420
 
4442
4421
 
4443
 
class _HTTPSServerFeature(Feature):
4444
 
    """Some tests want an https Server, check if one is available.
4445
 
 
4446
 
    Right now, the only way this is available is under python2.6 which provides
4447
 
    an ssl module.
4448
 
    """
4449
 
 
4450
 
    def _probe(self):
4451
 
        try:
4452
 
            import ssl
4453
 
            return True
4454
 
        except ImportError:
4455
 
            return False
4456
 
 
4457
 
    def feature_name(self):
4458
 
        return 'HTTPSServer'
4459
 
 
4460
 
 
4461
 
HTTPSServerFeature = _HTTPSServerFeature()
4462
 
 
4463
 
 
4464
 
class _UnicodeFilename(Feature):
4465
 
    """Does the filesystem support Unicode filenames?"""
4466
 
 
4467
 
    def _probe(self):
4468
 
        try:
4469
 
            os.stat(u'\u03b1')
4470
 
        except UnicodeEncodeError:
4471
 
            return False
4472
 
        except (IOError, OSError):
4473
 
            # The filesystem allows the Unicode filename but the file doesn't
4474
 
            # exist.
4475
 
            return True
4476
 
        else:
4477
 
            # The filesystem allows the Unicode filename and the file exists,
4478
 
            # for some reason.
4479
 
            return True
4480
 
 
4481
 
UnicodeFilename = _UnicodeFilename()
4482
 
 
4483
 
 
4484
 
class _ByteStringNamedFilesystem(Feature):
4485
 
    """Is the filesystem based on bytes?"""
4486
 
 
4487
 
    def _probe(self):
4488
 
        if os.name == "posix":
4489
 
            return True
4490
 
        return False
4491
 
 
4492
 
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4493
 
 
4494
 
 
4495
 
class _UTF8Filesystem(Feature):
4496
 
    """Is the filesystem UTF-8?"""
4497
 
 
4498
 
    def _probe(self):
4499
 
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4500
 
            return True
4501
 
        return False
4502
 
 
4503
 
UTF8Filesystem = _UTF8Filesystem()
4504
 
 
4505
 
 
4506
 
class _BreakinFeature(Feature):
4507
 
    """Does this platform support the breakin feature?"""
4508
 
 
4509
 
    def _probe(self):
4510
 
        from bzrlib import breakin
4511
 
        if breakin.determine_signal() is None:
4512
 
            return False
4513
 
        if sys.platform == 'win32':
4514
 
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4515
 
            # We trigger SIGBREAK via a Console api so we need ctypes to
4516
 
            # access the function
4517
 
            try:
4518
 
                import ctypes
4519
 
            except OSError:
4520
 
                return False
4521
 
        return True
4522
 
 
4523
 
    def feature_name(self):
4524
 
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
4525
 
 
4526
 
 
4527
 
BreakinFeature = _BreakinFeature()
4528
 
 
4529
 
 
4530
 
class _CaseInsCasePresFilenameFeature(Feature):
4531
 
    """Is the file-system case insensitive, but case-preserving?"""
4532
 
 
4533
 
    def _probe(self):
4534
 
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
4535
 
        try:
4536
 
            # first check truly case-preserving for created files, then check
4537
 
            # case insensitive when opening existing files.
4538
 
            name = osutils.normpath(name)
4539
 
            base, rel = osutils.split(name)
4540
 
            found_rel = osutils.canonical_relpath(base, name)
4541
 
            return (found_rel == rel
4542
 
                    and os.path.isfile(name.upper())
4543
 
                    and os.path.isfile(name.lower()))
4544
 
        finally:
4545
 
            os.close(fileno)
4546
 
            os.remove(name)
4547
 
 
4548
 
    def feature_name(self):
4549
 
        return "case-insensitive case-preserving filesystem"
4550
 
 
4551
 
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4552
 
 
4553
 
 
4554
 
class _CaseInsensitiveFilesystemFeature(Feature):
4555
 
    """Check if underlying filesystem is case-insensitive but *not* case
4556
 
    preserving.
4557
 
    """
4558
 
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4559
 
    # more likely to be case preserving, so this case is rare.
4560
 
 
4561
 
    def _probe(self):
4562
 
        if CaseInsCasePresFilenameFeature.available():
4563
 
            return False
4564
 
 
4565
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
4566
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4567
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
4568
 
        else:
4569
 
            root = TestCaseWithMemoryTransport.TEST_ROOT
4570
 
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4571
 
            dir=root)
4572
 
        name_a = osutils.pathjoin(tdir, 'a')
4573
 
        name_A = osutils.pathjoin(tdir, 'A')
4574
 
        os.mkdir(name_a)
4575
 
        result = osutils.isdir(name_A)
4576
 
        _rmtree_temp_dir(tdir)
4577
 
        return result
4578
 
 
4579
 
    def feature_name(self):
4580
 
        return 'case-insensitive filesystem'
4581
 
 
4582
 
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4583
 
 
4584
 
 
4585
 
class _CaseSensitiveFilesystemFeature(Feature):
4586
 
 
4587
 
    def _probe(self):
4588
 
        if CaseInsCasePresFilenameFeature.available():
4589
 
            return False
4590
 
        elif CaseInsensitiveFilesystemFeature.available():
4591
 
            return False
4592
 
        else:
4593
 
            return True
4594
 
 
4595
 
    def feature_name(self):
4596
 
        return 'case-sensitive filesystem'
4597
 
 
4598
 
# new coding style is for feature instances to be lowercase
4599
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4600
 
 
4601
 
 
4602
4422
# Only define SubUnitBzrRunner if subunit is available.
4603
4423
try:
4604
4424
    from subunit import TestProtocolClient
4622
4442
except ImportError:
4623
4443
    pass
4624
4444
 
4625
 
class _PosixPermissionsFeature(Feature):
4626
 
 
4627
 
    def _probe(self):
4628
 
        def has_perms():
4629
 
            # create temporary file and check if specified perms are maintained.
4630
 
            import tempfile
4631
 
 
4632
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4633
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4634
 
            fd, name = f
4635
 
            os.close(fd)
4636
 
            os.chmod(name, write_perms)
4637
 
 
4638
 
            read_perms = os.stat(name).st_mode & 0777
4639
 
            os.unlink(name)
4640
 
            return (write_perms == read_perms)
4641
 
 
4642
 
        return (os.name == 'posix') and has_perms()
4643
 
 
4644
 
    def feature_name(self):
4645
 
        return 'POSIX permissions support'
4646
 
 
4647
 
posix_permissions_feature = _PosixPermissionsFeature()
 
4445
 
 
4446
@deprecated_function(deprecated_in((2, 5, 0)))
 
4447
def ModuleAvailableFeature(name):
 
4448
    from bzrlib.tests import features
 
4449
    return features.ModuleAvailableFeature(name)
 
4450