~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Jelmer Vernooij
  • Date: 2011-06-16 11:58:04 UTC
  • mto: This revision was merged to the branch mainline in revision 5987.
  • Revision ID: jelmer@samba.org-20110616115804-7tnqon61emrbdoxm
RemoveĀ unusedĀ Tree._get_ancestors.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
 
19
# TODO: Perhaps there should be an API to find out if bzr running under the
 
20
# test suite -- some plugins might want to avoid making intrusive changes if
 
21
# this is the case.  However, we want behaviour under to test to diverge as
 
22
# little as possible, so this should be used rarely if it's added at all.
 
23
# (Suggestion from j-a-meinel, 2005-11-24)
 
24
 
19
25
# NOTE: Some classes in here use camelCaseNaming() rather than
20
26
# underscore_naming().  That's for consistency with unittest; it's not the
21
27
# general style of bzrlib.  Please continue that consistency when adding e.g.
61
67
    chk_map,
62
68
    commands as _mod_commands,
63
69
    config,
64
 
    i18n,
65
70
    debug,
66
71
    errors,
67
72
    hooks,
89
94
    memory,
90
95
    pathfilter,
91
96
    )
92
 
from bzrlib.symbol_versioning import (
93
 
    deprecated_function,
94
 
    deprecated_in,
95
 
    )
96
97
from bzrlib.tests import (
97
98
    test_server,
98
99
    TestUtil,
141
142
    'BZREMAIL': None, # may still be present in the environment
142
143
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
143
144
    'BZR_PROGRESS_BAR': None,
144
 
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
145
 
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
146
 
    # TestCase should not use disk resources, BZR_LOG is one.
147
 
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
145
    'BZR_LOG': None,
148
146
    'BZR_PLUGIN_PATH': None,
149
147
    'BZR_DISABLE_PLUGINS': None,
150
148
    'BZR_PLUGINS_AT': None,
212
210
        osutils.set_or_unset_env(var, value)
213
211
 
214
212
 
215
 
def _clear__type_equality_funcs(test):
216
 
    """Cleanup bound methods stored on TestCase instances
217
 
 
218
 
    Clear the dict breaking a few (mostly) harmless cycles in the affected
219
 
    unittests released with Python 2.6 and initial Python 2.7 versions.
220
 
 
221
 
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
222
 
    shipped in Oneiric, an object with no clear method was used, hence the
223
 
    extra complications, see bug 809048 for details.
224
 
    """
225
 
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
226
 
    if type_equality_funcs is not None:
227
 
        tef_clear = getattr(type_equality_funcs, "clear", None)
228
 
        if tef_clear is None:
229
 
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
230
 
            if tef_instance_dict is not None:
231
 
                tef_clear = tef_instance_dict.clear
232
 
        if tef_clear is not None:
233
 
            tef_clear()
234
 
 
235
 
 
236
213
class ExtendedTestResult(testtools.TextTestResult):
237
214
    """Accepts, reports and accumulates the results of running tests.
238
215
 
406
383
        getDetails = getattr(test, "getDetails", None)
407
384
        if getDetails is not None:
408
385
            getDetails().clear()
409
 
        _clear__type_equality_funcs(test)
 
386
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
387
        if type_equality_funcs is not None:
 
388
            type_equality_funcs.clear()
410
389
        self._traceback_from_test = None
411
390
 
412
391
    def startTests(self):
513
492
        self.not_applicable_count += 1
514
493
        self.report_not_applicable(test, reason)
515
494
 
516
 
    def _count_stored_tests(self):
517
 
        """Count of tests instances kept alive due to not succeeding"""
518
 
        return self.error_count + self.failure_count + self.known_failure_count
519
 
 
520
495
    def _post_mortem(self, tb=None):
521
496
        """Start a PDB post mortem session."""
522
497
        if os.environ.get('BZR_TEST_PDB', None):
953
928
 
954
929
    The method is really a factory and users are expected to use it as such.
955
930
    """
956
 
 
 
931
    
957
932
    kwargs['setUp'] = isolated_doctest_setUp
958
933
    kwargs['tearDown'] = isolated_doctest_tearDown
959
934
    return doctest.DocTestSuite(*args, **kwargs)
997
972
        for feature in getattr(self, '_test_needs_features', []):
998
973
            self.requireFeature(feature)
999
974
        self._cleanEnvironment()
1000
 
        if bzrlib.global_state is not None:
1001
 
            self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
1002
 
                              config.CommandLineSection())
1003
975
        self._silenceUI()
1004
976
        self._startLogFile()
1005
977
        self._benchcalls = []
1017
989
        # is addressed -- vila 20110219
1018
990
        self.overrideAttr(config, '_expand_default_value', None)
1019
991
        self._log_files = set()
1020
 
        # Each key in the ``_counters`` dict holds a value for a different
1021
 
        # counter. When the test ends, addDetail() should be used to output the
1022
 
        # counter values. This happens in install_counter_hook().
1023
 
        self._counters = {}
1024
 
        if 'config_stats' in selftest_debug_flags:
1025
 
            self._install_config_stats_hooks()
1026
 
        # Do not use i18n for tests (unless the test reverses this)
1027
 
        i18n.disable_i18n()
1028
992
 
1029
993
    def debug(self):
1030
994
        # debug a frame up.
1031
995
        import pdb
1032
 
        # The sys preserved stdin/stdout should allow blackbox tests debugging
1033
 
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1034
 
                ).set_trace(sys._getframe().f_back)
 
996
        pdb.Pdb().set_trace(sys._getframe().f_back)
1035
997
 
1036
998
    def discardDetail(self, name):
1037
999
        """Extend the addDetail, getDetails api so we can remove a detail.
1049
1011
        if name in details:
1050
1012
            del details[name]
1051
1013
 
1052
 
    def install_counter_hook(self, hooks, name, counter_name=None):
1053
 
        """Install a counting hook.
1054
 
 
1055
 
        Any hook can be counted as long as it doesn't need to return a value.
1056
 
 
1057
 
        :param hooks: Where the hook should be installed.
1058
 
 
1059
 
        :param name: The hook name that will be counted.
1060
 
 
1061
 
        :param counter_name: The counter identifier in ``_counters``, defaults
1062
 
            to ``name``.
1063
 
        """
1064
 
        _counters = self._counters # Avoid closing over self
1065
 
        if counter_name is None:
1066
 
            counter_name = name
1067
 
        if _counters.has_key(counter_name):
1068
 
            raise AssertionError('%s is already used as a counter name'
1069
 
                                  % (counter_name,))
1070
 
        _counters[counter_name] = 0
1071
 
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
1072
 
            lambda: ['%d' % (_counters[counter_name],)]))
1073
 
        def increment_counter(*args, **kwargs):
1074
 
            _counters[counter_name] += 1
1075
 
        label = 'count %s calls' % (counter_name,)
1076
 
        hooks.install_named_hook(name, increment_counter, label)
1077
 
        self.addCleanup(hooks.uninstall_named_hook, name, label)
1078
 
 
1079
 
    def _install_config_stats_hooks(self):
1080
 
        """Install config hooks to count hook calls.
1081
 
 
1082
 
        """
1083
 
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1084
 
            self.install_counter_hook(config.ConfigHooks, hook_name,
1085
 
                                       'config.%s' % (hook_name,))
1086
 
 
1087
 
        # The OldConfigHooks are private and need special handling to protect
1088
 
        # against recursive tests (tests that run other tests), so we just do
1089
 
        # manually what registering them into _builtin_known_hooks will provide
1090
 
        # us.
1091
 
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1092
 
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1093
 
            self.install_counter_hook(config.OldConfigHooks, hook_name,
1094
 
                                      'old_config.%s' % (hook_name,))
1095
 
 
1096
1014
    def _clear_debug_flags(self):
1097
1015
        """Prevent externally set debug flags affecting tests.
1098
1016
 
1192
1110
 
1193
1111
    def permit_dir(self, name):
1194
1112
        """Permit a directory to be used by this test. See permit_url."""
1195
 
        name_transport = _mod_transport.get_transport_from_path(name)
 
1113
        name_transport = _mod_transport.get_transport(name)
1196
1114
        self.permit_url(name)
1197
1115
        self.permit_url(name_transport.base)
1198
1116
 
1277
1195
        self.addCleanup(transport_server.stop_server)
1278
1196
        # Obtain a real transport because if the server supplies a password, it
1279
1197
        # will be hidden from the base on the client side.
1280
 
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1198
        t = _mod_transport.get_transport(transport_server.get_url())
1281
1199
        # Some transport servers effectively chroot the backing transport;
1282
1200
        # others like SFTPServer don't - users of the transport can walk up the
1283
1201
        # transport to read the entire backing transport. This wouldn't matter
1736
1654
    def _finishLogFile(self):
1737
1655
        """Finished with the log file.
1738
1656
 
1739
 
        Close the file and delete it.
 
1657
        Close the file and delete it, unless setKeepLogfile was called.
1740
1658
        """
1741
1659
        if trace._trace_file:
1742
1660
            # flush the log file, to get all content
1759
1677
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1760
1678
        """Overrides an object attribute restoring it after the test.
1761
1679
 
1762
 
        :note: This should be used with discretion; you should think about
1763
 
        whether it's better to make the code testable without monkey-patching.
1764
 
 
1765
1680
        :param obj: The object that will be mutated.
1766
1681
 
1767
1682
        :param attr_name: The attribute name we want to preserve/override in
1792
1707
        self.addCleanup(osutils.set_or_unset_env, name, value)
1793
1708
        return value
1794
1709
 
1795
 
    def recordCalls(self, obj, attr_name):
1796
 
        """Monkeypatch in a wrapper that will record calls.
1797
 
 
1798
 
        The monkeypatch is automatically removed when the test concludes.
1799
 
 
1800
 
        :param obj: The namespace holding the reference to be replaced;
1801
 
            typically a module, class, or object.
1802
 
        :param attr_name: A string for the name of the attribute to 
1803
 
            patch.
1804
 
        :returns: A list that will be extended with one item every time the
1805
 
            function is called, with a tuple of (args, kwargs).
1806
 
        """
1807
 
        calls = []
1808
 
 
1809
 
        def decorator(*args, **kwargs):
1810
 
            calls.append((args, kwargs))
1811
 
            return orig(*args, **kwargs)
1812
 
        orig = self.overrideAttr(obj, attr_name, decorator)
1813
 
        return calls
1814
 
 
1815
1710
    def _cleanEnvironment(self):
1816
1711
        for name, value in isolated_environ.iteritems():
1817
1712
            self.overrideEnv(name, value)
1824
1719
        self._preserved_lazy_hooks.clear()
1825
1720
 
1826
1721
    def knownFailure(self, reason):
1827
 
        """Declare that this test fails for a known reason
1828
 
 
1829
 
        Tests that are known to fail should generally be using expectedFailure
1830
 
        with an appropriate reverse assertion if a change could cause the test
1831
 
        to start passing. Conversely if the test has no immediate prospect of
1832
 
        succeeding then using skip is more suitable.
1833
 
 
1834
 
        When this method is called while an exception is being handled, that
1835
 
        traceback will be used, otherwise a new exception will be thrown to
1836
 
        provide one but won't be reported.
1837
 
        """
1838
 
        self._add_reason(reason)
1839
 
        try:
1840
 
            exc_info = sys.exc_info()
1841
 
            if exc_info != (None, None, None):
1842
 
                self._report_traceback(exc_info)
1843
 
            else:
1844
 
                try:
1845
 
                    raise self.failureException(reason)
1846
 
                except self.failureException:
1847
 
                    exc_info = sys.exc_info()
1848
 
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1849
 
            raise testtools.testcase._ExpectedFailure(exc_info)
1850
 
        finally:
1851
 
            del exc_info
 
1722
        """This test has failed for some known reason."""
 
1723
        raise KnownFailure(reason)
1852
1724
 
1853
1725
    def _suppress_log(self):
1854
1726
        """Remove the log info from details."""
2403
2275
class TestCaseWithMemoryTransport(TestCase):
2404
2276
    """Common test class for tests that do not need disk resources.
2405
2277
 
2406
 
    Tests that need disk resources should derive from TestCaseInTempDir
2407
 
    orTestCaseWithTransport.
 
2278
    Tests that need disk resources should derive from TestCaseWithTransport.
2408
2279
 
2409
2280
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
2410
2281
 
2411
 
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2282
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
2412
2283
    a directory which does not exist. This serves to help ensure test isolation
2413
 
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
2414
 
    must exist. However, TestCaseWithMemoryTransport does not offer local file
2415
 
    defaults for the transport in tests, nor does it obey the command line
 
2284
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2285
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2286
    file defaults for the transport in tests, nor does it obey the command line
2416
2287
    override, so tests that accidentally write to the common directory should
2417
2288
    be rare.
2418
2289
 
2419
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
2420
 
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2290
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2291
    a .bzr directory that stops us ascending higher into the filesystem.
2421
2292
    """
2422
2293
 
2423
2294
    TEST_ROOT = None
2441
2312
 
2442
2313
        :param relpath: a path relative to the base url.
2443
2314
        """
2444
 
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2315
        t = _mod_transport.get_transport(self.get_url(relpath))
2445
2316
        self.assertFalse(t.is_readonly())
2446
2317
        return t
2447
2318
 
2453
2324
 
2454
2325
        :param relpath: a path relative to the base url.
2455
2326
        """
2456
 
        t = _mod_transport.get_transport_from_url(
2457
 
            self.get_readonly_url(relpath))
 
2327
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2458
2328
        self.assertTrue(t.is_readonly())
2459
2329
        return t
2460
2330
 
2581
2451
        real branch.
2582
2452
        """
2583
2453
        root = TestCaseWithMemoryTransport.TEST_ROOT
2584
 
        # Make sure we get a readable and accessible home for .bzr.log
2585
 
        # and/or config files, and not fallback to weird defaults (see
2586
 
        # http://pad.lv/825027).
2587
 
        self.assertIs(None, os.environ.get('BZR_HOME', None))
2588
 
        os.environ['BZR_HOME'] = root
2589
 
        wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2590
 
        del os.environ['BZR_HOME']
2591
 
        # Hack for speed: remember the raw bytes of the dirstate file so that
2592
 
        # we don't need to re-open the wt to check it hasn't changed.
2593
 
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2594
 
            wt.control_transport.get_bytes('dirstate'))
 
2454
        bzrdir.BzrDir.create_standalone_workingtree(root)
2595
2455
 
2596
2456
    def _check_safety_net(self):
2597
2457
        """Check that the safety .bzr directory have not been touched.
2600
2460
        propagating. This method ensures than a test did not leaked.
2601
2461
        """
2602
2462
        root = TestCaseWithMemoryTransport.TEST_ROOT
2603
 
        t = _mod_transport.get_transport_from_path(root)
2604
 
        self.permit_url(t.base)
2605
 
        if (t.get_bytes('.bzr/checkout/dirstate') != 
2606
 
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2463
        self.permit_url(_mod_transport.get_transport(root).base)
 
2464
        wt = workingtree.WorkingTree.open(root)
 
2465
        last_rev = wt.last_revision()
 
2466
        if last_rev != 'null:':
2607
2467
            # The current test have modified the /bzr directory, we need to
2608
2468
            # recreate a new one or all the followng tests will fail.
2609
2469
            # If you need to inspect its content uncomment the following line
2644
2504
    def make_branch(self, relpath, format=None):
2645
2505
        """Create a branch on the transport at relpath."""
2646
2506
        repo = self.make_repository(relpath, format=format)
2647
 
        return repo.bzrdir.create_branch(append_revisions_only=False)
2648
 
 
2649
 
    def get_default_format(self):
2650
 
        return 'default'
2651
 
 
2652
 
    def resolve_format(self, format):
2653
 
        """Resolve an object to a ControlDir format object.
2654
 
 
2655
 
        The initial format object can either already be
2656
 
        a ControlDirFormat, None (for the default format),
2657
 
        or a string with the name of the control dir format.
2658
 
 
2659
 
        :param format: Object to resolve
2660
 
        :return A ControlDirFormat instance
2661
 
        """
2662
 
        if format is None:
2663
 
            format = self.get_default_format()
2664
 
        if isinstance(format, basestring):
2665
 
            format = bzrdir.format_registry.make_bzrdir(format)
2666
 
        return format
 
2507
        return repo.bzrdir.create_branch()
2667
2508
 
2668
2509
    def make_bzrdir(self, relpath, format=None):
2669
2510
        try:
2673
2514
            t = _mod_transport.get_transport(maybe_a_url)
2674
2515
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2675
2516
                t.ensure_base()
2676
 
            format = self.resolve_format(format)
 
2517
            if format is None:
 
2518
                format = 'default'
 
2519
            if isinstance(format, basestring):
 
2520
                format = bzrdir.format_registry.make_bzrdir(format)
2677
2521
            return format.initialize_on_transport(t)
2678
2522
        except errors.UninitializableFormat:
2679
2523
            raise TestSkipped("Format %s is not initializable." % format)
2680
2524
 
2681
 
    def make_repository(self, relpath, shared=None, format=None):
 
2525
    def make_repository(self, relpath, shared=False, format=None):
2682
2526
        """Create a repository on our default transport at relpath.
2683
2527
 
2684
2528
        Note that relpath must be a relative path, not a full url.
2695
2539
            backing_server = self.get_server()
2696
2540
        smart_server = test_server.SmartTCPServer_for_testing()
2697
2541
        self.start_server(smart_server, backing_server)
2698
 
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2542
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2699
2543
                                                   ).clone(path)
2700
2544
        return remote_transport
2701
2545
 
2718
2562
    def setUp(self):
2719
2563
        super(TestCaseWithMemoryTransport, self).setUp()
2720
2564
        # Ensure that ConnectedTransport doesn't leak sockets
2721
 
        def get_transport_from_url_with_cleanup(*args, **kwargs):
2722
 
            t = orig_get_transport_from_url(*args, **kwargs)
 
2565
        def get_transport_with_cleanup(*args, **kwargs):
 
2566
            t = orig_get_transport(*args, **kwargs)
2723
2567
            if isinstance(t, _mod_transport.ConnectedTransport):
2724
2568
                self.addCleanup(t.disconnect)
2725
2569
            return t
2726
2570
 
2727
 
        orig_get_transport_from_url = self.overrideAttr(
2728
 
            _mod_transport, 'get_transport_from_url',
2729
 
            get_transport_from_url_with_cleanup)
 
2571
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
 
2572
                                               get_transport_with_cleanup)
2730
2573
        self._make_test_root()
2731
2574
        self.addCleanup(os.chdir, os.getcwdu())
2732
2575
        self.makeAndChdirToTestDir()
2775
2618
 
2776
2619
    OVERRIDE_PYTHON = 'python'
2777
2620
 
2778
 
    def setUp(self):
2779
 
        super(TestCaseInTempDir, self).setUp()
2780
 
        # Remove the protection set in isolated_environ, we have a proper
2781
 
        # access to disk resources now.
2782
 
        self.overrideEnv('BZR_LOG', None)
2783
 
 
2784
2621
    def check_file_contents(self, filename, expect):
2785
2622
        self.log("check contents of file %s" % filename)
2786
2623
        f = file(filename)
2867
2704
                "a list or a tuple. Got %r instead" % (shape,))
2868
2705
        # It's OK to just create them using forward slashes on windows.
2869
2706
        if transport is None or transport.is_readonly():
2870
 
            transport = _mod_transport.get_transport_from_path(".")
 
2707
            transport = _mod_transport.get_transport(".")
2871
2708
        for name in shape:
2872
2709
            self.assertIsInstance(name, basestring)
2873
2710
            if name[-1] == '/':
2958
2795
        # this obviously requires a format that supports branch references
2959
2796
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2960
2797
        # RBC 20060208
2961
 
        format = self.resolve_format(format=format)
2962
 
        if not format.supports_workingtrees:
2963
 
            b = self.make_branch(relpath+'.branch', format=format)
2964
 
            return b.create_checkout(relpath, lightweight=True)
2965
2798
        b = self.make_branch(relpath, format=format)
2966
2799
        try:
2967
2800
            return b.bzrdir.create_workingtree()
3266
3099
                            result_decorators=result_decorators,
3267
3100
                            )
3268
3101
    runner.stop_on_failure=stop_on_failure
3269
 
    if isinstance(suite, unittest.TestSuite):
3270
 
        # Empty out _tests list of passed suite and populate new TestSuite
3271
 
        suite._tests[:], suite = [], TestSuite(suite)
3272
3102
    # built in decorator factories:
3273
3103
    decorators = [
3274
3104
        random_order(random_seed, runner),
3372
3202
 
3373
3203
class TestDecorator(TestUtil.TestSuite):
3374
3204
    """A decorator for TestCase/TestSuite objects.
3375
 
 
3376
 
    Contains rather than flattening suite passed on construction
 
3205
    
 
3206
    Usually, subclasses should override __iter__(used when flattening test
 
3207
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
3208
    debug().
3377
3209
    """
3378
3210
 
3379
 
    def __init__(self, suite=None):
3380
 
        super(TestDecorator, self).__init__()
3381
 
        if suite is not None:
3382
 
            self.addTest(suite)
3383
 
 
3384
 
    # Don't need subclass run method with suite emptying
3385
 
    run = unittest.TestSuite.run
 
3211
    def __init__(self, suite):
 
3212
        TestUtil.TestSuite.__init__(self)
 
3213
        self.addTest(suite)
 
3214
 
 
3215
    def countTestCases(self):
 
3216
        cases = 0
 
3217
        for test in self:
 
3218
            cases += test.countTestCases()
 
3219
        return cases
 
3220
 
 
3221
    def debug(self):
 
3222
        for test in self:
 
3223
            test.debug()
 
3224
 
 
3225
    def run(self, result):
 
3226
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
3227
        # into __iter__.
 
3228
        for test in self:
 
3229
            if result.shouldStop:
 
3230
                break
 
3231
            test.run(result)
 
3232
        return result
3386
3233
 
3387
3234
 
3388
3235
class CountingDecorator(TestDecorator):
3399
3246
    """A decorator which excludes test matching an exclude pattern."""
3400
3247
 
3401
3248
    def __init__(self, suite, exclude_pattern):
3402
 
        super(ExcludeDecorator, self).__init__(
3403
 
            exclude_tests_by_re(suite, exclude_pattern))
 
3249
        TestDecorator.__init__(self, suite)
 
3250
        self.exclude_pattern = exclude_pattern
 
3251
        self.excluded = False
 
3252
 
 
3253
    def __iter__(self):
 
3254
        if self.excluded:
 
3255
            return iter(self._tests)
 
3256
        self.excluded = True
 
3257
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
3258
        del self._tests[:]
 
3259
        self.addTests(suite)
 
3260
        return iter(self._tests)
3404
3261
 
3405
3262
 
3406
3263
class FilterTestsDecorator(TestDecorator):
3407
3264
    """A decorator which filters tests to those matching a pattern."""
3408
3265
 
3409
3266
    def __init__(self, suite, pattern):
3410
 
        super(FilterTestsDecorator, self).__init__(
3411
 
            filter_suite_by_re(suite, pattern))
 
3267
        TestDecorator.__init__(self, suite)
 
3268
        self.pattern = pattern
 
3269
        self.filtered = False
 
3270
 
 
3271
    def __iter__(self):
 
3272
        if self.filtered:
 
3273
            return iter(self._tests)
 
3274
        self.filtered = True
 
3275
        suite = filter_suite_by_re(self, self.pattern)
 
3276
        del self._tests[:]
 
3277
        self.addTests(suite)
 
3278
        return iter(self._tests)
3412
3279
 
3413
3280
 
3414
3281
class RandomDecorator(TestDecorator):
3415
3282
    """A decorator which randomises the order of its tests."""
3416
3283
 
3417
3284
    def __init__(self, suite, random_seed, stream):
3418
 
        random_seed = self.actual_seed(random_seed)
3419
 
        stream.write("Randomizing test order using seed %s\n\n" %
3420
 
            (random_seed,))
 
3285
        TestDecorator.__init__(self, suite)
 
3286
        self.random_seed = random_seed
 
3287
        self.randomised = False
 
3288
        self.stream = stream
 
3289
 
 
3290
    def __iter__(self):
 
3291
        if self.randomised:
 
3292
            return iter(self._tests)
 
3293
        self.randomised = True
 
3294
        self.stream.write("Randomizing test order using seed %s\n\n" %
 
3295
            (self.actual_seed()))
3421
3296
        # Initialise the random number generator.
3422
 
        random.seed(random_seed)
3423
 
        super(RandomDecorator, self).__init__(randomize_suite(suite))
 
3297
        random.seed(self.actual_seed())
 
3298
        suite = randomize_suite(self)
 
3299
        del self._tests[:]
 
3300
        self.addTests(suite)
 
3301
        return iter(self._tests)
3424
3302
 
3425
 
    @staticmethod
3426
 
    def actual_seed(seed):
3427
 
        if seed == "now":
 
3303
    def actual_seed(self):
 
3304
        if self.random_seed == "now":
3428
3305
            # We convert the seed to a long to make it reuseable across
3429
3306
            # invocations (because the user can reenter it).
3430
 
            return long(time.time())
 
3307
            self.random_seed = long(time.time())
3431
3308
        else:
3432
3309
            # Convert the seed to a long if we can
3433
3310
            try:
3434
 
                return long(seed)
3435
 
            except (TypeError, ValueError):
 
3311
                self.random_seed = long(self.random_seed)
 
3312
            except:
3436
3313
                pass
3437
 
        return seed
 
3314
        return self.random_seed
3438
3315
 
3439
3316
 
3440
3317
class TestFirstDecorator(TestDecorator):
3441
3318
    """A decorator which moves named tests to the front."""
3442
3319
 
3443
3320
    def __init__(self, suite, pattern):
3444
 
        super(TestFirstDecorator, self).__init__()
3445
 
        self.addTests(split_suite_by_re(suite, pattern))
 
3321
        TestDecorator.__init__(self, suite)
 
3322
        self.pattern = pattern
 
3323
        self.filtered = False
 
3324
 
 
3325
    def __iter__(self):
 
3326
        if self.filtered:
 
3327
            return iter(self._tests)
 
3328
        self.filtered = True
 
3329
        suites = split_suite_by_re(self, self.pattern)
 
3330
        del self._tests[:]
 
3331
        self.addTests(suites)
 
3332
        return iter(self._tests)
3446
3333
 
3447
3334
 
3448
3335
def partition_tests(suite, count):
3480
3367
    """
3481
3368
    concurrency = osutils.local_concurrency()
3482
3369
    result = []
3483
 
    from subunit import ProtocolTestCase
 
3370
    from subunit import TestProtocolClient, ProtocolTestCase
3484
3371
    from subunit.test_results import AutoTimingTestResultDecorator
3485
3372
    class TestInOtherProcess(ProtocolTestCase):
3486
3373
        # Should be in subunit, I think. RBC.
3495
3382
                os.waitpid(self.pid, 0)
3496
3383
 
3497
3384
    test_blocks = partition_tests(suite, concurrency)
3498
 
    # Clear the tests from the original suite so it doesn't keep them alive
3499
 
    suite._tests[:] = []
3500
3385
    for process_tests in test_blocks:
3501
 
        process_suite = TestUtil.TestSuite(process_tests)
3502
 
        # Also clear each split list so new suite has only reference
3503
 
        process_tests[:] = []
 
3386
        process_suite = TestUtil.TestSuite()
 
3387
        process_suite.addTests(process_tests)
3504
3388
        c2pread, c2pwrite = os.pipe()
3505
3389
        pid = os.fork()
3506
3390
        if pid == 0:
3512
3396
                # read from stdin (otherwise its a roulette to see what
3513
3397
                # child actually gets keystrokes for pdb etc).
3514
3398
                sys.stdin.close()
3515
 
                # GZ 2011-06-16: Why set stdin to None? Breaks multi fork.
3516
 
                #sys.stdin = None
 
3399
                sys.stdin = None
3517
3400
                stream = os.fdopen(c2pwrite, 'wb', 1)
3518
3401
                subunit_result = AutoTimingTestResultDecorator(
3519
 
                    SubUnitBzrProtocolClient(stream))
 
3402
                    TestProtocolClient(stream))
3520
3403
                process_suite.run(subunit_result)
3521
3404
            finally:
3522
 
                # GZ 2011-06-16: Is always exiting with silent success
3523
 
                #                really the right thing? Hurts debugging.
3524
3405
                os._exit(0)
3525
3406
        else:
3526
3407
            os.close(c2pwrite)
3633
3514
#                           with proper exclusion rules.
3634
3515
#   -Ethreads               Will display thread ident at creation/join time to
3635
3516
#                           help track thread leaks
3636
 
#   -Euncollected_cases     Display the identity of any test cases that weren't
3637
 
#                           deallocated after being completed.
3638
 
#   -Econfig_stats          Will collect statistics using addDetail
3639
3517
selftest_debug_flags = set()
3640
3518
 
3641
3519
 
3945
3823
        'bzrlib.tests.test_email_message',
3946
3824
        'bzrlib.tests.test_eol_filters',
3947
3825
        'bzrlib.tests.test_errors',
3948
 
        'bzrlib.tests.test_estimate_compressed_size',
3949
3826
        'bzrlib.tests.test_export',
3950
3827
        'bzrlib.tests.test_export_pot',
3951
3828
        'bzrlib.tests.test_extract',
3952
 
        'bzrlib.tests.test_features',
3953
3829
        'bzrlib.tests.test_fetch',
3954
3830
        'bzrlib.tests.test_fixtures',
3955
3831
        'bzrlib.tests.test_fifo_cache',
3956
3832
        'bzrlib.tests.test_filters',
3957
 
        'bzrlib.tests.test_filter_tree',
3958
3833
        'bzrlib.tests.test_ftp_transport',
3959
3834
        'bzrlib.tests.test_foreign',
3960
3835
        'bzrlib.tests.test_generate_docs',
4035
3910
        'bzrlib.tests.test_smart',
4036
3911
        'bzrlib.tests.test_smart_add',
4037
3912
        'bzrlib.tests.test_smart_request',
4038
 
        'bzrlib.tests.test_smart_signals',
4039
3913
        'bzrlib.tests.test_smart_transport',
4040
3914
        'bzrlib.tests.test_smtp_connection',
4041
3915
        'bzrlib.tests.test_source',
4352
4226
        the module is available.
4353
4227
    """
4354
4228
 
4355
 
    from bzrlib.tests.features import ModuleAvailableFeature
4356
4229
    py_module = pyutils.get_named_object(py_module_name)
4357
4230
    scenarios = [
4358
4231
        ('python', {'module': py_module}),
4399
4272
                         % (os.path.basename(dirname), printable_e))
4400
4273
 
4401
4274
 
 
4275
class Feature(object):
 
4276
    """An operating system Feature."""
 
4277
 
 
4278
    def __init__(self):
 
4279
        self._available = None
 
4280
 
 
4281
    def available(self):
 
4282
        """Is the feature available?
 
4283
 
 
4284
        :return: True if the feature is available.
 
4285
        """
 
4286
        if self._available is None:
 
4287
            self._available = self._probe()
 
4288
        return self._available
 
4289
 
 
4290
    def _probe(self):
 
4291
        """Implement this method in concrete features.
 
4292
 
 
4293
        :return: True if the feature is available.
 
4294
        """
 
4295
        raise NotImplementedError
 
4296
 
 
4297
    def __str__(self):
 
4298
        if getattr(self, 'feature_name', None):
 
4299
            return self.feature_name()
 
4300
        return self.__class__.__name__
 
4301
 
 
4302
 
 
4303
class _SymlinkFeature(Feature):
 
4304
 
 
4305
    def _probe(self):
 
4306
        return osutils.has_symlinks()
 
4307
 
 
4308
    def feature_name(self):
 
4309
        return 'symlinks'
 
4310
 
 
4311
SymlinkFeature = _SymlinkFeature()
 
4312
 
 
4313
 
 
4314
class _HardlinkFeature(Feature):
 
4315
 
 
4316
    def _probe(self):
 
4317
        return osutils.has_hardlinks()
 
4318
 
 
4319
    def feature_name(self):
 
4320
        return 'hardlinks'
 
4321
 
 
4322
HardlinkFeature = _HardlinkFeature()
 
4323
 
 
4324
 
 
4325
class _OsFifoFeature(Feature):
 
4326
 
 
4327
    def _probe(self):
 
4328
        return getattr(os, 'mkfifo', None)
 
4329
 
 
4330
    def feature_name(self):
 
4331
        return 'filesystem fifos'
 
4332
 
 
4333
OsFifoFeature = _OsFifoFeature()
 
4334
 
 
4335
 
 
4336
class _UnicodeFilenameFeature(Feature):
 
4337
    """Does the filesystem support Unicode filenames?"""
 
4338
 
 
4339
    def _probe(self):
 
4340
        try:
 
4341
            # Check for character combinations unlikely to be covered by any
 
4342
            # single non-unicode encoding. We use the characters
 
4343
            # - greek small letter alpha (U+03B1) and
 
4344
            # - braille pattern dots-123456 (U+283F).
 
4345
            os.stat(u'\u03b1\u283f')
 
4346
        except UnicodeEncodeError:
 
4347
            return False
 
4348
        except (IOError, OSError):
 
4349
            # The filesystem allows the Unicode filename but the file doesn't
 
4350
            # exist.
 
4351
            return True
 
4352
        else:
 
4353
            # The filesystem allows the Unicode filename and the file exists,
 
4354
            # for some reason.
 
4355
            return True
 
4356
 
 
4357
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4358
 
 
4359
 
 
4360
class _CompatabilityThunkFeature(Feature):
 
4361
    """This feature is just a thunk to another feature.
 
4362
 
 
4363
    It issues a deprecation warning if it is accessed, to let you know that you
 
4364
    should really use a different feature.
 
4365
    """
 
4366
 
 
4367
    def __init__(self, dep_version, module, name,
 
4368
                 replacement_name, replacement_module=None):
 
4369
        super(_CompatabilityThunkFeature, self).__init__()
 
4370
        self._module = module
 
4371
        if replacement_module is None:
 
4372
            replacement_module = module
 
4373
        self._replacement_module = replacement_module
 
4374
        self._name = name
 
4375
        self._replacement_name = replacement_name
 
4376
        self._dep_version = dep_version
 
4377
        self._feature = None
 
4378
 
 
4379
    def _ensure(self):
 
4380
        if self._feature is None:
 
4381
            depr_msg = self._dep_version % ('%s.%s'
 
4382
                                            % (self._module, self._name))
 
4383
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4384
                                               self._replacement_name)
 
4385
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4386
            # Import the new feature and use it as a replacement for the
 
4387
            # deprecated one.
 
4388
            self._feature = pyutils.get_named_object(
 
4389
                self._replacement_module, self._replacement_name)
 
4390
 
 
4391
    def _probe(self):
 
4392
        self._ensure()
 
4393
        return self._feature._probe()
 
4394
 
 
4395
 
 
4396
class ModuleAvailableFeature(Feature):
 
4397
    """This is a feature than describes a module we want to be available.
 
4398
 
 
4399
    Declare the name of the module in __init__(), and then after probing, the
 
4400
    module will be available as 'self.module'.
 
4401
 
 
4402
    :ivar module: The module if it is available, else None.
 
4403
    """
 
4404
 
 
4405
    def __init__(self, module_name):
 
4406
        super(ModuleAvailableFeature, self).__init__()
 
4407
        self.module_name = module_name
 
4408
 
 
4409
    def _probe(self):
 
4410
        try:
 
4411
            self._module = __import__(self.module_name, {}, {}, [''])
 
4412
            return True
 
4413
        except ImportError:
 
4414
            return False
 
4415
 
 
4416
    @property
 
4417
    def module(self):
 
4418
        if self.available(): # Make sure the probe has been done
 
4419
            return self._module
 
4420
        return None
 
4421
 
 
4422
    def feature_name(self):
 
4423
        return self.module_name
 
4424
 
 
4425
 
4402
4426
def probe_unicode_in_user_encoding():
4403
4427
    """Try to encode several unicode strings to use in unicode-aware tests.
4404
4428
    Return first successfull match.
4432
4456
    return None
4433
4457
 
4434
4458
 
 
4459
class _HTTPSServerFeature(Feature):
 
4460
    """Some tests want an https Server, check if one is available.
 
4461
 
 
4462
    Right now, the only way this is available is under python2.6 which provides
 
4463
    an ssl module.
 
4464
    """
 
4465
 
 
4466
    def _probe(self):
 
4467
        try:
 
4468
            import ssl
 
4469
            return True
 
4470
        except ImportError:
 
4471
            return False
 
4472
 
 
4473
    def feature_name(self):
 
4474
        return 'HTTPSServer'
 
4475
 
 
4476
 
 
4477
HTTPSServerFeature = _HTTPSServerFeature()
 
4478
 
 
4479
 
 
4480
class _UnicodeFilename(Feature):
 
4481
    """Does the filesystem support Unicode filenames?"""
 
4482
 
 
4483
    def _probe(self):
 
4484
        try:
 
4485
            os.stat(u'\u03b1')
 
4486
        except UnicodeEncodeError:
 
4487
            return False
 
4488
        except (IOError, OSError):
 
4489
            # The filesystem allows the Unicode filename but the file doesn't
 
4490
            # exist.
 
4491
            return True
 
4492
        else:
 
4493
            # The filesystem allows the Unicode filename and the file exists,
 
4494
            # for some reason.
 
4495
            return True
 
4496
 
 
4497
UnicodeFilename = _UnicodeFilename()
 
4498
 
 
4499
 
 
4500
class _ByteStringNamedFilesystem(Feature):
 
4501
    """Is the filesystem based on bytes?"""
 
4502
 
 
4503
    def _probe(self):
 
4504
        if os.name == "posix":
 
4505
            return True
 
4506
        return False
 
4507
 
 
4508
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
 
4509
 
 
4510
 
 
4511
class _UTF8Filesystem(Feature):
 
4512
    """Is the filesystem UTF-8?"""
 
4513
 
 
4514
    def _probe(self):
 
4515
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4516
            return True
 
4517
        return False
 
4518
 
 
4519
UTF8Filesystem = _UTF8Filesystem()
 
4520
 
 
4521
 
 
4522
class _BreakinFeature(Feature):
 
4523
    """Does this platform support the breakin feature?"""
 
4524
 
 
4525
    def _probe(self):
 
4526
        from bzrlib import breakin
 
4527
        if breakin.determine_signal() is None:
 
4528
            return False
 
4529
        if sys.platform == 'win32':
 
4530
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4531
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4532
            # access the function
 
4533
            try:
 
4534
                import ctypes
 
4535
            except OSError:
 
4536
                return False
 
4537
        return True
 
4538
 
 
4539
    def feature_name(self):
 
4540
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4541
 
 
4542
 
 
4543
BreakinFeature = _BreakinFeature()
 
4544
 
 
4545
 
 
4546
class _CaseInsCasePresFilenameFeature(Feature):
 
4547
    """Is the file-system case insensitive, but case-preserving?"""
 
4548
 
 
4549
    def _probe(self):
 
4550
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4551
        try:
 
4552
            # first check truly case-preserving for created files, then check
 
4553
            # case insensitive when opening existing files.
 
4554
            name = osutils.normpath(name)
 
4555
            base, rel = osutils.split(name)
 
4556
            found_rel = osutils.canonical_relpath(base, name)
 
4557
            return (found_rel == rel
 
4558
                    and os.path.isfile(name.upper())
 
4559
                    and os.path.isfile(name.lower()))
 
4560
        finally:
 
4561
            os.close(fileno)
 
4562
            os.remove(name)
 
4563
 
 
4564
    def feature_name(self):
 
4565
        return "case-insensitive case-preserving filesystem"
 
4566
 
 
4567
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4568
 
 
4569
 
 
4570
class _CaseInsensitiveFilesystemFeature(Feature):
 
4571
    """Check if underlying filesystem is case-insensitive but *not* case
 
4572
    preserving.
 
4573
    """
 
4574
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4575
    # more likely to be case preserving, so this case is rare.
 
4576
 
 
4577
    def _probe(self):
 
4578
        if CaseInsCasePresFilenameFeature.available():
 
4579
            return False
 
4580
 
 
4581
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4582
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4583
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4584
        else:
 
4585
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4586
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4587
            dir=root)
 
4588
        name_a = osutils.pathjoin(tdir, 'a')
 
4589
        name_A = osutils.pathjoin(tdir, 'A')
 
4590
        os.mkdir(name_a)
 
4591
        result = osutils.isdir(name_A)
 
4592
        _rmtree_temp_dir(tdir)
 
4593
        return result
 
4594
 
 
4595
    def feature_name(self):
 
4596
        return 'case-insensitive filesystem'
 
4597
 
 
4598
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4599
 
 
4600
 
 
4601
class _CaseSensitiveFilesystemFeature(Feature):
 
4602
 
 
4603
    def _probe(self):
 
4604
        if CaseInsCasePresFilenameFeature.available():
 
4605
            return False
 
4606
        elif CaseInsensitiveFilesystemFeature.available():
 
4607
            return False
 
4608
        else:
 
4609
            return True
 
4610
 
 
4611
    def feature_name(self):
 
4612
        return 'case-sensitive filesystem'
 
4613
 
 
4614
# new coding style is for feature instances to be lowercase
 
4615
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
 
4616
 
 
4617
 
4435
4618
# Only define SubUnitBzrRunner if subunit is available.
4436
4619
try:
4437
4620
    from subunit import TestProtocolClient
4438
4621
    from subunit.test_results import AutoTimingTestResultDecorator
4439
4622
    class SubUnitBzrProtocolClient(TestProtocolClient):
4440
4623
 
4441
 
        def stopTest(self, test):
4442
 
            super(SubUnitBzrProtocolClient, self).stopTest(test)
4443
 
            _clear__type_equality_funcs(test)
4444
 
 
4445
4624
        def addSuccess(self, test, details=None):
4446
4625
            # The subunit client always includes the details in the subunit
4447
4626
            # stream, but we don't want to include it in ours.
4459
4638
except ImportError:
4460
4639
    pass
4461
4640
 
4462
 
 
4463
 
@deprecated_function(deprecated_in((2, 5, 0)))
4464
 
def ModuleAvailableFeature(name):
4465
 
    from bzrlib.tests import features
4466
 
    return features.ModuleAvailableFeature(name)
4467
 
    
 
4641
class _PosixPermissionsFeature(Feature):
 
4642
 
 
4643
    def _probe(self):
 
4644
        def has_perms():
 
4645
            # create temporary file and check if specified perms are maintained.
 
4646
            import tempfile
 
4647
 
 
4648
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
 
4649
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
 
4650
            fd, name = f
 
4651
            os.close(fd)
 
4652
            os.chmod(name, write_perms)
 
4653
 
 
4654
            read_perms = os.stat(name).st_mode & 0777
 
4655
            os.unlink(name)
 
4656
            return (write_perms == read_perms)
 
4657
 
 
4658
        return (os.name == 'posix') and has_perms()
 
4659
 
 
4660
    def feature_name(self):
 
4661
        return 'POSIX permissions support'
 
4662
 
 
4663
posix_permissions_feature = _PosixPermissionsFeature()