~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Patch Queue Manager
  • Date: 2015-10-05 13:45:00 UTC
  • mfrom: (6603.3.1 bts794146)
  • Revision ID: pqm@pqm.ubuntu.com-20151005134500-v244rho557tv0ukd
(vila) Resolve Bug #1480015: Test failure: hexify removed from paramiko
 (Andrew Starr-Bochicchio) (Andrew Starr-Bochicchio)

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
19
 
# TODO: Perhaps there should be an API to find out if bzr running under the
20
 
# test suite -- some plugins might want to avoid making intrusive changes if
21
 
# this is the case.  However, we want behaviour under to test to diverge as
22
 
# little as possible, so this should be used rarely if it's added at all.
23
 
# (Suggestion from j-a-meinel, 2005-11-24)
 
19
from __future__ import absolute_import
24
20
 
25
21
# NOTE: Some classes in here use camelCaseNaming() rather than
26
22
# underscore_naming().  That's for consistency with unittest; it's not the
42
38
import random
43
39
import re
44
40
import shlex
 
41
import site
45
42
import stat
46
43
import subprocess
47
44
import sys
63
60
import bzrlib
64
61
from bzrlib import (
65
62
    branchbuilder,
66
 
    bzrdir,
 
63
    controldir,
67
64
    chk_map,
68
65
    commands as _mod_commands,
69
66
    config,
 
67
    i18n,
70
68
    debug,
71
69
    errors,
72
70
    hooks,
94
92
    memory,
95
93
    pathfilter,
96
94
    )
 
95
from bzrlib.symbol_versioning import (
 
96
    deprecated_function,
 
97
    deprecated_in,
 
98
    )
97
99
from bzrlib.tests import (
 
100
    fixtures,
98
101
    test_server,
99
102
    TestUtil,
100
103
    treeshape,
101
104
    )
102
105
from bzrlib.ui import NullProgressView
103
106
from bzrlib.ui.text import TextUIFactory
 
107
from bzrlib.tests.features import _CompatabilityThunkFeature
104
108
 
105
109
# Mark this python module as being part of the implementation
106
110
# of unittest: this gives us better tracebacks where the last
133
137
isolated_environ = {
134
138
    'BZR_HOME': None,
135
139
    'HOME': None,
 
140
    'XDG_CONFIG_HOME': None,
136
141
    # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
137
142
    # tests do check our impls match APPDATA
138
143
    'BZR_EDITOR': None, # test_msgeditor manipulates this variable
213
218
        osutils.set_or_unset_env(var, value)
214
219
 
215
220
 
 
221
def _clear__type_equality_funcs(test):
 
222
    """Cleanup bound methods stored on TestCase instances
 
223
 
 
224
    Clear the dict breaking a few (mostly) harmless cycles in the affected
 
225
    unittests released with Python 2.6 and initial Python 2.7 versions.
 
226
 
 
227
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
 
228
    shipped in Oneiric, an object with no clear method was used, hence the
 
229
    extra complications, see bug 809048 for details.
 
230
    """
 
231
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
232
    if type_equality_funcs is not None:
 
233
        tef_clear = getattr(type_equality_funcs, "clear", None)
 
234
        if tef_clear is None:
 
235
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
236
            if tef_instance_dict is not None:
 
237
                tef_clear = tef_instance_dict.clear
 
238
        if tef_clear is not None:
 
239
            tef_clear()
 
240
 
 
241
 
216
242
class ExtendedTestResult(testtools.TextTestResult):
217
243
    """Accepts, reports and accumulates the results of running tests.
218
244
 
222
248
    different types of display.
223
249
 
224
250
    When a test finishes, in whatever way, it calls one of the addSuccess,
225
 
    addFailure or addError classes.  These in turn may redirect to a more
 
251
    addFailure or addError methods.  These in turn may redirect to a more
226
252
    specific case for the special test results supported by our extended
227
253
    tests.
228
254
 
386
412
        getDetails = getattr(test, "getDetails", None)
387
413
        if getDetails is not None:
388
414
            getDetails().clear()
389
 
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
390
 
        if type_equality_funcs is not None:
391
 
            type_equality_funcs.clear()
 
415
        _clear__type_equality_funcs(test)
392
416
        self._traceback_from_test = None
393
417
 
394
418
    def startTests(self):
495
519
        self.not_applicable_count += 1
496
520
        self.report_not_applicable(test, reason)
497
521
 
 
522
    def _count_stored_tests(self):
 
523
        """Count of tests instances kept alive due to not succeeding"""
 
524
        return self.error_count + self.failure_count + self.known_failure_count
 
525
 
498
526
    def _post_mortem(self, tb=None):
499
527
        """Start a PDB post mortem session."""
500
528
        if os.environ.get('BZR_TEST_PDB', None):
972
1000
 
973
1001
    def setUp(self):
974
1002
        super(TestCase, self).setUp()
 
1003
 
 
1004
        # At this point we're still accessing the config files in $BZR_HOME (as
 
1005
        # set by the user running selftest).
 
1006
        timeout = config.GlobalStack().get('selftest.timeout')
 
1007
        if timeout:
 
1008
            timeout_fixture = fixtures.TimeoutFixture(timeout)
 
1009
            timeout_fixture.setUp()
 
1010
            self.addCleanup(timeout_fixture.cleanUp)
 
1011
 
975
1012
        for feature in getattr(self, '_test_needs_features', []):
976
1013
            self.requireFeature(feature)
977
1014
        self._cleanEnvironment()
 
1015
 
 
1016
        if bzrlib.global_state is not None:
 
1017
            self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
 
1018
                              config.CommandLineStore())
 
1019
 
978
1020
        self._silenceUI()
979
1021
        self._startLogFile()
980
1022
        self._benchcalls = []
987
1029
        # between tests.  We should get rid of this altogether: bug 656694. --
988
1030
        # mbp 20101008
989
1031
        self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
990
 
        # Isolate config option expansion until its default value for bzrlib is
991
 
        # settled on or a the FIXME associated with _get_expand_default_value
992
 
        # is addressed -- vila 20110219
993
 
        self.overrideAttr(config, '_expand_default_value', None)
994
1032
        self._log_files = set()
995
1033
        # Each key in the ``_counters`` dict holds a value for a different
996
1034
        # counter. When the test ends, addDetail() should be used to output the
998
1036
        self._counters = {}
999
1037
        if 'config_stats' in selftest_debug_flags:
1000
1038
            self._install_config_stats_hooks()
 
1039
        # Do not use i18n for tests (unless the test reverses this)
 
1040
        i18n.disable_i18n()
1001
1041
 
1002
1042
    def debug(self):
1003
1043
        # debug a frame up.
1004
1044
        import pdb
1005
 
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
1045
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
1046
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
1047
                ).set_trace(sys._getframe().f_back)
1006
1048
 
1007
1049
    def discardDetail(self, name):
1008
1050
        """Extend the addDetail, getDetails api so we can remove a detail.
1163
1205
 
1164
1206
    def permit_dir(self, name):
1165
1207
        """Permit a directory to be used by this test. See permit_url."""
1166
 
        name_transport = _mod_transport.get_transport(name)
 
1208
        name_transport = _mod_transport.get_transport_from_path(name)
1167
1209
        self.permit_url(name)
1168
1210
        self.permit_url(name_transport.base)
1169
1211
 
1248
1290
        self.addCleanup(transport_server.stop_server)
1249
1291
        # Obtain a real transport because if the server supplies a password, it
1250
1292
        # will be hidden from the base on the client side.
1251
 
        t = _mod_transport.get_transport(transport_server.get_url())
 
1293
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
1252
1294
        # Some transport servers effectively chroot the backing transport;
1253
1295
        # others like SFTPServer don't - users of the transport can walk up the
1254
1296
        # transport to read the entire backing transport. This wouldn't matter
1285
1327
        # hook into bzr dir opening. This leaves a small window of error for
1286
1328
        # transport tests, but they are well known, and we can improve on this
1287
1329
        # step.
1288
 
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
 
1330
        controldir.ControlDir.hooks.install_named_hook("pre_open",
1289
1331
            self._preopen_isolate_transport, "Check bzr directories are safe.")
1290
1332
 
1291
1333
    def _ndiff_strings(self, a, b):
1689
1731
        return result
1690
1732
 
1691
1733
    def _startLogFile(self):
1692
 
        """Send bzr and test log messages to a temporary file.
1693
 
 
1694
 
        The file is removed as the test is torn down.
1695
 
        """
 
1734
        """Setup a in-memory target for bzr and testcase log messages"""
1696
1735
        pseudo_log_file = StringIO()
1697
1736
        def _get_log_contents_for_weird_testtools_api():
1698
1737
            return [pseudo_log_file.getvalue().decode(
1705
1744
        self.addCleanup(self._finishLogFile)
1706
1745
 
1707
1746
    def _finishLogFile(self):
1708
 
        """Finished with the log file.
1709
 
 
1710
 
        Close the file and delete it.
1711
 
        """
 
1747
        """Flush and dereference the in-memory log for this testcase"""
1712
1748
        if trace._trace_file:
1713
1749
            # flush the log file, to get all content
1714
1750
            trace._trace_file.flush()
1715
1751
        trace.pop_log_file(self._log_memento)
 
1752
        # The logging module now tracks references for cleanup so discard ours
 
1753
        del self._log_memento
1716
1754
 
1717
1755
    def thisFailsStrictLockCheck(self):
1718
1756
        """It is known that this test would fail with -Dstrict_locks.
1730
1768
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1731
1769
        """Overrides an object attribute restoring it after the test.
1732
1770
 
 
1771
        :note: This should be used with discretion; you should think about
 
1772
        whether it's better to make the code testable without monkey-patching.
 
1773
 
1733
1774
        :param obj: The object that will be mutated.
1734
1775
 
1735
1776
        :param attr_name: The attribute name we want to preserve/override in
1739
1780
 
1740
1781
        :returns: The actual attr value.
1741
1782
        """
1742
 
        value = getattr(obj, attr_name)
1743
1783
        # The actual value is captured by the call below
1744
 
        self.addCleanup(setattr, obj, attr_name, value)
 
1784
        value = getattr(obj, attr_name, _unitialized_attr)
 
1785
        if value is _unitialized_attr:
 
1786
            # When the test completes, the attribute should not exist, but if
 
1787
            # we aren't setting a value, we don't need to do anything.
 
1788
            if new is not _unitialized_attr:
 
1789
                self.addCleanup(delattr, obj, attr_name)
 
1790
        else:
 
1791
            self.addCleanup(setattr, obj, attr_name, value)
1745
1792
        if new is not _unitialized_attr:
1746
1793
            setattr(obj, attr_name, new)
1747
1794
        return value
1760
1807
        self.addCleanup(osutils.set_or_unset_env, name, value)
1761
1808
        return value
1762
1809
 
 
1810
    def recordCalls(self, obj, attr_name):
 
1811
        """Monkeypatch in a wrapper that will record calls.
 
1812
 
 
1813
        The monkeypatch is automatically removed when the test concludes.
 
1814
 
 
1815
        :param obj: The namespace holding the reference to be replaced;
 
1816
            typically a module, class, or object.
 
1817
        :param attr_name: A string for the name of the attribute to 
 
1818
            patch.
 
1819
        :returns: A list that will be extended with one item every time the
 
1820
            function is called, with a tuple of (args, kwargs).
 
1821
        """
 
1822
        calls = []
 
1823
 
 
1824
        def decorator(*args, **kwargs):
 
1825
            calls.append((args, kwargs))
 
1826
            return orig(*args, **kwargs)
 
1827
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1828
        return calls
 
1829
 
1763
1830
    def _cleanEnvironment(self):
1764
1831
        for name, value in isolated_environ.iteritems():
1765
1832
            self.overrideEnv(name, value)
1772
1839
        self._preserved_lazy_hooks.clear()
1773
1840
 
1774
1841
    def knownFailure(self, reason):
1775
 
        """This test has failed for some known reason."""
1776
 
        raise KnownFailure(reason)
 
1842
        """Declare that this test fails for a known reason
 
1843
 
 
1844
        Tests that are known to fail should generally be using expectedFailure
 
1845
        with an appropriate reverse assertion if a change could cause the test
 
1846
        to start passing. Conversely if the test has no immediate prospect of
 
1847
        succeeding then using skip is more suitable.
 
1848
 
 
1849
        When this method is called while an exception is being handled, that
 
1850
        traceback will be used, otherwise a new exception will be thrown to
 
1851
        provide one but won't be reported.
 
1852
        """
 
1853
        self._add_reason(reason)
 
1854
        try:
 
1855
            exc_info = sys.exc_info()
 
1856
            if exc_info != (None, None, None):
 
1857
                self._report_traceback(exc_info)
 
1858
            else:
 
1859
                try:
 
1860
                    raise self.failureException(reason)
 
1861
                except self.failureException:
 
1862
                    exc_info = sys.exc_info()
 
1863
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1864
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1865
        finally:
 
1866
            del exc_info
1777
1867
 
1778
1868
    def _suppress_log(self):
1779
1869
        """Remove the log info from details."""
1907
1997
 
1908
1998
        self.log('run bzr: %r', args)
1909
1999
        # FIXME: don't call into logging here
1910
 
        handler = logging.StreamHandler(stderr)
1911
 
        handler.setLevel(logging.INFO)
 
2000
        handler = trace.EncodedStreamHandler(stderr, errors="replace",
 
2001
            level=logging.INFO)
1912
2002
        logger = logging.getLogger('')
1913
2003
        logger.addHandler(handler)
1914
2004
        old_ui_factory = ui.ui_factory
2102
2192
 
2103
2193
        if env_changes is None:
2104
2194
            env_changes = {}
 
2195
        # Because $HOME is set to a tempdir for the context of a test, modules
 
2196
        # installed in the user dir will not be found unless $PYTHONUSERBASE
 
2197
        # gets set to the computed directory of this parent process.
 
2198
        if site.USER_BASE is not None:
 
2199
            env_changes["PYTHONUSERBASE"] = site.USER_BASE
2105
2200
        old_env = {}
2106
2201
 
2107
2202
        def cleanup_environment():
2298
2393
        from bzrlib.smart import request
2299
2394
        request_handlers = request.request_handlers
2300
2395
        orig_method = request_handlers.get(verb)
 
2396
        orig_info = request_handlers.get_info(verb)
2301
2397
        request_handlers.remove(verb)
2302
 
        self.addCleanup(request_handlers.register, verb, orig_method)
 
2398
        self.addCleanup(request_handlers.register, verb, orig_method,
 
2399
            info=orig_info)
2303
2400
 
2304
2401
 
2305
2402
class CapturedCall(object):
2358
2455
        self.transport_readonly_server = None
2359
2456
        self.__vfs_server = None
2360
2457
 
 
2458
    def setUp(self):
 
2459
        super(TestCaseWithMemoryTransport, self).setUp()
 
2460
 
 
2461
        def _add_disconnect_cleanup(transport):
 
2462
            """Schedule disconnection of given transport at test cleanup
 
2463
 
 
2464
            This needs to happen for all connected transports or leaks occur.
 
2465
 
 
2466
            Note reconnections may mean we call disconnect multiple times per
 
2467
            transport which is suboptimal but seems harmless.
 
2468
            """
 
2469
            self.addCleanup(transport.disconnect)
 
2470
 
 
2471
        _mod_transport.Transport.hooks.install_named_hook('post_connect',
 
2472
            _add_disconnect_cleanup, None)
 
2473
 
 
2474
        self._make_test_root()
 
2475
        self.addCleanup(os.chdir, os.getcwdu())
 
2476
        self.makeAndChdirToTestDir()
 
2477
        self.overrideEnvironmentForTesting()
 
2478
        self.__readonly_server = None
 
2479
        self.__server = None
 
2480
        self.reduceLockdirTimeout()
 
2481
        # Each test may use its own config files even if the local config files
 
2482
        # don't actually exist. They'll rightly fail if they try to create them
 
2483
        # though.
 
2484
        self.overrideAttr(config, '_shared_stores', {})
 
2485
 
2361
2486
    def get_transport(self, relpath=None):
2362
2487
        """Return a writeable transport.
2363
2488
 
2366
2491
 
2367
2492
        :param relpath: a path relative to the base url.
2368
2493
        """
2369
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2494
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
2370
2495
        self.assertFalse(t.is_readonly())
2371
2496
        return t
2372
2497
 
2378
2503
 
2379
2504
        :param relpath: a path relative to the base url.
2380
2505
        """
2381
 
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
2506
        t = _mod_transport.get_transport_from_url(
 
2507
            self.get_readonly_url(relpath))
2382
2508
        self.assertTrue(t.is_readonly())
2383
2509
        return t
2384
2510
 
2505
2631
        real branch.
2506
2632
        """
2507
2633
        root = TestCaseWithMemoryTransport.TEST_ROOT
2508
 
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2634
        try:
 
2635
            # Make sure we get a readable and accessible home for .bzr.log
 
2636
            # and/or config files, and not fallback to weird defaults (see
 
2637
            # http://pad.lv/825027).
 
2638
            self.assertIs(None, os.environ.get('BZR_HOME', None))
 
2639
            os.environ['BZR_HOME'] = root
 
2640
            wt = controldir.ControlDir.create_standalone_workingtree(root)
 
2641
            del os.environ['BZR_HOME']
 
2642
        except Exception, e:
 
2643
            self.fail("Fail to initialize the safety net: %r\n" % (e,))
 
2644
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2645
        # we don't need to re-open the wt to check it hasn't changed.
 
2646
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2647
            wt.control_transport.get_bytes('dirstate'))
2509
2648
 
2510
2649
    def _check_safety_net(self):
2511
2650
        """Check that the safety .bzr directory have not been touched.
2514
2653
        propagating. This method ensures than a test did not leaked.
2515
2654
        """
2516
2655
        root = TestCaseWithMemoryTransport.TEST_ROOT
2517
 
        self.permit_url(_mod_transport.get_transport(root).base)
2518
 
        wt = workingtree.WorkingTree.open(root)
2519
 
        last_rev = wt.last_revision()
2520
 
        if last_rev != 'null:':
 
2656
        t = _mod_transport.get_transport_from_path(root)
 
2657
        self.permit_url(t.base)
 
2658
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2659
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
2521
2660
            # The current test have modified the /bzr directory, we need to
2522
2661
            # recreate a new one or all the followng tests will fail.
2523
2662
            # If you need to inspect its content uncomment the following line
2555
2694
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2556
2695
        self.permit_dir(self.test_dir)
2557
2696
 
2558
 
    def make_branch(self, relpath, format=None):
 
2697
    def make_branch(self, relpath, format=None, name=None):
2559
2698
        """Create a branch on the transport at relpath."""
2560
2699
        repo = self.make_repository(relpath, format=format)
2561
 
        return repo.bzrdir.create_branch()
 
2700
        return repo.bzrdir.create_branch(append_revisions_only=False,
 
2701
                                         name=name)
 
2702
 
 
2703
    def get_default_format(self):
 
2704
        return 'default'
 
2705
 
 
2706
    def resolve_format(self, format):
 
2707
        """Resolve an object to a ControlDir format object.
 
2708
 
 
2709
        The initial format object can either already be
 
2710
        a ControlDirFormat, None (for the default format),
 
2711
        or a string with the name of the control dir format.
 
2712
 
 
2713
        :param format: Object to resolve
 
2714
        :return A ControlDirFormat instance
 
2715
        """
 
2716
        if format is None:
 
2717
            format = self.get_default_format()
 
2718
        if isinstance(format, basestring):
 
2719
            format = controldir.format_registry.make_bzrdir(format)
 
2720
        return format
2562
2721
 
2563
2722
    def make_bzrdir(self, relpath, format=None):
2564
2723
        try:
2568
2727
            t = _mod_transport.get_transport(maybe_a_url)
2569
2728
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2570
2729
                t.ensure_base()
2571
 
            if format is None:
2572
 
                format = 'default'
2573
 
            if isinstance(format, basestring):
2574
 
                format = bzrdir.format_registry.make_bzrdir(format)
 
2730
            format = self.resolve_format(format)
2575
2731
            return format.initialize_on_transport(t)
2576
2732
        except errors.UninitializableFormat:
2577
2733
            raise TestSkipped("Format %s is not initializable." % format)
2578
2734
 
2579
 
    def make_repository(self, relpath, shared=False, format=None):
 
2735
    def make_repository(self, relpath, shared=None, format=None):
2580
2736
        """Create a repository on our default transport at relpath.
2581
2737
 
2582
2738
        Note that relpath must be a relative path, not a full url.
2593
2749
            backing_server = self.get_server()
2594
2750
        smart_server = test_server.SmartTCPServer_for_testing()
2595
2751
        self.start_server(smart_server, backing_server)
2596
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
 
2752
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
2597
2753
                                                   ).clone(path)
2598
2754
        return remote_transport
2599
2755
 
2613
2769
        self.overrideEnv('HOME', test_home_dir)
2614
2770
        self.overrideEnv('BZR_HOME', test_home_dir)
2615
2771
 
2616
 
    def setUp(self):
2617
 
        super(TestCaseWithMemoryTransport, self).setUp()
2618
 
        # Ensure that ConnectedTransport doesn't leak sockets
2619
 
        def get_transport_with_cleanup(*args, **kwargs):
2620
 
            t = orig_get_transport(*args, **kwargs)
2621
 
            if isinstance(t, _mod_transport.ConnectedTransport):
2622
 
                self.addCleanup(t.disconnect)
2623
 
            return t
2624
 
 
2625
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2626
 
                                               get_transport_with_cleanup)
2627
 
        self._make_test_root()
2628
 
        self.addCleanup(os.chdir, os.getcwdu())
2629
 
        self.makeAndChdirToTestDir()
2630
 
        self.overrideEnvironmentForTesting()
2631
 
        self.__readonly_server = None
2632
 
        self.__server = None
2633
 
        self.reduceLockdirTimeout()
2634
 
 
2635
2772
    def setup_smart_server_with_call_log(self):
2636
2773
        """Sets up a smart server as the transport server with a call log."""
2637
2774
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2775
        self.hpss_connections = []
2638
2776
        self.hpss_calls = []
2639
2777
        import traceback
2640
2778
        # Skip the current stack down to the caller of
2643
2781
        def capture_hpss_call(params):
2644
2782
            self.hpss_calls.append(
2645
2783
                CapturedCall(params, prefix_length))
 
2784
        def capture_connect(transport):
 
2785
            self.hpss_connections.append(transport)
2646
2786
        client._SmartClient.hooks.install_named_hook(
2647
2787
            'call', capture_hpss_call, None)
 
2788
        _mod_transport.Transport.hooks.install_named_hook(
 
2789
            'post_connect', capture_connect, None)
2648
2790
 
2649
2791
    def reset_smart_call_log(self):
2650
2792
        self.hpss_calls = []
 
2793
        self.hpss_connections = []
2651
2794
 
2652
2795
 
2653
2796
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2723
2866
        # stacking policy to honour; create a bzr dir with an unshared
2724
2867
        # repository (but not a branch - our code would be trying to escape
2725
2868
        # then!) to stop them, and permit it to be read.
2726
 
        # control = bzrdir.BzrDir.create(self.test_base_dir)
 
2869
        # control = controldir.ControlDir.create(self.test_base_dir)
2727
2870
        # control.create_repository()
2728
2871
        self.test_home_dir = self.test_base_dir + '/home'
2729
2872
        os.mkdir(self.test_home_dir)
2764
2907
                "a list or a tuple. Got %r instead" % (shape,))
2765
2908
        # It's OK to just create them using forward slashes on windows.
2766
2909
        if transport is None or transport.is_readonly():
2767
 
            transport = _mod_transport.get_transport(".")
 
2910
            transport = _mod_transport.get_transport_from_path(".")
2768
2911
        for name in shape:
2769
2912
            self.assertIsInstance(name, basestring)
2770
2913
            if name[-1] == '/':
2818
2961
    readwrite one must both define get_url() as resolving to os.getcwd().
2819
2962
    """
2820
2963
 
 
2964
    def setUp(self):
 
2965
        super(TestCaseWithTransport, self).setUp()
 
2966
        self.__vfs_server = None
 
2967
 
2821
2968
    def get_vfs_only_server(self):
2822
2969
        """See TestCaseWithMemoryTransport.
2823
2970
 
2855
3002
        # this obviously requires a format that supports branch references
2856
3003
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2857
3004
        # RBC 20060208
 
3005
        format = self.resolve_format(format=format)
 
3006
        if not format.supports_workingtrees:
 
3007
            b = self.make_branch(relpath+'.branch', format=format)
 
3008
            return b.create_checkout(relpath, lightweight=True)
2858
3009
        b = self.make_branch(relpath, format=format)
2859
3010
        try:
2860
3011
            return b.bzrdir.create_workingtree()
2865
3016
            if self.vfs_transport_factory is test_server.LocalURLServer:
2866
3017
                # the branch is colocated on disk, we cannot create a checkout.
2867
3018
                # hopefully callers will expect this.
2868
 
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
3019
                local_controldir = controldir.ControlDir.open(
 
3020
                    self.get_vfs_only_url(relpath))
2869
3021
                wt = local_controldir.create_workingtree()
2870
3022
                if wt.branch._format != b._format:
2871
3023
                    wt._branch = b
2901
3053
        self.assertFalse(differences.has_changed(),
2902
3054
            "Trees %r and %r are different: %r" % (left, right, differences))
2903
3055
 
2904
 
    def setUp(self):
2905
 
        super(TestCaseWithTransport, self).setUp()
2906
 
        self.__vfs_server = None
2907
 
 
2908
3056
    def disable_missing_extensions_warning(self):
2909
3057
        """Some tests expect a precise stderr content.
2910
3058
 
2911
3059
        There is no point in forcing them to duplicate the extension related
2912
3060
        warning.
2913
3061
        """
2914
 
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
3062
        config.GlobalStack().set('ignore_missing_extensions', True)
2915
3063
 
2916
3064
 
2917
3065
class ChrootedTestCase(TestCaseWithTransport):
3159
3307
                            result_decorators=result_decorators,
3160
3308
                            )
3161
3309
    runner.stop_on_failure=stop_on_failure
 
3310
    if isinstance(suite, unittest.TestSuite):
 
3311
        # Empty out _tests list of passed suite and populate new TestSuite
 
3312
        suite._tests[:], suite = [], TestSuite(suite)
3162
3313
    # built in decorator factories:
3163
3314
    decorators = [
3164
3315
        random_order(random_seed, runner),
3262
3413
 
3263
3414
class TestDecorator(TestUtil.TestSuite):
3264
3415
    """A decorator for TestCase/TestSuite objects.
3265
 
    
3266
 
    Usually, subclasses should override __iter__(used when flattening test
3267
 
    suites), which we do to filter, reorder, parallelise and so on, run() and
3268
 
    debug().
 
3416
 
 
3417
    Contains rather than flattening suite passed on construction
3269
3418
    """
3270
3419
 
3271
 
    def __init__(self, suite):
3272
 
        TestUtil.TestSuite.__init__(self)
3273
 
        self.addTest(suite)
3274
 
 
3275
 
    def countTestCases(self):
3276
 
        cases = 0
3277
 
        for test in self:
3278
 
            cases += test.countTestCases()
3279
 
        return cases
3280
 
 
3281
 
    def debug(self):
3282
 
        for test in self:
3283
 
            test.debug()
3284
 
 
3285
 
    def run(self, result):
3286
 
        # Use iteration on self, not self._tests, to allow subclasses to hook
3287
 
        # into __iter__.
3288
 
        for test in self:
3289
 
            if result.shouldStop:
3290
 
                break
3291
 
            test.run(result)
3292
 
        return result
 
3420
    def __init__(self, suite=None):
 
3421
        super(TestDecorator, self).__init__()
 
3422
        if suite is not None:
 
3423
            self.addTest(suite)
 
3424
 
 
3425
    # Don't need subclass run method with suite emptying
 
3426
    run = unittest.TestSuite.run
3293
3427
 
3294
3428
 
3295
3429
class CountingDecorator(TestDecorator):
3306
3440
    """A decorator which excludes test matching an exclude pattern."""
3307
3441
 
3308
3442
    def __init__(self, suite, exclude_pattern):
3309
 
        TestDecorator.__init__(self, suite)
3310
 
        self.exclude_pattern = exclude_pattern
3311
 
        self.excluded = False
3312
 
 
3313
 
    def __iter__(self):
3314
 
        if self.excluded:
3315
 
            return iter(self._tests)
3316
 
        self.excluded = True
3317
 
        suite = exclude_tests_by_re(self, self.exclude_pattern)
3318
 
        del self._tests[:]
3319
 
        self.addTests(suite)
3320
 
        return iter(self._tests)
 
3443
        super(ExcludeDecorator, self).__init__(
 
3444
            exclude_tests_by_re(suite, exclude_pattern))
3321
3445
 
3322
3446
 
3323
3447
class FilterTestsDecorator(TestDecorator):
3324
3448
    """A decorator which filters tests to those matching a pattern."""
3325
3449
 
3326
3450
    def __init__(self, suite, pattern):
3327
 
        TestDecorator.__init__(self, suite)
3328
 
        self.pattern = pattern
3329
 
        self.filtered = False
3330
 
 
3331
 
    def __iter__(self):
3332
 
        if self.filtered:
3333
 
            return iter(self._tests)
3334
 
        self.filtered = True
3335
 
        suite = filter_suite_by_re(self, self.pattern)
3336
 
        del self._tests[:]
3337
 
        self.addTests(suite)
3338
 
        return iter(self._tests)
 
3451
        super(FilterTestsDecorator, self).__init__(
 
3452
            filter_suite_by_re(suite, pattern))
3339
3453
 
3340
3454
 
3341
3455
class RandomDecorator(TestDecorator):
3342
3456
    """A decorator which randomises the order of its tests."""
3343
3457
 
3344
3458
    def __init__(self, suite, random_seed, stream):
3345
 
        TestDecorator.__init__(self, suite)
3346
 
        self.random_seed = random_seed
3347
 
        self.randomised = False
3348
 
        self.stream = stream
3349
 
 
3350
 
    def __iter__(self):
3351
 
        if self.randomised:
3352
 
            return iter(self._tests)
3353
 
        self.randomised = True
3354
 
        self.stream.write("Randomizing test order using seed %s\n\n" %
3355
 
            (self.actual_seed()))
 
3459
        random_seed = self.actual_seed(random_seed)
 
3460
        stream.write("Randomizing test order using seed %s\n\n" %
 
3461
            (random_seed,))
3356
3462
        # Initialise the random number generator.
3357
 
        random.seed(self.actual_seed())
3358
 
        suite = randomize_suite(self)
3359
 
        del self._tests[:]
3360
 
        self.addTests(suite)
3361
 
        return iter(self._tests)
 
3463
        random.seed(random_seed)
 
3464
        super(RandomDecorator, self).__init__(randomize_suite(suite))
3362
3465
 
3363
 
    def actual_seed(self):
3364
 
        if self.random_seed == "now":
 
3466
    @staticmethod
 
3467
    def actual_seed(seed):
 
3468
        if seed == "now":
3365
3469
            # We convert the seed to a long to make it reuseable across
3366
3470
            # invocations (because the user can reenter it).
3367
 
            self.random_seed = long(time.time())
 
3471
            return long(time.time())
3368
3472
        else:
3369
3473
            # Convert the seed to a long if we can
3370
3474
            try:
3371
 
                self.random_seed = long(self.random_seed)
3372
 
            except:
 
3475
                return long(seed)
 
3476
            except (TypeError, ValueError):
3373
3477
                pass
3374
 
        return self.random_seed
 
3478
        return seed
3375
3479
 
3376
3480
 
3377
3481
class TestFirstDecorator(TestDecorator):
3378
3482
    """A decorator which moves named tests to the front."""
3379
3483
 
3380
3484
    def __init__(self, suite, pattern):
3381
 
        TestDecorator.__init__(self, suite)
3382
 
        self.pattern = pattern
3383
 
        self.filtered = False
3384
 
 
3385
 
    def __iter__(self):
3386
 
        if self.filtered:
3387
 
            return iter(self._tests)
3388
 
        self.filtered = True
3389
 
        suites = split_suite_by_re(self, self.pattern)
3390
 
        del self._tests[:]
3391
 
        self.addTests(suites)
3392
 
        return iter(self._tests)
 
3485
        super(TestFirstDecorator, self).__init__()
 
3486
        self.addTests(split_suite_by_re(suite, pattern))
3393
3487
 
3394
3488
 
3395
3489
def partition_tests(suite, count):
3427
3521
    """
3428
3522
    concurrency = osutils.local_concurrency()
3429
3523
    result = []
3430
 
    from subunit import TestProtocolClient, ProtocolTestCase
 
3524
    from subunit import ProtocolTestCase
3431
3525
    from subunit.test_results import AutoTimingTestResultDecorator
3432
3526
    class TestInOtherProcess(ProtocolTestCase):
3433
3527
        # Should be in subunit, I think. RBC.
3439
3533
            try:
3440
3534
                ProtocolTestCase.run(self, result)
3441
3535
            finally:
3442
 
                os.waitpid(self.pid, 0)
 
3536
                pid, status = os.waitpid(self.pid, 0)
 
3537
            # GZ 2011-10-18: If status is nonzero, should report to the result
 
3538
            #                that something went wrong.
3443
3539
 
3444
3540
    test_blocks = partition_tests(suite, concurrency)
 
3541
    # Clear the tests from the original suite so it doesn't keep them alive
 
3542
    suite._tests[:] = []
3445
3543
    for process_tests in test_blocks:
3446
 
        process_suite = TestUtil.TestSuite()
3447
 
        process_suite.addTests(process_tests)
 
3544
        process_suite = TestUtil.TestSuite(process_tests)
 
3545
        # Also clear each split list so new suite has only reference
 
3546
        process_tests[:] = []
3448
3547
        c2pread, c2pwrite = os.pipe()
3449
3548
        pid = os.fork()
3450
3549
        if pid == 0:
3451
 
            workaround_zealous_crypto_random()
3452
3550
            try:
 
3551
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3552
                workaround_zealous_crypto_random()
3453
3553
                os.close(c2pread)
3454
3554
                # Leave stderr and stdout open so we can see test noise
3455
3555
                # Close stdin so that the child goes away if it decides to
3456
3556
                # read from stdin (otherwise its a roulette to see what
3457
3557
                # child actually gets keystrokes for pdb etc).
3458
3558
                sys.stdin.close()
3459
 
                sys.stdin = None
3460
 
                stream = os.fdopen(c2pwrite, 'wb', 1)
3461
3559
                subunit_result = AutoTimingTestResultDecorator(
3462
 
                    TestProtocolClient(stream))
 
3560
                    SubUnitBzrProtocolClient(stream))
3463
3561
                process_suite.run(subunit_result)
3464
 
            finally:
3465
 
                os._exit(0)
 
3562
            except:
 
3563
                # Try and report traceback on stream, but exit with error even
 
3564
                # if stream couldn't be created or something else goes wrong.
 
3565
                # The traceback is formatted to a string and written in one go
 
3566
                # to avoid interleaving lines from multiple failing children.
 
3567
                try:
 
3568
                    stream.write(traceback.format_exc())
 
3569
                finally:
 
3570
                    os._exit(1)
 
3571
            os._exit(0)
3466
3572
        else:
3467
3573
            os.close(c2pwrite)
3468
3574
            stream = os.fdopen(c2pread, 'rb', 1)
3574
3680
#                           with proper exclusion rules.
3575
3681
#   -Ethreads               Will display thread ident at creation/join time to
3576
3682
#                           help track thread leaks
3577
 
 
 
3683
#   -Euncollected_cases     Display the identity of any test cases that weren't
 
3684
#                           deallocated after being completed.
3578
3685
#   -Econfig_stats          Will collect statistics using addDetail
3579
3686
selftest_debug_flags = set()
3580
3687
 
3685
3792
 
3686
3793
    :return: (absents, duplicates) absents is a list containing the test found
3687
3794
        in id_list but not in test_suite, duplicates is a list containing the
3688
 
        test found multiple times in test_suite.
 
3795
        tests found multiple times in test_suite.
3689
3796
 
3690
3797
    When using a prefined test id list, it may occurs that some tests do not
3691
3798
    exist anymore or that some tests use the same id. This function warns the
3815
3922
        'bzrlib.doc',
3816
3923
        'bzrlib.tests.blackbox',
3817
3924
        'bzrlib.tests.commands',
3818
 
        'bzrlib.tests.doc_generate',
3819
3925
        'bzrlib.tests.per_branch',
3820
3926
        'bzrlib.tests.per_bzrdir',
3821
3927
        'bzrlib.tests.per_controldir',
3885
3991
        'bzrlib.tests.test_email_message',
3886
3992
        'bzrlib.tests.test_eol_filters',
3887
3993
        'bzrlib.tests.test_errors',
 
3994
        'bzrlib.tests.test_estimate_compressed_size',
3888
3995
        'bzrlib.tests.test_export',
3889
3996
        'bzrlib.tests.test_export_pot',
3890
3997
        'bzrlib.tests.test_extract',
 
3998
        'bzrlib.tests.test_features',
3891
3999
        'bzrlib.tests.test_fetch',
3892
4000
        'bzrlib.tests.test_fixtures',
3893
4001
        'bzrlib.tests.test_fifo_cache',
3894
4002
        'bzrlib.tests.test_filters',
 
4003
        'bzrlib.tests.test_filter_tree',
3895
4004
        'bzrlib.tests.test_ftp_transport',
3896
4005
        'bzrlib.tests.test_foreign',
3897
4006
        'bzrlib.tests.test_generate_docs',
3906
4015
        'bzrlib.tests.test_http',
3907
4016
        'bzrlib.tests.test_http_response',
3908
4017
        'bzrlib.tests.test_https_ca_bundle',
 
4018
        'bzrlib.tests.test_https_urllib',
3909
4019
        'bzrlib.tests.test_i18n',
3910
4020
        'bzrlib.tests.test_identitymap',
3911
4021
        'bzrlib.tests.test_ignores',
3960
4070
        'bzrlib.tests.test_revisiontree',
3961
4071
        'bzrlib.tests.test_rio',
3962
4072
        'bzrlib.tests.test_rules',
 
4073
        'bzrlib.tests.test_url_policy_open',
3963
4074
        'bzrlib.tests.test_sampler',
3964
4075
        'bzrlib.tests.test_scenarios',
3965
4076
        'bzrlib.tests.test_script',
3972
4083
        'bzrlib.tests.test_smart',
3973
4084
        'bzrlib.tests.test_smart_add',
3974
4085
        'bzrlib.tests.test_smart_request',
 
4086
        'bzrlib.tests.test_smart_signals',
3975
4087
        'bzrlib.tests.test_smart_transport',
3976
4088
        'bzrlib.tests.test_smtp_connection',
3977
4089
        'bzrlib.tests.test_source',
4008
4120
        'bzrlib.tests.test_version',
4009
4121
        'bzrlib.tests.test_version_info',
4010
4122
        'bzrlib.tests.test_versionedfile',
 
4123
        'bzrlib.tests.test_vf_search',
4011
4124
        'bzrlib.tests.test_weave',
4012
4125
        'bzrlib.tests.test_whitebox',
4013
4126
        'bzrlib.tests.test_win32utils',
4228
4341
    """Copy test and apply scenario to it.
4229
4342
 
4230
4343
    :param test: A test to adapt.
4231
 
    :param scenario: A tuple describing the scenarion.
 
4344
    :param scenario: A tuple describing the scenario.
4232
4345
        The first element of the tuple is the new test id.
4233
4346
        The second element is a dict containing attributes to set on the
4234
4347
        test.
4288
4401
        the module is available.
4289
4402
    """
4290
4403
 
 
4404
    from bzrlib.tests.features import ModuleAvailableFeature
4291
4405
    py_module = pyutils.get_named_object(py_module_name)
4292
4406
    scenarios = [
4293
4407
        ('python', {'module': py_module}),
4334
4448
                         % (os.path.basename(dirname), printable_e))
4335
4449
 
4336
4450
 
4337
 
class Feature(object):
4338
 
    """An operating system Feature."""
4339
 
 
4340
 
    def __init__(self):
4341
 
        self._available = None
4342
 
 
4343
 
    def available(self):
4344
 
        """Is the feature available?
4345
 
 
4346
 
        :return: True if the feature is available.
4347
 
        """
4348
 
        if self._available is None:
4349
 
            self._available = self._probe()
4350
 
        return self._available
4351
 
 
4352
 
    def _probe(self):
4353
 
        """Implement this method in concrete features.
4354
 
 
4355
 
        :return: True if the feature is available.
4356
 
        """
4357
 
        raise NotImplementedError
4358
 
 
4359
 
    def __str__(self):
4360
 
        if getattr(self, 'feature_name', None):
4361
 
            return self.feature_name()
4362
 
        return self.__class__.__name__
4363
 
 
4364
 
 
4365
 
class _SymlinkFeature(Feature):
4366
 
 
4367
 
    def _probe(self):
4368
 
        return osutils.has_symlinks()
4369
 
 
4370
 
    def feature_name(self):
4371
 
        return 'symlinks'
4372
 
 
4373
 
SymlinkFeature = _SymlinkFeature()
4374
 
 
4375
 
 
4376
 
class _HardlinkFeature(Feature):
4377
 
 
4378
 
    def _probe(self):
4379
 
        return osutils.has_hardlinks()
4380
 
 
4381
 
    def feature_name(self):
4382
 
        return 'hardlinks'
4383
 
 
4384
 
HardlinkFeature = _HardlinkFeature()
4385
 
 
4386
 
 
4387
 
class _OsFifoFeature(Feature):
4388
 
 
4389
 
    def _probe(self):
4390
 
        return getattr(os, 'mkfifo', None)
4391
 
 
4392
 
    def feature_name(self):
4393
 
        return 'filesystem fifos'
4394
 
 
4395
 
OsFifoFeature = _OsFifoFeature()
4396
 
 
4397
 
 
4398
 
class _UnicodeFilenameFeature(Feature):
4399
 
    """Does the filesystem support Unicode filenames?"""
4400
 
 
4401
 
    def _probe(self):
4402
 
        try:
4403
 
            # Check for character combinations unlikely to be covered by any
4404
 
            # single non-unicode encoding. We use the characters
4405
 
            # - greek small letter alpha (U+03B1) and
4406
 
            # - braille pattern dots-123456 (U+283F).
4407
 
            os.stat(u'\u03b1\u283f')
4408
 
        except UnicodeEncodeError:
4409
 
            return False
4410
 
        except (IOError, OSError):
4411
 
            # The filesystem allows the Unicode filename but the file doesn't
4412
 
            # exist.
4413
 
            return True
4414
 
        else:
4415
 
            # The filesystem allows the Unicode filename and the file exists,
4416
 
            # for some reason.
4417
 
            return True
4418
 
 
4419
 
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4420
 
 
4421
 
 
4422
 
class _CompatabilityThunkFeature(Feature):
4423
 
    """This feature is just a thunk to another feature.
4424
 
 
4425
 
    It issues a deprecation warning if it is accessed, to let you know that you
4426
 
    should really use a different feature.
4427
 
    """
4428
 
 
4429
 
    def __init__(self, dep_version, module, name,
4430
 
                 replacement_name, replacement_module=None):
4431
 
        super(_CompatabilityThunkFeature, self).__init__()
4432
 
        self._module = module
4433
 
        if replacement_module is None:
4434
 
            replacement_module = module
4435
 
        self._replacement_module = replacement_module
4436
 
        self._name = name
4437
 
        self._replacement_name = replacement_name
4438
 
        self._dep_version = dep_version
4439
 
        self._feature = None
4440
 
 
4441
 
    def _ensure(self):
4442
 
        if self._feature is None:
4443
 
            depr_msg = self._dep_version % ('%s.%s'
4444
 
                                            % (self._module, self._name))
4445
 
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4446
 
                                               self._replacement_name)
4447
 
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4448
 
            # Import the new feature and use it as a replacement for the
4449
 
            # deprecated one.
4450
 
            self._feature = pyutils.get_named_object(
4451
 
                self._replacement_module, self._replacement_name)
4452
 
 
4453
 
    def _probe(self):
4454
 
        self._ensure()
4455
 
        return self._feature._probe()
4456
 
 
4457
 
 
4458
 
class ModuleAvailableFeature(Feature):
4459
 
    """This is a feature than describes a module we want to be available.
4460
 
 
4461
 
    Declare the name of the module in __init__(), and then after probing, the
4462
 
    module will be available as 'self.module'.
4463
 
 
4464
 
    :ivar module: The module if it is available, else None.
4465
 
    """
4466
 
 
4467
 
    def __init__(self, module_name):
4468
 
        super(ModuleAvailableFeature, self).__init__()
4469
 
        self.module_name = module_name
4470
 
 
4471
 
    def _probe(self):
4472
 
        try:
4473
 
            self._module = __import__(self.module_name, {}, {}, [''])
4474
 
            return True
4475
 
        except ImportError:
4476
 
            return False
4477
 
 
4478
 
    @property
4479
 
    def module(self):
4480
 
        if self.available(): # Make sure the probe has been done
4481
 
            return self._module
4482
 
        return None
4483
 
 
4484
 
    def feature_name(self):
4485
 
        return self.module_name
4486
 
 
4487
 
 
4488
4451
def probe_unicode_in_user_encoding():
4489
4452
    """Try to encode several unicode strings to use in unicode-aware tests.
4490
4453
    Return first successfull match.
4518
4481
    return None
4519
4482
 
4520
4483
 
4521
 
class _HTTPSServerFeature(Feature):
4522
 
    """Some tests want an https Server, check if one is available.
4523
 
 
4524
 
    Right now, the only way this is available is under python2.6 which provides
4525
 
    an ssl module.
4526
 
    """
4527
 
 
4528
 
    def _probe(self):
4529
 
        try:
4530
 
            import ssl
4531
 
            return True
4532
 
        except ImportError:
4533
 
            return False
4534
 
 
4535
 
    def feature_name(self):
4536
 
        return 'HTTPSServer'
4537
 
 
4538
 
 
4539
 
HTTPSServerFeature = _HTTPSServerFeature()
4540
 
 
4541
 
 
4542
 
class _UnicodeFilename(Feature):
4543
 
    """Does the filesystem support Unicode filenames?"""
4544
 
 
4545
 
    def _probe(self):
4546
 
        try:
4547
 
            os.stat(u'\u03b1')
4548
 
        except UnicodeEncodeError:
4549
 
            return False
4550
 
        except (IOError, OSError):
4551
 
            # The filesystem allows the Unicode filename but the file doesn't
4552
 
            # exist.
4553
 
            return True
4554
 
        else:
4555
 
            # The filesystem allows the Unicode filename and the file exists,
4556
 
            # for some reason.
4557
 
            return True
4558
 
 
4559
 
UnicodeFilename = _UnicodeFilename()
4560
 
 
4561
 
 
4562
 
class _ByteStringNamedFilesystem(Feature):
4563
 
    """Is the filesystem based on bytes?"""
4564
 
 
4565
 
    def _probe(self):
4566
 
        if os.name == "posix":
4567
 
            return True
4568
 
        return False
4569
 
 
4570
 
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4571
 
 
4572
 
 
4573
 
class _UTF8Filesystem(Feature):
4574
 
    """Is the filesystem UTF-8?"""
4575
 
 
4576
 
    def _probe(self):
4577
 
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4578
 
            return True
4579
 
        return False
4580
 
 
4581
 
UTF8Filesystem = _UTF8Filesystem()
4582
 
 
4583
 
 
4584
 
class _BreakinFeature(Feature):
4585
 
    """Does this platform support the breakin feature?"""
4586
 
 
4587
 
    def _probe(self):
4588
 
        from bzrlib import breakin
4589
 
        if breakin.determine_signal() is None:
4590
 
            return False
4591
 
        if sys.platform == 'win32':
4592
 
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4593
 
            # We trigger SIGBREAK via a Console api so we need ctypes to
4594
 
            # access the function
4595
 
            try:
4596
 
                import ctypes
4597
 
            except OSError:
4598
 
                return False
4599
 
        return True
4600
 
 
4601
 
    def feature_name(self):
4602
 
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
4603
 
 
4604
 
 
4605
 
BreakinFeature = _BreakinFeature()
4606
 
 
4607
 
 
4608
 
class _CaseInsCasePresFilenameFeature(Feature):
4609
 
    """Is the file-system case insensitive, but case-preserving?"""
4610
 
 
4611
 
    def _probe(self):
4612
 
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
4613
 
        try:
4614
 
            # first check truly case-preserving for created files, then check
4615
 
            # case insensitive when opening existing files.
4616
 
            name = osutils.normpath(name)
4617
 
            base, rel = osutils.split(name)
4618
 
            found_rel = osutils.canonical_relpath(base, name)
4619
 
            return (found_rel == rel
4620
 
                    and os.path.isfile(name.upper())
4621
 
                    and os.path.isfile(name.lower()))
4622
 
        finally:
4623
 
            os.close(fileno)
4624
 
            os.remove(name)
4625
 
 
4626
 
    def feature_name(self):
4627
 
        return "case-insensitive case-preserving filesystem"
4628
 
 
4629
 
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4630
 
 
4631
 
 
4632
 
class _CaseInsensitiveFilesystemFeature(Feature):
4633
 
    """Check if underlying filesystem is case-insensitive but *not* case
4634
 
    preserving.
4635
 
    """
4636
 
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4637
 
    # more likely to be case preserving, so this case is rare.
4638
 
 
4639
 
    def _probe(self):
4640
 
        if CaseInsCasePresFilenameFeature.available():
4641
 
            return False
4642
 
 
4643
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
4644
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4645
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
4646
 
        else:
4647
 
            root = TestCaseWithMemoryTransport.TEST_ROOT
4648
 
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4649
 
            dir=root)
4650
 
        name_a = osutils.pathjoin(tdir, 'a')
4651
 
        name_A = osutils.pathjoin(tdir, 'A')
4652
 
        os.mkdir(name_a)
4653
 
        result = osutils.isdir(name_A)
4654
 
        _rmtree_temp_dir(tdir)
4655
 
        return result
4656
 
 
4657
 
    def feature_name(self):
4658
 
        return 'case-insensitive filesystem'
4659
 
 
4660
 
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4661
 
 
4662
 
 
4663
 
class _CaseSensitiveFilesystemFeature(Feature):
4664
 
 
4665
 
    def _probe(self):
4666
 
        if CaseInsCasePresFilenameFeature.available():
4667
 
            return False
4668
 
        elif CaseInsensitiveFilesystemFeature.available():
4669
 
            return False
4670
 
        else:
4671
 
            return True
4672
 
 
4673
 
    def feature_name(self):
4674
 
        return 'case-sensitive filesystem'
4675
 
 
4676
 
# new coding style is for feature instances to be lowercase
4677
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4678
 
 
4679
 
 
4680
4484
# Only define SubUnitBzrRunner if subunit is available.
4681
4485
try:
4682
4486
    from subunit import TestProtocolClient
4683
4487
    from subunit.test_results import AutoTimingTestResultDecorator
4684
4488
    class SubUnitBzrProtocolClient(TestProtocolClient):
4685
4489
 
 
4490
        def stopTest(self, test):
 
4491
            super(SubUnitBzrProtocolClient, self).stopTest(test)
 
4492
            _clear__type_equality_funcs(test)
 
4493
 
4686
4494
        def addSuccess(self, test, details=None):
4687
4495
            # The subunit client always includes the details in the subunit
4688
4496
            # stream, but we don't want to include it in ours.
4700
4508
except ImportError:
4701
4509
    pass
4702
4510
 
4703
 
class _PosixPermissionsFeature(Feature):
4704
 
 
4705
 
    def _probe(self):
4706
 
        def has_perms():
4707
 
            # create temporary file and check if specified perms are maintained.
4708
 
            import tempfile
4709
 
 
4710
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4711
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4712
 
            fd, name = f
4713
 
            os.close(fd)
4714
 
            os.chmod(name, write_perms)
4715
 
 
4716
 
            read_perms = os.stat(name).st_mode & 0777
4717
 
            os.unlink(name)
4718
 
            return (write_perms == read_perms)
4719
 
 
4720
 
        return (os.name == 'posix') and has_perms()
4721
 
 
4722
 
    def feature_name(self):
4723
 
        return 'POSIX permissions support'
4724
 
 
4725
 
posix_permissions_feature = _PosixPermissionsFeature()
 
4511
 
 
4512
# API compatibility for old plugins; see bug 892622.
 
4513
for name in [
 
4514
    'Feature',
 
4515
    'HTTPServerFeature', 
 
4516
    'ModuleAvailableFeature',
 
4517
    'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
 
4518
    'OsFifoFeature', 'UnicodeFilenameFeature',
 
4519
    'ByteStringNamedFilesystem', 'UTF8Filesystem',
 
4520
    'BreakinFeature', 'CaseInsCasePresFilenameFeature',
 
4521
    'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
 
4522
    'posix_permissions_feature',
 
4523
    ]:
 
4524
    globals()[name] = _CompatabilityThunkFeature(
 
4525
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4526
        'bzrlib.tests', name,
 
4527
        name, 'bzrlib.tests.features')
 
4528
 
 
4529
 
 
4530
for (old_name, new_name) in [
 
4531
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
 
4532
    ]:
 
4533
    globals()[name] = _CompatabilityThunkFeature(
 
4534
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4535
        'bzrlib.tests', old_name,
 
4536
        new_name, 'bzrlib.tests.features')