~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Robert Collins
  • Date: 2010-06-25 06:23:08 UTC
  • mto: This revision was merged to the branch mainline in revision 5324.
  • Revision ID: robertc@robertcollins.net-20100625062308-qx287gzfrehs1d21
Restore the original ui_factory when exiting BzrLibraryState.

Show diffs side-by-side

added added

removed removed

Lines of Context:
29
29
 
30
30
import atexit
31
31
import codecs
32
 
import copy
 
32
from copy import copy
33
33
from cStringIO import StringIO
34
34
import difflib
35
35
import doctest
36
36
import errno
37
 
import itertools
38
37
import logging
39
38
import math
40
39
import os
41
 
import pprint
 
40
from pprint import pformat
42
41
import random
43
42
import re
44
43
import shlex
45
44
import stat
46
 
import subprocess
 
45
from subprocess import Popen, PIPE, STDOUT
47
46
import sys
48
47
import tempfile
49
48
import threading
75
74
    ui,
76
75
    urlutils,
77
76
    registry,
78
 
    transport as _mod_transport,
79
77
    workingtree,
80
78
    )
81
79
import bzrlib.branch
105
103
    )
106
104
import bzrlib.trace
107
105
from bzrlib.transport import (
 
106
    get_transport,
108
107
    memory,
109
108
    pathfilter,
110
109
    )
 
110
import bzrlib.transport
111
111
from bzrlib.trace import mutter, note
112
112
from bzrlib.tests import (
113
113
    test_server,
114
114
    TestUtil,
115
115
    treeshape,
116
116
    )
 
117
from bzrlib.tests.http_server import HttpServer
 
118
from bzrlib.tests.TestUtil import (
 
119
                          TestSuite,
 
120
                          TestLoader,
 
121
                          )
117
122
from bzrlib.ui import NullProgressView
118
123
from bzrlib.ui.text import TextUIFactory
119
124
import bzrlib.version_info_formats.format_custom
136
141
SUBUNIT_SEEK_CUR = 1
137
142
 
138
143
 
139
 
class ExtendedTestResult(testtools.TextTestResult):
 
144
class ExtendedTestResult(unittest._TextTestResult):
140
145
    """Accepts, reports and accumulates the results of running tests.
141
146
 
142
147
    Compared to the unittest version this class adds support for
163
168
        :param bench_history: Optionally, a writable file object to accumulate
164
169
            benchmark results.
165
170
        """
166
 
        testtools.TextTestResult.__init__(self, stream)
 
171
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
167
172
        if bench_history is not None:
168
173
            from bzrlib.version import _get_bzr_source_tree
169
174
            src_tree = _get_bzr_source_tree()
196
201
        actionTaken = "Ran"
197
202
        stopTime = time.time()
198
203
        timeTaken = stopTime - self.startTime
199
 
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
200
 
        #                the parent class method is similar have to duplicate
201
 
        self._show_list('ERROR', self.errors)
202
 
        self._show_list('FAIL', self.failures)
203
 
        self.stream.write(self.sep2)
204
 
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
 
204
        self.printErrors()
 
205
        self.stream.writeln(self.separator2)
 
206
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
205
207
                            run, run != 1 and "s" or "", timeTaken))
 
208
        self.stream.writeln()
206
209
        if not self.wasSuccessful():
207
210
            self.stream.write("FAILED (")
208
211
            failed, errored = map(len, (self.failures, self.errors))
215
218
                if failed or errored: self.stream.write(", ")
216
219
                self.stream.write("known_failure_count=%d" %
217
220
                    self.known_failure_count)
218
 
            self.stream.write(")\n")
 
221
            self.stream.writeln(")")
219
222
        else:
220
223
            if self.known_failure_count:
221
 
                self.stream.write("OK (known_failures=%d)\n" %
 
224
                self.stream.writeln("OK (known_failures=%d)" %
222
225
                    self.known_failure_count)
223
226
            else:
224
 
                self.stream.write("OK\n")
 
227
                self.stream.writeln("OK")
225
228
        if self.skip_count > 0:
226
229
            skipped = self.skip_count
227
 
            self.stream.write('%d test%s skipped\n' %
 
230
            self.stream.writeln('%d test%s skipped' %
228
231
                                (skipped, skipped != 1 and "s" or ""))
229
232
        if self.unsupported:
230
233
            for feature, count in sorted(self.unsupported.items()):
231
 
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
234
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
232
235
                    (feature, count))
233
236
        if self._strict:
234
237
            ok = self.wasStrictlySuccessful()
272
275
 
273
276
    def _shortened_test_description(self, test):
274
277
        what = test.id()
275
 
        what = re.sub(r'^bzrlib\.tests\.', '', what)
 
278
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
276
279
        return what
277
280
 
278
281
    def startTest(self, test):
279
 
        super(ExtendedTestResult, self).startTest(test)
 
282
        unittest.TestResult.startTest(self, test)
280
283
        if self.count == 0:
281
284
            self.startTests()
282
285
        self.report_test_start(test)
320
323
        fails with an unexpected error.
321
324
        """
322
325
        self._post_mortem()
323
 
        super(ExtendedTestResult, self).addError(test, err)
 
326
        unittest.TestResult.addError(self, test, err)
324
327
        self.error_count += 1
325
328
        self.report_error(test, err)
326
329
        if self.stop_early:
334
337
        fails because e.g. an assert() method failed.
335
338
        """
336
339
        self._post_mortem()
337
 
        super(ExtendedTestResult, self).addFailure(test, err)
 
340
        unittest.TestResult.addFailure(self, test, err)
338
341
        self.failure_count += 1
339
342
        self.report_failure(test, err)
340
343
        if self.stop_early:
354
357
                    test.id()))
355
358
        self.report_success(test)
356
359
        self._cleanupLogFile(test)
357
 
        super(ExtendedTestResult, self).addSuccess(test)
 
360
        unittest.TestResult.addSuccess(self, test)
358
361
        test._log_contents = ''
359
362
 
360
363
    def addExpectedFailure(self, test, err):
548
551
        return '%s%s' % (indent, err[1])
549
552
 
550
553
    def report_error(self, test, err):
551
 
        self.stream.write('ERROR %s\n%s\n'
 
554
        self.stream.writeln('ERROR %s\n%s'
552
555
                % (self._testTimeString(test),
553
556
                   self._error_summary(err)))
554
557
 
555
558
    def report_failure(self, test, err):
556
 
        self.stream.write(' FAIL %s\n%s\n'
 
559
        self.stream.writeln(' FAIL %s\n%s'
557
560
                % (self._testTimeString(test),
558
561
                   self._error_summary(err)))
559
562
 
560
563
    def report_known_failure(self, test, err):
561
 
        self.stream.write('XFAIL %s\n%s\n'
 
564
        self.stream.writeln('XFAIL %s\n%s'
562
565
                % (self._testTimeString(test),
563
566
                   self._error_summary(err)))
564
567
 
565
568
    def report_success(self, test):
566
 
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
569
        self.stream.writeln('   OK %s' % self._testTimeString(test))
567
570
        for bench_called, stats in getattr(test, '_benchcalls', []):
568
 
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
571
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
569
572
            stats.pprint(file=self.stream)
570
573
        # flush the stream so that we get smooth output. This verbose mode is
571
574
        # used to show the output in PQM.
572
575
        self.stream.flush()
573
576
 
574
577
    def report_skip(self, test, reason):
575
 
        self.stream.write(' SKIP %s\n%s\n'
 
578
        self.stream.writeln(' SKIP %s\n%s'
576
579
                % (self._testTimeString(test), reason))
577
580
 
578
581
    def report_not_applicable(self, test, reason):
579
 
        self.stream.write('  N/A %s\n    %s\n'
 
582
        self.stream.writeln('  N/A %s\n    %s'
580
583
                % (self._testTimeString(test), reason))
581
584
 
582
585
    def report_unsupported(self, test, feature):
583
586
        """test cannot be run because feature is missing."""
584
 
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
587
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
585
588
                %(self._testTimeString(test), feature))
586
589
 
587
590
 
616
619
            encode = codec.encode
617
620
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
618
621
        stream.encoding = new_encoding
619
 
        self.stream = stream
 
622
        self.stream = unittest._WritelnDecorator(stream)
620
623
        self.descriptions = descriptions
621
624
        self.verbosity = verbosity
622
625
        self._bench_history = bench_history
843
846
        # going away but leak one) but it seems less likely than the actual
844
847
        # false positives (the test see threads going away and does not leak).
845
848
        if leaked_threads > 0:
846
 
            if 'threads' in selftest_debug_flags:
847
 
                print '%s is leaking, active is now %d' % (self.id(), active)
848
849
            TestCase._leaking_threads_tests += 1
849
850
            if TestCase._first_thread_leaker_id is None:
850
851
                TestCase._first_thread_leaker_id = self.id()
942
943
 
943
944
    def permit_dir(self, name):
944
945
        """Permit a directory to be used by this test. See permit_url."""
945
 
        name_transport = _mod_transport.get_transport(name)
 
946
        name_transport = get_transport(name)
946
947
        self.permit_url(name)
947
948
        self.permit_url(name_transport.base)
948
949
 
1027
1028
        self.addCleanup(transport_server.stop_server)
1028
1029
        # Obtain a real transport because if the server supplies a password, it
1029
1030
        # will be hidden from the base on the client side.
1030
 
        t = _mod_transport.get_transport(transport_server.get_url())
 
1031
        t = get_transport(transport_server.get_url())
1031
1032
        # Some transport servers effectively chroot the backing transport;
1032
1033
        # others like SFTPServer don't - users of the transport can walk up the
1033
1034
        # transport to read the entire backing transport. This wouldn't matter
1094
1095
            message += '\n'
1095
1096
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1096
1097
            % (message,
1097
 
               pprint.pformat(a), pprint.pformat(b)))
 
1098
               pformat(a), pformat(b)))
1098
1099
 
1099
1100
    assertEquals = assertEqual
1100
1101
 
1985
1986
            if not allow_plugins:
1986
1987
                command.append('--no-plugins')
1987
1988
            command.extend(process_args)
1988
 
            process = self._popen(command, stdin=subprocess.PIPE,
1989
 
                                  stdout=subprocess.PIPE,
1990
 
                                  stderr=subprocess.PIPE)
 
1989
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1991
1990
        finally:
1992
1991
            restore_environment()
1993
1992
            if cwd is not None:
2001
2000
        Allows tests to override this method to intercept the calls made to
2002
2001
        Popen for introspection.
2003
2002
        """
2004
 
        return subprocess.Popen(*args, **kwargs)
 
2003
        return Popen(*args, **kwargs)
2005
2004
 
2006
2005
    def get_source_path(self):
2007
2006
        """Return the path of the directory containing bzrlib."""
2009
2008
 
2010
2009
    def get_bzr_path(self):
2011
2010
        """Return the path of the 'bzr' executable for this test suite."""
2012
 
        bzr_path = os.path.join(self.get_source_path(), "bzr")
 
2011
        bzr_path = self.get_source_path()+'/bzr'
2013
2012
        if not os.path.isfile(bzr_path):
2014
2013
            # We are probably installed. Assume sys.argv is the right file
2015
2014
            bzr_path = sys.argv[0]
2187
2186
 
2188
2187
        :param relpath: a path relative to the base url.
2189
2188
        """
2190
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2189
        t = get_transport(self.get_url(relpath))
2191
2190
        self.assertFalse(t.is_readonly())
2192
2191
        return t
2193
2192
 
2199
2198
 
2200
2199
        :param relpath: a path relative to the base url.
2201
2200
        """
2202
 
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
2201
        t = get_transport(self.get_readonly_url(relpath))
2203
2202
        self.assertTrue(t.is_readonly())
2204
2203
        return t
2205
2204
 
2335
2334
        propagating. This method ensures than a test did not leaked.
2336
2335
        """
2337
2336
        root = TestCaseWithMemoryTransport.TEST_ROOT
2338
 
        self.permit_url(_mod_transport.get_transport(root).base)
 
2337
        self.permit_url(get_transport(root).base)
2339
2338
        wt = workingtree.WorkingTree.open(root)
2340
2339
        last_rev = wt.last_revision()
2341
2340
        if last_rev != 'null:':
2386
2385
            # might be a relative or absolute path
2387
2386
            maybe_a_url = self.get_url(relpath)
2388
2387
            segments = maybe_a_url.rsplit('/', 1)
2389
 
            t = _mod_transport.get_transport(maybe_a_url)
 
2388
            t = get_transport(maybe_a_url)
2390
2389
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2391
2390
                t.ensure_base()
2392
2391
            if format is None:
2414
2413
            backing_server = self.get_server()
2415
2414
        smart_server = test_server.SmartTCPServer_for_testing()
2416
2415
        self.start_server(smart_server, backing_server)
2417
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2418
 
                                                   ).clone(path)
 
2416
        remote_transport = get_transport(smart_server.get_url()).clone(path)
2419
2417
        return remote_transport
2420
2418
 
2421
2419
    def make_branch_and_memory_tree(self, relpath, format=None):
2436
2434
 
2437
2435
    def setUp(self):
2438
2436
        super(TestCaseWithMemoryTransport, self).setUp()
2439
 
        # Ensure that ConnectedTransport doesn't leak sockets
2440
 
        def get_transport_with_cleanup(*args, **kwargs):
2441
 
            t = orig_get_transport(*args, **kwargs)
2442
 
            if isinstance(t, _mod_transport.ConnectedTransport):
2443
 
                self.addCleanup(t.disconnect)
2444
 
            return t
2445
 
 
2446
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2447
 
                                               get_transport_with_cleanup)
2448
2437
        self._make_test_root()
2449
2438
        self.addCleanup(os.chdir, os.getcwdu())
2450
2439
        self.makeAndChdirToTestDir()
2579
2568
                "a list or a tuple. Got %r instead" % (shape,))
2580
2569
        # It's OK to just create them using forward slashes on windows.
2581
2570
        if transport is None or transport.is_readonly():
2582
 
            transport = _mod_transport.get_transport(".")
 
2571
            transport = get_transport(".")
2583
2572
        for name in shape:
2584
2573
            self.assertIsInstance(name, basestring)
2585
2574
            if name[-1] == '/':
2742
2731
    """
2743
2732
 
2744
2733
    def setUp(self):
2745
 
        from bzrlib.tests import http_server
2746
2734
        super(ChrootedTestCase, self).setUp()
2747
2735
        if not self.vfs_transport_factory == memory.MemoryServer:
2748
 
            self.transport_readonly_server = http_server.HttpServer
 
2736
            self.transport_readonly_server = HttpServer
2749
2737
 
2750
2738
 
2751
2739
def condition_id_re(pattern):
2754
2742
    :param pattern: A regular expression string.
2755
2743
    :return: A callable that returns True if the re matches.
2756
2744
    """
2757
 
    filter_re = re.compile(pattern, 0)
 
2745
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2746
        'test filter')
2758
2747
    def condition(test):
2759
2748
        test_id = test.id()
2760
2749
        return filter_re.search(test_id)
3072
3061
    return suite
3073
3062
 
3074
3063
 
3075
 
class TestDecorator(TestUtil.TestSuite):
 
3064
class TestDecorator(TestSuite):
3076
3065
    """A decorator for TestCase/TestSuite objects.
3077
3066
    
3078
3067
    Usually, subclasses should override __iter__(used when flattening test
3081
3070
    """
3082
3071
 
3083
3072
    def __init__(self, suite):
3084
 
        TestUtil.TestSuite.__init__(self)
 
3073
        TestSuite.__init__(self)
3085
3074
        self.addTest(suite)
3086
3075
 
3087
3076
    def countTestCases(self):
3206
3195
 
3207
3196
def partition_tests(suite, count):
3208
3197
    """Partition suite into count lists of tests."""
3209
 
    # This just assigns tests in a round-robin fashion.  On one hand this
3210
 
    # splits up blocks of related tests that might run faster if they shared
3211
 
    # resources, but on the other it avoids assigning blocks of slow tests to
3212
 
    # just one partition.  So the slowest partition shouldn't be much slower
3213
 
    # than the fastest.
3214
 
    partitions = [list() for i in range(count)]
3215
 
    tests = iter_suite_tests(suite)
3216
 
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3217
 
        partition.append(test)
3218
 
    return partitions
 
3198
    result = []
 
3199
    tests = list(iter_suite_tests(suite))
 
3200
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3201
    for block in range(count):
 
3202
        low_test = block * tests_per_process
 
3203
        high_test = low_test + tests_per_process
 
3204
        process_tests = tests[low_test:high_test]
 
3205
        result.append(process_tests)
 
3206
    return result
3219
3207
 
3220
3208
 
3221
3209
def workaround_zealous_crypto_random():
3255
3243
 
3256
3244
    test_blocks = partition_tests(suite, concurrency)
3257
3245
    for process_tests in test_blocks:
3258
 
        process_suite = TestUtil.TestSuite()
 
3246
        process_suite = TestSuite()
3259
3247
        process_suite.addTests(process_tests)
3260
3248
        c2pread, c2pwrite = os.pipe()
3261
3249
        pid = os.fork()
3327
3315
                '--subunit']
3328
3316
            if '--no-plugins' in sys.argv:
3329
3317
                argv.append('--no-plugins')
3330
 
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
3331
 
            # noise on stderr it can interrupt the subunit protocol.
3332
 
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3333
 
                                      stdout=subprocess.PIPE,
3334
 
                                      stderr=subprocess.PIPE,
3335
 
                                      bufsize=1)
 
3318
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3319
            # stderr it can interrupt the subunit protocol.
 
3320
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3321
                bufsize=1)
3336
3322
            test = TestInSubprocess(process, test_list_file_name)
3337
3323
            result.append(test)
3338
3324
        except:
3387
3373
 
3388
3374
    def startTest(self, test):
3389
3375
        self.profiler = bzrlib.lsprof.BzrProfiler()
3390
 
        # Prevent deadlocks in tests that use lsprof: those tests will
3391
 
        # unavoidably fail.
3392
 
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
3393
3376
        self.profiler.start()
3394
3377
        ForwardingResult.startTest(self, test)
3395
3378
 
3416
3399
#                           rather than failing tests. And no longer raise
3417
3400
#                           LockContention when fctnl locks are not being used
3418
3401
#                           with proper exclusion rules.
3419
 
#   -Ethreads               Will display thread ident at creation/join time to
3420
 
#                           help track thread leaks
3421
3402
selftest_debug_flags = set()
3422
3403
 
3423
3404
 
3656
3637
        'bzrlib.doc',
3657
3638
        'bzrlib.tests.blackbox',
3658
3639
        'bzrlib.tests.commands',
3659
 
        'bzrlib.tests.doc_generate',
3660
3640
        'bzrlib.tests.per_branch',
3661
 
        'bzrlib.tests.per_controldir',
3662
 
        'bzrlib.tests.per_controldir_colo',
 
3641
        'bzrlib.tests.per_bzrdir',
 
3642
        'bzrlib.tests.per_bzrdir_colo',
3663
3643
        'bzrlib.tests.per_foreign_vcs',
3664
3644
        'bzrlib.tests.per_interrepository',
3665
3645
        'bzrlib.tests.per_intertree',
3678
3658
        'bzrlib.tests.per_workingtree',
3679
3659
        'bzrlib.tests.test__annotator',
3680
3660
        'bzrlib.tests.test__bencode',
3681
 
        'bzrlib.tests.test__btree_serializer',
3682
3661
        'bzrlib.tests.test__chk_map',
3683
3662
        'bzrlib.tests.test__dirstate_helpers',
3684
3663
        'bzrlib.tests.test__groupcompress',
3817
3796
        'bzrlib.tests.test_switch',
3818
3797
        'bzrlib.tests.test_symbol_versioning',
3819
3798
        'bzrlib.tests.test_tag',
3820
 
        'bzrlib.tests.test_test_server',
3821
3799
        'bzrlib.tests.test_testament',
3822
3800
        'bzrlib.tests.test_textfile',
3823
3801
        'bzrlib.tests.test_textmerge',
3829
3807
        'bzrlib.tests.test_transport_log',
3830
3808
        'bzrlib.tests.test_tree',
3831
3809
        'bzrlib.tests.test_treebuilder',
3832
 
        'bzrlib.tests.test_treeshape',
3833
3810
        'bzrlib.tests.test_tsort',
3834
3811
        'bzrlib.tests.test_tuned_gzip',
3835
3812
        'bzrlib.tests.test_ui',
3839
3816
        'bzrlib.tests.test_urlutils',
3840
3817
        'bzrlib.tests.test_version',
3841
3818
        'bzrlib.tests.test_version_info',
3842
 
        'bzrlib.tests.test_versionedfile',
3843
3819
        'bzrlib.tests.test_weave',
3844
3820
        'bzrlib.tests.test_whitebox',
3845
3821
        'bzrlib.tests.test_win32utils',
4014
3990
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4015
3991
    ...     [('one', dict(param=1)),
4016
3992
    ...      ('two', dict(param=2))],
4017
 
    ...     TestUtil.TestSuite())
 
3993
    ...     TestSuite())
4018
3994
    >>> tests = list(iter_suite_tests(r))
4019
3995
    >>> len(tests)
4020
3996
    2
4067
4043
    :param new_id: The id to assign to it.
4068
4044
    :return: The new test.
4069
4045
    """
4070
 
    new_test = copy.copy(test)
 
4046
    new_test = copy(test)
4071
4047
    new_test.id = lambda: new_id
4072
4048
    return new_test
4073
4049