~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

Abbreviate pack_stat struct format to '>6L'

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
19
 
# TODO: Perhaps there should be an API to find out if bzr running under the
20
 
# test suite -- some plugins might want to avoid making intrusive changes if
21
 
# this is the case.  However, we want behaviour under to test to diverge as
22
 
# little as possible, so this should be used rarely if it's added at all.
23
 
# (Suggestion from j-a-meinel, 2005-11-24)
24
 
 
25
19
# NOTE: Some classes in here use camelCaseNaming() rather than
26
20
# underscore_naming().  That's for consistency with unittest; it's not the
27
21
# general style of bzrlib.  Please continue that consistency when adding e.g.
67
61
    chk_map,
68
62
    commands as _mod_commands,
69
63
    config,
 
64
    i18n,
70
65
    debug,
71
66
    errors,
72
67
    hooks,
94
89
    memory,
95
90
    pathfilter,
96
91
    )
 
92
from bzrlib.symbol_versioning import (
 
93
    deprecated_function,
 
94
    deprecated_in,
 
95
    )
97
96
from bzrlib.tests import (
98
97
    test_server,
99
98
    TestUtil,
142
141
    'BZREMAIL': None, # may still be present in the environment
143
142
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
144
143
    'BZR_PROGRESS_BAR': None,
145
 
    'BZR_LOG': None,
 
144
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
 
145
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
146
    # TestCase should not use disk resources, BZR_LOG is one.
 
147
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
146
148
    'BZR_PLUGIN_PATH': None,
147
149
    'BZR_DISABLE_PLUGINS': None,
148
150
    'BZR_PLUGINS_AT': None,
377
379
        if isinstance(test, TestCase):
378
380
            test.addCleanup(self._check_leaked_threads, test)
379
381
 
 
382
    def stopTest(self, test):
 
383
        super(ExtendedTestResult, self).stopTest(test)
 
384
        # Manually break cycles, means touching various private things but hey
 
385
        getDetails = getattr(test, "getDetails", None)
 
386
        if getDetails is not None:
 
387
            getDetails().clear()
 
388
        # Clear _type_equality_funcs to try to stop TestCase instances
 
389
        # from wasting memory. 'clear' is not available in all Python
 
390
        # versions (bug 809048)
 
391
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
392
        if type_equality_funcs is not None:
 
393
            tef_clear = getattr(type_equality_funcs, "clear", None)
 
394
            if tef_clear is None:
 
395
                tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
396
                if tef_instance_dict is not None:
 
397
                    tef_clear = tef_instance_dict.clear
 
398
            if tef_clear is not None:
 
399
                tef_clear()
 
400
        self._traceback_from_test = None
 
401
 
380
402
    def startTests(self):
381
403
        self.report_tests_starting()
382
404
        self._active_threads = threading.enumerate()
383
405
 
384
 
    def stopTest(self, test):
385
 
        self._traceback_from_test = None
386
 
 
387
406
    def _check_leaked_threads(self, test):
388
407
        """See if any threads have leaked since last call
389
408
 
449
468
        self.known_failure_count += 1
450
469
        self.report_known_failure(test, err)
451
470
 
 
471
    def addUnexpectedSuccess(self, test, details=None):
 
472
        """Tell result the test unexpectedly passed, counting as a failure
 
473
 
 
474
        When the minimum version of testtools required becomes 0.9.8 this
 
475
        can be updated to use the new handling there.
 
476
        """
 
477
        super(ExtendedTestResult, self).addFailure(test, details=details)
 
478
        self.failure_count += 1
 
479
        self.report_unexpected_success(test,
 
480
            "".join(details["reason"].iter_text()))
 
481
        if self.stop_early:
 
482
            self.stop()
 
483
 
452
484
    def addNotSupported(self, test, feature):
453
485
        """The test will not be run because of a missing feature.
454
486
        """
613
645
    def report_known_failure(self, test, err):
614
646
        pass
615
647
 
 
648
    def report_unexpected_success(self, test, reason):
 
649
        self.stream.write('FAIL: %s\n    %s: %s\n' % (
 
650
            self._test_description(test),
 
651
            "Unexpected success. Should have failed",
 
652
            reason,
 
653
            ))
 
654
 
616
655
    def report_skip(self, test, reason):
617
656
        pass
618
657
 
670
709
                % (self._testTimeString(test),
671
710
                   self._error_summary(err)))
672
711
 
 
712
    def report_unexpected_success(self, test, reason):
 
713
        self.stream.write(' FAIL %s\n%s: %s\n'
 
714
                % (self._testTimeString(test),
 
715
                   "Unexpected success. Should have failed",
 
716
                   reason))
 
717
 
673
718
    def report_success(self, test):
674
719
        self.stream.write('   OK %s\n' % self._testTimeString(test))
675
720
        for bench_called, stats in getattr(test, '_benchcalls', []):
894
939
 
895
940
    The method is really a factory and users are expected to use it as such.
896
941
    """
897
 
    
 
942
 
898
943
    kwargs['setUp'] = isolated_doctest_setUp
899
944
    kwargs['tearDown'] = isolated_doctest_tearDown
900
945
    return doctest.DocTestSuite(*args, **kwargs)
937
982
        super(TestCase, self).setUp()
938
983
        for feature in getattr(self, '_test_needs_features', []):
939
984
            self.requireFeature(feature)
940
 
        self._log_contents = None
941
 
        self.addDetail("log", content.Content(content.ContentType("text",
942
 
            "plain", {"charset": "utf8"}),
943
 
            lambda:[self._get_log(keep_log_file=True)]))
944
985
        self._cleanEnvironment()
945
986
        self._silenceUI()
946
987
        self._startLogFile()
958
999
        # settled on or a the FIXME associated with _get_expand_default_value
959
1000
        # is addressed -- vila 20110219
960
1001
        self.overrideAttr(config, '_expand_default_value', None)
 
1002
        self._log_files = set()
 
1003
        # Each key in the ``_counters`` dict holds a value for a different
 
1004
        # counter. When the test ends, addDetail() should be used to output the
 
1005
        # counter values. This happens in install_counter_hook().
 
1006
        self._counters = {}
 
1007
        if 'config_stats' in selftest_debug_flags:
 
1008
            self._install_config_stats_hooks()
 
1009
        # Do not use i18n for tests (unless the test reverses this)
 
1010
        i18n.disable_i18n()
961
1011
 
962
1012
    def debug(self):
963
1013
        # debug a frame up.
964
1014
        import pdb
965
 
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
1015
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
1016
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
1017
                ).set_trace(sys._getframe().f_back)
966
1018
 
967
1019
    def discardDetail(self, name):
968
1020
        """Extend the addDetail, getDetails api so we can remove a detail.
980
1032
        if name in details:
981
1033
            del details[name]
982
1034
 
 
1035
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1036
        """Install a counting hook.
 
1037
 
 
1038
        Any hook can be counted as long as it doesn't need to return a value.
 
1039
 
 
1040
        :param hooks: Where the hook should be installed.
 
1041
 
 
1042
        :param name: The hook name that will be counted.
 
1043
 
 
1044
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1045
            to ``name``.
 
1046
        """
 
1047
        _counters = self._counters # Avoid closing over self
 
1048
        if counter_name is None:
 
1049
            counter_name = name
 
1050
        if _counters.has_key(counter_name):
 
1051
            raise AssertionError('%s is already used as a counter name'
 
1052
                                  % (counter_name,))
 
1053
        _counters[counter_name] = 0
 
1054
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1055
            lambda: ['%d' % (_counters[counter_name],)]))
 
1056
        def increment_counter(*args, **kwargs):
 
1057
            _counters[counter_name] += 1
 
1058
        label = 'count %s calls' % (counter_name,)
 
1059
        hooks.install_named_hook(name, increment_counter, label)
 
1060
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1061
 
 
1062
    def _install_config_stats_hooks(self):
 
1063
        """Install config hooks to count hook calls.
 
1064
 
 
1065
        """
 
1066
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1067
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1068
                                       'config.%s' % (hook_name,))
 
1069
 
 
1070
        # The OldConfigHooks are private and need special handling to protect
 
1071
        # against recursive tests (tests that run other tests), so we just do
 
1072
        # manually what registering them into _builtin_known_hooks will provide
 
1073
        # us.
 
1074
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1075
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1076
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1077
                                      'old_config.%s' % (hook_name,))
 
1078
 
983
1079
    def _clear_debug_flags(self):
984
1080
        """Prevent externally set debug flags affecting tests.
985
1081
 
1039
1135
        # break some locks on purpose and should be taken into account by
1040
1136
        # considering that breaking a lock is just a dirty way of releasing it.
1041
1137
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1042
 
            message = ('Different number of acquired and '
1043
 
                       'released or broken locks. (%s, %s + %s)' %
1044
 
                       (acquired_locks, released_locks, broken_locks))
 
1138
            message = (
 
1139
                'Different number of acquired and '
 
1140
                'released or broken locks.\n'
 
1141
                'acquired=%s\n'
 
1142
                'released=%s\n'
 
1143
                'broken=%s\n' %
 
1144
                (acquired_locks, released_locks, broken_locks))
1045
1145
            if not self._lock_check_thorough:
1046
1146
                # Rather than fail, just warn
1047
1147
                print "Broken test %s: %s" % (self, message)
1075
1175
 
1076
1176
    def permit_dir(self, name):
1077
1177
        """Permit a directory to be used by this test. See permit_url."""
1078
 
        name_transport = _mod_transport.get_transport(name)
 
1178
        name_transport = _mod_transport.get_transport_from_path(name)
1079
1179
        self.permit_url(name)
1080
1180
        self.permit_url(name_transport.base)
1081
1181
 
1160
1260
        self.addCleanup(transport_server.stop_server)
1161
1261
        # Obtain a real transport because if the server supplies a password, it
1162
1262
        # will be hidden from the base on the client side.
1163
 
        t = _mod_transport.get_transport(transport_server.get_url())
 
1263
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
1164
1264
        # Some transport servers effectively chroot the backing transport;
1165
1265
        # others like SFTPServer don't - users of the transport can walk up the
1166
1266
        # transport to read the entire backing transport. This wouldn't matter
1290
1390
                length, len(obj_with_len), obj_with_len))
1291
1391
 
1292
1392
    def assertLogsError(self, exception_class, func, *args, **kwargs):
1293
 
        """Assert that func(*args, **kwargs) quietly logs a specific exception.
 
1393
        """Assert that `func(*args, **kwargs)` quietly logs a specific error.
1294
1394
        """
1295
1395
        captured = []
1296
1396
        orig_log_exception_quietly = trace.log_exception_quietly
1297
1397
        try:
1298
1398
            def capture():
1299
1399
                orig_log_exception_quietly()
1300
 
                captured.append(sys.exc_info())
 
1400
                captured.append(sys.exc_info()[1])
1301
1401
            trace.log_exception_quietly = capture
1302
1402
            func(*args, **kwargs)
1303
1403
        finally:
1304
1404
            trace.log_exception_quietly = orig_log_exception_quietly
1305
1405
        self.assertLength(1, captured)
1306
 
        err = captured[0][1]
 
1406
        err = captured[0]
1307
1407
        self.assertIsInstance(err, exception_class)
1308
1408
        return err
1309
1409
 
1516
1616
        not other callers that go direct to the warning module.
1517
1617
 
1518
1618
        To test that a deprecated method raises an error, do something like
1519
 
        this::
 
1619
        this (remember that both assertRaises and applyDeprecated delays *args
 
1620
        and **kwargs passing)::
1520
1621
 
1521
1622
            self.assertRaises(errors.ReservedId,
1522
1623
                self.applyDeprecated,
1604
1705
 
1605
1706
        The file is removed as the test is torn down.
1606
1707
        """
1607
 
        self._log_file = StringIO()
 
1708
        pseudo_log_file = StringIO()
 
1709
        def _get_log_contents_for_weird_testtools_api():
 
1710
            return [pseudo_log_file.getvalue().decode(
 
1711
                "utf-8", "replace").encode("utf-8")]
 
1712
        self.addDetail("log", content.Content(content.ContentType("text",
 
1713
            "plain", {"charset": "utf8"}),
 
1714
            _get_log_contents_for_weird_testtools_api))
 
1715
        self._log_file = pseudo_log_file
1608
1716
        self._log_memento = trace.push_log_file(self._log_file)
1609
1717
        self.addCleanup(self._finishLogFile)
1610
1718
 
1611
1719
    def _finishLogFile(self):
1612
1720
        """Finished with the log file.
1613
1721
 
1614
 
        Close the file and delete it, unless setKeepLogfile was called.
 
1722
        Close the file and delete it.
1615
1723
        """
1616
1724
        if trace._trace_file:
1617
1725
            # flush the log file, to get all content
1618
1726
            trace._trace_file.flush()
1619
1727
        trace.pop_log_file(self._log_memento)
1620
 
        # Cache the log result and delete the file on disk
1621
 
        self._get_log(False)
1622
1728
 
1623
1729
    def thisFailsStrictLockCheck(self):
1624
1730
        """It is known that this test would fail with -Dstrict_locks.
1636
1742
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1637
1743
        """Overrides an object attribute restoring it after the test.
1638
1744
 
 
1745
        :note: This should be used with discretion; you should think about
 
1746
        whether it's better to make the code testable without monkey-patching.
 
1747
 
1639
1748
        :param obj: The object that will be mutated.
1640
1749
 
1641
1750
        :param attr_name: The attribute name we want to preserve/override in
1666
1775
        self.addCleanup(osutils.set_or_unset_env, name, value)
1667
1776
        return value
1668
1777
 
 
1778
    def recordCalls(self, obj, attr_name):
 
1779
        """Monkeypatch in a wrapper that will record calls.
 
1780
 
 
1781
        The monkeypatch is automatically removed when the test concludes.
 
1782
 
 
1783
        :param obj: The namespace holding the reference to be replaced;
 
1784
            typically a module, class, or object.
 
1785
        :param attr_name: A string for the name of the attribute to 
 
1786
            patch.
 
1787
        :returns: A list that will be extended with one item every time the
 
1788
            function is called, with a tuple of (args, kwargs).
 
1789
        """
 
1790
        calls = []
 
1791
 
 
1792
        def decorator(*args, **kwargs):
 
1793
            calls.append((args, kwargs))
 
1794
            return orig(*args, **kwargs)
 
1795
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1796
        return calls
 
1797
 
1669
1798
    def _cleanEnvironment(self):
1670
1799
        for name, value in isolated_environ.iteritems():
1671
1800
            self.overrideEnv(name, value)
1673
1802
    def _restoreHooks(self):
1674
1803
        for klass, (name, hooks) in self._preserved_hooks.items():
1675
1804
            setattr(klass, name, hooks)
1676
 
        hooks._lazy_hooks = self._preserved_lazy_hooks
 
1805
        self._preserved_hooks.clear()
 
1806
        bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
 
1807
        self._preserved_lazy_hooks.clear()
1677
1808
 
1678
1809
    def knownFailure(self, reason):
1679
 
        """This test has failed for some known reason."""
1680
 
        raise KnownFailure(reason)
 
1810
        """Declare that this test fails for a known reason
 
1811
 
 
1812
        Tests that are known to fail should generally be using expectedFailure
 
1813
        with an appropriate reverse assertion if a change could cause the test
 
1814
        to start passing. Conversely if the test has no immediate prospect of
 
1815
        succeeding then using skip is more suitable.
 
1816
 
 
1817
        When this method is called while an exception is being handled, that
 
1818
        traceback will be used, otherwise a new exception will be thrown to
 
1819
        provide one but won't be reported.
 
1820
        """
 
1821
        self._add_reason(reason)
 
1822
        try:
 
1823
            exc_info = sys.exc_info()
 
1824
            if exc_info != (None, None, None):
 
1825
                self._report_traceback(exc_info)
 
1826
            else:
 
1827
                try:
 
1828
                    raise self.failureException(reason)
 
1829
                except self.failureException:
 
1830
                    exc_info = sys.exc_info()
 
1831
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1832
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1833
        finally:
 
1834
            del exc_info
1681
1835
 
1682
1836
    def _suppress_log(self):
1683
1837
        """Remove the log info from details."""
1771
1925
    def log(self, *args):
1772
1926
        trace.mutter(*args)
1773
1927
 
1774
 
    def _get_log(self, keep_log_file=False):
1775
 
        """Internal helper to get the log from bzrlib.trace for this test.
1776
 
 
1777
 
        Please use self.getDetails, or self.get_log to access this in test case
1778
 
        code.
1779
 
 
1780
 
        :param keep_log_file: When True, if the log is still a file on disk
1781
 
            leave it as a file on disk. When False, if the log is still a file
1782
 
            on disk, the log file is deleted and the log preserved as
1783
 
            self._log_contents.
1784
 
        :return: A string containing the log.
1785
 
        """
1786
 
        if self._log_contents is not None:
1787
 
            try:
1788
 
                self._log_contents.decode('utf8')
1789
 
            except UnicodeDecodeError:
1790
 
                unicodestr = self._log_contents.decode('utf8', 'replace')
1791
 
                self._log_contents = unicodestr.encode('utf8')
1792
 
            return self._log_contents
1793
 
        if self._log_file is not None:
1794
 
            log_contents = self._log_file.getvalue()
1795
 
            try:
1796
 
                log_contents.decode('utf8')
1797
 
            except UnicodeDecodeError:
1798
 
                unicodestr = log_contents.decode('utf8', 'replace')
1799
 
                log_contents = unicodestr.encode('utf8')
1800
 
            if not keep_log_file:
1801
 
                self._log_file = None
1802
 
                # Permit multiple calls to get_log until we clean it up in
1803
 
                # finishLogFile
1804
 
                self._log_contents = log_contents
1805
 
            return log_contents
1806
 
        else:
1807
 
            return "No log file content."
1808
 
 
1809
1928
    def get_log(self):
1810
1929
        """Get a unicode string containing the log from bzrlib.trace.
1811
1930
 
2011
2130
    def start_bzr_subprocess(self, process_args, env_changes=None,
2012
2131
                             skip_if_plan_to_signal=False,
2013
2132
                             working_dir=None,
2014
 
                             allow_plugins=False):
 
2133
                             allow_plugins=False, stderr=subprocess.PIPE):
2015
2134
        """Start bzr in a subprocess for testing.
2016
2135
 
2017
2136
        This starts a new Python interpreter and runs bzr in there.
2029
2148
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
2030
2149
            doesn't support signalling subprocesses.
2031
2150
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
2151
        :param stderr: file to use for the subprocess's stderr.  Valid values
 
2152
            are those valid for the stderr argument of `subprocess.Popen`.
 
2153
            Default value is ``subprocess.PIPE``.
2032
2154
 
2033
2155
        :returns: Popen object for the started process.
2034
2156
        """
2060
2182
            # so we will avoid using it on all platforms, just to
2061
2183
            # make sure the code path is used, and we don't break on win32
2062
2184
            cleanup_environment()
 
2185
            # Include the subprocess's log file in the test details, in case
 
2186
            # the test fails due to an error in the subprocess.
 
2187
            self._add_subprocess_log(trace._get_bzr_log_filename())
2063
2188
            command = [sys.executable]
2064
2189
            # frozen executables don't need the path to bzr
2065
2190
            if getattr(sys, "frozen", None) is None:
2069
2194
            command.extend(process_args)
2070
2195
            process = self._popen(command, stdin=subprocess.PIPE,
2071
2196
                                  stdout=subprocess.PIPE,
2072
 
                                  stderr=subprocess.PIPE)
 
2197
                                  stderr=stderr)
2073
2198
        finally:
2074
2199
            restore_environment()
2075
2200
            if cwd is not None:
2077
2202
 
2078
2203
        return process
2079
2204
 
 
2205
    def _add_subprocess_log(self, log_file_path):
 
2206
        if len(self._log_files) == 0:
 
2207
            # Register an addCleanup func.  We do this on the first call to
 
2208
            # _add_subprocess_log rather than in TestCase.setUp so that this
 
2209
            # addCleanup is registered after any cleanups for tempdirs that
 
2210
            # subclasses might create, which will probably remove the log file
 
2211
            # we want to read.
 
2212
            self.addCleanup(self._subprocess_log_cleanup)
 
2213
        # self._log_files is a set, so if a log file is reused we won't grab it
 
2214
        # twice.
 
2215
        self._log_files.add(log_file_path)
 
2216
 
 
2217
    def _subprocess_log_cleanup(self):
 
2218
        for count, log_file_path in enumerate(self._log_files):
 
2219
            # We use buffer_now=True to avoid holding the file open beyond
 
2220
            # the life of this function, which might interfere with e.g.
 
2221
            # cleaning tempdirs on Windows.
 
2222
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
 
2223
            #detail_content = content.content_from_file(
 
2224
            #    log_file_path, buffer_now=True)
 
2225
            with open(log_file_path, 'rb') as log_file:
 
2226
                log_file_bytes = log_file.read()
 
2227
            detail_content = content.Content(content.ContentType("text",
 
2228
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
 
2229
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
 
2230
                detail_content)
 
2231
 
2080
2232
    def _popen(self, *args, **kwargs):
2081
2233
        """Place a call to Popen.
2082
2234
 
2234
2386
class TestCaseWithMemoryTransport(TestCase):
2235
2387
    """Common test class for tests that do not need disk resources.
2236
2388
 
2237
 
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2389
    Tests that need disk resources should derive from TestCaseInTempDir
 
2390
    orTestCaseWithTransport.
2238
2391
 
2239
2392
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2240
2393
 
2241
 
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2394
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
2242
2395
    a directory which does not exist. This serves to help ensure test isolation
2243
 
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
2244
 
    must exist. However, TestCaseWithMemoryTransport does not offer local
2245
 
    file defaults for the transport in tests, nor does it obey the command line
 
2396
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2397
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2398
    defaults for the transport in tests, nor does it obey the command line
2246
2399
    override, so tests that accidentally write to the common directory should
2247
2400
    be rare.
2248
2401
 
2249
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
2250
 
    a .bzr directory that stops us ascending higher into the filesystem.
 
2402
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2403
        ``.bzr`` directory that stops us ascending higher into the filesystem.
2251
2404
    """
2252
2405
 
2253
2406
    TEST_ROOT = None
2271
2424
 
2272
2425
        :param relpath: a path relative to the base url.
2273
2426
        """
2274
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2427
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
2275
2428
        self.assertFalse(t.is_readonly())
2276
2429
        return t
2277
2430
 
2283
2436
 
2284
2437
        :param relpath: a path relative to the base url.
2285
2438
        """
2286
 
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
2439
        t = _mod_transport.get_transport_from_url(
 
2440
            self.get_readonly_url(relpath))
2287
2441
        self.assertTrue(t.is_readonly())
2288
2442
        return t
2289
2443
 
2410
2564
        real branch.
2411
2565
        """
2412
2566
        root = TestCaseWithMemoryTransport.TEST_ROOT
2413
 
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2567
        try:
 
2568
            # Make sure we get a readable and accessible home for .bzr.log
 
2569
            # and/or config files, and not fallback to weird defaults (see
 
2570
            # http://pad.lv/825027).
 
2571
            self.assertIs(None, os.environ.get('BZR_HOME', None))
 
2572
            os.environ['BZR_HOME'] = root
 
2573
            wt = bzrdir.BzrDir.create_standalone_workingtree(root)
 
2574
            del os.environ['BZR_HOME']
 
2575
        except Exception, e:
 
2576
            self.fail("Fail to initialize the safety net: %r\nExiting\n" % (e,))
 
2577
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2578
        # we don't need to re-open the wt to check it hasn't changed.
 
2579
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2580
            wt.control_transport.get_bytes('dirstate'))
2414
2581
 
2415
2582
    def _check_safety_net(self):
2416
2583
        """Check that the safety .bzr directory have not been touched.
2419
2586
        propagating. This method ensures than a test did not leaked.
2420
2587
        """
2421
2588
        root = TestCaseWithMemoryTransport.TEST_ROOT
2422
 
        self.permit_url(_mod_transport.get_transport(root).base)
2423
 
        wt = workingtree.WorkingTree.open(root)
2424
 
        last_rev = wt.last_revision()
2425
 
        if last_rev != 'null:':
 
2589
        t = _mod_transport.get_transport_from_path(root)
 
2590
        self.permit_url(t.base)
 
2591
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2592
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
2426
2593
            # The current test have modified the /bzr directory, we need to
2427
2594
            # recreate a new one or all the followng tests will fail.
2428
2595
            # If you need to inspect its content uncomment the following line
2463
2630
    def make_branch(self, relpath, format=None):
2464
2631
        """Create a branch on the transport at relpath."""
2465
2632
        repo = self.make_repository(relpath, format=format)
2466
 
        return repo.bzrdir.create_branch()
 
2633
        return repo.bzrdir.create_branch(append_revisions_only=False)
 
2634
 
 
2635
    def resolve_format(self, format):
 
2636
        """Resolve an object to a ControlDir format object.
 
2637
 
 
2638
        The initial format object can either already be
 
2639
        a ControlDirFormat, None (for the default format),
 
2640
        or a string with the name of the control dir format.
 
2641
 
 
2642
        :param format: Object to resolve
 
2643
        :return A ControlDirFormat instance
 
2644
        """
 
2645
        if format is None:
 
2646
            format = 'default'
 
2647
        if isinstance(format, basestring):
 
2648
            format = bzrdir.format_registry.make_bzrdir(format)
 
2649
        return format
 
2650
 
 
2651
    def resolve_format(self, format):
 
2652
        """Resolve an object to a ControlDir format object.
 
2653
 
 
2654
        The initial format object can either already be
 
2655
        a ControlDirFormat, None (for the default format),
 
2656
        or a string with the name of the control dir format.
 
2657
 
 
2658
        :param format: Object to resolve
 
2659
        :return A ControlDirFormat instance
 
2660
        """
 
2661
        if format is None:
 
2662
            format = 'default'
 
2663
        if isinstance(format, basestring):
 
2664
            format = bzrdir.format_registry.make_bzrdir(format)
 
2665
        return format
2467
2666
 
2468
2667
    def make_bzrdir(self, relpath, format=None):
2469
2668
        try:
2473
2672
            t = _mod_transport.get_transport(maybe_a_url)
2474
2673
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2475
2674
                t.ensure_base()
2476
 
            if format is None:
2477
 
                format = 'default'
2478
 
            if isinstance(format, basestring):
2479
 
                format = bzrdir.format_registry.make_bzrdir(format)
 
2675
            format = self.resolve_format(format)
2480
2676
            return format.initialize_on_transport(t)
2481
2677
        except errors.UninitializableFormat:
2482
2678
            raise TestSkipped("Format %s is not initializable." % format)
2483
2679
 
2484
 
    def make_repository(self, relpath, shared=False, format=None):
 
2680
    def make_repository(self, relpath, shared=None, format=None):
2485
2681
        """Create a repository on our default transport at relpath.
2486
2682
 
2487
2683
        Note that relpath must be a relative path, not a full url.
2498
2694
            backing_server = self.get_server()
2499
2695
        smart_server = test_server.SmartTCPServer_for_testing()
2500
2696
        self.start_server(smart_server, backing_server)
2501
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
 
2697
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
2502
2698
                                                   ).clone(path)
2503
2699
        return remote_transport
2504
2700
 
2521
2717
    def setUp(self):
2522
2718
        super(TestCaseWithMemoryTransport, self).setUp()
2523
2719
        # Ensure that ConnectedTransport doesn't leak sockets
2524
 
        def get_transport_with_cleanup(*args, **kwargs):
2525
 
            t = orig_get_transport(*args, **kwargs)
 
2720
        def get_transport_from_url_with_cleanup(*args, **kwargs):
 
2721
            t = orig_get_transport_from_url(*args, **kwargs)
2526
2722
            if isinstance(t, _mod_transport.ConnectedTransport):
2527
2723
                self.addCleanup(t.disconnect)
2528
2724
            return t
2529
2725
 
2530
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2531
 
                                               get_transport_with_cleanup)
 
2726
        orig_get_transport_from_url = self.overrideAttr(
 
2727
            _mod_transport, 'get_transport_from_url',
 
2728
            get_transport_from_url_with_cleanup)
2532
2729
        self._make_test_root()
2533
2730
        self.addCleanup(os.chdir, os.getcwdu())
2534
2731
        self.makeAndChdirToTestDir()
2577
2774
 
2578
2775
    OVERRIDE_PYTHON = 'python'
2579
2776
 
 
2777
    def setUp(self):
 
2778
        super(TestCaseInTempDir, self).setUp()
 
2779
        # Remove the protection set in isolated_environ, we have a proper
 
2780
        # access to disk resources now.
 
2781
        self.overrideEnv('BZR_LOG', None)
 
2782
 
2580
2783
    def check_file_contents(self, filename, expect):
2581
2784
        self.log("check contents of file %s" % filename)
2582
2785
        f = file(filename)
2663
2866
                "a list or a tuple. Got %r instead" % (shape,))
2664
2867
        # It's OK to just create them using forward slashes on windows.
2665
2868
        if transport is None or transport.is_readonly():
2666
 
            transport = _mod_transport.get_transport(".")
 
2869
            transport = _mod_transport.get_transport_from_path(".")
2667
2870
        for name in shape:
2668
2871
            self.assertIsInstance(name, basestring)
2669
2872
            if name[-1] == '/':
3473
3676
#                           with proper exclusion rules.
3474
3677
#   -Ethreads               Will display thread ident at creation/join time to
3475
3678
#                           help track thread leaks
 
3679
 
 
3680
#   -Econfig_stats          Will collect statistics using addDetail
3476
3681
selftest_debug_flags = set()
3477
3682
 
3478
3683
 
3782
3987
        'bzrlib.tests.test_email_message',
3783
3988
        'bzrlib.tests.test_eol_filters',
3784
3989
        'bzrlib.tests.test_errors',
 
3990
        'bzrlib.tests.test_estimate_compressed_size',
3785
3991
        'bzrlib.tests.test_export',
 
3992
        'bzrlib.tests.test_export_pot',
3786
3993
        'bzrlib.tests.test_extract',
 
3994
        'bzrlib.tests.test_features',
3787
3995
        'bzrlib.tests.test_fetch',
3788
3996
        'bzrlib.tests.test_fixtures',
3789
3997
        'bzrlib.tests.test_fifo_cache',
3790
3998
        'bzrlib.tests.test_filters',
 
3999
        'bzrlib.tests.test_filter_tree',
3791
4000
        'bzrlib.tests.test_ftp_transport',
3792
4001
        'bzrlib.tests.test_foreign',
3793
4002
        'bzrlib.tests.test_generate_docs',
3802
4011
        'bzrlib.tests.test_http',
3803
4012
        'bzrlib.tests.test_http_response',
3804
4013
        'bzrlib.tests.test_https_ca_bundle',
 
4014
        'bzrlib.tests.test_i18n',
3805
4015
        'bzrlib.tests.test_identitymap',
3806
4016
        'bzrlib.tests.test_ignores',
3807
4017
        'bzrlib.tests.test_index',
3899
4109
        'bzrlib.tests.test_upgrade',
3900
4110
        'bzrlib.tests.test_upgrade_stacked',
3901
4111
        'bzrlib.tests.test_urlutils',
 
4112
        'bzrlib.tests.test_utextwrap',
3902
4113
        'bzrlib.tests.test_version',
3903
4114
        'bzrlib.tests.test_version_info',
3904
4115
        'bzrlib.tests.test_versionedfile',
4182
4393
        the module is available.
4183
4394
    """
4184
4395
 
 
4396
    from bzrlib.tests.features import ModuleAvailableFeature
4185
4397
    py_module = pyutils.get_named_object(py_module_name)
4186
4398
    scenarios = [
4187
4399
        ('python', {'module': py_module}),
4228
4440
                         % (os.path.basename(dirname), printable_e))
4229
4441
 
4230
4442
 
4231
 
class Feature(object):
4232
 
    """An operating system Feature."""
4233
 
 
4234
 
    def __init__(self):
4235
 
        self._available = None
4236
 
 
4237
 
    def available(self):
4238
 
        """Is the feature available?
4239
 
 
4240
 
        :return: True if the feature is available.
4241
 
        """
4242
 
        if self._available is None:
4243
 
            self._available = self._probe()
4244
 
        return self._available
4245
 
 
4246
 
    def _probe(self):
4247
 
        """Implement this method in concrete features.
4248
 
 
4249
 
        :return: True if the feature is available.
4250
 
        """
4251
 
        raise NotImplementedError
4252
 
 
4253
 
    def __str__(self):
4254
 
        if getattr(self, 'feature_name', None):
4255
 
            return self.feature_name()
4256
 
        return self.__class__.__name__
4257
 
 
4258
 
 
4259
 
class _SymlinkFeature(Feature):
4260
 
 
4261
 
    def _probe(self):
4262
 
        return osutils.has_symlinks()
4263
 
 
4264
 
    def feature_name(self):
4265
 
        return 'symlinks'
4266
 
 
4267
 
SymlinkFeature = _SymlinkFeature()
4268
 
 
4269
 
 
4270
 
class _HardlinkFeature(Feature):
4271
 
 
4272
 
    def _probe(self):
4273
 
        return osutils.has_hardlinks()
4274
 
 
4275
 
    def feature_name(self):
4276
 
        return 'hardlinks'
4277
 
 
4278
 
HardlinkFeature = _HardlinkFeature()
4279
 
 
4280
 
 
4281
 
class _OsFifoFeature(Feature):
4282
 
 
4283
 
    def _probe(self):
4284
 
        return getattr(os, 'mkfifo', None)
4285
 
 
4286
 
    def feature_name(self):
4287
 
        return 'filesystem fifos'
4288
 
 
4289
 
OsFifoFeature = _OsFifoFeature()
4290
 
 
4291
 
 
4292
 
class _UnicodeFilenameFeature(Feature):
4293
 
    """Does the filesystem support Unicode filenames?"""
4294
 
 
4295
 
    def _probe(self):
4296
 
        try:
4297
 
            # Check for character combinations unlikely to be covered by any
4298
 
            # single non-unicode encoding. We use the characters
4299
 
            # - greek small letter alpha (U+03B1) and
4300
 
            # - braille pattern dots-123456 (U+283F).
4301
 
            os.stat(u'\u03b1\u283f')
4302
 
        except UnicodeEncodeError:
4303
 
            return False
4304
 
        except (IOError, OSError):
4305
 
            # The filesystem allows the Unicode filename but the file doesn't
4306
 
            # exist.
4307
 
            return True
4308
 
        else:
4309
 
            # The filesystem allows the Unicode filename and the file exists,
4310
 
            # for some reason.
4311
 
            return True
4312
 
 
4313
 
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4314
 
 
4315
 
 
4316
 
class _CompatabilityThunkFeature(Feature):
4317
 
    """This feature is just a thunk to another feature.
4318
 
 
4319
 
    It issues a deprecation warning if it is accessed, to let you know that you
4320
 
    should really use a different feature.
4321
 
    """
4322
 
 
4323
 
    def __init__(self, dep_version, module, name,
4324
 
                 replacement_name, replacement_module=None):
4325
 
        super(_CompatabilityThunkFeature, self).__init__()
4326
 
        self._module = module
4327
 
        if replacement_module is None:
4328
 
            replacement_module = module
4329
 
        self._replacement_module = replacement_module
4330
 
        self._name = name
4331
 
        self._replacement_name = replacement_name
4332
 
        self._dep_version = dep_version
4333
 
        self._feature = None
4334
 
 
4335
 
    def _ensure(self):
4336
 
        if self._feature is None:
4337
 
            depr_msg = self._dep_version % ('%s.%s'
4338
 
                                            % (self._module, self._name))
4339
 
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4340
 
                                               self._replacement_name)
4341
 
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4342
 
            # Import the new feature and use it as a replacement for the
4343
 
            # deprecated one.
4344
 
            self._feature = pyutils.get_named_object(
4345
 
                self._replacement_module, self._replacement_name)
4346
 
 
4347
 
    def _probe(self):
4348
 
        self._ensure()
4349
 
        return self._feature._probe()
4350
 
 
4351
 
 
4352
 
class ModuleAvailableFeature(Feature):
4353
 
    """This is a feature than describes a module we want to be available.
4354
 
 
4355
 
    Declare the name of the module in __init__(), and then after probing, the
4356
 
    module will be available as 'self.module'.
4357
 
 
4358
 
    :ivar module: The module if it is available, else None.
4359
 
    """
4360
 
 
4361
 
    def __init__(self, module_name):
4362
 
        super(ModuleAvailableFeature, self).__init__()
4363
 
        self.module_name = module_name
4364
 
 
4365
 
    def _probe(self):
4366
 
        try:
4367
 
            self._module = __import__(self.module_name, {}, {}, [''])
4368
 
            return True
4369
 
        except ImportError:
4370
 
            return False
4371
 
 
4372
 
    @property
4373
 
    def module(self):
4374
 
        if self.available(): # Make sure the probe has been done
4375
 
            return self._module
4376
 
        return None
4377
 
 
4378
 
    def feature_name(self):
4379
 
        return self.module_name
4380
 
 
4381
 
 
4382
4443
def probe_unicode_in_user_encoding():
4383
4444
    """Try to encode several unicode strings to use in unicode-aware tests.
4384
4445
    Return first successfull match.
4412
4473
    return None
4413
4474
 
4414
4475
 
4415
 
class _HTTPSServerFeature(Feature):
4416
 
    """Some tests want an https Server, check if one is available.
4417
 
 
4418
 
    Right now, the only way this is available is under python2.6 which provides
4419
 
    an ssl module.
4420
 
    """
4421
 
 
4422
 
    def _probe(self):
4423
 
        try:
4424
 
            import ssl
4425
 
            return True
4426
 
        except ImportError:
4427
 
            return False
4428
 
 
4429
 
    def feature_name(self):
4430
 
        return 'HTTPSServer'
4431
 
 
4432
 
 
4433
 
HTTPSServerFeature = _HTTPSServerFeature()
4434
 
 
4435
 
 
4436
 
class _UnicodeFilename(Feature):
4437
 
    """Does the filesystem support Unicode filenames?"""
4438
 
 
4439
 
    def _probe(self):
4440
 
        try:
4441
 
            os.stat(u'\u03b1')
4442
 
        except UnicodeEncodeError:
4443
 
            return False
4444
 
        except (IOError, OSError):
4445
 
            # The filesystem allows the Unicode filename but the file doesn't
4446
 
            # exist.
4447
 
            return True
4448
 
        else:
4449
 
            # The filesystem allows the Unicode filename and the file exists,
4450
 
            # for some reason.
4451
 
            return True
4452
 
 
4453
 
UnicodeFilename = _UnicodeFilename()
4454
 
 
4455
 
 
4456
 
class _ByteStringNamedFilesystem(Feature):
4457
 
    """Is the filesystem based on bytes?"""
4458
 
 
4459
 
    def _probe(self):
4460
 
        if os.name == "posix":
4461
 
            return True
4462
 
        return False
4463
 
 
4464
 
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4465
 
 
4466
 
 
4467
 
class _UTF8Filesystem(Feature):
4468
 
    """Is the filesystem UTF-8?"""
4469
 
 
4470
 
    def _probe(self):
4471
 
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4472
 
            return True
4473
 
        return False
4474
 
 
4475
 
UTF8Filesystem = _UTF8Filesystem()
4476
 
 
4477
 
 
4478
 
class _BreakinFeature(Feature):
4479
 
    """Does this platform support the breakin feature?"""
4480
 
 
4481
 
    def _probe(self):
4482
 
        from bzrlib import breakin
4483
 
        if breakin.determine_signal() is None:
4484
 
            return False
4485
 
        if sys.platform == 'win32':
4486
 
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4487
 
            # We trigger SIGBREAK via a Console api so we need ctypes to
4488
 
            # access the function
4489
 
            try:
4490
 
                import ctypes
4491
 
            except OSError:
4492
 
                return False
4493
 
        return True
4494
 
 
4495
 
    def feature_name(self):
4496
 
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
4497
 
 
4498
 
 
4499
 
BreakinFeature = _BreakinFeature()
4500
 
 
4501
 
 
4502
 
class _CaseInsCasePresFilenameFeature(Feature):
4503
 
    """Is the file-system case insensitive, but case-preserving?"""
4504
 
 
4505
 
    def _probe(self):
4506
 
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
4507
 
        try:
4508
 
            # first check truly case-preserving for created files, then check
4509
 
            # case insensitive when opening existing files.
4510
 
            name = osutils.normpath(name)
4511
 
            base, rel = osutils.split(name)
4512
 
            found_rel = osutils.canonical_relpath(base, name)
4513
 
            return (found_rel == rel
4514
 
                    and os.path.isfile(name.upper())
4515
 
                    and os.path.isfile(name.lower()))
4516
 
        finally:
4517
 
            os.close(fileno)
4518
 
            os.remove(name)
4519
 
 
4520
 
    def feature_name(self):
4521
 
        return "case-insensitive case-preserving filesystem"
4522
 
 
4523
 
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4524
 
 
4525
 
 
4526
 
class _CaseInsensitiveFilesystemFeature(Feature):
4527
 
    """Check if underlying filesystem is case-insensitive but *not* case
4528
 
    preserving.
4529
 
    """
4530
 
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4531
 
    # more likely to be case preserving, so this case is rare.
4532
 
 
4533
 
    def _probe(self):
4534
 
        if CaseInsCasePresFilenameFeature.available():
4535
 
            return False
4536
 
 
4537
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
4538
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4539
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
4540
 
        else:
4541
 
            root = TestCaseWithMemoryTransport.TEST_ROOT
4542
 
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4543
 
            dir=root)
4544
 
        name_a = osutils.pathjoin(tdir, 'a')
4545
 
        name_A = osutils.pathjoin(tdir, 'A')
4546
 
        os.mkdir(name_a)
4547
 
        result = osutils.isdir(name_A)
4548
 
        _rmtree_temp_dir(tdir)
4549
 
        return result
4550
 
 
4551
 
    def feature_name(self):
4552
 
        return 'case-insensitive filesystem'
4553
 
 
4554
 
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4555
 
 
4556
 
 
4557
 
class _CaseSensitiveFilesystemFeature(Feature):
4558
 
 
4559
 
    def _probe(self):
4560
 
        if CaseInsCasePresFilenameFeature.available():
4561
 
            return False
4562
 
        elif CaseInsensitiveFilesystemFeature.available():
4563
 
            return False
4564
 
        else:
4565
 
            return True
4566
 
 
4567
 
    def feature_name(self):
4568
 
        return 'case-sensitive filesystem'
4569
 
 
4570
 
# new coding style is for feature instances to be lowercase
4571
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4572
 
 
4573
 
 
4574
4476
# Only define SubUnitBzrRunner if subunit is available.
4575
4477
try:
4576
4478
    from subunit import TestProtocolClient
4594
4496
except ImportError:
4595
4497
    pass
4596
4498
 
4597
 
class _PosixPermissionsFeature(Feature):
4598
 
 
4599
 
    def _probe(self):
4600
 
        def has_perms():
4601
 
            # create temporary file and check if specified perms are maintained.
4602
 
            import tempfile
4603
 
 
4604
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4605
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4606
 
            fd, name = f
4607
 
            os.close(fd)
4608
 
            os.chmod(name, write_perms)
4609
 
 
4610
 
            read_perms = os.stat(name).st_mode & 0777
4611
 
            os.unlink(name)
4612
 
            return (write_perms == read_perms)
4613
 
 
4614
 
        return (os.name == 'posix') and has_perms()
4615
 
 
4616
 
    def feature_name(self):
4617
 
        return 'POSIX permissions support'
4618
 
 
4619
 
posix_permissions_feature = _PosixPermissionsFeature()
 
4499
 
 
4500
@deprecated_function(deprecated_in((2, 5, 0)))
 
4501
def ModuleAvailableFeature(name):
 
4502
    from bzrlib.tests import features
 
4503
    return features.ModuleAvailableFeature(name)
 
4504