~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Martin Pool
  • Date: 2010-03-01 05:15:11 UTC
  • mto: (4797.2.24 2.1)
  • mto: This revision was merged to the branch mainline in revision 5069.
  • Revision ID: mbp@canonical.com-20100301051511-rl4w81u1s4kjh7r1
Fix link to plugin guide

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Testing framework extensions"""
18
17
 
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
29
28
 
30
29
import atexit
31
30
import codecs
32
 
import copy
 
31
from copy import copy
33
32
from cStringIO import StringIO
34
33
import difflib
35
34
import doctest
36
35
import errno
37
 
import itertools
38
36
import logging
39
37
import math
40
38
import os
41
 
import pprint
 
39
from pprint import pformat
42
40
import random
43
41
import re
44
42
import shlex
45
43
import stat
46
 
import subprocess
 
44
from subprocess import Popen, PIPE, STDOUT
47
45
import sys
48
46
import tempfile
49
47
import threading
75
73
    ui,
76
74
    urlutils,
77
75
    registry,
78
 
    transport as _mod_transport,
79
76
    workingtree,
80
77
    )
81
78
import bzrlib.branch
104
101
    deprecated_passed,
105
102
    )
106
103
import bzrlib.trace
107
 
from bzrlib.transport import (
108
 
    memory,
109
 
    pathfilter,
110
 
    )
 
104
from bzrlib.transport import get_transport, pathfilter
 
105
import bzrlib.transport
 
106
from bzrlib.transport.local import LocalURLServer
 
107
from bzrlib.transport.memory import MemoryServer
 
108
from bzrlib.transport.readonly import ReadonlyServer
111
109
from bzrlib.trace import mutter, note
112
 
from bzrlib.tests import (
113
 
    test_server,
114
 
    TestUtil,
115
 
    treeshape,
116
 
    )
 
110
from bzrlib.tests import TestUtil
 
111
from bzrlib.tests.http_server import HttpServer
 
112
from bzrlib.tests.TestUtil import (
 
113
                          TestSuite,
 
114
                          TestLoader,
 
115
                          )
 
116
from bzrlib.tests.treeshape import build_tree_contents
117
117
from bzrlib.ui import NullProgressView
118
118
from bzrlib.ui.text import TextUIFactory
119
119
import bzrlib.version_info_formats.format_custom
124
124
# shown frame is the test code, not our assertXYZ.
125
125
__unittest = 1
126
126
 
127
 
default_transport = test_server.LocalURLServer
128
 
 
129
 
 
130
 
_unitialized_attr = object()
131
 
"""A sentinel needed to act as a default value in a method signature."""
132
 
 
 
127
default_transport = LocalURLServer
133
128
 
134
129
# Subunit result codes, defined here to prevent a hard dependency on subunit.
135
130
SUBUNIT_SEEK_SET = 0
136
131
SUBUNIT_SEEK_CUR = 1
137
132
 
138
133
 
139
 
class ExtendedTestResult(testtools.TextTestResult):
 
134
class ExtendedTestResult(unittest._TextTestResult):
140
135
    """Accepts, reports and accumulates the results of running tests.
141
136
 
142
137
    Compared to the unittest version this class adds support for
163
158
        :param bench_history: Optionally, a writable file object to accumulate
164
159
            benchmark results.
165
160
        """
166
 
        testtools.TextTestResult.__init__(self, stream)
 
161
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
167
162
        if bench_history is not None:
168
163
            from bzrlib.version import _get_bzr_source_tree
169
164
            src_tree = _get_bzr_source_tree()
196
191
        actionTaken = "Ran"
197
192
        stopTime = time.time()
198
193
        timeTaken = stopTime - self.startTime
199
 
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
200
 
        #                the parent class method is similar have to duplicate
201
 
        self._show_list('ERROR', self.errors)
202
 
        self._show_list('FAIL', self.failures)
203
 
        self.stream.write(self.sep2)
204
 
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
 
194
        self.printErrors()
 
195
        self.stream.writeln(self.separator2)
 
196
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
205
197
                            run, run != 1 and "s" or "", timeTaken))
 
198
        self.stream.writeln()
206
199
        if not self.wasSuccessful():
207
200
            self.stream.write("FAILED (")
208
201
            failed, errored = map(len, (self.failures, self.errors))
215
208
                if failed or errored: self.stream.write(", ")
216
209
                self.stream.write("known_failure_count=%d" %
217
210
                    self.known_failure_count)
218
 
            self.stream.write(")\n")
 
211
            self.stream.writeln(")")
219
212
        else:
220
213
            if self.known_failure_count:
221
 
                self.stream.write("OK (known_failures=%d)\n" %
 
214
                self.stream.writeln("OK (known_failures=%d)" %
222
215
                    self.known_failure_count)
223
216
            else:
224
 
                self.stream.write("OK\n")
 
217
                self.stream.writeln("OK")
225
218
        if self.skip_count > 0:
226
219
            skipped = self.skip_count
227
 
            self.stream.write('%d test%s skipped\n' %
 
220
            self.stream.writeln('%d test%s skipped' %
228
221
                                (skipped, skipped != 1 and "s" or ""))
229
222
        if self.unsupported:
230
223
            for feature, count in sorted(self.unsupported.items()):
231
 
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
224
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
232
225
                    (feature, count))
233
226
        if self._strict:
234
227
            ok = self.wasStrictlySuccessful()
272
265
 
273
266
    def _shortened_test_description(self, test):
274
267
        what = test.id()
275
 
        what = re.sub(r'^bzrlib\.tests\.', '', what)
 
268
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
276
269
        return what
277
270
 
278
271
    def startTest(self, test):
279
 
        super(ExtendedTestResult, self).startTest(test)
 
272
        unittest.TestResult.startTest(self, test)
280
273
        if self.count == 0:
281
274
            self.startTests()
282
275
        self.report_test_start(test)
320
313
        fails with an unexpected error.
321
314
        """
322
315
        self._post_mortem()
323
 
        super(ExtendedTestResult, self).addError(test, err)
 
316
        unittest.TestResult.addError(self, test, err)
324
317
        self.error_count += 1
325
318
        self.report_error(test, err)
326
319
        if self.stop_early:
334
327
        fails because e.g. an assert() method failed.
335
328
        """
336
329
        self._post_mortem()
337
 
        super(ExtendedTestResult, self).addFailure(test, err)
 
330
        unittest.TestResult.addFailure(self, test, err)
338
331
        self.failure_count += 1
339
332
        self.report_failure(test, err)
340
333
        if self.stop_early:
354
347
                    test.id()))
355
348
        self.report_success(test)
356
349
        self._cleanupLogFile(test)
357
 
        super(ExtendedTestResult, self).addSuccess(test)
 
350
        unittest.TestResult.addSuccess(self, test)
358
351
        test._log_contents = ''
359
352
 
360
353
    def addExpectedFailure(self, test, err):
487
480
        return self._shortened_test_description(test)
488
481
 
489
482
    def report_error(self, test, err):
490
 
        self.stream.write('ERROR: %s\n    %s\n' % (
 
483
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
491
484
            self._test_description(test),
492
485
            err[1],
493
486
            ))
494
487
 
495
488
    def report_failure(self, test, err):
496
 
        self.stream.write('FAIL: %s\n    %s\n' % (
 
489
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
497
490
            self._test_description(test),
498
491
            err[1],
499
492
            ))
548
541
        return '%s%s' % (indent, err[1])
549
542
 
550
543
    def report_error(self, test, err):
551
 
        self.stream.write('ERROR %s\n%s\n'
 
544
        self.stream.writeln('ERROR %s\n%s'
552
545
                % (self._testTimeString(test),
553
546
                   self._error_summary(err)))
554
547
 
555
548
    def report_failure(self, test, err):
556
 
        self.stream.write(' FAIL %s\n%s\n'
 
549
        self.stream.writeln(' FAIL %s\n%s'
557
550
                % (self._testTimeString(test),
558
551
                   self._error_summary(err)))
559
552
 
560
553
    def report_known_failure(self, test, err):
561
 
        self.stream.write('XFAIL %s\n%s\n'
 
554
        self.stream.writeln('XFAIL %s\n%s'
562
555
                % (self._testTimeString(test),
563
556
                   self._error_summary(err)))
564
557
 
565
558
    def report_success(self, test):
566
 
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
559
        self.stream.writeln('   OK %s' % self._testTimeString(test))
567
560
        for bench_called, stats in getattr(test, '_benchcalls', []):
568
 
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
561
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
569
562
            stats.pprint(file=self.stream)
570
563
        # flush the stream so that we get smooth output. This verbose mode is
571
564
        # used to show the output in PQM.
572
565
        self.stream.flush()
573
566
 
574
567
    def report_skip(self, test, reason):
575
 
        self.stream.write(' SKIP %s\n%s\n'
 
568
        self.stream.writeln(' SKIP %s\n%s'
576
569
                % (self._testTimeString(test), reason))
577
570
 
578
571
    def report_not_applicable(self, test, reason):
579
 
        self.stream.write('  N/A %s\n    %s\n'
 
572
        self.stream.writeln('  N/A %s\n    %s'
580
573
                % (self._testTimeString(test), reason))
581
574
 
582
575
    def report_unsupported(self, test, feature):
583
576
        """test cannot be run because feature is missing."""
584
 
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
577
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
585
578
                %(self._testTimeString(test), feature))
586
579
 
587
580
 
616
609
            encode = codec.encode
617
610
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
618
611
        stream.encoding = new_encoding
619
 
        self.stream = stream
 
612
        self.stream = unittest._WritelnDecorator(stream)
620
613
        self.descriptions = descriptions
621
614
        self.verbosity = verbosity
622
615
        self._bench_history = bench_history
704
697
    """
705
698
 
706
699
 
 
700
class CommandFailed(Exception):
 
701
    pass
 
702
 
 
703
 
707
704
class StringIOWrapper(object):
708
705
    """A wrapper around cStringIO which just adds an encoding attribute.
709
706
 
746
743
    # XXX: Should probably unify more with CannedInputUIFactory or a
747
744
    # particular configuration of TextUIFactory, or otherwise have a clearer
748
745
    # idea of how they're supposed to be different.
749
 
    # See https://bugs.launchpad.net/bzr/+bug/408213
 
746
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
750
747
 
751
748
    def __init__(self, stdout=None, stderr=None, stdin=None):
752
749
        if stdin is not None:
843
840
        # going away but leak one) but it seems less likely than the actual
844
841
        # false positives (the test see threads going away and does not leak).
845
842
        if leaked_threads > 0:
846
 
            if 'threads' in selftest_debug_flags:
847
 
                print '%s is leaking, active is now %d' % (self.id(), active)
848
843
            TestCase._leaking_threads_tests += 1
849
844
            if TestCase._first_thread_leaker_id is None:
850
845
                TestCase._first_thread_leaker_id = self.id()
855
850
        Tests that want to use debug flags can just set them in the
856
851
        debug_flags set during setup/teardown.
857
852
        """
858
 
        # Start with a copy of the current debug flags we can safely modify.
859
 
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
 
853
        self._preserved_debug_flags = set(debug.debug_flags)
860
854
        if 'allow_debug' not in selftest_debug_flags:
861
855
            debug.debug_flags.clear()
862
856
        if 'disable_lock_checks' not in selftest_debug_flags:
863
857
            debug.debug_flags.add('strict_locks')
 
858
        self.addCleanup(self._restore_debug_flags)
864
859
 
865
860
    def _clear_hooks(self):
866
861
        # prevent hooks affecting tests
887
882
    def _silenceUI(self):
888
883
        """Turn off UI for duration of test"""
889
884
        # by default the UI is off; tests can turn it on if they want it.
890
 
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
 
885
        saved = ui.ui_factory
 
886
        def _restore():
 
887
            ui.ui_factory = saved
 
888
        ui.ui_factory = ui.SilentUIFactory()
 
889
        self.addCleanup(_restore)
891
890
 
892
891
    def _check_locks(self):
893
892
        """Check that all lock take/release actions have been paired."""
942
941
 
943
942
    def permit_dir(self, name):
944
943
        """Permit a directory to be used by this test. See permit_url."""
945
 
        name_transport = _mod_transport.get_transport(name)
 
944
        name_transport = get_transport(name)
946
945
        self.permit_url(name)
947
946
        self.permit_url(name_transport.base)
948
947
 
1027
1026
        self.addCleanup(transport_server.stop_server)
1028
1027
        # Obtain a real transport because if the server supplies a password, it
1029
1028
        # will be hidden from the base on the client side.
1030
 
        t = _mod_transport.get_transport(transport_server.get_url())
 
1029
        t = get_transport(transport_server.get_url())
1031
1030
        # Some transport servers effectively chroot the backing transport;
1032
1031
        # others like SFTPServer don't - users of the transport can walk up the
1033
1032
        # transport to read the entire backing transport. This wouldn't matter
1044
1043
        if t.base.endswith('/work/'):
1045
1044
            # we have safety net/test root/work
1046
1045
            t = t.clone('../..')
1047
 
        elif isinstance(transport_server,
1048
 
                        test_server.SmartTCPServer_for_testing):
 
1046
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1049
1047
            # The smart server adds a path similar to work, which is traversed
1050
1048
            # up from by the client. But the server is chrooted - the actual
1051
1049
            # backing transport is not escaped from, and VFS requests to the
1094
1092
            message += '\n'
1095
1093
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1096
1094
            % (message,
1097
 
               pprint.pformat(a), pprint.pformat(b)))
 
1095
               pformat(a), pformat(b)))
1098
1096
 
1099
1097
    assertEquals = assertEqual
1100
1098
 
1206
1204
            raise AssertionError('pattern "%s" found in "%s"'
1207
1205
                    % (needle_re, haystack))
1208
1206
 
1209
 
    def assertContainsString(self, haystack, needle):
1210
 
        if haystack.find(needle) == -1:
1211
 
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
1212
 
 
1213
1207
    def assertSubset(self, sublist, superlist):
1214
1208
        """Assert that every entry in sublist is present in superlist."""
1215
1209
        missing = set(sublist) - set(superlist)
1312
1306
            f.close()
1313
1307
        self.assertEqualDiff(content, s)
1314
1308
 
1315
 
    def assertDocstring(self, expected_docstring, obj):
1316
 
        """Fail if obj does not have expected_docstring"""
1317
 
        if __doc__ is None:
1318
 
            # With -OO the docstring should be None instead
1319
 
            self.assertIs(obj.__doc__, None)
1320
 
        else:
1321
 
            self.assertEqual(expected_docstring, obj.__doc__)
1322
 
 
1323
1309
    def failUnlessExists(self, path):
1324
1310
        """Fail unless path or paths, which may be abs or relative, exist."""
1325
1311
        if not isinstance(path, basestring):
1493
1479
        """
1494
1480
        self._cleanups.append((callable, args, kwargs))
1495
1481
 
1496
 
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1497
 
        """Overrides an object attribute restoring it after the test.
1498
 
 
1499
 
        :param obj: The object that will be mutated.
1500
 
 
1501
 
        :param attr_name: The attribute name we want to preserve/override in
1502
 
            the object.
1503
 
 
1504
 
        :param new: The optional value we want to set the attribute to.
1505
 
 
1506
 
        :returns: The actual attr value.
1507
 
        """
1508
 
        value = getattr(obj, attr_name)
1509
 
        # The actual value is captured by the call below
1510
 
        self.addCleanup(setattr, obj, attr_name, value)
1511
 
        if new is not _unitialized_attr:
1512
 
            setattr(obj, attr_name, new)
1513
 
        return value
1514
 
 
1515
1482
    def _cleanEnvironment(self):
1516
1483
        new_env = {
1517
1484
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1523
1490
            'EDITOR': None,
1524
1491
            'BZR_EMAIL': None,
1525
1492
            'BZREMAIL': None, # may still be present in the environment
1526
 
            'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
 
1493
            'EMAIL': None,
1527
1494
            'BZR_PROGRESS_BAR': None,
1528
1495
            'BZR_LOG': None,
1529
1496
            'BZR_PLUGIN_PATH': None,
1530
 
            'BZR_DISABLE_PLUGINS': None,
1531
 
            'BZR_PLUGINS_AT': None,
1532
1497
            'BZR_CONCURRENCY': None,
1533
1498
            # Make sure that any text ui tests are consistent regardless of
1534
1499
            # the environment the test case is run in; you may want tests that
1555
1520
            'ftp_proxy': None,
1556
1521
            'FTP_PROXY': None,
1557
1522
            'BZR_REMOTE_PATH': None,
1558
 
            # Generally speaking, we don't want apport reporting on crashes in
1559
 
            # the test envirnoment unless we're specifically testing apport,
1560
 
            # so that it doesn't leak into the real system environment.  We
1561
 
            # use an env var so it propagates to subprocesses.
1562
 
            'APPORT_DISABLE': '1',
1563
1523
        }
1564
 
        self._old_env = {}
 
1524
        self.__old_env = {}
1565
1525
        self.addCleanup(self._restoreEnvironment)
1566
1526
        for name, value in new_env.iteritems():
1567
1527
            self._captureVar(name, value)
1568
1528
 
1569
1529
    def _captureVar(self, name, newvalue):
1570
1530
        """Set an environment variable, and reset it when finished."""
1571
 
        self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1531
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1532
 
 
1533
    def _restore_debug_flags(self):
 
1534
        debug.debug_flags.clear()
 
1535
        debug.debug_flags.update(self._preserved_debug_flags)
1572
1536
 
1573
1537
    def _restoreEnvironment(self):
1574
 
        for name, value in self._old_env.iteritems():
 
1538
        for name, value in self.__old_env.iteritems():
1575
1539
            osutils.set_or_unset_env(name, value)
1576
1540
 
1577
1541
    def _restoreHooks(self):
1681
1645
                unicodestr = log_contents.decode('utf8', 'replace')
1682
1646
                log_contents = unicodestr.encode('utf8')
1683
1647
            if not keep_log_file:
1684
 
                close_attempts = 0
1685
 
                max_close_attempts = 100
1686
 
                first_close_error = None
1687
 
                while close_attempts < max_close_attempts:
1688
 
                    close_attempts += 1
1689
 
                    try:
1690
 
                        self._log_file.close()
1691
 
                    except IOError, ioe:
1692
 
                        if ioe.errno is None:
1693
 
                            # No errno implies 'close() called during
1694
 
                            # concurrent operation on the same file object', so
1695
 
                            # retry.  Probably a thread is trying to write to
1696
 
                            # the log file.
1697
 
                            if first_close_error is None:
1698
 
                                first_close_error = ioe
1699
 
                            continue
1700
 
                        raise
1701
 
                    else:
1702
 
                        break
1703
 
                if close_attempts > 1:
1704
 
                    sys.stderr.write(
1705
 
                        'Unable to close log file on first attempt, '
1706
 
                        'will retry: %s\n' % (first_close_error,))
1707
 
                    if close_attempts == max_close_attempts:
1708
 
                        sys.stderr.write(
1709
 
                            'Unable to close log file after %d attempts.\n'
1710
 
                            % (max_close_attempts,))
 
1648
                self._log_file.close()
1711
1649
                self._log_file = None
1712
1650
                # Permit multiple calls to get_log until we clean it up in
1713
1651
                # finishLogFile
1985
1923
            if not allow_plugins:
1986
1924
                command.append('--no-plugins')
1987
1925
            command.extend(process_args)
1988
 
            process = self._popen(command, stdin=subprocess.PIPE,
1989
 
                                  stdout=subprocess.PIPE,
1990
 
                                  stderr=subprocess.PIPE)
 
1926
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1991
1927
        finally:
1992
1928
            restore_environment()
1993
1929
            if cwd is not None:
2001
1937
        Allows tests to override this method to intercept the calls made to
2002
1938
        Popen for introspection.
2003
1939
        """
2004
 
        return subprocess.Popen(*args, **kwargs)
 
1940
        return Popen(*args, **kwargs)
2005
1941
 
2006
1942
    def get_source_path(self):
2007
1943
        """Return the path of the directory containing bzrlib."""
2009
1945
 
2010
1946
    def get_bzr_path(self):
2011
1947
        """Return the path of the 'bzr' executable for this test suite."""
2012
 
        bzr_path = os.path.join(self.get_source_path(), "bzr")
 
1948
        bzr_path = self.get_source_path()+'/bzr'
2013
1949
        if not os.path.isfile(bzr_path):
2014
1950
            # We are probably installed. Assume sys.argv is the right file
2015
1951
            bzr_path = sys.argv[0]
2101
2037
 
2102
2038
        Tests that expect to provoke LockContention errors should call this.
2103
2039
        """
2104
 
        self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
 
2040
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
2041
        def resetTimeout():
 
2042
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
2043
        self.addCleanup(resetTimeout)
 
2044
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
2105
2045
 
2106
2046
    def make_utf8_encoded_stringio(self, encoding_type=None):
2107
2047
        """Return a StringIOWrapper instance, that will encode Unicode
2121
2061
        request_handlers = request.request_handlers
2122
2062
        orig_method = request_handlers.get(verb)
2123
2063
        request_handlers.remove(verb)
2124
 
        self.addCleanup(request_handlers.register, verb, orig_method)
 
2064
        def restoreVerb():
 
2065
            request_handlers.register(verb, orig_method)
 
2066
        self.addCleanup(restoreVerb)
2125
2067
 
2126
2068
 
2127
2069
class CapturedCall(object):
2187
2129
 
2188
2130
        :param relpath: a path relative to the base url.
2189
2131
        """
2190
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2132
        t = get_transport(self.get_url(relpath))
2191
2133
        self.assertFalse(t.is_readonly())
2192
2134
        return t
2193
2135
 
2199
2141
 
2200
2142
        :param relpath: a path relative to the base url.
2201
2143
        """
2202
 
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
2144
        t = get_transport(self.get_readonly_url(relpath))
2203
2145
        self.assertTrue(t.is_readonly())
2204
2146
        return t
2205
2147
 
2218
2160
        if self.__readonly_server is None:
2219
2161
            if self.transport_readonly_server is None:
2220
2162
                # readonly decorator requested
2221
 
                self.__readonly_server = test_server.ReadonlyServer()
 
2163
                self.__readonly_server = ReadonlyServer()
2222
2164
            else:
2223
2165
                # explicit readonly transport.
2224
2166
                self.__readonly_server = self.create_transport_readonly_server()
2247
2189
        is no means to override it.
2248
2190
        """
2249
2191
        if self.__vfs_server is None:
2250
 
            self.__vfs_server = memory.MemoryServer()
 
2192
            self.__vfs_server = MemoryServer()
2251
2193
            self.start_server(self.__vfs_server)
2252
2194
        return self.__vfs_server
2253
2195
 
2335
2277
        propagating. This method ensures than a test did not leaked.
2336
2278
        """
2337
2279
        root = TestCaseWithMemoryTransport.TEST_ROOT
2338
 
        self.permit_url(_mod_transport.get_transport(root).base)
 
2280
        self.permit_url(get_transport(root).base)
2339
2281
        wt = workingtree.WorkingTree.open(root)
2340
2282
        last_rev = wt.last_revision()
2341
2283
        if last_rev != 'null:':
2386
2328
            # might be a relative or absolute path
2387
2329
            maybe_a_url = self.get_url(relpath)
2388
2330
            segments = maybe_a_url.rsplit('/', 1)
2389
 
            t = _mod_transport.get_transport(maybe_a_url)
 
2331
            t = get_transport(maybe_a_url)
2390
2332
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2391
2333
                t.ensure_base()
2392
2334
            if format is None:
2409
2351
        made_control = self.make_bzrdir(relpath, format=format)
2410
2352
        return made_control.create_repository(shared=shared)
2411
2353
 
2412
 
    def make_smart_server(self, path, backing_server=None):
2413
 
        if backing_server is None:
2414
 
            backing_server = self.get_server()
2415
 
        smart_server = test_server.SmartTCPServer_for_testing()
2416
 
        self.start_server(smart_server, backing_server)
2417
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2418
 
                                                   ).clone(path)
 
2354
    def make_smart_server(self, path):
 
2355
        smart_server = server.SmartTCPServer_for_testing()
 
2356
        self.start_server(smart_server, self.get_server())
 
2357
        remote_transport = get_transport(smart_server.get_url()).clone(path)
2419
2358
        return remote_transport
2420
2359
 
2421
2360
    def make_branch_and_memory_tree(self, relpath, format=None):
2436
2375
 
2437
2376
    def setUp(self):
2438
2377
        super(TestCaseWithMemoryTransport, self).setUp()
2439
 
        # Ensure that ConnectedTransport doesn't leak sockets
2440
 
        def get_transport_with_cleanup(*args, **kwargs):
2441
 
            t = orig_get_transport(*args, **kwargs)
2442
 
            if isinstance(t, _mod_transport.ConnectedTransport):
2443
 
                self.addCleanup(t.disconnect)
2444
 
            return t
2445
 
 
2446
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2447
 
                                               get_transport_with_cleanup)
2448
2378
        self._make_test_root()
2449
 
        self.addCleanup(os.chdir, os.getcwdu())
 
2379
        _currentdir = os.getcwdu()
 
2380
        def _leaveDirectory():
 
2381
            os.chdir(_currentdir)
 
2382
        self.addCleanup(_leaveDirectory)
2450
2383
        self.makeAndChdirToTestDir()
2451
2384
        self.overrideEnvironmentForTesting()
2452
2385
        self.__readonly_server = None
2455
2388
 
2456
2389
    def setup_smart_server_with_call_log(self):
2457
2390
        """Sets up a smart server as the transport server with a call log."""
2458
 
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2391
        self.transport_server = server.SmartTCPServer_for_testing
2459
2392
        self.hpss_calls = []
2460
2393
        import traceback
2461
2394
        # Skip the current stack down to the caller of
2495
2428
 
2496
2429
    def check_file_contents(self, filename, expect):
2497
2430
        self.log("check contents of file %s" % filename)
2498
 
        f = file(filename)
2499
 
        try:
2500
 
            contents = f.read()
2501
 
        finally:
2502
 
            f.close()
 
2431
        contents = file(filename, 'r').read()
2503
2432
        if contents != expect:
2504
2433
            self.log("expected: %r" % expect)
2505
2434
            self.log("actually: %r" % contents)
2579
2508
                "a list or a tuple. Got %r instead" % (shape,))
2580
2509
        # It's OK to just create them using forward slashes on windows.
2581
2510
        if transport is None or transport.is_readonly():
2582
 
            transport = _mod_transport.get_transport(".")
 
2511
            transport = get_transport(".")
2583
2512
        for name in shape:
2584
2513
            self.assertIsInstance(name, basestring)
2585
2514
            if name[-1] == '/':
2595
2524
                content = "contents of %s%s" % (name.encode('utf-8'), end)
2596
2525
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
2597
2526
 
2598
 
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
 
2527
    def build_tree_contents(self, shape):
 
2528
        build_tree_contents(shape)
2599
2529
 
2600
2530
    def assertInWorkingTree(self, path, root_path='.', tree=None):
2601
2531
        """Assert whether path or paths are in the WorkingTree"""
2677
2607
            # We can only make working trees locally at the moment.  If the
2678
2608
            # transport can't support them, then we keep the non-disk-backed
2679
2609
            # branch and create a local checkout.
2680
 
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
2610
            if self.vfs_transport_factory is LocalURLServer:
2681
2611
                # the branch is colocated on disk, we cannot create a checkout.
2682
2612
                # hopefully callers will expect this.
2683
2613
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2742
2672
    """
2743
2673
 
2744
2674
    def setUp(self):
2745
 
        from bzrlib.tests import http_server
2746
2675
        super(ChrootedTestCase, self).setUp()
2747
 
        if not self.vfs_transport_factory == memory.MemoryServer:
2748
 
            self.transport_readonly_server = http_server.HttpServer
 
2676
        if not self.vfs_transport_factory == MemoryServer:
 
2677
            self.transport_readonly_server = HttpServer
2749
2678
 
2750
2679
 
2751
2680
def condition_id_re(pattern):
2754
2683
    :param pattern: A regular expression string.
2755
2684
    :return: A callable that returns True if the re matches.
2756
2685
    """
2757
 
    filter_re = re.compile(pattern, 0)
 
2686
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2687
        'test filter')
2758
2688
    def condition(test):
2759
2689
        test_id = test.id()
2760
2690
        return filter_re.search(test_id)
3072
3002
    return suite
3073
3003
 
3074
3004
 
3075
 
class TestDecorator(TestUtil.TestSuite):
 
3005
class TestDecorator(TestSuite):
3076
3006
    """A decorator for TestCase/TestSuite objects.
3077
3007
    
3078
3008
    Usually, subclasses should override __iter__(used when flattening test
3081
3011
    """
3082
3012
 
3083
3013
    def __init__(self, suite):
3084
 
        TestUtil.TestSuite.__init__(self)
 
3014
        TestSuite.__init__(self)
3085
3015
        self.addTest(suite)
3086
3016
 
3087
3017
    def countTestCases(self):
3206
3136
 
3207
3137
def partition_tests(suite, count):
3208
3138
    """Partition suite into count lists of tests."""
3209
 
    # This just assigns tests in a round-robin fashion.  On one hand this
3210
 
    # splits up blocks of related tests that might run faster if they shared
3211
 
    # resources, but on the other it avoids assigning blocks of slow tests to
3212
 
    # just one partition.  So the slowest partition shouldn't be much slower
3213
 
    # than the fastest.
3214
 
    partitions = [list() for i in range(count)]
3215
 
    tests = iter_suite_tests(suite)
3216
 
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3217
 
        partition.append(test)
3218
 
    return partitions
3219
 
 
3220
 
 
3221
 
def workaround_zealous_crypto_random():
3222
 
    """Crypto.Random want to help us being secure, but we don't care here.
3223
 
 
3224
 
    This workaround some test failure related to the sftp server. Once paramiko
3225
 
    stop using the controversial API in Crypto.Random, we may get rid of it.
3226
 
    """
3227
 
    try:
3228
 
        from Crypto.Random import atfork
3229
 
        atfork()
3230
 
    except ImportError:
3231
 
        pass
 
3139
    result = []
 
3140
    tests = list(iter_suite_tests(suite))
 
3141
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3142
    for block in range(count):
 
3143
        low_test = block * tests_per_process
 
3144
        high_test = low_test + tests_per_process
 
3145
        process_tests = tests[low_test:high_test]
 
3146
        result.append(process_tests)
 
3147
    return result
3232
3148
 
3233
3149
 
3234
3150
def fork_for_tests(suite):
3251
3167
            try:
3252
3168
                ProtocolTestCase.run(self, result)
3253
3169
            finally:
3254
 
                os.waitpid(self.pid, 0)
 
3170
                os.waitpid(self.pid, os.WNOHANG)
3255
3171
 
3256
3172
    test_blocks = partition_tests(suite, concurrency)
3257
3173
    for process_tests in test_blocks:
3258
 
        process_suite = TestUtil.TestSuite()
 
3174
        process_suite = TestSuite()
3259
3175
        process_suite.addTests(process_tests)
3260
3176
        c2pread, c2pwrite = os.pipe()
3261
3177
        pid = os.fork()
3262
3178
        if pid == 0:
3263
 
            workaround_zealous_crypto_random()
3264
3179
            try:
3265
3180
                os.close(c2pread)
3266
3181
                # Leave stderr and stdout open so we can see test noise
3327
3242
                '--subunit']
3328
3243
            if '--no-plugins' in sys.argv:
3329
3244
                argv.append('--no-plugins')
3330
 
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
3331
 
            # noise on stderr it can interrupt the subunit protocol.
3332
 
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3333
 
                                      stdout=subprocess.PIPE,
3334
 
                                      stderr=subprocess.PIPE,
3335
 
                                      bufsize=1)
 
3245
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3246
            # stderr it can interrupt the subunit protocol.
 
3247
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3248
                bufsize=1)
3336
3249
            test = TestInSubprocess(process, test_list_file_name)
3337
3250
            result.append(test)
3338
3251
        except:
3387
3300
 
3388
3301
    def startTest(self, test):
3389
3302
        self.profiler = bzrlib.lsprof.BzrProfiler()
3390
 
        # Prevent deadlocks in tests that use lsprof: those tests will
3391
 
        # unavoidably fail.
3392
 
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
3393
3303
        self.profiler.start()
3394
3304
        ForwardingResult.startTest(self, test)
3395
3305
 
3416
3326
#                           rather than failing tests. And no longer raise
3417
3327
#                           LockContention when fctnl locks are not being used
3418
3328
#                           with proper exclusion rules.
3419
 
#   -Ethreads               Will display thread ident at creation/join time to
3420
 
#                           help track thread leaks
3421
3329
selftest_debug_flags = set()
3422
3330
 
3423
3331
 
3642
3550
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3643
3551
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3644
3552
 
3645
 
# Obvious highest levels prefixes, feel free to add your own via a plugin
 
3553
# Obvious higest levels prefixes, feel free to add your own via a plugin
3646
3554
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3647
3555
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3648
3556
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3656
3564
        'bzrlib.doc',
3657
3565
        'bzrlib.tests.blackbox',
3658
3566
        'bzrlib.tests.commands',
3659
 
        'bzrlib.tests.doc_generate',
3660
3567
        'bzrlib.tests.per_branch',
3661
 
        'bzrlib.tests.per_controldir',
3662
 
        'bzrlib.tests.per_controldir_colo',
 
3568
        'bzrlib.tests.per_bzrdir',
3663
3569
        'bzrlib.tests.per_foreign_vcs',
3664
3570
        'bzrlib.tests.per_interrepository',
3665
3571
        'bzrlib.tests.per_intertree',
3678
3584
        'bzrlib.tests.per_workingtree',
3679
3585
        'bzrlib.tests.test__annotator',
3680
3586
        'bzrlib.tests.test__bencode',
3681
 
        'bzrlib.tests.test__btree_serializer',
3682
3587
        'bzrlib.tests.test__chk_map',
3683
3588
        'bzrlib.tests.test__dirstate_helpers',
3684
3589
        'bzrlib.tests.test__groupcompress',
3706
3611
        'bzrlib.tests.test_chunk_writer',
3707
3612
        'bzrlib.tests.test_clean_tree',
3708
3613
        'bzrlib.tests.test_cleanup',
3709
 
        'bzrlib.tests.test_cmdline',
3710
3614
        'bzrlib.tests.test_commands',
3711
3615
        'bzrlib.tests.test_commit',
3712
3616
        'bzrlib.tests.test_commit_merge',
3727
3631
        'bzrlib.tests.test_export',
3728
3632
        'bzrlib.tests.test_extract',
3729
3633
        'bzrlib.tests.test_fetch',
3730
 
        'bzrlib.tests.test_fixtures',
3731
3634
        'bzrlib.tests.test_fifo_cache',
3732
3635
        'bzrlib.tests.test_filters',
3733
3636
        'bzrlib.tests.test_ftp_transport',
3747
3650
        'bzrlib.tests.test_identitymap',
3748
3651
        'bzrlib.tests.test_ignores',
3749
3652
        'bzrlib.tests.test_index',
3750
 
        'bzrlib.tests.test_import_tariff',
3751
3653
        'bzrlib.tests.test_info',
3752
3654
        'bzrlib.tests.test_inv',
3753
3655
        'bzrlib.tests.test_inventory_delta',
3754
3656
        'bzrlib.tests.test_knit',
3755
3657
        'bzrlib.tests.test_lazy_import',
3756
3658
        'bzrlib.tests.test_lazy_regex',
3757
 
        'bzrlib.tests.test_library_state',
3758
3659
        'bzrlib.tests.test_lock',
3759
3660
        'bzrlib.tests.test_lockable_files',
3760
3661
        'bzrlib.tests.test_lockdir',
3762
3663
        'bzrlib.tests.test_lru_cache',
3763
3664
        'bzrlib.tests.test_lsprof',
3764
3665
        'bzrlib.tests.test_mail_client',
3765
 
        'bzrlib.tests.test_matchers',
3766
3666
        'bzrlib.tests.test_memorytree',
3767
3667
        'bzrlib.tests.test_merge',
3768
3668
        'bzrlib.tests.test_merge3',
3817
3717
        'bzrlib.tests.test_switch',
3818
3718
        'bzrlib.tests.test_symbol_versioning',
3819
3719
        'bzrlib.tests.test_tag',
3820
 
        'bzrlib.tests.test_test_server',
3821
3720
        'bzrlib.tests.test_testament',
3822
3721
        'bzrlib.tests.test_textfile',
3823
3722
        'bzrlib.tests.test_textmerge',
3829
3728
        'bzrlib.tests.test_transport_log',
3830
3729
        'bzrlib.tests.test_tree',
3831
3730
        'bzrlib.tests.test_treebuilder',
3832
 
        'bzrlib.tests.test_treeshape',
3833
3731
        'bzrlib.tests.test_tsort',
3834
3732
        'bzrlib.tests.test_tuned_gzip',
3835
3733
        'bzrlib.tests.test_ui',
3839
3737
        'bzrlib.tests.test_urlutils',
3840
3738
        'bzrlib.tests.test_version',
3841
3739
        'bzrlib.tests.test_version_info',
3842
 
        'bzrlib.tests.test_versionedfile',
3843
3740
        'bzrlib.tests.test_weave',
3844
3741
        'bzrlib.tests.test_whitebox',
3845
3742
        'bzrlib.tests.test_win32utils',
3851
3748
 
3852
3749
 
3853
3750
def _test_suite_modules_to_doctest():
3854
 
    """Return the list of modules to doctest."""
3855
 
    if __doc__ is None:
3856
 
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3857
 
        return []
 
3751
    """Return the list of modules to doctest."""   
3858
3752
    return [
3859
3753
        'bzrlib',
3860
3754
        'bzrlib.branchbuilder',
3867
3761
        'bzrlib.option',
3868
3762
        'bzrlib.symbol_versioning',
3869
3763
        'bzrlib.tests',
3870
 
        'bzrlib.tests.fixtures',
3871
3764
        'bzrlib.timestamp',
3872
3765
        'bzrlib.version_info_formats.format_custom',
3873
3766
        ]
4014
3907
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4015
3908
    ...     [('one', dict(param=1)),
4016
3909
    ...      ('two', dict(param=2))],
4017
 
    ...     TestUtil.TestSuite())
 
3910
    ...     TestSuite())
4018
3911
    >>> tests = list(iter_suite_tests(r))
4019
3912
    >>> len(tests)
4020
3913
    2
4067
3960
    :param new_id: The id to assign to it.
4068
3961
    :return: The new test.
4069
3962
    """
4070
 
    new_test = copy.copy(test)
 
3963
    new_test = copy(test)
4071
3964
    new_test.id = lambda: new_id
4072
3965
    return new_test
4073
3966
 
4134
4027
        if test_id != None:
4135
4028
            ui.ui_factory.clear_term()
4136
4029
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4137
 
        # Ugly, but the last thing we want here is fail, so bear with it.
4138
 
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4139
 
                                    ).encode('ascii', 'replace')
4140
4030
        sys.stderr.write('Unable to remove testing dir %s\n%s'
4141
 
                         % (os.path.basename(dirname), printable_e))
 
4031
                         % (os.path.basename(dirname), e))
4142
4032
 
4143
4033
 
4144
4034
class Feature(object):
4233
4123
    should really use a different feature.
4234
4124
    """
4235
4125
 
4236
 
    def __init__(self, dep_version, module, name,
4237
 
                 replacement_name, replacement_module=None):
 
4126
    def __init__(self, module, name, this_name, dep_version):
4238
4127
        super(_CompatabilityThunkFeature, self).__init__()
4239
4128
        self._module = module
4240
 
        if replacement_module is None:
4241
 
            replacement_module = module
4242
 
        self._replacement_module = replacement_module
4243
4129
        self._name = name
4244
 
        self._replacement_name = replacement_name
 
4130
        self._this_name = this_name
4245
4131
        self._dep_version = dep_version
4246
4132
        self._feature = None
4247
4133
 
4248
4134
    def _ensure(self):
4249
4135
        if self._feature is None:
4250
 
            depr_msg = self._dep_version % ('%s.%s'
4251
 
                                            % (self._module, self._name))
4252
 
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4253
 
                                               self._replacement_name)
4254
 
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4255
 
            # Import the new feature and use it as a replacement for the
4256
 
            # deprecated one.
4257
 
            mod = __import__(self._replacement_module, {}, {},
4258
 
                             [self._replacement_name])
4259
 
            self._feature = getattr(mod, self._replacement_name)
 
4136
            msg = (self._dep_version % self._this_name) + (
 
4137
                   ' Use %s.%s instead.' % (self._module, self._name))
 
4138
            symbol_versioning.warn(msg, DeprecationWarning)
 
4139
            mod = __import__(self._module, {}, {}, [self._name])
 
4140
            self._feature = getattr(mod, self._name)
4260
4141
 
4261
4142
    def _probe(self):
4262
4143
        self._ensure()
4288
4169
        if self.available(): # Make sure the probe has been done
4289
4170
            return self._module
4290
4171
        return None
4291
 
 
 
4172
    
4292
4173
    def feature_name(self):
4293
4174
        return self.module_name
4294
4175
 
4295
4176
 
4296
4177
# This is kept here for compatibility, it is recommended to use
4297
4178
# 'bzrlib.tests.feature.paramiko' instead
4298
 
ParamikoFeature = _CompatabilityThunkFeature(
4299
 
    deprecated_in((2,1,0)),
4300
 
    'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
 
4179
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
 
4180
    'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
4301
4181
 
4302
4182
 
4303
4183
def probe_unicode_in_user_encoding():
4374
4254
UnicodeFilename = _UnicodeFilename()
4375
4255
 
4376
4256
 
4377
 
class _ByteStringNamedFilesystem(Feature):
4378
 
    """Is the filesystem based on bytes?"""
4379
 
 
4380
 
    def _probe(self):
4381
 
        if os.name == "posix":
4382
 
            return True
4383
 
        return False
4384
 
 
4385
 
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4386
 
 
4387
 
 
4388
4257
class _UTF8Filesystem(Feature):
4389
4258
    """Is the filesystem UTF-8?"""
4390
4259
 
4493
4362
 
4494
4363
 
4495
4364
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4496
 
SubUnitFeature = _CompatabilityThunkFeature(
4497
 
    deprecated_in((2,1,0)),
4498
 
    'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
 
4365
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
 
4366
    'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
4499
4367
# Only define SubUnitBzrRunner if subunit is available.
4500
4368
try:
4501
4369
    from subunit import TestProtocolClient
4508
4376
            return result
4509
4377
except ImportError:
4510
4378
    pass
4511
 
 
4512
 
class _PosixPermissionsFeature(Feature):
4513
 
 
4514
 
    def _probe(self):
4515
 
        def has_perms():
4516
 
            # create temporary file and check if specified perms are maintained.
4517
 
            import tempfile
4518
 
 
4519
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4520
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4521
 
            fd, name = f
4522
 
            os.close(fd)
4523
 
            os.chmod(name, write_perms)
4524
 
 
4525
 
            read_perms = os.stat(name).st_mode & 0777
4526
 
            os.unlink(name)
4527
 
            return (write_perms == read_perms)
4528
 
 
4529
 
        return (os.name == 'posix') and has_perms()
4530
 
 
4531
 
    def feature_name(self):
4532
 
        return 'POSIX permissions support'
4533
 
 
4534
 
posix_permissions_feature = _PosixPermissionsFeature()