~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

(gz) Fix test failure on alpha by correcting format string for
 gc_chk_sha1_record (Martin [gz])

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
 
19
# TODO: Perhaps there should be an API to find out if bzr running under the
 
20
# test suite -- some plugins might want to avoid making intrusive changes if
 
21
# this is the case.  However, we want behaviour under to test to diverge as
 
22
# little as possible, so this should be used rarely if it's added at all.
 
23
# (Suggestion from j-a-meinel, 2005-11-24)
 
24
 
19
25
# NOTE: Some classes in here use camelCaseNaming() rather than
20
26
# underscore_naming().  That's for consistency with unittest; it's not the
21
27
# general style of bzrlib.  Please continue that consistency when adding e.g.
61
67
    chk_map,
62
68
    commands as _mod_commands,
63
69
    config,
64
 
    i18n,
65
70
    debug,
66
71
    errors,
67
72
    hooks,
89
94
    memory,
90
95
    pathfilter,
91
96
    )
92
 
from bzrlib.symbol_versioning import (
93
 
    deprecated_function,
94
 
    deprecated_in,
95
 
    )
96
97
from bzrlib.tests import (
97
98
    test_server,
98
99
    TestUtil,
141
142
    'BZREMAIL': None, # may still be present in the environment
142
143
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
143
144
    'BZR_PROGRESS_BAR': None,
144
 
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
145
 
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
146
 
    # TestCase should not use disk resources, BZR_LOG is one.
147
 
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
145
    'BZR_LOG': None,
148
146
    'BZR_PLUGIN_PATH': None,
149
147
    'BZR_DISABLE_PLUGINS': None,
150
148
    'BZR_PLUGINS_AT': None,
379
377
        if isinstance(test, TestCase):
380
378
            test.addCleanup(self._check_leaked_threads, test)
381
379
 
382
 
    def stopTest(self, test):
383
 
        super(ExtendedTestResult, self).stopTest(test)
384
 
        # Manually break cycles, means touching various private things but hey
385
 
        getDetails = getattr(test, "getDetails", None)
386
 
        if getDetails is not None:
387
 
            getDetails().clear()
388
 
        # Clear _type_equality_funcs to try to stop TestCase instances
389
 
        # from wasting memory. 'clear' is not available in all Python
390
 
        # versions (bug 809048)
391
 
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
392
 
        if type_equality_funcs is not None:
393
 
            tef_clear = getattr(type_equality_funcs, "clear", None)
394
 
            if tef_clear is None:
395
 
                tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
396
 
                if tef_instance_dict is not None:
397
 
                    tef_clear = tef_instance_dict.clear
398
 
            if tef_clear is not None:
399
 
                tef_clear()
400
 
        self._traceback_from_test = None
401
 
 
402
380
    def startTests(self):
403
381
        self.report_tests_starting()
404
382
        self._active_threads = threading.enumerate()
405
383
 
 
384
    def stopTest(self, test):
 
385
        self._traceback_from_test = None
 
386
 
406
387
    def _check_leaked_threads(self, test):
407
388
        """See if any threads have leaked since last call
408
389
 
468
449
        self.known_failure_count += 1
469
450
        self.report_known_failure(test, err)
470
451
 
471
 
    def addUnexpectedSuccess(self, test, details=None):
472
 
        """Tell result the test unexpectedly passed, counting as a failure
473
 
 
474
 
        When the minimum version of testtools required becomes 0.9.8 this
475
 
        can be updated to use the new handling there.
476
 
        """
477
 
        super(ExtendedTestResult, self).addFailure(test, details=details)
478
 
        self.failure_count += 1
479
 
        self.report_unexpected_success(test,
480
 
            "".join(details["reason"].iter_text()))
481
 
        if self.stop_early:
482
 
            self.stop()
483
 
 
484
452
    def addNotSupported(self, test, feature):
485
453
        """The test will not be run because of a missing feature.
486
454
        """
645
613
    def report_known_failure(self, test, err):
646
614
        pass
647
615
 
648
 
    def report_unexpected_success(self, test, reason):
649
 
        self.stream.write('FAIL: %s\n    %s: %s\n' % (
650
 
            self._test_description(test),
651
 
            "Unexpected success. Should have failed",
652
 
            reason,
653
 
            ))
654
 
 
655
616
    def report_skip(self, test, reason):
656
617
        pass
657
618
 
709
670
                % (self._testTimeString(test),
710
671
                   self._error_summary(err)))
711
672
 
712
 
    def report_unexpected_success(self, test, reason):
713
 
        self.stream.write(' FAIL %s\n%s: %s\n'
714
 
                % (self._testTimeString(test),
715
 
                   "Unexpected success. Should have failed",
716
 
                   reason))
717
 
 
718
673
    def report_success(self, test):
719
674
        self.stream.write('   OK %s\n' % self._testTimeString(test))
720
675
        for bench_called, stats in getattr(test, '_benchcalls', []):
939
894
 
940
895
    The method is really a factory and users are expected to use it as such.
941
896
    """
942
 
 
 
897
    
943
898
    kwargs['setUp'] = isolated_doctest_setUp
944
899
    kwargs['tearDown'] = isolated_doctest_tearDown
945
900
    return doctest.DocTestSuite(*args, **kwargs)
982
937
        super(TestCase, self).setUp()
983
938
        for feature in getattr(self, '_test_needs_features', []):
984
939
            self.requireFeature(feature)
 
940
        self._log_contents = None
 
941
        self.addDetail("log", content.Content(content.ContentType("text",
 
942
            "plain", {"charset": "utf8"}),
 
943
            lambda:[self._get_log(keep_log_file=True)]))
985
944
        self._cleanEnvironment()
986
945
        self._silenceUI()
987
946
        self._startLogFile()
999
958
        # settled on or a the FIXME associated with _get_expand_default_value
1000
959
        # is addressed -- vila 20110219
1001
960
        self.overrideAttr(config, '_expand_default_value', None)
1002
 
        self._log_files = set()
1003
 
        # Each key in the ``_counters`` dict holds a value for a different
1004
 
        # counter. When the test ends, addDetail() should be used to output the
1005
 
        # counter values. This happens in install_counter_hook().
1006
 
        self._counters = {}
1007
 
        if 'config_stats' in selftest_debug_flags:
1008
 
            self._install_config_stats_hooks()
1009
 
        # Do not use i18n for tests (unless the test reverses this)
1010
 
        i18n.disable_i18n()
1011
961
 
1012
962
    def debug(self):
1013
963
        # debug a frame up.
1014
964
        import pdb
1015
 
        # The sys preserved stdin/stdout should allow blackbox tests debugging
1016
 
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1017
 
                ).set_trace(sys._getframe().f_back)
 
965
        pdb.Pdb().set_trace(sys._getframe().f_back)
1018
966
 
1019
967
    def discardDetail(self, name):
1020
968
        """Extend the addDetail, getDetails api so we can remove a detail.
1032
980
        if name in details:
1033
981
            del details[name]
1034
982
 
1035
 
    def install_counter_hook(self, hooks, name, counter_name=None):
1036
 
        """Install a counting hook.
1037
 
 
1038
 
        Any hook can be counted as long as it doesn't need to return a value.
1039
 
 
1040
 
        :param hooks: Where the hook should be installed.
1041
 
 
1042
 
        :param name: The hook name that will be counted.
1043
 
 
1044
 
        :param counter_name: The counter identifier in ``_counters``, defaults
1045
 
            to ``name``.
1046
 
        """
1047
 
        _counters = self._counters # Avoid closing over self
1048
 
        if counter_name is None:
1049
 
            counter_name = name
1050
 
        if _counters.has_key(counter_name):
1051
 
            raise AssertionError('%s is already used as a counter name'
1052
 
                                  % (counter_name,))
1053
 
        _counters[counter_name] = 0
1054
 
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1055
 
            lambda: ['%d' % (_counters[counter_name],)]))
1056
 
        def increment_counter(*args, **kwargs):
1057
 
            _counters[counter_name] += 1
1058
 
        label = 'count %s calls' % (counter_name,)
1059
 
        hooks.install_named_hook(name, increment_counter, label)
1060
 
        self.addCleanup(hooks.uninstall_named_hook, name, label)
1061
 
 
1062
 
    def _install_config_stats_hooks(self):
1063
 
        """Install config hooks to count hook calls.
1064
 
 
1065
 
        """
1066
 
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1067
 
            self.install_counter_hook(config.ConfigHooks, hook_name,
1068
 
                                       'config.%s' % (hook_name,))
1069
 
 
1070
 
        # The OldConfigHooks are private and need special handling to protect
1071
 
        # against recursive tests (tests that run other tests), so we just do
1072
 
        # manually what registering them into _builtin_known_hooks will provide
1073
 
        # us.
1074
 
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1075
 
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1076
 
            self.install_counter_hook(config.OldConfigHooks, hook_name,
1077
 
                                      'old_config.%s' % (hook_name,))
1078
 
 
1079
983
    def _clear_debug_flags(self):
1080
984
        """Prevent externally set debug flags affecting tests.
1081
985
 
1096
1000
        for key, (parent, name) in known_hooks.iter_parent_objects():
1097
1001
            current_hooks = getattr(parent, name)
1098
1002
            self._preserved_hooks[parent] = (name, current_hooks)
1099
 
        self._preserved_lazy_hooks = hooks._lazy_hooks
1100
 
        hooks._lazy_hooks = {}
1101
1003
        self.addCleanup(self._restoreHooks)
1102
1004
        for key, (parent, name) in known_hooks.iter_parent_objects():
1103
1005
            factory = known_hooks.get(key)
1135
1037
        # break some locks on purpose and should be taken into account by
1136
1038
        # considering that breaking a lock is just a dirty way of releasing it.
1137
1039
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1138
 
            message = (
1139
 
                'Different number of acquired and '
1140
 
                'released or broken locks.\n'
1141
 
                'acquired=%s\n'
1142
 
                'released=%s\n'
1143
 
                'broken=%s\n' %
1144
 
                (acquired_locks, released_locks, broken_locks))
 
1040
            message = ('Different number of acquired and '
 
1041
                       'released or broken locks. (%s, %s + %s)' %
 
1042
                       (acquired_locks, released_locks, broken_locks))
1145
1043
            if not self._lock_check_thorough:
1146
1044
                # Rather than fail, just warn
1147
1045
                print "Broken test %s: %s" % (self, message)
1175
1073
 
1176
1074
    def permit_dir(self, name):
1177
1075
        """Permit a directory to be used by this test. See permit_url."""
1178
 
        name_transport = _mod_transport.get_transport_from_path(name)
 
1076
        name_transport = _mod_transport.get_transport(name)
1179
1077
        self.permit_url(name)
1180
1078
        self.permit_url(name_transport.base)
1181
1079
 
1260
1158
        self.addCleanup(transport_server.stop_server)
1261
1159
        # Obtain a real transport because if the server supplies a password, it
1262
1160
        # will be hidden from the base on the client side.
1263
 
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1161
        t = _mod_transport.get_transport(transport_server.get_url())
1264
1162
        # Some transport servers effectively chroot the backing transport;
1265
1163
        # others like SFTPServer don't - users of the transport can walk up the
1266
1164
        # transport to read the entire backing transport. This wouldn't matter
1367
1265
                         'st_mtime did not match')
1368
1266
        self.assertEqual(expected.st_ctime, actual.st_ctime,
1369
1267
                         'st_ctime did not match')
1370
 
        if sys.platform == 'win32':
 
1268
        if sys.platform != 'win32':
1371
1269
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1372
1270
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1373
 
            # odd. We just force it to always be 0 to avoid any problems.
1374
 
            self.assertEqual(0, expected.st_dev)
1375
 
            self.assertEqual(0, actual.st_dev)
1376
 
            self.assertEqual(0, expected.st_ino)
1377
 
            self.assertEqual(0, actual.st_ino)
1378
 
        else:
 
1271
            # odd. Regardless we shouldn't actually try to assert anything
 
1272
            # about their values
1379
1273
            self.assertEqual(expected.st_dev, actual.st_dev,
1380
1274
                             'st_dev did not match')
1381
1275
            self.assertEqual(expected.st_ino, actual.st_ino,
1390
1284
                length, len(obj_with_len), obj_with_len))
1391
1285
 
1392
1286
    def assertLogsError(self, exception_class, func, *args, **kwargs):
1393
 
        """Assert that `func(*args, **kwargs)` quietly logs a specific error.
 
1287
        """Assert that func(*args, **kwargs) quietly logs a specific exception.
1394
1288
        """
1395
1289
        captured = []
1396
1290
        orig_log_exception_quietly = trace.log_exception_quietly
1397
1291
        try:
1398
1292
            def capture():
1399
1293
                orig_log_exception_quietly()
1400
 
                captured.append(sys.exc_info()[1])
 
1294
                captured.append(sys.exc_info())
1401
1295
            trace.log_exception_quietly = capture
1402
1296
            func(*args, **kwargs)
1403
1297
        finally:
1404
1298
            trace.log_exception_quietly = orig_log_exception_quietly
1405
1299
        self.assertLength(1, captured)
1406
 
        err = captured[0]
 
1300
        err = captured[0][1]
1407
1301
        self.assertIsInstance(err, exception_class)
1408
1302
        return err
1409
1303
 
1544
1438
 
1545
1439
    def assertFileEqual(self, content, path):
1546
1440
        """Fail if path does not contain 'content'."""
1547
 
        self.assertPathExists(path)
 
1441
        self.failUnlessExists(path)
1548
1442
        f = file(path, 'rb')
1549
1443
        try:
1550
1444
            s = f.read()
1560
1454
        else:
1561
1455
            self.assertEqual(expected_docstring, obj.__doc__)
1562
1456
 
1563
 
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1564
1457
    def failUnlessExists(self, path):
1565
 
        return self.assertPathExists(path)
1566
 
 
1567
 
    def assertPathExists(self, path):
1568
1458
        """Fail unless path or paths, which may be abs or relative, exist."""
1569
1459
        if not isinstance(path, basestring):
1570
1460
            for p in path:
1571
 
                self.assertPathExists(p)
 
1461
                self.failUnlessExists(p)
1572
1462
        else:
1573
 
            self.assertTrue(osutils.lexists(path),
1574
 
                path + " does not exist")
 
1463
            self.failUnless(osutils.lexists(path),path+" does not exist")
1575
1464
 
1576
 
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
1577
1465
    def failIfExists(self, path):
1578
 
        return self.assertPathDoesNotExist(path)
1579
 
 
1580
 
    def assertPathDoesNotExist(self, path):
1581
1466
        """Fail if path or paths, which may be abs or relative, exist."""
1582
1467
        if not isinstance(path, basestring):
1583
1468
            for p in path:
1584
 
                self.assertPathDoesNotExist(p)
 
1469
                self.failIfExists(p)
1585
1470
        else:
1586
 
            self.assertFalse(osutils.lexists(path),
1587
 
                path + " exists")
 
1471
            self.failIf(osutils.lexists(path),path+" exists")
1588
1472
 
1589
1473
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1590
1474
        """A helper for callDeprecated and applyDeprecated.
1616
1500
        not other callers that go direct to the warning module.
1617
1501
 
1618
1502
        To test that a deprecated method raises an error, do something like
1619
 
        this (remember that both assertRaises and applyDeprecated delays *args
1620
 
        and **kwargs passing)::
 
1503
        this::
1621
1504
 
1622
1505
            self.assertRaises(errors.ReservedId,
1623
1506
                self.applyDeprecated,
1705
1588
 
1706
1589
        The file is removed as the test is torn down.
1707
1590
        """
1708
 
        pseudo_log_file = StringIO()
1709
 
        def _get_log_contents_for_weird_testtools_api():
1710
 
            return [pseudo_log_file.getvalue().decode(
1711
 
                "utf-8", "replace").encode("utf-8")]
1712
 
        self.addDetail("log", content.Content(content.ContentType("text",
1713
 
            "plain", {"charset": "utf8"}),
1714
 
            _get_log_contents_for_weird_testtools_api))
1715
 
        self._log_file = pseudo_log_file
 
1591
        self._log_file = StringIO()
1716
1592
        self._log_memento = trace.push_log_file(self._log_file)
1717
1593
        self.addCleanup(self._finishLogFile)
1718
1594
 
1719
1595
    def _finishLogFile(self):
1720
1596
        """Finished with the log file.
1721
1597
 
1722
 
        Close the file and delete it.
 
1598
        Close the file and delete it, unless setKeepLogfile was called.
1723
1599
        """
1724
1600
        if trace._trace_file:
1725
1601
            # flush the log file, to get all content
1726
1602
            trace._trace_file.flush()
1727
1603
        trace.pop_log_file(self._log_memento)
 
1604
        # Cache the log result and delete the file on disk
 
1605
        self._get_log(False)
1728
1606
 
1729
1607
    def thisFailsStrictLockCheck(self):
1730
1608
        """It is known that this test would fail with -Dstrict_locks.
1742
1620
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1743
1621
        """Overrides an object attribute restoring it after the test.
1744
1622
 
1745
 
        :note: This should be used with discretion; you should think about
1746
 
        whether it's better to make the code testable without monkey-patching.
1747
 
 
1748
1623
        :param obj: The object that will be mutated.
1749
1624
 
1750
1625
        :param attr_name: The attribute name we want to preserve/override in
1775
1650
        self.addCleanup(osutils.set_or_unset_env, name, value)
1776
1651
        return value
1777
1652
 
1778
 
    def recordCalls(self, obj, attr_name):
1779
 
        """Monkeypatch in a wrapper that will record calls.
1780
 
 
1781
 
        The monkeypatch is automatically removed when the test concludes.
1782
 
 
1783
 
        :param obj: The namespace holding the reference to be replaced;
1784
 
            typically a module, class, or object.
1785
 
        :param attr_name: A string for the name of the attribute to 
1786
 
            patch.
1787
 
        :returns: A list that will be extended with one item every time the
1788
 
            function is called, with a tuple of (args, kwargs).
1789
 
        """
1790
 
        calls = []
1791
 
 
1792
 
        def decorator(*args, **kwargs):
1793
 
            calls.append((args, kwargs))
1794
 
            return orig(*args, **kwargs)
1795
 
        orig = self.overrideAttr(obj, attr_name, decorator)
1796
 
        return calls
1797
 
 
1798
1653
    def _cleanEnvironment(self):
1799
1654
        for name, value in isolated_environ.iteritems():
1800
1655
            self.overrideEnv(name, value)
1802
1657
    def _restoreHooks(self):
1803
1658
        for klass, (name, hooks) in self._preserved_hooks.items():
1804
1659
            setattr(klass, name, hooks)
1805
 
        self._preserved_hooks.clear()
1806
 
        bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
1807
 
        self._preserved_lazy_hooks.clear()
1808
1660
 
1809
1661
    def knownFailure(self, reason):
1810
 
        """Declare that this test fails for a known reason
1811
 
 
1812
 
        Tests that are known to fail should generally be using expectedFailure
1813
 
        with an appropriate reverse assertion if a change could cause the test
1814
 
        to start passing. Conversely if the test has no immediate prospect of
1815
 
        succeeding then using skip is more suitable.
1816
 
 
1817
 
        When this method is called while an exception is being handled, that
1818
 
        traceback will be used, otherwise a new exception will be thrown to
1819
 
        provide one but won't be reported.
1820
 
        """
1821
 
        self._add_reason(reason)
1822
 
        try:
1823
 
            exc_info = sys.exc_info()
1824
 
            if exc_info != (None, None, None):
1825
 
                self._report_traceback(exc_info)
1826
 
            else:
1827
 
                try:
1828
 
                    raise self.failureException(reason)
1829
 
                except self.failureException:
1830
 
                    exc_info = sys.exc_info()
1831
 
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1832
 
            raise testtools.testcase._ExpectedFailure(exc_info)
1833
 
        finally:
1834
 
            del exc_info
 
1662
        """This test has failed for some known reason."""
 
1663
        raise KnownFailure(reason)
1835
1664
 
1836
1665
    def _suppress_log(self):
1837
1666
        """Remove the log info from details."""
1925
1754
    def log(self, *args):
1926
1755
        trace.mutter(*args)
1927
1756
 
 
1757
    def _get_log(self, keep_log_file=False):
 
1758
        """Internal helper to get the log from bzrlib.trace for this test.
 
1759
 
 
1760
        Please use self.getDetails, or self.get_log to access this in test case
 
1761
        code.
 
1762
 
 
1763
        :param keep_log_file: When True, if the log is still a file on disk
 
1764
            leave it as a file on disk. When False, if the log is still a file
 
1765
            on disk, the log file is deleted and the log preserved as
 
1766
            self._log_contents.
 
1767
        :return: A string containing the log.
 
1768
        """
 
1769
        if self._log_contents is not None:
 
1770
            try:
 
1771
                self._log_contents.decode('utf8')
 
1772
            except UnicodeDecodeError:
 
1773
                unicodestr = self._log_contents.decode('utf8', 'replace')
 
1774
                self._log_contents = unicodestr.encode('utf8')
 
1775
            return self._log_contents
 
1776
        if self._log_file is not None:
 
1777
            log_contents = self._log_file.getvalue()
 
1778
            try:
 
1779
                log_contents.decode('utf8')
 
1780
            except UnicodeDecodeError:
 
1781
                unicodestr = log_contents.decode('utf8', 'replace')
 
1782
                log_contents = unicodestr.encode('utf8')
 
1783
            if not keep_log_file:
 
1784
                self._log_file = None
 
1785
                # Permit multiple calls to get_log until we clean it up in
 
1786
                # finishLogFile
 
1787
                self._log_contents = log_contents
 
1788
            return log_contents
 
1789
        else:
 
1790
            return "No log file content."
 
1791
 
1928
1792
    def get_log(self):
1929
1793
        """Get a unicode string containing the log from bzrlib.trace.
1930
1794
 
2130
1994
    def start_bzr_subprocess(self, process_args, env_changes=None,
2131
1995
                             skip_if_plan_to_signal=False,
2132
1996
                             working_dir=None,
2133
 
                             allow_plugins=False, stderr=subprocess.PIPE):
 
1997
                             allow_plugins=False):
2134
1998
        """Start bzr in a subprocess for testing.
2135
1999
 
2136
2000
        This starts a new Python interpreter and runs bzr in there.
2148
2012
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
2149
2013
            doesn't support signalling subprocesses.
2150
2014
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
2151
 
        :param stderr: file to use for the subprocess's stderr.  Valid values
2152
 
            are those valid for the stderr argument of `subprocess.Popen`.
2153
 
            Default value is ``subprocess.PIPE``.
2154
2015
 
2155
2016
        :returns: Popen object for the started process.
2156
2017
        """
2182
2043
            # so we will avoid using it on all platforms, just to
2183
2044
            # make sure the code path is used, and we don't break on win32
2184
2045
            cleanup_environment()
2185
 
            # Include the subprocess's log file in the test details, in case
2186
 
            # the test fails due to an error in the subprocess.
2187
 
            self._add_subprocess_log(trace._get_bzr_log_filename())
2188
2046
            command = [sys.executable]
2189
2047
            # frozen executables don't need the path to bzr
2190
2048
            if getattr(sys, "frozen", None) is None:
2194
2052
            command.extend(process_args)
2195
2053
            process = self._popen(command, stdin=subprocess.PIPE,
2196
2054
                                  stdout=subprocess.PIPE,
2197
 
                                  stderr=stderr)
 
2055
                                  stderr=subprocess.PIPE)
2198
2056
        finally:
2199
2057
            restore_environment()
2200
2058
            if cwd is not None:
2202
2060
 
2203
2061
        return process
2204
2062
 
2205
 
    def _add_subprocess_log(self, log_file_path):
2206
 
        if len(self._log_files) == 0:
2207
 
            # Register an addCleanup func.  We do this on the first call to
2208
 
            # _add_subprocess_log rather than in TestCase.setUp so that this
2209
 
            # addCleanup is registered after any cleanups for tempdirs that
2210
 
            # subclasses might create, which will probably remove the log file
2211
 
            # we want to read.
2212
 
            self.addCleanup(self._subprocess_log_cleanup)
2213
 
        # self._log_files is a set, so if a log file is reused we won't grab it
2214
 
        # twice.
2215
 
        self._log_files.add(log_file_path)
2216
 
 
2217
 
    def _subprocess_log_cleanup(self):
2218
 
        for count, log_file_path in enumerate(self._log_files):
2219
 
            # We use buffer_now=True to avoid holding the file open beyond
2220
 
            # the life of this function, which might interfere with e.g.
2221
 
            # cleaning tempdirs on Windows.
2222
 
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2223
 
            #detail_content = content.content_from_file(
2224
 
            #    log_file_path, buffer_now=True)
2225
 
            with open(log_file_path, 'rb') as log_file:
2226
 
                log_file_bytes = log_file.read()
2227
 
            detail_content = content.Content(content.ContentType("text",
2228
 
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
2229
 
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2230
 
                detail_content)
2231
 
 
2232
2063
    def _popen(self, *args, **kwargs):
2233
2064
        """Place a call to Popen.
2234
2065
 
2277
2108
                      % (process_args, retcode, process.returncode))
2278
2109
        return [out, err]
2279
2110
 
2280
 
    def check_tree_shape(self, tree, shape):
2281
 
        """Compare a tree to a list of expected names.
 
2111
    def check_inventory_shape(self, inv, shape):
 
2112
        """Compare an inventory to a list of expected names.
2282
2113
 
2283
2114
        Fail if they are not precisely equal.
2284
2115
        """
2285
2116
        extras = []
2286
2117
        shape = list(shape)             # copy
2287
 
        for path, ie in tree.iter_entries_by_dir():
 
2118
        for path, ie in inv.entries():
2288
2119
            name = path.replace('\\', '/')
2289
2120
            if ie.kind == 'directory':
2290
2121
                name = name + '/'
2291
 
            if name == "/":
2292
 
                pass # ignore root entry
2293
 
            elif name in shape:
 
2122
            if name in shape:
2294
2123
                shape.remove(name)
2295
2124
            else:
2296
2125
                extras.append(name)
2386
2215
class TestCaseWithMemoryTransport(TestCase):
2387
2216
    """Common test class for tests that do not need disk resources.
2388
2217
 
2389
 
    Tests that need disk resources should derive from TestCaseInTempDir
2390
 
    orTestCaseWithTransport.
 
2218
    Tests that need disk resources should derive from TestCaseWithTransport.
2391
2219
 
2392
2220
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2393
2221
 
2394
 
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2222
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2395
2223
    a directory which does not exist. This serves to help ensure test isolation
2396
 
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2397
 
    must exist. However, TestCaseWithMemoryTransport does not offer local file
2398
 
    defaults for the transport in tests, nor does it obey the command line
 
2224
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2225
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2226
    file defaults for the transport in tests, nor does it obey the command line
2399
2227
    override, so tests that accidentally write to the common directory should
2400
2228
    be rare.
2401
2229
 
2402
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
2403
 
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2230
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2231
    a .bzr directory that stops us ascending higher into the filesystem.
2404
2232
    """
2405
2233
 
2406
2234
    TEST_ROOT = None
2424
2252
 
2425
2253
        :param relpath: a path relative to the base url.
2426
2254
        """
2427
 
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2255
        t = _mod_transport.get_transport(self.get_url(relpath))
2428
2256
        self.assertFalse(t.is_readonly())
2429
2257
        return t
2430
2258
 
2436
2264
 
2437
2265
        :param relpath: a path relative to the base url.
2438
2266
        """
2439
 
        t = _mod_transport.get_transport_from_url(
2440
 
            self.get_readonly_url(relpath))
 
2267
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2441
2268
        self.assertTrue(t.is_readonly())
2442
2269
        return t
2443
2270
 
2564
2391
        real branch.
2565
2392
        """
2566
2393
        root = TestCaseWithMemoryTransport.TEST_ROOT
2567
 
        try:
2568
 
            # Make sure we get a readable and accessible home for .bzr.log
2569
 
            # and/or config files, and not fallback to weird defaults (see
2570
 
            # http://pad.lv/825027).
2571
 
            self.assertIs(None, os.environ.get('BZR_HOME', None))
2572
 
            os.environ['BZR_HOME'] = root
2573
 
            wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2574
 
            del os.environ['BZR_HOME']
2575
 
        except Exception, e:
2576
 
            self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
2577
 
        # Hack for speed: remember the raw bytes of the dirstate file so that
2578
 
        # we don't need to re-open the wt to check it hasn't changed.
2579
 
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2580
 
            wt.control_transport.get_bytes('dirstate'))
 
2394
        bzrdir.BzrDir.create_standalone_workingtree(root)
2581
2395
 
2582
2396
    def _check_safety_net(self):
2583
2397
        """Check that the safety .bzr directory have not been touched.
2586
2400
        propagating. This method ensures than a test did not leaked.
2587
2401
        """
2588
2402
        root = TestCaseWithMemoryTransport.TEST_ROOT
2589
 
        t = _mod_transport.get_transport_from_path(root)
2590
 
        self.permit_url(t.base)
2591
 
        if (t.get_bytes('.bzr/checkout/dirstate') != 
2592
 
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2403
        self.permit_url(_mod_transport.get_transport(root).base)
 
2404
        wt = workingtree.WorkingTree.open(root)
 
2405
        last_rev = wt.last_revision()
 
2406
        if last_rev != 'null:':
2593
2407
            # The current test have modified the /bzr directory, we need to
2594
2408
            # recreate a new one or all the followng tests will fail.
2595
2409
            # If you need to inspect its content uncomment the following line
2630
2444
    def make_branch(self, relpath, format=None):
2631
2445
        """Create a branch on the transport at relpath."""
2632
2446
        repo = self.make_repository(relpath, format=format)
2633
 
        return repo.bzrdir.create_branch(append_revisions_only=False)
2634
 
 
2635
 
    def resolve_format(self, format):
2636
 
        """Resolve an object to a ControlDir format object.
2637
 
 
2638
 
        The initial format object can either already be
2639
 
        a ControlDirFormat, None (for the default format),
2640
 
        or a string with the name of the control dir format.
2641
 
 
2642
 
        :param format: Object to resolve
2643
 
        :return A ControlDirFormat instance
2644
 
        """
2645
 
        if format is None:
2646
 
            format = 'default'
2647
 
        if isinstance(format, basestring):
2648
 
            format = bzrdir.format_registry.make_bzrdir(format)
2649
 
        return format
2650
 
 
2651
 
    def resolve_format(self, format):
2652
 
        """Resolve an object to a ControlDir format object.
2653
 
 
2654
 
        The initial format object can either already be
2655
 
        a ControlDirFormat, None (for the default format),
2656
 
        or a string with the name of the control dir format.
2657
 
 
2658
 
        :param format: Object to resolve
2659
 
        :return A ControlDirFormat instance
2660
 
        """
2661
 
        if format is None:
2662
 
            format = 'default'
2663
 
        if isinstance(format, basestring):
2664
 
            format = bzrdir.format_registry.make_bzrdir(format)
2665
 
        return format
 
2447
        return repo.bzrdir.create_branch()
2666
2448
 
2667
2449
    def make_bzrdir(self, relpath, format=None):
2668
2450
        try:
2672
2454
            t = _mod_transport.get_transport(maybe_a_url)
2673
2455
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2674
2456
                t.ensure_base()
2675
 
            format = self.resolve_format(format)
 
2457
            if format is None:
 
2458
                format = 'default'
 
2459
            if isinstance(format, basestring):
 
2460
                format = bzrdir.format_registry.make_bzrdir(format)
2676
2461
            return format.initialize_on_transport(t)
2677
2462
        except errors.UninitializableFormat:
2678
2463
            raise TestSkipped("Format %s is not initializable." % format)
2679
2464
 
2680
 
    def make_repository(self, relpath, shared=None, format=None):
 
2465
    def make_repository(self, relpath, shared=False, format=None):
2681
2466
        """Create a repository on our default transport at relpath.
2682
2467
 
2683
2468
        Note that relpath must be a relative path, not a full url.
2694
2479
            backing_server = self.get_server()
2695
2480
        smart_server = test_server.SmartTCPServer_for_testing()
2696
2481
        self.start_server(smart_server, backing_server)
2697
 
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2482
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2698
2483
                                                   ).clone(path)
2699
2484
        return remote_transport
2700
2485
 
2717
2502
    def setUp(self):
2718
2503
        super(TestCaseWithMemoryTransport, self).setUp()
2719
2504
        # Ensure that ConnectedTransport doesn't leak sockets
2720
 
        def get_transport_from_url_with_cleanup(*args, **kwargs):
2721
 
            t = orig_get_transport_from_url(*args, **kwargs)
 
2505
        def get_transport_with_cleanup(*args, **kwargs):
 
2506
            t = orig_get_transport(*args, **kwargs)
2722
2507
            if isinstance(t, _mod_transport.ConnectedTransport):
2723
2508
                self.addCleanup(t.disconnect)
2724
2509
            return t
2725
2510
 
2726
 
        orig_get_transport_from_url = self.overrideAttr(
2727
 
            _mod_transport, 'get_transport_from_url',
2728
 
            get_transport_from_url_with_cleanup)
 
2511
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
 
2512
                                               get_transport_with_cleanup)
2729
2513
        self._make_test_root()
2730
2514
        self.addCleanup(os.chdir, os.getcwdu())
2731
2515
        self.makeAndChdirToTestDir()
2774
2558
 
2775
2559
    OVERRIDE_PYTHON = 'python'
2776
2560
 
2777
 
    def setUp(self):
2778
 
        super(TestCaseInTempDir, self).setUp()
2779
 
        # Remove the protection set in isolated_environ, we have a proper
2780
 
        # access to disk resources now.
2781
 
        self.overrideEnv('BZR_LOG', None)
2782
 
 
2783
2561
    def check_file_contents(self, filename, expect):
2784
2562
        self.log("check contents of file %s" % filename)
2785
2563
        f = file(filename)
2866
2644
                "a list or a tuple. Got %r instead" % (shape,))
2867
2645
        # It's OK to just create them using forward slashes on windows.
2868
2646
        if transport is None or transport.is_readonly():
2869
 
            transport = _mod_transport.get_transport_from_path(".")
 
2647
            transport = _mod_transport.get_transport(".")
2870
2648
        for name in shape:
2871
2649
            self.assertIsInstance(name, basestring)
2872
2650
            if name[-1] == '/':
3676
3454
#                           with proper exclusion rules.
3677
3455
#   -Ethreads               Will display thread ident at creation/join time to
3678
3456
#                           help track thread leaks
3679
 
 
3680
 
#   -Econfig_stats          Will collect statistics using addDetail
3681
3457
selftest_debug_flags = set()
3682
3458
 
3683
3459
 
3935
3711
        'bzrlib.tests.per_repository',
3936
3712
        'bzrlib.tests.per_repository_chk',
3937
3713
        'bzrlib.tests.per_repository_reference',
3938
 
        'bzrlib.tests.per_repository_vf',
3939
3714
        'bzrlib.tests.per_uifactory',
3940
3715
        'bzrlib.tests.per_versionedfile',
3941
3716
        'bzrlib.tests.per_workingtree',
3981
3756
        'bzrlib.tests.test_decorators',
3982
3757
        'bzrlib.tests.test_delta',
3983
3758
        'bzrlib.tests.test_debug',
 
3759
        'bzrlib.tests.test_deprecated_graph',
3984
3760
        'bzrlib.tests.test_diff',
3985
3761
        'bzrlib.tests.test_directory_service',
3986
3762
        'bzrlib.tests.test_dirstate',
3987
3763
        'bzrlib.tests.test_email_message',
3988
3764
        'bzrlib.tests.test_eol_filters',
3989
3765
        'bzrlib.tests.test_errors',
3990
 
        'bzrlib.tests.test_estimate_compressed_size',
3991
3766
        'bzrlib.tests.test_export',
3992
 
        'bzrlib.tests.test_export_pot',
3993
3767
        'bzrlib.tests.test_extract',
3994
 
        'bzrlib.tests.test_features',
3995
3768
        'bzrlib.tests.test_fetch',
3996
3769
        'bzrlib.tests.test_fixtures',
3997
3770
        'bzrlib.tests.test_fifo_cache',
3998
3771
        'bzrlib.tests.test_filters',
3999
 
        'bzrlib.tests.test_filter_tree',
4000
3772
        'bzrlib.tests.test_ftp_transport',
4001
3773
        'bzrlib.tests.test_foreign',
4002
3774
        'bzrlib.tests.test_generate_docs',
4011
3783
        'bzrlib.tests.test_http',
4012
3784
        'bzrlib.tests.test_http_response',
4013
3785
        'bzrlib.tests.test_https_ca_bundle',
4014
 
        'bzrlib.tests.test_i18n',
4015
3786
        'bzrlib.tests.test_identitymap',
4016
3787
        'bzrlib.tests.test_ignores',
4017
3788
        'bzrlib.tests.test_index',
4077
3848
        'bzrlib.tests.test_smart',
4078
3849
        'bzrlib.tests.test_smart_add',
4079
3850
        'bzrlib.tests.test_smart_request',
4080
 
        'bzrlib.tests.test_smart_signals',
4081
3851
        'bzrlib.tests.test_smart_transport',
4082
3852
        'bzrlib.tests.test_smtp_connection',
4083
3853
        'bzrlib.tests.test_source',
4110
3880
        'bzrlib.tests.test_upgrade',
4111
3881
        'bzrlib.tests.test_upgrade_stacked',
4112
3882
        'bzrlib.tests.test_urlutils',
4113
 
        'bzrlib.tests.test_utextwrap',
4114
3883
        'bzrlib.tests.test_version',
4115
3884
        'bzrlib.tests.test_version_info',
4116
3885
        'bzrlib.tests.test_versionedfile',
4133
3902
        'bzrlib',
4134
3903
        'bzrlib.branchbuilder',
4135
3904
        'bzrlib.decorators',
 
3905
        'bzrlib.export',
4136
3906
        'bzrlib.inventory',
4137
3907
        'bzrlib.iterablefile',
4138
3908
        'bzrlib.lockdir',
4394
4164
        the module is available.
4395
4165
    """
4396
4166
 
4397
 
    from bzrlib.tests.features import ModuleAvailableFeature
4398
4167
    py_module = pyutils.get_named_object(py_module_name)
4399
4168
    scenarios = [
4400
4169
        ('python', {'module': py_module}),
4441
4210
                         % (os.path.basename(dirname), printable_e))
4442
4211
 
4443
4212
 
 
4213
class Feature(object):
 
4214
    """An operating system Feature."""
 
4215
 
 
4216
    def __init__(self):
 
4217
        self._available = None
 
4218
 
 
4219
    def available(self):
 
4220
        """Is the feature available?
 
4221
 
 
4222
        :return: True if the feature is available.
 
4223
        """
 
4224
        if self._available is None:
 
4225
            self._available = self._probe()
 
4226
        return self._available
 
4227
 
 
4228
    def _probe(self):
 
4229
        """Implement this method in concrete features.
 
4230
 
 
4231
        :return: True if the feature is available.
 
4232
        """
 
4233
        raise NotImplementedError
 
4234
 
 
4235
    def __str__(self):
 
4236
        if getattr(self, 'feature_name', None):
 
4237
            return self.feature_name()
 
4238
        return self.__class__.__name__
 
4239
 
 
4240
 
 
4241
class _SymlinkFeature(Feature):
 
4242
 
 
4243
    def _probe(self):
 
4244
        return osutils.has_symlinks()
 
4245
 
 
4246
    def feature_name(self):
 
4247
        return 'symlinks'
 
4248
 
 
4249
SymlinkFeature = _SymlinkFeature()
 
4250
 
 
4251
 
 
4252
class _HardlinkFeature(Feature):
 
4253
 
 
4254
    def _probe(self):
 
4255
        return osutils.has_hardlinks()
 
4256
 
 
4257
    def feature_name(self):
 
4258
        return 'hardlinks'
 
4259
 
 
4260
HardlinkFeature = _HardlinkFeature()
 
4261
 
 
4262
 
 
4263
class _OsFifoFeature(Feature):
 
4264
 
 
4265
    def _probe(self):
 
4266
        return getattr(os, 'mkfifo', None)
 
4267
 
 
4268
    def feature_name(self):
 
4269
        return 'filesystem fifos'
 
4270
 
 
4271
OsFifoFeature = _OsFifoFeature()
 
4272
 
 
4273
 
 
4274
class _UnicodeFilenameFeature(Feature):
 
4275
    """Does the filesystem support Unicode filenames?"""
 
4276
 
 
4277
    def _probe(self):
 
4278
        try:
 
4279
            # Check for character combinations unlikely to be covered by any
 
4280
            # single non-unicode encoding. We use the characters
 
4281
            # - greek small letter alpha (U+03B1) and
 
4282
            # - braille pattern dots-123456 (U+283F).
 
4283
            os.stat(u'\u03b1\u283f')
 
4284
        except UnicodeEncodeError:
 
4285
            return False
 
4286
        except (IOError, OSError):
 
4287
            # The filesystem allows the Unicode filename but the file doesn't
 
4288
            # exist.
 
4289
            return True
 
4290
        else:
 
4291
            # The filesystem allows the Unicode filename and the file exists,
 
4292
            # for some reason.
 
4293
            return True
 
4294
 
 
4295
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4296
 
 
4297
 
 
4298
class _CompatabilityThunkFeature(Feature):
 
4299
    """This feature is just a thunk to another feature.
 
4300
 
 
4301
    It issues a deprecation warning if it is accessed, to let you know that you
 
4302
    should really use a different feature.
 
4303
    """
 
4304
 
 
4305
    def __init__(self, dep_version, module, name,
 
4306
                 replacement_name, replacement_module=None):
 
4307
        super(_CompatabilityThunkFeature, self).__init__()
 
4308
        self._module = module
 
4309
        if replacement_module is None:
 
4310
            replacement_module = module
 
4311
        self._replacement_module = replacement_module
 
4312
        self._name = name
 
4313
        self._replacement_name = replacement_name
 
4314
        self._dep_version = dep_version
 
4315
        self._feature = None
 
4316
 
 
4317
    def _ensure(self):
 
4318
        if self._feature is None:
 
4319
            depr_msg = self._dep_version % ('%s.%s'
 
4320
                                            % (self._module, self._name))
 
4321
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4322
                                               self._replacement_name)
 
4323
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4324
            # Import the new feature and use it as a replacement for the
 
4325
            # deprecated one.
 
4326
            self._feature = pyutils.get_named_object(
 
4327
                self._replacement_module, self._replacement_name)
 
4328
 
 
4329
    def _probe(self):
 
4330
        self._ensure()
 
4331
        return self._feature._probe()
 
4332
 
 
4333
 
 
4334
class ModuleAvailableFeature(Feature):
 
4335
    """This is a feature than describes a module we want to be available.
 
4336
 
 
4337
    Declare the name of the module in __init__(), and then after probing, the
 
4338
    module will be available as 'self.module'.
 
4339
 
 
4340
    :ivar module: The module if it is available, else None.
 
4341
    """
 
4342
 
 
4343
    def __init__(self, module_name):
 
4344
        super(ModuleAvailableFeature, self).__init__()
 
4345
        self.module_name = module_name
 
4346
 
 
4347
    def _probe(self):
 
4348
        try:
 
4349
            self._module = __import__(self.module_name, {}, {}, [''])
 
4350
            return True
 
4351
        except ImportError:
 
4352
            return False
 
4353
 
 
4354
    @property
 
4355
    def module(self):
 
4356
        if self.available(): # Make sure the probe has been done
 
4357
            return self._module
 
4358
        return None
 
4359
 
 
4360
    def feature_name(self):
 
4361
        return self.module_name
 
4362
 
 
4363
 
4444
4364
def probe_unicode_in_user_encoding():
4445
4365
    """Try to encode several unicode strings to use in unicode-aware tests.
4446
4366
    Return first successfull match.
4474
4394
    return None
4475
4395
 
4476
4396
 
 
4397
class _HTTPSServerFeature(Feature):
 
4398
    """Some tests want an https Server, check if one is available.
 
4399
 
 
4400
    Right now, the only way this is available is under python2.6 which provides
 
4401
    an ssl module.
 
4402
    """
 
4403
 
 
4404
    def _probe(self):
 
4405
        try:
 
4406
            import ssl
 
4407
            return True
 
4408
        except ImportError:
 
4409
            return False
 
4410
 
 
4411
    def feature_name(self):
 
4412
        return 'HTTPSServer'
 
4413
 
 
4414
 
 
4415
HTTPSServerFeature = _HTTPSServerFeature()
 
4416
 
 
4417
 
 
4418
class _UnicodeFilename(Feature):
 
4419
    """Does the filesystem support Unicode filenames?"""
 
4420
 
 
4421
    def _probe(self):
 
4422
        try:
 
4423
            os.stat(u'\u03b1')
 
4424
        except UnicodeEncodeError:
 
4425
            return False
 
4426
        except (IOError, OSError):
 
4427
            # The filesystem allows the Unicode filename but the file doesn't
 
4428
            # exist.
 
4429
            return True
 
4430
        else:
 
4431
            # The filesystem allows the Unicode filename and the file exists,
 
4432
            # for some reason.
 
4433
            return True
 
4434
 
 
4435
UnicodeFilename = _UnicodeFilename()
 
4436
 
 
4437
 
 
4438
class _ByteStringNamedFilesystem(Feature):
 
4439
    """Is the filesystem based on bytes?"""
 
4440
 
 
4441
    def _probe(self):
 
4442
        if os.name == "posix":
 
4443
            return True
 
4444
        return False
 
4445
 
 
4446
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
 
4447
 
 
4448
 
 
4449
class _UTF8Filesystem(Feature):
 
4450
    """Is the filesystem UTF-8?"""
 
4451
 
 
4452
    def _probe(self):
 
4453
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4454
            return True
 
4455
        return False
 
4456
 
 
4457
UTF8Filesystem = _UTF8Filesystem()
 
4458
 
 
4459
 
 
4460
class _BreakinFeature(Feature):
 
4461
    """Does this platform support the breakin feature?"""
 
4462
 
 
4463
    def _probe(self):
 
4464
        from bzrlib import breakin
 
4465
        if breakin.determine_signal() is None:
 
4466
            return False
 
4467
        if sys.platform == 'win32':
 
4468
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4469
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4470
            # access the function
 
4471
            try:
 
4472
                import ctypes
 
4473
            except OSError:
 
4474
                return False
 
4475
        return True
 
4476
 
 
4477
    def feature_name(self):
 
4478
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4479
 
 
4480
 
 
4481
BreakinFeature = _BreakinFeature()
 
4482
 
 
4483
 
 
4484
class _CaseInsCasePresFilenameFeature(Feature):
 
4485
    """Is the file-system case insensitive, but case-preserving?"""
 
4486
 
 
4487
    def _probe(self):
 
4488
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4489
        try:
 
4490
            # first check truly case-preserving for created files, then check
 
4491
            # case insensitive when opening existing files.
 
4492
            name = osutils.normpath(name)
 
4493
            base, rel = osutils.split(name)
 
4494
            found_rel = osutils.canonical_relpath(base, name)
 
4495
            return (found_rel == rel
 
4496
                    and os.path.isfile(name.upper())
 
4497
                    and os.path.isfile(name.lower()))
 
4498
        finally:
 
4499
            os.close(fileno)
 
4500
            os.remove(name)
 
4501
 
 
4502
    def feature_name(self):
 
4503
        return "case-insensitive case-preserving filesystem"
 
4504
 
 
4505
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4506
 
 
4507
 
 
4508
class _CaseInsensitiveFilesystemFeature(Feature):
 
4509
    """Check if underlying filesystem is case-insensitive but *not* case
 
4510
    preserving.
 
4511
    """
 
4512
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4513
    # more likely to be case preserving, so this case is rare.
 
4514
 
 
4515
    def _probe(self):
 
4516
        if CaseInsCasePresFilenameFeature.available():
 
4517
            return False
 
4518
 
 
4519
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4520
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4521
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4522
        else:
 
4523
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4524
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4525
            dir=root)
 
4526
        name_a = osutils.pathjoin(tdir, 'a')
 
4527
        name_A = osutils.pathjoin(tdir, 'A')
 
4528
        os.mkdir(name_a)
 
4529
        result = osutils.isdir(name_A)
 
4530
        _rmtree_temp_dir(tdir)
 
4531
        return result
 
4532
 
 
4533
    def feature_name(self):
 
4534
        return 'case-insensitive filesystem'
 
4535
 
 
4536
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4537
 
 
4538
 
 
4539
class _CaseSensitiveFilesystemFeature(Feature):
 
4540
 
 
4541
    def _probe(self):
 
4542
        if CaseInsCasePresFilenameFeature.available():
 
4543
            return False
 
4544
        elif CaseInsensitiveFilesystemFeature.available():
 
4545
            return False
 
4546
        else:
 
4547
            return True
 
4548
 
 
4549
    def feature_name(self):
 
4550
        return 'case-sensitive filesystem'
 
4551
 
 
4552
# new coding style is for feature instances to be lowercase
 
4553
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
 
4554
 
 
4555
 
4477
4556
# Only define SubUnitBzrRunner if subunit is available.
4478
4557
try:
4479
4558
    from subunit import TestProtocolClient
4497
4576
except ImportError:
4498
4577
    pass
4499
4578
 
4500
 
 
4501
 
@deprecated_function(deprecated_in((2, 5, 0)))
4502
 
def ModuleAvailableFeature(name):
4503
 
    from bzrlib.tests import features
4504
 
    return features.ModuleAvailableFeature(name)
4505
 
    
 
4579
class _PosixPermissionsFeature(Feature):
 
4580
 
 
4581
    def _probe(self):
 
4582
        def has_perms():
 
4583
            # create temporary file and check if specified perms are maintained.
 
4584
            import tempfile
 
4585
 
 
4586
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
 
4587
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
 
4588
            fd, name = f
 
4589
            os.close(fd)
 
4590
            os.chmod(name, write_perms)
 
4591
 
 
4592
            read_perms = os.stat(name).st_mode & 0777
 
4593
            os.unlink(name)
 
4594
            return (write_perms == read_perms)
 
4595
 
 
4596
        return (os.name == 'posix') and has_perms()
 
4597
 
 
4598
    def feature_name(self):
 
4599
        return 'POSIX permissions support'
 
4600
 
 
4601
posix_permissions_feature = _PosixPermissionsFeature()