~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-02-11 02:16:42 UTC
  • mfrom: (5017.1.2 initialize)
  • Revision ID: pqm@pqm.ubuntu.com-20100211021642-eitum30b2e09oalf
(mbp) Add bzrlib.initialize

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Testing framework extensions"""
18
17
 
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
29
28
 
30
29
import atexit
31
30
import codecs
32
 
import copy
 
31
from copy import copy
33
32
from cStringIO import StringIO
34
33
import difflib
35
34
import doctest
36
35
import errno
37
 
import itertools
38
36
import logging
39
37
import math
40
38
import os
41
 
import pprint
 
39
from pprint import pformat
42
40
import random
43
41
import re
44
42
import shlex
45
43
import stat
46
 
import subprocess
 
44
from subprocess import Popen, PIPE, STDOUT
47
45
import sys
48
46
import tempfile
49
47
import threading
75
73
    ui,
76
74
    urlutils,
77
75
    registry,
78
 
    transport as _mod_transport,
79
76
    workingtree,
80
77
    )
81
78
import bzrlib.branch
104
101
    deprecated_passed,
105
102
    )
106
103
import bzrlib.trace
107
 
from bzrlib.transport import (
108
 
    memory,
109
 
    pathfilter,
110
 
    )
 
104
from bzrlib.transport import get_transport, pathfilter
 
105
import bzrlib.transport
 
106
from bzrlib.transport.local import LocalURLServer
 
107
from bzrlib.transport.memory import MemoryServer
 
108
from bzrlib.transport.readonly import ReadonlyServer
111
109
from bzrlib.trace import mutter, note
112
 
from bzrlib.tests import (
113
 
    test_server,
114
 
    TestUtil,
115
 
    treeshape,
116
 
    )
 
110
from bzrlib.tests import TestUtil
 
111
from bzrlib.tests.http_server import HttpServer
 
112
from bzrlib.tests.TestUtil import (
 
113
                          TestSuite,
 
114
                          TestLoader,
 
115
                          )
 
116
from bzrlib.tests.treeshape import build_tree_contents
117
117
from bzrlib.ui import NullProgressView
118
118
from bzrlib.ui.text import TextUIFactory
119
119
import bzrlib.version_info_formats.format_custom
124
124
# shown frame is the test code, not our assertXYZ.
125
125
__unittest = 1
126
126
 
127
 
default_transport = test_server.LocalURLServer
 
127
default_transport = LocalURLServer
128
128
 
129
129
 
130
130
_unitialized_attr = object()
135
135
SUBUNIT_SEEK_SET = 0
136
136
SUBUNIT_SEEK_CUR = 1
137
137
 
138
 
# These are intentionally brought into this namespace. That way plugins, etc
139
 
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
140
 
TestSuite = TestUtil.TestSuite
141
 
TestLoader = TestUtil.TestLoader
142
138
 
143
 
class ExtendedTestResult(testtools.TextTestResult):
 
139
class ExtendedTestResult(unittest._TextTestResult):
144
140
    """Accepts, reports and accumulates the results of running tests.
145
141
 
146
142
    Compared to the unittest version this class adds support for
167
163
        :param bench_history: Optionally, a writable file object to accumulate
168
164
            benchmark results.
169
165
        """
170
 
        testtools.TextTestResult.__init__(self, stream)
 
166
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
171
167
        if bench_history is not None:
172
168
            from bzrlib.version import _get_bzr_source_tree
173
169
            src_tree = _get_bzr_source_tree()
200
196
        actionTaken = "Ran"
201
197
        stopTime = time.time()
202
198
        timeTaken = stopTime - self.startTime
203
 
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
204
 
        #                the parent class method is similar have to duplicate
205
 
        self._show_list('ERROR', self.errors)
206
 
        self._show_list('FAIL', self.failures)
207
 
        self.stream.write(self.sep2)
208
 
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
 
199
        self.printErrors()
 
200
        self.stream.writeln(self.separator2)
 
201
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
209
202
                            run, run != 1 and "s" or "", timeTaken))
 
203
        self.stream.writeln()
210
204
        if not self.wasSuccessful():
211
205
            self.stream.write("FAILED (")
212
206
            failed, errored = map(len, (self.failures, self.errors))
219
213
                if failed or errored: self.stream.write(", ")
220
214
                self.stream.write("known_failure_count=%d" %
221
215
                    self.known_failure_count)
222
 
            self.stream.write(")\n")
 
216
            self.stream.writeln(")")
223
217
        else:
224
218
            if self.known_failure_count:
225
 
                self.stream.write("OK (known_failures=%d)\n" %
 
219
                self.stream.writeln("OK (known_failures=%d)" %
226
220
                    self.known_failure_count)
227
221
            else:
228
 
                self.stream.write("OK\n")
 
222
                self.stream.writeln("OK")
229
223
        if self.skip_count > 0:
230
224
            skipped = self.skip_count
231
 
            self.stream.write('%d test%s skipped\n' %
 
225
            self.stream.writeln('%d test%s skipped' %
232
226
                                (skipped, skipped != 1 and "s" or ""))
233
227
        if self.unsupported:
234
228
            for feature, count in sorted(self.unsupported.items()):
235
 
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
229
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
236
230
                    (feature, count))
237
231
        if self._strict:
238
232
            ok = self.wasStrictlySuccessful()
276
270
 
277
271
    def _shortened_test_description(self, test):
278
272
        what = test.id()
279
 
        what = re.sub(r'^bzrlib\.tests\.', '', what)
 
273
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
280
274
        return what
281
275
 
282
276
    def startTest(self, test):
283
 
        super(ExtendedTestResult, self).startTest(test)
 
277
        unittest.TestResult.startTest(self, test)
284
278
        if self.count == 0:
285
279
            self.startTests()
286
280
        self.report_test_start(test)
324
318
        fails with an unexpected error.
325
319
        """
326
320
        self._post_mortem()
327
 
        super(ExtendedTestResult, self).addError(test, err)
 
321
        unittest.TestResult.addError(self, test, err)
328
322
        self.error_count += 1
329
323
        self.report_error(test, err)
330
324
        if self.stop_early:
338
332
        fails because e.g. an assert() method failed.
339
333
        """
340
334
        self._post_mortem()
341
 
        super(ExtendedTestResult, self).addFailure(test, err)
 
335
        unittest.TestResult.addFailure(self, test, err)
342
336
        self.failure_count += 1
343
337
        self.report_failure(test, err)
344
338
        if self.stop_early:
358
352
                    test.id()))
359
353
        self.report_success(test)
360
354
        self._cleanupLogFile(test)
361
 
        super(ExtendedTestResult, self).addSuccess(test)
 
355
        unittest.TestResult.addSuccess(self, test)
362
356
        test._log_contents = ''
363
357
 
364
358
    def addExpectedFailure(self, test, err):
491
485
        return self._shortened_test_description(test)
492
486
 
493
487
    def report_error(self, test, err):
494
 
        self.stream.write('ERROR: %s\n    %s\n' % (
 
488
        self.ui.note('ERROR: %s\n    %s\n' % (
495
489
            self._test_description(test),
496
490
            err[1],
497
491
            ))
498
492
 
499
493
    def report_failure(self, test, err):
500
 
        self.stream.write('FAIL: %s\n    %s\n' % (
 
494
        self.ui.note('FAIL: %s\n    %s\n' % (
501
495
            self._test_description(test),
502
496
            err[1],
503
497
            ))
552
546
        return '%s%s' % (indent, err[1])
553
547
 
554
548
    def report_error(self, test, err):
555
 
        self.stream.write('ERROR %s\n%s\n'
 
549
        self.stream.writeln('ERROR %s\n%s'
556
550
                % (self._testTimeString(test),
557
551
                   self._error_summary(err)))
558
552
 
559
553
    def report_failure(self, test, err):
560
 
        self.stream.write(' FAIL %s\n%s\n'
 
554
        self.stream.writeln(' FAIL %s\n%s'
561
555
                % (self._testTimeString(test),
562
556
                   self._error_summary(err)))
563
557
 
564
558
    def report_known_failure(self, test, err):
565
 
        self.stream.write('XFAIL %s\n%s\n'
 
559
        self.stream.writeln('XFAIL %s\n%s'
566
560
                % (self._testTimeString(test),
567
561
                   self._error_summary(err)))
568
562
 
569
563
    def report_success(self, test):
570
 
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
564
        self.stream.writeln('   OK %s' % self._testTimeString(test))
571
565
        for bench_called, stats in getattr(test, '_benchcalls', []):
572
 
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
566
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
573
567
            stats.pprint(file=self.stream)
574
568
        # flush the stream so that we get smooth output. This verbose mode is
575
569
        # used to show the output in PQM.
576
570
        self.stream.flush()
577
571
 
578
572
    def report_skip(self, test, reason):
579
 
        self.stream.write(' SKIP %s\n%s\n'
 
573
        self.stream.writeln(' SKIP %s\n%s'
580
574
                % (self._testTimeString(test), reason))
581
575
 
582
576
    def report_not_applicable(self, test, reason):
583
 
        self.stream.write('  N/A %s\n    %s\n'
 
577
        self.stream.writeln('  N/A %s\n    %s'
584
578
                % (self._testTimeString(test), reason))
585
579
 
586
580
    def report_unsupported(self, test, feature):
587
581
        """test cannot be run because feature is missing."""
588
 
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
582
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
589
583
                %(self._testTimeString(test), feature))
590
584
 
591
585
 
620
614
            encode = codec.encode
621
615
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
622
616
        stream.encoding = new_encoding
623
 
        self.stream = stream
 
617
        self.stream = unittest._WritelnDecorator(stream)
624
618
        self.descriptions = descriptions
625
619
        self.verbosity = verbosity
626
620
        self._bench_history = bench_history
750
744
    # XXX: Should probably unify more with CannedInputUIFactory or a
751
745
    # particular configuration of TextUIFactory, or otherwise have a clearer
752
746
    # idea of how they're supposed to be different.
753
 
    # See https://bugs.launchpad.net/bzr/+bug/408213
 
747
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
754
748
 
755
749
    def __init__(self, stdout=None, stderr=None, stdin=None):
756
750
        if stdin is not None:
798
792
    _active_threads = None
799
793
    _leaking_threads_tests = 0
800
794
    _first_thread_leaker_id = None
801
 
    _log_file = None
 
795
    _log_file_name = None
802
796
    # record lsprof data when performing benchmark calls.
803
797
    _gather_lsprof_in_benchmarks = False
804
798
 
847
841
        # going away but leak one) but it seems less likely than the actual
848
842
        # false positives (the test see threads going away and does not leak).
849
843
        if leaked_threads > 0:
850
 
            if 'threads' in selftest_debug_flags:
851
 
                print '%s is leaking, active is now %d' % (self.id(), active)
852
844
            TestCase._leaking_threads_tests += 1
853
845
            if TestCase._first_thread_leaker_id is None:
854
846
                TestCase._first_thread_leaker_id = self.id()
946
938
 
947
939
    def permit_dir(self, name):
948
940
        """Permit a directory to be used by this test. See permit_url."""
949
 
        name_transport = _mod_transport.get_transport(name)
 
941
        name_transport = get_transport(name)
950
942
        self.permit_url(name)
951
943
        self.permit_url(name_transport.base)
952
944
 
1031
1023
        self.addCleanup(transport_server.stop_server)
1032
1024
        # Obtain a real transport because if the server supplies a password, it
1033
1025
        # will be hidden from the base on the client side.
1034
 
        t = _mod_transport.get_transport(transport_server.get_url())
 
1026
        t = get_transport(transport_server.get_url())
1035
1027
        # Some transport servers effectively chroot the backing transport;
1036
1028
        # others like SFTPServer don't - users of the transport can walk up the
1037
1029
        # transport to read the entire backing transport. This wouldn't matter
1048
1040
        if t.base.endswith('/work/'):
1049
1041
            # we have safety net/test root/work
1050
1042
            t = t.clone('../..')
1051
 
        elif isinstance(transport_server,
1052
 
                        test_server.SmartTCPServer_for_testing):
 
1043
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1053
1044
            # The smart server adds a path similar to work, which is traversed
1054
1045
            # up from by the client. But the server is chrooted - the actual
1055
1046
            # backing transport is not escaped from, and VFS requests to the
1098
1089
            message += '\n'
1099
1090
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1100
1091
            % (message,
1101
 
               pprint.pformat(a), pprint.pformat(b)))
 
1092
               pformat(a), pformat(b)))
1102
1093
 
1103
1094
    assertEquals = assertEqual
1104
1095
 
1210
1201
            raise AssertionError('pattern "%s" found in "%s"'
1211
1202
                    % (needle_re, haystack))
1212
1203
 
1213
 
    def assertContainsString(self, haystack, needle):
1214
 
        if haystack.find(needle) == -1:
1215
 
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
1216
 
 
1217
1204
    def assertSubset(self, sublist, superlist):
1218
1205
        """Assert that every entry in sublist is present in superlist."""
1219
1206
        missing = set(sublist) - set(superlist)
1316
1303
            f.close()
1317
1304
        self.assertEqualDiff(content, s)
1318
1305
 
1319
 
    def assertDocstring(self, expected_docstring, obj):
1320
 
        """Fail if obj does not have expected_docstring"""
1321
 
        if __doc__ is None:
1322
 
            # With -OO the docstring should be None instead
1323
 
            self.assertIs(obj.__doc__, None)
1324
 
        else:
1325
 
            self.assertEqual(expected_docstring, obj.__doc__)
1326
 
 
1327
1306
    def failUnlessExists(self, path):
1328
1307
        """Fail unless path or paths, which may be abs or relative, exist."""
1329
1308
        if not isinstance(path, basestring):
1458
1437
 
1459
1438
        The file is removed as the test is torn down.
1460
1439
        """
1461
 
        self._log_file = StringIO()
 
1440
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1441
        self._log_file = os.fdopen(fileno, 'w+')
1462
1442
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1443
        self._log_file_name = name
1463
1444
        self.addCleanup(self._finishLogFile)
1464
1445
 
1465
1446
    def _finishLogFile(self):
1525
1506
            'EDITOR': None,
1526
1507
            'BZR_EMAIL': None,
1527
1508
            'BZREMAIL': None, # may still be present in the environment
1528
 
            'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
 
1509
            'EMAIL': None,
1529
1510
            'BZR_PROGRESS_BAR': None,
1530
1511
            'BZR_LOG': None,
1531
1512
            'BZR_PLUGIN_PATH': None,
1532
 
            'BZR_DISABLE_PLUGINS': None,
1533
 
            'BZR_PLUGINS_AT': None,
1534
1513
            'BZR_CONCURRENCY': None,
1535
1514
            # Make sure that any text ui tests are consistent regardless of
1536
1515
            # the environment the test case is run in; you may want tests that
1563
1542
            # use an env var so it propagates to subprocesses.
1564
1543
            'APPORT_DISABLE': '1',
1565
1544
        }
1566
 
        self._old_env = {}
 
1545
        self.__old_env = {}
1567
1546
        self.addCleanup(self._restoreEnvironment)
1568
1547
        for name, value in new_env.iteritems():
1569
1548
            self._captureVar(name, value)
1570
1549
 
1571
1550
    def _captureVar(self, name, newvalue):
1572
1551
        """Set an environment variable, and reset it when finished."""
1573
 
        self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1552
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1574
1553
 
1575
1554
    def _restoreEnvironment(self):
1576
 
        for name, value in self._old_env.iteritems():
 
1555
        for name, value in self.__old_env.iteritems():
1577
1556
            osutils.set_or_unset_env(name, value)
1578
1557
 
1579
1558
    def _restoreHooks(self):
1667
1646
                unicodestr = self._log_contents.decode('utf8', 'replace')
1668
1647
                self._log_contents = unicodestr.encode('utf8')
1669
1648
            return self._log_contents
1670
 
        if self._log_file is not None:
1671
 
            log_contents = self._log_file.getvalue()
 
1649
        import bzrlib.trace
 
1650
        if bzrlib.trace._trace_file:
 
1651
            # flush the log file, to get all content
 
1652
            bzrlib.trace._trace_file.flush()
 
1653
        if self._log_file_name is not None:
 
1654
            logfile = open(self._log_file_name)
 
1655
            try:
 
1656
                log_contents = logfile.read()
 
1657
            finally:
 
1658
                logfile.close()
1672
1659
            try:
1673
1660
                log_contents.decode('utf8')
1674
1661
            except UnicodeDecodeError:
1675
1662
                unicodestr = log_contents.decode('utf8', 'replace')
1676
1663
                log_contents = unicodestr.encode('utf8')
1677
1664
            if not keep_log_file:
 
1665
                self._log_file.close()
1678
1666
                self._log_file = None
1679
1667
                # Permit multiple calls to get_log until we clean it up in
1680
1668
                # finishLogFile
1681
1669
                self._log_contents = log_contents
 
1670
                try:
 
1671
                    os.remove(self._log_file_name)
 
1672
                except OSError, e:
 
1673
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1674
                        sys.stderr.write(('Unable to delete log file '
 
1675
                                             ' %r\n' % self._log_file_name))
 
1676
                    else:
 
1677
                        raise
 
1678
                self._log_file_name = None
1682
1679
            return log_contents
1683
1680
        else:
1684
 
            return "No log file content."
 
1681
            return "No log file content and no log file name."
1685
1682
 
1686
1683
    def get_log(self):
1687
1684
        """Get a unicode string containing the log from bzrlib.trace.
1902
1899
            variables. A value of None will unset the env variable.
1903
1900
            The values must be strings. The change will only occur in the
1904
1901
            child, so you don't need to fix the environment after running.
1905
 
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
1906
 
            doesn't support signalling subprocesses.
 
1902
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1903
            is not available.
1907
1904
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
1908
1905
 
1909
1906
        :returns: Popen object for the started process.
1910
1907
        """
1911
1908
        if skip_if_plan_to_signal:
1912
 
            if os.name != "posix":
1913
 
                raise TestSkipped("Sending signals not supported")
 
1909
            if not getattr(os, 'kill', None):
 
1910
                raise TestSkipped("os.kill not available.")
1914
1911
 
1915
1912
        if env_changes is None:
1916
1913
            env_changes = {}
1943
1940
            if not allow_plugins:
1944
1941
                command.append('--no-plugins')
1945
1942
            command.extend(process_args)
1946
 
            process = self._popen(command, stdin=subprocess.PIPE,
1947
 
                                  stdout=subprocess.PIPE,
1948
 
                                  stderr=subprocess.PIPE)
 
1943
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1949
1944
        finally:
1950
1945
            restore_environment()
1951
1946
            if cwd is not None:
1959
1954
        Allows tests to override this method to intercept the calls made to
1960
1955
        Popen for introspection.
1961
1956
        """
1962
 
        return subprocess.Popen(*args, **kwargs)
 
1957
        return Popen(*args, **kwargs)
1963
1958
 
1964
1959
    def get_source_path(self):
1965
1960
        """Return the path of the directory containing bzrlib."""
1967
1962
 
1968
1963
    def get_bzr_path(self):
1969
1964
        """Return the path of the 'bzr' executable for this test suite."""
1970
 
        bzr_path = os.path.join(self.get_source_path(), "bzr")
 
1965
        bzr_path = self.get_source_path()+'/bzr'
1971
1966
        if not os.path.isfile(bzr_path):
1972
1967
            # We are probably installed. Assume sys.argv is the right file
1973
1968
            bzr_path = sys.argv[0]
2145
2140
 
2146
2141
        :param relpath: a path relative to the base url.
2147
2142
        """
2148
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2143
        t = get_transport(self.get_url(relpath))
2149
2144
        self.assertFalse(t.is_readonly())
2150
2145
        return t
2151
2146
 
2157
2152
 
2158
2153
        :param relpath: a path relative to the base url.
2159
2154
        """
2160
 
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
2155
        t = get_transport(self.get_readonly_url(relpath))
2161
2156
        self.assertTrue(t.is_readonly())
2162
2157
        return t
2163
2158
 
2176
2171
        if self.__readonly_server is None:
2177
2172
            if self.transport_readonly_server is None:
2178
2173
                # readonly decorator requested
2179
 
                self.__readonly_server = test_server.ReadonlyServer()
 
2174
                self.__readonly_server = ReadonlyServer()
2180
2175
            else:
2181
2176
                # explicit readonly transport.
2182
2177
                self.__readonly_server = self.create_transport_readonly_server()
2205
2200
        is no means to override it.
2206
2201
        """
2207
2202
        if self.__vfs_server is None:
2208
 
            self.__vfs_server = memory.MemoryServer()
 
2203
            self.__vfs_server = MemoryServer()
2209
2204
            self.start_server(self.__vfs_server)
2210
2205
        return self.__vfs_server
2211
2206
 
2293
2288
        propagating. This method ensures than a test did not leaked.
2294
2289
        """
2295
2290
        root = TestCaseWithMemoryTransport.TEST_ROOT
2296
 
        self.permit_url(_mod_transport.get_transport(root).base)
 
2291
        self.permit_url(get_transport(root).base)
2297
2292
        wt = workingtree.WorkingTree.open(root)
2298
2293
        last_rev = wt.last_revision()
2299
2294
        if last_rev != 'null:':
2344
2339
            # might be a relative or absolute path
2345
2340
            maybe_a_url = self.get_url(relpath)
2346
2341
            segments = maybe_a_url.rsplit('/', 1)
2347
 
            t = _mod_transport.get_transport(maybe_a_url)
 
2342
            t = get_transport(maybe_a_url)
2348
2343
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2349
2344
                t.ensure_base()
2350
2345
            if format is None:
2367
2362
        made_control = self.make_bzrdir(relpath, format=format)
2368
2363
        return made_control.create_repository(shared=shared)
2369
2364
 
2370
 
    def make_smart_server(self, path, backing_server=None):
2371
 
        if backing_server is None:
2372
 
            backing_server = self.get_server()
2373
 
        smart_server = test_server.SmartTCPServer_for_testing()
2374
 
        self.start_server(smart_server, backing_server)
2375
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2376
 
                                                   ).clone(path)
 
2365
    def make_smart_server(self, path):
 
2366
        smart_server = server.SmartTCPServer_for_testing()
 
2367
        self.start_server(smart_server, self.get_server())
 
2368
        remote_transport = get_transport(smart_server.get_url()).clone(path)
2377
2369
        return remote_transport
2378
2370
 
2379
2371
    def make_branch_and_memory_tree(self, relpath, format=None):
2394
2386
 
2395
2387
    def setUp(self):
2396
2388
        super(TestCaseWithMemoryTransport, self).setUp()
2397
 
        # Ensure that ConnectedTransport doesn't leak sockets
2398
 
        def get_transport_with_cleanup(*args, **kwargs):
2399
 
            t = orig_get_transport(*args, **kwargs)
2400
 
            if isinstance(t, _mod_transport.ConnectedTransport):
2401
 
                self.addCleanup(t.disconnect)
2402
 
            return t
2403
 
 
2404
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2405
 
                                               get_transport_with_cleanup)
2406
2389
        self._make_test_root()
2407
2390
        self.addCleanup(os.chdir, os.getcwdu())
2408
2391
        self.makeAndChdirToTestDir()
2413
2396
 
2414
2397
    def setup_smart_server_with_call_log(self):
2415
2398
        """Sets up a smart server as the transport server with a call log."""
2416
 
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2399
        self.transport_server = server.SmartTCPServer_for_testing
2417
2400
        self.hpss_calls = []
2418
2401
        import traceback
2419
2402
        # Skip the current stack down to the caller of
2453
2436
 
2454
2437
    def check_file_contents(self, filename, expect):
2455
2438
        self.log("check contents of file %s" % filename)
2456
 
        f = file(filename)
2457
 
        try:
2458
 
            contents = f.read()
2459
 
        finally:
2460
 
            f.close()
 
2439
        contents = file(filename, 'r').read()
2461
2440
        if contents != expect:
2462
2441
            self.log("expected: %r" % expect)
2463
2442
            self.log("actually: %r" % contents)
2537
2516
                "a list or a tuple. Got %r instead" % (shape,))
2538
2517
        # It's OK to just create them using forward slashes on windows.
2539
2518
        if transport is None or transport.is_readonly():
2540
 
            transport = _mod_transport.get_transport(".")
 
2519
            transport = get_transport(".")
2541
2520
        for name in shape:
2542
2521
            self.assertIsInstance(name, basestring)
2543
2522
            if name[-1] == '/':
2553
2532
                content = "contents of %s%s" % (name.encode('utf-8'), end)
2554
2533
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
2555
2534
 
2556
 
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
 
2535
    def build_tree_contents(self, shape):
 
2536
        build_tree_contents(shape)
2557
2537
 
2558
2538
    def assertInWorkingTree(self, path, root_path='.', tree=None):
2559
2539
        """Assert whether path or paths are in the WorkingTree"""
2635
2615
            # We can only make working trees locally at the moment.  If the
2636
2616
            # transport can't support them, then we keep the non-disk-backed
2637
2617
            # branch and create a local checkout.
2638
 
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
2618
            if self.vfs_transport_factory is LocalURLServer:
2639
2619
                # the branch is colocated on disk, we cannot create a checkout.
2640
2620
                # hopefully callers will expect this.
2641
2621
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2700
2680
    """
2701
2681
 
2702
2682
    def setUp(self):
2703
 
        from bzrlib.tests import http_server
2704
2683
        super(ChrootedTestCase, self).setUp()
2705
 
        if not self.vfs_transport_factory == memory.MemoryServer:
2706
 
            self.transport_readonly_server = http_server.HttpServer
 
2684
        if not self.vfs_transport_factory == MemoryServer:
 
2685
            self.transport_readonly_server = HttpServer
2707
2686
 
2708
2687
 
2709
2688
def condition_id_re(pattern):
2712
2691
    :param pattern: A regular expression string.
2713
2692
    :return: A callable that returns True if the re matches.
2714
2693
    """
2715
 
    filter_re = re.compile(pattern, 0)
 
2694
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2695
        'test filter')
2716
2696
    def condition(test):
2717
2697
        test_id = test.id()
2718
2698
        return filter_re.search(test_id)
3030
3010
    return suite
3031
3011
 
3032
3012
 
3033
 
class TestDecorator(TestUtil.TestSuite):
 
3013
class TestDecorator(TestSuite):
3034
3014
    """A decorator for TestCase/TestSuite objects.
3035
3015
    
3036
3016
    Usually, subclasses should override __iter__(used when flattening test
3039
3019
    """
3040
3020
 
3041
3021
    def __init__(self, suite):
3042
 
        TestUtil.TestSuite.__init__(self)
 
3022
        TestSuite.__init__(self)
3043
3023
        self.addTest(suite)
3044
3024
 
3045
3025
    def countTestCases(self):
3164
3144
 
3165
3145
def partition_tests(suite, count):
3166
3146
    """Partition suite into count lists of tests."""
3167
 
    # This just assigns tests in a round-robin fashion.  On one hand this
3168
 
    # splits up blocks of related tests that might run faster if they shared
3169
 
    # resources, but on the other it avoids assigning blocks of slow tests to
3170
 
    # just one partition.  So the slowest partition shouldn't be much slower
3171
 
    # than the fastest.
3172
 
    partitions = [list() for i in range(count)]
3173
 
    tests = iter_suite_tests(suite)
3174
 
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3175
 
        partition.append(test)
3176
 
    return partitions
3177
 
 
3178
 
 
3179
 
def workaround_zealous_crypto_random():
3180
 
    """Crypto.Random want to help us being secure, but we don't care here.
3181
 
 
3182
 
    This workaround some test failure related to the sftp server. Once paramiko
3183
 
    stop using the controversial API in Crypto.Random, we may get rid of it.
3184
 
    """
3185
 
    try:
3186
 
        from Crypto.Random import atfork
3187
 
        atfork()
3188
 
    except ImportError:
3189
 
        pass
 
3147
    result = []
 
3148
    tests = list(iter_suite_tests(suite))
 
3149
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3150
    for block in range(count):
 
3151
        low_test = block * tests_per_process
 
3152
        high_test = low_test + tests_per_process
 
3153
        process_tests = tests[low_test:high_test]
 
3154
        result.append(process_tests)
 
3155
    return result
3190
3156
 
3191
3157
 
3192
3158
def fork_for_tests(suite):
3209
3175
            try:
3210
3176
                ProtocolTestCase.run(self, result)
3211
3177
            finally:
3212
 
                os.waitpid(self.pid, 0)
 
3178
                os.waitpid(self.pid, os.WNOHANG)
3213
3179
 
3214
3180
    test_blocks = partition_tests(suite, concurrency)
3215
3181
    for process_tests in test_blocks:
3216
 
        process_suite = TestUtil.TestSuite()
 
3182
        process_suite = TestSuite()
3217
3183
        process_suite.addTests(process_tests)
3218
3184
        c2pread, c2pwrite = os.pipe()
3219
3185
        pid = os.fork()
3220
3186
        if pid == 0:
3221
 
            workaround_zealous_crypto_random()
3222
3187
            try:
3223
3188
                os.close(c2pread)
3224
3189
                # Leave stderr and stdout open so we can see test noise
3285
3250
                '--subunit']
3286
3251
            if '--no-plugins' in sys.argv:
3287
3252
                argv.append('--no-plugins')
3288
 
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
3289
 
            # noise on stderr it can interrupt the subunit protocol.
3290
 
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3291
 
                                      stdout=subprocess.PIPE,
3292
 
                                      stderr=subprocess.PIPE,
3293
 
                                      bufsize=1)
 
3253
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3254
            # stderr it can interrupt the subunit protocol.
 
3255
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3256
                bufsize=1)
3294
3257
            test = TestInSubprocess(process, test_list_file_name)
3295
3258
            result.append(test)
3296
3259
        except:
3345
3308
 
3346
3309
    def startTest(self, test):
3347
3310
        self.profiler = bzrlib.lsprof.BzrProfiler()
3348
 
        # Prevent deadlocks in tests that use lsprof: those tests will
3349
 
        # unavoidably fail.
3350
 
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
3351
3311
        self.profiler.start()
3352
3312
        ForwardingResult.startTest(self, test)
3353
3313
 
3374
3334
#                           rather than failing tests. And no longer raise
3375
3335
#                           LockContention when fctnl locks are not being used
3376
3336
#                           with proper exclusion rules.
3377
 
#   -Ethreads               Will display thread ident at creation/join time to
3378
 
#                           help track thread leaks
3379
3337
selftest_debug_flags = set()
3380
3338
 
3381
3339
 
3614
3572
        'bzrlib.doc',
3615
3573
        'bzrlib.tests.blackbox',
3616
3574
        'bzrlib.tests.commands',
3617
 
        'bzrlib.tests.doc_generate',
3618
3575
        'bzrlib.tests.per_branch',
3619
 
        'bzrlib.tests.per_controldir',
3620
 
        'bzrlib.tests.per_controldir_colo',
 
3576
        'bzrlib.tests.per_bzrdir',
3621
3577
        'bzrlib.tests.per_foreign_vcs',
3622
3578
        'bzrlib.tests.per_interrepository',
3623
3579
        'bzrlib.tests.per_intertree',
3636
3592
        'bzrlib.tests.per_workingtree',
3637
3593
        'bzrlib.tests.test__annotator',
3638
3594
        'bzrlib.tests.test__bencode',
3639
 
        'bzrlib.tests.test__btree_serializer',
3640
3595
        'bzrlib.tests.test__chk_map',
3641
3596
        'bzrlib.tests.test__dirstate_helpers',
3642
3597
        'bzrlib.tests.test__groupcompress',
3664
3619
        'bzrlib.tests.test_chunk_writer',
3665
3620
        'bzrlib.tests.test_clean_tree',
3666
3621
        'bzrlib.tests.test_cleanup',
3667
 
        'bzrlib.tests.test_cmdline',
3668
3622
        'bzrlib.tests.test_commands',
3669
3623
        'bzrlib.tests.test_commit',
3670
3624
        'bzrlib.tests.test_commit_merge',
3685
3639
        'bzrlib.tests.test_export',
3686
3640
        'bzrlib.tests.test_extract',
3687
3641
        'bzrlib.tests.test_fetch',
3688
 
        'bzrlib.tests.test_fixtures',
3689
3642
        'bzrlib.tests.test_fifo_cache',
3690
3643
        'bzrlib.tests.test_filters',
3691
3644
        'bzrlib.tests.test_ftp_transport',
3705
3658
        'bzrlib.tests.test_identitymap',
3706
3659
        'bzrlib.tests.test_ignores',
3707
3660
        'bzrlib.tests.test_index',
3708
 
        'bzrlib.tests.test_import_tariff',
3709
3661
        'bzrlib.tests.test_info',
3710
3662
        'bzrlib.tests.test_inv',
3711
3663
        'bzrlib.tests.test_inventory_delta',
3712
3664
        'bzrlib.tests.test_knit',
3713
3665
        'bzrlib.tests.test_lazy_import',
3714
3666
        'bzrlib.tests.test_lazy_regex',
3715
 
        'bzrlib.tests.test_library_state',
3716
3667
        'bzrlib.tests.test_lock',
3717
3668
        'bzrlib.tests.test_lockable_files',
3718
3669
        'bzrlib.tests.test_lockdir',
3720
3671
        'bzrlib.tests.test_lru_cache',
3721
3672
        'bzrlib.tests.test_lsprof',
3722
3673
        'bzrlib.tests.test_mail_client',
3723
 
        'bzrlib.tests.test_matchers',
3724
3674
        'bzrlib.tests.test_memorytree',
3725
3675
        'bzrlib.tests.test_merge',
3726
3676
        'bzrlib.tests.test_merge3',
3775
3725
        'bzrlib.tests.test_switch',
3776
3726
        'bzrlib.tests.test_symbol_versioning',
3777
3727
        'bzrlib.tests.test_tag',
3778
 
        'bzrlib.tests.test_test_server',
3779
3728
        'bzrlib.tests.test_testament',
3780
3729
        'bzrlib.tests.test_textfile',
3781
3730
        'bzrlib.tests.test_textmerge',
3787
3736
        'bzrlib.tests.test_transport_log',
3788
3737
        'bzrlib.tests.test_tree',
3789
3738
        'bzrlib.tests.test_treebuilder',
3790
 
        'bzrlib.tests.test_treeshape',
3791
3739
        'bzrlib.tests.test_tsort',
3792
3740
        'bzrlib.tests.test_tuned_gzip',
3793
3741
        'bzrlib.tests.test_ui',
3797
3745
        'bzrlib.tests.test_urlutils',
3798
3746
        'bzrlib.tests.test_version',
3799
3747
        'bzrlib.tests.test_version_info',
3800
 
        'bzrlib.tests.test_versionedfile',
3801
3748
        'bzrlib.tests.test_weave',
3802
3749
        'bzrlib.tests.test_whitebox',
3803
3750
        'bzrlib.tests.test_win32utils',
3809
3756
 
3810
3757
 
3811
3758
def _test_suite_modules_to_doctest():
3812
 
    """Return the list of modules to doctest."""
3813
 
    if __doc__ is None:
3814
 
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3815
 
        return []
 
3759
    """Return the list of modules to doctest."""   
3816
3760
    return [
3817
3761
        'bzrlib',
3818
3762
        'bzrlib.branchbuilder',
3825
3769
        'bzrlib.option',
3826
3770
        'bzrlib.symbol_versioning',
3827
3771
        'bzrlib.tests',
3828
 
        'bzrlib.tests.fixtures',
3829
3772
        'bzrlib.timestamp',
3830
3773
        'bzrlib.version_info_formats.format_custom',
3831
3774
        ]
3972
3915
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3973
3916
    ...     [('one', dict(param=1)),
3974
3917
    ...      ('two', dict(param=2))],
3975
 
    ...     TestUtil.TestSuite())
 
3918
    ...     TestSuite())
3976
3919
    >>> tests = list(iter_suite_tests(r))
3977
3920
    >>> len(tests)
3978
3921
    2
4025
3968
    :param new_id: The id to assign to it.
4026
3969
    :return: The new test.
4027
3970
    """
4028
 
    new_test = copy.copy(test)
 
3971
    new_test = copy(test)
4029
3972
    new_test.id = lambda: new_id
4030
3973
    return new_test
4031
3974
 
4092
4035
        if test_id != None:
4093
4036
            ui.ui_factory.clear_term()
4094
4037
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4095
 
        # Ugly, but the last thing we want here is fail, so bear with it.
4096
 
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4097
 
                                    ).encode('ascii', 'replace')
4098
4038
        sys.stderr.write('Unable to remove testing dir %s\n%s'
4099
 
                         % (os.path.basename(dirname), printable_e))
 
4039
                         % (os.path.basename(dirname), e))
4100
4040
 
4101
4041
 
4102
4042
class Feature(object):
4332
4272
UnicodeFilename = _UnicodeFilename()
4333
4273
 
4334
4274
 
4335
 
class _ByteStringNamedFilesystem(Feature):
4336
 
    """Is the filesystem based on bytes?"""
4337
 
 
4338
 
    def _probe(self):
4339
 
        if os.name == "posix":
4340
 
            return True
4341
 
        return False
4342
 
 
4343
 
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4344
 
 
4345
 
 
4346
4275
class _UTF8Filesystem(Feature):
4347
4276
    """Is the filesystem UTF-8?"""
4348
4277
 
4433
4362
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4434
4363
 
4435
4364
 
4436
 
class _CaseSensitiveFilesystemFeature(Feature):
4437
 
 
4438
 
    def _probe(self):
4439
 
        if CaseInsCasePresFilenameFeature.available():
4440
 
            return False
4441
 
        elif CaseInsensitiveFilesystemFeature.available():
4442
 
            return False
4443
 
        else:
4444
 
            return True
4445
 
 
4446
 
    def feature_name(self):
4447
 
        return 'case-sensitive filesystem'
4448
 
 
4449
 
# new coding style is for feature instances to be lowercase
4450
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4451
 
 
4452
 
 
4453
4365
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4454
4366
SubUnitFeature = _CompatabilityThunkFeature(
4455
4367
    deprecated_in((2,1,0)),
4466
4378
            return result
4467
4379
except ImportError:
4468
4380
    pass
4469
 
 
4470
 
class _PosixPermissionsFeature(Feature):
4471
 
 
4472
 
    def _probe(self):
4473
 
        def has_perms():
4474
 
            # create temporary file and check if specified perms are maintained.
4475
 
            import tempfile
4476
 
 
4477
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4478
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4479
 
            fd, name = f
4480
 
            os.close(fd)
4481
 
            os.chmod(name, write_perms)
4482
 
 
4483
 
            read_perms = os.stat(name).st_mode & 0777
4484
 
            os.unlink(name)
4485
 
            return (write_perms == read_perms)
4486
 
 
4487
 
        return (os.name == 'posix') and has_perms()
4488
 
 
4489
 
    def feature_name(self):
4490
 
        return 'POSIX permissions support'
4491
 
 
4492
 
posix_permissions_feature = _PosixPermissionsFeature()