~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Vincent Ladeuil
  • Date: 2011-06-27 15:42:09 UTC
  • mfrom: (5993 +trunk)
  • mto: (5993.1.1 trunk)
  • mto: This revision was merged to the branch mainline in revision 5994.
  • Revision ID: v.ladeuil+lp@free.fr-20110627154209-azubuhbuxsz109hq
Merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Testing framework extensions"""
18
18
 
 
19
# TODO: Perhaps there should be an API to find out if bzr running under the
 
20
# test suite -- some plugins might want to avoid making intrusive changes if
 
21
# this is the case.  However, we want behaviour under to test to diverge as
 
22
# little as possible, so this should be used rarely if it's added at all.
 
23
# (Suggestion from j-a-meinel, 2005-11-24)
 
24
 
19
25
# NOTE: Some classes in here use camelCaseNaming() rather than
20
26
# underscore_naming().  That's for consistency with unittest; it's not the
21
27
# general style of bzrlib.  Please continue that consistency when adding e.g.
61
67
    chk_map,
62
68
    commands as _mod_commands,
63
69
    config,
64
 
    i18n,
65
70
    debug,
66
71
    errors,
67
72
    hooks,
89
94
    memory,
90
95
    pathfilter,
91
96
    )
92
 
from bzrlib.symbol_versioning import (
93
 
    deprecated_function,
94
 
    deprecated_in,
95
 
    )
96
97
from bzrlib.tests import (
97
98
    test_server,
98
99
    TestUtil,
212
213
        osutils.set_or_unset_env(var, value)
213
214
 
214
215
 
215
 
def _clear__type_equality_funcs(test):
216
 
    """Cleanup bound methods stored on TestCase instances
217
 
 
218
 
    Clear the dict breaking a few (mostly) harmless cycles in the affected
219
 
    unittests released with Python 2.6 and initial Python 2.7 versions.
220
 
 
221
 
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
222
 
    shipped in Oneiric, an object with no clear method was used, hence the
223
 
    extra complications, see bug 809048 for details.
224
 
    """
225
 
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
226
 
    if type_equality_funcs is not None:
227
 
        tef_clear = getattr(type_equality_funcs, "clear", None)
228
 
        if tef_clear is None:
229
 
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
230
 
            if tef_instance_dict is not None:
231
 
                tef_clear = tef_instance_dict.clear
232
 
        if tef_clear is not None:
233
 
            tef_clear()
234
 
 
235
 
 
236
216
class ExtendedTestResult(testtools.TextTestResult):
237
217
    """Accepts, reports and accumulates the results of running tests.
238
218
 
406
386
        getDetails = getattr(test, "getDetails", None)
407
387
        if getDetails is not None:
408
388
            getDetails().clear()
409
 
        _clear__type_equality_funcs(test)
 
389
        type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
390
        if type_equality_funcs is not None:
 
391
            type_equality_funcs.clear()
410
392
        self._traceback_from_test = None
411
393
 
412
394
    def startTests(self):
513
495
        self.not_applicable_count += 1
514
496
        self.report_not_applicable(test, reason)
515
497
 
516
 
    def _count_stored_tests(self):
517
 
        """Count of tests instances kept alive due to not succeeding"""
518
 
        return self.error_count + self.failure_count + self.known_failure_count
519
 
 
520
498
    def _post_mortem(self, tb=None):
521
499
        """Start a PDB post mortem session."""
522
500
        if os.environ.get('BZR_TEST_PDB', None):
997
975
        for feature in getattr(self, '_test_needs_features', []):
998
976
            self.requireFeature(feature)
999
977
        self._cleanEnvironment()
1000
 
        if bzrlib.global_state is not None:
1001
 
            self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
1002
 
                              config.CommandLineSection())
1003
978
        self._silenceUI()
1004
979
        self._startLogFile()
1005
980
        self._benchcalls = []
1023
998
        self._counters = {}
1024
999
        if 'config_stats' in selftest_debug_flags:
1025
1000
            self._install_config_stats_hooks()
1026
 
        # Do not use i18n for tests (unless the test reverses this)
1027
 
        i18n.disable_i18n()
1028
1001
 
1029
1002
    def debug(self):
1030
1003
        # debug a frame up.
1031
1004
        import pdb
1032
 
        # The sys preserved stdin/stdout should allow blackbox tests debugging
1033
 
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
1034
 
                ).set_trace(sys._getframe().f_back)
 
1005
        pdb.Pdb().set_trace(sys._getframe().f_back)
1035
1006
 
1036
1007
    def discardDetail(self, name):
1037
1008
        """Extend the addDetail, getDetails api so we can remove a detail.
1192
1163
 
1193
1164
    def permit_dir(self, name):
1194
1165
        """Permit a directory to be used by this test. See permit_url."""
1195
 
        name_transport = _mod_transport.get_transport_from_path(name)
 
1166
        name_transport = _mod_transport.get_transport(name)
1196
1167
        self.permit_url(name)
1197
1168
        self.permit_url(name_transport.base)
1198
1169
 
1277
1248
        self.addCleanup(transport_server.stop_server)
1278
1249
        # Obtain a real transport because if the server supplies a password, it
1279
1250
        # will be hidden from the base on the client side.
1280
 
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1251
        t = _mod_transport.get_transport(transport_server.get_url())
1281
1252
        # Some transport servers effectively chroot the backing transport;
1282
1253
        # others like SFTPServer don't - users of the transport can walk up the
1283
1254
        # transport to read the entire backing transport. This wouldn't matter
1759
1730
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1760
1731
        """Overrides an object attribute restoring it after the test.
1761
1732
 
1762
 
        :note: This should be used with discretion; you should think about
1763
 
        whether it's better to make the code testable without monkey-patching.
1764
 
 
1765
1733
        :param obj: The object that will be mutated.
1766
1734
 
1767
1735
        :param attr_name: The attribute name we want to preserve/override in
1792
1760
        self.addCleanup(osutils.set_or_unset_env, name, value)
1793
1761
        return value
1794
1762
 
1795
 
    def recordCalls(self, obj, attr_name):
1796
 
        """Monkeypatch in a wrapper that will record calls.
1797
 
 
1798
 
        The monkeypatch is automatically removed when the test concludes.
1799
 
 
1800
 
        :param obj: The namespace holding the reference to be replaced;
1801
 
            typically a module, class, or object.
1802
 
        :param attr_name: A string for the name of the attribute to 
1803
 
            patch.
1804
 
        :returns: A list that will be extended with one item every time the
1805
 
            function is called, with a tuple of (args, kwargs).
1806
 
        """
1807
 
        calls = []
1808
 
 
1809
 
        def decorator(*args, **kwargs):
1810
 
            calls.append((args, kwargs))
1811
 
            return orig(*args, **kwargs)
1812
 
        orig = self.overrideAttr(obj, attr_name, decorator)
1813
 
        return calls
1814
 
 
1815
1763
    def _cleanEnvironment(self):
1816
1764
        for name, value in isolated_environ.iteritems():
1817
1765
            self.overrideEnv(name, value)
1824
1772
        self._preserved_lazy_hooks.clear()
1825
1773
 
1826
1774
    def knownFailure(self, reason):
1827
 
        """Declare that this test fails for a known reason
1828
 
 
1829
 
        Tests that are known to fail should generally be using expectedFailure
1830
 
        with an appropriate reverse assertion if a change could cause the test
1831
 
        to start passing. Conversely if the test has no immediate prospect of
1832
 
        succeeding then using skip is more suitable.
1833
 
 
1834
 
        When this method is called while an exception is being handled, that
1835
 
        traceback will be used, otherwise a new exception will be thrown to
1836
 
        provide one but won't be reported.
1837
 
        """
1838
 
        self._add_reason(reason)
1839
 
        try:
1840
 
            exc_info = sys.exc_info()
1841
 
            if exc_info != (None, None, None):
1842
 
                self._report_traceback(exc_info)
1843
 
            else:
1844
 
                try:
1845
 
                    raise self.failureException(reason)
1846
 
                except self.failureException:
1847
 
                    exc_info = sys.exc_info()
1848
 
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1849
 
            raise testtools.testcase._ExpectedFailure(exc_info)
1850
 
        finally:
1851
 
            del exc_info
 
1775
        """This test has failed for some known reason."""
 
1776
        raise KnownFailure(reason)
1852
1777
 
1853
1778
    def _suppress_log(self):
1854
1779
        """Remove the log info from details."""
2441
2366
 
2442
2367
        :param relpath: a path relative to the base url.
2443
2368
        """
2444
 
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2369
        t = _mod_transport.get_transport(self.get_url(relpath))
2445
2370
        self.assertFalse(t.is_readonly())
2446
2371
        return t
2447
2372
 
2453
2378
 
2454
2379
        :param relpath: a path relative to the base url.
2455
2380
        """
2456
 
        t = _mod_transport.get_transport_from_url(
2457
 
            self.get_readonly_url(relpath))
 
2381
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
2458
2382
        self.assertTrue(t.is_readonly())
2459
2383
        return t
2460
2384
 
2581
2505
        real branch.
2582
2506
        """
2583
2507
        root = TestCaseWithMemoryTransport.TEST_ROOT
2584
 
        # Make sure we get a readable and accessible home for .bzr.log
2585
 
        # and/or config files, and not fallback to weird defaults (see
2586
 
        # http://pad.lv/825027).
2587
 
        self.assertIs(None, os.environ.get('BZR_HOME', None))
2588
 
        os.environ['BZR_HOME'] = root
2589
 
        wt = bzrdir.BzrDir.create_standalone_workingtree(root)
2590
 
        del os.environ['BZR_HOME']
2591
 
        # Hack for speed: remember the raw bytes of the dirstate file so that
2592
 
        # we don't need to re-open the wt to check it hasn't changed.
2593
 
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
2594
 
            wt.control_transport.get_bytes('dirstate'))
 
2508
        bzrdir.BzrDir.create_standalone_workingtree(root)
2595
2509
 
2596
2510
    def _check_safety_net(self):
2597
2511
        """Check that the safety .bzr directory have not been touched.
2600
2514
        propagating. This method ensures than a test did not leaked.
2601
2515
        """
2602
2516
        root = TestCaseWithMemoryTransport.TEST_ROOT
2603
 
        t = _mod_transport.get_transport_from_path(root)
2604
 
        self.permit_url(t.base)
2605
 
        if (t.get_bytes('.bzr/checkout/dirstate') != 
2606
 
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2517
        self.permit_url(_mod_transport.get_transport(root).base)
 
2518
        wt = workingtree.WorkingTree.open(root)
 
2519
        last_rev = wt.last_revision()
 
2520
        if last_rev != 'null:':
2607
2521
            # The current test have modified the /bzr directory, we need to
2608
2522
            # recreate a new one or all the followng tests will fail.
2609
2523
            # If you need to inspect its content uncomment the following line
2644
2558
    def make_branch(self, relpath, format=None):
2645
2559
        """Create a branch on the transport at relpath."""
2646
2560
        repo = self.make_repository(relpath, format=format)
2647
 
        return repo.bzrdir.create_branch(append_revisions_only=False)
2648
 
 
2649
 
    def get_default_format(self):
2650
 
        return 'default'
2651
 
 
2652
 
    def resolve_format(self, format):
2653
 
        """Resolve an object to a ControlDir format object.
2654
 
 
2655
 
        The initial format object can either already be
2656
 
        a ControlDirFormat, None (for the default format),
2657
 
        or a string with the name of the control dir format.
2658
 
 
2659
 
        :param format: Object to resolve
2660
 
        :return A ControlDirFormat instance
2661
 
        """
2662
 
        if format is None:
2663
 
            format = self.get_default_format()
2664
 
        if isinstance(format, basestring):
2665
 
            format = bzrdir.format_registry.make_bzrdir(format)
2666
 
        return format
 
2561
        return repo.bzrdir.create_branch()
2667
2562
 
2668
2563
    def make_bzrdir(self, relpath, format=None):
2669
2564
        try:
2673
2568
            t = _mod_transport.get_transport(maybe_a_url)
2674
2569
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2675
2570
                t.ensure_base()
2676
 
            format = self.resolve_format(format)
 
2571
            if format is None:
 
2572
                format = 'default'
 
2573
            if isinstance(format, basestring):
 
2574
                format = bzrdir.format_registry.make_bzrdir(format)
2677
2575
            return format.initialize_on_transport(t)
2678
2576
        except errors.UninitializableFormat:
2679
2577
            raise TestSkipped("Format %s is not initializable." % format)
2680
2578
 
2681
 
    def make_repository(self, relpath, shared=None, format=None):
 
2579
    def make_repository(self, relpath, shared=False, format=None):
2682
2580
        """Create a repository on our default transport at relpath.
2683
2581
 
2684
2582
        Note that relpath must be a relative path, not a full url.
2695
2593
            backing_server = self.get_server()
2696
2594
        smart_server = test_server.SmartTCPServer_for_testing()
2697
2595
        self.start_server(smart_server, backing_server)
2698
 
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2596
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2699
2597
                                                   ).clone(path)
2700
2598
        return remote_transport
2701
2599
 
2718
2616
    def setUp(self):
2719
2617
        super(TestCaseWithMemoryTransport, self).setUp()
2720
2618
        # Ensure that ConnectedTransport doesn't leak sockets
2721
 
        def get_transport_from_url_with_cleanup(*args, **kwargs):
2722
 
            t = orig_get_transport_from_url(*args, **kwargs)
 
2619
        def get_transport_with_cleanup(*args, **kwargs):
 
2620
            t = orig_get_transport(*args, **kwargs)
2723
2621
            if isinstance(t, _mod_transport.ConnectedTransport):
2724
2622
                self.addCleanup(t.disconnect)
2725
2623
            return t
2726
2624
 
2727
 
        orig_get_transport_from_url = self.overrideAttr(
2728
 
            _mod_transport, 'get_transport_from_url',
2729
 
            get_transport_from_url_with_cleanup)
 
2625
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
 
2626
                                               get_transport_with_cleanup)
2730
2627
        self._make_test_root()
2731
2628
        self.addCleanup(os.chdir, os.getcwdu())
2732
2629
        self.makeAndChdirToTestDir()
2867
2764
                "a list or a tuple. Got %r instead" % (shape,))
2868
2765
        # It's OK to just create them using forward slashes on windows.
2869
2766
        if transport is None or transport.is_readonly():
2870
 
            transport = _mod_transport.get_transport_from_path(".")
 
2767
            transport = _mod_transport.get_transport(".")
2871
2768
        for name in shape:
2872
2769
            self.assertIsInstance(name, basestring)
2873
2770
            if name[-1] == '/':
2958
2855
        # this obviously requires a format that supports branch references
2959
2856
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2960
2857
        # RBC 20060208
2961
 
        format = self.resolve_format(format=format)
2962
 
        if not format.supports_workingtrees:
2963
 
            b = self.make_branch(relpath+'.branch', format=format)
2964
 
            return b.create_checkout(relpath, lightweight=True)
2965
2858
        b = self.make_branch(relpath, format=format)
2966
2859
        try:
2967
2860
            return b.bzrdir.create_workingtree()
3266
3159
                            result_decorators=result_decorators,
3267
3160
                            )
3268
3161
    runner.stop_on_failure=stop_on_failure
3269
 
    if isinstance(suite, unittest.TestSuite):
3270
 
        # Empty out _tests list of passed suite and populate new TestSuite
3271
 
        suite._tests[:], suite = [], TestSuite(suite)
3272
3162
    # built in decorator factories:
3273
3163
    decorators = [
3274
3164
        random_order(random_seed, runner),
3372
3262
 
3373
3263
class TestDecorator(TestUtil.TestSuite):
3374
3264
    """A decorator for TestCase/TestSuite objects.
3375
 
 
3376
 
    Contains rather than flattening suite passed on construction
 
3265
    
 
3266
    Usually, subclasses should override __iter__(used when flattening test
 
3267
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
3268
    debug().
3377
3269
    """
3378
3270
 
3379
 
    def __init__(self, suite=None):
3380
 
        super(TestDecorator, self).__init__()
3381
 
        if suite is not None:
3382
 
            self.addTest(suite)
3383
 
 
3384
 
    # Don't need subclass run method with suite emptying
3385
 
    run = unittest.TestSuite.run
 
3271
    def __init__(self, suite):
 
3272
        TestUtil.TestSuite.__init__(self)
 
3273
        self.addTest(suite)
 
3274
 
 
3275
    def countTestCases(self):
 
3276
        cases = 0
 
3277
        for test in self:
 
3278
            cases += test.countTestCases()
 
3279
        return cases
 
3280
 
 
3281
    def debug(self):
 
3282
        for test in self:
 
3283
            test.debug()
 
3284
 
 
3285
    def run(self, result):
 
3286
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
3287
        # into __iter__.
 
3288
        for test in self:
 
3289
            if result.shouldStop:
 
3290
                break
 
3291
            test.run(result)
 
3292
        return result
3386
3293
 
3387
3294
 
3388
3295
class CountingDecorator(TestDecorator):
3399
3306
    """A decorator which excludes test matching an exclude pattern."""
3400
3307
 
3401
3308
    def __init__(self, suite, exclude_pattern):
3402
 
        super(ExcludeDecorator, self).__init__(
3403
 
            exclude_tests_by_re(suite, exclude_pattern))
 
3309
        TestDecorator.__init__(self, suite)
 
3310
        self.exclude_pattern = exclude_pattern
 
3311
        self.excluded = False
 
3312
 
 
3313
    def __iter__(self):
 
3314
        if self.excluded:
 
3315
            return iter(self._tests)
 
3316
        self.excluded = True
 
3317
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
3318
        del self._tests[:]
 
3319
        self.addTests(suite)
 
3320
        return iter(self._tests)
3404
3321
 
3405
3322
 
3406
3323
class FilterTestsDecorator(TestDecorator):
3407
3324
    """A decorator which filters tests to those matching a pattern."""
3408
3325
 
3409
3326
    def __init__(self, suite, pattern):
3410
 
        super(FilterTestsDecorator, self).__init__(
3411
 
            filter_suite_by_re(suite, pattern))
 
3327
        TestDecorator.__init__(self, suite)
 
3328
        self.pattern = pattern
 
3329
        self.filtered = False
 
3330
 
 
3331
    def __iter__(self):
 
3332
        if self.filtered:
 
3333
            return iter(self._tests)
 
3334
        self.filtered = True
 
3335
        suite = filter_suite_by_re(self, self.pattern)
 
3336
        del self._tests[:]
 
3337
        self.addTests(suite)
 
3338
        return iter(self._tests)
3412
3339
 
3413
3340
 
3414
3341
class RandomDecorator(TestDecorator):
3415
3342
    """A decorator which randomises the order of its tests."""
3416
3343
 
3417
3344
    def __init__(self, suite, random_seed, stream):
3418
 
        random_seed = self.actual_seed(random_seed)
3419
 
        stream.write("Randomizing test order using seed %s\n\n" %
3420
 
            (random_seed,))
 
3345
        TestDecorator.__init__(self, suite)
 
3346
        self.random_seed = random_seed
 
3347
        self.randomised = False
 
3348
        self.stream = stream
 
3349
 
 
3350
    def __iter__(self):
 
3351
        if self.randomised:
 
3352
            return iter(self._tests)
 
3353
        self.randomised = True
 
3354
        self.stream.write("Randomizing test order using seed %s\n\n" %
 
3355
            (self.actual_seed()))
3421
3356
        # Initialise the random number generator.
3422
 
        random.seed(random_seed)
3423
 
        super(RandomDecorator, self).__init__(randomize_suite(suite))
 
3357
        random.seed(self.actual_seed())
 
3358
        suite = randomize_suite(self)
 
3359
        del self._tests[:]
 
3360
        self.addTests(suite)
 
3361
        return iter(self._tests)
3424
3362
 
3425
 
    @staticmethod
3426
 
    def actual_seed(seed):
3427
 
        if seed == "now":
 
3363
    def actual_seed(self):
 
3364
        if self.random_seed == "now":
3428
3365
            # We convert the seed to a long to make it reuseable across
3429
3366
            # invocations (because the user can reenter it).
3430
 
            return long(time.time())
 
3367
            self.random_seed = long(time.time())
3431
3368
        else:
3432
3369
            # Convert the seed to a long if we can
3433
3370
            try:
3434
 
                return long(seed)
3435
 
            except (TypeError, ValueError):
 
3371
                self.random_seed = long(self.random_seed)
 
3372
            except:
3436
3373
                pass
3437
 
        return seed
 
3374
        return self.random_seed
3438
3375
 
3439
3376
 
3440
3377
class TestFirstDecorator(TestDecorator):
3441
3378
    """A decorator which moves named tests to the front."""
3442
3379
 
3443
3380
    def __init__(self, suite, pattern):
3444
 
        super(TestFirstDecorator, self).__init__()
3445
 
        self.addTests(split_suite_by_re(suite, pattern))
 
3381
        TestDecorator.__init__(self, suite)
 
3382
        self.pattern = pattern
 
3383
        self.filtered = False
 
3384
 
 
3385
    def __iter__(self):
 
3386
        if self.filtered:
 
3387
            return iter(self._tests)
 
3388
        self.filtered = True
 
3389
        suites = split_suite_by_re(self, self.pattern)
 
3390
        del self._tests[:]
 
3391
        self.addTests(suites)
 
3392
        return iter(self._tests)
3446
3393
 
3447
3394
 
3448
3395
def partition_tests(suite, count):
3480
3427
    """
3481
3428
    concurrency = osutils.local_concurrency()
3482
3429
    result = []
3483
 
    from subunit import ProtocolTestCase
 
3430
    from subunit import TestProtocolClient, ProtocolTestCase
3484
3431
    from subunit.test_results import AutoTimingTestResultDecorator
3485
3432
    class TestInOtherProcess(ProtocolTestCase):
3486
3433
        # Should be in subunit, I think. RBC.
3495
3442
                os.waitpid(self.pid, 0)
3496
3443
 
3497
3444
    test_blocks = partition_tests(suite, concurrency)
3498
 
    # Clear the tests from the original suite so it doesn't keep them alive
3499
 
    suite._tests[:] = []
3500
3445
    for process_tests in test_blocks:
3501
 
        process_suite = TestUtil.TestSuite(process_tests)
3502
 
        # Also clear each split list so new suite has only reference
3503
 
        process_tests[:] = []
 
3446
        process_suite = TestUtil.TestSuite()
 
3447
        process_suite.addTests(process_tests)
3504
3448
        c2pread, c2pwrite = os.pipe()
3505
3449
        pid = os.fork()
3506
3450
        if pid == 0:
3512
3456
                # read from stdin (otherwise its a roulette to see what
3513
3457
                # child actually gets keystrokes for pdb etc).
3514
3458
                sys.stdin.close()
3515
 
                # GZ 2011-06-16: Why set stdin to None? Breaks multi fork.
3516
 
                #sys.stdin = None
 
3459
                sys.stdin = None
3517
3460
                stream = os.fdopen(c2pwrite, 'wb', 1)
3518
3461
                subunit_result = AutoTimingTestResultDecorator(
3519
 
                    SubUnitBzrProtocolClient(stream))
 
3462
                    TestProtocolClient(stream))
3520
3463
                process_suite.run(subunit_result)
3521
3464
            finally:
3522
 
                # GZ 2011-06-16: Is always exiting with silent success
3523
 
                #                really the right thing? Hurts debugging.
3524
3465
                os._exit(0)
3525
3466
        else:
3526
3467
            os.close(c2pwrite)
3633
3574
#                           with proper exclusion rules.
3634
3575
#   -Ethreads               Will display thread ident at creation/join time to
3635
3576
#                           help track thread leaks
3636
 
#   -Euncollected_cases     Display the identity of any test cases that weren't
3637
 
#                           deallocated after being completed.
 
3577
 
3638
3578
#   -Econfig_stats          Will collect statistics using addDetail
3639
3579
selftest_debug_flags = set()
3640
3580
 
3945
3885
        'bzrlib.tests.test_email_message',
3946
3886
        'bzrlib.tests.test_eol_filters',
3947
3887
        'bzrlib.tests.test_errors',
3948
 
        'bzrlib.tests.test_estimate_compressed_size',
3949
3888
        'bzrlib.tests.test_export',
3950
3889
        'bzrlib.tests.test_export_pot',
3951
3890
        'bzrlib.tests.test_extract',
3952
 
        'bzrlib.tests.test_features',
3953
3891
        'bzrlib.tests.test_fetch',
3954
3892
        'bzrlib.tests.test_fixtures',
3955
3893
        'bzrlib.tests.test_fifo_cache',
3956
3894
        'bzrlib.tests.test_filters',
3957
 
        'bzrlib.tests.test_filter_tree',
3958
3895
        'bzrlib.tests.test_ftp_transport',
3959
3896
        'bzrlib.tests.test_foreign',
3960
3897
        'bzrlib.tests.test_generate_docs',
4035
3972
        'bzrlib.tests.test_smart',
4036
3973
        'bzrlib.tests.test_smart_add',
4037
3974
        'bzrlib.tests.test_smart_request',
4038
 
        'bzrlib.tests.test_smart_signals',
4039
3975
        'bzrlib.tests.test_smart_transport',
4040
3976
        'bzrlib.tests.test_smtp_connection',
4041
3977
        'bzrlib.tests.test_source',
4352
4288
        the module is available.
4353
4289
    """
4354
4290
 
4355
 
    from bzrlib.tests.features import ModuleAvailableFeature
4356
4291
    py_module = pyutils.get_named_object(py_module_name)
4357
4292
    scenarios = [
4358
4293
        ('python', {'module': py_module}),
4399
4334
                         % (os.path.basename(dirname), printable_e))
4400
4335
 
4401
4336
 
 
4337
class Feature(object):
 
4338
    """An operating system Feature."""
 
4339
 
 
4340
    def __init__(self):
 
4341
        self._available = None
 
4342
 
 
4343
    def available(self):
 
4344
        """Is the feature available?
 
4345
 
 
4346
        :return: True if the feature is available.
 
4347
        """
 
4348
        if self._available is None:
 
4349
            self._available = self._probe()
 
4350
        return self._available
 
4351
 
 
4352
    def _probe(self):
 
4353
        """Implement this method in concrete features.
 
4354
 
 
4355
        :return: True if the feature is available.
 
4356
        """
 
4357
        raise NotImplementedError
 
4358
 
 
4359
    def __str__(self):
 
4360
        if getattr(self, 'feature_name', None):
 
4361
            return self.feature_name()
 
4362
        return self.__class__.__name__
 
4363
 
 
4364
 
 
4365
class _SymlinkFeature(Feature):
 
4366
 
 
4367
    def _probe(self):
 
4368
        return osutils.has_symlinks()
 
4369
 
 
4370
    def feature_name(self):
 
4371
        return 'symlinks'
 
4372
 
 
4373
SymlinkFeature = _SymlinkFeature()
 
4374
 
 
4375
 
 
4376
class _HardlinkFeature(Feature):
 
4377
 
 
4378
    def _probe(self):
 
4379
        return osutils.has_hardlinks()
 
4380
 
 
4381
    def feature_name(self):
 
4382
        return 'hardlinks'
 
4383
 
 
4384
HardlinkFeature = _HardlinkFeature()
 
4385
 
 
4386
 
 
4387
class _OsFifoFeature(Feature):
 
4388
 
 
4389
    def _probe(self):
 
4390
        return getattr(os, 'mkfifo', None)
 
4391
 
 
4392
    def feature_name(self):
 
4393
        return 'filesystem fifos'
 
4394
 
 
4395
OsFifoFeature = _OsFifoFeature()
 
4396
 
 
4397
 
 
4398
class _UnicodeFilenameFeature(Feature):
 
4399
    """Does the filesystem support Unicode filenames?"""
 
4400
 
 
4401
    def _probe(self):
 
4402
        try:
 
4403
            # Check for character combinations unlikely to be covered by any
 
4404
            # single non-unicode encoding. We use the characters
 
4405
            # - greek small letter alpha (U+03B1) and
 
4406
            # - braille pattern dots-123456 (U+283F).
 
4407
            os.stat(u'\u03b1\u283f')
 
4408
        except UnicodeEncodeError:
 
4409
            return False
 
4410
        except (IOError, OSError):
 
4411
            # The filesystem allows the Unicode filename but the file doesn't
 
4412
            # exist.
 
4413
            return True
 
4414
        else:
 
4415
            # The filesystem allows the Unicode filename and the file exists,
 
4416
            # for some reason.
 
4417
            return True
 
4418
 
 
4419
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4420
 
 
4421
 
 
4422
class _CompatabilityThunkFeature(Feature):
 
4423
    """This feature is just a thunk to another feature.
 
4424
 
 
4425
    It issues a deprecation warning if it is accessed, to let you know that you
 
4426
    should really use a different feature.
 
4427
    """
 
4428
 
 
4429
    def __init__(self, dep_version, module, name,
 
4430
                 replacement_name, replacement_module=None):
 
4431
        super(_CompatabilityThunkFeature, self).__init__()
 
4432
        self._module = module
 
4433
        if replacement_module is None:
 
4434
            replacement_module = module
 
4435
        self._replacement_module = replacement_module
 
4436
        self._name = name
 
4437
        self._replacement_name = replacement_name
 
4438
        self._dep_version = dep_version
 
4439
        self._feature = None
 
4440
 
 
4441
    def _ensure(self):
 
4442
        if self._feature is None:
 
4443
            depr_msg = self._dep_version % ('%s.%s'
 
4444
                                            % (self._module, self._name))
 
4445
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4446
                                               self._replacement_name)
 
4447
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4448
            # Import the new feature and use it as a replacement for the
 
4449
            # deprecated one.
 
4450
            self._feature = pyutils.get_named_object(
 
4451
                self._replacement_module, self._replacement_name)
 
4452
 
 
4453
    def _probe(self):
 
4454
        self._ensure()
 
4455
        return self._feature._probe()
 
4456
 
 
4457
 
 
4458
class ModuleAvailableFeature(Feature):
 
4459
    """This is a feature than describes a module we want to be available.
 
4460
 
 
4461
    Declare the name of the module in __init__(), and then after probing, the
 
4462
    module will be available as 'self.module'.
 
4463
 
 
4464
    :ivar module: The module if it is available, else None.
 
4465
    """
 
4466
 
 
4467
    def __init__(self, module_name):
 
4468
        super(ModuleAvailableFeature, self).__init__()
 
4469
        self.module_name = module_name
 
4470
 
 
4471
    def _probe(self):
 
4472
        try:
 
4473
            self._module = __import__(self.module_name, {}, {}, [''])
 
4474
            return True
 
4475
        except ImportError:
 
4476
            return False
 
4477
 
 
4478
    @property
 
4479
    def module(self):
 
4480
        if self.available(): # Make sure the probe has been done
 
4481
            return self._module
 
4482
        return None
 
4483
 
 
4484
    def feature_name(self):
 
4485
        return self.module_name
 
4486
 
 
4487
 
4402
4488
def probe_unicode_in_user_encoding():
4403
4489
    """Try to encode several unicode strings to use in unicode-aware tests.
4404
4490
    Return first successfull match.
4432
4518
    return None
4433
4519
 
4434
4520
 
 
4521
class _HTTPSServerFeature(Feature):
 
4522
    """Some tests want an https Server, check if one is available.
 
4523
 
 
4524
    Right now, the only way this is available is under python2.6 which provides
 
4525
    an ssl module.
 
4526
    """
 
4527
 
 
4528
    def _probe(self):
 
4529
        try:
 
4530
            import ssl
 
4531
            return True
 
4532
        except ImportError:
 
4533
            return False
 
4534
 
 
4535
    def feature_name(self):
 
4536
        return 'HTTPSServer'
 
4537
 
 
4538
 
 
4539
HTTPSServerFeature = _HTTPSServerFeature()
 
4540
 
 
4541
 
 
4542
class _UnicodeFilename(Feature):
 
4543
    """Does the filesystem support Unicode filenames?"""
 
4544
 
 
4545
    def _probe(self):
 
4546
        try:
 
4547
            os.stat(u'\u03b1')
 
4548
        except UnicodeEncodeError:
 
4549
            return False
 
4550
        except (IOError, OSError):
 
4551
            # The filesystem allows the Unicode filename but the file doesn't
 
4552
            # exist.
 
4553
            return True
 
4554
        else:
 
4555
            # The filesystem allows the Unicode filename and the file exists,
 
4556
            # for some reason.
 
4557
            return True
 
4558
 
 
4559
UnicodeFilename = _UnicodeFilename()
 
4560
 
 
4561
 
 
4562
class _ByteStringNamedFilesystem(Feature):
 
4563
    """Is the filesystem based on bytes?"""
 
4564
 
 
4565
    def _probe(self):
 
4566
        if os.name == "posix":
 
4567
            return True
 
4568
        return False
 
4569
 
 
4570
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
 
4571
 
 
4572
 
 
4573
class _UTF8Filesystem(Feature):
 
4574
    """Is the filesystem UTF-8?"""
 
4575
 
 
4576
    def _probe(self):
 
4577
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4578
            return True
 
4579
        return False
 
4580
 
 
4581
UTF8Filesystem = _UTF8Filesystem()
 
4582
 
 
4583
 
 
4584
class _BreakinFeature(Feature):
 
4585
    """Does this platform support the breakin feature?"""
 
4586
 
 
4587
    def _probe(self):
 
4588
        from bzrlib import breakin
 
4589
        if breakin.determine_signal() is None:
 
4590
            return False
 
4591
        if sys.platform == 'win32':
 
4592
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4593
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4594
            # access the function
 
4595
            try:
 
4596
                import ctypes
 
4597
            except OSError:
 
4598
                return False
 
4599
        return True
 
4600
 
 
4601
    def feature_name(self):
 
4602
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4603
 
 
4604
 
 
4605
BreakinFeature = _BreakinFeature()
 
4606
 
 
4607
 
 
4608
class _CaseInsCasePresFilenameFeature(Feature):
 
4609
    """Is the file-system case insensitive, but case-preserving?"""
 
4610
 
 
4611
    def _probe(self):
 
4612
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4613
        try:
 
4614
            # first check truly case-preserving for created files, then check
 
4615
            # case insensitive when opening existing files.
 
4616
            name = osutils.normpath(name)
 
4617
            base, rel = osutils.split(name)
 
4618
            found_rel = osutils.canonical_relpath(base, name)
 
4619
            return (found_rel == rel
 
4620
                    and os.path.isfile(name.upper())
 
4621
                    and os.path.isfile(name.lower()))
 
4622
        finally:
 
4623
            os.close(fileno)
 
4624
            os.remove(name)
 
4625
 
 
4626
    def feature_name(self):
 
4627
        return "case-insensitive case-preserving filesystem"
 
4628
 
 
4629
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4630
 
 
4631
 
 
4632
class _CaseInsensitiveFilesystemFeature(Feature):
 
4633
    """Check if underlying filesystem is case-insensitive but *not* case
 
4634
    preserving.
 
4635
    """
 
4636
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4637
    # more likely to be case preserving, so this case is rare.
 
4638
 
 
4639
    def _probe(self):
 
4640
        if CaseInsCasePresFilenameFeature.available():
 
4641
            return False
 
4642
 
 
4643
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4644
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4645
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4646
        else:
 
4647
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4648
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4649
            dir=root)
 
4650
        name_a = osutils.pathjoin(tdir, 'a')
 
4651
        name_A = osutils.pathjoin(tdir, 'A')
 
4652
        os.mkdir(name_a)
 
4653
        result = osutils.isdir(name_A)
 
4654
        _rmtree_temp_dir(tdir)
 
4655
        return result
 
4656
 
 
4657
    def feature_name(self):
 
4658
        return 'case-insensitive filesystem'
 
4659
 
 
4660
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4661
 
 
4662
 
 
4663
class _CaseSensitiveFilesystemFeature(Feature):
 
4664
 
 
4665
    def _probe(self):
 
4666
        if CaseInsCasePresFilenameFeature.available():
 
4667
            return False
 
4668
        elif CaseInsensitiveFilesystemFeature.available():
 
4669
            return False
 
4670
        else:
 
4671
            return True
 
4672
 
 
4673
    def feature_name(self):
 
4674
        return 'case-sensitive filesystem'
 
4675
 
 
4676
# new coding style is for feature instances to be lowercase
 
4677
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
 
4678
 
 
4679
 
4435
4680
# Only define SubUnitBzrRunner if subunit is available.
4436
4681
try:
4437
4682
    from subunit import TestProtocolClient
4438
4683
    from subunit.test_results import AutoTimingTestResultDecorator
4439
4684
    class SubUnitBzrProtocolClient(TestProtocolClient):
4440
4685
 
4441
 
        def stopTest(self, test):
4442
 
            super(SubUnitBzrProtocolClient, self).stopTest(test)
4443
 
            _clear__type_equality_funcs(test)
4444
 
 
4445
4686
        def addSuccess(self, test, details=None):
4446
4687
            # The subunit client always includes the details in the subunit
4447
4688
            # stream, but we don't want to include it in ours.
4459
4700
except ImportError:
4460
4701
    pass
4461
4702
 
4462
 
 
4463
 
@deprecated_function(deprecated_in((2, 5, 0)))
4464
 
def ModuleAvailableFeature(name):
4465
 
    from bzrlib.tests import features
4466
 
    return features.ModuleAvailableFeature(name)
4467
 
    
 
4703
class _PosixPermissionsFeature(Feature):
 
4704
 
 
4705
    def _probe(self):
 
4706
        def has_perms():
 
4707
            # create temporary file and check if specified perms are maintained.
 
4708
            import tempfile
 
4709
 
 
4710
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
 
4711
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
 
4712
            fd, name = f
 
4713
            os.close(fd)
 
4714
            os.chmod(name, write_perms)
 
4715
 
 
4716
            read_perms = os.stat(name).st_mode & 0777
 
4717
            os.unlink(name)
 
4718
            return (write_perms == read_perms)
 
4719
 
 
4720
        return (os.name == 'posix') and has_perms()
 
4721
 
 
4722
    def feature_name(self):
 
4723
        return 'POSIX permissions support'
 
4724
 
 
4725
posix_permissions_feature = _PosixPermissionsFeature()