~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Jelmer Vernooij
  • Date: 2011-02-11 17:58:56 UTC
  • mto: This revision was merged to the branch mainline in revision 5662.
  • Revision ID: jelmer@samba.org-20110211175856-alncdirjmi75qr67
Add 'WorkingTreeFormat.missing_parent_conflicts' flag to use in tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
 
19
# TODO: Perhaps there should be an API to find out if bzr running under the
 
20
# test suite -- some plugins might want to avoid making intrusive changes if
 
21
# this is the case.  However, we want behaviour under to test to diverge as
 
22
# little as possible, so this should be used rarely if it's added at all.
 
23
# (Suggestion from j-a-meinel, 2005-11-24)
 
24
 
19
25
# NOTE: Some classes in here use camelCaseNaming() rather than
20
26
# underscore_naming().  That's for consistency with unittest; it's not the
21
27
# general style of bzrlib.  Please continue that consistency when adding e.g.
61
67
    chk_map,
62
68
    commands as _mod_commands,
63
69
    config,
64
 
    i18n,
65
70
    debug,
66
71
    errors,
67
72
    hooks,
89
94
    memory,
90
95
    pathfilter,
91
96
    )
92
 
from bzrlib.symbol_versioning import (
93
 
    deprecated_function,
94
 
    deprecated_in,
95
 
    )
96
97
from bzrlib.tests import (
97
98
    test_server,
98
99
    TestUtil,
141
142
    'BZREMAIL': None, # may still be present in the environment
142
143
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
143
144
    'BZR_PROGRESS_BAR': None,
144
 
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
145
 
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
146
 
    # TestCase should not use disk resources, BZR_LOG is one.
147
 
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
145
    'BZR_LOG': None,
148
146
    'BZR_PLUGIN_PATH': None,
149
147
    'BZR_DISABLE_PLUGINS': None,
150
148
    'BZR_PLUGINS_AT': None,
379
377
        if isinstance(test, TestCase):
380
378
            test.addCleanup(self._check_leaked_threads, test)
381
379
 
382
 
    def stopTest(self, test):
383
 
        super(ExtendedTestResult, self).stopTest(test)
384
 
        # Manually break cycles, means touching various private things but hey
385
 
        getDetails = getattr(test, "getDetails", None)
386
 
        if getDetails is not None:
387
 
            getDetails().clear()
388
 
        # Clear _type_equality_funcs to try to stop TestCase instances
389
 
        # from wasting memory. 'clear' is not available in all Python
390
 
        # versions (bug 809048)
391
 
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
392
 
        if type_equality_funcs is not None:
393
 
            tef_clear = getattr(type_equality_funcs, "clear", None)
394
 
            if tef_clear is None:
395
 
                tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
396
 
                if tef_instance_dict is not None:
397
 
                    tef_clear = tef_instance_dict.clear
398
 
            if tef_clear is not None:
399
 
                tef_clear()
400
 
        self._traceback_from_test = None
401
 
 
402
380
    def startTests(self):
403
381
        self.report_tests_starting()
404
382
        self._active_threads = threading.enumerate()
405
383
 
 
384
    def stopTest(self, test):
 
385
        self._traceback_from_test = None
 
386
 
406
387
    def _check_leaked_threads(self, test):
407
388
        """See if any threads have leaked since last call
408
389
 
468
449
        self.known_failure_count += 1
469
450
        self.report_known_failure(test, err)
470
451
 
471
 
    def addUnexpectedSuccess(self, test, details=None):
472
 
        """Tell result the test unexpectedly passed, counting as a failure
473
 
 
474
 
        When the minimum version of testtools required becomes 0.9.8 this
475
 
        can be updated to use the new handling there.
476
 
        """
477
 
        super(ExtendedTestResult, self).addFailure(test, details=details)
478
 
        self.failure_count += 1
479
 
        self.report_unexpected_success(test,
480
 
            "".join(details["reason"].iter_text()))
481
 
        if self.stop_early:
482
 
            self.stop()
483
 
 
484
452
    def addNotSupported(self, test, feature):
485
453
        """The test will not be run because of a missing feature.
486
454
        """
645
613
    def report_known_failure(self, test, err):
646
614
        pass
647
615
 
648
 
    def report_unexpected_success(self, test, reason):
649
 
        self.stream.write('FAIL: %s\n    %s: %s\n' % (
650
 
            self._test_description(test),
651
 
            "Unexpected success. Should have failed",
652
 
            reason,
653
 
            ))
654
 
 
655
616
    def report_skip(self, test, reason):
656
617
        pass
657
618
 
709
670
                % (self._testTimeString(test),
710
671
                   self._error_summary(err)))
711
672
 
712
 
    def report_unexpected_success(self, test, reason):
713
 
        self.stream.write(' FAIL %s\n%s: %s\n'
714
 
                % (self._testTimeString(test),
715
 
                   "Unexpected success. Should have failed",
716
 
                   reason))
717
 
 
718
673
    def report_success(self, test):
719
674
        self.stream.write('   OK %s\n' % self._testTimeString(test))
720
675
        for bench_called, stats in getattr(test, '_benchcalls', []):
939
894
 
940
895
    The method is really a factory and users are expected to use it as such.
941
896
    """
942
 
 
 
897
    
943
898
    kwargs['setUp'] = isolated_doctest_setUp
944
899
    kwargs['tearDown'] = isolated_doctest_tearDown
945
900
    return doctest.DocTestSuite(*args, **kwargs)
982
937
        super(TestCase, self).setUp()
983
938
        for feature in getattr(self, '_test_needs_features', []):
984
939
            self.requireFeature(feature)
 
940
        self._log_contents = None
 
941
        self.addDetail("log", content.Content(content.ContentType("text",
 
942
            "plain", {"charset": "utf8"}),
 
943
            lambda:[self._get_log(keep_log_file=True)]))
985
944
        self._cleanEnvironment()
986
945
        self._silenceUI()
987
946
        self._startLogFile()
995
954
        # between tests.  We should get rid of this altogether: bug 656694. --
996
955
        # mbp 20101008
997
956
        self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
998
 
        # Isolate config option expansion until its default value for bzrlib is
999
 
        # settled on or a the FIXME associated with _get_expand_default_value
1000
 
        # is addressed -- vila 20110219
1001
 
        self.overrideAttr(config, '_expand_default_value', None)
1002
 
        self._log_files = set()
1003
 
        # Each key in the ``_counters`` dict holds a value for a different
1004
 
        # counter. When the test ends, addDetail() should be used to output the
1005
 
        # counter values. This happens in install_counter_hook().
1006
 
        self._counters = {}
1007
 
        if 'config_stats' in selftest_debug_flags:
1008
 
            self._install_config_stats_hooks()
1009
 
        # Do not use i18n for tests (unless the test reverses this)
1010
 
        i18n.disable_i18n()
1011
957
 
1012
958
    def debug(self):
1013
959
        # debug a frame up.
1014
960
        import pdb
1015
 
        # The sys preserved stdin/stdout should allow blackbox tests debugging
1016
 
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1017
 
                ).set_trace(sys._getframe().f_back)
 
961
        pdb.Pdb().set_trace(sys._getframe().f_back)
1018
962
 
1019
963
    def discardDetail(self, name):
1020
964
        """Extend the addDetail, getDetails api so we can remove a detail.
1032
976
        if name in details:
1033
977
            del details[name]
1034
978
 
1035
 
    def install_counter_hook(self, hooks, name, counter_name=None):
1036
 
        """Install a counting hook.
1037
 
 
1038
 
        Any hook can be counted as long as it doesn't need to return a value.
1039
 
 
1040
 
        :param hooks: Where the hook should be installed.
1041
 
 
1042
 
        :param name: The hook name that will be counted.
1043
 
 
1044
 
        :param counter_name: The counter identifier in ``_counters``, defaults
1045
 
            to ``name``.
1046
 
        """
1047
 
        _counters = self._counters # Avoid closing over self
1048
 
        if counter_name is None:
1049
 
            counter_name = name
1050
 
        if _counters.has_key(counter_name):
1051
 
            raise AssertionError('%s is already used as a counter name'
1052
 
                                  % (counter_name,))
1053
 
        _counters[counter_name] = 0
1054
 
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1055
 
            lambda: ['%d' % (_counters[counter_name],)]))
1056
 
        def increment_counter(*args, **kwargs):
1057
 
            _counters[counter_name] += 1
1058
 
        label = 'count %s calls' % (counter_name,)
1059
 
        hooks.install_named_hook(name, increment_counter, label)
1060
 
        self.addCleanup(hooks.uninstall_named_hook, name, label)
1061
 
 
1062
 
    def _install_config_stats_hooks(self):
1063
 
        """Install config hooks to count hook calls.
1064
 
 
1065
 
        """
1066
 
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1067
 
            self.install_counter_hook(config.ConfigHooks, hook_name,
1068
 
                                       'config.%s' % (hook_name,))
1069
 
 
1070
 
        # The OldConfigHooks are private and need special handling to protect
1071
 
        # against recursive tests (tests that run other tests), so we just do
1072
 
        # manually what registering them into _builtin_known_hooks will provide
1073
 
        # us.
1074
 
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1075
 
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1076
 
            self.install_counter_hook(config.OldConfigHooks, hook_name,
1077
 
                                      'old_config.%s' % (hook_name,))
1078
 
 
1079
979
    def _clear_debug_flags(self):
1080
980
        """Prevent externally set debug flags affecting tests.
1081
981
 
1096
996
        for key, (parent, name) in known_hooks.iter_parent_objects():
1097
997
            current_hooks = getattr(parent, name)
1098
998
            self._preserved_hooks[parent] = (name, current_hooks)
1099
 
        self._preserved_lazy_hooks = hooks._lazy_hooks
1100
 
        hooks._lazy_hooks = {}
1101
999
        self.addCleanup(self._restoreHooks)
1102
1000
        for key, (parent, name) in known_hooks.iter_parent_objects():
1103
1001
            factory = known_hooks.get(key)
1135
1033
        # break some locks on purpose and should be taken into account by
1136
1034
        # considering that breaking a lock is just a dirty way of releasing it.
1137
1035
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1138
 
            message = (
1139
 
                'Different number of acquired and '
1140
 
                'released or broken locks.\n'
1141
 
                'acquired=%s\n'
1142
 
                'released=%s\n'
1143
 
                'broken=%s\n' %
1144
 
                (acquired_locks, released_locks, broken_locks))
 
1036
            message = ('Different number of acquired and '
 
1037
                       'released or broken locks. (%s, %s + %s)' %
 
1038
                       (acquired_locks, released_locks, broken_locks))
1145
1039
            if not self._lock_check_thorough:
1146
1040
                # Rather than fail, just warn
1147
1041
                print "Broken test %s: %s" % (self, message)
1175
1069
 
1176
1070
    def permit_dir(self, name):
1177
1071
        """Permit a directory to be used by this test. See permit_url."""
1178
 
        name_transport = _mod_transport.get_transport_from_path(name)
 
1072
        name_transport = _mod_transport.get_transport(name)
1179
1073
        self.permit_url(name)
1180
1074
        self.permit_url(name_transport.base)
1181
1075
 
1260
1154
        self.addCleanup(transport_server.stop_server)
1261
1155
        # Obtain a real transport because if the server supplies a password, it
1262
1156
        # will be hidden from the base on the client side.
1263
 
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1157
        t = _mod_transport.get_transport(transport_server.get_url())
1264
1158
        # Some transport servers effectively chroot the backing transport;
1265
1159
        # others like SFTPServer don't - users of the transport can walk up the
1266
1160
        # transport to read the entire backing transport. This wouldn't matter
1367
1261
                         'st_mtime did not match')
1368
1262
        self.assertEqual(expected.st_ctime, actual.st_ctime,
1369
1263
                         'st_ctime did not match')
1370
 
        if sys.platform == 'win32':
 
1264
        if sys.platform != 'win32':
1371
1265
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1372
1266
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1373
 
            # odd. We just force it to always be 0 to avoid any problems.
1374
 
            self.assertEqual(0, expected.st_dev)
1375
 
            self.assertEqual(0, actual.st_dev)
1376
 
            self.assertEqual(0, expected.st_ino)
1377
 
            self.assertEqual(0, actual.st_ino)
1378
 
        else:
 
1267
            # odd. Regardless we shouldn't actually try to assert anything
 
1268
            # about their values
1379
1269
            self.assertEqual(expected.st_dev, actual.st_dev,
1380
1270
                             'st_dev did not match')
1381
1271
            self.assertEqual(expected.st_ino, actual.st_ino,
1390
1280
                length, len(obj_with_len), obj_with_len))
1391
1281
 
1392
1282
    def assertLogsError(self, exception_class, func, *args, **kwargs):
1393
 
        """Assert that `func(*args, **kwargs)` quietly logs a specific error.
 
1283
        """Assert that func(*args, **kwargs) quietly logs a specific exception.
1394
1284
        """
1395
1285
        captured = []
1396
1286
        orig_log_exception_quietly = trace.log_exception_quietly
1397
1287
        try:
1398
1288
            def capture():
1399
1289
                orig_log_exception_quietly()
1400
 
                captured.append(sys.exc_info()[1])
 
1290
                captured.append(sys.exc_info())
1401
1291
            trace.log_exception_quietly = capture
1402
1292
            func(*args, **kwargs)
1403
1293
        finally:
1404
1294
            trace.log_exception_quietly = orig_log_exception_quietly
1405
1295
        self.assertLength(1, captured)
1406
 
        err = captured[0]
 
1296
        err = captured[0][1]
1407
1297
        self.assertIsInstance(err, exception_class)
1408
1298
        return err
1409
1299
 
1544
1434
 
1545
1435
    def assertFileEqual(self, content, path):
1546
1436
        """Fail if path does not contain 'content'."""
1547
 
        self.assertPathExists(path)
 
1437
        self.failUnlessExists(path)
1548
1438
        f = file(path, 'rb')
1549
1439
        try:
1550
1440
            s = f.read()
1560
1450
        else:
1561
1451
            self.assertEqual(expected_docstring, obj.__doc__)
1562
1452
 
1563
 
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1564
1453
    def failUnlessExists(self, path):
1565
 
        return self.assertPathExists(path)
1566
 
 
1567
 
    def assertPathExists(self, path):
1568
1454
        """Fail unless path or paths, which may be abs or relative, exist."""
1569
1455
        if not isinstance(path, basestring):
1570
1456
            for p in path:
1571
 
                self.assertPathExists(p)
 
1457
                self.failUnlessExists(p)
1572
1458
        else:
1573
 
            self.assertTrue(osutils.lexists(path),
1574
 
                path + " does not exist")
 
1459
            self.failUnless(osutils.lexists(path),path+" does not exist")
1575
1460
 
1576
 
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1577
1461
    def failIfExists(self, path):
1578
 
        return self.assertPathDoesNotExist(path)
1579
 
 
1580
 
    def assertPathDoesNotExist(self, path):
1581
1462
        """Fail if path or paths, which may be abs or relative, exist."""
1582
1463
        if not isinstance(path, basestring):
1583
1464
            for p in path:
1584
 
                self.assertPathDoesNotExist(p)
 
1465
                self.failIfExists(p)
1585
1466
        else:
1586
 
            self.assertFalse(osutils.lexists(path),
1587
 
                path + " exists")
 
1467
            self.failIf(osutils.lexists(path),path+" exists")
1588
1468
 
1589
1469
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1590
1470
        """A helper for callDeprecated and applyDeprecated.
1616
1496
        not other callers that go direct to the warning module.
1617
1497
 
1618
1498
        To test that a deprecated method raises an error, do something like
1619
 
        this (remember that both assertRaises and applyDeprecated delays *args
1620
 
        and **kwargs passing)::
 
1499
        this::
1621
1500
 
1622
1501
            self.assertRaises(errors.ReservedId,
1623
1502
                self.applyDeprecated,
1705
1584
 
1706
1585
        The file is removed as the test is torn down.
1707
1586
        """
1708
 
        pseudo_log_file = StringIO()
1709
 
        def _get_log_contents_for_weird_testtools_api():
1710
 
            return [pseudo_log_file.getvalue().decode(
1711
 
                "utf-8", "replace").encode("utf-8")]
1712
 
        self.addDetail("log", content.Content(content.ContentType("text",
1713
 
            "plain", {"charset": "utf8"}),
1714
 
            _get_log_contents_for_weird_testtools_api))
1715
 
        self._log_file = pseudo_log_file
 
1587
        self._log_file = StringIO()
1716
1588
        self._log_memento = trace.push_log_file(self._log_file)
1717
1589
        self.addCleanup(self._finishLogFile)
1718
1590
 
1719
1591
    def _finishLogFile(self):
1720
1592
        """Finished with the log file.
1721
1593
 
1722
 
        Close the file and delete it.
 
1594
        Close the file and delete it, unless setKeepLogfile was called.
1723
1595
        """
1724
1596
        if trace._trace_file:
1725
1597
            # flush the log file, to get all content
1726
1598
            trace._trace_file.flush()
1727
1599
        trace.pop_log_file(self._log_memento)
 
1600
        # Cache the log result and delete the file on disk
 
1601
        self._get_log(False)
1728
1602
 
1729
1603
    def thisFailsStrictLockCheck(self):
1730
1604
        """It is known that this test would fail with -Dstrict_locks.
1742
1616
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1743
1617
        """Overrides an object attribute restoring it after the test.
1744
1618
 
1745
 
        :note: This should be used with discretion; you should think about
1746
 
        whether it's better to make the code testable without monkey-patching.
1747
 
 
1748
1619
        :param obj: The object that will be mutated.
1749
1620
 
1750
1621
        :param attr_name: The attribute name we want to preserve/override in
1775
1646
        self.addCleanup(osutils.set_or_unset_env, name, value)
1776
1647
        return value
1777
1648
 
1778
 
    def recordCalls(self, obj, attr_name):
1779
 
        """Monkeypatch in a wrapper that will record calls.
1780
 
 
1781
 
        The monkeypatch is automatically removed when the test concludes.
1782
 
 
1783
 
        :param obj: The namespace holding the reference to be replaced;
1784
 
            typically a module, class, or object.
1785
 
        :param attr_name: A string for the name of the attribute to 
1786
 
            patch.
1787
 
        :returns: A list that will be extended with one item every time the
1788
 
            function is called, with a tuple of (args, kwargs).
1789
 
        """
1790
 
        calls = []
1791
 
 
1792
 
        def decorator(*args, **kwargs):
1793
 
            calls.append((args, kwargs))
1794
 
            return orig(*args, **kwargs)
1795
 
        orig = self.overrideAttr(obj, attr_name, decorator)
1796
 
        return calls
1797
 
 
1798
1649
    def _cleanEnvironment(self):
1799
1650
        for name, value in isolated_environ.iteritems():
1800
1651
            self.overrideEnv(name, value)
1802
1653
    def _restoreHooks(self):
1803
1654
        for klass, (name, hooks) in self._preserved_hooks.items():
1804
1655
            setattr(klass, name, hooks)
1805
 
        self._preserved_hooks.clear()
1806
 
        bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1807
 
        self._preserved_lazy_hooks.clear()
1808
1656
 
1809
1657
    def knownFailure(self, reason):
1810
 
        """Declare that this test fails for a known reason
1811
 
 
1812
 
        Tests that are known to fail should generally be using expectedFailure
1813
 
        with an appropriate reverse assertion if a change could cause the test
1814
 
        to start passing. Conversely if the test has no immediate prospect of
1815
 
        succeeding then using skip is more suitable.
1816
 
 
1817
 
        When this method is called while an exception is being handled, that
1818
 
        traceback will be used, otherwise a new exception will be thrown to
1819
 
        provide one but won't be reported.
1820
 
        """
1821
 
        self._add_reason(reason)
1822
 
        try:
1823
 
            exc_info = sys.exc_info()
1824
 
            if exc_info != (None, None, None):
1825
 
                self._report_traceback(exc_info)
1826
 
            else:
1827
 
                try:
1828
 
                    raise self.failureException(reason)
1829
 
                except self.failureException:
1830
 
                    exc_info = sys.exc_info()
1831
 
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1832
 
            raise testtools.testcase._ExpectedFailure(exc_info)
1833
 
        finally:
1834
 
            del exc_info
 
1658
        """This test has failed for some known reason."""
 
1659
        raise KnownFailure(reason)
1835
1660
 
1836
1661
    def _suppress_log(self):
1837
1662
        """Remove the log info from details."""
1925
1750
    def log(self, *args):
1926
1751
        trace.mutter(*args)
1927
1752
 
 
1753
    def _get_log(self, keep_log_file=False):
 
1754
        """Internal helper to get the log from bzrlib.trace for this test.
 
1755
 
 
1756
        Please use self.getDetails, or self.get_log to access this in test case
 
1757
        code.
 
1758
 
 
1759
        :param keep_log_file: When True, if the log is still a file on disk
 
1760
            leave it as a file on disk. When False, if the log is still a file
 
1761
            on disk, the log file is deleted and the log preserved as
 
1762
            self._log_contents.
 
1763
        :return: A string containing the log.
 
1764
        """
 
1765
        if self._log_contents is not None:
 
1766
            try:
 
1767
                self._log_contents.decode('utf8')
 
1768
            except UnicodeDecodeError:
 
1769
                unicodestr = self._log_contents.decode('utf8', 'replace')
 
1770
                self._log_contents = unicodestr.encode('utf8')
 
1771
            return self._log_contents
 
1772
        if self._log_file is not None:
 
1773
            log_contents = self._log_file.getvalue()
 
1774
            try:
 
1775
                log_contents.decode('utf8')
 
1776
            except UnicodeDecodeError:
 
1777
                unicodestr = log_contents.decode('utf8', 'replace')
 
1778
                log_contents = unicodestr.encode('utf8')
 
1779
            if not keep_log_file:
 
1780
                self._log_file = None
 
1781
                # Permit multiple calls to get_log until we clean it up in
 
1782
                # finishLogFile
 
1783
                self._log_contents = log_contents
 
1784
            return log_contents
 
1785
        else:
 
1786
            return "No log file content."
 
1787
 
1928
1788
    def get_log(self):
1929
1789
        """Get a unicode string containing the log from bzrlib.trace.
1930
1790
 
2130
1990
    def start_bzr_subprocess(self, process_args, env_changes=None,
2131
1991
                             skip_if_plan_to_signal=False,
2132
1992
                             working_dir=None,
2133
 
                             allow_plugins=False, stderr=subprocess.PIPE):
 
1993
                             allow_plugins=False):
2134
1994
        """Start bzr in a subprocess for testing.
2135
1995
 
2136
1996
        This starts a new Python interpreter and runs bzr in there.
2148
2008
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
2149
2009
            doesn't support signalling subprocesses.
2150
2010
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
2151
 
        :param stderr: file to use for the subprocess's stderr.  Valid values
2152
 
            are those valid for the stderr argument of `subprocess.Popen`.
2153
 
            Default value is ``subprocess.PIPE``.
2154
2011
 
2155
2012
        :returns: Popen object for the started process.
2156
2013
        """
2182
2039
            # so we will avoid using it on all platforms, just to
2183
2040
            # make sure the code path is used, and we don't break on win32
2184
2041
            cleanup_environment()
2185
 
            # Include the subprocess's log file in the test details, in case
2186
 
            # the test fails due to an error in the subprocess.
2187
 
            self._add_subprocess_log(trace._get_bzr_log_filename())
2188
2042
            command = [sys.executable]
2189
2043
            # frozen executables don't need the path to bzr
2190
2044
            if getattr(sys, "frozen", None) is None:
2194
2048
            command.extend(process_args)
2195
2049
            process = self._popen(command, stdin=subprocess.PIPE,
2196
2050
                                  stdout=subprocess.PIPE,
2197
 
                                  stderr=stderr)
 
2051
                                  stderr=subprocess.PIPE)
2198
2052
        finally:
2199
2053
            restore_environment()
2200
2054
            if cwd is not None:
2202
2056
 
2203
2057
        return process
2204
2058
 
2205
 
    def _add_subprocess_log(self, log_file_path):
2206
 
        if len(self._log_files) == 0:
2207
 
            # Register an addCleanup func.  We do this on the first call to
2208
 
            # _add_subprocess_log rather than in TestCase.setUp so that this
2209
 
            # addCleanup is registered after any cleanups for tempdirs that
2210
 
            # subclasses might create, which will probably remove the log file
2211
 
            # we want to read.
2212
 
            self.addCleanup(self._subprocess_log_cleanup)
2213
 
        # self._log_files is a set, so if a log file is reused we won't grab it
2214
 
        # twice.
2215
 
        self._log_files.add(log_file_path)
2216
 
 
2217
 
    def _subprocess_log_cleanup(self):
2218
 
        for count, log_file_path in enumerate(self._log_files):
2219
 
            # We use buffer_now=True to avoid holding the file open beyond
2220
 
            # the life of this function, which might interfere with e.g.
2221
 
            # cleaning tempdirs on Windows.
2222
 
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2223
 
            #detail_content = content.content_from_file(
2224
 
            #    log_file_path, buffer_now=True)
2225
 
            with open(log_file_path, 'rb') as log_file:
2226
 
                log_file_bytes = log_file.read()
2227
 
            detail_content = content.Content(content.ContentType("text",
2228
 
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2229
 
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2230
 
                detail_content)
2231
 
 
2232
2059
    def _popen(self, *args, **kwargs):
2233
2060
        """Place a call to Popen.
2234
2061
 
2277
2104
                      % (process_args, retcode, process.returncode))
2278
2105
        return [out, err]
2279
2106
 
2280
 
    def check_tree_shape(self, tree, shape):
2281
 
        """Compare a tree to a list of expected names.
 
2107
    def check_inventory_shape(self, inv, shape):
 
2108
        """Compare an inventory to a list of expected names.
2282
2109
 
2283
2110
        Fail if they are not precisely equal.
2284
2111
        """
2285
2112
        extras = []
2286
2113
        shape = list(shape)             # copy
2287
 
        for path, ie in tree.iter_entries_by_dir():
 
2114
        for path, ie in inv.entries():
2288
2115
            name = path.replace('\\', '/')
2289
2116
            if ie.kind == 'directory':
2290
2117
                name = name + '/'
2291
 
            if name == "/":
2292
 
                pass # ignore root entry
2293
 
            elif name in shape:
 
2118
            if name in shape:
2294
2119
                shape.remove(name)
2295
2120
            else:
2296
2121
                extras.append(name)
2386
2211
class TestCaseWithMemoryTransport(TestCase):
2387
2212
    """Common test class for tests that do not need disk resources.
2388
2213
 
2389
 
    Tests that need disk resources should derive from TestCaseInTempDir
2390
 
    orTestCaseWithTransport.
 
2214
    Tests that need disk resources should derive from TestCaseWithTransport.
2391
2215
 
2392
2216
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2393
2217
 
2394
 
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2218
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2395
2219
    a directory which does not exist. This serves to help ensure test isolation
2396
 
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2397
 
    must exist. However, TestCaseWithMemoryTransport does not offer local file
2398
 
    defaults for the transport in tests, nor does it obey the command line
 
2220
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2221
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2222
    file defaults for the transport in tests, nor does it obey the command line
2399
2223
    override, so tests that accidentally write to the common directory should
2400
2224
    be rare.
2401
2225
 
2402
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
2403
 
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2226
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2227
    a .bzr directory that stops us ascending higher into the filesystem.
2404
2228
    """
2405
2229
 
2406
2230
    TEST_ROOT = None
2424
2248
 
2425
2249
        :param relpath: a path relative to the base url.
2426
2250
        """
2427
 
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2251
        t = _mod_transport.get_transport(self.get_url(relpath))
2428
2252
        self.assertFalse(t.is_readonly())
2429
2253
        return t
2430
2254
 
2436
2260
 
2437
2261
        :param relpath: a path relative to the base url.
2438
2262
        """
2439
 
        t = _mod_transport.get_transport_from_url(
2440
 
            self.get_readonly_url(relpath))
 
2263
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2441
2264
        self.assertTrue(t.is_readonly())
2442
2265
        return t
2443
2266
 
2564
2387
        real branch.
2565
2388
        """
2566
2389
        root = TestCaseWithMemoryTransport.TEST_ROOT
2567
 
        try:
2568
 
            # Make sure we get a readable and accessible home for .bzr.log
2569
 
            # and/or config files, and not fallback to weird defaults (see
2570
 
            # http://pad.lv/825027).
2571
 
            self.assertIs(None, os.environ.get('BZR_HOME', None))
2572
 
            os.environ['BZR_HOME'] = root
2573
 
            wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2574
 
            del os.environ['BZR_HOME']
2575
 
        except Exception, e:
2576
 
            self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
2577
 
        # Hack for speed: remember the raw bytes of the dirstate file so that
2578
 
        # we don't need to re-open the wt to check it hasn't changed.
2579
 
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2580
 
            wt.control_transport.get_bytes('dirstate'))
 
2390
        bzrdir.BzrDir.create_standalone_workingtree(root)
2581
2391
 
2582
2392
    def _check_safety_net(self):
2583
2393
        """Check that the safety .bzr directory have not been touched.
2586
2396
        propagating. This method ensures than a test did not leaked.
2587
2397
        """
2588
2398
        root = TestCaseWithMemoryTransport.TEST_ROOT
2589
 
        t = _mod_transport.get_transport_from_path(root)
2590
 
        self.permit_url(t.base)
2591
 
        if (t.get_bytes('.bzr/checkout/dirstate') != 
2592
 
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2399
        self.permit_url(_mod_transport.get_transport(root).base)
 
2400
        wt = workingtree.WorkingTree.open(root)
 
2401
        last_rev = wt.last_revision()
 
2402
        if last_rev != 'null:':
2593
2403
            # The current test have modified the /bzr directory, we need to
2594
2404
            # recreate a new one or all the followng tests will fail.
2595
2405
            # If you need to inspect its content uncomment the following line
2630
2440
    def make_branch(self, relpath, format=None):
2631
2441
        """Create a branch on the transport at relpath."""
2632
2442
        repo = self.make_repository(relpath, format=format)
2633
 
        return repo.bzrdir.create_branch(append_revisions_only=False)
2634
 
 
2635
 
    def resolve_format(self, format):
2636
 
        """Resolve an object to a ControlDir format object.
2637
 
 
2638
 
        The initial format object can either already be
2639
 
        a ControlDirFormat, None (for the default format),
2640
 
        or a string with the name of the control dir format.
2641
 
 
2642
 
        :param format: Object to resolve
2643
 
        :return A ControlDirFormat instance
2644
 
        """
2645
 
        if format is None:
2646
 
            format = 'default'
2647
 
        if isinstance(format, basestring):
2648
 
            format = bzrdir.format_registry.make_bzrdir(format)
2649
 
        return format
2650
 
 
2651
 
    def resolve_format(self, format):
2652
 
        """Resolve an object to a ControlDir format object.
2653
 
 
2654
 
        The initial format object can either already be
2655
 
        a ControlDirFormat, None (for the default format),
2656
 
        or a string with the name of the control dir format.
2657
 
 
2658
 
        :param format: Object to resolve
2659
 
        :return A ControlDirFormat instance
2660
 
        """
2661
 
        if format is None:
2662
 
            format = 'default'
2663
 
        if isinstance(format, basestring):
2664
 
            format = bzrdir.format_registry.make_bzrdir(format)
2665
 
        return format
 
2443
        return repo.bzrdir.create_branch()
2666
2444
 
2667
2445
    def make_bzrdir(self, relpath, format=None):
2668
2446
        try:
2672
2450
            t = _mod_transport.get_transport(maybe_a_url)
2673
2451
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2674
2452
                t.ensure_base()
2675
 
            format = self.resolve_format(format)
 
2453
            if format is None:
 
2454
                format = 'default'
 
2455
            if isinstance(format, basestring):
 
2456
                format = bzrdir.format_registry.make_bzrdir(format)
2676
2457
            return format.initialize_on_transport(t)
2677
2458
        except errors.UninitializableFormat:
2678
2459
            raise TestSkipped("Format %s is not initializable." % format)
2679
2460
 
2680
 
    def make_repository(self, relpath, shared=None, format=None):
 
2461
    def make_repository(self, relpath, shared=False, format=None):
2681
2462
        """Create a repository on our default transport at relpath.
2682
2463
 
2683
2464
        Note that relpath must be a relative path, not a full url.
2694
2475
            backing_server = self.get_server()
2695
2476
        smart_server = test_server.SmartTCPServer_for_testing()
2696
2477
        self.start_server(smart_server, backing_server)
2697
 
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2478
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2698
2479
                                                   ).clone(path)
2699
2480
        return remote_transport
2700
2481
 
2717
2498
    def setUp(self):
2718
2499
        super(TestCaseWithMemoryTransport, self).setUp()
2719
2500
        # Ensure that ConnectedTransport doesn't leak sockets
2720
 
        def get_transport_from_url_with_cleanup(*args, **kwargs):
2721
 
            t = orig_get_transport_from_url(*args, **kwargs)
 
2501
        def get_transport_with_cleanup(*args, **kwargs):
 
2502
            t = orig_get_transport(*args, **kwargs)
2722
2503
            if isinstance(t, _mod_transport.ConnectedTransport):
2723
2504
                self.addCleanup(t.disconnect)
2724
2505
            return t
2725
2506
 
2726
 
        orig_get_transport_from_url = self.overrideAttr(
2727
 
            _mod_transport, 'get_transport_from_url',
2728
 
            get_transport_from_url_with_cleanup)
 
2507
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
 
2508
                                               get_transport_with_cleanup)
2729
2509
        self._make_test_root()
2730
2510
        self.addCleanup(os.chdir, os.getcwdu())
2731
2511
        self.makeAndChdirToTestDir()
2774
2554
 
2775
2555
    OVERRIDE_PYTHON = 'python'
2776
2556
 
2777
 
    def setUp(self):
2778
 
        super(TestCaseInTempDir, self).setUp()
2779
 
        # Remove the protection set in isolated_environ, we have a proper
2780
 
        # access to disk resources now.
2781
 
        self.overrideEnv('BZR_LOG', None)
2782
 
 
2783
2557
    def check_file_contents(self, filename, expect):
2784
2558
        self.log("check contents of file %s" % filename)
2785
2559
        f = file(filename)
2866
2640
                "a list or a tuple. Got %r instead" % (shape,))
2867
2641
        # It's OK to just create them using forward slashes on windows.
2868
2642
        if transport is None or transport.is_readonly():
2869
 
            transport = _mod_transport.get_transport_from_path(".")
 
2643
            transport = _mod_transport.get_transport(".")
2870
2644
        for name in shape:
2871
2645
            self.assertIsInstance(name, basestring)
2872
2646
            if name[-1] == '/':
3676
3450
#                           with proper exclusion rules.
3677
3451
#   -Ethreads               Will display thread ident at creation/join time to
3678
3452
#                           help track thread leaks
3679
 
 
3680
 
#   -Econfig_stats          Will collect statistics using addDetail
3681
3453
selftest_debug_flags = set()
3682
3454
 
3683
3455
 
3935
3707
        'bzrlib.tests.per_repository',
3936
3708
        'bzrlib.tests.per_repository_chk',
3937
3709
        'bzrlib.tests.per_repository_reference',
3938
 
        'bzrlib.tests.per_repository_vf',
3939
3710
        'bzrlib.tests.per_uifactory',
3940
3711
        'bzrlib.tests.per_versionedfile',
3941
3712
        'bzrlib.tests.per_workingtree',
3975
3746
        'bzrlib.tests.test_commit_merge',
3976
3747
        'bzrlib.tests.test_config',
3977
3748
        'bzrlib.tests.test_conflicts',
3978
 
        'bzrlib.tests.test_controldir',
3979
3749
        'bzrlib.tests.test_counted_lock',
3980
3750
        'bzrlib.tests.test_crash',
3981
3751
        'bzrlib.tests.test_decorators',
3982
3752
        'bzrlib.tests.test_delta',
3983
3753
        'bzrlib.tests.test_debug',
 
3754
        'bzrlib.tests.test_deprecated_graph',
3984
3755
        'bzrlib.tests.test_diff',
3985
3756
        'bzrlib.tests.test_directory_service',
3986
3757
        'bzrlib.tests.test_dirstate',
3987
3758
        'bzrlib.tests.test_email_message',
3988
3759
        'bzrlib.tests.test_eol_filters',
3989
3760
        'bzrlib.tests.test_errors',
3990
 
        'bzrlib.tests.test_estimate_compressed_size',
3991
3761
        'bzrlib.tests.test_export',
3992
 
        'bzrlib.tests.test_export_pot',
3993
3762
        'bzrlib.tests.test_extract',
3994
 
        'bzrlib.tests.test_features',
3995
3763
        'bzrlib.tests.test_fetch',
3996
3764
        'bzrlib.tests.test_fixtures',
3997
3765
        'bzrlib.tests.test_fifo_cache',
3998
3766
        'bzrlib.tests.test_filters',
3999
 
        'bzrlib.tests.test_filter_tree',
4000
3767
        'bzrlib.tests.test_ftp_transport',
4001
3768
        'bzrlib.tests.test_foreign',
4002
3769
        'bzrlib.tests.test_generate_docs',
4011
3778
        'bzrlib.tests.test_http',
4012
3779
        'bzrlib.tests.test_http_response',
4013
3780
        'bzrlib.tests.test_https_ca_bundle',
4014
 
        'bzrlib.tests.test_i18n',
4015
3781
        'bzrlib.tests.test_identitymap',
4016
3782
        'bzrlib.tests.test_ignores',
4017
3783
        'bzrlib.tests.test_index',
4077
3843
        'bzrlib.tests.test_smart',
4078
3844
        'bzrlib.tests.test_smart_add',
4079
3845
        'bzrlib.tests.test_smart_request',
4080
 
        'bzrlib.tests.test_smart_signals',
4081
3846
        'bzrlib.tests.test_smart_transport',
4082
3847
        'bzrlib.tests.test_smtp_connection',
4083
3848
        'bzrlib.tests.test_source',
4110
3875
        'bzrlib.tests.test_upgrade',
4111
3876
        'bzrlib.tests.test_upgrade_stacked',
4112
3877
        'bzrlib.tests.test_urlutils',
4113
 
        'bzrlib.tests.test_utextwrap',
4114
3878
        'bzrlib.tests.test_version',
4115
3879
        'bzrlib.tests.test_version_info',
4116
3880
        'bzrlib.tests.test_versionedfile',
4133
3897
        'bzrlib',
4134
3898
        'bzrlib.branchbuilder',
4135
3899
        'bzrlib.decorators',
 
3900
        'bzrlib.export',
4136
3901
        'bzrlib.inventory',
4137
3902
        'bzrlib.iterablefile',
4138
3903
        'bzrlib.lockdir',
4394
4159
        the module is available.
4395
4160
    """
4396
4161
 
4397
 
    from bzrlib.tests.features import ModuleAvailableFeature
4398
4162
    py_module = pyutils.get_named_object(py_module_name)
4399
4163
    scenarios = [
4400
4164
        ('python', {'module': py_module}),
4441
4205
                         % (os.path.basename(dirname), printable_e))
4442
4206
 
4443
4207
 
 
4208
class Feature(object):
 
4209
    """An operating system Feature."""
 
4210
 
 
4211
    def __init__(self):
 
4212
        self._available = None
 
4213
 
 
4214
    def available(self):
 
4215
        """Is the feature available?
 
4216
 
 
4217
        :return: True if the feature is available.
 
4218
        """
 
4219
        if self._available is None:
 
4220
            self._available = self._probe()
 
4221
        return self._available
 
4222
 
 
4223
    def _probe(self):
 
4224
        """Implement this method in concrete features.
 
4225
 
 
4226
        :return: True if the feature is available.
 
4227
        """
 
4228
        raise NotImplementedError
 
4229
 
 
4230
    def __str__(self):
 
4231
        if getattr(self, 'feature_name', None):
 
4232
            return self.feature_name()
 
4233
        return self.__class__.__name__
 
4234
 
 
4235
 
 
4236
class _SymlinkFeature(Feature):
 
4237
 
 
4238
    def _probe(self):
 
4239
        return osutils.has_symlinks()
 
4240
 
 
4241
    def feature_name(self):
 
4242
        return 'symlinks'
 
4243
 
 
4244
SymlinkFeature = _SymlinkFeature()
 
4245
 
 
4246
 
 
4247
class _HardlinkFeature(Feature):
 
4248
 
 
4249
    def _probe(self):
 
4250
        return osutils.has_hardlinks()
 
4251
 
 
4252
    def feature_name(self):
 
4253
        return 'hardlinks'
 
4254
 
 
4255
HardlinkFeature = _HardlinkFeature()
 
4256
 
 
4257
 
 
4258
class _OsFifoFeature(Feature):
 
4259
 
 
4260
    def _probe(self):
 
4261
        return getattr(os, 'mkfifo', None)
 
4262
 
 
4263
    def feature_name(self):
 
4264
        return 'filesystem fifos'
 
4265
 
 
4266
OsFifoFeature = _OsFifoFeature()
 
4267
 
 
4268
 
 
4269
class _UnicodeFilenameFeature(Feature):
 
4270
    """Does the filesystem support Unicode filenames?"""
 
4271
 
 
4272
    def _probe(self):
 
4273
        try:
 
4274
            # Check for character combinations unlikely to be covered by any
 
4275
            # single non-unicode encoding. We use the characters
 
4276
            # - greek small letter alpha (U+03B1) and
 
4277
            # - braille pattern dots-123456 (U+283F).
 
4278
            os.stat(u'\u03b1\u283f')
 
4279
        except UnicodeEncodeError:
 
4280
            return False
 
4281
        except (IOError, OSError):
 
4282
            # The filesystem allows the Unicode filename but the file doesn't
 
4283
            # exist.
 
4284
            return True
 
4285
        else:
 
4286
            # The filesystem allows the Unicode filename and the file exists,
 
4287
            # for some reason.
 
4288
            return True
 
4289
 
 
4290
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4291
 
 
4292
 
 
4293
class _CompatabilityThunkFeature(Feature):
 
4294
    """This feature is just a thunk to another feature.
 
4295
 
 
4296
    It issues a deprecation warning if it is accessed, to let you know that you
 
4297
    should really use a different feature.
 
4298
    """
 
4299
 
 
4300
    def __init__(self, dep_version, module, name,
 
4301
                 replacement_name, replacement_module=None):
 
4302
        super(_CompatabilityThunkFeature, self).__init__()
 
4303
        self._module = module
 
4304
        if replacement_module is None:
 
4305
            replacement_module = module
 
4306
        self._replacement_module = replacement_module
 
4307
        self._name = name
 
4308
        self._replacement_name = replacement_name
 
4309
        self._dep_version = dep_version
 
4310
        self._feature = None
 
4311
 
 
4312
    def _ensure(self):
 
4313
        if self._feature is None:
 
4314
            depr_msg = self._dep_version % ('%s.%s'
 
4315
                                            % (self._module, self._name))
 
4316
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4317
                                               self._replacement_name)
 
4318
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4319
            # Import the new feature and use it as a replacement for the
 
4320
            # deprecated one.
 
4321
            self._feature = pyutils.get_named_object(
 
4322
                self._replacement_module, self._replacement_name)
 
4323
 
 
4324
    def _probe(self):
 
4325
        self._ensure()
 
4326
        return self._feature._probe()
 
4327
 
 
4328
 
 
4329
class ModuleAvailableFeature(Feature):
 
4330
    """This is a feature than describes a module we want to be available.
 
4331
 
 
4332
    Declare the name of the module in __init__(), and then after probing, the
 
4333
    module will be available as 'self.module'.
 
4334
 
 
4335
    :ivar module: The module if it is available, else None.
 
4336
    """
 
4337
 
 
4338
    def __init__(self, module_name):
 
4339
        super(ModuleAvailableFeature, self).__init__()
 
4340
        self.module_name = module_name
 
4341
 
 
4342
    def _probe(self):
 
4343
        try:
 
4344
            self._module = __import__(self.module_name, {}, {}, [''])
 
4345
            return True
 
4346
        except ImportError:
 
4347
            return False
 
4348
 
 
4349
    @property
 
4350
    def module(self):
 
4351
        if self.available(): # Make sure the probe has been done
 
4352
            return self._module
 
4353
        return None
 
4354
 
 
4355
    def feature_name(self):
 
4356
        return self.module_name
 
4357
 
 
4358
 
4444
4359
def probe_unicode_in_user_encoding():
4445
4360
    """Try to encode several unicode strings to use in unicode-aware tests.
4446
4361
    Return first successfull match.
4474
4389
    return None
4475
4390
 
4476
4391
 
 
4392
class _HTTPSServerFeature(Feature):
 
4393
    """Some tests want an https Server, check if one is available.
 
4394
 
 
4395
    Right now, the only way this is available is under python2.6 which provides
 
4396
    an ssl module.
 
4397
    """
 
4398
 
 
4399
    def _probe(self):
 
4400
        try:
 
4401
            import ssl
 
4402
            return True
 
4403
        except ImportError:
 
4404
            return False
 
4405
 
 
4406
    def feature_name(self):
 
4407
        return 'HTTPSServer'
 
4408
 
 
4409
 
 
4410
HTTPSServerFeature = _HTTPSServerFeature()
 
4411
 
 
4412
 
 
4413
class _UnicodeFilename(Feature):
 
4414
    """Does the filesystem support Unicode filenames?"""
 
4415
 
 
4416
    def _probe(self):
 
4417
        try:
 
4418
            os.stat(u'\u03b1')
 
4419
        except UnicodeEncodeError:
 
4420
            return False
 
4421
        except (IOError, OSError):
 
4422
            # The filesystem allows the Unicode filename but the file doesn't
 
4423
            # exist.
 
4424
            return True
 
4425
        else:
 
4426
            # The filesystem allows the Unicode filename and the file exists,
 
4427
            # for some reason.
 
4428
            return True
 
4429
 
 
4430
UnicodeFilename = _UnicodeFilename()
 
4431
 
 
4432
 
 
4433
class _ByteStringNamedFilesystem(Feature):
 
4434
    """Is the filesystem based on bytes?"""
 
4435
 
 
4436
    def _probe(self):
 
4437
        if os.name == "posix":
 
4438
            return True
 
4439
        return False
 
4440
 
 
4441
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
 
4442
 
 
4443
 
 
4444
class _UTF8Filesystem(Feature):
 
4445
    """Is the filesystem UTF-8?"""
 
4446
 
 
4447
    def _probe(self):
 
4448
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4449
            return True
 
4450
        return False
 
4451
 
 
4452
UTF8Filesystem = _UTF8Filesystem()
 
4453
 
 
4454
 
 
4455
class _BreakinFeature(Feature):
 
4456
    """Does this platform support the breakin feature?"""
 
4457
 
 
4458
    def _probe(self):
 
4459
        from bzrlib import breakin
 
4460
        if breakin.determine_signal() is None:
 
4461
            return False
 
4462
        if sys.platform == 'win32':
 
4463
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4464
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4465
            # access the function
 
4466
            try:
 
4467
                import ctypes
 
4468
            except OSError:
 
4469
                return False
 
4470
        return True
 
4471
 
 
4472
    def feature_name(self):
 
4473
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4474
 
 
4475
 
 
4476
BreakinFeature = _BreakinFeature()
 
4477
 
 
4478
 
 
4479
class _CaseInsCasePresFilenameFeature(Feature):
 
4480
    """Is the file-system case insensitive, but case-preserving?"""
 
4481
 
 
4482
    def _probe(self):
 
4483
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4484
        try:
 
4485
            # first check truly case-preserving for created files, then check
 
4486
            # case insensitive when opening existing files.
 
4487
            name = osutils.normpath(name)
 
4488
            base, rel = osutils.split(name)
 
4489
            found_rel = osutils.canonical_relpath(base, name)
 
4490
            return (found_rel == rel
 
4491
                    and os.path.isfile(name.upper())
 
4492
                    and os.path.isfile(name.lower()))
 
4493
        finally:
 
4494
            os.close(fileno)
 
4495
            os.remove(name)
 
4496
 
 
4497
    def feature_name(self):
 
4498
        return "case-insensitive case-preserving filesystem"
 
4499
 
 
4500
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4501
 
 
4502
 
 
4503
class _CaseInsensitiveFilesystemFeature(Feature):
 
4504
    """Check if underlying filesystem is case-insensitive but *not* case
 
4505
    preserving.
 
4506
    """
 
4507
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4508
    # more likely to be case preserving, so this case is rare.
 
4509
 
 
4510
    def _probe(self):
 
4511
        if CaseInsCasePresFilenameFeature.available():
 
4512
            return False
 
4513
 
 
4514
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4515
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4516
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4517
        else:
 
4518
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4519
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4520
            dir=root)
 
4521
        name_a = osutils.pathjoin(tdir, 'a')
 
4522
        name_A = osutils.pathjoin(tdir, 'A')
 
4523
        os.mkdir(name_a)
 
4524
        result = osutils.isdir(name_A)
 
4525
        _rmtree_temp_dir(tdir)
 
4526
        return result
 
4527
 
 
4528
    def feature_name(self):
 
4529
        return 'case-insensitive filesystem'
 
4530
 
 
4531
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4532
 
 
4533
 
 
4534
class _CaseSensitiveFilesystemFeature(Feature):
 
4535
 
 
4536
    def _probe(self):
 
4537
        if CaseInsCasePresFilenameFeature.available():
 
4538
            return False
 
4539
        elif CaseInsensitiveFilesystemFeature.available():
 
4540
            return False
 
4541
        else:
 
4542
            return True
 
4543
 
 
4544
    def feature_name(self):
 
4545
        return 'case-sensitive filesystem'
 
4546
 
 
4547
# new coding style is for feature instances to be lowercase
 
4548
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
 
4549
 
 
4550
 
4477
4551
# Only define SubUnitBzrRunner if subunit is available.
4478
4552
try:
4479
4553
    from subunit import TestProtocolClient
4497
4571
except ImportError:
4498
4572
    pass
4499
4573
 
4500
 
 
4501
 
@deprecated_function(deprecated_in((2, 5, 0)))
4502
 
def ModuleAvailableFeature(name):
4503
 
    from bzrlib.tests import features
4504
 
    return features.ModuleAvailableFeature(name)
4505
 
    
 
4574
class _PosixPermissionsFeature(Feature):
 
4575
 
 
4576
    def _probe(self):
 
4577
        def has_perms():
 
4578
            # create temporary file and check if specified perms are maintained.
 
4579
            import tempfile
 
4580
 
 
4581
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
 
4582
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
 
4583
            fd, name = f
 
4584
            os.close(fd)
 
4585
            os.chmod(name, write_perms)
 
4586
 
 
4587
            read_perms = os.stat(name).st_mode & 0777
 
4588
            os.unlink(name)
 
4589
            return (write_perms == read_perms)
 
4590
 
 
4591
        return (os.name == 'posix') and has_perms()
 
4592
 
 
4593
    def feature_name(self):
 
4594
        return 'POSIX permissions support'
 
4595
 
 
4596
posix_permissions_feature = _PosixPermissionsFeature()