~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Patch Queue Manager
  • Date: 2014-04-09 13:36:25 UTC
  • mfrom: (6592.1.2 1303879-py27-issues)
  • Revision ID: pqm@pqm.ubuntu.com-20140409133625-s24spv3kha2w2860
(vila) Fix python-2.7.6 test failures. (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
19
 
# TODO: Perhaps there should be an API to find out if bzr running under the
20
 
# test suite -- some plugins might want to avoid making intrusive changes if
21
 
# this is the case.  However, we want behaviour under to test to diverge as
22
 
# little as possible, so this should be used rarely if it's added at all.
23
 
# (Suggestion from j-a-meinel, 2005-11-24)
 
19
from __future__ import absolute_import
24
20
 
25
21
# NOTE: Some classes in here use camelCaseNaming() rather than
26
22
# underscore_naming().  That's for consistency with unittest; it's not the
42
38
import random
43
39
import re
44
40
import shlex
 
41
import site
45
42
import stat
46
43
import subprocess
47
44
import sys
63
60
import bzrlib
64
61
from bzrlib import (
65
62
    branchbuilder,
66
 
    bzrdir,
 
63
    controldir,
67
64
    chk_map,
68
65
    commands as _mod_commands,
69
66
    config,
 
67
    i18n,
70
68
    debug,
71
69
    errors,
72
70
    hooks,
94
92
    memory,
95
93
    pathfilter,
96
94
    )
 
95
from bzrlib.symbol_versioning import (
 
96
    deprecated_function,
 
97
    deprecated_in,
 
98
    )
97
99
from bzrlib.tests import (
 
100
    fixtures,
98
101
    test_server,
99
102
    TestUtil,
100
103
    treeshape,
101
104
    )
102
105
from bzrlib.ui import NullProgressView
103
106
from bzrlib.ui.text import TextUIFactory
 
107
from bzrlib.tests.features import _CompatabilityThunkFeature
104
108
 
105
109
# Mark this python module as being part of the implementation
106
110
# of unittest: this gives us better tracebacks where the last
133
137
isolated_environ = {
134
138
    'BZR_HOME': None,
135
139
    'HOME': None,
 
140
    'XDG_CONFIG_HOME': None,
136
141
    # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
137
142
    # tests do check our impls match APPDATA
138
143
    'BZR_EDITOR': None, # test_msgeditor manipulates this variable
142
147
    'BZREMAIL': None, # may still be present in the environment
143
148
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
144
149
    'BZR_PROGRESS_BAR': None,
145
 
    'BZR_LOG': None,
 
150
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
 
151
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
152
    # TestCase should not use disk resources, BZR_LOG is one.
 
153
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
146
154
    'BZR_PLUGIN_PATH': None,
147
155
    'BZR_DISABLE_PLUGINS': None,
148
156
    'BZR_PLUGINS_AT': None,
210
218
        osutils.set_or_unset_env(var, value)
211
219
 
212
220
 
 
221
def _clear__type_equality_funcs(test):
 
222
    """Cleanup bound methods stored on TestCase instances
 
223
 
 
224
    Clear the dict breaking a few (mostly) harmless cycles in the affected
 
225
    unittests released with Python 2.6 and initial Python 2.7 versions.
 
226
 
 
227
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
 
228
    shipped in Oneiric, an object with no clear method was used, hence the
 
229
    extra complications, see bug 809048 for details.
 
230
    """
 
231
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
232
    if type_equality_funcs is not None:
 
233
        tef_clear = getattr(type_equality_funcs, "clear", None)
 
234
        if tef_clear is None:
 
235
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
236
            if tef_instance_dict is not None:
 
237
                tef_clear = tef_instance_dict.clear
 
238
        if tef_clear is not None:
 
239
            tef_clear()
 
240
 
 
241
 
213
242
class ExtendedTestResult(testtools.TextTestResult):
214
243
    """Accepts, reports and accumulates the results of running tests.
215
244
 
219
248
    different types of display.
220
249
 
221
250
    When a test finishes, in whatever way, it calls one of the addSuccess,
222
 
    addFailure or addError classes.  These in turn may redirect to a more
 
251
    addFailure or addError methods.  These in turn may redirect to a more
223
252
    specific case for the special test results supported by our extended
224
253
    tests.
225
254
 
383
412
        getDetails = getattr(test, "getDetails", None)
384
413
        if getDetails is not None:
385
414
            getDetails().clear()
386
 
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
387
 
        if type_equality_funcs is not None:
388
 
            type_equality_funcs.clear()
 
415
        _clear__type_equality_funcs(test)
389
416
        self._traceback_from_test = None
390
417
 
391
418
    def startTests(self):
492
519
        self.not_applicable_count += 1
493
520
        self.report_not_applicable(test, reason)
494
521
 
 
522
    def _count_stored_tests(self):
 
523
        """Count of tests instances kept alive due to not succeeding"""
 
524
        return self.error_count + self.failure_count + self.known_failure_count
 
525
 
495
526
    def _post_mortem(self, tb=None):
496
527
        """Start a PDB post mortem session."""
497
528
        if os.environ.get('BZR_TEST_PDB', None):
928
959
 
929
960
    The method is really a factory and users are expected to use it as such.
930
961
    """
931
 
    
 
962
 
932
963
    kwargs['setUp'] = isolated_doctest_setUp
933
964
    kwargs['tearDown'] = isolated_doctest_tearDown
934
965
    return doctest.DocTestSuite(*args, **kwargs)
969
1000
 
970
1001
    def setUp(self):
971
1002
        super(TestCase, self).setUp()
 
1003
 
 
1004
        # At this point we're still accessing the config files in $BZR_HOME (as
 
1005
        # set by the user running selftest).
 
1006
        timeout = config.GlobalStack().get('selftest.timeout')
 
1007
        if timeout:
 
1008
            timeout_fixture = fixtures.TimeoutFixture(timeout)
 
1009
            timeout_fixture.setUp()
 
1010
            self.addCleanup(timeout_fixture.cleanUp)
 
1011
 
972
1012
        for feature in getattr(self, '_test_needs_features', []):
973
1013
            self.requireFeature(feature)
974
1014
        self._cleanEnvironment()
 
1015
 
 
1016
        if bzrlib.global_state is not None:
 
1017
            self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
 
1018
                              config.CommandLineStore())
 
1019
 
975
1020
        self._silenceUI()
976
1021
        self._startLogFile()
977
1022
        self._benchcalls = []
984
1029
        # between tests.  We should get rid of this altogether: bug 656694. --
985
1030
        # mbp 20101008
986
1031
        self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
987
 
        # Isolate config option expansion until its default value for bzrlib is
988
 
        # settled on or a the FIXME associated with _get_expand_default_value
989
 
        # is addressed -- vila 20110219
990
 
        self.overrideAttr(config, '_expand_default_value', None)
991
1032
        self._log_files = set()
 
1033
        # Each key in the ``_counters`` dict holds a value for a different
 
1034
        # counter. When the test ends, addDetail() should be used to output the
 
1035
        # counter values. This happens in install_counter_hook().
 
1036
        self._counters = {}
 
1037
        if 'config_stats' in selftest_debug_flags:
 
1038
            self._install_config_stats_hooks()
 
1039
        # Do not use i18n for tests (unless the test reverses this)
 
1040
        i18n.disable_i18n()
992
1041
 
993
1042
    def debug(self):
994
1043
        # debug a frame up.
995
1044
        import pdb
996
 
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
1045
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
1046
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
1047
                ).set_trace(sys._getframe().f_back)
997
1048
 
998
1049
    def discardDetail(self, name):
999
1050
        """Extend the addDetail, getDetails api so we can remove a detail.
1011
1062
        if name in details:
1012
1063
            del details[name]
1013
1064
 
 
1065
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1066
        """Install a counting hook.
 
1067
 
 
1068
        Any hook can be counted as long as it doesn't need to return a value.
 
1069
 
 
1070
        :param hooks: Where the hook should be installed.
 
1071
 
 
1072
        :param name: The hook name that will be counted.
 
1073
 
 
1074
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1075
            to ``name``.
 
1076
        """
 
1077
        _counters = self._counters # Avoid closing over self
 
1078
        if counter_name is None:
 
1079
            counter_name = name
 
1080
        if _counters.has_key(counter_name):
 
1081
            raise AssertionError('%s is already used as a counter name'
 
1082
                                  % (counter_name,))
 
1083
        _counters[counter_name] = 0
 
1084
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1085
            lambda: ['%d' % (_counters[counter_name],)]))
 
1086
        def increment_counter(*args, **kwargs):
 
1087
            _counters[counter_name] += 1
 
1088
        label = 'count %s calls' % (counter_name,)
 
1089
        hooks.install_named_hook(name, increment_counter, label)
 
1090
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1091
 
 
1092
    def _install_config_stats_hooks(self):
 
1093
        """Install config hooks to count hook calls.
 
1094
 
 
1095
        """
 
1096
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1097
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1098
                                       'config.%s' % (hook_name,))
 
1099
 
 
1100
        # The OldConfigHooks are private and need special handling to protect
 
1101
        # against recursive tests (tests that run other tests), so we just do
 
1102
        # manually what registering them into _builtin_known_hooks will provide
 
1103
        # us.
 
1104
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1105
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1106
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1107
                                      'old_config.%s' % (hook_name,))
 
1108
 
1014
1109
    def _clear_debug_flags(self):
1015
1110
        """Prevent externally set debug flags affecting tests.
1016
1111
 
1070
1165
        # break some locks on purpose and should be taken into account by
1071
1166
        # considering that breaking a lock is just a dirty way of releasing it.
1072
1167
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1073
 
            message = ('Different number of acquired and '
1074
 
                       'released or broken locks. (%s, %s + %s)' %
1075
 
                       (acquired_locks, released_locks, broken_locks))
 
1168
            message = (
 
1169
                'Different number of acquired and '
 
1170
                'released or broken locks.\n'
 
1171
                'acquired=%s\n'
 
1172
                'released=%s\n'
 
1173
                'broken=%s\n' %
 
1174
                (acquired_locks, released_locks, broken_locks))
1076
1175
            if not self._lock_check_thorough:
1077
1176
                # Rather than fail, just warn
1078
1177
                print "Broken test %s: %s" % (self, message)
1106
1205
 
1107
1206
    def permit_dir(self, name):
1108
1207
        """Permit a directory to be used by this test. See permit_url."""
1109
 
        name_transport = _mod_transport.get_transport(name)
 
1208
        name_transport = _mod_transport.get_transport_from_path(name)
1110
1209
        self.permit_url(name)
1111
1210
        self.permit_url(name_transport.base)
1112
1211
 
1191
1290
        self.addCleanup(transport_server.stop_server)
1192
1291
        # Obtain a real transport because if the server supplies a password, it
1193
1292
        # will be hidden from the base on the client side.
1194
 
        t = _mod_transport.get_transport(transport_server.get_url())
 
1293
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
1195
1294
        # Some transport servers effectively chroot the backing transport;
1196
1295
        # others like SFTPServer don't - users of the transport can walk up the
1197
1296
        # transport to read the entire backing transport. This wouldn't matter
1228
1327
        # hook into bzr dir opening. This leaves a small window of error for
1229
1328
        # transport tests, but they are well known, and we can improve on this
1230
1329
        # step.
1231
 
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
 
1330
        controldir.ControlDir.hooks.install_named_hook("pre_open",
1232
1331
            self._preopen_isolate_transport, "Check bzr directories are safe.")
1233
1332
 
1234
1333
    def _ndiff_strings(self, a, b):
1632
1731
        return result
1633
1732
 
1634
1733
    def _startLogFile(self):
1635
 
        """Send bzr and test log messages to a temporary file.
1636
 
 
1637
 
        The file is removed as the test is torn down.
1638
 
        """
 
1734
        """Setup a in-memory target for bzr and testcase log messages"""
1639
1735
        pseudo_log_file = StringIO()
1640
1736
        def _get_log_contents_for_weird_testtools_api():
1641
1737
            return [pseudo_log_file.getvalue().decode(
1648
1744
        self.addCleanup(self._finishLogFile)
1649
1745
 
1650
1746
    def _finishLogFile(self):
1651
 
        """Finished with the log file.
1652
 
 
1653
 
        Close the file and delete it, unless setKeepLogfile was called.
1654
 
        """
 
1747
        """Flush and dereference the in-memory log for this testcase"""
1655
1748
        if trace._trace_file:
1656
1749
            # flush the log file, to get all content
1657
1750
            trace._trace_file.flush()
1658
1751
        trace.pop_log_file(self._log_memento)
 
1752
        # The logging module now tracks references for cleanup so discard ours
 
1753
        del self._log_memento
1659
1754
 
1660
1755
    def thisFailsStrictLockCheck(self):
1661
1756
        """It is known that this test would fail with -Dstrict_locks.
1673
1768
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1674
1769
        """Overrides an object attribute restoring it after the test.
1675
1770
 
 
1771
        :note: This should be used with discretion; you should think about
 
1772
        whether it's better to make the code testable without monkey-patching.
 
1773
 
1676
1774
        :param obj: The object that will be mutated.
1677
1775
 
1678
1776
        :param attr_name: The attribute name we want to preserve/override in
1682
1780
 
1683
1781
        :returns: The actual attr value.
1684
1782
        """
1685
 
        value = getattr(obj, attr_name)
1686
1783
        # The actual value is captured by the call below
1687
 
        self.addCleanup(setattr, obj, attr_name, value)
 
1784
        value = getattr(obj, attr_name, _unitialized_attr)
 
1785
        if value is _unitialized_attr:
 
1786
            # When the test completes, the attribute should not exist, but if
 
1787
            # we aren't setting a value, we don't need to do anything.
 
1788
            if new is not _unitialized_attr:
 
1789
                self.addCleanup(delattr, obj, attr_name)
 
1790
        else:
 
1791
            self.addCleanup(setattr, obj, attr_name, value)
1688
1792
        if new is not _unitialized_attr:
1689
1793
            setattr(obj, attr_name, new)
1690
1794
        return value
1703
1807
        self.addCleanup(osutils.set_or_unset_env, name, value)
1704
1808
        return value
1705
1809
 
 
1810
    def recordCalls(self, obj, attr_name):
 
1811
        """Monkeypatch in a wrapper that will record calls.
 
1812
 
 
1813
        The monkeypatch is automatically removed when the test concludes.
 
1814
 
 
1815
        :param obj: The namespace holding the reference to be replaced;
 
1816
            typically a module, class, or object.
 
1817
        :param attr_name: A string for the name of the attribute to 
 
1818
            patch.
 
1819
        :returns: A list that will be extended with one item every time the
 
1820
            function is called, with a tuple of (args, kwargs).
 
1821
        """
 
1822
        calls = []
 
1823
 
 
1824
        def decorator(*args, **kwargs):
 
1825
            calls.append((args, kwargs))
 
1826
            return orig(*args, **kwargs)
 
1827
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1828
        return calls
 
1829
 
1706
1830
    def _cleanEnvironment(self):
1707
1831
        for name, value in isolated_environ.iteritems():
1708
1832
            self.overrideEnv(name, value)
1715
1839
        self._preserved_lazy_hooks.clear()
1716
1840
 
1717
1841
    def knownFailure(self, reason):
1718
 
        """This test has failed for some known reason."""
1719
 
        raise KnownFailure(reason)
 
1842
        """Declare that this test fails for a known reason
 
1843
 
 
1844
        Tests that are known to fail should generally be using expectedFailure
 
1845
        with an appropriate reverse assertion if a change could cause the test
 
1846
        to start passing. Conversely if the test has no immediate prospect of
 
1847
        succeeding then using skip is more suitable.
 
1848
 
 
1849
        When this method is called while an exception is being handled, that
 
1850
        traceback will be used, otherwise a new exception will be thrown to
 
1851
        provide one but won't be reported.
 
1852
        """
 
1853
        self._add_reason(reason)
 
1854
        try:
 
1855
            exc_info = sys.exc_info()
 
1856
            if exc_info != (None, None, None):
 
1857
                self._report_traceback(exc_info)
 
1858
            else:
 
1859
                try:
 
1860
                    raise self.failureException(reason)
 
1861
                except self.failureException:
 
1862
                    exc_info = sys.exc_info()
 
1863
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1864
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1865
        finally:
 
1866
            del exc_info
1720
1867
 
1721
1868
    def _suppress_log(self):
1722
1869
        """Remove the log info from details."""
1850
1997
 
1851
1998
        self.log('run bzr: %r', args)
1852
1999
        # FIXME: don't call into logging here
1853
 
        handler = logging.StreamHandler(stderr)
1854
 
        handler.setLevel(logging.INFO)
 
2000
        handler = trace.EncodedStreamHandler(stderr, errors="replace",
 
2001
            level=logging.INFO)
1855
2002
        logger = logging.getLogger('')
1856
2003
        logger.addHandler(handler)
1857
2004
        old_ui_factory = ui.ui_factory
2045
2192
 
2046
2193
        if env_changes is None:
2047
2194
            env_changes = {}
 
2195
        # Because $HOME is set to a tempdir for the context of a test, modules
 
2196
        # installed in the user dir will not be found unless $PYTHONUSERBASE
 
2197
        # gets set to the computed directory of this parent process.
 
2198
        if site.USER_BASE is not None:
 
2199
            env_changes["PYTHONUSERBASE"] = site.USER_BASE
2048
2200
        old_env = {}
2049
2201
 
2050
2202
        def cleanup_environment():
2241
2393
        from bzrlib.smart import request
2242
2394
        request_handlers = request.request_handlers
2243
2395
        orig_method = request_handlers.get(verb)
 
2396
        orig_info = request_handlers.get_info(verb)
2244
2397
        request_handlers.remove(verb)
2245
 
        self.addCleanup(request_handlers.register, verb, orig_method)
 
2398
        self.addCleanup(request_handlers.register, verb, orig_method,
 
2399
            info=orig_info)
2246
2400
 
2247
2401
 
2248
2402
class CapturedCall(object):
2271
2425
class TestCaseWithMemoryTransport(TestCase):
2272
2426
    """Common test class for tests that do not need disk resources.
2273
2427
 
2274
 
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2428
    Tests that need disk resources should derive from TestCaseInTempDir
 
2429
    orTestCaseWithTransport.
2275
2430
 
2276
2431
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2277
2432
 
2278
 
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2433
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2279
2434
    a directory which does not exist. This serves to help ensure test isolation
2280
 
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2281
 
    must exist. However, TestCaseWithMemoryTransport does not offer local
2282
 
    file defaults for the transport in tests, nor does it obey the command line
 
2435
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2436
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2437
    defaults for the transport in tests, nor does it obey the command line
2283
2438
    override, so tests that accidentally write to the common directory should
2284
2439
    be rare.
2285
2440
 
2286
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
2287
 
    a .bzr directory that stops us ascending higher into the filesystem.
 
2441
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2442
        ``.bzr`` directory that stops us ascending higher into the filesystem.
2288
2443
    """
2289
2444
 
2290
2445
    TEST_ROOT = None
2300
2455
        self.transport_readonly_server = None
2301
2456
        self.__vfs_server = None
2302
2457
 
 
2458
    def setUp(self):
 
2459
        super(TestCaseWithMemoryTransport, self).setUp()
 
2460
 
 
2461
        def _add_disconnect_cleanup(transport):
 
2462
            """Schedule disconnection of given transport at test cleanup
 
2463
 
 
2464
            This needs to happen for all connected transports or leaks occur.
 
2465
 
 
2466
            Note reconnections may mean we call disconnect multiple times per
 
2467
            transport which is suboptimal but seems harmless.
 
2468
            """
 
2469
            self.addCleanup(transport.disconnect)
 
2470
 
 
2471
        _mod_transport.Transport.hooks.install_named_hook('post_connect',
 
2472
            _add_disconnect_cleanup, None)
 
2473
 
 
2474
        self._make_test_root()
 
2475
        self.addCleanup(os.chdir, os.getcwdu())
 
2476
        self.makeAndChdirToTestDir()
 
2477
        self.overrideEnvironmentForTesting()
 
2478
        self.__readonly_server = None
 
2479
        self.__server = None
 
2480
        self.reduceLockdirTimeout()
 
2481
        # Each test may use its own config files even if the local config files
 
2482
        # don't actually exist. They'll rightly fail if they try to create them
 
2483
        # though.
 
2484
        self.overrideAttr(config, '_shared_stores', {})
 
2485
 
2303
2486
    def get_transport(self, relpath=None):
2304
2487
        """Return a writeable transport.
2305
2488
 
2308
2491
 
2309
2492
        :param relpath: a path relative to the base url.
2310
2493
        """
2311
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2494
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
2312
2495
        self.assertFalse(t.is_readonly())
2313
2496
        return t
2314
2497
 
2320
2503
 
2321
2504
        :param relpath: a path relative to the base url.
2322
2505
        """
2323
 
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
2506
        t = _mod_transport.get_transport_from_url(
 
2507
            self.get_readonly_url(relpath))
2324
2508
        self.assertTrue(t.is_readonly())
2325
2509
        return t
2326
2510
 
2447
2631
        real branch.
2448
2632
        """
2449
2633
        root = TestCaseWithMemoryTransport.TEST_ROOT
2450
 
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2634
        try:
 
2635
            # Make sure we get a readable and accessible home for .bzr.log
 
2636
            # and/or config files, and not fallback to weird defaults (see
 
2637
            # http://pad.lv/825027).
 
2638
            self.assertIs(None, os.environ.get('BZR_HOME', None))
 
2639
            os.environ['BZR_HOME'] = root
 
2640
            wt = controldir.ControlDir.create_standalone_workingtree(root)
 
2641
            del os.environ['BZR_HOME']
 
2642
        except Exception, e:
 
2643
            self.fail("Fail to initialize the safety net: %r\n" % (e,))
 
2644
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2645
        # we don't need to re-open the wt to check it hasn't changed.
 
2646
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2647
            wt.control_transport.get_bytes('dirstate'))
2451
2648
 
2452
2649
    def _check_safety_net(self):
2453
2650
        """Check that the safety .bzr directory have not been touched.
2456
2653
        propagating. This method ensures than a test did not leaked.
2457
2654
        """
2458
2655
        root = TestCaseWithMemoryTransport.TEST_ROOT
2459
 
        self.permit_url(_mod_transport.get_transport(root).base)
2460
 
        wt = workingtree.WorkingTree.open(root)
2461
 
        last_rev = wt.last_revision()
2462
 
        if last_rev != 'null:':
 
2656
        t = _mod_transport.get_transport_from_path(root)
 
2657
        self.permit_url(t.base)
 
2658
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2659
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
2463
2660
            # The current test have modified the /bzr directory, we need to
2464
2661
            # recreate a new one or all the followng tests will fail.
2465
2662
            # If you need to inspect its content uncomment the following line
2497
2694
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2498
2695
        self.permit_dir(self.test_dir)
2499
2696
 
2500
 
    def make_branch(self, relpath, format=None):
 
2697
    def make_branch(self, relpath, format=None, name=None):
2501
2698
        """Create a branch on the transport at relpath."""
2502
2699
        repo = self.make_repository(relpath, format=format)
2503
 
        return repo.bzrdir.create_branch()
 
2700
        return repo.bzrdir.create_branch(append_revisions_only=False,
 
2701
                                         name=name)
 
2702
 
 
2703
    def get_default_format(self):
 
2704
        return 'default'
 
2705
 
 
2706
    def resolve_format(self, format):
 
2707
        """Resolve an object to a ControlDir format object.
 
2708
 
 
2709
        The initial format object can either already be
 
2710
        a ControlDirFormat, None (for the default format),
 
2711
        or a string with the name of the control dir format.
 
2712
 
 
2713
        :param format: Object to resolve
 
2714
        :return A ControlDirFormat instance
 
2715
        """
 
2716
        if format is None:
 
2717
            format = self.get_default_format()
 
2718
        if isinstance(format, basestring):
 
2719
            format = controldir.format_registry.make_bzrdir(format)
 
2720
        return format
2504
2721
 
2505
2722
    def make_bzrdir(self, relpath, format=None):
2506
2723
        try:
2510
2727
            t = _mod_transport.get_transport(maybe_a_url)
2511
2728
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2512
2729
                t.ensure_base()
2513
 
            if format is None:
2514
 
                format = 'default'
2515
 
            if isinstance(format, basestring):
2516
 
                format = bzrdir.format_registry.make_bzrdir(format)
 
2730
            format = self.resolve_format(format)
2517
2731
            return format.initialize_on_transport(t)
2518
2732
        except errors.UninitializableFormat:
2519
2733
            raise TestSkipped("Format %s is not initializable." % format)
2520
2734
 
2521
 
    def make_repository(self, relpath, shared=False, format=None):
 
2735
    def make_repository(self, relpath, shared=None, format=None):
2522
2736
        """Create a repository on our default transport at relpath.
2523
2737
 
2524
2738
        Note that relpath must be a relative path, not a full url.
2535
2749
            backing_server = self.get_server()
2536
2750
        smart_server = test_server.SmartTCPServer_for_testing()
2537
2751
        self.start_server(smart_server, backing_server)
2538
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
 
2752
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
2539
2753
                                                   ).clone(path)
2540
2754
        return remote_transport
2541
2755
 
2555
2769
        self.overrideEnv('HOME', test_home_dir)
2556
2770
        self.overrideEnv('BZR_HOME', test_home_dir)
2557
2771
 
2558
 
    def setUp(self):
2559
 
        super(TestCaseWithMemoryTransport, self).setUp()
2560
 
        # Ensure that ConnectedTransport doesn't leak sockets
2561
 
        def get_transport_with_cleanup(*args, **kwargs):
2562
 
            t = orig_get_transport(*args, **kwargs)
2563
 
            if isinstance(t, _mod_transport.ConnectedTransport):
2564
 
                self.addCleanup(t.disconnect)
2565
 
            return t
2566
 
 
2567
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2568
 
                                               get_transport_with_cleanup)
2569
 
        self._make_test_root()
2570
 
        self.addCleanup(os.chdir, os.getcwdu())
2571
 
        self.makeAndChdirToTestDir()
2572
 
        self.overrideEnvironmentForTesting()
2573
 
        self.__readonly_server = None
2574
 
        self.__server = None
2575
 
        self.reduceLockdirTimeout()
2576
 
 
2577
2772
    def setup_smart_server_with_call_log(self):
2578
2773
        """Sets up a smart server as the transport server with a call log."""
2579
2774
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2775
        self.hpss_connections = []
2580
2776
        self.hpss_calls = []
2581
2777
        import traceback
2582
2778
        # Skip the current stack down to the caller of
2585
2781
        def capture_hpss_call(params):
2586
2782
            self.hpss_calls.append(
2587
2783
                CapturedCall(params, prefix_length))
 
2784
        def capture_connect(transport):
 
2785
            self.hpss_connections.append(transport)
2588
2786
        client._SmartClient.hooks.install_named_hook(
2589
2787
            'call', capture_hpss_call, None)
 
2788
        _mod_transport.Transport.hooks.install_named_hook(
 
2789
            'post_connect', capture_connect, None)
2590
2790
 
2591
2791
    def reset_smart_call_log(self):
2592
2792
        self.hpss_calls = []
 
2793
        self.hpss_connections = []
2593
2794
 
2594
2795
 
2595
2796
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2614
2815
 
2615
2816
    OVERRIDE_PYTHON = 'python'
2616
2817
 
 
2818
    def setUp(self):
 
2819
        super(TestCaseInTempDir, self).setUp()
 
2820
        # Remove the protection set in isolated_environ, we have a proper
 
2821
        # access to disk resources now.
 
2822
        self.overrideEnv('BZR_LOG', None)
 
2823
 
2617
2824
    def check_file_contents(self, filename, expect):
2618
2825
        self.log("check contents of file %s" % filename)
2619
2826
        f = file(filename)
2659
2866
        # stacking policy to honour; create a bzr dir with an unshared
2660
2867
        # repository (but not a branch - our code would be trying to escape
2661
2868
        # then!) to stop them, and permit it to be read.
2662
 
        # control = bzrdir.BzrDir.create(self.test_base_dir)
 
2869
        # control = controldir.ControlDir.create(self.test_base_dir)
2663
2870
        # control.create_repository()
2664
2871
        self.test_home_dir = self.test_base_dir + '/home'
2665
2872
        os.mkdir(self.test_home_dir)
2700
2907
                "a list or a tuple. Got %r instead" % (shape,))
2701
2908
        # It's OK to just create them using forward slashes on windows.
2702
2909
        if transport is None or transport.is_readonly():
2703
 
            transport = _mod_transport.get_transport(".")
 
2910
            transport = _mod_transport.get_transport_from_path(".")
2704
2911
        for name in shape:
2705
2912
            self.assertIsInstance(name, basestring)
2706
2913
            if name[-1] == '/':
2754
2961
    readwrite one must both define get_url() as resolving to os.getcwd().
2755
2962
    """
2756
2963
 
 
2964
    def setUp(self):
 
2965
        super(TestCaseWithTransport, self).setUp()
 
2966
        self.__vfs_server = None
 
2967
 
2757
2968
    def get_vfs_only_server(self):
2758
2969
        """See TestCaseWithMemoryTransport.
2759
2970
 
2791
3002
        # this obviously requires a format that supports branch references
2792
3003
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2793
3004
        # RBC 20060208
 
3005
        format = self.resolve_format(format=format)
 
3006
        if not format.supports_workingtrees:
 
3007
            b = self.make_branch(relpath+'.branch', format=format)
 
3008
            return b.create_checkout(relpath, lightweight=True)
2794
3009
        b = self.make_branch(relpath, format=format)
2795
3010
        try:
2796
3011
            return b.bzrdir.create_workingtree()
2801
3016
            if self.vfs_transport_factory is test_server.LocalURLServer:
2802
3017
                # the branch is colocated on disk, we cannot create a checkout.
2803
3018
                # hopefully callers will expect this.
2804
 
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
3019
                local_controldir = controldir.ControlDir.open(
 
3020
                    self.get_vfs_only_url(relpath))
2805
3021
                wt = local_controldir.create_workingtree()
2806
3022
                if wt.branch._format != b._format:
2807
3023
                    wt._branch = b
2837
3053
        self.assertFalse(differences.has_changed(),
2838
3054
            "Trees %r and %r are different: %r" % (left, right, differences))
2839
3055
 
2840
 
    def setUp(self):
2841
 
        super(TestCaseWithTransport, self).setUp()
2842
 
        self.__vfs_server = None
2843
 
 
2844
3056
    def disable_missing_extensions_warning(self):
2845
3057
        """Some tests expect a precise stderr content.
2846
3058
 
2847
3059
        There is no point in forcing them to duplicate the extension related
2848
3060
        warning.
2849
3061
        """
2850
 
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
3062
        config.GlobalStack().set('ignore_missing_extensions', True)
2851
3063
 
2852
3064
 
2853
3065
class ChrootedTestCase(TestCaseWithTransport):
3095
3307
                            result_decorators=result_decorators,
3096
3308
                            )
3097
3309
    runner.stop_on_failure=stop_on_failure
 
3310
    if isinstance(suite, unittest.TestSuite):
 
3311
        # Empty out _tests list of passed suite and populate new TestSuite
 
3312
        suite._tests[:], suite = [], TestSuite(suite)
3098
3313
    # built in decorator factories:
3099
3314
    decorators = [
3100
3315
        random_order(random_seed, runner),
3198
3413
 
3199
3414
class TestDecorator(TestUtil.TestSuite):
3200
3415
    """A decorator for TestCase/TestSuite objects.
3201
 
    
3202
 
    Usually, subclasses should override __iter__(used when flattening test
3203
 
    suites), which we do to filter, reorder, parallelise and so on, run() and
3204
 
    debug().
 
3416
 
 
3417
    Contains rather than flattening suite passed on construction
3205
3418
    """
3206
3419
 
3207
 
    def __init__(self, suite):
3208
 
        TestUtil.TestSuite.__init__(self)
3209
 
        self.addTest(suite)
3210
 
 
3211
 
    def countTestCases(self):
3212
 
        cases = 0
3213
 
        for test in self:
3214
 
            cases += test.countTestCases()
3215
 
        return cases
3216
 
 
3217
 
    def debug(self):
3218
 
        for test in self:
3219
 
            test.debug()
3220
 
 
3221
 
    def run(self, result):
3222
 
        # Use iteration on self, not self._tests, to allow subclasses to hook
3223
 
        # into __iter__.
3224
 
        for test in self:
3225
 
            if result.shouldStop:
3226
 
                break
3227
 
            test.run(result)
3228
 
        return result
 
3420
    def __init__(self, suite=None):
 
3421
        super(TestDecorator, self).__init__()
 
3422
        if suite is not None:
 
3423
            self.addTest(suite)
 
3424
 
 
3425
    # Don't need subclass run method with suite emptying
 
3426
    run = unittest.TestSuite.run
3229
3427
 
3230
3428
 
3231
3429
class CountingDecorator(TestDecorator):
3242
3440
    """A decorator which excludes test matching an exclude pattern."""
3243
3441
 
3244
3442
    def __init__(self, suite, exclude_pattern):
3245
 
        TestDecorator.__init__(self, suite)
3246
 
        self.exclude_pattern = exclude_pattern
3247
 
        self.excluded = False
3248
 
 
3249
 
    def __iter__(self):
3250
 
        if self.excluded:
3251
 
            return iter(self._tests)
3252
 
        self.excluded = True
3253
 
        suite = exclude_tests_by_re(self, self.exclude_pattern)
3254
 
        del self._tests[:]
3255
 
        self.addTests(suite)
3256
 
        return iter(self._tests)
 
3443
        super(ExcludeDecorator, self).__init__(
 
3444
            exclude_tests_by_re(suite, exclude_pattern))
3257
3445
 
3258
3446
 
3259
3447
class FilterTestsDecorator(TestDecorator):
3260
3448
    """A decorator which filters tests to those matching a pattern."""
3261
3449
 
3262
3450
    def __init__(self, suite, pattern):
3263
 
        TestDecorator.__init__(self, suite)
3264
 
        self.pattern = pattern
3265
 
        self.filtered = False
3266
 
 
3267
 
    def __iter__(self):
3268
 
        if self.filtered:
3269
 
            return iter(self._tests)
3270
 
        self.filtered = True
3271
 
        suite = filter_suite_by_re(self, self.pattern)
3272
 
        del self._tests[:]
3273
 
        self.addTests(suite)
3274
 
        return iter(self._tests)
 
3451
        super(FilterTestsDecorator, self).__init__(
 
3452
            filter_suite_by_re(suite, pattern))
3275
3453
 
3276
3454
 
3277
3455
class RandomDecorator(TestDecorator):
3278
3456
    """A decorator which randomises the order of its tests."""
3279
3457
 
3280
3458
    def __init__(self, suite, random_seed, stream):
3281
 
        TestDecorator.__init__(self, suite)
3282
 
        self.random_seed = random_seed
3283
 
        self.randomised = False
3284
 
        self.stream = stream
3285
 
 
3286
 
    def __iter__(self):
3287
 
        if self.randomised:
3288
 
            return iter(self._tests)
3289
 
        self.randomised = True
3290
 
        self.stream.write("Randomizing test order using seed %s\n\n" %
3291
 
            (self.actual_seed()))
 
3459
        random_seed = self.actual_seed(random_seed)
 
3460
        stream.write("Randomizing test order using seed %s\n\n" %
 
3461
            (random_seed,))
3292
3462
        # Initialise the random number generator.
3293
 
        random.seed(self.actual_seed())
3294
 
        suite = randomize_suite(self)
3295
 
        del self._tests[:]
3296
 
        self.addTests(suite)
3297
 
        return iter(self._tests)
 
3463
        random.seed(random_seed)
 
3464
        super(RandomDecorator, self).__init__(randomize_suite(suite))
3298
3465
 
3299
 
    def actual_seed(self):
3300
 
        if self.random_seed == "now":
 
3466
    @staticmethod
 
3467
    def actual_seed(seed):
 
3468
        if seed == "now":
3301
3469
            # We convert the seed to a long to make it reuseable across
3302
3470
            # invocations (because the user can reenter it).
3303
 
            self.random_seed = long(time.time())
 
3471
            return long(time.time())
3304
3472
        else:
3305
3473
            # Convert the seed to a long if we can
3306
3474
            try:
3307
 
                self.random_seed = long(self.random_seed)
3308
 
            except:
 
3475
                return long(seed)
 
3476
            except (TypeError, ValueError):
3309
3477
                pass
3310
 
        return self.random_seed
 
3478
        return seed
3311
3479
 
3312
3480
 
3313
3481
class TestFirstDecorator(TestDecorator):
3314
3482
    """A decorator which moves named tests to the front."""
3315
3483
 
3316
3484
    def __init__(self, suite, pattern):
3317
 
        TestDecorator.__init__(self, suite)
3318
 
        self.pattern = pattern
3319
 
        self.filtered = False
3320
 
 
3321
 
    def __iter__(self):
3322
 
        if self.filtered:
3323
 
            return iter(self._tests)
3324
 
        self.filtered = True
3325
 
        suites = split_suite_by_re(self, self.pattern)
3326
 
        del self._tests[:]
3327
 
        self.addTests(suites)
3328
 
        return iter(self._tests)
 
3485
        super(TestFirstDecorator, self).__init__()
 
3486
        self.addTests(split_suite_by_re(suite, pattern))
3329
3487
 
3330
3488
 
3331
3489
def partition_tests(suite, count):
3363
3521
    """
3364
3522
    concurrency = osutils.local_concurrency()
3365
3523
    result = []
3366
 
    from subunit import TestProtocolClient, ProtocolTestCase
 
3524
    from subunit import ProtocolTestCase
3367
3525
    from subunit.test_results import AutoTimingTestResultDecorator
3368
3526
    class TestInOtherProcess(ProtocolTestCase):
3369
3527
        # Should be in subunit, I think. RBC.
3375
3533
            try:
3376
3534
                ProtocolTestCase.run(self, result)
3377
3535
            finally:
3378
 
                os.waitpid(self.pid, 0)
 
3536
                pid, status = os.waitpid(self.pid, 0)
 
3537
            # GZ 2011-10-18: If status is nonzero, should report to the result
 
3538
            #                that something went wrong.
3379
3539
 
3380
3540
    test_blocks = partition_tests(suite, concurrency)
 
3541
    # Clear the tests from the original suite so it doesn't keep them alive
 
3542
    suite._tests[:] = []
3381
3543
    for process_tests in test_blocks:
3382
 
        process_suite = TestUtil.TestSuite()
3383
 
        process_suite.addTests(process_tests)
 
3544
        process_suite = TestUtil.TestSuite(process_tests)
 
3545
        # Also clear each split list so new suite has only reference
 
3546
        process_tests[:] = []
3384
3547
        c2pread, c2pwrite = os.pipe()
3385
3548
        pid = os.fork()
3386
3549
        if pid == 0:
3387
 
            workaround_zealous_crypto_random()
3388
3550
            try:
 
3551
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3552
                workaround_zealous_crypto_random()
3389
3553
                os.close(c2pread)
3390
3554
                # Leave stderr and stdout open so we can see test noise
3391
3555
                # Close stdin so that the child goes away if it decides to
3392
3556
                # read from stdin (otherwise its a roulette to see what
3393
3557
                # child actually gets keystrokes for pdb etc).
3394
3558
                sys.stdin.close()
3395
 
                sys.stdin = None
3396
 
                stream = os.fdopen(c2pwrite, 'wb', 1)
3397
3559
                subunit_result = AutoTimingTestResultDecorator(
3398
 
                    TestProtocolClient(stream))
 
3560
                    SubUnitBzrProtocolClient(stream))
3399
3561
                process_suite.run(subunit_result)
3400
 
            finally:
3401
 
                os._exit(0)
 
3562
            except:
 
3563
                # Try and report traceback on stream, but exit with error even
 
3564
                # if stream couldn't be created or something else goes wrong.
 
3565
                # The traceback is formatted to a string and written in one go
 
3566
                # to avoid interleaving lines from multiple failing children.
 
3567
                try:
 
3568
                    stream.write(traceback.format_exc())
 
3569
                finally:
 
3570
                    os._exit(1)
 
3571
            os._exit(0)
3402
3572
        else:
3403
3573
            os.close(c2pwrite)
3404
3574
            stream = os.fdopen(c2pread, 'rb', 1)
3510
3680
#                           with proper exclusion rules.
3511
3681
#   -Ethreads               Will display thread ident at creation/join time to
3512
3682
#                           help track thread leaks
 
3683
#   -Euncollected_cases     Display the identity of any test cases that weren't
 
3684
#                           deallocated after being completed.
 
3685
#   -Econfig_stats          Will collect statistics using addDetail
3513
3686
selftest_debug_flags = set()
3514
3687
 
3515
3688
 
3619
3792
 
3620
3793
    :return: (absents, duplicates) absents is a list containing the test found
3621
3794
        in id_list but not in test_suite, duplicates is a list containing the
3622
 
        test found multiple times in test_suite.
 
3795
        tests found multiple times in test_suite.
3623
3796
 
3624
3797
    When using a prefined test id list, it may occurs that some tests do not
3625
3798
    exist anymore or that some tests use the same id. This function warns the
3749
3922
        'bzrlib.doc',
3750
3923
        'bzrlib.tests.blackbox',
3751
3924
        'bzrlib.tests.commands',
3752
 
        'bzrlib.tests.doc_generate',
3753
3925
        'bzrlib.tests.per_branch',
3754
3926
        'bzrlib.tests.per_bzrdir',
3755
3927
        'bzrlib.tests.per_controldir',
3819
3991
        'bzrlib.tests.test_email_message',
3820
3992
        'bzrlib.tests.test_eol_filters',
3821
3993
        'bzrlib.tests.test_errors',
 
3994
        'bzrlib.tests.test_estimate_compressed_size',
3822
3995
        'bzrlib.tests.test_export',
3823
3996
        'bzrlib.tests.test_export_pot',
3824
3997
        'bzrlib.tests.test_extract',
 
3998
        'bzrlib.tests.test_features',
3825
3999
        'bzrlib.tests.test_fetch',
3826
4000
        'bzrlib.tests.test_fixtures',
3827
4001
        'bzrlib.tests.test_fifo_cache',
3828
4002
        'bzrlib.tests.test_filters',
 
4003
        'bzrlib.tests.test_filter_tree',
3829
4004
        'bzrlib.tests.test_ftp_transport',
3830
4005
        'bzrlib.tests.test_foreign',
3831
4006
        'bzrlib.tests.test_generate_docs',
3840
4015
        'bzrlib.tests.test_http',
3841
4016
        'bzrlib.tests.test_http_response',
3842
4017
        'bzrlib.tests.test_https_ca_bundle',
 
4018
        'bzrlib.tests.test_https_urllib',
3843
4019
        'bzrlib.tests.test_i18n',
3844
4020
        'bzrlib.tests.test_identitymap',
3845
4021
        'bzrlib.tests.test_ignores',
3894
4070
        'bzrlib.tests.test_revisiontree',
3895
4071
        'bzrlib.tests.test_rio',
3896
4072
        'bzrlib.tests.test_rules',
 
4073
        'bzrlib.tests.test_url_policy_open',
3897
4074
        'bzrlib.tests.test_sampler',
3898
4075
        'bzrlib.tests.test_scenarios',
3899
4076
        'bzrlib.tests.test_script',
3906
4083
        'bzrlib.tests.test_smart',
3907
4084
        'bzrlib.tests.test_smart_add',
3908
4085
        'bzrlib.tests.test_smart_request',
 
4086
        'bzrlib.tests.test_smart_signals',
3909
4087
        'bzrlib.tests.test_smart_transport',
3910
4088
        'bzrlib.tests.test_smtp_connection',
3911
4089
        'bzrlib.tests.test_source',
3942
4120
        'bzrlib.tests.test_version',
3943
4121
        'bzrlib.tests.test_version_info',
3944
4122
        'bzrlib.tests.test_versionedfile',
 
4123
        'bzrlib.tests.test_vf_search',
3945
4124
        'bzrlib.tests.test_weave',
3946
4125
        'bzrlib.tests.test_whitebox',
3947
4126
        'bzrlib.tests.test_win32utils',
4162
4341
    """Copy test and apply scenario to it.
4163
4342
 
4164
4343
    :param test: A test to adapt.
4165
 
    :param scenario: A tuple describing the scenarion.
 
4344
    :param scenario: A tuple describing the scenario.
4166
4345
        The first element of the tuple is the new test id.
4167
4346
        The second element is a dict containing attributes to set on the
4168
4347
        test.
4222
4401
        the module is available.
4223
4402
    """
4224
4403
 
 
4404
    from bzrlib.tests.features import ModuleAvailableFeature
4225
4405
    py_module = pyutils.get_named_object(py_module_name)
4226
4406
    scenarios = [
4227
4407
        ('python', {'module': py_module}),
4268
4448
                         % (os.path.basename(dirname), printable_e))
4269
4449
 
4270
4450
 
4271
 
class Feature(object):
4272
 
    """An operating system Feature."""
4273
 
 
4274
 
    def __init__(self):
4275
 
        self._available = None
4276
 
 
4277
 
    def available(self):
4278
 
        """Is the feature available?
4279
 
 
4280
 
        :return: True if the feature is available.
4281
 
        """
4282
 
        if self._available is None:
4283
 
            self._available = self._probe()
4284
 
        return self._available
4285
 
 
4286
 
    def _probe(self):
4287
 
        """Implement this method in concrete features.
4288
 
 
4289
 
        :return: True if the feature is available.
4290
 
        """
4291
 
        raise NotImplementedError
4292
 
 
4293
 
    def __str__(self):
4294
 
        if getattr(self, 'feature_name', None):
4295
 
            return self.feature_name()
4296
 
        return self.__class__.__name__
4297
 
 
4298
 
 
4299
 
class _SymlinkFeature(Feature):
4300
 
 
4301
 
    def _probe(self):
4302
 
        return osutils.has_symlinks()
4303
 
 
4304
 
    def feature_name(self):
4305
 
        return 'symlinks'
4306
 
 
4307
 
SymlinkFeature = _SymlinkFeature()
4308
 
 
4309
 
 
4310
 
class _HardlinkFeature(Feature):
4311
 
 
4312
 
    def _probe(self):
4313
 
        return osutils.has_hardlinks()
4314
 
 
4315
 
    def feature_name(self):
4316
 
        return 'hardlinks'
4317
 
 
4318
 
HardlinkFeature = _HardlinkFeature()
4319
 
 
4320
 
 
4321
 
class _OsFifoFeature(Feature):
4322
 
 
4323
 
    def _probe(self):
4324
 
        return getattr(os, 'mkfifo', None)
4325
 
 
4326
 
    def feature_name(self):
4327
 
        return 'filesystem fifos'
4328
 
 
4329
 
OsFifoFeature = _OsFifoFeature()
4330
 
 
4331
 
 
4332
 
class _UnicodeFilenameFeature(Feature):
4333
 
    """Does the filesystem support Unicode filenames?"""
4334
 
 
4335
 
    def _probe(self):
4336
 
        try:
4337
 
            # Check for character combinations unlikely to be covered by any
4338
 
            # single non-unicode encoding. We use the characters
4339
 
            # - greek small letter alpha (U+03B1) and
4340
 
            # - braille pattern dots-123456 (U+283F).
4341
 
            os.stat(u'\u03b1\u283f')
4342
 
        except UnicodeEncodeError:
4343
 
            return False
4344
 
        except (IOError, OSError):
4345
 
            # The filesystem allows the Unicode filename but the file doesn't
4346
 
            # exist.
4347
 
            return True
4348
 
        else:
4349
 
            # The filesystem allows the Unicode filename and the file exists,
4350
 
            # for some reason.
4351
 
            return True
4352
 
 
4353
 
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4354
 
 
4355
 
 
4356
 
class _CompatabilityThunkFeature(Feature):
4357
 
    """This feature is just a thunk to another feature.
4358
 
 
4359
 
    It issues a deprecation warning if it is accessed, to let you know that you
4360
 
    should really use a different feature.
4361
 
    """
4362
 
 
4363
 
    def __init__(self, dep_version, module, name,
4364
 
                 replacement_name, replacement_module=None):
4365
 
        super(_CompatabilityThunkFeature, self).__init__()
4366
 
        self._module = module
4367
 
        if replacement_module is None:
4368
 
            replacement_module = module
4369
 
        self._replacement_module = replacement_module
4370
 
        self._name = name
4371
 
        self._replacement_name = replacement_name
4372
 
        self._dep_version = dep_version
4373
 
        self._feature = None
4374
 
 
4375
 
    def _ensure(self):
4376
 
        if self._feature is None:
4377
 
            depr_msg = self._dep_version % ('%s.%s'
4378
 
                                            % (self._module, self._name))
4379
 
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4380
 
                                               self._replacement_name)
4381
 
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4382
 
            # Import the new feature and use it as a replacement for the
4383
 
            # deprecated one.
4384
 
            self._feature = pyutils.get_named_object(
4385
 
                self._replacement_module, self._replacement_name)
4386
 
 
4387
 
    def _probe(self):
4388
 
        self._ensure()
4389
 
        return self._feature._probe()
4390
 
 
4391
 
 
4392
 
class ModuleAvailableFeature(Feature):
4393
 
    """This is a feature than describes a module we want to be available.
4394
 
 
4395
 
    Declare the name of the module in __init__(), and then after probing, the
4396
 
    module will be available as 'self.module'.
4397
 
 
4398
 
    :ivar module: The module if it is available, else None.
4399
 
    """
4400
 
 
4401
 
    def __init__(self, module_name):
4402
 
        super(ModuleAvailableFeature, self).__init__()
4403
 
        self.module_name = module_name
4404
 
 
4405
 
    def _probe(self):
4406
 
        try:
4407
 
            self._module = __import__(self.module_name, {}, {}, [''])
4408
 
            return True
4409
 
        except ImportError:
4410
 
            return False
4411
 
 
4412
 
    @property
4413
 
    def module(self):
4414
 
        if self.available(): # Make sure the probe has been done
4415
 
            return self._module
4416
 
        return None
4417
 
 
4418
 
    def feature_name(self):
4419
 
        return self.module_name
4420
 
 
4421
 
 
4422
4451
def probe_unicode_in_user_encoding():
4423
4452
    """Try to encode several unicode strings to use in unicode-aware tests.
4424
4453
    Return first successfull match.
4452
4481
    return None
4453
4482
 
4454
4483
 
4455
 
class _HTTPSServerFeature(Feature):
4456
 
    """Some tests want an https Server, check if one is available.
4457
 
 
4458
 
    Right now, the only way this is available is under python2.6 which provides
4459
 
    an ssl module.
4460
 
    """
4461
 
 
4462
 
    def _probe(self):
4463
 
        try:
4464
 
            import ssl
4465
 
            return True
4466
 
        except ImportError:
4467
 
            return False
4468
 
 
4469
 
    def feature_name(self):
4470
 
        return 'HTTPSServer'
4471
 
 
4472
 
 
4473
 
HTTPSServerFeature = _HTTPSServerFeature()
4474
 
 
4475
 
 
4476
 
class _UnicodeFilename(Feature):
4477
 
    """Does the filesystem support Unicode filenames?"""
4478
 
 
4479
 
    def _probe(self):
4480
 
        try:
4481
 
            os.stat(u'\u03b1')
4482
 
        except UnicodeEncodeError:
4483
 
            return False
4484
 
        except (IOError, OSError):
4485
 
            # The filesystem allows the Unicode filename but the file doesn't
4486
 
            # exist.
4487
 
            return True
4488
 
        else:
4489
 
            # The filesystem allows the Unicode filename and the file exists,
4490
 
            # for some reason.
4491
 
            return True
4492
 
 
4493
 
UnicodeFilename = _UnicodeFilename()
4494
 
 
4495
 
 
4496
 
class _ByteStringNamedFilesystem(Feature):
4497
 
    """Is the filesystem based on bytes?"""
4498
 
 
4499
 
    def _probe(self):
4500
 
        if os.name == "posix":
4501
 
            return True
4502
 
        return False
4503
 
 
4504
 
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4505
 
 
4506
 
 
4507
 
class _UTF8Filesystem(Feature):
4508
 
    """Is the filesystem UTF-8?"""
4509
 
 
4510
 
    def _probe(self):
4511
 
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4512
 
            return True
4513
 
        return False
4514
 
 
4515
 
UTF8Filesystem = _UTF8Filesystem()
4516
 
 
4517
 
 
4518
 
class _BreakinFeature(Feature):
4519
 
    """Does this platform support the breakin feature?"""
4520
 
 
4521
 
    def _probe(self):
4522
 
        from bzrlib import breakin
4523
 
        if breakin.determine_signal() is None:
4524
 
            return False
4525
 
        if sys.platform == 'win32':
4526
 
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4527
 
            # We trigger SIGBREAK via a Console api so we need ctypes to
4528
 
            # access the function
4529
 
            try:
4530
 
                import ctypes
4531
 
            except OSError:
4532
 
                return False
4533
 
        return True
4534
 
 
4535
 
    def feature_name(self):
4536
 
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
4537
 
 
4538
 
 
4539
 
BreakinFeature = _BreakinFeature()
4540
 
 
4541
 
 
4542
 
class _CaseInsCasePresFilenameFeature(Feature):
4543
 
    """Is the file-system case insensitive, but case-preserving?"""
4544
 
 
4545
 
    def _probe(self):
4546
 
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
4547
 
        try:
4548
 
            # first check truly case-preserving for created files, then check
4549
 
            # case insensitive when opening existing files.
4550
 
            name = osutils.normpath(name)
4551
 
            base, rel = osutils.split(name)
4552
 
            found_rel = osutils.canonical_relpath(base, name)
4553
 
            return (found_rel == rel
4554
 
                    and os.path.isfile(name.upper())
4555
 
                    and os.path.isfile(name.lower()))
4556
 
        finally:
4557
 
            os.close(fileno)
4558
 
            os.remove(name)
4559
 
 
4560
 
    def feature_name(self):
4561
 
        return "case-insensitive case-preserving filesystem"
4562
 
 
4563
 
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4564
 
 
4565
 
 
4566
 
class _CaseInsensitiveFilesystemFeature(Feature):
4567
 
    """Check if underlying filesystem is case-insensitive but *not* case
4568
 
    preserving.
4569
 
    """
4570
 
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4571
 
    # more likely to be case preserving, so this case is rare.
4572
 
 
4573
 
    def _probe(self):
4574
 
        if CaseInsCasePresFilenameFeature.available():
4575
 
            return False
4576
 
 
4577
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
4578
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4579
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
4580
 
        else:
4581
 
            root = TestCaseWithMemoryTransport.TEST_ROOT
4582
 
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4583
 
            dir=root)
4584
 
        name_a = osutils.pathjoin(tdir, 'a')
4585
 
        name_A = osutils.pathjoin(tdir, 'A')
4586
 
        os.mkdir(name_a)
4587
 
        result = osutils.isdir(name_A)
4588
 
        _rmtree_temp_dir(tdir)
4589
 
        return result
4590
 
 
4591
 
    def feature_name(self):
4592
 
        return 'case-insensitive filesystem'
4593
 
 
4594
 
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4595
 
 
4596
 
 
4597
 
class _CaseSensitiveFilesystemFeature(Feature):
4598
 
 
4599
 
    def _probe(self):
4600
 
        if CaseInsCasePresFilenameFeature.available():
4601
 
            return False
4602
 
        elif CaseInsensitiveFilesystemFeature.available():
4603
 
            return False
4604
 
        else:
4605
 
            return True
4606
 
 
4607
 
    def feature_name(self):
4608
 
        return 'case-sensitive filesystem'
4609
 
 
4610
 
# new coding style is for feature instances to be lowercase
4611
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4612
 
 
4613
 
 
4614
4484
# Only define SubUnitBzrRunner if subunit is available.
4615
4485
try:
4616
4486
    from subunit import TestProtocolClient
4617
4487
    from subunit.test_results import AutoTimingTestResultDecorator
4618
4488
    class SubUnitBzrProtocolClient(TestProtocolClient):
4619
4489
 
 
4490
        def stopTest(self, test):
 
4491
            super(SubUnitBzrProtocolClient, self).stopTest(test)
 
4492
            _clear__type_equality_funcs(test)
 
4493
 
4620
4494
        def addSuccess(self, test, details=None):
4621
4495
            # The subunit client always includes the details in the subunit
4622
4496
            # stream, but we don't want to include it in ours.
4634
4508
except ImportError:
4635
4509
    pass
4636
4510
 
4637
 
class _PosixPermissionsFeature(Feature):
4638
 
 
4639
 
    def _probe(self):
4640
 
        def has_perms():
4641
 
            # create temporary file and check if specified perms are maintained.
4642
 
            import tempfile
4643
 
 
4644
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4645
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4646
 
            fd, name = f
4647
 
            os.close(fd)
4648
 
            os.chmod(name, write_perms)
4649
 
 
4650
 
            read_perms = os.stat(name).st_mode & 0777
4651
 
            os.unlink(name)
4652
 
            return (write_perms == read_perms)
4653
 
 
4654
 
        return (os.name == 'posix') and has_perms()
4655
 
 
4656
 
    def feature_name(self):
4657
 
        return 'POSIX permissions support'
4658
 
 
4659
 
posix_permissions_feature = _PosixPermissionsFeature()
 
4511
 
 
4512
# API compatibility for old plugins; see bug 892622.
 
4513
for name in [
 
4514
    'Feature',
 
4515
    'HTTPServerFeature', 
 
4516
    'ModuleAvailableFeature',
 
4517
    'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
 
4518
    'OsFifoFeature', 'UnicodeFilenameFeature',
 
4519
    'ByteStringNamedFilesystem', 'UTF8Filesystem',
 
4520
    'BreakinFeature', 'CaseInsCasePresFilenameFeature',
 
4521
    'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
 
4522
    'posix_permissions_feature',
 
4523
    ]:
 
4524
    globals()[name] = _CompatabilityThunkFeature(
 
4525
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4526
        'bzrlib.tests', name,
 
4527
        name, 'bzrlib.tests.features')
 
4528
 
 
4529
 
 
4530
for (old_name, new_name) in [
 
4531
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
 
4532
    ]:
 
4533
    globals()[name] = _CompatabilityThunkFeature(
 
4534
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4535
        'bzrlib.tests', old_name,
 
4536
        new_name, 'bzrlib.tests.features')