~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Tarmac
  • Author(s): Vincent Ladeuil
  • Date: 2017-01-30 14:42:05 UTC
  • mfrom: (6620.1.1 trunk)
  • Revision ID: tarmac-20170130144205-r8fh2xpmiuxyozpv
Merge  2.7 into trunk including fix for bug #1657238 [r=vila]

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2013, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
19
 
# TODO: Perhaps there should be an API to find out if bzr running under the
20
 
# test suite -- some plugins might want to avoid making intrusive changes if
21
 
# this is the case.  However, we want behaviour under to test to diverge as
22
 
# little as possible, so this should be used rarely if it's added at all.
23
 
# (Suggestion from j-a-meinel, 2005-11-24)
 
19
from __future__ import absolute_import
24
20
 
25
21
# NOTE: Some classes in here use camelCaseNaming() rather than
26
22
# underscore_naming().  That's for consistency with unittest; it's not the
36
32
import errno
37
33
import itertools
38
34
import logging
 
35
import math
39
36
import os
40
37
import platform
41
38
import pprint
42
39
import random
43
40
import re
44
41
import shlex
 
42
import site
45
43
import stat
46
44
import subprocess
47
45
import sys
63
61
import bzrlib
64
62
from bzrlib import (
65
63
    branchbuilder,
66
 
    bzrdir,
 
64
    controldir,
67
65
    chk_map,
68
66
    commands as _mod_commands,
69
67
    config,
 
68
    i18n,
70
69
    debug,
71
70
    errors,
72
71
    hooks,
94
93
    memory,
95
94
    pathfilter,
96
95
    )
 
96
from bzrlib.symbol_versioning import (
 
97
    deprecated_function,
 
98
    deprecated_in,
 
99
    )
97
100
from bzrlib.tests import (
 
101
    fixtures,
98
102
    test_server,
99
103
    TestUtil,
100
104
    treeshape,
101
105
    )
102
106
from bzrlib.ui import NullProgressView
103
107
from bzrlib.ui.text import TextUIFactory
 
108
from bzrlib.tests.features import _CompatabilityThunkFeature
104
109
 
105
110
# Mark this python module as being part of the implementation
106
111
# of unittest: this gives us better tracebacks where the last
133
138
isolated_environ = {
134
139
    'BZR_HOME': None,
135
140
    'HOME': None,
 
141
    'XDG_CONFIG_HOME': None,
136
142
    # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
137
143
    # tests do check our impls match APPDATA
138
144
    'BZR_EDITOR': None, # test_msgeditor manipulates this variable
213
219
        osutils.set_or_unset_env(var, value)
214
220
 
215
221
 
 
222
def _clear__type_equality_funcs(test):
 
223
    """Cleanup bound methods stored on TestCase instances
 
224
 
 
225
    Clear the dict breaking a few (mostly) harmless cycles in the affected
 
226
    unittests released with Python 2.6 and initial Python 2.7 versions.
 
227
 
 
228
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
 
229
    shipped in Oneiric, an object with no clear method was used, hence the
 
230
    extra complications, see bug 809048 for details.
 
231
    """
 
232
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
233
    if type_equality_funcs is not None:
 
234
        tef_clear = getattr(type_equality_funcs, "clear", None)
 
235
        if tef_clear is None:
 
236
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
237
            if tef_instance_dict is not None:
 
238
                tef_clear = tef_instance_dict.clear
 
239
        if tef_clear is not None:
 
240
            tef_clear()
 
241
 
 
242
 
216
243
class ExtendedTestResult(testtools.TextTestResult):
217
244
    """Accepts, reports and accumulates the results of running tests.
218
245
 
222
249
    different types of display.
223
250
 
224
251
    When a test finishes, in whatever way, it calls one of the addSuccess,
225
 
    addFailure or addError classes.  These in turn may redirect to a more
 
252
    addFailure or addError methods.  These in turn may redirect to a more
226
253
    specific case for the special test results supported by our extended
227
254
    tests.
228
255
 
333
360
            return float(''.join(details['benchtime'].iter_bytes()))
334
361
        return getattr(testCase, "_benchtime", None)
335
362
 
 
363
    def _delta_to_float(self, a_timedelta, precision):
 
364
        # This calls ceiling to ensure that the most pessimistic view of time
 
365
        # taken is shown (rather than leaving it to the Python %f operator
 
366
        # to decide whether to round/floor/ceiling. This was added when we
 
367
        # had pyp3 test failures that suggest a floor was happening.
 
368
        shift = 10 ** precision
 
369
        return math.ceil((a_timedelta.days * 86400.0 + a_timedelta.seconds +
 
370
            a_timedelta.microseconds / 1000000.0) * shift) / shift
 
371
 
336
372
    def _elapsedTestTimeString(self):
337
373
        """Return a time string for the overall time the current test has taken."""
338
374
        return self._formatTime(self._delta_to_float(
339
 
            self._now() - self._start_datetime))
 
375
            self._now() - self._start_datetime, 3))
340
376
 
341
377
    def _testTimeString(self, testCase):
342
378
        benchmark_time = self._extractBenchmarkTime(testCase)
386
422
        getDetails = getattr(test, "getDetails", None)
387
423
        if getDetails is not None:
388
424
            getDetails().clear()
389
 
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
390
 
        if type_equality_funcs is not None:
391
 
            type_equality_funcs.clear()
 
425
        _clear__type_equality_funcs(test)
392
426
        self._traceback_from_test = None
393
427
 
394
428
    def startTests(self):
495
529
        self.not_applicable_count += 1
496
530
        self.report_not_applicable(test, reason)
497
531
 
 
532
    def _count_stored_tests(self):
 
533
        """Count of tests instances kept alive due to not succeeding"""
 
534
        return self.error_count + self.failure_count + self.known_failure_count
 
535
 
498
536
    def _post_mortem(self, tb=None):
499
537
        """Start a PDB post mortem session."""
500
538
        if os.environ.get('BZR_TEST_PDB', None):
972
1010
 
973
1011
    def setUp(self):
974
1012
        super(TestCase, self).setUp()
 
1013
 
 
1014
        # At this point we're still accessing the config files in $BZR_HOME (as
 
1015
        # set by the user running selftest).
 
1016
        timeout = config.GlobalStack().get('selftest.timeout')
 
1017
        if timeout:
 
1018
            timeout_fixture = fixtures.TimeoutFixture(timeout)
 
1019
            timeout_fixture.setUp()
 
1020
            self.addCleanup(timeout_fixture.cleanUp)
 
1021
 
975
1022
        for feature in getattr(self, '_test_needs_features', []):
976
1023
            self.requireFeature(feature)
977
1024
        self._cleanEnvironment()
 
1025
 
 
1026
        if bzrlib.global_state is not None:
 
1027
            self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
 
1028
                              config.CommandLineStore())
 
1029
 
978
1030
        self._silenceUI()
979
1031
        self._startLogFile()
980
1032
        self._benchcalls = []
987
1039
        # between tests.  We should get rid of this altogether: bug 656694. --
988
1040
        # mbp 20101008
989
1041
        self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
990
 
        # Isolate config option expansion until its default value for bzrlib is
991
 
        # settled on or a the FIXME associated with _get_expand_default_value
992
 
        # is addressed -- vila 20110219
993
 
        self.overrideAttr(config, '_expand_default_value', None)
994
1042
        self._log_files = set()
995
1043
        # Each key in the ``_counters`` dict holds a value for a different
996
1044
        # counter. When the test ends, addDetail() should be used to output the
998
1046
        self._counters = {}
999
1047
        if 'config_stats' in selftest_debug_flags:
1000
1048
            self._install_config_stats_hooks()
 
1049
        # Do not use i18n for tests (unless the test reverses this)
 
1050
        i18n.disable_i18n()
1001
1051
 
1002
1052
    def debug(self):
1003
1053
        # debug a frame up.
1004
1054
        import pdb
1005
 
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
1055
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
1056
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
1057
                ).set_trace(sys._getframe().f_back)
1006
1058
 
1007
1059
    def discardDetail(self, name):
1008
1060
        """Extend the addDetail, getDetails api so we can remove a detail.
1163
1215
 
1164
1216
    def permit_dir(self, name):
1165
1217
        """Permit a directory to be used by this test. See permit_url."""
1166
 
        name_transport = _mod_transport.get_transport(name)
 
1218
        name_transport = _mod_transport.get_transport_from_path(name)
1167
1219
        self.permit_url(name)
1168
1220
        self.permit_url(name_transport.base)
1169
1221
 
1248
1300
        self.addCleanup(transport_server.stop_server)
1249
1301
        # Obtain a real transport because if the server supplies a password, it
1250
1302
        # will be hidden from the base on the client side.
1251
 
        t = _mod_transport.get_transport(transport_server.get_url())
 
1303
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
1252
1304
        # Some transport servers effectively chroot the backing transport;
1253
1305
        # others like SFTPServer don't - users of the transport can walk up the
1254
1306
        # transport to read the entire backing transport. This wouldn't matter
1285
1337
        # hook into bzr dir opening. This leaves a small window of error for
1286
1338
        # transport tests, but they are well known, and we can improve on this
1287
1339
        # step.
1288
 
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
 
1340
        controldir.ControlDir.hooks.install_named_hook("pre_open",
1289
1341
            self._preopen_isolate_transport, "Check bzr directories are safe.")
1290
1342
 
1291
1343
    def _ndiff_strings(self, a, b):
1317
1369
            % (message,
1318
1370
               pprint.pformat(a), pprint.pformat(b)))
1319
1371
 
 
1372
    # FIXME: This is deprecated in unittest2 but plugins may still use it so we
 
1373
    # need a deprecation period for them -- vila 2016-02-01
1320
1374
    assertEquals = assertEqual
1321
1375
 
1322
1376
    def assertEqualDiff(self, a, b, message=None):
1325
1379
        This is intended for use with multi-line strings where it can
1326
1380
        be hard to find the differences by eye.
1327
1381
        """
1328
 
        # TODO: perhaps override assertEquals to call this for strings?
 
1382
        # TODO: perhaps override assertEqual to call this for strings?
1329
1383
        if a == b:
1330
1384
            return
1331
1385
        if message is None:
1689
1743
        return result
1690
1744
 
1691
1745
    def _startLogFile(self):
1692
 
        """Send bzr and test log messages to a temporary file.
1693
 
 
1694
 
        The file is removed as the test is torn down.
1695
 
        """
 
1746
        """Setup a in-memory target for bzr and testcase log messages"""
1696
1747
        pseudo_log_file = StringIO()
1697
1748
        def _get_log_contents_for_weird_testtools_api():
1698
1749
            return [pseudo_log_file.getvalue().decode(
1705
1756
        self.addCleanup(self._finishLogFile)
1706
1757
 
1707
1758
    def _finishLogFile(self):
1708
 
        """Finished with the log file.
1709
 
 
1710
 
        Close the file and delete it.
1711
 
        """
 
1759
        """Flush and dereference the in-memory log for this testcase"""
1712
1760
        if trace._trace_file:
1713
1761
            # flush the log file, to get all content
1714
1762
            trace._trace_file.flush()
1715
1763
        trace.pop_log_file(self._log_memento)
 
1764
        # The logging module now tracks references for cleanup so discard ours
 
1765
        del self._log_memento
1716
1766
 
1717
1767
    def thisFailsStrictLockCheck(self):
1718
1768
        """It is known that this test would fail with -Dstrict_locks.
1730
1780
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1731
1781
        """Overrides an object attribute restoring it after the test.
1732
1782
 
 
1783
        :note: This should be used with discretion; you should think about
 
1784
        whether it's better to make the code testable without monkey-patching.
 
1785
 
1733
1786
        :param obj: The object that will be mutated.
1734
1787
 
1735
1788
        :param attr_name: The attribute name we want to preserve/override in
1739
1792
 
1740
1793
        :returns: The actual attr value.
1741
1794
        """
1742
 
        value = getattr(obj, attr_name)
1743
1795
        # The actual value is captured by the call below
1744
 
        self.addCleanup(setattr, obj, attr_name, value)
 
1796
        value = getattr(obj, attr_name, _unitialized_attr)
 
1797
        if value is _unitialized_attr:
 
1798
            # When the test completes, the attribute should not exist, but if
 
1799
            # we aren't setting a value, we don't need to do anything.
 
1800
            if new is not _unitialized_attr:
 
1801
                self.addCleanup(delattr, obj, attr_name)
 
1802
        else:
 
1803
            self.addCleanup(setattr, obj, attr_name, value)
1745
1804
        if new is not _unitialized_attr:
1746
1805
            setattr(obj, attr_name, new)
1747
1806
        return value
1760
1819
        self.addCleanup(osutils.set_or_unset_env, name, value)
1761
1820
        return value
1762
1821
 
 
1822
    def recordCalls(self, obj, attr_name):
 
1823
        """Monkeypatch in a wrapper that will record calls.
 
1824
 
 
1825
        The monkeypatch is automatically removed when the test concludes.
 
1826
 
 
1827
        :param obj: The namespace holding the reference to be replaced;
 
1828
            typically a module, class, or object.
 
1829
        :param attr_name: A string for the name of the attribute to 
 
1830
            patch.
 
1831
        :returns: A list that will be extended with one item every time the
 
1832
            function is called, with a tuple of (args, kwargs).
 
1833
        """
 
1834
        calls = []
 
1835
 
 
1836
        def decorator(*args, **kwargs):
 
1837
            calls.append((args, kwargs))
 
1838
            return orig(*args, **kwargs)
 
1839
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1840
        return calls
 
1841
 
1763
1842
    def _cleanEnvironment(self):
1764
1843
        for name, value in isolated_environ.iteritems():
1765
1844
            self.overrideEnv(name, value)
1772
1851
        self._preserved_lazy_hooks.clear()
1773
1852
 
1774
1853
    def knownFailure(self, reason):
1775
 
        """This test has failed for some known reason."""
1776
 
        raise KnownFailure(reason)
 
1854
        """Declare that this test fails for a known reason
 
1855
 
 
1856
        Tests that are known to fail should generally be using expectedFailure
 
1857
        with an appropriate reverse assertion if a change could cause the test
 
1858
        to start passing. Conversely if the test has no immediate prospect of
 
1859
        succeeding then using skip is more suitable.
 
1860
 
 
1861
        When this method is called while an exception is being handled, that
 
1862
        traceback will be used, otherwise a new exception will be thrown to
 
1863
        provide one but won't be reported.
 
1864
        """
 
1865
        self._add_reason(reason)
 
1866
        try:
 
1867
            exc_info = sys.exc_info()
 
1868
            if exc_info != (None, None, None):
 
1869
                self._report_traceback(exc_info)
 
1870
            else:
 
1871
                try:
 
1872
                    raise self.failureException(reason)
 
1873
                except self.failureException:
 
1874
                    exc_info = sys.exc_info()
 
1875
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1876
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1877
        finally:
 
1878
            del exc_info
1777
1879
 
1778
1880
    def _suppress_log(self):
1779
1881
        """Remove the log info from details."""
1907
2009
 
1908
2010
        self.log('run bzr: %r', args)
1909
2011
        # FIXME: don't call into logging here
1910
 
        handler = logging.StreamHandler(stderr)
1911
 
        handler.setLevel(logging.INFO)
 
2012
        handler = trace.EncodedStreamHandler(stderr, errors="replace",
 
2013
            level=logging.INFO)
1912
2014
        logger = logging.getLogger('')
1913
2015
        logger.addHandler(handler)
1914
2016
        old_ui_factory = ui.ui_factory
1950
2052
        if err:
1951
2053
            self.log('errors:\n%r', err)
1952
2054
        if retcode is not None:
1953
 
            self.assertEquals(retcode, result,
 
2055
            self.assertEqual(retcode, result,
1954
2056
                              message='Unexpected return code')
1955
2057
        return result, out, err
1956
2058
 
2102
2204
 
2103
2205
        if env_changes is None:
2104
2206
            env_changes = {}
 
2207
        # Because $HOME is set to a tempdir for the context of a test, modules
 
2208
        # installed in the user dir will not be found unless $PYTHONUSERBASE
 
2209
        # gets set to the computed directory of this parent process.
 
2210
        if site.USER_BASE is not None:
 
2211
            env_changes["PYTHONUSERBASE"] = site.USER_BASE
2105
2212
        old_env = {}
2106
2213
 
2107
2214
        def cleanup_environment():
2298
2405
        from bzrlib.smart import request
2299
2406
        request_handlers = request.request_handlers
2300
2407
        orig_method = request_handlers.get(verb)
 
2408
        orig_info = request_handlers.get_info(verb)
2301
2409
        request_handlers.remove(verb)
2302
 
        self.addCleanup(request_handlers.register, verb, orig_method)
 
2410
        self.addCleanup(request_handlers.register, verb, orig_method,
 
2411
            info=orig_info)
2303
2412
 
2304
2413
 
2305
2414
class CapturedCall(object):
2358
2467
        self.transport_readonly_server = None
2359
2468
        self.__vfs_server = None
2360
2469
 
 
2470
    def setUp(self):
 
2471
        super(TestCaseWithMemoryTransport, self).setUp()
 
2472
 
 
2473
        def _add_disconnect_cleanup(transport):
 
2474
            """Schedule disconnection of given transport at test cleanup
 
2475
 
 
2476
            This needs to happen for all connected transports or leaks occur.
 
2477
 
 
2478
            Note reconnections may mean we call disconnect multiple times per
 
2479
            transport which is suboptimal but seems harmless.
 
2480
            """
 
2481
            self.addCleanup(transport.disconnect)
 
2482
 
 
2483
        _mod_transport.Transport.hooks.install_named_hook('post_connect',
 
2484
            _add_disconnect_cleanup, None)
 
2485
 
 
2486
        self._make_test_root()
 
2487
        self.addCleanup(os.chdir, os.getcwdu())
 
2488
        self.makeAndChdirToTestDir()
 
2489
        self.overrideEnvironmentForTesting()
 
2490
        self.__readonly_server = None
 
2491
        self.__server = None
 
2492
        self.reduceLockdirTimeout()
 
2493
        # Each test may use its own config files even if the local config files
 
2494
        # don't actually exist. They'll rightly fail if they try to create them
 
2495
        # though.
 
2496
        self.overrideAttr(config, '_shared_stores', {})
 
2497
 
2361
2498
    def get_transport(self, relpath=None):
2362
2499
        """Return a writeable transport.
2363
2500
 
2366
2503
 
2367
2504
        :param relpath: a path relative to the base url.
2368
2505
        """
2369
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2506
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
2370
2507
        self.assertFalse(t.is_readonly())
2371
2508
        return t
2372
2509
 
2378
2515
 
2379
2516
        :param relpath: a path relative to the base url.
2380
2517
        """
2381
 
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
2518
        t = _mod_transport.get_transport_from_url(
 
2519
            self.get_readonly_url(relpath))
2382
2520
        self.assertTrue(t.is_readonly())
2383
2521
        return t
2384
2522
 
2505
2643
        real branch.
2506
2644
        """
2507
2645
        root = TestCaseWithMemoryTransport.TEST_ROOT
2508
 
        wt = bzrdir.BzrDir.create_standalone_workingtree(root)
 
2646
        try:
 
2647
            # Make sure we get a readable and accessible home for .bzr.log
 
2648
            # and/or config files, and not fallback to weird defaults (see
 
2649
            # http://pad.lv/825027).
 
2650
            self.assertIs(None, os.environ.get('BZR_HOME', None))
 
2651
            os.environ['BZR_HOME'] = root
 
2652
            wt = controldir.ControlDir.create_standalone_workingtree(root)
 
2653
            del os.environ['BZR_HOME']
 
2654
        except Exception, e:
 
2655
            self.fail("Fail to initialize the safety net: %r\n" % (e,))
2509
2656
        # Hack for speed: remember the raw bytes of the dirstate file so that
2510
2657
        # we don't need to re-open the wt to check it hasn't changed.
2511
2658
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2518
2665
        propagating. This method ensures than a test did not leaked.
2519
2666
        """
2520
2667
        root = TestCaseWithMemoryTransport.TEST_ROOT
2521
 
        t = _mod_transport.get_transport(root)
 
2668
        t = _mod_transport.get_transport_from_path(root)
2522
2669
        self.permit_url(t.base)
2523
2670
        if (t.get_bytes('.bzr/checkout/dirstate') != 
2524
2671
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
2559
2706
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2560
2707
        self.permit_dir(self.test_dir)
2561
2708
 
2562
 
    def make_branch(self, relpath, format=None):
 
2709
    def make_branch(self, relpath, format=None, name=None):
2563
2710
        """Create a branch on the transport at relpath."""
2564
2711
        repo = self.make_repository(relpath, format=format)
2565
 
        return repo.bzrdir.create_branch()
 
2712
        return repo.bzrdir.create_branch(append_revisions_only=False,
 
2713
                                         name=name)
 
2714
 
 
2715
    def get_default_format(self):
 
2716
        return 'default'
 
2717
 
 
2718
    def resolve_format(self, format):
 
2719
        """Resolve an object to a ControlDir format object.
 
2720
 
 
2721
        The initial format object can either already be
 
2722
        a ControlDirFormat, None (for the default format),
 
2723
        or a string with the name of the control dir format.
 
2724
 
 
2725
        :param format: Object to resolve
 
2726
        :return A ControlDirFormat instance
 
2727
        """
 
2728
        if format is None:
 
2729
            format = self.get_default_format()
 
2730
        if isinstance(format, basestring):
 
2731
            format = controldir.format_registry.make_bzrdir(format)
 
2732
        return format
2566
2733
 
2567
2734
    def make_bzrdir(self, relpath, format=None):
2568
2735
        try:
2572
2739
            t = _mod_transport.get_transport(maybe_a_url)
2573
2740
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2574
2741
                t.ensure_base()
2575
 
            if format is None:
2576
 
                format = 'default'
2577
 
            if isinstance(format, basestring):
2578
 
                format = bzrdir.format_registry.make_bzrdir(format)
 
2742
            format = self.resolve_format(format)
2579
2743
            return format.initialize_on_transport(t)
2580
2744
        except errors.UninitializableFormat:
2581
2745
            raise TestSkipped("Format %s is not initializable." % format)
2582
2746
 
2583
 
    def make_repository(self, relpath, shared=False, format=None):
 
2747
    def make_repository(self, relpath, shared=None, format=None):
2584
2748
        """Create a repository on our default transport at relpath.
2585
2749
 
2586
2750
        Note that relpath must be a relative path, not a full url.
2597
2761
            backing_server = self.get_server()
2598
2762
        smart_server = test_server.SmartTCPServer_for_testing()
2599
2763
        self.start_server(smart_server, backing_server)
2600
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
 
2764
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
2601
2765
                                                   ).clone(path)
2602
2766
        return remote_transport
2603
2767
 
2617
2781
        self.overrideEnv('HOME', test_home_dir)
2618
2782
        self.overrideEnv('BZR_HOME', test_home_dir)
2619
2783
 
2620
 
    def setUp(self):
2621
 
        super(TestCaseWithMemoryTransport, self).setUp()
2622
 
        # Ensure that ConnectedTransport doesn't leak sockets
2623
 
        def get_transport_with_cleanup(*args, **kwargs):
2624
 
            t = orig_get_transport(*args, **kwargs)
2625
 
            if isinstance(t, _mod_transport.ConnectedTransport):
2626
 
                self.addCleanup(t.disconnect)
2627
 
            return t
2628
 
 
2629
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2630
 
                                               get_transport_with_cleanup)
2631
 
        self._make_test_root()
2632
 
        self.addCleanup(os.chdir, os.getcwdu())
2633
 
        self.makeAndChdirToTestDir()
2634
 
        self.overrideEnvironmentForTesting()
2635
 
        self.__readonly_server = None
2636
 
        self.__server = None
2637
 
        self.reduceLockdirTimeout()
2638
 
 
2639
2784
    def setup_smart_server_with_call_log(self):
2640
2785
        """Sets up a smart server as the transport server with a call log."""
2641
2786
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2787
        self.hpss_connections = []
2642
2788
        self.hpss_calls = []
2643
2789
        import traceback
2644
2790
        # Skip the current stack down to the caller of
2647
2793
        def capture_hpss_call(params):
2648
2794
            self.hpss_calls.append(
2649
2795
                CapturedCall(params, prefix_length))
 
2796
        def capture_connect(transport):
 
2797
            self.hpss_connections.append(transport)
2650
2798
        client._SmartClient.hooks.install_named_hook(
2651
2799
            'call', capture_hpss_call, None)
 
2800
        _mod_transport.Transport.hooks.install_named_hook(
 
2801
            'post_connect', capture_connect, None)
2652
2802
 
2653
2803
    def reset_smart_call_log(self):
2654
2804
        self.hpss_calls = []
 
2805
        self.hpss_connections = []
2655
2806
 
2656
2807
 
2657
2808
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2727
2878
        # stacking policy to honour; create a bzr dir with an unshared
2728
2879
        # repository (but not a branch - our code would be trying to escape
2729
2880
        # then!) to stop them, and permit it to be read.
2730
 
        # control = bzrdir.BzrDir.create(self.test_base_dir)
 
2881
        # control = controldir.ControlDir.create(self.test_base_dir)
2731
2882
        # control.create_repository()
2732
2883
        self.test_home_dir = self.test_base_dir + '/home'
2733
2884
        os.mkdir(self.test_home_dir)
2768
2919
                "a list or a tuple. Got %r instead" % (shape,))
2769
2920
        # It's OK to just create them using forward slashes on windows.
2770
2921
        if transport is None or transport.is_readonly():
2771
 
            transport = _mod_transport.get_transport(".")
 
2922
            transport = _mod_transport.get_transport_from_path(".")
2772
2923
        for name in shape:
2773
2924
            self.assertIsInstance(name, basestring)
2774
2925
            if name[-1] == '/':
2822
2973
    readwrite one must both define get_url() as resolving to os.getcwd().
2823
2974
    """
2824
2975
 
 
2976
    def setUp(self):
 
2977
        super(TestCaseWithTransport, self).setUp()
 
2978
        self.__vfs_server = None
 
2979
 
2825
2980
    def get_vfs_only_server(self):
2826
2981
        """See TestCaseWithMemoryTransport.
2827
2982
 
2859
3014
        # this obviously requires a format that supports branch references
2860
3015
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2861
3016
        # RBC 20060208
 
3017
        format = self.resolve_format(format=format)
 
3018
        if not format.supports_workingtrees:
 
3019
            b = self.make_branch(relpath+'.branch', format=format)
 
3020
            return b.create_checkout(relpath, lightweight=True)
2862
3021
        b = self.make_branch(relpath, format=format)
2863
3022
        try:
2864
3023
            return b.bzrdir.create_workingtree()
2869
3028
            if self.vfs_transport_factory is test_server.LocalURLServer:
2870
3029
                # the branch is colocated on disk, we cannot create a checkout.
2871
3030
                # hopefully callers will expect this.
2872
 
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
3031
                local_controldir = controldir.ControlDir.open(
 
3032
                    self.get_vfs_only_url(relpath))
2873
3033
                wt = local_controldir.create_workingtree()
2874
3034
                if wt.branch._format != b._format:
2875
3035
                    wt._branch = b
2905
3065
        self.assertFalse(differences.has_changed(),
2906
3066
            "Trees %r and %r are different: %r" % (left, right, differences))
2907
3067
 
2908
 
    def setUp(self):
2909
 
        super(TestCaseWithTransport, self).setUp()
2910
 
        self.__vfs_server = None
2911
 
 
2912
3068
    def disable_missing_extensions_warning(self):
2913
3069
        """Some tests expect a precise stderr content.
2914
3070
 
2915
3071
        There is no point in forcing them to duplicate the extension related
2916
3072
        warning.
2917
3073
        """
2918
 
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
3074
        config.GlobalStack().set('ignore_missing_extensions', True)
2919
3075
 
2920
3076
 
2921
3077
class ChrootedTestCase(TestCaseWithTransport):
3163
3319
                            result_decorators=result_decorators,
3164
3320
                            )
3165
3321
    runner.stop_on_failure=stop_on_failure
 
3322
    if isinstance(suite, unittest.TestSuite):
 
3323
        # Empty out _tests list of passed suite and populate new TestSuite
 
3324
        suite._tests[:], suite = [], TestSuite(suite)
3166
3325
    # built in decorator factories:
3167
3326
    decorators = [
3168
3327
        random_order(random_seed, runner),
3266
3425
 
3267
3426
class TestDecorator(TestUtil.TestSuite):
3268
3427
    """A decorator for TestCase/TestSuite objects.
3269
 
    
3270
 
    Usually, subclasses should override __iter__(used when flattening test
3271
 
    suites), which we do to filter, reorder, parallelise and so on, run() and
3272
 
    debug().
 
3428
 
 
3429
    Contains rather than flattening suite passed on construction
3273
3430
    """
3274
3431
 
3275
 
    def __init__(self, suite):
3276
 
        TestUtil.TestSuite.__init__(self)
3277
 
        self.addTest(suite)
3278
 
 
3279
 
    def countTestCases(self):
3280
 
        cases = 0
3281
 
        for test in self:
3282
 
            cases += test.countTestCases()
3283
 
        return cases
3284
 
 
3285
 
    def debug(self):
3286
 
        for test in self:
3287
 
            test.debug()
3288
 
 
3289
 
    def run(self, result):
3290
 
        # Use iteration on self, not self._tests, to allow subclasses to hook
3291
 
        # into __iter__.
3292
 
        for test in self:
3293
 
            if result.shouldStop:
3294
 
                break
3295
 
            test.run(result)
3296
 
        return result
 
3432
    def __init__(self, suite=None):
 
3433
        super(TestDecorator, self).__init__()
 
3434
        if suite is not None:
 
3435
            self.addTest(suite)
 
3436
 
 
3437
    # Don't need subclass run method with suite emptying
 
3438
    run = unittest.TestSuite.run
3297
3439
 
3298
3440
 
3299
3441
class CountingDecorator(TestDecorator):
3310
3452
    """A decorator which excludes test matching an exclude pattern."""
3311
3453
 
3312
3454
    def __init__(self, suite, exclude_pattern):
3313
 
        TestDecorator.__init__(self, suite)
3314
 
        self.exclude_pattern = exclude_pattern
3315
 
        self.excluded = False
3316
 
 
3317
 
    def __iter__(self):
3318
 
        if self.excluded:
3319
 
            return iter(self._tests)
3320
 
        self.excluded = True
3321
 
        suite = exclude_tests_by_re(self, self.exclude_pattern)
3322
 
        del self._tests[:]
3323
 
        self.addTests(suite)
3324
 
        return iter(self._tests)
 
3455
        super(ExcludeDecorator, self).__init__(
 
3456
            exclude_tests_by_re(suite, exclude_pattern))
3325
3457
 
3326
3458
 
3327
3459
class FilterTestsDecorator(TestDecorator):
3328
3460
    """A decorator which filters tests to those matching a pattern."""
3329
3461
 
3330
3462
    def __init__(self, suite, pattern):
3331
 
        TestDecorator.__init__(self, suite)
3332
 
        self.pattern = pattern
3333
 
        self.filtered = False
3334
 
 
3335
 
    def __iter__(self):
3336
 
        if self.filtered:
3337
 
            return iter(self._tests)
3338
 
        self.filtered = True
3339
 
        suite = filter_suite_by_re(self, self.pattern)
3340
 
        del self._tests[:]
3341
 
        self.addTests(suite)
3342
 
        return iter(self._tests)
 
3463
        super(FilterTestsDecorator, self).__init__(
 
3464
            filter_suite_by_re(suite, pattern))
3343
3465
 
3344
3466
 
3345
3467
class RandomDecorator(TestDecorator):
3346
3468
    """A decorator which randomises the order of its tests."""
3347
3469
 
3348
3470
    def __init__(self, suite, random_seed, stream):
3349
 
        TestDecorator.__init__(self, suite)
3350
 
        self.random_seed = random_seed
3351
 
        self.randomised = False
3352
 
        self.stream = stream
3353
 
 
3354
 
    def __iter__(self):
3355
 
        if self.randomised:
3356
 
            return iter(self._tests)
3357
 
        self.randomised = True
3358
 
        self.stream.write("Randomizing test order using seed %s\n\n" %
3359
 
            (self.actual_seed()))
 
3471
        random_seed = self.actual_seed(random_seed)
 
3472
        stream.write("Randomizing test order using seed %s\n\n" %
 
3473
            (random_seed,))
3360
3474
        # Initialise the random number generator.
3361
 
        random.seed(self.actual_seed())
3362
 
        suite = randomize_suite(self)
3363
 
        del self._tests[:]
3364
 
        self.addTests(suite)
3365
 
        return iter(self._tests)
 
3475
        random.seed(random_seed)
 
3476
        super(RandomDecorator, self).__init__(randomize_suite(suite))
3366
3477
 
3367
 
    def actual_seed(self):
3368
 
        if self.random_seed == "now":
 
3478
    @staticmethod
 
3479
    def actual_seed(seed):
 
3480
        if seed == "now":
3369
3481
            # We convert the seed to a long to make it reuseable across
3370
3482
            # invocations (because the user can reenter it).
3371
 
            self.random_seed = long(time.time())
 
3483
            return long(time.time())
3372
3484
        else:
3373
3485
            # Convert the seed to a long if we can
3374
3486
            try:
3375
 
                self.random_seed = long(self.random_seed)
3376
 
            except:
 
3487
                return long(seed)
 
3488
            except (TypeError, ValueError):
3377
3489
                pass
3378
 
        return self.random_seed
 
3490
        return seed
3379
3491
 
3380
3492
 
3381
3493
class TestFirstDecorator(TestDecorator):
3382
3494
    """A decorator which moves named tests to the front."""
3383
3495
 
3384
3496
    def __init__(self, suite, pattern):
3385
 
        TestDecorator.__init__(self, suite)
3386
 
        self.pattern = pattern
3387
 
        self.filtered = False
3388
 
 
3389
 
    def __iter__(self):
3390
 
        if self.filtered:
3391
 
            return iter(self._tests)
3392
 
        self.filtered = True
3393
 
        suites = split_suite_by_re(self, self.pattern)
3394
 
        del self._tests[:]
3395
 
        self.addTests(suites)
3396
 
        return iter(self._tests)
 
3497
        super(TestFirstDecorator, self).__init__()
 
3498
        self.addTests(split_suite_by_re(suite, pattern))
3397
3499
 
3398
3500
 
3399
3501
def partition_tests(suite, count):
3431
3533
    """
3432
3534
    concurrency = osutils.local_concurrency()
3433
3535
    result = []
3434
 
    from subunit import TestProtocolClient, ProtocolTestCase
 
3536
    from subunit import ProtocolTestCase
3435
3537
    from subunit.test_results import AutoTimingTestResultDecorator
3436
3538
    class TestInOtherProcess(ProtocolTestCase):
3437
3539
        # Should be in subunit, I think. RBC.
3443
3545
            try:
3444
3546
                ProtocolTestCase.run(self, result)
3445
3547
            finally:
3446
 
                os.waitpid(self.pid, 0)
 
3548
                pid, status = os.waitpid(self.pid, 0)
 
3549
            # GZ 2011-10-18: If status is nonzero, should report to the result
 
3550
            #                that something went wrong.
3447
3551
 
3448
3552
    test_blocks = partition_tests(suite, concurrency)
 
3553
    # Clear the tests from the original suite so it doesn't keep them alive
 
3554
    suite._tests[:] = []
3449
3555
    for process_tests in test_blocks:
3450
 
        process_suite = TestUtil.TestSuite()
3451
 
        process_suite.addTests(process_tests)
 
3556
        process_suite = TestUtil.TestSuite(process_tests)
 
3557
        # Also clear each split list so new suite has only reference
 
3558
        process_tests[:] = []
3452
3559
        c2pread, c2pwrite = os.pipe()
3453
3560
        pid = os.fork()
3454
3561
        if pid == 0:
3455
 
            workaround_zealous_crypto_random()
3456
3562
            try:
 
3563
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3564
                workaround_zealous_crypto_random()
3457
3565
                os.close(c2pread)
3458
3566
                # Leave stderr and stdout open so we can see test noise
3459
3567
                # Close stdin so that the child goes away if it decides to
3460
3568
                # read from stdin (otherwise its a roulette to see what
3461
3569
                # child actually gets keystrokes for pdb etc).
3462
3570
                sys.stdin.close()
3463
 
                sys.stdin = None
3464
 
                stream = os.fdopen(c2pwrite, 'wb', 1)
3465
3571
                subunit_result = AutoTimingTestResultDecorator(
3466
 
                    TestProtocolClient(stream))
 
3572
                    SubUnitBzrProtocolClient(stream))
3467
3573
                process_suite.run(subunit_result)
3468
 
            finally:
3469
 
                os._exit(0)
 
3574
            except:
 
3575
                # Try and report traceback on stream, but exit with error even
 
3576
                # if stream couldn't be created or something else goes wrong.
 
3577
                # The traceback is formatted to a string and written in one go
 
3578
                # to avoid interleaving lines from multiple failing children.
 
3579
                try:
 
3580
                    stream.write(traceback.format_exc())
 
3581
                finally:
 
3582
                    os._exit(1)
 
3583
            os._exit(0)
3470
3584
        else:
3471
3585
            os.close(c2pwrite)
3472
3586
            stream = os.fdopen(c2pread, 'rb', 1)
3578
3692
#                           with proper exclusion rules.
3579
3693
#   -Ethreads               Will display thread ident at creation/join time to
3580
3694
#                           help track thread leaks
3581
 
 
 
3695
#   -Euncollected_cases     Display the identity of any test cases that weren't
 
3696
#                           deallocated after being completed.
3582
3697
#   -Econfig_stats          Will collect statistics using addDetail
3583
3698
selftest_debug_flags = set()
3584
3699
 
3689
3804
 
3690
3805
    :return: (absents, duplicates) absents is a list containing the test found
3691
3806
        in id_list but not in test_suite, duplicates is a list containing the
3692
 
        test found multiple times in test_suite.
 
3807
        tests found multiple times in test_suite.
3693
3808
 
3694
3809
    When using a prefined test id list, it may occurs that some tests do not
3695
3810
    exist anymore or that some tests use the same id. This function warns the
3819
3934
        'bzrlib.doc',
3820
3935
        'bzrlib.tests.blackbox',
3821
3936
        'bzrlib.tests.commands',
3822
 
        'bzrlib.tests.doc_generate',
3823
3937
        'bzrlib.tests.per_branch',
3824
3938
        'bzrlib.tests.per_bzrdir',
3825
3939
        'bzrlib.tests.per_controldir',
3889
4003
        'bzrlib.tests.test_email_message',
3890
4004
        'bzrlib.tests.test_eol_filters',
3891
4005
        'bzrlib.tests.test_errors',
 
4006
        'bzrlib.tests.test_estimate_compressed_size',
3892
4007
        'bzrlib.tests.test_export',
3893
4008
        'bzrlib.tests.test_export_pot',
3894
4009
        'bzrlib.tests.test_extract',
 
4010
        'bzrlib.tests.test_features',
3895
4011
        'bzrlib.tests.test_fetch',
3896
4012
        'bzrlib.tests.test_fixtures',
3897
4013
        'bzrlib.tests.test_fifo_cache',
3898
4014
        'bzrlib.tests.test_filters',
 
4015
        'bzrlib.tests.test_filter_tree',
3899
4016
        'bzrlib.tests.test_ftp_transport',
3900
4017
        'bzrlib.tests.test_foreign',
3901
4018
        'bzrlib.tests.test_generate_docs',
3910
4027
        'bzrlib.tests.test_http',
3911
4028
        'bzrlib.tests.test_http_response',
3912
4029
        'bzrlib.tests.test_https_ca_bundle',
 
4030
        'bzrlib.tests.test_https_urllib',
3913
4031
        'bzrlib.tests.test_i18n',
3914
4032
        'bzrlib.tests.test_identitymap',
3915
4033
        'bzrlib.tests.test_ignores',
3964
4082
        'bzrlib.tests.test_revisiontree',
3965
4083
        'bzrlib.tests.test_rio',
3966
4084
        'bzrlib.tests.test_rules',
 
4085
        'bzrlib.tests.test_url_policy_open',
3967
4086
        'bzrlib.tests.test_sampler',
3968
4087
        'bzrlib.tests.test_scenarios',
3969
4088
        'bzrlib.tests.test_script',
3976
4095
        'bzrlib.tests.test_smart',
3977
4096
        'bzrlib.tests.test_smart_add',
3978
4097
        'bzrlib.tests.test_smart_request',
 
4098
        'bzrlib.tests.test_smart_signals',
3979
4099
        'bzrlib.tests.test_smart_transport',
3980
4100
        'bzrlib.tests.test_smtp_connection',
3981
4101
        'bzrlib.tests.test_source',
4012
4132
        'bzrlib.tests.test_version',
4013
4133
        'bzrlib.tests.test_version_info',
4014
4134
        'bzrlib.tests.test_versionedfile',
 
4135
        'bzrlib.tests.test_vf_search',
4015
4136
        'bzrlib.tests.test_weave',
4016
4137
        'bzrlib.tests.test_whitebox',
4017
4138
        'bzrlib.tests.test_win32utils',
4232
4353
    """Copy test and apply scenario to it.
4233
4354
 
4234
4355
    :param test: A test to adapt.
4235
 
    :param scenario: A tuple describing the scenarion.
 
4356
    :param scenario: A tuple describing the scenario.
4236
4357
        The first element of the tuple is the new test id.
4237
4358
        The second element is a dict containing attributes to set on the
4238
4359
        test.
4292
4413
        the module is available.
4293
4414
    """
4294
4415
 
 
4416
    from bzrlib.tests.features import ModuleAvailableFeature
4295
4417
    py_module = pyutils.get_named_object(py_module_name)
4296
4418
    scenarios = [
4297
4419
        ('python', {'module': py_module}),
4338
4460
                         % (os.path.basename(dirname), printable_e))
4339
4461
 
4340
4462
 
4341
 
class Feature(object):
4342
 
    """An operating system Feature."""
4343
 
 
4344
 
    def __init__(self):
4345
 
        self._available = None
4346
 
 
4347
 
    def available(self):
4348
 
        """Is the feature available?
4349
 
 
4350
 
        :return: True if the feature is available.
4351
 
        """
4352
 
        if self._available is None:
4353
 
            self._available = self._probe()
4354
 
        return self._available
4355
 
 
4356
 
    def _probe(self):
4357
 
        """Implement this method in concrete features.
4358
 
 
4359
 
        :return: True if the feature is available.
4360
 
        """
4361
 
        raise NotImplementedError
4362
 
 
4363
 
    def __str__(self):
4364
 
        if getattr(self, 'feature_name', None):
4365
 
            return self.feature_name()
4366
 
        return self.__class__.__name__
4367
 
 
4368
 
 
4369
 
class _SymlinkFeature(Feature):
4370
 
 
4371
 
    def _probe(self):
4372
 
        return osutils.has_symlinks()
4373
 
 
4374
 
    def feature_name(self):
4375
 
        return 'symlinks'
4376
 
 
4377
 
SymlinkFeature = _SymlinkFeature()
4378
 
 
4379
 
 
4380
 
class _HardlinkFeature(Feature):
4381
 
 
4382
 
    def _probe(self):
4383
 
        return osutils.has_hardlinks()
4384
 
 
4385
 
    def feature_name(self):
4386
 
        return 'hardlinks'
4387
 
 
4388
 
HardlinkFeature = _HardlinkFeature()
4389
 
 
4390
 
 
4391
 
class _OsFifoFeature(Feature):
4392
 
 
4393
 
    def _probe(self):
4394
 
        return getattr(os, 'mkfifo', None)
4395
 
 
4396
 
    def feature_name(self):
4397
 
        return 'filesystem fifos'
4398
 
 
4399
 
OsFifoFeature = _OsFifoFeature()
4400
 
 
4401
 
 
4402
 
class _UnicodeFilenameFeature(Feature):
4403
 
    """Does the filesystem support Unicode filenames?"""
4404
 
 
4405
 
    def _probe(self):
4406
 
        try:
4407
 
            # Check for character combinations unlikely to be covered by any
4408
 
            # single non-unicode encoding. We use the characters
4409
 
            # - greek small letter alpha (U+03B1) and
4410
 
            # - braille pattern dots-123456 (U+283F).
4411
 
            os.stat(u'\u03b1\u283f')
4412
 
        except UnicodeEncodeError:
4413
 
            return False
4414
 
        except (IOError, OSError):
4415
 
            # The filesystem allows the Unicode filename but the file doesn't
4416
 
            # exist.
4417
 
            return True
4418
 
        else:
4419
 
            # The filesystem allows the Unicode filename and the file exists,
4420
 
            # for some reason.
4421
 
            return True
4422
 
 
4423
 
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4424
 
 
4425
 
 
4426
 
class _CompatabilityThunkFeature(Feature):
4427
 
    """This feature is just a thunk to another feature.
4428
 
 
4429
 
    It issues a deprecation warning if it is accessed, to let you know that you
4430
 
    should really use a different feature.
4431
 
    """
4432
 
 
4433
 
    def __init__(self, dep_version, module, name,
4434
 
                 replacement_name, replacement_module=None):
4435
 
        super(_CompatabilityThunkFeature, self).__init__()
4436
 
        self._module = module
4437
 
        if replacement_module is None:
4438
 
            replacement_module = module
4439
 
        self._replacement_module = replacement_module
4440
 
        self._name = name
4441
 
        self._replacement_name = replacement_name
4442
 
        self._dep_version = dep_version
4443
 
        self._feature = None
4444
 
 
4445
 
    def _ensure(self):
4446
 
        if self._feature is None:
4447
 
            depr_msg = self._dep_version % ('%s.%s'
4448
 
                                            % (self._module, self._name))
4449
 
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4450
 
                                               self._replacement_name)
4451
 
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4452
 
            # Import the new feature and use it as a replacement for the
4453
 
            # deprecated one.
4454
 
            self._feature = pyutils.get_named_object(
4455
 
                self._replacement_module, self._replacement_name)
4456
 
 
4457
 
    def _probe(self):
4458
 
        self._ensure()
4459
 
        return self._feature._probe()
4460
 
 
4461
 
 
4462
 
class ModuleAvailableFeature(Feature):
4463
 
    """This is a feature than describes a module we want to be available.
4464
 
 
4465
 
    Declare the name of the module in __init__(), and then after probing, the
4466
 
    module will be available as 'self.module'.
4467
 
 
4468
 
    :ivar module: The module if it is available, else None.
4469
 
    """
4470
 
 
4471
 
    def __init__(self, module_name):
4472
 
        super(ModuleAvailableFeature, self).__init__()
4473
 
        self.module_name = module_name
4474
 
 
4475
 
    def _probe(self):
4476
 
        try:
4477
 
            self._module = __import__(self.module_name, {}, {}, [''])
4478
 
            return True
4479
 
        except ImportError:
4480
 
            return False
4481
 
 
4482
 
    @property
4483
 
    def module(self):
4484
 
        if self.available(): # Make sure the probe has been done
4485
 
            return self._module
4486
 
        return None
4487
 
 
4488
 
    def feature_name(self):
4489
 
        return self.module_name
4490
 
 
4491
 
 
4492
4463
def probe_unicode_in_user_encoding():
4493
4464
    """Try to encode several unicode strings to use in unicode-aware tests.
4494
4465
    Return first successfull match.
4522
4493
    return None
4523
4494
 
4524
4495
 
4525
 
class _HTTPSServerFeature(Feature):
4526
 
    """Some tests want an https Server, check if one is available.
4527
 
 
4528
 
    Right now, the only way this is available is under python2.6 which provides
4529
 
    an ssl module.
4530
 
    """
4531
 
 
4532
 
    def _probe(self):
4533
 
        try:
4534
 
            import ssl
4535
 
            return True
4536
 
        except ImportError:
4537
 
            return False
4538
 
 
4539
 
    def feature_name(self):
4540
 
        return 'HTTPSServer'
4541
 
 
4542
 
 
4543
 
HTTPSServerFeature = _HTTPSServerFeature()
4544
 
 
4545
 
 
4546
 
class _UnicodeFilename(Feature):
4547
 
    """Does the filesystem support Unicode filenames?"""
4548
 
 
4549
 
    def _probe(self):
4550
 
        try:
4551
 
            os.stat(u'\u03b1')
4552
 
        except UnicodeEncodeError:
4553
 
            return False
4554
 
        except (IOError, OSError):
4555
 
            # The filesystem allows the Unicode filename but the file doesn't
4556
 
            # exist.
4557
 
            return True
4558
 
        else:
4559
 
            # The filesystem allows the Unicode filename and the file exists,
4560
 
            # for some reason.
4561
 
            return True
4562
 
 
4563
 
UnicodeFilename = _UnicodeFilename()
4564
 
 
4565
 
 
4566
 
class _ByteStringNamedFilesystem(Feature):
4567
 
    """Is the filesystem based on bytes?"""
4568
 
 
4569
 
    def _probe(self):
4570
 
        if os.name == "posix":
4571
 
            return True
4572
 
        return False
4573
 
 
4574
 
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4575
 
 
4576
 
 
4577
 
class _UTF8Filesystem(Feature):
4578
 
    """Is the filesystem UTF-8?"""
4579
 
 
4580
 
    def _probe(self):
4581
 
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4582
 
            return True
4583
 
        return False
4584
 
 
4585
 
UTF8Filesystem = _UTF8Filesystem()
4586
 
 
4587
 
 
4588
 
class _BreakinFeature(Feature):
4589
 
    """Does this platform support the breakin feature?"""
4590
 
 
4591
 
    def _probe(self):
4592
 
        from bzrlib import breakin
4593
 
        if breakin.determine_signal() is None:
4594
 
            return False
4595
 
        if sys.platform == 'win32':
4596
 
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4597
 
            # We trigger SIGBREAK via a Console api so we need ctypes to
4598
 
            # access the function
4599
 
            try:
4600
 
                import ctypes
4601
 
            except OSError:
4602
 
                return False
4603
 
        return True
4604
 
 
4605
 
    def feature_name(self):
4606
 
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
4607
 
 
4608
 
 
4609
 
BreakinFeature = _BreakinFeature()
4610
 
 
4611
 
 
4612
 
class _CaseInsCasePresFilenameFeature(Feature):
4613
 
    """Is the file-system case insensitive, but case-preserving?"""
4614
 
 
4615
 
    def _probe(self):
4616
 
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
4617
 
        try:
4618
 
            # first check truly case-preserving for created files, then check
4619
 
            # case insensitive when opening existing files.
4620
 
            name = osutils.normpath(name)
4621
 
            base, rel = osutils.split(name)
4622
 
            found_rel = osutils.canonical_relpath(base, name)
4623
 
            return (found_rel == rel
4624
 
                    and os.path.isfile(name.upper())
4625
 
                    and os.path.isfile(name.lower()))
4626
 
        finally:
4627
 
            os.close(fileno)
4628
 
            os.remove(name)
4629
 
 
4630
 
    def feature_name(self):
4631
 
        return "case-insensitive case-preserving filesystem"
4632
 
 
4633
 
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4634
 
 
4635
 
 
4636
 
class _CaseInsensitiveFilesystemFeature(Feature):
4637
 
    """Check if underlying filesystem is case-insensitive but *not* case
4638
 
    preserving.
4639
 
    """
4640
 
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4641
 
    # more likely to be case preserving, so this case is rare.
4642
 
 
4643
 
    def _probe(self):
4644
 
        if CaseInsCasePresFilenameFeature.available():
4645
 
            return False
4646
 
 
4647
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
4648
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4649
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
4650
 
        else:
4651
 
            root = TestCaseWithMemoryTransport.TEST_ROOT
4652
 
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4653
 
            dir=root)
4654
 
        name_a = osutils.pathjoin(tdir, 'a')
4655
 
        name_A = osutils.pathjoin(tdir, 'A')
4656
 
        os.mkdir(name_a)
4657
 
        result = osutils.isdir(name_A)
4658
 
        _rmtree_temp_dir(tdir)
4659
 
        return result
4660
 
 
4661
 
    def feature_name(self):
4662
 
        return 'case-insensitive filesystem'
4663
 
 
4664
 
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4665
 
 
4666
 
 
4667
 
class _CaseSensitiveFilesystemFeature(Feature):
4668
 
 
4669
 
    def _probe(self):
4670
 
        if CaseInsCasePresFilenameFeature.available():
4671
 
            return False
4672
 
        elif CaseInsensitiveFilesystemFeature.available():
4673
 
            return False
4674
 
        else:
4675
 
            return True
4676
 
 
4677
 
    def feature_name(self):
4678
 
        return 'case-sensitive filesystem'
4679
 
 
4680
 
# new coding style is for feature instances to be lowercase
4681
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4682
 
 
4683
 
 
4684
4496
# Only define SubUnitBzrRunner if subunit is available.
4685
4497
try:
4686
4498
    from subunit import TestProtocolClient
4687
4499
    from subunit.test_results import AutoTimingTestResultDecorator
4688
4500
    class SubUnitBzrProtocolClient(TestProtocolClient):
4689
4501
 
 
4502
        def stopTest(self, test):
 
4503
            super(SubUnitBzrProtocolClient, self).stopTest(test)
 
4504
            _clear__type_equality_funcs(test)
 
4505
 
4690
4506
        def addSuccess(self, test, details=None):
4691
4507
            # The subunit client always includes the details in the subunit
4692
4508
            # stream, but we don't want to include it in ours.
4704
4520
except ImportError:
4705
4521
    pass
4706
4522
 
4707
 
class _PosixPermissionsFeature(Feature):
4708
 
 
4709
 
    def _probe(self):
4710
 
        def has_perms():
4711
 
            # create temporary file and check if specified perms are maintained.
4712
 
            import tempfile
4713
 
 
4714
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4715
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4716
 
            fd, name = f
4717
 
            os.close(fd)
4718
 
            os.chmod(name, write_perms)
4719
 
 
4720
 
            read_perms = os.stat(name).st_mode & 0777
4721
 
            os.unlink(name)
4722
 
            return (write_perms == read_perms)
4723
 
 
4724
 
        return (os.name == 'posix') and has_perms()
4725
 
 
4726
 
    def feature_name(self):
4727
 
        return 'POSIX permissions support'
4728
 
 
4729
 
posix_permissions_feature = _PosixPermissionsFeature()
 
4523
 
 
4524
# API compatibility for old plugins; see bug 892622.
 
4525
for name in [
 
4526
    'Feature',
 
4527
    'HTTPServerFeature', 
 
4528
    'ModuleAvailableFeature',
 
4529
    'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
 
4530
    'OsFifoFeature', 'UnicodeFilenameFeature',
 
4531
    'ByteStringNamedFilesystem', 'UTF8Filesystem',
 
4532
    'BreakinFeature', 'CaseInsCasePresFilenameFeature',
 
4533
    'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
 
4534
    'posix_permissions_feature',
 
4535
    ]:
 
4536
    globals()[name] = _CompatabilityThunkFeature(
 
4537
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4538
        'bzrlib.tests', name,
 
4539
        name, 'bzrlib.tests.features')
 
4540
 
 
4541
 
 
4542
for (old_name, new_name) in [
 
4543
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
 
4544
    ]:
 
4545
    globals()[name] = _CompatabilityThunkFeature(
 
4546
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4547
        'bzrlib.tests', old_name,
 
4548
        new_name, 'bzrlib.tests.features')