~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Martin von Gagern
  • Date: 2011-06-01 12:53:56 UTC
  • mto: This revision was merged to the branch mainline in revision 6009.
  • Revision ID: martin.vgagern@gmx.net-20110601125356-lwozv2vecea6hxfz
Change from no_decorate to classify as name for the argument.

The command line switch remains as --no-classify, to keep backwards
compatibility.  Users are free to include --no-classify in an alias, and
still use --classify to change back.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
 
19
# TODO: Perhaps there should be an API to find out if bzr running under the
 
20
# test suite -- some plugins might want to avoid making intrusive changes if
 
21
# this is the case.  However, we want behaviour under to test to diverge as
 
22
# little as possible, so this should be used rarely if it's added at all.
 
23
# (Suggestion from j-a-meinel, 2005-11-24)
 
24
 
19
25
# NOTE: Some classes in here use camelCaseNaming() rather than
20
26
# underscore_naming().  That's for consistency with unittest; it's not the
21
27
# general style of bzrlib.  Please continue that consistency when adding e.g.
61
67
    chk_map,
62
68
    commands as _mod_commands,
63
69
    config,
64
 
    i18n,
65
70
    debug,
66
71
    errors,
67
72
    hooks,
89
94
    memory,
90
95
    pathfilter,
91
96
    )
92
 
from bzrlib.symbol_versioning import (
93
 
    deprecated_function,
94
 
    deprecated_in,
95
 
    )
96
97
from bzrlib.tests import (
97
98
    test_server,
98
99
    TestUtil,
141
142
    'BZREMAIL': None, # may still be present in the environment
142
143
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
143
144
    'BZR_PROGRESS_BAR': None,
144
 
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
145
 
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
146
 
    # TestCase should not use disk resources, BZR_LOG is one.
147
 
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
145
    'BZR_LOG': None,
148
146
    'BZR_PLUGIN_PATH': None,
149
147
    'BZR_DISABLE_PLUGINS': None,
150
148
    'BZR_PLUGINS_AT': None,
385
383
        getDetails = getattr(test, "getDetails", None)
386
384
        if getDetails is not None:
387
385
            getDetails().clear()
388
 
        # Clear _type_equality_funcs to try to stop TestCase instances
389
 
        # from wasting memory. 'clear' is not available in all Python
390
 
        # versions (bug 809048)
391
386
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
392
387
        if type_equality_funcs is not None:
393
 
            tef_clear = getattr(type_equality_funcs, "clear", None)
394
 
            if tef_clear is None:
395
 
                tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
396
 
                if tef_instance_dict is not None:
397
 
                    tef_clear = tef_instance_dict.clear
398
 
            if tef_clear is not None:
399
 
                tef_clear()
 
388
            type_equality_funcs.clear()
400
389
        self._traceback_from_test = None
401
390
 
402
391
    def startTests(self):
939
928
 
940
929
    The method is really a factory and users are expected to use it as such.
941
930
    """
942
 
 
 
931
    
943
932
    kwargs['setUp'] = isolated_doctest_setUp
944
933
    kwargs['tearDown'] = isolated_doctest_tearDown
945
934
    return doctest.DocTestSuite(*args, **kwargs)
999
988
        # settled on or a the FIXME associated with _get_expand_default_value
1000
989
        # is addressed -- vila 20110219
1001
990
        self.overrideAttr(config, '_expand_default_value', None)
1002
 
        self._log_files = set()
1003
 
        # Each key in the ``_counters`` dict holds a value for a different
1004
 
        # counter. When the test ends, addDetail() should be used to output the
1005
 
        # counter values. This happens in install_counter_hook().
1006
 
        self._counters = {}
1007
 
        if 'config_stats' in selftest_debug_flags:
1008
 
            self._install_config_stats_hooks()
1009
 
        # Do not use i18n for tests (unless the test reverses this)
1010
 
        i18n.disable_i18n()
1011
991
 
1012
992
    def debug(self):
1013
993
        # debug a frame up.
1014
994
        import pdb
1015
 
        # The sys preserved stdin/stdout should allow blackbox tests debugging
1016
 
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1017
 
                ).set_trace(sys._getframe().f_back)
 
995
        pdb.Pdb().set_trace(sys._getframe().f_back)
1018
996
 
1019
997
    def discardDetail(self, name):
1020
998
        """Extend the addDetail, getDetails api so we can remove a detail.
1032
1010
        if name in details:
1033
1011
            del details[name]
1034
1012
 
1035
 
    def install_counter_hook(self, hooks, name, counter_name=None):
1036
 
        """Install a counting hook.
1037
 
 
1038
 
        Any hook can be counted as long as it doesn't need to return a value.
1039
 
 
1040
 
        :param hooks: Where the hook should be installed.
1041
 
 
1042
 
        :param name: The hook name that will be counted.
1043
 
 
1044
 
        :param counter_name: The counter identifier in ``_counters``, defaults
1045
 
            to ``name``.
1046
 
        """
1047
 
        _counters = self._counters # Avoid closing over self
1048
 
        if counter_name is None:
1049
 
            counter_name = name
1050
 
        if _counters.has_key(counter_name):
1051
 
            raise AssertionError('%s is already used as a counter name'
1052
 
                                  % (counter_name,))
1053
 
        _counters[counter_name] = 0
1054
 
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1055
 
            lambda: ['%d' % (_counters[counter_name],)]))
1056
 
        def increment_counter(*args, **kwargs):
1057
 
            _counters[counter_name] += 1
1058
 
        label = 'count %s calls' % (counter_name,)
1059
 
        hooks.install_named_hook(name, increment_counter, label)
1060
 
        self.addCleanup(hooks.uninstall_named_hook, name, label)
1061
 
 
1062
 
    def _install_config_stats_hooks(self):
1063
 
        """Install config hooks to count hook calls.
1064
 
 
1065
 
        """
1066
 
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1067
 
            self.install_counter_hook(config.ConfigHooks, hook_name,
1068
 
                                       'config.%s' % (hook_name,))
1069
 
 
1070
 
        # The OldConfigHooks are private and need special handling to protect
1071
 
        # against recursive tests (tests that run other tests), so we just do
1072
 
        # manually what registering them into _builtin_known_hooks will provide
1073
 
        # us.
1074
 
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1075
 
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1076
 
            self.install_counter_hook(config.OldConfigHooks, hook_name,
1077
 
                                      'old_config.%s' % (hook_name,))
1078
 
 
1079
1013
    def _clear_debug_flags(self):
1080
1014
        """Prevent externally set debug flags affecting tests.
1081
1015
 
1135
1069
        # break some locks on purpose and should be taken into account by
1136
1070
        # considering that breaking a lock is just a dirty way of releasing it.
1137
1071
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1138
 
            message = (
1139
 
                'Different number of acquired and '
1140
 
                'released or broken locks.\n'
1141
 
                'acquired=%s\n'
1142
 
                'released=%s\n'
1143
 
                'broken=%s\n' %
1144
 
                (acquired_locks, released_locks, broken_locks))
 
1072
            message = ('Different number of acquired and '
 
1073
                       'released or broken locks. (%s, %s + %s)' %
 
1074
                       (acquired_locks, released_locks, broken_locks))
1145
1075
            if not self._lock_check_thorough:
1146
1076
                # Rather than fail, just warn
1147
1077
                print "Broken test %s: %s" % (self, message)
1175
1105
 
1176
1106
    def permit_dir(self, name):
1177
1107
        """Permit a directory to be used by this test. See permit_url."""
1178
 
        name_transport = _mod_transport.get_transport_from_path(name)
 
1108
        name_transport = _mod_transport.get_transport(name)
1179
1109
        self.permit_url(name)
1180
1110
        self.permit_url(name_transport.base)
1181
1111
 
1260
1190
        self.addCleanup(transport_server.stop_server)
1261
1191
        # Obtain a real transport because if the server supplies a password, it
1262
1192
        # will be hidden from the base on the client side.
1263
 
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1193
        t = _mod_transport.get_transport(transport_server.get_url())
1264
1194
        # Some transport servers effectively chroot the backing transport;
1265
1195
        # others like SFTPServer don't - users of the transport can walk up the
1266
1196
        # transport to read the entire backing transport. This wouldn't matter
1708
1638
        pseudo_log_file = StringIO()
1709
1639
        def _get_log_contents_for_weird_testtools_api():
1710
1640
            return [pseudo_log_file.getvalue().decode(
1711
 
                "utf-8", "replace").encode("utf-8")]
 
1641
                "utf-8", "replace").encode("utf-8")]          
1712
1642
        self.addDetail("log", content.Content(content.ContentType("text",
1713
1643
            "plain", {"charset": "utf8"}),
1714
1644
            _get_log_contents_for_weird_testtools_api))
1719
1649
    def _finishLogFile(self):
1720
1650
        """Finished with the log file.
1721
1651
 
1722
 
        Close the file and delete it.
 
1652
        Close the file and delete it, unless setKeepLogfile was called.
1723
1653
        """
1724
1654
        if trace._trace_file:
1725
1655
            # flush the log file, to get all content
1742
1672
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1743
1673
        """Overrides an object attribute restoring it after the test.
1744
1674
 
1745
 
        :note: This should be used with discretion; you should think about
1746
 
        whether it's better to make the code testable without monkey-patching.
1747
 
 
1748
1675
        :param obj: The object that will be mutated.
1749
1676
 
1750
1677
        :param attr_name: The attribute name we want to preserve/override in
1775
1702
        self.addCleanup(osutils.set_or_unset_env, name, value)
1776
1703
        return value
1777
1704
 
1778
 
    def recordCalls(self, obj, attr_name):
1779
 
        """Monkeypatch in a wrapper that will record calls.
1780
 
 
1781
 
        The monkeypatch is automatically removed when the test concludes.
1782
 
 
1783
 
        :param obj: The namespace holding the reference to be replaced;
1784
 
            typically a module, class, or object.
1785
 
        :param attr_name: A string for the name of the attribute to 
1786
 
            patch.
1787
 
        :returns: A list that will be extended with one item every time the
1788
 
            function is called, with a tuple of (args, kwargs).
1789
 
        """
1790
 
        calls = []
1791
 
 
1792
 
        def decorator(*args, **kwargs):
1793
 
            calls.append((args, kwargs))
1794
 
            return orig(*args, **kwargs)
1795
 
        orig = self.overrideAttr(obj, attr_name, decorator)
1796
 
        return calls
1797
 
 
1798
1705
    def _cleanEnvironment(self):
1799
1706
        for name, value in isolated_environ.iteritems():
1800
1707
            self.overrideEnv(name, value)
1807
1714
        self._preserved_lazy_hooks.clear()
1808
1715
 
1809
1716
    def knownFailure(self, reason):
1810
 
        """Declare that this test fails for a known reason
1811
 
 
1812
 
        Tests that are known to fail should generally be using expectedFailure
1813
 
        with an appropriate reverse assertion if a change could cause the test
1814
 
        to start passing. Conversely if the test has no immediate prospect of
1815
 
        succeeding then using skip is more suitable.
1816
 
 
1817
 
        When this method is called while an exception is being handled, that
1818
 
        traceback will be used, otherwise a new exception will be thrown to
1819
 
        provide one but won't be reported.
1820
 
        """
1821
 
        self._add_reason(reason)
1822
 
        try:
1823
 
            exc_info = sys.exc_info()
1824
 
            if exc_info != (None, None, None):
1825
 
                self._report_traceback(exc_info)
1826
 
            else:
1827
 
                try:
1828
 
                    raise self.failureException(reason)
1829
 
                except self.failureException:
1830
 
                    exc_info = sys.exc_info()
1831
 
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1832
 
            raise testtools.testcase._ExpectedFailure(exc_info)
1833
 
        finally:
1834
 
            del exc_info
 
1717
        """This test has failed for some known reason."""
 
1718
        raise KnownFailure(reason)
1835
1719
 
1836
1720
    def _suppress_log(self):
1837
1721
        """Remove the log info from details."""
2182
2066
            # so we will avoid using it on all platforms, just to
2183
2067
            # make sure the code path is used, and we don't break on win32
2184
2068
            cleanup_environment()
2185
 
            # Include the subprocess's log file in the test details, in case
2186
 
            # the test fails due to an error in the subprocess.
2187
 
            self._add_subprocess_log(trace._get_bzr_log_filename())
2188
2069
            command = [sys.executable]
2189
2070
            # frozen executables don't need the path to bzr
2190
2071
            if getattr(sys, "frozen", None) is None:
2202
2083
 
2203
2084
        return process
2204
2085
 
2205
 
    def _add_subprocess_log(self, log_file_path):
2206
 
        if len(self._log_files) == 0:
2207
 
            # Register an addCleanup func.  We do this on the first call to
2208
 
            # _add_subprocess_log rather than in TestCase.setUp so that this
2209
 
            # addCleanup is registered after any cleanups for tempdirs that
2210
 
            # subclasses might create, which will probably remove the log file
2211
 
            # we want to read.
2212
 
            self.addCleanup(self._subprocess_log_cleanup)
2213
 
        # self._log_files is a set, so if a log file is reused we won't grab it
2214
 
        # twice.
2215
 
        self._log_files.add(log_file_path)
2216
 
 
2217
 
    def _subprocess_log_cleanup(self):
2218
 
        for count, log_file_path in enumerate(self._log_files):
2219
 
            # We use buffer_now=True to avoid holding the file open beyond
2220
 
            # the life of this function, which might interfere with e.g.
2221
 
            # cleaning tempdirs on Windows.
2222
 
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2223
 
            #detail_content = content.content_from_file(
2224
 
            #    log_file_path, buffer_now=True)
2225
 
            with open(log_file_path, 'rb') as log_file:
2226
 
                log_file_bytes = log_file.read()
2227
 
            detail_content = content.Content(content.ContentType("text",
2228
 
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2229
 
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2230
 
                detail_content)
2231
 
 
2232
2086
    def _popen(self, *args, **kwargs):
2233
2087
        """Place a call to Popen.
2234
2088
 
2386
2240
class TestCaseWithMemoryTransport(TestCase):
2387
2241
    """Common test class for tests that do not need disk resources.
2388
2242
 
2389
 
    Tests that need disk resources should derive from TestCaseInTempDir
2390
 
    orTestCaseWithTransport.
 
2243
    Tests that need disk resources should derive from TestCaseWithTransport.
2391
2244
 
2392
2245
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2393
2246
 
2394
 
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2247
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2395
2248
    a directory which does not exist. This serves to help ensure test isolation
2396
 
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2397
 
    must exist. However, TestCaseWithMemoryTransport does not offer local file
2398
 
    defaults for the transport in tests, nor does it obey the command line
 
2249
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2250
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2251
    file defaults for the transport in tests, nor does it obey the command line
2399
2252
    override, so tests that accidentally write to the common directory should
2400
2253
    be rare.
2401
2254
 
2402
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
2403
 
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2255
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2256
    a .bzr directory that stops us ascending higher into the filesystem.
2404
2257
    """
2405
2258
 
2406
2259
    TEST_ROOT = None
2424
2277
 
2425
2278
        :param relpath: a path relative to the base url.
2426
2279
        """
2427
 
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2280
        t = _mod_transport.get_transport(self.get_url(relpath))
2428
2281
        self.assertFalse(t.is_readonly())
2429
2282
        return t
2430
2283
 
2436
2289
 
2437
2290
        :param relpath: a path relative to the base url.
2438
2291
        """
2439
 
        t = _mod_transport.get_transport_from_url(
2440
 
            self.get_readonly_url(relpath))
 
2292
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2441
2293
        self.assertTrue(t.is_readonly())
2442
2294
        return t
2443
2295
 
2564
2416
        real branch.
2565
2417
        """
2566
2418
        root = TestCaseWithMemoryTransport.TEST_ROOT
2567
 
        try:
2568
 
            # Make sure we get a readable and accessible home for .bzr.log
2569
 
            # and/or config files, and not fallback to weird defaults (see
2570
 
            # http://pad.lv/825027).
2571
 
            self.assertIs(None, os.environ.get('BZR_HOME', None))
2572
 
            os.environ['BZR_HOME'] = root
2573
 
            wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2574
 
            del os.environ['BZR_HOME']
2575
 
        except Exception, e:
2576
 
            self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
2577
 
        # Hack for speed: remember the raw bytes of the dirstate file so that
2578
 
        # we don't need to re-open the wt to check it hasn't changed.
2579
 
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2580
 
            wt.control_transport.get_bytes('dirstate'))
 
2419
        bzrdir.BzrDir.create_standalone_workingtree(root)
2581
2420
 
2582
2421
    def _check_safety_net(self):
2583
2422
        """Check that the safety .bzr directory have not been touched.
2586
2425
        propagating. This method ensures than a test did not leaked.
2587
2426
        """
2588
2427
        root = TestCaseWithMemoryTransport.TEST_ROOT
2589
 
        t = _mod_transport.get_transport_from_path(root)
2590
 
        self.permit_url(t.base)
2591
 
        if (t.get_bytes('.bzr/checkout/dirstate') != 
2592
 
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2428
        self.permit_url(_mod_transport.get_transport(root).base)
 
2429
        wt = workingtree.WorkingTree.open(root)
 
2430
        last_rev = wt.last_revision()
 
2431
        if last_rev != 'null:':
2593
2432
            # The current test have modified the /bzr directory, we need to
2594
2433
            # recreate a new one or all the followng tests will fail.
2595
2434
            # If you need to inspect its content uncomment the following line
2630
2469
    def make_branch(self, relpath, format=None):
2631
2470
        """Create a branch on the transport at relpath."""
2632
2471
        repo = self.make_repository(relpath, format=format)
2633
 
        return repo.bzrdir.create_branch(append_revisions_only=False)
2634
 
 
2635
 
    def resolve_format(self, format):
2636
 
        """Resolve an object to a ControlDir format object.
2637
 
 
2638
 
        The initial format object can either already be
2639
 
        a ControlDirFormat, None (for the default format),
2640
 
        or a string with the name of the control dir format.
2641
 
 
2642
 
        :param format: Object to resolve
2643
 
        :return A ControlDirFormat instance
2644
 
        """
2645
 
        if format is None:
2646
 
            format = 'default'
2647
 
        if isinstance(format, basestring):
2648
 
            format = bzrdir.format_registry.make_bzrdir(format)
2649
 
        return format
2650
 
 
2651
 
    def resolve_format(self, format):
2652
 
        """Resolve an object to a ControlDir format object.
2653
 
 
2654
 
        The initial format object can either already be
2655
 
        a ControlDirFormat, None (for the default format),
2656
 
        or a string with the name of the control dir format.
2657
 
 
2658
 
        :param format: Object to resolve
2659
 
        :return A ControlDirFormat instance
2660
 
        """
2661
 
        if format is None:
2662
 
            format = 'default'
2663
 
        if isinstance(format, basestring):
2664
 
            format = bzrdir.format_registry.make_bzrdir(format)
2665
 
        return format
 
2472
        return repo.bzrdir.create_branch()
2666
2473
 
2667
2474
    def make_bzrdir(self, relpath, format=None):
2668
2475
        try:
2672
2479
            t = _mod_transport.get_transport(maybe_a_url)
2673
2480
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2674
2481
                t.ensure_base()
2675
 
            format = self.resolve_format(format)
 
2482
            if format is None:
 
2483
                format = 'default'
 
2484
            if isinstance(format, basestring):
 
2485
                format = bzrdir.format_registry.make_bzrdir(format)
2676
2486
            return format.initialize_on_transport(t)
2677
2487
        except errors.UninitializableFormat:
2678
2488
            raise TestSkipped("Format %s is not initializable." % format)
2679
2489
 
2680
 
    def make_repository(self, relpath, shared=None, format=None):
 
2490
    def make_repository(self, relpath, shared=False, format=None):
2681
2491
        """Create a repository on our default transport at relpath.
2682
2492
 
2683
2493
        Note that relpath must be a relative path, not a full url.
2694
2504
            backing_server = self.get_server()
2695
2505
        smart_server = test_server.SmartTCPServer_for_testing()
2696
2506
        self.start_server(smart_server, backing_server)
2697
 
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2507
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2698
2508
                                                   ).clone(path)
2699
2509
        return remote_transport
2700
2510
 
2717
2527
    def setUp(self):
2718
2528
        super(TestCaseWithMemoryTransport, self).setUp()
2719
2529
        # Ensure that ConnectedTransport doesn't leak sockets
2720
 
        def get_transport_from_url_with_cleanup(*args, **kwargs):
2721
 
            t = orig_get_transport_from_url(*args, **kwargs)
 
2530
        def get_transport_with_cleanup(*args, **kwargs):
 
2531
            t = orig_get_transport(*args, **kwargs)
2722
2532
            if isinstance(t, _mod_transport.ConnectedTransport):
2723
2533
                self.addCleanup(t.disconnect)
2724
2534
            return t
2725
2535
 
2726
 
        orig_get_transport_from_url = self.overrideAttr(
2727
 
            _mod_transport, 'get_transport_from_url',
2728
 
            get_transport_from_url_with_cleanup)
 
2536
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
 
2537
                                               get_transport_with_cleanup)
2729
2538
        self._make_test_root()
2730
2539
        self.addCleanup(os.chdir, os.getcwdu())
2731
2540
        self.makeAndChdirToTestDir()
2774
2583
 
2775
2584
    OVERRIDE_PYTHON = 'python'
2776
2585
 
2777
 
    def setUp(self):
2778
 
        super(TestCaseInTempDir, self).setUp()
2779
 
        # Remove the protection set in isolated_environ, we have a proper
2780
 
        # access to disk resources now.
2781
 
        self.overrideEnv('BZR_LOG', None)
2782
 
 
2783
2586
    def check_file_contents(self, filename, expect):
2784
2587
        self.log("check contents of file %s" % filename)
2785
2588
        f = file(filename)
2866
2669
                "a list or a tuple. Got %r instead" % (shape,))
2867
2670
        # It's OK to just create them using forward slashes on windows.
2868
2671
        if transport is None or transport.is_readonly():
2869
 
            transport = _mod_transport.get_transport_from_path(".")
 
2672
            transport = _mod_transport.get_transport(".")
2870
2673
        for name in shape:
2871
2674
            self.assertIsInstance(name, basestring)
2872
2675
            if name[-1] == '/':
3676
3479
#                           with proper exclusion rules.
3677
3480
#   -Ethreads               Will display thread ident at creation/join time to
3678
3481
#                           help track thread leaks
3679
 
 
3680
 
#   -Econfig_stats          Will collect statistics using addDetail
3681
3482
selftest_debug_flags = set()
3682
3483
 
3683
3484
 
3987
3788
        'bzrlib.tests.test_email_message',
3988
3789
        'bzrlib.tests.test_eol_filters',
3989
3790
        'bzrlib.tests.test_errors',
3990
 
        'bzrlib.tests.test_estimate_compressed_size',
3991
3791
        'bzrlib.tests.test_export',
3992
3792
        'bzrlib.tests.test_export_pot',
3993
3793
        'bzrlib.tests.test_extract',
3994
 
        'bzrlib.tests.test_features',
3995
3794
        'bzrlib.tests.test_fetch',
3996
3795
        'bzrlib.tests.test_fixtures',
3997
3796
        'bzrlib.tests.test_fifo_cache',
3998
3797
        'bzrlib.tests.test_filters',
3999
 
        'bzrlib.tests.test_filter_tree',
4000
3798
        'bzrlib.tests.test_ftp_transport',
4001
3799
        'bzrlib.tests.test_foreign',
4002
3800
        'bzrlib.tests.test_generate_docs',
4077
3875
        'bzrlib.tests.test_smart',
4078
3876
        'bzrlib.tests.test_smart_add',
4079
3877
        'bzrlib.tests.test_smart_request',
4080
 
        'bzrlib.tests.test_smart_signals',
4081
3878
        'bzrlib.tests.test_smart_transport',
4082
3879
        'bzrlib.tests.test_smtp_connection',
4083
3880
        'bzrlib.tests.test_source',
4394
4191
        the module is available.
4395
4192
    """
4396
4193
 
4397
 
    from bzrlib.tests.features import ModuleAvailableFeature
4398
4194
    py_module = pyutils.get_named_object(py_module_name)
4399
4195
    scenarios = [
4400
4196
        ('python', {'module': py_module}),
4441
4237
                         % (os.path.basename(dirname), printable_e))
4442
4238
 
4443
4239
 
 
4240
class Feature(object):
 
4241
    """An operating system Feature."""
 
4242
 
 
4243
    def __init__(self):
 
4244
        self._available = None
 
4245
 
 
4246
    def available(self):
 
4247
        """Is the feature available?
 
4248
 
 
4249
        :return: True if the feature is available.
 
4250
        """
 
4251
        if self._available is None:
 
4252
            self._available = self._probe()
 
4253
        return self._available
 
4254
 
 
4255
    def _probe(self):
 
4256
        """Implement this method in concrete features.
 
4257
 
 
4258
        :return: True if the feature is available.
 
4259
        """
 
4260
        raise NotImplementedError
 
4261
 
 
4262
    def __str__(self):
 
4263
        if getattr(self, 'feature_name', None):
 
4264
            return self.feature_name()
 
4265
        return self.__class__.__name__
 
4266
 
 
4267
 
 
4268
class _SymlinkFeature(Feature):
 
4269
 
 
4270
    def _probe(self):
 
4271
        return osutils.has_symlinks()
 
4272
 
 
4273
    def feature_name(self):
 
4274
        return 'symlinks'
 
4275
 
 
4276
SymlinkFeature = _SymlinkFeature()
 
4277
 
 
4278
 
 
4279
class _HardlinkFeature(Feature):
 
4280
 
 
4281
    def _probe(self):
 
4282
        return osutils.has_hardlinks()
 
4283
 
 
4284
    def feature_name(self):
 
4285
        return 'hardlinks'
 
4286
 
 
4287
HardlinkFeature = _HardlinkFeature()
 
4288
 
 
4289
 
 
4290
class _OsFifoFeature(Feature):
 
4291
 
 
4292
    def _probe(self):
 
4293
        return getattr(os, 'mkfifo', None)
 
4294
 
 
4295
    def feature_name(self):
 
4296
        return 'filesystem fifos'
 
4297
 
 
4298
OsFifoFeature = _OsFifoFeature()
 
4299
 
 
4300
 
 
4301
class _UnicodeFilenameFeature(Feature):
 
4302
    """Does the filesystem support Unicode filenames?"""
 
4303
 
 
4304
    def _probe(self):
 
4305
        try:
 
4306
            # Check for character combinations unlikely to be covered by any
 
4307
            # single non-unicode encoding. We use the characters
 
4308
            # - greek small letter alpha (U+03B1) and
 
4309
            # - braille pattern dots-123456 (U+283F).
 
4310
            os.stat(u'\u03b1\u283f')
 
4311
        except UnicodeEncodeError:
 
4312
            return False
 
4313
        except (IOError, OSError):
 
4314
            # The filesystem allows the Unicode filename but the file doesn't
 
4315
            # exist.
 
4316
            return True
 
4317
        else:
 
4318
            # The filesystem allows the Unicode filename and the file exists,
 
4319
            # for some reason.
 
4320
            return True
 
4321
 
 
4322
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4323
 
 
4324
 
 
4325
class _CompatabilityThunkFeature(Feature):
 
4326
    """This feature is just a thunk to another feature.
 
4327
 
 
4328
    It issues a deprecation warning if it is accessed, to let you know that you
 
4329
    should really use a different feature.
 
4330
    """
 
4331
 
 
4332
    def __init__(self, dep_version, module, name,
 
4333
                 replacement_name, replacement_module=None):
 
4334
        super(_CompatabilityThunkFeature, self).__init__()
 
4335
        self._module = module
 
4336
        if replacement_module is None:
 
4337
            replacement_module = module
 
4338
        self._replacement_module = replacement_module
 
4339
        self._name = name
 
4340
        self._replacement_name = replacement_name
 
4341
        self._dep_version = dep_version
 
4342
        self._feature = None
 
4343
 
 
4344
    def _ensure(self):
 
4345
        if self._feature is None:
 
4346
            depr_msg = self._dep_version % ('%s.%s'
 
4347
                                            % (self._module, self._name))
 
4348
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4349
                                               self._replacement_name)
 
4350
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4351
            # Import the new feature and use it as a replacement for the
 
4352
            # deprecated one.
 
4353
            self._feature = pyutils.get_named_object(
 
4354
                self._replacement_module, self._replacement_name)
 
4355
 
 
4356
    def _probe(self):
 
4357
        self._ensure()
 
4358
        return self._feature._probe()
 
4359
 
 
4360
 
 
4361
class ModuleAvailableFeature(Feature):
 
4362
    """This is a feature than describes a module we want to be available.
 
4363
 
 
4364
    Declare the name of the module in __init__(), and then after probing, the
 
4365
    module will be available as 'self.module'.
 
4366
 
 
4367
    :ivar module: The module if it is available, else None.
 
4368
    """
 
4369
 
 
4370
    def __init__(self, module_name):
 
4371
        super(ModuleAvailableFeature, self).__init__()
 
4372
        self.module_name = module_name
 
4373
 
 
4374
    def _probe(self):
 
4375
        try:
 
4376
            self._module = __import__(self.module_name, {}, {}, [''])
 
4377
            return True
 
4378
        except ImportError:
 
4379
            return False
 
4380
 
 
4381
    @property
 
4382
    def module(self):
 
4383
        if self.available(): # Make sure the probe has been done
 
4384
            return self._module
 
4385
        return None
 
4386
 
 
4387
    def feature_name(self):
 
4388
        return self.module_name
 
4389
 
 
4390
 
4444
4391
def probe_unicode_in_user_encoding():
4445
4392
    """Try to encode several unicode strings to use in unicode-aware tests.
4446
4393
    Return first successfull match.
4474
4421
    return None
4475
4422
 
4476
4423
 
 
4424
class _HTTPSServerFeature(Feature):
 
4425
    """Some tests want an https Server, check if one is available.
 
4426
 
 
4427
    Right now, the only way this is available is under python2.6 which provides
 
4428
    an ssl module.
 
4429
    """
 
4430
 
 
4431
    def _probe(self):
 
4432
        try:
 
4433
            import ssl
 
4434
            return True
 
4435
        except ImportError:
 
4436
            return False
 
4437
 
 
4438
    def feature_name(self):
 
4439
        return 'HTTPSServer'
 
4440
 
 
4441
 
 
4442
HTTPSServerFeature = _HTTPSServerFeature()
 
4443
 
 
4444
 
 
4445
class _UnicodeFilename(Feature):
 
4446
    """Does the filesystem support Unicode filenames?"""
 
4447
 
 
4448
    def _probe(self):
 
4449
        try:
 
4450
            os.stat(u'\u03b1')
 
4451
        except UnicodeEncodeError:
 
4452
            return False
 
4453
        except (IOError, OSError):
 
4454
            # The filesystem allows the Unicode filename but the file doesn't
 
4455
            # exist.
 
4456
            return True
 
4457
        else:
 
4458
            # The filesystem allows the Unicode filename and the file exists,
 
4459
            # for some reason.
 
4460
            return True
 
4461
 
 
4462
UnicodeFilename = _UnicodeFilename()
 
4463
 
 
4464
 
 
4465
class _ByteStringNamedFilesystem(Feature):
 
4466
    """Is the filesystem based on bytes?"""
 
4467
 
 
4468
    def _probe(self):
 
4469
        if os.name == "posix":
 
4470
            return True
 
4471
        return False
 
4472
 
 
4473
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
 
4474
 
 
4475
 
 
4476
class _UTF8Filesystem(Feature):
 
4477
    """Is the filesystem UTF-8?"""
 
4478
 
 
4479
    def _probe(self):
 
4480
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4481
            return True
 
4482
        return False
 
4483
 
 
4484
UTF8Filesystem = _UTF8Filesystem()
 
4485
 
 
4486
 
 
4487
class _BreakinFeature(Feature):
 
4488
    """Does this platform support the breakin feature?"""
 
4489
 
 
4490
    def _probe(self):
 
4491
        from bzrlib import breakin
 
4492
        if breakin.determine_signal() is None:
 
4493
            return False
 
4494
        if sys.platform == 'win32':
 
4495
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4496
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4497
            # access the function
 
4498
            try:
 
4499
                import ctypes
 
4500
            except OSError:
 
4501
                return False
 
4502
        return True
 
4503
 
 
4504
    def feature_name(self):
 
4505
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4506
 
 
4507
 
 
4508
BreakinFeature = _BreakinFeature()
 
4509
 
 
4510
 
 
4511
class _CaseInsCasePresFilenameFeature(Feature):
 
4512
    """Is the file-system case insensitive, but case-preserving?"""
 
4513
 
 
4514
    def _probe(self):
 
4515
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4516
        try:
 
4517
            # first check truly case-preserving for created files, then check
 
4518
            # case insensitive when opening existing files.
 
4519
            name = osutils.normpath(name)
 
4520
            base, rel = osutils.split(name)
 
4521
            found_rel = osutils.canonical_relpath(base, name)
 
4522
            return (found_rel == rel
 
4523
                    and os.path.isfile(name.upper())
 
4524
                    and os.path.isfile(name.lower()))
 
4525
        finally:
 
4526
            os.close(fileno)
 
4527
            os.remove(name)
 
4528
 
 
4529
    def feature_name(self):
 
4530
        return "case-insensitive case-preserving filesystem"
 
4531
 
 
4532
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4533
 
 
4534
 
 
4535
class _CaseInsensitiveFilesystemFeature(Feature):
 
4536
    """Check if underlying filesystem is case-insensitive but *not* case
 
4537
    preserving.
 
4538
    """
 
4539
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4540
    # more likely to be case preserving, so this case is rare.
 
4541
 
 
4542
    def _probe(self):
 
4543
        if CaseInsCasePresFilenameFeature.available():
 
4544
            return False
 
4545
 
 
4546
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4547
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4548
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4549
        else:
 
4550
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4551
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4552
            dir=root)
 
4553
        name_a = osutils.pathjoin(tdir, 'a')
 
4554
        name_A = osutils.pathjoin(tdir, 'A')
 
4555
        os.mkdir(name_a)
 
4556
        result = osutils.isdir(name_A)
 
4557
        _rmtree_temp_dir(tdir)
 
4558
        return result
 
4559
 
 
4560
    def feature_name(self):
 
4561
        return 'case-insensitive filesystem'
 
4562
 
 
4563
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4564
 
 
4565
 
 
4566
class _CaseSensitiveFilesystemFeature(Feature):
 
4567
 
 
4568
    def _probe(self):
 
4569
        if CaseInsCasePresFilenameFeature.available():
 
4570
            return False
 
4571
        elif CaseInsensitiveFilesystemFeature.available():
 
4572
            return False
 
4573
        else:
 
4574
            return True
 
4575
 
 
4576
    def feature_name(self):
 
4577
        return 'case-sensitive filesystem'
 
4578
 
 
4579
# new coding style is for feature instances to be lowercase
 
4580
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
 
4581
 
 
4582
 
4477
4583
# Only define SubUnitBzrRunner if subunit is available.
4478
4584
try:
4479
4585
    from subunit import TestProtocolClient
4497
4603
except ImportError:
4498
4604
    pass
4499
4605
 
4500
 
 
4501
 
@deprecated_function(deprecated_in((2, 5, 0)))
4502
 
def ModuleAvailableFeature(name):
4503
 
    from bzrlib.tests import features
4504
 
    return features.ModuleAvailableFeature(name)
4505
 
    
 
4606
class _PosixPermissionsFeature(Feature):
 
4607
 
 
4608
    def _probe(self):
 
4609
        def has_perms():
 
4610
            # create temporary file and check if specified perms are maintained.
 
4611
            import tempfile
 
4612
 
 
4613
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
 
4614
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
 
4615
            fd, name = f
 
4616
            os.close(fd)
 
4617
            os.chmod(name, write_perms)
 
4618
 
 
4619
            read_perms = os.stat(name).st_mode & 0777
 
4620
            os.unlink(name)
 
4621
            return (write_perms == read_perms)
 
4622
 
 
4623
        return (os.name == 'posix') and has_perms()
 
4624
 
 
4625
    def feature_name(self):
 
4626
        return 'POSIX permissions support'
 
4627
 
 
4628
posix_permissions_feature = _PosixPermissionsFeature()