~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Robert Collins
  • Date: 2007-04-19 02:27:44 UTC
  • mto: This revision was merged to the branch mainline in revision 2426.
  • Revision ID: robertc@robertcollins.net-20070419022744-pfdqz42kp1wizh43
``make docs`` now creates a man page at ``man1/bzr.1`` fixing bug 107388.
(Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Testing framework extensions"""
18
17
 
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
27
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
28
27
# new assertFoo() methods.
29
28
 
30
 
import atexit
31
29
import codecs
32
 
import copy
33
30
from cStringIO import StringIO
34
31
import difflib
35
32
import doctest
36
33
import errno
37
 
import itertools
38
34
import logging
39
35
import os
40
 
import platform
41
 
import pprint
42
 
import random
 
36
from pprint import pformat
43
37
import re
44
38
import shlex
45
39
import stat
46
 
import subprocess
 
40
from subprocess import Popen, PIPE
47
41
import sys
48
42
import tempfile
49
 
import threading
 
43
import unittest
50
44
import time
51
 
import traceback
52
 
import unittest
53
 
import warnings
54
 
 
55
 
import testtools
56
 
# nb: check this before importing anything else from within it
57
 
_testtools_version = getattr(testtools, '__version__', ())
58
 
if _testtools_version < (0, 9, 5):
59
 
    raise ImportError("need at least testtools 0.9.5: %s is %r"
60
 
        % (testtools.__file__, _testtools_version))
61
 
from testtools import content
62
 
 
63
 
import bzrlib
 
45
 
 
46
 
64
47
from bzrlib import (
65
 
    branchbuilder,
66
48
    bzrdir,
67
 
    chk_map,
68
 
    commands as _mod_commands,
69
 
    config,
70
49
    debug,
71
50
    errors,
72
 
    hooks,
73
 
    lock as _mod_lock,
74
 
    lockdir,
75
51
    memorytree,
76
52
    osutils,
77
 
    plugin as _mod_plugin,
78
 
    pyutils,
 
53
    progress,
79
54
    ui,
80
55
    urlutils,
81
 
    registry,
82
 
    symbol_versioning,
83
 
    trace,
84
 
    transport as _mod_transport,
85
 
    workingtree,
86
56
    )
 
57
import bzrlib.branch
 
58
import bzrlib.commands
 
59
import bzrlib.timestamp
 
60
import bzrlib.export
 
61
import bzrlib.inventory
 
62
import bzrlib.iterablefile
 
63
import bzrlib.lockdir
87
64
try:
88
65
    import bzrlib.lsprof
89
66
except ImportError:
90
67
    # lsprof not available
91
68
    pass
92
 
from bzrlib.smart import client, request
93
 
from bzrlib.transport import (
94
 
    memory,
95
 
    pathfilter,
96
 
    )
97
 
from bzrlib.tests import (
98
 
    test_server,
99
 
    TestUtil,
100
 
    treeshape,
101
 
    )
102
 
from bzrlib.ui import NullProgressView
103
 
from bzrlib.ui.text import TextUIFactory
104
 
 
105
 
# Mark this python module as being part of the implementation
106
 
# of unittest: this gives us better tracebacks where the last
107
 
# shown frame is the test code, not our assertXYZ.
108
 
__unittest = 1
109
 
 
110
 
default_transport = test_server.LocalURLServer
111
 
 
112
 
 
113
 
_unitialized_attr = object()
114
 
"""A sentinel needed to act as a default value in a method signature."""
115
 
 
116
 
 
117
 
# Subunit result codes, defined here to prevent a hard dependency on subunit.
118
 
SUBUNIT_SEEK_SET = 0
119
 
SUBUNIT_SEEK_CUR = 1
120
 
 
121
 
# These are intentionally brought into this namespace. That way plugins, etc
122
 
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
123
 
TestSuite = TestUtil.TestSuite
124
 
TestLoader = TestUtil.TestLoader
125
 
 
126
 
# Tests should run in a clean and clearly defined environment. The goal is to
127
 
# keep them isolated from the running environment as mush as possible. The test
128
 
# framework ensures the variables defined below are set (or deleted if the
129
 
# value is None) before a test is run and reset to their original value after
130
 
# the test is run. Generally if some code depends on an environment variable,
131
 
# the tests should start without this variable in the environment. There are a
132
 
# few exceptions but you shouldn't violate this rule lightly.
133
 
isolated_environ = {
134
 
    'BZR_HOME': None,
135
 
    'HOME': None,
136
 
    # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
137
 
    # tests do check our impls match APPDATA
138
 
    'BZR_EDITOR': None, # test_msgeditor manipulates this variable
139
 
    'VISUAL': None,
140
 
    'EDITOR': None,
141
 
    'BZR_EMAIL': None,
142
 
    'BZREMAIL': None, # may still be present in the environment
143
 
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
144
 
    'BZR_PROGRESS_BAR': None,
145
 
    'BZR_LOG': None,
146
 
    'BZR_PLUGIN_PATH': None,
147
 
    'BZR_DISABLE_PLUGINS': None,
148
 
    'BZR_PLUGINS_AT': None,
149
 
    'BZR_CONCURRENCY': None,
150
 
    # Make sure that any text ui tests are consistent regardless of
151
 
    # the environment the test case is run in; you may want tests that
152
 
    # test other combinations.  'dumb' is a reasonable guess for tests
153
 
    # going to a pipe or a StringIO.
154
 
    'TERM': 'dumb',
155
 
    'LINES': '25',
156
 
    'COLUMNS': '80',
157
 
    'BZR_COLUMNS': '80',
158
 
    # Disable SSH Agent
159
 
    'SSH_AUTH_SOCK': None,
160
 
    # Proxies
161
 
    'http_proxy': None,
162
 
    'HTTP_PROXY': None,
163
 
    'https_proxy': None,
164
 
    'HTTPS_PROXY': None,
165
 
    'no_proxy': None,
166
 
    'NO_PROXY': None,
167
 
    'all_proxy': None,
168
 
    'ALL_PROXY': None,
169
 
    # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
170
 
    # least. If you do (care), please update this comment
171
 
    # -- vila 20080401
172
 
    'ftp_proxy': None,
173
 
    'FTP_PROXY': None,
174
 
    'BZR_REMOTE_PATH': None,
175
 
    # Generally speaking, we don't want apport reporting on crashes in
176
 
    # the test envirnoment unless we're specifically testing apport,
177
 
    # so that it doesn't leak into the real system environment.  We
178
 
    # use an env var so it propagates to subprocesses.
179
 
    'APPORT_DISABLE': '1',
180
 
    }
181
 
 
182
 
 
183
 
def override_os_environ(test, env=None):
184
 
    """Modify os.environ keeping a copy.
185
 
    
186
 
    :param test: A test instance
187
 
 
188
 
    :param env: A dict containing variable definitions to be installed
189
 
    """
190
 
    if env is None:
191
 
        env = isolated_environ
192
 
    test._original_os_environ = dict([(var, value)
193
 
                                      for var, value in os.environ.iteritems()])
194
 
    for var, value in env.iteritems():
195
 
        osutils.set_or_unset_env(var, value)
196
 
        if var not in test._original_os_environ:
197
 
            # The var is new, add it with a value of None, so
198
 
            # restore_os_environ will delete it
199
 
            test._original_os_environ[var] = None
200
 
 
201
 
 
202
 
def restore_os_environ(test):
203
 
    """Restore os.environ to its original state.
204
 
 
205
 
    :param test: A test instance previously passed to override_os_environ.
206
 
    """
207
 
    for var, value in test._original_os_environ.iteritems():
208
 
        # Restore the original value (or delete it if the value has been set to
209
 
        # None in override_os_environ).
210
 
        osutils.set_or_unset_env(var, value)
211
 
 
212
 
 
213
 
class ExtendedTestResult(testtools.TextTestResult):
 
69
from bzrlib.merge import merge_inner
 
70
import bzrlib.merge3
 
71
import bzrlib.osutils
 
72
import bzrlib.plugin
 
73
from bzrlib.revision import common_ancestor
 
74
import bzrlib.store
 
75
from bzrlib import symbol_versioning
 
76
import bzrlib.trace
 
77
from bzrlib.transport import get_transport
 
78
import bzrlib.transport
 
79
from bzrlib.transport.local import LocalURLServer
 
80
from bzrlib.transport.memory import MemoryServer
 
81
from bzrlib.transport.readonly import ReadonlyServer
 
82
from bzrlib.trace import mutter, note
 
83
from bzrlib.tests import TestUtil
 
84
from bzrlib.tests.HttpServer import HttpServer
 
85
from bzrlib.tests.TestUtil import (
 
86
                          TestSuite,
 
87
                          TestLoader,
 
88
                          )
 
89
from bzrlib.tests.treeshape import build_tree_contents
 
90
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
91
 
 
92
default_transport = LocalURLServer
 
93
 
 
94
MODULES_TO_TEST = []
 
95
MODULES_TO_DOCTEST = [
 
96
                      bzrlib.timestamp,
 
97
                      bzrlib.errors,
 
98
                      bzrlib.export,
 
99
                      bzrlib.inventory,
 
100
                      bzrlib.iterablefile,
 
101
                      bzrlib.lockdir,
 
102
                      bzrlib.merge3,
 
103
                      bzrlib.option,
 
104
                      bzrlib.store,
 
105
                      ]
 
106
 
 
107
 
 
108
def packages_to_test():
 
109
    """Return a list of packages to test.
 
110
 
 
111
    The packages are not globally imported so that import failures are
 
112
    triggered when running selftest, not when importing the command.
 
113
    """
 
114
    import bzrlib.doc
 
115
    import bzrlib.tests.blackbox
 
116
    import bzrlib.tests.branch_implementations
 
117
    import bzrlib.tests.bzrdir_implementations
 
118
    import bzrlib.tests.interrepository_implementations
 
119
    import bzrlib.tests.interversionedfile_implementations
 
120
    import bzrlib.tests.intertree_implementations
 
121
    import bzrlib.tests.per_lock
 
122
    import bzrlib.tests.repository_implementations
 
123
    import bzrlib.tests.revisionstore_implementations
 
124
    import bzrlib.tests.tree_implementations
 
125
    import bzrlib.tests.workingtree_implementations
 
126
    return [
 
127
            bzrlib.doc,
 
128
            bzrlib.tests.blackbox,
 
129
            bzrlib.tests.branch_implementations,
 
130
            bzrlib.tests.bzrdir_implementations,
 
131
            bzrlib.tests.interrepository_implementations,
 
132
            bzrlib.tests.interversionedfile_implementations,
 
133
            bzrlib.tests.intertree_implementations,
 
134
            bzrlib.tests.per_lock,
 
135
            bzrlib.tests.repository_implementations,
 
136
            bzrlib.tests.revisionstore_implementations,
 
137
            bzrlib.tests.tree_implementations,
 
138
            bzrlib.tests.workingtree_implementations,
 
139
            ]
 
140
 
 
141
 
 
142
class ExtendedTestResult(unittest._TextTestResult):
214
143
    """Accepts, reports and accumulates the results of running tests.
215
144
 
216
 
    Compared to the unittest version this class adds support for
217
 
    profiling, benchmarking, stopping as soon as a test fails,  and
218
 
    skipping tests.  There are further-specialized subclasses for
219
 
    different types of display.
220
 
 
221
 
    When a test finishes, in whatever way, it calls one of the addSuccess,
222
 
    addFailure or addError classes.  These in turn may redirect to a more
223
 
    specific case for the special test results supported by our extended
224
 
    tests.
225
 
 
226
 
    Note that just one of these objects is fed the results from many tests.
 
145
    Compared to this unittest version this class adds support for profiling,
 
146
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
147
    There are further-specialized subclasses for different types of display.
227
148
    """
228
149
 
229
150
    stop_early = False
230
 
 
 
151
    
231
152
    def __init__(self, stream, descriptions, verbosity,
232
153
                 bench_history=None,
233
 
                 strict=False,
 
154
                 num_tests=None,
 
155
                 use_numbered_dirs=False,
234
156
                 ):
235
157
        """Construct new TestResult.
236
158
 
237
159
        :param bench_history: Optionally, a writable file object to accumulate
238
160
            benchmark results.
239
161
        """
240
 
        testtools.TextTestResult.__init__(self, stream)
 
162
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
241
163
        if bench_history is not None:
242
164
            from bzrlib.version import _get_bzr_source_tree
243
165
            src_tree = _get_bzr_source_tree()
254
176
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
255
177
        self._bench_history = bench_history
256
178
        self.ui = ui.ui_factory
257
 
        self.num_tests = 0
 
179
        self.num_tests = num_tests
258
180
        self.error_count = 0
259
181
        self.failure_count = 0
260
182
        self.known_failure_count = 0
261
183
        self.skip_count = 0
262
 
        self.not_applicable_count = 0
263
184
        self.unsupported = {}
264
185
        self.count = 0
 
186
        self.use_numbered_dirs = use_numbered_dirs
265
187
        self._overall_start_time = time.time()
266
 
        self._strict = strict
267
 
        self._first_thread_leaker_id = None
268
 
        self._tests_leaking_threads_count = 0
269
 
        self._traceback_from_test = None
270
 
 
271
 
    def stopTestRun(self):
272
 
        run = self.testsRun
273
 
        actionTaken = "Ran"
274
 
        stopTime = time.time()
275
 
        timeTaken = stopTime - self.startTime
276
 
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
277
 
        #                the parent class method is similar have to duplicate
278
 
        self._show_list('ERROR', self.errors)
279
 
        self._show_list('FAIL', self.failures)
280
 
        self.stream.write(self.sep2)
281
 
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
282
 
                            run, run != 1 and "s" or "", timeTaken))
283
 
        if not self.wasSuccessful():
284
 
            self.stream.write("FAILED (")
285
 
            failed, errored = map(len, (self.failures, self.errors))
286
 
            if failed:
287
 
                self.stream.write("failures=%d" % failed)
288
 
            if errored:
289
 
                if failed: self.stream.write(", ")
290
 
                self.stream.write("errors=%d" % errored)
291
 
            if self.known_failure_count:
292
 
                if failed or errored: self.stream.write(", ")
293
 
                self.stream.write("known_failure_count=%d" %
294
 
                    self.known_failure_count)
295
 
            self.stream.write(")\n")
296
 
        else:
297
 
            if self.known_failure_count:
298
 
                self.stream.write("OK (known_failures=%d)\n" %
299
 
                    self.known_failure_count)
300
 
            else:
301
 
                self.stream.write("OK\n")
302
 
        if self.skip_count > 0:
303
 
            skipped = self.skip_count
304
 
            self.stream.write('%d test%s skipped\n' %
305
 
                                (skipped, skipped != 1 and "s" or ""))
306
 
        if self.unsupported:
307
 
            for feature, count in sorted(self.unsupported.items()):
308
 
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
309
 
                    (feature, count))
310
 
        if self._strict:
311
 
            ok = self.wasStrictlySuccessful()
312
 
        else:
313
 
            ok = self.wasSuccessful()
314
 
        if self._first_thread_leaker_id:
315
 
            self.stream.write(
316
 
                '%s is leaking threads among %d leaking tests.\n' % (
317
 
                self._first_thread_leaker_id,
318
 
                self._tests_leaking_threads_count))
319
 
            # We don't report the main thread as an active one.
320
 
            self.stream.write(
321
 
                '%d non-main threads were left active in the end.\n'
322
 
                % (len(self._active_threads) - 1))
323
 
 
324
 
    def getDescription(self, test):
325
 
        return test.id()
326
 
 
327
 
    def _extractBenchmarkTime(self, testCase, details=None):
 
188
    
 
189
    def extractBenchmarkTime(self, testCase):
328
190
        """Add a benchmark time for the current test case."""
329
 
        if details and 'benchtime' in details:
330
 
            return float(''.join(details['benchtime'].iter_bytes()))
331
 
        return getattr(testCase, "_benchtime", None)
332
 
 
 
191
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
192
    
333
193
    def _elapsedTestTimeString(self):
334
194
        """Return a time string for the overall time the current test has taken."""
335
 
        return self._formatTime(self._delta_to_float(
336
 
            self._now() - self._start_datetime))
 
195
        return self._formatTime(time.time() - self._start_time)
337
196
 
338
 
    def _testTimeString(self, testCase):
339
 
        benchmark_time = self._extractBenchmarkTime(testCase)
340
 
        if benchmark_time is not None:
341
 
            return self._formatTime(benchmark_time) + "*"
 
197
    def _testTimeString(self):
 
198
        if self._benchmarkTime is not None:
 
199
            return "%s/%s" % (
 
200
                self._formatTime(self._benchmarkTime),
 
201
                self._elapsedTestTimeString())
342
202
        else:
343
 
            return self._elapsedTestTimeString()
 
203
            return "           %s" % self._elapsedTestTimeString()
344
204
 
345
205
    def _formatTime(self, seconds):
346
206
        """Format seconds as milliseconds with leading spaces."""
350
210
 
351
211
    def _shortened_test_description(self, test):
352
212
        what = test.id()
353
 
        what = re.sub(r'^bzrlib\.tests\.', '', what)
 
213
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
354
214
        return what
355
215
 
356
 
    # GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
357
 
    #                multiple times in a row, because the handler is added for
358
 
    #                each test but the container list is shared between cases.
359
 
    #                See lp:498869 lp:625574 and lp:637725 for background.
360
 
    def _record_traceback_from_test(self, exc_info):
361
 
        """Store the traceback from passed exc_info tuple till"""
362
 
        self._traceback_from_test = exc_info[2]
363
 
 
364
216
    def startTest(self, test):
365
 
        super(ExtendedTestResult, self).startTest(test)
366
 
        if self.count == 0:
367
 
            self.startTests()
368
 
        self.count += 1
 
217
        unittest.TestResult.startTest(self, test)
369
218
        self.report_test_start(test)
370
219
        test.number = self.count
371
220
        self._recordTestStartTime()
372
 
        # Make testtools cases give us the real traceback on failure
373
 
        addOnException = getattr(test, "addOnException", None)
374
 
        if addOnException is not None:
375
 
            addOnException(self._record_traceback_from_test)
376
 
        # Only check for thread leaks on bzrlib derived test cases
377
 
        if isinstance(test, TestCase):
378
 
            test.addCleanup(self._check_leaked_threads, test)
379
 
 
380
 
    def startTests(self):
381
 
        self.report_tests_starting()
382
 
        self._active_threads = threading.enumerate()
383
 
 
384
 
    def stopTest(self, test):
385
 
        self._traceback_from_test = None
386
 
 
387
 
    def _check_leaked_threads(self, test):
388
 
        """See if any threads have leaked since last call
389
 
 
390
 
        A sample of live threads is stored in the _active_threads attribute,
391
 
        when this method runs it compares the current live threads and any not
392
 
        in the previous sample are treated as having leaked.
393
 
        """
394
 
        now_active_threads = set(threading.enumerate())
395
 
        threads_leaked = now_active_threads.difference(self._active_threads)
396
 
        if threads_leaked:
397
 
            self._report_thread_leak(test, threads_leaked, now_active_threads)
398
 
            self._tests_leaking_threads_count += 1
399
 
            if self._first_thread_leaker_id is None:
400
 
                self._first_thread_leaker_id = test.id()
401
 
            self._active_threads = now_active_threads
402
221
 
403
222
    def _recordTestStartTime(self):
404
223
        """Record that a test has started."""
405
 
        self._start_datetime = self._now()
 
224
        self._start_time = time.time()
 
225
 
 
226
    def _cleanupLogFile(self, test):
 
227
        # We can only do this if we have one of our TestCases, not if
 
228
        # we have a doctest.
 
229
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
230
        if setKeepLogfile is not None:
 
231
            setKeepLogfile()
406
232
 
407
233
    def addError(self, test, err):
408
 
        """Tell result that test finished with an error.
409
 
 
410
 
        Called from the TestCase run() method when the test
411
 
        fails with an unexpected error.
412
 
        """
413
 
        self._post_mortem(self._traceback_from_test)
414
 
        super(ExtendedTestResult, self).addError(test, err)
 
234
        self.extractBenchmarkTime(test)
 
235
        self._cleanupLogFile(test)
 
236
        if isinstance(err[1], TestSkipped):
 
237
            return self.addSkipped(test, err)
 
238
        elif isinstance(err[1], UnavailableFeature):
 
239
            return self.addNotSupported(test, err[1].args[0])
 
240
        unittest.TestResult.addError(self, test, err)
415
241
        self.error_count += 1
416
242
        self.report_error(test, err)
417
243
        if self.stop_early:
418
244
            self.stop()
419
245
 
420
246
    def addFailure(self, test, err):
421
 
        """Tell result that test failed.
422
 
 
423
 
        Called from the TestCase run() method when the test
424
 
        fails because e.g. an assert() method failed.
425
 
        """
426
 
        self._post_mortem(self._traceback_from_test)
427
 
        super(ExtendedTestResult, self).addFailure(test, err)
 
247
        self._cleanupLogFile(test)
 
248
        self.extractBenchmarkTime(test)
 
249
        if isinstance(err[1], KnownFailure):
 
250
            return self.addKnownFailure(test, err)
 
251
        unittest.TestResult.addFailure(self, test, err)
428
252
        self.failure_count += 1
429
253
        self.report_failure(test, err)
430
254
        if self.stop_early:
431
255
            self.stop()
432
256
 
433
 
    def addSuccess(self, test, details=None):
434
 
        """Tell result that test completed successfully.
435
 
 
436
 
        Called from the TestCase run()
437
 
        """
438
 
        if self._bench_history is not None:
439
 
            benchmark_time = self._extractBenchmarkTime(test, details)
440
 
            if benchmark_time is not None:
441
 
                self._bench_history.write("%s %s\n" % (
442
 
                    self._formatTime(benchmark_time),
443
 
                    test.id()))
444
 
        self.report_success(test)
445
 
        super(ExtendedTestResult, self).addSuccess(test)
446
 
        test._log_contents = ''
447
 
 
448
 
    def addExpectedFailure(self, test, err):
 
257
    def addKnownFailure(self, test, err):
449
258
        self.known_failure_count += 1
450
259
        self.report_known_failure(test, err)
451
260
 
452
261
    def addNotSupported(self, test, feature):
453
 
        """The test will not be run because of a missing feature.
454
 
        """
455
 
        # this can be called in two different ways: it may be that the
456
 
        # test started running, and then raised (through requireFeature)
457
 
        # UnavailableFeature.  Alternatively this method can be called
458
 
        # while probing for features before running the test code proper; in
459
 
        # that case we will see startTest and stopTest, but the test will
460
 
        # never actually run.
461
262
        self.unsupported.setdefault(str(feature), 0)
462
263
        self.unsupported[str(feature)] += 1
463
264
        self.report_unsupported(test, feature)
464
265
 
465
 
    def addSkip(self, test, reason):
466
 
        """A test has not run for 'reason'."""
467
 
        self.skip_count += 1
468
 
        self.report_skip(test, reason)
469
 
 
470
 
    def addNotApplicable(self, test, reason):
471
 
        self.not_applicable_count += 1
472
 
        self.report_not_applicable(test, reason)
473
 
 
474
 
    def _post_mortem(self, tb=None):
475
 
        """Start a PDB post mortem session."""
476
 
        if os.environ.get('BZR_TEST_PDB', None):
477
 
            import pdb
478
 
            pdb.post_mortem(tb)
479
 
 
480
 
    def progress(self, offset, whence):
481
 
        """The test is adjusting the count of tests to run."""
482
 
        if whence == SUBUNIT_SEEK_SET:
483
 
            self.num_tests = offset
484
 
        elif whence == SUBUNIT_SEEK_CUR:
485
 
            self.num_tests += offset
486
 
        else:
487
 
            raise errors.BzrError("Unknown whence %r" % whence)
488
 
 
489
 
    def report_tests_starting(self):
490
 
        """Display information before the test run begins"""
491
 
        if getattr(sys, 'frozen', None) is None:
492
 
            bzr_path = osutils.realpath(sys.argv[0])
493
 
        else:
494
 
            bzr_path = sys.executable
495
 
        self.stream.write(
496
 
            'bzr selftest: %s\n' % (bzr_path,))
497
 
        self.stream.write(
498
 
            '   %s\n' % (
499
 
                    bzrlib.__path__[0],))
500
 
        self.stream.write(
501
 
            '   bzr-%s python-%s %s\n' % (
502
 
                    bzrlib.version_string,
503
 
                    bzrlib._format_version_tuple(sys.version_info),
504
 
                    platform.platform(aliased=1),
505
 
                    ))
506
 
        self.stream.write('\n')
507
 
 
508
 
    def report_test_start(self, test):
509
 
        """Display information on the test just about to be run"""
510
 
 
511
 
    def _report_thread_leak(self, test, leaked_threads, active_threads):
512
 
        """Display information on a test that leaked one or more threads"""
513
 
        # GZ 2010-09-09: A leak summary reported separately from the general
514
 
        #                thread debugging would be nice. Tests under subunit
515
 
        #                need something not using stream, perhaps adding a
516
 
        #                testtools details object would be fitting.
517
 
        if 'threads' in selftest_debug_flags:
518
 
            self.stream.write('%s is leaking, active is now %d\n' %
519
 
                (test.id(), len(active_threads)))
520
 
 
521
 
    def startTestRun(self):
522
 
        self.startTime = time.time()
 
266
    def addSuccess(self, test):
 
267
        self.extractBenchmarkTime(test)
 
268
        if self._bench_history is not None:
 
269
            if self._benchmarkTime is not None:
 
270
                self._bench_history.write("%s %s\n" % (
 
271
                    self._formatTime(self._benchmarkTime),
 
272
                    test.id()))
 
273
        self.report_success(test)
 
274
        unittest.TestResult.addSuccess(self, test)
 
275
 
 
276
    def addSkipped(self, test, skip_excinfo):
 
277
        self.report_skip(test, skip_excinfo)
 
278
        # seems best to treat this as success from point-of-view of unittest
 
279
        # -- it actually does nothing so it barely matters :)
 
280
        try:
 
281
            test.tearDown()
 
282
        except KeyboardInterrupt:
 
283
            raise
 
284
        except:
 
285
            self.addError(test, test.__exc_info())
 
286
        else:
 
287
            unittest.TestResult.addSuccess(self, test)
 
288
 
 
289
    def printErrorList(self, flavour, errors):
 
290
        for test, err in errors:
 
291
            self.stream.writeln(self.separator1)
 
292
            self.stream.write("%s: " % flavour)
 
293
            if self.use_numbered_dirs:
 
294
                self.stream.write('#%d ' % test.number)
 
295
            self.stream.writeln(self.getDescription(test))
 
296
            if getattr(test, '_get_log', None) is not None:
 
297
                print >>self.stream
 
298
                print >>self.stream, \
 
299
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
300
                print >>self.stream, test._get_log()
 
301
                print >>self.stream, \
 
302
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
303
            self.stream.writeln(self.separator2)
 
304
            self.stream.writeln("%s" % err)
 
305
 
 
306
    def finished(self):
 
307
        pass
 
308
 
 
309
    def report_cleaning_up(self):
 
310
        pass
523
311
 
524
312
    def report_success(self, test):
525
313
        pass
526
314
 
527
 
    def wasStrictlySuccessful(self):
528
 
        if self.unsupported or self.known_failure_count:
529
 
            return False
530
 
        return self.wasSuccessful()
531
 
 
532
315
 
533
316
class TextTestResult(ExtendedTestResult):
534
317
    """Displays progress and results of tests in text form"""
535
318
 
536
319
    def __init__(self, stream, descriptions, verbosity,
537
320
                 bench_history=None,
 
321
                 num_tests=None,
538
322
                 pb=None,
539
 
                 strict=None,
 
323
                 use_numbered_dirs=False,
540
324
                 ):
541
325
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
542
 
            bench_history, strict)
543
 
        # We no longer pass them around, but just rely on the UIFactory stack
544
 
        # for state
545
 
        if pb is not None:
546
 
            warnings.warn("Passing pb to TextTestResult is deprecated")
547
 
        self.pb = self.ui.nested_progress_bar()
 
326
            bench_history, num_tests, use_numbered_dirs)
 
327
        if pb is None:
 
328
            self.pb = self.ui.nested_progress_bar()
 
329
            self._supplied_pb = False
 
330
        else:
 
331
            self.pb = pb
 
332
            self._supplied_pb = True
548
333
        self.pb.show_pct = False
549
334
        self.pb.show_spinner = False
550
335
        self.pb.show_eta = False,
551
336
        self.pb.show_count = False
552
337
        self.pb.show_bar = False
553
 
        self.pb.update_latency = 0
554
 
        self.pb.show_transport_activity = False
555
 
 
556
 
    def stopTestRun(self):
557
 
        # called when the tests that are going to run have run
558
 
        self.pb.clear()
559
 
        self.pb.finished()
560
 
        super(TextTestResult, self).stopTestRun()
561
 
 
562
 
    def report_tests_starting(self):
563
 
        super(TextTestResult, self).report_tests_starting()
564
 
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
338
 
 
339
    def report_starting(self):
 
340
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
565
341
 
566
342
    def _progress_prefix_text(self):
567
 
        # the longer this text, the less space we have to show the test
568
 
        # name...
569
 
        a = '[%d' % self.count              # total that have been run
570
 
        # tests skipped as known not to be relevant are not important enough
571
 
        # to show here
572
 
        ## if self.skip_count:
573
 
        ##     a += ', %d skip' % self.skip_count
574
 
        ## if self.known_failure_count:
575
 
        ##     a += '+%dX' % self.known_failure_count
576
 
        if self.num_tests:
 
343
        a = '[%d' % self.count
 
344
        if self.num_tests is not None:
577
345
            a +='/%d' % self.num_tests
578
 
        a += ' in '
579
 
        runtime = time.time() - self._overall_start_time
580
 
        if runtime >= 60:
581
 
            a += '%dm%ds' % (runtime / 60, runtime % 60)
582
 
        else:
583
 
            a += '%ds' % runtime
584
 
        total_fail_count = self.error_count + self.failure_count
585
 
        if total_fail_count:
586
 
            a += ', %d failed' % total_fail_count
587
 
        # if self.unsupported:
588
 
        #     a += ', %d missing' % len(self.unsupported)
 
346
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
347
        if self.error_count:
 
348
            a += ', %d errors' % self.error_count
 
349
        if self.failure_count:
 
350
            a += ', %d failed' % self.failure_count
 
351
        if self.known_failure_count:
 
352
            a += ', %d known failures' % self.known_failure_count
 
353
        if self.skip_count:
 
354
            a += ', %d skipped' % self.skip_count
 
355
        if self.unsupported:
 
356
            a += ', %d missing features' % len(self.unsupported)
589
357
        a += ']'
590
358
        return a
591
359
 
592
360
    def report_test_start(self, test):
 
361
        self.count += 1
593
362
        self.pb.update(
594
363
                self._progress_prefix_text()
595
 
                + ' '
 
364
                + ' ' 
596
365
                + self._shortened_test_description(test))
597
366
 
598
367
    def _test_description(self, test):
599
 
        return self._shortened_test_description(test)
 
368
        if self.use_numbered_dirs:
 
369
            return '#%d %s' % (self.count,
 
370
                               self._shortened_test_description(test))
 
371
        else:
 
372
            return self._shortened_test_description(test)
600
373
 
601
374
    def report_error(self, test, err):
602
 
        self.stream.write('ERROR: %s\n    %s\n' % (
 
375
        self.pb.note('ERROR: %s\n    %s\n', 
603
376
            self._test_description(test),
604
377
            err[1],
605
 
            ))
 
378
            )
606
379
 
607
380
    def report_failure(self, test, err):
608
 
        self.stream.write('FAIL: %s\n    %s\n' % (
 
381
        self.pb.note('FAIL: %s\n    %s\n', 
609
382
            self._test_description(test),
610
383
            err[1],
611
 
            ))
 
384
            )
612
385
 
613
386
    def report_known_failure(self, test, err):
614
 
        pass
615
 
 
616
 
    def report_skip(self, test, reason):
617
 
        pass
618
 
 
619
 
    def report_not_applicable(self, test, reason):
620
 
        pass
 
387
        self.pb.note('XFAIL: %s\n%s\n',
 
388
            self._test_description(test), err[1])
 
389
 
 
390
    def report_skip(self, test, skip_excinfo):
 
391
        self.skip_count += 1
 
392
        if False:
 
393
            # at the moment these are mostly not things we can fix
 
394
            # and so they just produce stipple; use the verbose reporter
 
395
            # to see them.
 
396
            if False:
 
397
                # show test and reason for skip
 
398
                self.pb.note('SKIP: %s\n    %s\n', 
 
399
                    self._shortened_test_description(test),
 
400
                    skip_excinfo[1])
 
401
            else:
 
402
                # since the class name was left behind in the still-visible
 
403
                # progress bar...
 
404
                self.pb.note('SKIP: %s', skip_excinfo[1])
621
405
 
622
406
    def report_unsupported(self, test, feature):
623
407
        """test cannot be run because feature is missing."""
 
408
                  
 
409
    def report_cleaning_up(self):
 
410
        self.pb.update('cleaning up...')
 
411
 
 
412
    def finished(self):
 
413
        if not self._supplied_pb:
 
414
            self.pb.finished()
624
415
 
625
416
 
626
417
class VerboseTestResult(ExtendedTestResult):
634
425
            result = a_string
635
426
        return result.ljust(final_width)
636
427
 
637
 
    def report_tests_starting(self):
 
428
    def report_starting(self):
638
429
        self.stream.write('running %d tests...\n' % self.num_tests)
639
 
        super(VerboseTestResult, self).report_tests_starting()
640
430
 
641
431
    def report_test_start(self, test):
 
432
        self.count += 1
642
433
        name = self._shortened_test_description(test)
643
 
        width = osutils.terminal_width()
644
 
        if width is not None:
645
 
            # width needs space for 6 char status, plus 1 for slash, plus an
646
 
            # 11-char time string, plus a trailing blank
647
 
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
648
 
            # space
649
 
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
434
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
435
        # numbers, plus a trailing blank
 
436
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
437
        if self.use_numbered_dirs:
 
438
            self.stream.write('%5d ' % self.count)
 
439
            self.stream.write(self._ellipsize_to_right(name,
 
440
                                osutils.terminal_width()-36))
650
441
        else:
651
 
            self.stream.write(name)
 
442
            self.stream.write(self._ellipsize_to_right(name,
 
443
                                osutils.terminal_width()-30))
652
444
        self.stream.flush()
653
445
 
654
446
    def _error_summary(self, err):
655
447
        indent = ' ' * 4
 
448
        if self.use_numbered_dirs:
 
449
            indent += ' ' * 6
656
450
        return '%s%s' % (indent, err[1])
657
451
 
658
452
    def report_error(self, test, err):
659
 
        self.stream.write('ERROR %s\n%s\n'
660
 
                % (self._testTimeString(test),
 
453
        self.stream.writeln('ERROR %s\n%s'
 
454
                % (self._testTimeString(),
661
455
                   self._error_summary(err)))
662
456
 
663
457
    def report_failure(self, test, err):
664
 
        self.stream.write(' FAIL %s\n%s\n'
665
 
                % (self._testTimeString(test),
 
458
        self.stream.writeln(' FAIL %s\n%s'
 
459
                % (self._testTimeString(),
666
460
                   self._error_summary(err)))
667
461
 
668
462
    def report_known_failure(self, test, err):
669
 
        self.stream.write('XFAIL %s\n%s\n'
670
 
                % (self._testTimeString(test),
 
463
        self.stream.writeln('XFAIL %s\n%s'
 
464
                % (self._testTimeString(),
671
465
                   self._error_summary(err)))
672
466
 
673
467
    def report_success(self, test):
674
 
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
468
        self.stream.writeln('   OK %s' % self._testTimeString())
675
469
        for bench_called, stats in getattr(test, '_benchcalls', []):
676
 
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
470
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
677
471
            stats.pprint(file=self.stream)
678
472
        # flush the stream so that we get smooth output. This verbose mode is
679
473
        # used to show the output in PQM.
680
474
        self.stream.flush()
681
475
 
682
 
    def report_skip(self, test, reason):
683
 
        self.stream.write(' SKIP %s\n%s\n'
684
 
                % (self._testTimeString(test), reason))
685
 
 
686
 
    def report_not_applicable(self, test, reason):
687
 
        self.stream.write('  N/A %s\n    %s\n'
688
 
                % (self._testTimeString(test), reason))
 
476
    def report_skip(self, test, skip_excinfo):
 
477
        self.skip_count += 1
 
478
        self.stream.writeln(' SKIP %s\n%s'
 
479
                % (self._testTimeString(),
 
480
                   self._error_summary(skip_excinfo)))
689
481
 
690
482
    def report_unsupported(self, test, feature):
691
483
        """test cannot be run because feature is missing."""
692
 
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
693
 
                %(self._testTimeString(test), feature))
 
484
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
485
                %(self._testTimeString(), feature))
 
486
                  
694
487
 
695
488
 
696
489
class TextTestRunner(object):
700
493
                 stream=sys.stderr,
701
494
                 descriptions=0,
702
495
                 verbosity=1,
 
496
                 keep_output=False,
703
497
                 bench_history=None,
704
 
                 strict=False,
705
 
                 result_decorators=None,
 
498
                 use_numbered_dirs=False,
706
499
                 ):
707
 
        """Create a TextTestRunner.
708
 
 
709
 
        :param result_decorators: An optional list of decorators to apply
710
 
            to the result object being used by the runner. Decorators are
711
 
            applied left to right - the first element in the list is the 
712
 
            innermost decorator.
713
 
        """
714
 
        # stream may know claim to know to write unicode strings, but in older
715
 
        # pythons this goes sufficiently wrong that it is a bad idea. (
716
 
        # specifically a built in file with encoding 'UTF-8' will still try
717
 
        # to encode using ascii.
718
 
        new_encoding = osutils.get_terminal_encoding()
719
 
        codec = codecs.lookup(new_encoding)
720
 
        if type(codec) is tuple:
721
 
            # Python 2.4
722
 
            encode = codec[0]
723
 
        else:
724
 
            encode = codec.encode
725
 
        # GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
726
 
        #                so should swap to the plain codecs.StreamWriter
727
 
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
728
 
            "backslashreplace")
729
 
        stream.encoding = new_encoding
730
 
        self.stream = stream
 
500
        self.stream = unittest._WritelnDecorator(stream)
731
501
        self.descriptions = descriptions
732
502
        self.verbosity = verbosity
 
503
        self.keep_output = keep_output
733
504
        self._bench_history = bench_history
734
 
        self._strict = strict
735
 
        self._result_decorators = result_decorators or []
 
505
        self.use_numbered_dirs = use_numbered_dirs
736
506
 
737
507
    def run(self, test):
738
508
        "Run the given test case or test suite."
 
509
        startTime = time.time()
739
510
        if self.verbosity == 1:
740
511
            result_class = TextTestResult
741
512
        elif self.verbosity >= 2:
742
513
            result_class = VerboseTestResult
743
 
        original_result = result_class(self.stream,
 
514
        result = result_class(self.stream,
744
515
                              self.descriptions,
745
516
                              self.verbosity,
746
517
                              bench_history=self._bench_history,
747
 
                              strict=self._strict,
 
518
                              num_tests=test.countTestCases(),
 
519
                              use_numbered_dirs=self.use_numbered_dirs,
748
520
                              )
749
 
        # Signal to result objects that look at stop early policy to stop,
750
 
        original_result.stop_early = self.stop_on_failure
751
 
        result = original_result
752
 
        for decorator in self._result_decorators:
753
 
            result = decorator(result)
754
 
            result.stop_early = self.stop_on_failure
755
 
        result.startTestRun()
756
 
        try:
757
 
            test.run(result)
758
 
        finally:
759
 
            result.stopTestRun()
760
 
        # higher level code uses our extended protocol to determine
761
 
        # what exit code to give.
762
 
        return original_result
 
521
        result.stop_early = self.stop_on_failure
 
522
        result.report_starting()
 
523
        test.run(result)
 
524
        stopTime = time.time()
 
525
        timeTaken = stopTime - startTime
 
526
        result.printErrors()
 
527
        self.stream.writeln(result.separator2)
 
528
        run = result.testsRun
 
529
        self.stream.writeln("Ran %d test%s in %.3fs" %
 
530
                            (run, run != 1 and "s" or "", timeTaken))
 
531
        self.stream.writeln()
 
532
        if not result.wasSuccessful():
 
533
            self.stream.write("FAILED (")
 
534
            failed, errored = map(len, (result.failures, result.errors))
 
535
            if failed:
 
536
                self.stream.write("failures=%d" % failed)
 
537
            if errored:
 
538
                if failed: self.stream.write(", ")
 
539
                self.stream.write("errors=%d" % errored)
 
540
            if result.known_failure_count:
 
541
                if failed or errored: self.stream.write(", ")
 
542
                self.stream.write("known_failure_count=%d" %
 
543
                    result.known_failure_count)
 
544
            self.stream.writeln(")")
 
545
        else:
 
546
            if result.known_failure_count:
 
547
                self.stream.writeln("OK (known_failures=%d)" %
 
548
                    result.known_failure_count)
 
549
            else:
 
550
                self.stream.writeln("OK")
 
551
        if result.skip_count > 0:
 
552
            skipped = result.skip_count
 
553
            self.stream.writeln('%d test%s skipped' %
 
554
                                (skipped, skipped != 1 and "s" or ""))
 
555
        if result.unsupported:
 
556
            for feature, count in sorted(result.unsupported.items()):
 
557
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
558
                    (feature, count))
 
559
        result.report_cleaning_up()
 
560
        # This is still a little bogus, 
 
561
        # but only a little. Folk not using our testrunner will
 
562
        # have to delete their temp directories themselves.
 
563
        test_root = TestCaseWithMemoryTransport.TEST_ROOT
 
564
        if result.wasSuccessful() or not self.keep_output:
 
565
            if test_root is not None:
 
566
                # If LANG=C we probably have created some bogus paths
 
567
                # which rmtree(unicode) will fail to delete
 
568
                # so make sure we are using rmtree(str) to delete everything
 
569
                # except on win32, where rmtree(str) will fail
 
570
                # since it doesn't have the property of byte-stream paths
 
571
                # (they are either ascii or mbcs)
 
572
                if sys.platform == 'win32':
 
573
                    # make sure we are using the unicode win32 api
 
574
                    test_root = unicode(test_root)
 
575
                else:
 
576
                    test_root = test_root.encode(
 
577
                        sys.getfilesystemencoding())
 
578
                _rmtree_temp_dir(test_root)
 
579
        else:
 
580
            note("Failed tests working directories are in '%s'\n", test_root)
 
581
        TestCaseWithMemoryTransport.TEST_ROOT = None
 
582
        result.finished()
 
583
        return result
763
584
 
764
585
 
765
586
def iter_suite_tests(suite):
766
587
    """Return all tests in a suite, recursing through nested suites"""
767
 
    if isinstance(suite, unittest.TestCase):
768
 
        yield suite
769
 
    elif isinstance(suite, unittest.TestSuite):
770
 
        for item in suite:
 
588
    for item in suite._tests:
 
589
        if isinstance(item, unittest.TestCase):
 
590
            yield item
 
591
        elif isinstance(item, unittest.TestSuite):
771
592
            for r in iter_suite_tests(item):
772
593
                yield r
773
 
    else:
774
 
        raise Exception('unknown type %r for object %r'
775
 
                        % (type(suite), suite))
776
 
 
777
 
 
778
 
TestSkipped = testtools.testcase.TestSkipped
779
 
 
780
 
 
781
 
class TestNotApplicable(TestSkipped):
782
 
    """A test is not applicable to the situation where it was run.
783
 
 
784
 
    This is only normally raised by parameterized tests, if they find that
785
 
    the instance they're constructed upon does not support one aspect
786
 
    of its interface.
 
594
        else:
 
595
            raise Exception('unknown object %r inside test suite %r'
 
596
                            % (item, suite))
 
597
 
 
598
 
 
599
class TestSkipped(Exception):
 
600
    """Indicates that a test was intentionally skipped, rather than failing."""
 
601
 
 
602
 
 
603
class KnownFailure(AssertionError):
 
604
    """Indicates that a test failed in a precisely expected manner.
 
605
 
 
606
    Such failures dont block the whole test suite from passing because they are
 
607
    indicators of partially completed code or of future work. We have an
 
608
    explicit error for them so that we can ensure that they are always visible:
 
609
    KnownFailures are always shown in the output of bzr selftest.
787
610
    """
788
611
 
789
612
 
790
 
# traceback._some_str fails to format exceptions that have the default
791
 
# __str__ which does an implicit ascii conversion. However, repr() on those
792
 
# objects works, for all that its not quite what the doctor may have ordered.
793
 
def _clever_some_str(value):
794
 
    try:
795
 
        return str(value)
796
 
    except:
797
 
        try:
798
 
            return repr(value).replace('\\n', '\n')
799
 
        except:
800
 
            return '<unprintable %s object>' % type(value).__name__
801
 
 
802
 
traceback._some_str = _clever_some_str
803
 
 
804
 
 
805
 
# deprecated - use self.knownFailure(), or self.expectFailure.
806
 
KnownFailure = testtools.testcase._ExpectedFailure
807
 
 
808
 
 
809
613
class UnavailableFeature(Exception):
810
614
    """A feature required for this test was not available.
811
615
 
812
 
    This can be considered a specialised form of SkippedTest.
813
 
 
814
616
    The feature should be used to construct the exception.
815
617
    """
816
618
 
817
619
 
 
620
class CommandFailed(Exception):
 
621
    pass
 
622
 
 
623
 
818
624
class StringIOWrapper(object):
819
625
    """A wrapper around cStringIO which just adds an encoding attribute.
820
 
 
 
626
    
821
627
    Internally we can check sys.stdout to see what the output encoding
822
628
    should be. However, cStringIO has no encoding attribute that we can
823
629
    set. So we wrap it instead.
841
647
            return setattr(self._cstring, name, val)
842
648
 
843
649
 
844
 
class TestUIFactory(TextUIFactory):
 
650
class TestUIFactory(ui.CLIUIFactory):
845
651
    """A UI Factory for testing.
846
652
 
847
653
    Hide the progress bar but emit note()s.
848
654
    Redirect stdin.
849
655
    Allows get_password to be tested without real tty attached.
850
 
 
851
 
    See also CannedInputUIFactory which lets you provide programmatic input in
852
 
    a structured way.
853
656
    """
854
 
    # TODO: Capture progress events at the model level and allow them to be
855
 
    # observed by tests that care.
856
 
    #
857
 
    # XXX: Should probably unify more with CannedInputUIFactory or a
858
 
    # particular configuration of TextUIFactory, or otherwise have a clearer
859
 
    # idea of how they're supposed to be different.
860
 
    # See https://bugs.launchpad.net/bzr/+bug/408213
861
657
 
862
 
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
658
    def __init__(self,
 
659
                 stdout=None,
 
660
                 stderr=None,
 
661
                 stdin=None):
 
662
        super(TestUIFactory, self).__init__()
863
663
        if stdin is not None:
864
664
            # We use a StringIOWrapper to be able to test various
865
665
            # encodings, but the user is still responsible to
866
666
            # encode the string and to set the encoding attribute
867
667
            # of StringIOWrapper.
868
 
            stdin = StringIOWrapper(stdin)
869
 
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
870
 
 
871
 
    def get_non_echoed_password(self):
 
668
            self.stdin = StringIOWrapper(stdin)
 
669
        if stdout is None:
 
670
            self.stdout = sys.stdout
 
671
        else:
 
672
            self.stdout = stdout
 
673
        if stderr is None:
 
674
            self.stderr = sys.stderr
 
675
        else:
 
676
            self.stderr = stderr
 
677
 
 
678
    def clear(self):
 
679
        """See progress.ProgressBar.clear()."""
 
680
 
 
681
    def clear_term(self):
 
682
        """See progress.ProgressBar.clear_term()."""
 
683
 
 
684
    def clear_term(self):
 
685
        """See progress.ProgressBar.clear_term()."""
 
686
 
 
687
    def finished(self):
 
688
        """See progress.ProgressBar.finished()."""
 
689
 
 
690
    def note(self, fmt_string, *args, **kwargs):
 
691
        """See progress.ProgressBar.note()."""
 
692
        self.stdout.write((fmt_string + "\n") % args)
 
693
 
 
694
    def progress_bar(self):
 
695
        return self
 
696
 
 
697
    def nested_progress_bar(self):
 
698
        return self
 
699
 
 
700
    def update(self, message, count=None, total=None):
 
701
        """See progress.ProgressBar.update()."""
 
702
 
 
703
    def get_non_echoed_password(self, prompt):
872
704
        """Get password from stdin without trying to handle the echo mode"""
 
705
        if prompt:
 
706
            self.stdout.write(prompt)
873
707
        password = self.stdin.readline()
874
708
        if not password:
875
709
            raise EOFError
877
711
            password = password[:-1]
878
712
        return password
879
713
 
880
 
    def make_progress_view(self):
881
 
        return NullProgressView()
882
 
 
883
 
 
884
 
def isolated_doctest_setUp(test):
885
 
    override_os_environ(test)
886
 
 
887
 
 
888
 
def isolated_doctest_tearDown(test):
889
 
    restore_os_environ(test)
890
 
 
891
 
 
892
 
def IsolatedDocTestSuite(*args, **kwargs):
893
 
    """Overrides doctest.DocTestSuite to handle isolation.
894
 
 
895
 
    The method is really a factory and users are expected to use it as such.
896
 
    """
897
 
    
898
 
    kwargs['setUp'] = isolated_doctest_setUp
899
 
    kwargs['tearDown'] = isolated_doctest_tearDown
900
 
    return doctest.DocTestSuite(*args, **kwargs)
901
 
 
902
 
 
903
 
class TestCase(testtools.TestCase):
 
714
 
 
715
class TestCase(unittest.TestCase):
904
716
    """Base class for bzr unit tests.
905
 
 
906
 
    Tests that need access to disk resources should subclass
 
717
    
 
718
    Tests that need access to disk resources should subclass 
907
719
    TestCaseInTempDir not TestCase.
908
720
 
909
721
    Error and debug log messages are redirected from their usual
911
723
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
912
724
    so that it can also capture file IO.  When the test completes this file
913
725
    is read into memory and removed from disk.
914
 
 
 
726
       
915
727
    There are also convenience functions to invoke bzr's command-line
916
728
    routine, and to build and check bzr trees.
917
 
 
 
729
   
918
730
    In addition to the usual method of overriding tearDown(), this class also
919
 
    allows subclasses to register cleanup functions via addCleanup, which are
 
731
    allows subclasses to register functions into the _cleanups list, which is
920
732
    run in order as the object is torn down.  It's less likely this will be
921
733
    accidentally overlooked.
922
734
    """
923
735
 
924
 
    _log_file = None
 
736
    _log_file_name = None
 
737
    _log_contents = ''
 
738
    _keep_log_file = False
925
739
    # record lsprof data when performing benchmark calls.
926
740
    _gather_lsprof_in_benchmarks = False
927
741
 
928
742
    def __init__(self, methodName='testMethod'):
929
743
        super(TestCase, self).__init__(methodName)
930
 
        self._directory_isolation = True
931
 
        self.exception_handlers.insert(0,
932
 
            (UnavailableFeature, self._do_unsupported_or_skip))
933
 
        self.exception_handlers.insert(0,
934
 
            (TestNotApplicable, self._do_not_applicable))
 
744
        self._cleanups = []
935
745
 
936
746
    def setUp(self):
937
 
        super(TestCase, self).setUp()
938
 
        for feature in getattr(self, '_test_needs_features', []):
939
 
            self.requireFeature(feature)
940
 
        self._log_contents = None
941
 
        self.addDetail("log", content.Content(content.ContentType("text",
942
 
            "plain", {"charset": "utf8"}),
943
 
            lambda:[self._get_log(keep_log_file=True)]))
 
747
        unittest.TestCase.setUp(self)
944
748
        self._cleanEnvironment()
 
749
        bzrlib.trace.disable_default_logging()
945
750
        self._silenceUI()
946
751
        self._startLogFile()
947
752
        self._benchcalls = []
948
753
        self._benchtime = None
949
754
        self._clear_hooks()
950
 
        self._track_transports()
951
 
        self._track_locks()
952
 
        self._clear_debug_flags()
953
 
        # Isolate global verbosity level, to make sure it's reproducible
954
 
        # between tests.  We should get rid of this altogether: bug 656694. --
955
 
        # mbp 20101008
956
 
        self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
957
 
 
958
 
    def debug(self):
959
 
        # debug a frame up.
960
 
        import pdb
961
 
        pdb.Pdb().set_trace(sys._getframe().f_back)
962
 
 
963
 
    def discardDetail(self, name):
964
 
        """Extend the addDetail, getDetails api so we can remove a detail.
965
 
 
966
 
        eg. bzr always adds the 'log' detail at startup, but we don't want to
967
 
        include it for skipped, xfail, etc tests.
968
 
 
969
 
        It is safe to call this for a detail that doesn't exist, in case this
970
 
        gets called multiple times.
971
 
        """
972
 
        # We cheat. details is stored in __details which means we shouldn't
973
 
        # touch it. but getDetails() returns the dict directly, so we can
974
 
        # mutate it.
975
 
        details = self.getDetails()
976
 
        if name in details:
977
 
            del details[name]
978
 
 
979
 
    def _clear_debug_flags(self):
980
 
        """Prevent externally set debug flags affecting tests.
981
 
 
982
 
        Tests that want to use debug flags can just set them in the
983
 
        debug_flags set during setup/teardown.
984
 
        """
985
 
        # Start with a copy of the current debug flags we can safely modify.
986
 
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
987
 
        if 'allow_debug' not in selftest_debug_flags:
988
 
            debug.debug_flags.clear()
989
 
        if 'disable_lock_checks' not in selftest_debug_flags:
990
 
            debug.debug_flags.add('strict_locks')
991
755
 
992
756
    def _clear_hooks(self):
993
757
        # prevent hooks affecting tests
994
 
        known_hooks = hooks.known_hooks
995
 
        self._preserved_hooks = {}
996
 
        for key, (parent, name) in known_hooks.iter_parent_objects():
997
 
            current_hooks = getattr(parent, name)
998
 
            self._preserved_hooks[parent] = (name, current_hooks)
 
758
        import bzrlib.branch
 
759
        import bzrlib.smart.server
 
760
        self._preserved_hooks = {
 
761
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
762
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
763
            }
999
764
        self.addCleanup(self._restoreHooks)
1000
 
        for key, (parent, name) in known_hooks.iter_parent_objects():
1001
 
            factory = known_hooks.get(key)
1002
 
            setattr(parent, name, factory())
1003
 
        # this hook should always be installed
1004
 
        request._install_hook()
1005
 
 
1006
 
    def disable_directory_isolation(self):
1007
 
        """Turn off directory isolation checks."""
1008
 
        self._directory_isolation = False
1009
 
 
1010
 
    def enable_directory_isolation(self):
1011
 
        """Enable directory isolation checks."""
1012
 
        self._directory_isolation = True
 
765
        # this list of hooks must be kept in sync with the defaults
 
766
        # in branch.py
 
767
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
768
        bzrlib.smart.server.SmartTCPServer.hooks = \
 
769
            bzrlib.smart.server.SmartServerHooks()
1013
770
 
1014
771
    def _silenceUI(self):
1015
772
        """Turn off UI for duration of test"""
1016
773
        # by default the UI is off; tests can turn it on if they want it.
1017
 
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
1018
 
 
1019
 
    def _check_locks(self):
1020
 
        """Check that all lock take/release actions have been paired."""
1021
 
        # We always check for mismatched locks. If a mismatch is found, we
1022
 
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
1023
 
        # case we just print a warning.
1024
 
        # unhook:
1025
 
        acquired_locks = [lock for action, lock in self._lock_actions
1026
 
                          if action == 'acquired']
1027
 
        released_locks = [lock for action, lock in self._lock_actions
1028
 
                          if action == 'released']
1029
 
        broken_locks = [lock for action, lock in self._lock_actions
1030
 
                        if action == 'broken']
1031
 
        # trivially, given the tests for lock acquistion and release, if we
1032
 
        # have as many in each list, it should be ok. Some lock tests also
1033
 
        # break some locks on purpose and should be taken into account by
1034
 
        # considering that breaking a lock is just a dirty way of releasing it.
1035
 
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1036
 
            message = ('Different number of acquired and '
1037
 
                       'released or broken locks. (%s, %s + %s)' %
1038
 
                       (acquired_locks, released_locks, broken_locks))
1039
 
            if not self._lock_check_thorough:
1040
 
                # Rather than fail, just warn
1041
 
                print "Broken test %s: %s" % (self, message)
1042
 
                return
1043
 
            self.fail(message)
1044
 
 
1045
 
    def _track_locks(self):
1046
 
        """Track lock activity during tests."""
1047
 
        self._lock_actions = []
1048
 
        if 'disable_lock_checks' in selftest_debug_flags:
1049
 
            self._lock_check_thorough = False
1050
 
        else:
1051
 
            self._lock_check_thorough = True
1052
 
 
1053
 
        self.addCleanup(self._check_locks)
1054
 
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
1055
 
                                                self._lock_acquired, None)
1056
 
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
1057
 
                                                self._lock_released, None)
1058
 
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
1059
 
                                                self._lock_broken, None)
1060
 
 
1061
 
    def _lock_acquired(self, result):
1062
 
        self._lock_actions.append(('acquired', result))
1063
 
 
1064
 
    def _lock_released(self, result):
1065
 
        self._lock_actions.append(('released', result))
1066
 
 
1067
 
    def _lock_broken(self, result):
1068
 
        self._lock_actions.append(('broken', result))
1069
 
 
1070
 
    def permit_dir(self, name):
1071
 
        """Permit a directory to be used by this test. See permit_url."""
1072
 
        name_transport = _mod_transport.get_transport(name)
1073
 
        self.permit_url(name)
1074
 
        self.permit_url(name_transport.base)
1075
 
 
1076
 
    def permit_url(self, url):
1077
 
        """Declare that url is an ok url to use in this test.
1078
 
        
1079
 
        Do this for memory transports, temporary test directory etc.
1080
 
        
1081
 
        Do not do this for the current working directory, /tmp, or any other
1082
 
        preexisting non isolated url.
1083
 
        """
1084
 
        if not url.endswith('/'):
1085
 
            url += '/'
1086
 
        self._bzr_selftest_roots.append(url)
1087
 
 
1088
 
    def permit_source_tree_branch_repo(self):
1089
 
        """Permit the source tree bzr is running from to be opened.
1090
 
 
1091
 
        Some code such as bzrlib.version attempts to read from the bzr branch
1092
 
        that bzr is executing from (if any). This method permits that directory
1093
 
        to be used in the test suite.
1094
 
        """
1095
 
        path = self.get_source_path()
1096
 
        self.record_directory_isolation()
1097
 
        try:
1098
 
            try:
1099
 
                workingtree.WorkingTree.open(path)
1100
 
            except (errors.NotBranchError, errors.NoWorkingTree):
1101
 
                raise TestSkipped('Needs a working tree of bzr sources')
1102
 
        finally:
1103
 
            self.enable_directory_isolation()
1104
 
 
1105
 
    def _preopen_isolate_transport(self, transport):
1106
 
        """Check that all transport openings are done in the test work area."""
1107
 
        while isinstance(transport, pathfilter.PathFilteringTransport):
1108
 
            # Unwrap pathfiltered transports
1109
 
            transport = transport.server.backing_transport.clone(
1110
 
                transport._filter('.'))
1111
 
        url = transport.base
1112
 
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
1113
 
        # urls it is given by prepending readonly+. This is appropriate as the
1114
 
        # client shouldn't know that the server is readonly (or not readonly).
1115
 
        # We could register all servers twice, with readonly+ prepending, but
1116
 
        # that makes for a long list; this is about the same but easier to
1117
 
        # read.
1118
 
        if url.startswith('readonly+'):
1119
 
            url = url[len('readonly+'):]
1120
 
        self._preopen_isolate_url(url)
1121
 
 
1122
 
    def _preopen_isolate_url(self, url):
1123
 
        if not self._directory_isolation:
1124
 
            return
1125
 
        if self._directory_isolation == 'record':
1126
 
            self._bzr_selftest_roots.append(url)
1127
 
            return
1128
 
        # This prevents all transports, including e.g. sftp ones backed on disk
1129
 
        # from working unless they are explicitly granted permission. We then
1130
 
        # depend on the code that sets up test transports to check that they are
1131
 
        # appropriately isolated and enable their use by calling
1132
 
        # self.permit_transport()
1133
 
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1134
 
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
1135
 
                % (url, self._bzr_selftest_roots))
1136
 
 
1137
 
    def record_directory_isolation(self):
1138
 
        """Gather accessed directories to permit later access.
1139
 
        
1140
 
        This is used for tests that access the branch bzr is running from.
1141
 
        """
1142
 
        self._directory_isolation = "record"
1143
 
 
1144
 
    def start_server(self, transport_server, backing_server=None):
1145
 
        """Start transport_server for this test.
1146
 
 
1147
 
        This starts the server, registers a cleanup for it and permits the
1148
 
        server's urls to be used.
1149
 
        """
1150
 
        if backing_server is None:
1151
 
            transport_server.start_server()
1152
 
        else:
1153
 
            transport_server.start_server(backing_server)
1154
 
        self.addCleanup(transport_server.stop_server)
1155
 
        # Obtain a real transport because if the server supplies a password, it
1156
 
        # will be hidden from the base on the client side.
1157
 
        t = _mod_transport.get_transport(transport_server.get_url())
1158
 
        # Some transport servers effectively chroot the backing transport;
1159
 
        # others like SFTPServer don't - users of the transport can walk up the
1160
 
        # transport to read the entire backing transport. This wouldn't matter
1161
 
        # except that the workdir tests are given - and that they expect the
1162
 
        # server's url to point at - is one directory under the safety net. So
1163
 
        # Branch operations into the transport will attempt to walk up one
1164
 
        # directory. Chrooting all servers would avoid this but also mean that
1165
 
        # we wouldn't be testing directly against non-root urls. Alternatively
1166
 
        # getting the test framework to start the server with a backing server
1167
 
        # at the actual safety net directory would work too, but this then
1168
 
        # means that the self.get_url/self.get_transport methods would need
1169
 
        # to transform all their results. On balance its cleaner to handle it
1170
 
        # here, and permit a higher url when we have one of these transports.
1171
 
        if t.base.endswith('/work/'):
1172
 
            # we have safety net/test root/work
1173
 
            t = t.clone('../..')
1174
 
        elif isinstance(transport_server,
1175
 
                        test_server.SmartTCPServer_for_testing):
1176
 
            # The smart server adds a path similar to work, which is traversed
1177
 
            # up from by the client. But the server is chrooted - the actual
1178
 
            # backing transport is not escaped from, and VFS requests to the
1179
 
            # root will error (because they try to escape the chroot).
1180
 
            t2 = t.clone('..')
1181
 
            while t2.base != t.base:
1182
 
                t = t2
1183
 
                t2 = t.clone('..')
1184
 
        self.permit_url(t.base)
1185
 
 
1186
 
    def _track_transports(self):
1187
 
        """Install checks for transport usage."""
1188
 
        # TestCase has no safe place it can write to.
1189
 
        self._bzr_selftest_roots = []
1190
 
        # Currently the easiest way to be sure that nothing is going on is to
1191
 
        # hook into bzr dir opening. This leaves a small window of error for
1192
 
        # transport tests, but they are well known, and we can improve on this
1193
 
        # step.
1194
 
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
1195
 
            self._preopen_isolate_transport, "Check bzr directories are safe.")
 
774
        saved = ui.ui_factory
 
775
        def _restore():
 
776
            ui.ui_factory = saved
 
777
        ui.ui_factory = ui.SilentUIFactory()
 
778
        self.addCleanup(_restore)
1196
779
 
1197
780
    def _ndiff_strings(self, a, b):
1198
781
        """Return ndiff between two strings containing lines.
1199
 
 
 
782
        
1200
783
        A trailing newline is added if missing to make the strings
1201
784
        print properly."""
1202
785
        if b and b[-1] != '\n':
1216
799
        except UnicodeError, e:
1217
800
            # If we can't compare without getting a UnicodeError, then
1218
801
            # obviously they are different
1219
 
            trace.mutter('UnicodeError: %s', e)
 
802
            mutter('UnicodeError: %s', e)
1220
803
        if message:
1221
804
            message += '\n'
1222
805
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1223
806
            % (message,
1224
 
               pprint.pformat(a), pprint.pformat(b)))
 
807
               pformat(a, indent=4), pformat(b, indent=4)))
1225
808
 
1226
809
    assertEquals = assertEqual
1227
810
 
1228
811
    def assertEqualDiff(self, a, b, message=None):
1229
812
        """Assert two texts are equal, if not raise an exception.
1230
 
 
1231
 
        This is intended for use with multi-line strings where it can
 
813
        
 
814
        This is intended for use with multi-line strings where it can 
1232
815
        be hard to find the differences by eye.
1233
816
        """
1234
817
        # TODO: perhaps override assertEquals to call this for strings?
1236
819
            return
1237
820
        if message is None:
1238
821
            message = "texts not equal:\n"
1239
 
        if a + '\n' == b:
1240
 
            message = 'first string is missing a final newline.\n'
1241
 
        if a == b + '\n':
1242
 
            message = 'second string is missing a final newline.\n'
1243
 
        raise AssertionError(message +
1244
 
                             self._ndiff_strings(a, b))
1245
 
 
 
822
        raise AssertionError(message + 
 
823
                             self._ndiff_strings(a, b))      
 
824
        
1246
825
    def assertEqualMode(self, mode, mode_test):
1247
826
        self.assertEqual(mode, mode_test,
1248
827
                         'mode mismatch %o != %o' % (mode, mode_test))
1249
828
 
1250
 
    def assertEqualStat(self, expected, actual):
1251
 
        """assert that expected and actual are the same stat result.
1252
 
 
1253
 
        :param expected: A stat result.
1254
 
        :param actual: A stat result.
1255
 
        :raises AssertionError: If the expected and actual stat values differ
1256
 
            other than by atime.
1257
 
        """
1258
 
        self.assertEqual(expected.st_size, actual.st_size,
1259
 
                         'st_size did not match')
1260
 
        self.assertEqual(expected.st_mtime, actual.st_mtime,
1261
 
                         'st_mtime did not match')
1262
 
        self.assertEqual(expected.st_ctime, actual.st_ctime,
1263
 
                         'st_ctime did not match')
1264
 
        if sys.platform != 'win32':
1265
 
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1266
 
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1267
 
            # odd. Regardless we shouldn't actually try to assert anything
1268
 
            # about their values
1269
 
            self.assertEqual(expected.st_dev, actual.st_dev,
1270
 
                             'st_dev did not match')
1271
 
            self.assertEqual(expected.st_ino, actual.st_ino,
1272
 
                             'st_ino did not match')
1273
 
        self.assertEqual(expected.st_mode, actual.st_mode,
1274
 
                         'st_mode did not match')
1275
 
 
1276
 
    def assertLength(self, length, obj_with_len):
1277
 
        """Assert that obj_with_len is of length length."""
1278
 
        if len(obj_with_len) != length:
1279
 
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
1280
 
                length, len(obj_with_len), obj_with_len))
1281
 
 
1282
 
    def assertLogsError(self, exception_class, func, *args, **kwargs):
1283
 
        """Assert that func(*args, **kwargs) quietly logs a specific exception.
1284
 
        """
1285
 
        captured = []
1286
 
        orig_log_exception_quietly = trace.log_exception_quietly
1287
 
        try:
1288
 
            def capture():
1289
 
                orig_log_exception_quietly()
1290
 
                captured.append(sys.exc_info())
1291
 
            trace.log_exception_quietly = capture
1292
 
            func(*args, **kwargs)
1293
 
        finally:
1294
 
            trace.log_exception_quietly = orig_log_exception_quietly
1295
 
        self.assertLength(1, captured)
1296
 
        err = captured[0][1]
1297
 
        self.assertIsInstance(err, exception_class)
1298
 
        return err
1299
 
 
1300
 
    def assertPositive(self, val):
1301
 
        """Assert that val is greater than 0."""
1302
 
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1303
 
 
1304
 
    def assertNegative(self, val):
1305
 
        """Assert that val is less than 0."""
1306
 
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
1307
 
 
1308
829
    def assertStartsWith(self, s, prefix):
1309
830
        if not s.startswith(prefix):
1310
831
            raise AssertionError('string %r does not start with %r' % (s, prefix))
1314
835
        if not s.endswith(suffix):
1315
836
            raise AssertionError('string %r does not end with %r' % (s, suffix))
1316
837
 
1317
 
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
838
    def assertContainsRe(self, haystack, needle_re):
1318
839
        """Assert that a contains something matching a regular expression."""
1319
 
        if not re.search(needle_re, haystack, flags):
1320
 
            if '\n' in haystack or len(haystack) > 60:
1321
 
                # a long string, format it in a more readable way
1322
 
                raise AssertionError(
1323
 
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
1324
 
                        % (needle_re, haystack))
1325
 
            else:
1326
 
                raise AssertionError('pattern "%s" not found in "%s"'
1327
 
                        % (needle_re, haystack))
 
840
        if not re.search(needle_re, haystack):
 
841
            raise AssertionError('pattern "%r" not found in "%r"'
 
842
                    % (needle_re, haystack))
1328
843
 
1329
 
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
844
    def assertNotContainsRe(self, haystack, needle_re):
1330
845
        """Assert that a does not match a regular expression"""
1331
 
        if re.search(needle_re, haystack, flags):
 
846
        if re.search(needle_re, haystack):
1332
847
            raise AssertionError('pattern "%s" found in "%s"'
1333
848
                    % (needle_re, haystack))
1334
849
 
1335
 
    def assertContainsString(self, haystack, needle):
1336
 
        if haystack.find(needle) == -1:
1337
 
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
1338
 
 
1339
 
    def assertNotContainsString(self, haystack, needle):
1340
 
        if haystack.find(needle) != -1:
1341
 
            self.fail("string %r found in '''%s'''" % (needle, haystack))
1342
 
 
1343
850
    def assertSubset(self, sublist, superlist):
1344
851
        """Assert that every entry in sublist is present in superlist."""
1345
 
        missing = set(sublist) - set(superlist)
 
852
        missing = []
 
853
        for entry in sublist:
 
854
            if entry not in superlist:
 
855
                missing.append(entry)
1346
856
        if len(missing) > 0:
1347
 
            raise AssertionError("value(s) %r not present in container %r" %
 
857
            raise AssertionError("value(s) %r not present in container %r" % 
1348
858
                                 (missing, superlist))
1349
859
 
1350
860
    def assertListRaises(self, excClass, func, *args, **kwargs):
1351
861
        """Fail unless excClass is raised when the iterator from func is used.
1352
 
 
 
862
        
1353
863
        Many functions can return generators this makes sure
1354
864
        to wrap them in a list() call to make sure the whole generator
1355
865
        is run, and that the proper exception is raised.
1356
866
        """
1357
867
        try:
1358
868
            list(func(*args, **kwargs))
1359
 
        except excClass, e:
1360
 
            return e
 
869
        except excClass:
 
870
            return
1361
871
        else:
1362
872
            if getattr(excClass,'__name__', None) is not None:
1363
873
                excName = excClass.__name__
1365
875
                excName = str(excClass)
1366
876
            raise self.failureException, "%s not raised" % excName
1367
877
 
1368
 
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
878
    def assertRaises(self, excClass, func, *args, **kwargs):
1369
879
        """Assert that a callable raises a particular exception.
1370
880
 
1371
881
        :param excClass: As for the except statement, this may be either an
1372
 
            exception class, or a tuple of classes.
1373
 
        :param callableObj: A callable, will be passed ``*args`` and
1374
 
            ``**kwargs``.
 
882
        exception class, or a tuple of classes.
1375
883
 
1376
884
        Returns the exception so that you can examine it.
1377
885
        """
1378
886
        try:
1379
 
            callableObj(*args, **kwargs)
 
887
            func(*args, **kwargs)
1380
888
        except excClass, e:
1381
889
            return e
1382
890
        else:
1402
910
                raise AssertionError("%r is %r." % (left, right))
1403
911
 
1404
912
    def assertTransportMode(self, transport, path, mode):
1405
 
        """Fail if a path does not have mode "mode".
1406
 
 
 
913
        """Fail if a path does not have mode mode.
 
914
        
1407
915
        If modes are not supported on this transport, the assertion is ignored.
1408
916
        """
1409
917
        if not transport._can_roundtrip_unix_modebits():
1411
919
        path_stat = transport.stat(path)
1412
920
        actual_mode = stat.S_IMODE(path_stat.st_mode)
1413
921
        self.assertEqual(mode, actual_mode,
1414
 
                         'mode of %r incorrect (%s != %s)'
1415
 
                         % (path, oct(mode), oct(actual_mode)))
1416
 
 
1417
 
    def assertIsSameRealPath(self, path1, path2):
1418
 
        """Fail if path1 and path2 points to different files"""
1419
 
        self.assertEqual(osutils.realpath(path1),
1420
 
                         osutils.realpath(path2),
1421
 
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1422
 
 
1423
 
    def assertIsInstance(self, obj, kls, msg=None):
1424
 
        """Fail if obj is not an instance of kls
1425
 
        
1426
 
        :param msg: Supplementary message to show if the assertion fails.
1427
 
        """
 
922
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
923
 
 
924
    def assertIsInstance(self, obj, kls):
 
925
        """Fail if obj is not an instance of kls"""
1428
926
        if not isinstance(obj, kls):
1429
 
            m = "%r is an instance of %s rather than %s" % (
1430
 
                obj, obj.__class__, kls)
1431
 
            if msg:
1432
 
                m += ": " + msg
1433
 
            self.fail(m)
1434
 
 
1435
 
    def assertFileEqual(self, content, path):
1436
 
        """Fail if path does not contain 'content'."""
1437
 
        self.failUnlessExists(path)
1438
 
        f = file(path, 'rb')
 
927
            self.fail("%r is an instance of %s rather than %s" % (
 
928
                obj, obj.__class__, kls))
 
929
 
 
930
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
931
        """Invoke a test, expecting it to fail for the given reason.
 
932
 
 
933
        This is for assertions that ought to succeed, but currently fail.
 
934
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
935
        about the failure you're expecting.  If a new bug is introduced,
 
936
        AssertionError should be raised, not KnownFailure.
 
937
 
 
938
        Frequently, expectFailure should be followed by an opposite assertion.
 
939
        See example below.
 
940
 
 
941
        Intended to be used with a callable that raises AssertionError as the
 
942
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
943
 
 
944
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
945
        test succeeds.
 
946
 
 
947
        example usage::
 
948
 
 
949
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
950
                             dynamic_val)
 
951
          self.assertEqual(42, dynamic_val)
 
952
 
 
953
          This means that a dynamic_val of 54 will cause the test to raise
 
954
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
955
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
956
          than 54 or 42 will cause an AssertionError.
 
957
        """
1439
958
        try:
1440
 
            s = f.read()
1441
 
        finally:
1442
 
            f.close()
1443
 
        self.assertEqualDiff(content, s)
1444
 
 
1445
 
    def assertDocstring(self, expected_docstring, obj):
1446
 
        """Fail if obj does not have expected_docstring"""
1447
 
        if __doc__ is None:
1448
 
            # With -OO the docstring should be None instead
1449
 
            self.assertIs(obj.__doc__, None)
1450
 
        else:
1451
 
            self.assertEqual(expected_docstring, obj.__doc__)
1452
 
 
1453
 
    def failUnlessExists(self, path):
1454
 
        """Fail unless path or paths, which may be abs or relative, exist."""
1455
 
        if not isinstance(path, basestring):
1456
 
            for p in path:
1457
 
                self.failUnlessExists(p)
1458
 
        else:
1459
 
            self.failUnless(osutils.lexists(path),path+" does not exist")
1460
 
 
1461
 
    def failIfExists(self, path):
1462
 
        """Fail if path or paths, which may be abs or relative, exist."""
1463
 
        if not isinstance(path, basestring):
1464
 
            for p in path:
1465
 
                self.failIfExists(p)
1466
 
        else:
1467
 
            self.failIf(osutils.lexists(path),path+" exists")
1468
 
 
1469
 
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
959
            assertion(*args, **kwargs)
 
960
        except AssertionError:
 
961
            raise KnownFailure(reason)
 
962
        else:
 
963
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
964
 
 
965
    def _capture_warnings(self, a_callable, *args, **kwargs):
1470
966
        """A helper for callDeprecated and applyDeprecated.
1471
967
 
1472
968
        :param a_callable: A callable to call.
1473
969
        :param args: The positional arguments for the callable
1474
970
        :param kwargs: The keyword arguments for the callable
1475
971
        :return: A tuple (warnings, result). result is the result of calling
1476
 
            a_callable(``*args``, ``**kwargs``).
 
972
            a_callable(*args, **kwargs).
1477
973
        """
1478
974
        local_warnings = []
1479
975
        def capture_warnings(msg, cls=None, stacklevel=None):
1492
988
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1493
989
        """Call a deprecated callable without warning the user.
1494
990
 
1495
 
        Note that this only captures warnings raised by symbol_versioning.warn,
1496
 
        not other callers that go direct to the warning module.
1497
 
 
1498
 
        To test that a deprecated method raises an error, do something like
1499
 
        this::
1500
 
 
1501
 
            self.assertRaises(errors.ReservedId,
1502
 
                self.applyDeprecated,
1503
 
                deprecated_in((1, 5, 0)),
1504
 
                br.append_revision,
1505
 
                'current:')
1506
 
 
1507
991
        :param deprecation_format: The deprecation format that the callable
1508
 
            should have been deprecated with. This is the same type as the
1509
 
            parameter to deprecated_method/deprecated_function. If the
 
992
            should have been deprecated with. This is the same type as the 
 
993
            parameter to deprecated_method/deprecated_function. If the 
1510
994
            callable is not deprecated with this format, an assertion error
1511
995
            will be raised.
1512
996
        :param a_callable: A callable to call. This may be a bound method or
1513
 
            a regular function. It will be called with ``*args`` and
1514
 
            ``**kwargs``.
 
997
            a regular function. It will be called with *args and **kwargs.
1515
998
        :param args: The positional arguments for the callable
1516
999
        :param kwargs: The keyword arguments for the callable
1517
 
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1000
        :return: The result of a_callable(*args, **kwargs)
1518
1001
        """
1519
 
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1002
        call_warnings, result = self._capture_warnings(a_callable,
1520
1003
            *args, **kwargs)
1521
1004
        expected_first_warning = symbol_versioning.deprecation_string(
1522
1005
            a_callable, deprecation_format)
1526
1009
        self.assertEqual(expected_first_warning, call_warnings[0])
1527
1010
        return result
1528
1011
 
1529
 
    def callCatchWarnings(self, fn, *args, **kw):
1530
 
        """Call a callable that raises python warnings.
1531
 
 
1532
 
        The caller's responsible for examining the returned warnings.
1533
 
 
1534
 
        If the callable raises an exception, the exception is not
1535
 
        caught and propagates up to the caller.  In that case, the list
1536
 
        of warnings is not available.
1537
 
 
1538
 
        :returns: ([warning_object, ...], fn_result)
1539
 
        """
1540
 
        # XXX: This is not perfect, because it completely overrides the
1541
 
        # warnings filters, and some code may depend on suppressing particular
1542
 
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
1543
 
        # though.  -- Andrew, 20071062
1544
 
        wlist = []
1545
 
        def _catcher(message, category, filename, lineno, file=None, line=None):
1546
 
            # despite the name, 'message' is normally(?) a Warning subclass
1547
 
            # instance
1548
 
            wlist.append(message)
1549
 
        saved_showwarning = warnings.showwarning
1550
 
        saved_filters = warnings.filters
1551
 
        try:
1552
 
            warnings.showwarning = _catcher
1553
 
            warnings.filters = []
1554
 
            result = fn(*args, **kw)
1555
 
        finally:
1556
 
            warnings.showwarning = saved_showwarning
1557
 
            warnings.filters = saved_filters
1558
 
        return wlist, result
1559
 
 
1560
1012
    def callDeprecated(self, expected, callable, *args, **kwargs):
1561
1013
        """Assert that a callable is deprecated in a particular way.
1562
1014
 
1563
 
        This is a very precise test for unusual requirements. The
 
1015
        This is a very precise test for unusual requirements. The 
1564
1016
        applyDeprecated helper function is probably more suited for most tests
1565
1017
        as it allows you to simply specify the deprecation format being used
1566
1018
        and will ensure that that is issued for the function being called.
1567
1019
 
1568
 
        Note that this only captures warnings raised by symbol_versioning.warn,
1569
 
        not other callers that go direct to the warning module.  To catch
1570
 
        general warnings, use callCatchWarnings.
1571
 
 
1572
1020
        :param expected: a list of the deprecation warnings expected, in order
1573
1021
        :param callable: The callable to call
1574
1022
        :param args: The positional arguments for the callable
1575
1023
        :param kwargs: The keyword arguments for the callable
1576
1024
        """
1577
 
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1025
        call_warnings, result = self._capture_warnings(callable,
1578
1026
            *args, **kwargs)
1579
1027
        self.assertEqual(expected, call_warnings)
1580
1028
        return result
1584
1032
 
1585
1033
        The file is removed as the test is torn down.
1586
1034
        """
1587
 
        self._log_file = StringIO()
1588
 
        self._log_memento = trace.push_log_file(self._log_file)
 
1035
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1036
        self._log_file = os.fdopen(fileno, 'w+')
 
1037
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
1038
        self._log_file_name = name
1589
1039
        self.addCleanup(self._finishLogFile)
1590
1040
 
1591
1041
    def _finishLogFile(self):
1593
1043
 
1594
1044
        Close the file and delete it, unless setKeepLogfile was called.
1595
1045
        """
1596
 
        if trace._trace_file:
1597
 
            # flush the log file, to get all content
1598
 
            trace._trace_file.flush()
1599
 
        trace.pop_log_file(self._log_memento)
1600
 
        # Cache the log result and delete the file on disk
1601
 
        self._get_log(False)
1602
 
 
1603
 
    def thisFailsStrictLockCheck(self):
1604
 
        """It is known that this test would fail with -Dstrict_locks.
1605
 
 
1606
 
        By default, all tests are run with strict lock checking unless
1607
 
        -Edisable_lock_checks is supplied. However there are some tests which
1608
 
        we know fail strict locks at this point that have not been fixed.
1609
 
        They should call this function to disable the strict checking.
1610
 
 
1611
 
        This should be used sparingly, it is much better to fix the locking
1612
 
        issues rather than papering over the problem by calling this function.
1613
 
        """
1614
 
        debug.debug_flags.discard('strict_locks')
1615
 
 
1616
 
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1617
 
        """Overrides an object attribute restoring it after the test.
1618
 
 
1619
 
        :param obj: The object that will be mutated.
1620
 
 
1621
 
        :param attr_name: The attribute name we want to preserve/override in
1622
 
            the object.
1623
 
 
1624
 
        :param new: The optional value we want to set the attribute to.
1625
 
 
1626
 
        :returns: The actual attr value.
1627
 
        """
1628
 
        value = getattr(obj, attr_name)
1629
 
        # The actual value is captured by the call below
1630
 
        self.addCleanup(setattr, obj, attr_name, value)
1631
 
        if new is not _unitialized_attr:
1632
 
            setattr(obj, attr_name, new)
1633
 
        return value
1634
 
 
1635
 
    def overrideEnv(self, name, new):
1636
 
        """Set an environment variable, and reset it after the test.
1637
 
 
1638
 
        :param name: The environment variable name.
1639
 
 
1640
 
        :param new: The value to set the variable to. If None, the 
1641
 
            variable is deleted from the environment.
1642
 
 
1643
 
        :returns: The actual variable value.
1644
 
        """
1645
 
        value = osutils.set_or_unset_env(name, new)
1646
 
        self.addCleanup(osutils.set_or_unset_env, name, value)
1647
 
        return value
 
1046
        if self._log_file is None:
 
1047
            return
 
1048
        bzrlib.trace.disable_test_log(self._log_nonce)
 
1049
        self._log_file.close()
 
1050
        self._log_file = None
 
1051
        if not self._keep_log_file:
 
1052
            os.remove(self._log_file_name)
 
1053
            self._log_file_name = None
 
1054
 
 
1055
    def setKeepLogfile(self):
 
1056
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1057
        self._keep_log_file = True
 
1058
 
 
1059
    def addCleanup(self, callable):
 
1060
        """Arrange to run a callable when this case is torn down.
 
1061
 
 
1062
        Callables are run in the reverse of the order they are registered, 
 
1063
        ie last-in first-out.
 
1064
        """
 
1065
        if callable in self._cleanups:
 
1066
            raise ValueError("cleanup function %r already registered on %s" 
 
1067
                    % (callable, self))
 
1068
        self._cleanups.append(callable)
1648
1069
 
1649
1070
    def _cleanEnvironment(self):
1650
 
        for name, value in isolated_environ.iteritems():
1651
 
            self.overrideEnv(name, value)
 
1071
        new_env = {
 
1072
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1073
            'HOME': os.getcwd(),
 
1074
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1075
            'BZR_EMAIL': None,
 
1076
            'BZREMAIL': None, # may still be present in the environment
 
1077
            'EMAIL': None,
 
1078
            'BZR_PROGRESS_BAR': None,
 
1079
            # Proxies
 
1080
            'http_proxy': None,
 
1081
            'HTTP_PROXY': None,
 
1082
            'https_proxy': None,
 
1083
            'HTTPS_PROXY': None,
 
1084
            'no_proxy': None,
 
1085
            'NO_PROXY': None,
 
1086
            'all_proxy': None,
 
1087
            'ALL_PROXY': None,
 
1088
            # Nobody cares about these ones AFAIK. So far at
 
1089
            # least. If you do (care), please update this comment
 
1090
            # -- vila 20061212
 
1091
            'ftp_proxy': None,
 
1092
            'FTP_PROXY': None,
 
1093
        }
 
1094
        self.__old_env = {}
 
1095
        self.addCleanup(self._restoreEnvironment)
 
1096
        for name, value in new_env.iteritems():
 
1097
            self._captureVar(name, value)
 
1098
 
 
1099
    def _captureVar(self, name, newvalue):
 
1100
        """Set an environment variable, and reset it when finished."""
 
1101
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1102
 
 
1103
    def _restoreEnvironment(self):
 
1104
        for name, value in self.__old_env.iteritems():
 
1105
            osutils.set_or_unset_env(name, value)
1652
1106
 
1653
1107
    def _restoreHooks(self):
1654
 
        for klass, (name, hooks) in self._preserved_hooks.items():
1655
 
            setattr(klass, name, hooks)
 
1108
        for klass, hooks in self._preserved_hooks.items():
 
1109
            setattr(klass, 'hooks', hooks)
1656
1110
 
1657
1111
    def knownFailure(self, reason):
1658
1112
        """This test has failed for some known reason."""
1659
1113
        raise KnownFailure(reason)
1660
1114
 
1661
 
    def _suppress_log(self):
1662
 
        """Remove the log info from details."""
1663
 
        self.discardDetail('log')
1664
 
 
1665
 
    def _do_skip(self, result, reason):
1666
 
        self._suppress_log()
1667
 
        addSkip = getattr(result, 'addSkip', None)
1668
 
        if not callable(addSkip):
1669
 
            result.addSuccess(result)
1670
 
        else:
1671
 
            addSkip(self, reason)
1672
 
 
1673
 
    @staticmethod
1674
 
    def _do_known_failure(self, result, e):
1675
 
        self._suppress_log()
1676
 
        err = sys.exc_info()
1677
 
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1678
 
        if addExpectedFailure is not None:
1679
 
            addExpectedFailure(self, err)
1680
 
        else:
1681
 
            result.addSuccess(self)
1682
 
 
1683
 
    @staticmethod
1684
 
    def _do_not_applicable(self, result, e):
1685
 
        if not e.args:
1686
 
            reason = 'No reason given'
1687
 
        else:
1688
 
            reason = e.args[0]
1689
 
        self._suppress_log ()
1690
 
        addNotApplicable = getattr(result, 'addNotApplicable', None)
1691
 
        if addNotApplicable is not None:
1692
 
            result.addNotApplicable(self, reason)
1693
 
        else:
1694
 
            self._do_skip(result, reason)
1695
 
 
1696
 
    @staticmethod
1697
 
    def _report_skip(self, result, err):
1698
 
        """Override the default _report_skip.
1699
 
 
1700
 
        We want to strip the 'log' detail. If we waint until _do_skip, it has
1701
 
        already been formatted into the 'reason' string, and we can't pull it
1702
 
        out again.
1703
 
        """
1704
 
        self._suppress_log()
1705
 
        super(TestCase, self)._report_skip(self, result, err)
1706
 
 
1707
 
    @staticmethod
1708
 
    def _report_expected_failure(self, result, err):
1709
 
        """Strip the log.
1710
 
 
1711
 
        See _report_skip for motivation.
1712
 
        """
1713
 
        self._suppress_log()
1714
 
        super(TestCase, self)._report_expected_failure(self, result, err)
1715
 
 
1716
 
    @staticmethod
1717
 
    def _do_unsupported_or_skip(self, result, e):
1718
 
        reason = e.args[0]
1719
 
        self._suppress_log()
1720
 
        addNotSupported = getattr(result, 'addNotSupported', None)
1721
 
        if addNotSupported is not None:
1722
 
            result.addNotSupported(self, reason)
1723
 
        else:
1724
 
            self._do_skip(result, reason)
 
1115
    def run(self, result=None):
 
1116
        if result is None: result = self.defaultTestResult()
 
1117
        for feature in getattr(self, '_test_needs_features', []):
 
1118
            if not feature.available():
 
1119
                result.startTest(self)
 
1120
                if getattr(result, 'addNotSupported', None):
 
1121
                    result.addNotSupported(self, feature)
 
1122
                else:
 
1123
                    result.addSuccess(self)
 
1124
                result.stopTest(self)
 
1125
                return
 
1126
        return unittest.TestCase.run(self, result)
 
1127
 
 
1128
    def tearDown(self):
 
1129
        self._runCleanups()
 
1130
        unittest.TestCase.tearDown(self)
1725
1131
 
1726
1132
    def time(self, callable, *args, **kwargs):
1727
1133
        """Run callable and accrue the time it takes to the benchmark time.
1728
 
 
 
1134
        
1729
1135
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1730
1136
        this will cause lsprofile statistics to be gathered and stored in
1731
1137
        self._benchcalls.
1732
1138
        """
1733
1139
        if self._benchtime is None:
1734
 
            self.addDetail('benchtime', content.Content(content.ContentType(
1735
 
                "text", "plain"), lambda:[str(self._benchtime)]))
1736
1140
            self._benchtime = 0
1737
1141
        start = time.time()
1738
1142
        try:
1747
1151
        finally:
1748
1152
            self._benchtime += time.time() - start
1749
1153
 
 
1154
    def _runCleanups(self):
 
1155
        """Run registered cleanup functions. 
 
1156
 
 
1157
        This should only be called from TestCase.tearDown.
 
1158
        """
 
1159
        # TODO: Perhaps this should keep running cleanups even if 
 
1160
        # one of them fails?
 
1161
 
 
1162
        # Actually pop the cleanups from the list so tearDown running
 
1163
        # twice is safe (this happens for skipped tests).
 
1164
        while self._cleanups:
 
1165
            self._cleanups.pop()()
 
1166
 
1750
1167
    def log(self, *args):
1751
 
        trace.mutter(*args)
 
1168
        mutter(*args)
1752
1169
 
1753
1170
    def _get_log(self, keep_log_file=False):
1754
 
        """Internal helper to get the log from bzrlib.trace for this test.
1755
 
 
1756
 
        Please use self.getDetails, or self.get_log to access this in test case
1757
 
        code.
1758
 
 
1759
 
        :param keep_log_file: When True, if the log is still a file on disk
1760
 
            leave it as a file on disk. When False, if the log is still a file
1761
 
            on disk, the log file is deleted and the log preserved as
1762
 
            self._log_contents.
1763
 
        :return: A string containing the log.
1764
 
        """
1765
 
        if self._log_contents is not None:
1766
 
            try:
1767
 
                self._log_contents.decode('utf8')
1768
 
            except UnicodeDecodeError:
1769
 
                unicodestr = self._log_contents.decode('utf8', 'replace')
1770
 
                self._log_contents = unicodestr.encode('utf8')
 
1171
        """Return as a string the log for this test. If the file is still
 
1172
        on disk and keep_log_file=False, delete the log file and store the
 
1173
        content in self._log_contents."""
 
1174
        # flush the log file, to get all content
 
1175
        import bzrlib.trace
 
1176
        bzrlib.trace._trace_file.flush()
 
1177
        if self._log_contents:
1771
1178
            return self._log_contents
1772
 
        if self._log_file is not None:
1773
 
            log_contents = self._log_file.getvalue()
 
1179
        if self._log_file_name is not None:
 
1180
            logfile = open(self._log_file_name)
1774
1181
            try:
1775
 
                log_contents.decode('utf8')
1776
 
            except UnicodeDecodeError:
1777
 
                unicodestr = log_contents.decode('utf8', 'replace')
1778
 
                log_contents = unicodestr.encode('utf8')
 
1182
                log_contents = logfile.read()
 
1183
            finally:
 
1184
                logfile.close()
1779
1185
            if not keep_log_file:
1780
 
                self._log_file = None
1781
 
                # Permit multiple calls to get_log until we clean it up in
1782
 
                # finishLogFile
1783
1186
                self._log_contents = log_contents
 
1187
                try:
 
1188
                    os.remove(self._log_file_name)
 
1189
                except OSError, e:
 
1190
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1191
                        print >>sys.stderr, ('Unable to delete log file '
 
1192
                                             ' %r' % self._log_file_name)
 
1193
                    else:
 
1194
                        raise
1784
1195
            return log_contents
1785
1196
        else:
1786
 
            return "No log file content."
1787
 
 
1788
 
    def get_log(self):
1789
 
        """Get a unicode string containing the log from bzrlib.trace.
1790
 
 
1791
 
        Undecodable characters are replaced.
1792
 
        """
1793
 
        return u"".join(self.getDetails()['log'].iter_text())
 
1197
            return "DELETED log file to reduce memory footprint"
 
1198
 
 
1199
    def capture(self, cmd, retcode=0):
 
1200
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
1201
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
1794
1202
 
1795
1203
    def requireFeature(self, feature):
1796
1204
        """This test requires a specific feature is available.
1800
1208
        if not feature.available():
1801
1209
            raise UnavailableFeature(feature)
1802
1210
 
1803
 
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1804
 
            working_dir):
1805
 
        """Run bazaar command line, splitting up a string command line."""
1806
 
        if isinstance(args, basestring):
1807
 
            # shlex don't understand unicode strings,
1808
 
            # so args should be plain string (bialix 20070906)
1809
 
            args = list(shlex.split(str(args)))
1810
 
        return self._run_bzr_core(args, retcode=retcode,
1811
 
                encoding=encoding, stdin=stdin, working_dir=working_dir,
1812
 
                )
1813
 
 
1814
 
    def _run_bzr_core(self, args, retcode, encoding, stdin,
1815
 
            working_dir):
1816
 
        # Clear chk_map page cache, because the contents are likely to mask
1817
 
        # locking errors.
1818
 
        chk_map.clear_cache()
 
1211
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
1212
                         working_dir=None):
 
1213
        """Invoke bzr and return (stdout, stderr).
 
1214
 
 
1215
        Useful for code that wants to check the contents of the
 
1216
        output, the way error messages are presented, etc.
 
1217
 
 
1218
        This should be the main method for tests that want to exercise the
 
1219
        overall behavior of the bzr application (rather than a unit test
 
1220
        or a functional test of the library.)
 
1221
 
 
1222
        Much of the old code runs bzr by forking a new copy of Python, but
 
1223
        that is slower, harder to debug, and generally not necessary.
 
1224
 
 
1225
        This runs bzr through the interface that catches and reports
 
1226
        errors, and with logging set to something approximating the
 
1227
        default, so that error reporting can be checked.
 
1228
 
 
1229
        :param argv: arguments to invoke bzr
 
1230
        :param retcode: expected return code, or None for don't-care.
 
1231
        :param encoding: encoding for sys.stdout and sys.stderr
 
1232
        :param stdin: A string to be used as stdin for the command.
 
1233
        :param working_dir: Change to this directory before running
 
1234
        """
1819
1235
        if encoding is None:
1820
 
            encoding = osutils.get_user_encoding()
 
1236
            encoding = bzrlib.user_encoding
1821
1237
        stdout = StringIOWrapper()
1822
1238
        stderr = StringIOWrapper()
1823
1239
        stdout.encoding = encoding
1824
1240
        stderr.encoding = encoding
1825
1241
 
1826
 
        self.log('run bzr: %r', args)
 
1242
        self.log('run bzr: %r', argv)
1827
1243
        # FIXME: don't call into logging here
1828
1244
        handler = logging.StreamHandler(stderr)
1829
1245
        handler.setLevel(logging.INFO)
1838
1254
            os.chdir(working_dir)
1839
1255
 
1840
1256
        try:
 
1257
            saved_debug_flags = frozenset(debug.debug_flags)
 
1258
            debug.debug_flags.clear()
1841
1259
            try:
1842
 
                result = self.apply_redirected(
1843
 
                    ui.ui_factory.stdin,
1844
 
                    stdout, stderr,
1845
 
                    _mod_commands.run_bzr_catch_user_errors,
1846
 
                    args)
1847
 
            except KeyboardInterrupt:
1848
 
                # Reraise KeyboardInterrupt with contents of redirected stdout
1849
 
                # and stderr as arguments, for tests which are interested in
1850
 
                # stdout and stderr and are expecting the exception.
1851
 
                out = stdout.getvalue()
1852
 
                err = stderr.getvalue()
1853
 
                if out:
1854
 
                    self.log('output:\n%r', out)
1855
 
                if err:
1856
 
                    self.log('errors:\n%r', err)
1857
 
                raise KeyboardInterrupt(out, err)
 
1260
                result = self.apply_redirected(ui.ui_factory.stdin,
 
1261
                                               stdout, stderr,
 
1262
                                               bzrlib.commands.run_bzr_catch_errors,
 
1263
                                               argv)
 
1264
            finally:
 
1265
                debug.debug_flags.update(saved_debug_flags)
1858
1266
        finally:
1859
1267
            logger.removeHandler(handler)
1860
1268
            ui.ui_factory = old_ui_factory
1868
1276
        if err:
1869
1277
            self.log('errors:\n%r', err)
1870
1278
        if retcode is not None:
1871
 
            self.assertEquals(retcode, result,
1872
 
                              message='Unexpected return code')
1873
 
        return result, out, err
 
1279
            self.assertEquals(retcode, result)
 
1280
        return out, err
1874
1281
 
1875
 
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1876
 
                working_dir=None, error_regexes=[], output_encoding=None):
 
1282
    def run_bzr(self, *args, **kwargs):
1877
1283
        """Invoke bzr, as if it were run from the command line.
1878
1284
 
1879
 
        The argument list should not include the bzr program name - the
1880
 
        first argument is normally the bzr command.  Arguments may be
1881
 
        passed in three ways:
1882
 
 
1883
 
        1- A list of strings, eg ["commit", "a"].  This is recommended
1884
 
        when the command contains whitespace or metacharacters, or
1885
 
        is built up at run time.
1886
 
 
1887
 
        2- A single string, eg "add a".  This is the most convenient
1888
 
        for hardcoded commands.
1889
 
 
1890
 
        This runs bzr through the interface that catches and reports
1891
 
        errors, and with logging set to something approximating the
1892
 
        default, so that error reporting can be checked.
1893
 
 
1894
1285
        This should be the main method for tests that want to exercise the
1895
1286
        overall behavior of the bzr application (rather than a unit test
1896
1287
        or a functional test of the library.)
1898
1289
        This sends the stdout/stderr results into the test's log,
1899
1290
        where it may be useful for debugging.  See also run_captured.
1900
1291
 
1901
 
        :keyword stdin: A string to be used as stdin for the command.
1902
 
        :keyword retcode: The status code the command should return;
1903
 
            default 0.
1904
 
        :keyword working_dir: The directory to run the command in
1905
 
        :keyword error_regexes: A list of expected error messages.  If
1906
 
            specified they must be seen in the error output of the command.
 
1292
        :param stdin: A string to be used as stdin for the command.
 
1293
        :param retcode: The status code the command should return
 
1294
        :param working_dir: The directory to run the command in
1907
1295
        """
1908
 
        retcode, out, err = self._run_bzr_autosplit(
1909
 
            args=args,
1910
 
            retcode=retcode,
1911
 
            encoding=encoding,
1912
 
            stdin=stdin,
1913
 
            working_dir=working_dir,
1914
 
            )
1915
 
        self.assertIsInstance(error_regexes, (list, tuple))
1916
 
        for regex in error_regexes:
1917
 
            self.assertContainsRe(err, regex)
1918
 
        return out, err
 
1296
        retcode = kwargs.pop('retcode', 0)
 
1297
        encoding = kwargs.pop('encoding', None)
 
1298
        stdin = kwargs.pop('stdin', None)
 
1299
        working_dir = kwargs.pop('working_dir', None)
 
1300
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
 
1301
                                     stdin=stdin, working_dir=working_dir)
 
1302
 
 
1303
    def run_bzr_decode(self, *args, **kwargs):
 
1304
        if 'encoding' in kwargs:
 
1305
            encoding = kwargs['encoding']
 
1306
        else:
 
1307
            encoding = bzrlib.user_encoding
 
1308
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
1919
1309
 
1920
1310
    def run_bzr_error(self, error_regexes, *args, **kwargs):
1921
1311
        """Run bzr, and check that stderr contains the supplied regexes
1922
 
 
1923
 
        :param error_regexes: Sequence of regular expressions which
 
1312
        
 
1313
        :param error_regexes: Sequence of regular expressions which 
1924
1314
            must each be found in the error output. The relative ordering
1925
1315
            is not enforced.
1926
1316
        :param args: command-line arguments for bzr
1927
1317
        :param kwargs: Keyword arguments which are interpreted by run_bzr
1928
1318
            This function changes the default value of retcode to be 3,
1929
1319
            since in most cases this is run when you expect bzr to fail.
1930
 
 
1931
 
        :return: (out, err) The actual output of running the command (in case
1932
 
            you want to do more inspection)
1933
 
 
1934
 
        Examples of use::
1935
 
 
 
1320
        :return: (out, err) The actual output of running the command (in case you
 
1321
                 want to do more inspection)
 
1322
 
 
1323
        Examples of use:
1936
1324
            # Make sure that commit is failing because there is nothing to do
1937
1325
            self.run_bzr_error(['no changes to commit'],
1938
 
                               ['commit', '-m', 'my commit comment'])
 
1326
                               'commit', '-m', 'my commit comment')
1939
1327
            # Make sure --strict is handling an unknown file, rather than
1940
1328
            # giving us the 'nothing to do' error
1941
1329
            self.build_tree(['unknown'])
1942
1330
            self.run_bzr_error(['Commit refused because there are unknown files'],
1943
 
                               ['commit', --strict', '-m', 'my commit comment'])
 
1331
                               'commit', '--strict', '-m', 'my commit comment')
1944
1332
        """
1945
1333
        kwargs.setdefault('retcode', 3)
1946
 
        kwargs['error_regexes'] = error_regexes
1947
1334
        out, err = self.run_bzr(*args, **kwargs)
 
1335
        for regex in error_regexes:
 
1336
            self.assertContainsRe(err, regex)
1948
1337
        return out, err
1949
1338
 
1950
1339
    def run_bzr_subprocess(self, *args, **kwargs):
1951
1340
        """Run bzr in a subprocess for testing.
1952
1341
 
1953
 
        This starts a new Python interpreter and runs bzr in there.
 
1342
        This starts a new Python interpreter and runs bzr in there. 
1954
1343
        This should only be used for tests that have a justifiable need for
1955
1344
        this isolation: e.g. they are testing startup time, or signal
1956
 
        handling, or early startup code, etc.  Subprocess code can't be
 
1345
        handling, or early startup code, etc.  Subprocess code can't be 
1957
1346
        profiled or debugged so easily.
1958
1347
 
1959
 
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1348
        :param retcode: The status code that is expected.  Defaults to 0.  If
1960
1349
            None is supplied, the status code is not checked.
1961
 
        :keyword env_changes: A dictionary which lists changes to environment
 
1350
        :param env_changes: A dictionary which lists changes to environment
1962
1351
            variables. A value of None will unset the env variable.
1963
1352
            The values must be strings. The change will only occur in the
1964
1353
            child, so you don't need to fix the environment after running.
1965
 
        :keyword universal_newlines: Convert CRLF => LF
1966
 
        :keyword allow_plugins: By default the subprocess is run with
 
1354
        :param universal_newlines: Convert CRLF => LF
 
1355
        :param allow_plugins: By default the subprocess is run with
1967
1356
            --no-plugins to ensure test reproducibility. Also, it is possible
1968
1357
            for system-wide plugins to create unexpected output on stderr,
1969
1358
            which can cause unnecessary test failures.
1971
1360
        env_changes = kwargs.get('env_changes', {})
1972
1361
        working_dir = kwargs.get('working_dir', None)
1973
1362
        allow_plugins = kwargs.get('allow_plugins', False)
1974
 
        if len(args) == 1:
1975
 
            if isinstance(args[0], list):
1976
 
                args = args[0]
1977
 
            elif isinstance(args[0], basestring):
1978
 
                args = list(shlex.split(args[0]))
1979
 
        else:
1980
 
            raise ValueError("passing varargs to run_bzr_subprocess")
1981
1363
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
1982
1364
                                            working_dir=working_dir,
1983
1365
                                            allow_plugins=allow_plugins)
2000
1382
        profiled or debugged so easily.
2001
1383
 
2002
1384
        :param process_args: a list of arguments to pass to the bzr executable,
2003
 
            for example ``['--version']``.
 
1385
            for example `['--version']`.
2004
1386
        :param env_changes: A dictionary which lists changes to environment
2005
1387
            variables. A value of None will unset the env variable.
2006
1388
            The values must be strings. The change will only occur in the
2007
1389
            child, so you don't need to fix the environment after running.
2008
 
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
2009
 
            doesn't support signalling subprocesses.
 
1390
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1391
            is not available.
2010
1392
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
2011
1393
 
2012
1394
        :returns: Popen object for the started process.
2013
1395
        """
2014
1396
        if skip_if_plan_to_signal:
2015
 
            if os.name != "posix":
2016
 
                raise TestSkipped("Sending signals not supported")
 
1397
            if not getattr(os, 'kill', None):
 
1398
                raise TestSkipped("os.kill not available.")
2017
1399
 
2018
1400
        if env_changes is None:
2019
1401
            env_changes = {}
2039
1421
            # so we will avoid using it on all platforms, just to
2040
1422
            # make sure the code path is used, and we don't break on win32
2041
1423
            cleanup_environment()
2042
 
            command = [sys.executable]
2043
 
            # frozen executables don't need the path to bzr
2044
 
            if getattr(sys, "frozen", None) is None:
2045
 
                command.append(bzr_path)
 
1424
            command = [sys.executable, bzr_path]
2046
1425
            if not allow_plugins:
2047
1426
                command.append('--no-plugins')
2048
1427
            command.extend(process_args)
2049
 
            process = self._popen(command, stdin=subprocess.PIPE,
2050
 
                                  stdout=subprocess.PIPE,
2051
 
                                  stderr=subprocess.PIPE)
 
1428
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
2052
1429
        finally:
2053
1430
            restore_environment()
2054
1431
            if cwd is not None:
2062
1439
        Allows tests to override this method to intercept the calls made to
2063
1440
        Popen for introspection.
2064
1441
        """
2065
 
        return subprocess.Popen(*args, **kwargs)
2066
 
 
2067
 
    def get_source_path(self):
2068
 
        """Return the path of the directory containing bzrlib."""
2069
 
        return os.path.dirname(os.path.dirname(bzrlib.__file__))
 
1442
        return Popen(*args, **kwargs)
2070
1443
 
2071
1444
    def get_bzr_path(self):
2072
1445
        """Return the path of the 'bzr' executable for this test suite."""
2073
 
        bzr_path = os.path.join(self.get_source_path(), "bzr")
 
1446
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
2074
1447
        if not os.path.isfile(bzr_path):
2075
1448
            # We are probably installed. Assume sys.argv is the right file
2076
1449
            bzr_path = sys.argv[0]
2098
1471
        if retcode is not None and retcode != process.returncode:
2099
1472
            if process_args is None:
2100
1473
                process_args = "(unknown args)"
2101
 
            trace.mutter('Output of bzr %s:\n%s', process_args, out)
2102
 
            trace.mutter('Error for bzr %s:\n%s', process_args, err)
 
1474
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1475
            mutter('Error for bzr %s:\n%s', process_args, err)
2103
1476
            self.fail('Command bzr %s failed with retcode %s != %s'
2104
1477
                      % (process_args, retcode, process.returncode))
2105
1478
        return [out, err]
2113
1486
        shape = list(shape)             # copy
2114
1487
        for path, ie in inv.entries():
2115
1488
            name = path.replace('\\', '/')
2116
 
            if ie.kind == 'directory':
 
1489
            if ie.kind == 'dir':
2117
1490
                name = name + '/'
2118
1491
            if name in shape:
2119
1492
                shape.remove(name)
2156
1529
            sys.stderr = real_stderr
2157
1530
            sys.stdin = real_stdin
2158
1531
 
 
1532
    @symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
 
1533
    def merge(self, branch_from, wt_to):
 
1534
        """A helper for tests to do a ui-less merge.
 
1535
 
 
1536
        This should move to the main library when someone has time to integrate
 
1537
        it in.
 
1538
        """
 
1539
        # minimal ui-less merge.
 
1540
        wt_to.branch.fetch(branch_from)
 
1541
        base_rev = common_ancestor(branch_from.last_revision(),
 
1542
                                   wt_to.branch.last_revision(),
 
1543
                                   wt_to.branch.repository)
 
1544
        merge_inner(wt_to.branch, branch_from.basis_tree(),
 
1545
                    wt_to.branch.repository.revision_tree(base_rev),
 
1546
                    this_tree=wt_to)
 
1547
        wt_to.add_parent_tree_id(branch_from.last_revision())
 
1548
 
2159
1549
    def reduceLockdirTimeout(self):
2160
1550
        """Reduce the default lock timeout for the duration of the test, so that
2161
1551
        if LockContention occurs during a test, it does so quickly.
2162
1552
 
2163
1553
        Tests that expect to provoke LockContention errors should call this.
2164
1554
        """
2165
 
        self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2166
 
 
2167
 
    def make_utf8_encoded_stringio(self, encoding_type=None):
2168
 
        """Return a StringIOWrapper instance, that will encode Unicode
2169
 
        input to UTF-8.
2170
 
        """
2171
 
        if encoding_type is None:
2172
 
            encoding_type = 'strict'
2173
 
        sio = StringIO()
2174
 
        output_encoding = 'utf-8'
2175
 
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
2176
 
        sio.encoding = output_encoding
2177
 
        return sio
2178
 
 
2179
 
    def disable_verb(self, verb):
2180
 
        """Disable a smart server verb for one test."""
2181
 
        from bzrlib.smart import request
2182
 
        request_handlers = request.request_handlers
2183
 
        orig_method = request_handlers.get(verb)
2184
 
        request_handlers.remove(verb)
2185
 
        self.addCleanup(request_handlers.register, verb, orig_method)
2186
 
 
2187
 
 
2188
 
class CapturedCall(object):
2189
 
    """A helper for capturing smart server calls for easy debug analysis."""
2190
 
 
2191
 
    def __init__(self, params, prefix_length):
2192
 
        """Capture the call with params and skip prefix_length stack frames."""
2193
 
        self.call = params
2194
 
        import traceback
2195
 
        # The last 5 frames are the __init__, the hook frame, and 3 smart
2196
 
        # client frames. Beyond this we could get more clever, but this is good
2197
 
        # enough for now.
2198
 
        stack = traceback.extract_stack()[prefix_length:-5]
2199
 
        self.stack = ''.join(traceback.format_list(stack))
2200
 
 
2201
 
    def __str__(self):
2202
 
        return self.call.method
2203
 
 
2204
 
    def __repr__(self):
2205
 
        return self.call.method
2206
 
 
2207
 
    def stack(self):
2208
 
        return self.stack
 
1555
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1556
        def resetTimeout():
 
1557
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1558
        self.addCleanup(resetTimeout)
 
1559
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1560
 
 
1561
BzrTestBase = TestCase
2209
1562
 
2210
1563
 
2211
1564
class TestCaseWithMemoryTransport(TestCase):
2222
1575
    file defaults for the transport in tests, nor does it obey the command line
2223
1576
    override, so tests that accidentally write to the common directory should
2224
1577
    be rare.
2225
 
 
2226
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
2227
 
    a .bzr directory that stops us ascending higher into the filesystem.
2228
1578
    """
2229
1579
 
2230
1580
    TEST_ROOT = None
2231
1581
    _TEST_NAME = 'test'
2232
1582
 
 
1583
 
2233
1584
    def __init__(self, methodName='runTest'):
2234
 
        # allow test parameterization after test construction and before test
2235
 
        # execution. Variables that the parameterizer sets need to be
 
1585
        # allow test parameterisation after test construction and before test
 
1586
        # execution. Variables that the parameteriser sets need to be 
2236
1587
        # ones that are not set by setUp, or setUp will trash them.
2237
1588
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
2238
1589
        self.vfs_transport_factory = default_transport
2240
1591
        self.transport_readonly_server = None
2241
1592
        self.__vfs_server = None
2242
1593
 
2243
 
    def get_transport(self, relpath=None):
2244
 
        """Return a writeable transport.
2245
 
 
2246
 
        This transport is for the test scratch space relative to
2247
 
        "self._test_root"
2248
 
 
2249
 
        :param relpath: a path relative to the base url.
2250
 
        """
2251
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
1594
    def get_transport(self):
 
1595
        """Return a writeable transport for the test scratch space"""
 
1596
        t = get_transport(self.get_url())
2252
1597
        self.assertFalse(t.is_readonly())
2253
1598
        return t
2254
1599
 
2255
 
    def get_readonly_transport(self, relpath=None):
 
1600
    def get_readonly_transport(self):
2256
1601
        """Return a readonly transport for the test scratch space
2257
 
 
 
1602
        
2258
1603
        This can be used to test that operations which should only need
2259
1604
        readonly access in fact do not try to write.
2260
 
 
2261
 
        :param relpath: a path relative to the base url.
2262
1605
        """
2263
 
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
1606
        t = get_transport(self.get_readonly_url())
2264
1607
        self.assertTrue(t.is_readonly())
2265
1608
        return t
2266
1609
 
2279
1622
        if self.__readonly_server is None:
2280
1623
            if self.transport_readonly_server is None:
2281
1624
                # readonly decorator requested
2282
 
                self.__readonly_server = test_server.ReadonlyServer()
 
1625
                # bring up the server
 
1626
                self.__readonly_server = ReadonlyServer()
 
1627
                self.__readonly_server.setUp(self.get_vfs_only_server())
2283
1628
            else:
2284
 
                # explicit readonly transport.
2285
1629
                self.__readonly_server = self.create_transport_readonly_server()
2286
 
            self.start_server(self.__readonly_server,
2287
 
                self.get_vfs_only_server())
 
1630
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1631
            self.addCleanup(self.__readonly_server.tearDown)
2288
1632
        return self.__readonly_server
2289
1633
 
2290
1634
    def get_readonly_url(self, relpath=None):
2291
1635
        """Get a URL for the readonly transport.
2292
1636
 
2293
 
        This will either be backed by '.' or a decorator to the transport
 
1637
        This will either be backed by '.' or a decorator to the transport 
2294
1638
        used by self.get_url()
2295
1639
        relpath provides for clients to get a path relative to the base url.
2296
1640
        These should only be downwards relative, not upwards.
2297
1641
        """
2298
1642
        base = self.get_readonly_server().get_url()
2299
 
        return self._adjust_url(base, relpath)
 
1643
        if relpath is not None:
 
1644
            if not base.endswith('/'):
 
1645
                base = base + '/'
 
1646
            base = base + relpath
 
1647
        return base
2300
1648
 
2301
1649
    def get_vfs_only_server(self):
2302
1650
        """Get the vfs only read/write server instance.
2308
1656
        is no means to override it.
2309
1657
        """
2310
1658
        if self.__vfs_server is None:
2311
 
            self.__vfs_server = memory.MemoryServer()
2312
 
            self.start_server(self.__vfs_server)
 
1659
            self.__vfs_server = MemoryServer()
 
1660
            self.__vfs_server.setUp()
 
1661
            self.addCleanup(self.__vfs_server.tearDown)
2313
1662
        return self.__vfs_server
2314
1663
 
2315
1664
    def get_server(self):
2322
1671
        then the self.get_vfs_server is returned.
2323
1672
        """
2324
1673
        if self.__server is None:
2325
 
            if (self.transport_server is None or self.transport_server is
2326
 
                self.vfs_transport_factory):
2327
 
                self.__server = self.get_vfs_only_server()
 
1674
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1675
                return self.get_vfs_only_server()
2328
1676
            else:
2329
1677
                # bring up a decorated means of access to the vfs only server.
2330
1678
                self.__server = self.transport_server()
2331
 
                self.start_server(self.__server, self.get_vfs_only_server())
 
1679
                try:
 
1680
                    self.__server.setUp(self.get_vfs_only_server())
 
1681
                except TypeError, e:
 
1682
                    # This should never happen; the try:Except here is to assist
 
1683
                    # developers having to update code rather than seeing an
 
1684
                    # uninformative TypeError.
 
1685
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1686
            self.addCleanup(self.__server.tearDown)
2332
1687
        return self.__server
2333
1688
 
2334
1689
    def _adjust_url(self, base, relpath):
2366
1721
    def get_vfs_only_url(self, relpath=None):
2367
1722
        """Get a URL (or maybe a path for the plain old vfs transport.
2368
1723
 
2369
 
        This will never be a smart protocol.  It always has all the
2370
 
        capabilities of the local filesystem, but it might actually be a
2371
 
        MemoryTransport or some other similar virtual filesystem.
2372
 
 
2373
 
        This is the backing transport (if any) of the server returned by
2374
 
        get_url and get_readonly_url.
2375
 
 
 
1724
        This will never be a smart protocol.
2376
1725
        :param relpath: provides for clients to get a path relative to the base
2377
1726
            url.  These should only be downwards relative, not upwards.
2378
 
        :return: A URL
2379
1727
        """
2380
1728
        base = self.get_vfs_only_server().get_url()
2381
1729
        return self._adjust_url(base, relpath)
2382
1730
 
2383
 
    def _create_safety_net(self):
2384
 
        """Make a fake bzr directory.
2385
 
 
2386
 
        This prevents any tests propagating up onto the TEST_ROOT directory's
2387
 
        real branch.
2388
 
        """
2389
 
        root = TestCaseWithMemoryTransport.TEST_ROOT
2390
 
        bzrdir.BzrDir.create_standalone_workingtree(root)
2391
 
 
2392
 
    def _check_safety_net(self):
2393
 
        """Check that the safety .bzr directory have not been touched.
2394
 
 
2395
 
        _make_test_root have created a .bzr directory to prevent tests from
2396
 
        propagating. This method ensures than a test did not leaked.
2397
 
        """
2398
 
        root = TestCaseWithMemoryTransport.TEST_ROOT
2399
 
        self.permit_url(_mod_transport.get_transport(root).base)
2400
 
        wt = workingtree.WorkingTree.open(root)
2401
 
        last_rev = wt.last_revision()
2402
 
        if last_rev != 'null:':
2403
 
            # The current test have modified the /bzr directory, we need to
2404
 
            # recreate a new one or all the followng tests will fail.
2405
 
            # If you need to inspect its content uncomment the following line
2406
 
            # import pdb; pdb.set_trace()
2407
 
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
2408
 
            self._create_safety_net()
2409
 
            raise AssertionError('%s/.bzr should not be modified' % root)
2410
 
 
2411
1731
    def _make_test_root(self):
2412
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
2413
 
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
2414
 
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
2415
 
                                                    suffix='.tmp'))
2416
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
2417
 
 
2418
 
            self._create_safety_net()
2419
 
 
2420
 
            # The same directory is used by all tests, and we're not
2421
 
            # specifically told when all tests are finished.  This will do.
2422
 
            atexit.register(_rmtree_temp_dir, root)
2423
 
 
2424
 
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
2425
 
        self.addCleanup(self._check_safety_net)
 
1732
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1733
            return
 
1734
        i = 0
 
1735
        while True:
 
1736
            root = u'test%04d.tmp' % i
 
1737
            try:
 
1738
                os.mkdir(root)
 
1739
            except OSError, e:
 
1740
                if e.errno == errno.EEXIST:
 
1741
                    i += 1
 
1742
                    continue
 
1743
                else:
 
1744
                    raise
 
1745
            # successfully created
 
1746
            TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
 
1747
            break
 
1748
        # make a fake bzr directory there to prevent any tests propagating
 
1749
        # up onto the source directory's real branch
 
1750
        bzrdir.BzrDir.create_standalone_workingtree(
 
1751
            TestCaseWithMemoryTransport.TEST_ROOT)
2426
1752
 
2427
1753
    def makeAndChdirToTestDir(self):
2428
1754
        """Create a temporary directories for this one test.
2429
 
 
 
1755
        
2430
1756
        This must set self.test_home_dir and self.test_dir and chdir to
2431
1757
        self.test_dir.
2432
 
 
 
1758
        
2433
1759
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
2434
1760
        """
2435
1761
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2436
1762
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2437
1763
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2438
 
        self.permit_dir(self.test_dir)
2439
 
 
 
1764
        
2440
1765
    def make_branch(self, relpath, format=None):
2441
1766
        """Create a branch on the transport at relpath."""
2442
1767
        repo = self.make_repository(relpath, format=format)
2447
1772
            # might be a relative or absolute path
2448
1773
            maybe_a_url = self.get_url(relpath)
2449
1774
            segments = maybe_a_url.rsplit('/', 1)
2450
 
            t = _mod_transport.get_transport(maybe_a_url)
 
1775
            t = get_transport(maybe_a_url)
2451
1776
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2452
 
                t.ensure_base()
 
1777
                try:
 
1778
                    t.mkdir('.')
 
1779
                except errors.FileExists:
 
1780
                    pass
2453
1781
            if format is None:
2454
1782
                format = 'default'
2455
1783
            if isinstance(format, basestring):
2459
1787
            raise TestSkipped("Format %s is not initializable." % format)
2460
1788
 
2461
1789
    def make_repository(self, relpath, shared=False, format=None):
2462
 
        """Create a repository on our default transport at relpath.
2463
 
 
2464
 
        Note that relpath must be a relative path, not a full url.
2465
 
        """
2466
 
        # FIXME: If you create a remoterepository this returns the underlying
2467
 
        # real format, which is incorrect.  Actually we should make sure that
2468
 
        # RemoteBzrDir returns a RemoteRepository.
2469
 
        # maybe  mbp 20070410
 
1790
        """Create a repository on our default transport at relpath."""
2470
1791
        made_control = self.make_bzrdir(relpath, format=format)
2471
1792
        return made_control.create_repository(shared=shared)
2472
1793
 
2473
 
    def make_smart_server(self, path, backing_server=None):
2474
 
        if backing_server is None:
2475
 
            backing_server = self.get_server()
2476
 
        smart_server = test_server.SmartTCPServer_for_testing()
2477
 
        self.start_server(smart_server, backing_server)
2478
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2479
 
                                                   ).clone(path)
2480
 
        return remote_transport
2481
 
 
2482
1794
    def make_branch_and_memory_tree(self, relpath, format=None):
2483
1795
        """Create a branch on the default transport and a MemoryTree for it."""
2484
1796
        b = self.make_branch(relpath, format=format)
2485
1797
        return memorytree.MemoryTree.create_on_branch(b)
2486
1798
 
2487
 
    def make_branch_builder(self, relpath, format=None):
2488
 
        branch = self.make_branch(relpath, format=format)
2489
 
        return branchbuilder.BranchBuilder(branch=branch)
2490
 
 
2491
1799
    def overrideEnvironmentForTesting(self):
2492
 
        test_home_dir = self.test_home_dir
2493
 
        if isinstance(test_home_dir, unicode):
2494
 
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2495
 
        self.overrideEnv('HOME', test_home_dir)
2496
 
        self.overrideEnv('BZR_HOME', test_home_dir)
2497
 
 
 
1800
        os.environ['HOME'] = self.test_home_dir
 
1801
        os.environ['BZR_HOME'] = self.test_home_dir
 
1802
        
2498
1803
    def setUp(self):
2499
1804
        super(TestCaseWithMemoryTransport, self).setUp()
2500
 
        # Ensure that ConnectedTransport doesn't leak sockets
2501
 
        def get_transport_with_cleanup(*args, **kwargs):
2502
 
            t = orig_get_transport(*args, **kwargs)
2503
 
            if isinstance(t, _mod_transport.ConnectedTransport):
2504
 
                self.addCleanup(t.disconnect)
2505
 
            return t
2506
 
 
2507
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2508
 
                                               get_transport_with_cleanup)
2509
1805
        self._make_test_root()
2510
 
        self.addCleanup(os.chdir, os.getcwdu())
 
1806
        _currentdir = os.getcwdu()
 
1807
        def _leaveDirectory():
 
1808
            os.chdir(_currentdir)
 
1809
        self.addCleanup(_leaveDirectory)
2511
1810
        self.makeAndChdirToTestDir()
2512
1811
        self.overrideEnvironmentForTesting()
2513
1812
        self.__readonly_server = None
2514
1813
        self.__server = None
2515
1814
        self.reduceLockdirTimeout()
2516
1815
 
2517
 
    def setup_smart_server_with_call_log(self):
2518
 
        """Sets up a smart server as the transport server with a call log."""
2519
 
        self.transport_server = test_server.SmartTCPServer_for_testing
2520
 
        self.hpss_calls = []
2521
 
        import traceback
2522
 
        # Skip the current stack down to the caller of
2523
 
        # setup_smart_server_with_call_log
2524
 
        prefix_length = len(traceback.extract_stack()) - 2
2525
 
        def capture_hpss_call(params):
2526
 
            self.hpss_calls.append(
2527
 
                CapturedCall(params, prefix_length))
2528
 
        client._SmartClient.hooks.install_named_hook(
2529
 
            'call', capture_hpss_call, None)
2530
 
 
2531
 
    def reset_smart_call_log(self):
2532
 
        self.hpss_calls = []
2533
 
 
2534
 
 
 
1816
     
2535
1817
class TestCaseInTempDir(TestCaseWithMemoryTransport):
2536
1818
    """Derived class that runs a test within a temporary directory.
2537
1819
 
2542
1824
    All test cases create their own directory within that.  If the
2543
1825
    tests complete successfully, the directory is removed.
2544
1826
 
2545
 
    :ivar test_base_dir: The path of the top-level directory for this
2546
 
    test, which contains a home directory and a work directory.
2547
 
 
2548
 
    :ivar test_home_dir: An initially empty directory under test_base_dir
2549
 
    which is used as $HOME for this test.
2550
 
 
2551
 
    :ivar test_dir: A directory under test_base_dir used as the current
2552
 
    directory when the test proper is run.
 
1827
    InTempDir is an old alias for FunctionalTestCase.
2553
1828
    """
2554
1829
 
2555
1830
    OVERRIDE_PYTHON = 'python'
 
1831
    use_numbered_dirs = False
2556
1832
 
2557
1833
    def check_file_contents(self, filename, expect):
2558
1834
        self.log("check contents of file %s" % filename)
2559
 
        f = file(filename)
2560
 
        try:
2561
 
            contents = f.read()
2562
 
        finally:
2563
 
            f.close()
 
1835
        contents = file(filename, 'r').read()
2564
1836
        if contents != expect:
2565
1837
            self.log("expected: %r" % expect)
2566
1838
            self.log("actually: %r" % contents)
2567
1839
            self.fail("contents of %s not as expected" % filename)
2568
1840
 
2569
 
    def _getTestDirPrefix(self):
2570
 
        # create a directory within the top level test directory
2571
 
        if sys.platform in ('win32', 'cygwin'):
2572
 
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
2573
 
            # windows is likely to have path-length limits so use a short name
2574
 
            name_prefix = name_prefix[-30:]
2575
 
        else:
2576
 
            name_prefix = re.sub('[/]', '_', self.id())
2577
 
        return name_prefix
2578
 
 
2579
1841
    def makeAndChdirToTestDir(self):
2580
1842
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2581
 
 
 
1843
        
2582
1844
        For TestCaseInTempDir we create a temporary directory based on the test
2583
1845
        name and then create two subdirs - test and home under it.
2584
1846
        """
2585
 
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2586
 
            self._getTestDirPrefix())
2587
 
        name = name_prefix
2588
 
        for i in range(100):
2589
 
            if os.path.exists(name):
2590
 
                name = name_prefix + '_' + str(i)
2591
 
            else:
2592
 
                # now create test and home directories within this dir
2593
 
                self.test_base_dir = name
2594
 
                self.addCleanup(self.deleteTestDir)
2595
 
                os.mkdir(self.test_base_dir)
2596
 
                break
2597
 
        self.permit_dir(self.test_base_dir)
2598
 
        # 'sprouting' and 'init' of a branch both walk up the tree to find
2599
 
        # stacking policy to honour; create a bzr dir with an unshared
2600
 
        # repository (but not a branch - our code would be trying to escape
2601
 
        # then!) to stop them, and permit it to be read.
2602
 
        # control = bzrdir.BzrDir.create(self.test_base_dir)
2603
 
        # control.create_repository()
2604
 
        self.test_home_dir = self.test_base_dir + '/home'
2605
 
        os.mkdir(self.test_home_dir)
2606
 
        self.test_dir = self.test_base_dir + '/work'
2607
 
        os.mkdir(self.test_dir)
2608
 
        os.chdir(self.test_dir)
2609
 
        # put name of test inside
2610
 
        f = file(self.test_base_dir + '/name', 'w')
2611
 
        try:
 
1847
        if self.use_numbered_dirs:  # strongly recommended on Windows
 
1848
                                    # due the path length limitation (260 ch.)
 
1849
            candidate_dir = '%s/%dK/%05d' % (self.TEST_ROOT,
 
1850
                                             int(self.number/1000),
 
1851
                                             self.number)
 
1852
            os.makedirs(candidate_dir)
 
1853
            self.test_home_dir = candidate_dir + '/home'
 
1854
            os.mkdir(self.test_home_dir)
 
1855
            self.test_dir = candidate_dir + '/work'
 
1856
            os.mkdir(self.test_dir)
 
1857
            os.chdir(self.test_dir)
 
1858
            # put name of test inside
 
1859
            f = file(candidate_dir + '/name', 'w')
2612
1860
            f.write(self.id())
2613
 
        finally:
2614
1861
            f.close()
2615
 
 
2616
 
    def deleteTestDir(self):
2617
 
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2618
 
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
 
1862
            return
 
1863
        # Else NAMED DIRS
 
1864
        # shorten the name, to avoid test failures due to path length
 
1865
        short_id = self.id().replace('bzrlib.tests.', '') \
 
1866
                   .replace('__main__.', '')[-100:]
 
1867
        # it's possible the same test class is run several times for
 
1868
        # parameterized tests, so make sure the names don't collide.  
 
1869
        i = 0
 
1870
        while True:
 
1871
            if i > 0:
 
1872
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
1873
            else:
 
1874
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
1875
            if os.path.exists(candidate_dir):
 
1876
                i = i + 1
 
1877
                continue
 
1878
            else:
 
1879
                os.mkdir(candidate_dir)
 
1880
                self.test_home_dir = candidate_dir + '/home'
 
1881
                os.mkdir(self.test_home_dir)
 
1882
                self.test_dir = candidate_dir + '/work'
 
1883
                os.mkdir(self.test_dir)
 
1884
                os.chdir(self.test_dir)
 
1885
                break
2619
1886
 
2620
1887
    def build_tree(self, shape, line_endings='binary', transport=None):
2621
1888
        """Build a test tree according to a pattern.
2626
1893
        This assumes that all the elements in the tree being built are new.
2627
1894
 
2628
1895
        This doesn't add anything to a branch.
2629
 
 
2630
 
        :type shape:    list or tuple.
2631
1896
        :param line_endings: Either 'binary' or 'native'
2632
 
            in binary mode, exact contents are written in native mode, the
2633
 
            line endings match the default platform endings.
2634
 
        :param transport: A transport to write to, for building trees on VFS's.
2635
 
            If the transport is readonly or None, "." is opened automatically.
2636
 
        :return: None
 
1897
                             in binary mode, exact contents are written
 
1898
                             in native mode, the line endings match the
 
1899
                             default platform endings.
 
1900
 
 
1901
        :param transport: A transport to write to, for building trees on 
 
1902
                          VFS's. If the transport is readonly or None,
 
1903
                          "." is opened automatically.
2637
1904
        """
2638
 
        if type(shape) not in (list, tuple):
2639
 
            raise AssertionError("Parameter 'shape' should be "
2640
 
                "a list or a tuple. Got %r instead" % (shape,))
2641
1905
        # It's OK to just create them using forward slashes on windows.
2642
1906
        if transport is None or transport.is_readonly():
2643
 
            transport = _mod_transport.get_transport(".")
 
1907
            transport = get_transport(".")
2644
1908
        for name in shape:
2645
 
            self.assertIsInstance(name, basestring)
 
1909
            self.assert_(isinstance(name, basestring))
2646
1910
            if name[-1] == '/':
2647
1911
                transport.mkdir(urlutils.escape(name[:-1]))
2648
1912
            else:
2656
1920
                content = "contents of %s%s" % (name.encode('utf-8'), end)
2657
1921
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
2658
1922
 
2659
 
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
2660
 
 
2661
 
    def assertInWorkingTree(self, path, root_path='.', tree=None):
2662
 
        """Assert whether path or paths are in the WorkingTree"""
2663
 
        if tree is None:
2664
 
            tree = workingtree.WorkingTree.open(root_path)
2665
 
        if not isinstance(path, basestring):
2666
 
            for p in path:
2667
 
                self.assertInWorkingTree(p, tree=tree)
2668
 
        else:
2669
 
            self.assertIsNot(tree.path2id(path), None,
2670
 
                path+' not in working tree.')
2671
 
 
2672
 
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2673
 
        """Assert whether path or paths are not in the WorkingTree"""
2674
 
        if tree is None:
2675
 
            tree = workingtree.WorkingTree.open(root_path)
2676
 
        if not isinstance(path, basestring):
2677
 
            for p in path:
2678
 
                self.assertNotInWorkingTree(p,tree=tree)
2679
 
        else:
2680
 
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
1923
    def build_tree_contents(self, shape):
 
1924
        build_tree_contents(shape)
 
1925
 
 
1926
    def assertFileEqual(self, content, path):
 
1927
        """Fail if path does not contain 'content'."""
 
1928
        self.failUnlessExists(path)
 
1929
        # TODO: jam 20060427 Shouldn't this be 'rb'?
 
1930
        f = file(path, 'r')
 
1931
        try:
 
1932
            s = f.read()
 
1933
        finally:
 
1934
            f.close()
 
1935
        self.assertEqualDiff(content, s)
 
1936
 
 
1937
    def failUnlessExists(self, path):
 
1938
        """Fail unless path, which may be abs or relative, exists."""
 
1939
        self.failUnless(osutils.lexists(path),path+" does not exist")
 
1940
 
 
1941
    def failIfExists(self, path):
 
1942
        """Fail if path, which may be abs or relative, exists."""
 
1943
        self.failIf(osutils.lexists(path),path+" exists")
2681
1944
 
2682
1945
 
2683
1946
class TestCaseWithTransport(TestCaseInTempDir):
2690
1953
    ReadonlyTransportDecorator is used instead which allows the use of non disk
2691
1954
    based read write transports.
2692
1955
 
2693
 
    If an explicit class is provided for readonly access, that server and the
 
1956
    If an explicit class is provided for readonly access, that server and the 
2694
1957
    readwrite one must both define get_url() as resolving to os.getcwd().
2695
1958
    """
2696
1959
 
2702
1965
        """
2703
1966
        if self.__vfs_server is None:
2704
1967
            self.__vfs_server = self.vfs_transport_factory()
2705
 
            self.start_server(self.__vfs_server)
 
1968
            self.__vfs_server.setUp()
 
1969
            self.addCleanup(self.__vfs_server.tearDown)
2706
1970
        return self.__vfs_server
2707
1971
 
2708
1972
    def make_branch_and_tree(self, relpath, format=None):
2715
1979
        repository will also be accessed locally. Otherwise a lightweight
2716
1980
        checkout is created and returned.
2717
1981
 
2718
 
        We do this because we can't physically create a tree in the local
2719
 
        path, with a branch reference to the transport_factory url, and
2720
 
        a branch + repository in the vfs_transport, unless the vfs_transport
2721
 
        namespace is distinct from the local disk - the two branch objects
2722
 
        would collide. While we could construct a tree with its branch object
2723
 
        pointing at the transport_factory transport in memory, reopening it
2724
 
        would behaving unexpectedly, and has in the past caused testing bugs
2725
 
        when we tried to do it that way.
2726
 
 
2727
1982
        :param format: The BzrDirFormat.
2728
1983
        :returns: the WorkingTree.
2729
1984
        """
2738
1993
            # We can only make working trees locally at the moment.  If the
2739
1994
            # transport can't support them, then we keep the non-disk-backed
2740
1995
            # branch and create a local checkout.
2741
 
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
1996
            if self.vfs_transport_factory is LocalURLServer:
2742
1997
                # the branch is colocated on disk, we cannot create a checkout.
2743
1998
                # hopefully callers will expect this.
2744
1999
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2745
 
                wt = local_controldir.create_workingtree()
2746
 
                if wt.branch._format != b._format:
2747
 
                    wt._branch = b
2748
 
                    # Make sure that assigning to wt._branch fixes wt.branch,
2749
 
                    # in case the implementation details of workingtree objects
2750
 
                    # change.
2751
 
                    self.assertIs(b, wt.branch)
2752
 
                return wt
 
2000
                return local_controldir.create_workingtree()
2753
2001
            else:
2754
2002
                return b.create_checkout(relpath, lightweight=True)
2755
2003
 
2781
2029
        super(TestCaseWithTransport, self).setUp()
2782
2030
        self.__vfs_server = None
2783
2031
 
2784
 
    def disable_missing_extensions_warning(self):
2785
 
        """Some tests expect a precise stderr content.
2786
 
 
2787
 
        There is no point in forcing them to duplicate the extension related
2788
 
        warning.
2789
 
        """
2790
 
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
2791
 
 
2792
2032
 
2793
2033
class ChrootedTestCase(TestCaseWithTransport):
2794
2034
    """A support class that provides readonly urls outside the local namespace.
2798
2038
    for readonly urls.
2799
2039
 
2800
2040
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2801
 
                       be used without needed to redo it when a different
 
2041
                       be used without needed to redo it when a different 
2802
2042
                       subclass is in use ?
2803
2043
    """
2804
2044
 
2805
2045
    def setUp(self):
2806
 
        from bzrlib.tests import http_server
2807
2046
        super(ChrootedTestCase, self).setUp()
2808
 
        if not self.vfs_transport_factory == memory.MemoryServer:
2809
 
            self.transport_readonly_server = http_server.HttpServer
2810
 
 
2811
 
 
2812
 
def condition_id_re(pattern):
2813
 
    """Create a condition filter which performs a re check on a test's id.
2814
 
 
2815
 
    :param pattern: A regular expression string.
2816
 
    :return: A callable that returns True if the re matches.
2817
 
    """
2818
 
    filter_re = re.compile(pattern, 0)
2819
 
    def condition(test):
2820
 
        test_id = test.id()
2821
 
        return filter_re.search(test_id)
2822
 
    return condition
2823
 
 
2824
 
 
2825
 
def condition_isinstance(klass_or_klass_list):
2826
 
    """Create a condition filter which returns isinstance(param, klass).
2827
 
 
2828
 
    :return: A callable which when called with one parameter obj return the
2829
 
        result of isinstance(obj, klass_or_klass_list).
2830
 
    """
2831
 
    def condition(obj):
2832
 
        return isinstance(obj, klass_or_klass_list)
2833
 
    return condition
2834
 
 
2835
 
 
2836
 
def condition_id_in_list(id_list):
2837
 
    """Create a condition filter which verify that test's id in a list.
2838
 
 
2839
 
    :param id_list: A TestIdList object.
2840
 
    :return: A callable that returns True if the test's id appears in the list.
2841
 
    """
2842
 
    def condition(test):
2843
 
        return id_list.includes(test.id())
2844
 
    return condition
2845
 
 
2846
 
 
2847
 
def condition_id_startswith(starts):
2848
 
    """Create a condition filter verifying that test's id starts with a string.
2849
 
 
2850
 
    :param starts: A list of string.
2851
 
    :return: A callable that returns True if the test's id starts with one of
2852
 
        the given strings.
2853
 
    """
2854
 
    def condition(test):
2855
 
        for start in starts:
2856
 
            if test.id().startswith(start):
2857
 
                return True
2858
 
        return False
2859
 
    return condition
2860
 
 
2861
 
 
2862
 
def exclude_tests_by_condition(suite, condition):
2863
 
    """Create a test suite which excludes some tests from suite.
2864
 
 
2865
 
    :param suite: The suite to get tests from.
2866
 
    :param condition: A callable whose result evaluates True when called with a
2867
 
        test case which should be excluded from the result.
2868
 
    :return: A suite which contains the tests found in suite that fail
2869
 
        condition.
2870
 
    """
2871
 
    result = []
2872
 
    for test in iter_suite_tests(suite):
2873
 
        if not condition(test):
2874
 
            result.append(test)
2875
 
    return TestUtil.TestSuite(result)
2876
 
 
2877
 
 
2878
 
def filter_suite_by_condition(suite, condition):
2879
 
    """Create a test suite by filtering another one.
2880
 
 
2881
 
    :param suite: The source suite.
2882
 
    :param condition: A callable whose result evaluates True when called with a
2883
 
        test case which should be included in the result.
2884
 
    :return: A suite which contains the tests found in suite that pass
2885
 
        condition.
2886
 
    """
2887
 
    result = []
2888
 
    for test in iter_suite_tests(suite):
2889
 
        if condition(test):
2890
 
            result.append(test)
2891
 
    return TestUtil.TestSuite(result)
 
2047
        if not self.vfs_transport_factory == MemoryServer:
 
2048
            self.transport_readonly_server = HttpServer
2892
2049
 
2893
2050
 
2894
2051
def filter_suite_by_re(suite, pattern):
2895
 
    """Create a test suite by filtering another one.
2896
 
 
2897
 
    :param suite:           the source suite
2898
 
    :param pattern:         pattern that names must match
2899
 
    :returns: the newly created suite
2900
 
    """
2901
 
    condition = condition_id_re(pattern)
2902
 
    result_suite = filter_suite_by_condition(suite, condition)
2903
 
    return result_suite
2904
 
 
2905
 
 
2906
 
def filter_suite_by_id_list(suite, test_id_list):
2907
 
    """Create a test suite by filtering another one.
2908
 
 
2909
 
    :param suite: The source suite.
2910
 
    :param test_id_list: A list of the test ids to keep as strings.
2911
 
    :returns: the newly created suite
2912
 
    """
2913
 
    condition = condition_id_in_list(test_id_list)
2914
 
    result_suite = filter_suite_by_condition(suite, condition)
2915
 
    return result_suite
2916
 
 
2917
 
 
2918
 
def filter_suite_by_id_startswith(suite, start):
2919
 
    """Create a test suite by filtering another one.
2920
 
 
2921
 
    :param suite: The source suite.
2922
 
    :param start: A list of string the test id must start with one of.
2923
 
    :returns: the newly created suite
2924
 
    """
2925
 
    condition = condition_id_startswith(start)
2926
 
    result_suite = filter_suite_by_condition(suite, condition)
2927
 
    return result_suite
2928
 
 
2929
 
 
2930
 
def exclude_tests_by_re(suite, pattern):
2931
 
    """Create a test suite which excludes some tests from suite.
2932
 
 
2933
 
    :param suite: The suite to get tests from.
2934
 
    :param pattern: A regular expression string. Test ids that match this
2935
 
        pattern will be excluded from the result.
2936
 
    :return: A TestSuite that contains all the tests from suite without the
2937
 
        tests that matched pattern. The order of tests is the same as it was in
2938
 
        suite.
2939
 
    """
2940
 
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
2941
 
 
2942
 
 
2943
 
def preserve_input(something):
2944
 
    """A helper for performing test suite transformation chains.
2945
 
 
2946
 
    :param something: Anything you want to preserve.
2947
 
    :return: Something.
2948
 
    """
2949
 
    return something
2950
 
 
2951
 
 
2952
 
def randomize_suite(suite):
2953
 
    """Return a new TestSuite with suite's tests in random order.
2954
 
 
2955
 
    The tests in the input suite are flattened into a single suite in order to
2956
 
    accomplish this. Any nested TestSuites are removed to provide global
2957
 
    randomness.
2958
 
    """
2959
 
    tests = list(iter_suite_tests(suite))
2960
 
    random.shuffle(tests)
2961
 
    return TestUtil.TestSuite(tests)
2962
 
 
2963
 
 
2964
 
def split_suite_by_condition(suite, condition):
2965
 
    """Split a test suite into two by a condition.
2966
 
 
2967
 
    :param suite: The suite to split.
2968
 
    :param condition: The condition to match on. Tests that match this
2969
 
        condition are returned in the first test suite, ones that do not match
2970
 
        are in the second suite.
2971
 
    :return: A tuple of two test suites, where the first contains tests from
2972
 
        suite matching the condition, and the second contains the remainder
2973
 
        from suite. The order within each output suite is the same as it was in
2974
 
        suite.
2975
 
    """
2976
 
    matched = []
2977
 
    did_not_match = []
2978
 
    for test in iter_suite_tests(suite):
2979
 
        if condition(test):
2980
 
            matched.append(test)
 
2052
    result = TestUtil.TestSuite()
 
2053
    filter_re = re.compile(pattern)
 
2054
    for test in iter_suite_tests(suite):
 
2055
        if filter_re.search(test.id()):
 
2056
            result.addTest(test)
 
2057
    return result
 
2058
 
 
2059
 
 
2060
def sort_suite_by_re(suite, pattern):
 
2061
    first = []
 
2062
    second = []
 
2063
    filter_re = re.compile(pattern)
 
2064
    for test in iter_suite_tests(suite):
 
2065
        if filter_re.search(test.id()):
 
2066
            first.append(test)
2981
2067
        else:
2982
 
            did_not_match.append(test)
2983
 
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2984
 
 
2985
 
 
2986
 
def split_suite_by_re(suite, pattern):
2987
 
    """Split a test suite into two by a regular expression.
2988
 
 
2989
 
    :param suite: The suite to split.
2990
 
    :param pattern: A regular expression string. Test ids that match this
2991
 
        pattern will be in the first test suite returned, and the others in the
2992
 
        second test suite returned.
2993
 
    :return: A tuple of two test suites, where the first contains tests from
2994
 
        suite matching pattern, and the second contains the remainder from
2995
 
        suite. The order within each output suite is the same as it was in
2996
 
        suite.
2997
 
    """
2998
 
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2068
            second.append(test)
 
2069
    return TestUtil.TestSuite(first + second)
2999
2070
 
3000
2071
 
3001
2072
def run_suite(suite, name='test', verbose=False, pattern=".*",
3002
 
              stop_on_failure=False,
 
2073
              stop_on_failure=False, keep_output=False,
3003
2074
              transport=None, lsprof_timed=None, bench_history=None,
3004
2075
              matching_tests_first=None,
3005
 
              list_only=False,
3006
 
              random_seed=None,
3007
 
              exclude_pattern=None,
3008
 
              strict=False,
3009
 
              runner_class=None,
3010
 
              suite_decorators=None,
3011
 
              stream=None,
3012
 
              result_decorators=None,
3013
 
              ):
3014
 
    """Run a test suite for bzr selftest.
 
2076
              numbered_dirs=None):
 
2077
    use_numbered_dirs = bool(numbered_dirs)
3015
2078
 
3016
 
    :param runner_class: The class of runner to use. Must support the
3017
 
        constructor arguments passed by run_suite which are more than standard
3018
 
        python uses.
3019
 
    :return: A boolean indicating success.
3020
 
    """
3021
2079
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2080
    if numbered_dirs is not None:
 
2081
        TestCaseInTempDir.use_numbered_dirs = use_numbered_dirs
3022
2082
    if verbose:
3023
2083
        verbosity = 2
3024
2084
    else:
3025
2085
        verbosity = 1
3026
 
    if runner_class is None:
3027
 
        runner_class = TextTestRunner
3028
 
    if stream is None:
3029
 
        stream = sys.stdout
3030
 
    runner = runner_class(stream=stream,
 
2086
    runner = TextTestRunner(stream=sys.stdout,
3031
2087
                            descriptions=0,
3032
2088
                            verbosity=verbosity,
 
2089
                            keep_output=keep_output,
3033
2090
                            bench_history=bench_history,
3034
 
                            strict=strict,
3035
 
                            result_decorators=result_decorators,
 
2091
                            use_numbered_dirs=use_numbered_dirs,
3036
2092
                            )
3037
2093
    runner.stop_on_failure=stop_on_failure
3038
 
    # built in decorator factories:
3039
 
    decorators = [
3040
 
        random_order(random_seed, runner),
3041
 
        exclude_tests(exclude_pattern),
3042
 
        ]
3043
 
    if matching_tests_first:
3044
 
        decorators.append(tests_first(pattern))
3045
 
    else:
3046
 
        decorators.append(filter_tests(pattern))
3047
 
    if suite_decorators:
3048
 
        decorators.extend(suite_decorators)
3049
 
    # tell the result object how many tests will be running: (except if
3050
 
    # --parallel=fork is being used. Robert said he will provide a better
3051
 
    # progress design later -- vila 20090817)
3052
 
    if fork_decorator not in decorators:
3053
 
        decorators.append(CountingDecorator)
3054
 
    for decorator in decorators:
3055
 
        suite = decorator(suite)
3056
 
    if list_only:
3057
 
        # Done after test suite decoration to allow randomisation etc
3058
 
        # to take effect, though that is of marginal benefit.
3059
 
        if verbosity >= 2:
3060
 
            stream.write("Listing tests only ...\n")
3061
 
        for t in iter_suite_tests(suite):
3062
 
            stream.write("%s\n" % (t.id()))
3063
 
        return True
 
2094
    if pattern != '.*':
 
2095
        if matching_tests_first:
 
2096
            suite = sort_suite_by_re(suite, pattern)
 
2097
        else:
 
2098
            suite = filter_suite_by_re(suite, pattern)
3064
2099
    result = runner.run(suite)
3065
 
    if strict:
3066
 
        return result.wasStrictlySuccessful()
3067
 
    else:
3068
 
        return result.wasSuccessful()
3069
 
 
3070
 
 
3071
 
# A registry where get() returns a suite decorator.
3072
 
parallel_registry = registry.Registry()
3073
 
 
3074
 
 
3075
 
def fork_decorator(suite):
3076
 
    if getattr(os, "fork", None) is None:
3077
 
        raise errors.BzrCommandError("platform does not support fork,"
3078
 
            " try --parallel=subprocess instead.")
3079
 
    concurrency = osutils.local_concurrency()
3080
 
    if concurrency == 1:
3081
 
        return suite
3082
 
    from testtools import ConcurrentTestSuite
3083
 
    return ConcurrentTestSuite(suite, fork_for_tests)
3084
 
parallel_registry.register('fork', fork_decorator)
3085
 
 
3086
 
 
3087
 
def subprocess_decorator(suite):
3088
 
    concurrency = osutils.local_concurrency()
3089
 
    if concurrency == 1:
3090
 
        return suite
3091
 
    from testtools import ConcurrentTestSuite
3092
 
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
3093
 
parallel_registry.register('subprocess', subprocess_decorator)
3094
 
 
3095
 
 
3096
 
def exclude_tests(exclude_pattern):
3097
 
    """Return a test suite decorator that excludes tests."""
3098
 
    if exclude_pattern is None:
3099
 
        return identity_decorator
3100
 
    def decorator(suite):
3101
 
        return ExcludeDecorator(suite, exclude_pattern)
3102
 
    return decorator
3103
 
 
3104
 
 
3105
 
def filter_tests(pattern):
3106
 
    if pattern == '.*':
3107
 
        return identity_decorator
3108
 
    def decorator(suite):
3109
 
        return FilterTestsDecorator(suite, pattern)
3110
 
    return decorator
3111
 
 
3112
 
 
3113
 
def random_order(random_seed, runner):
3114
 
    """Return a test suite decorator factory for randomising tests order.
3115
 
    
3116
 
    :param random_seed: now, a string which casts to a long, or a long.
3117
 
    :param runner: A test runner with a stream attribute to report on.
3118
 
    """
3119
 
    if random_seed is None:
3120
 
        return identity_decorator
3121
 
    def decorator(suite):
3122
 
        return RandomDecorator(suite, random_seed, runner.stream)
3123
 
    return decorator
3124
 
 
3125
 
 
3126
 
def tests_first(pattern):
3127
 
    if pattern == '.*':
3128
 
        return identity_decorator
3129
 
    def decorator(suite):
3130
 
        return TestFirstDecorator(suite, pattern)
3131
 
    return decorator
3132
 
 
3133
 
 
3134
 
def identity_decorator(suite):
3135
 
    """Return suite."""
3136
 
    return suite
3137
 
 
3138
 
 
3139
 
class TestDecorator(TestUtil.TestSuite):
3140
 
    """A decorator for TestCase/TestSuite objects.
3141
 
    
3142
 
    Usually, subclasses should override __iter__(used when flattening test
3143
 
    suites), which we do to filter, reorder, parallelise and so on, run() and
3144
 
    debug().
3145
 
    """
3146
 
 
3147
 
    def __init__(self, suite):
3148
 
        TestUtil.TestSuite.__init__(self)
3149
 
        self.addTest(suite)
3150
 
 
3151
 
    def countTestCases(self):
3152
 
        cases = 0
3153
 
        for test in self:
3154
 
            cases += test.countTestCases()
3155
 
        return cases
3156
 
 
3157
 
    def debug(self):
3158
 
        for test in self:
3159
 
            test.debug()
3160
 
 
3161
 
    def run(self, result):
3162
 
        # Use iteration on self, not self._tests, to allow subclasses to hook
3163
 
        # into __iter__.
3164
 
        for test in self:
3165
 
            if result.shouldStop:
3166
 
                break
3167
 
            test.run(result)
3168
 
        return result
3169
 
 
3170
 
 
3171
 
class CountingDecorator(TestDecorator):
3172
 
    """A decorator which calls result.progress(self.countTestCases)."""
3173
 
 
3174
 
    def run(self, result):
3175
 
        progress_method = getattr(result, 'progress', None)
3176
 
        if callable(progress_method):
3177
 
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
3178
 
        return super(CountingDecorator, self).run(result)
3179
 
 
3180
 
 
3181
 
class ExcludeDecorator(TestDecorator):
3182
 
    """A decorator which excludes test matching an exclude pattern."""
3183
 
 
3184
 
    def __init__(self, suite, exclude_pattern):
3185
 
        TestDecorator.__init__(self, suite)
3186
 
        self.exclude_pattern = exclude_pattern
3187
 
        self.excluded = False
3188
 
 
3189
 
    def __iter__(self):
3190
 
        if self.excluded:
3191
 
            return iter(self._tests)
3192
 
        self.excluded = True
3193
 
        suite = exclude_tests_by_re(self, self.exclude_pattern)
3194
 
        del self._tests[:]
3195
 
        self.addTests(suite)
3196
 
        return iter(self._tests)
3197
 
 
3198
 
 
3199
 
class FilterTestsDecorator(TestDecorator):
3200
 
    """A decorator which filters tests to those matching a pattern."""
3201
 
 
3202
 
    def __init__(self, suite, pattern):
3203
 
        TestDecorator.__init__(self, suite)
3204
 
        self.pattern = pattern
3205
 
        self.filtered = False
3206
 
 
3207
 
    def __iter__(self):
3208
 
        if self.filtered:
3209
 
            return iter(self._tests)
3210
 
        self.filtered = True
3211
 
        suite = filter_suite_by_re(self, self.pattern)
3212
 
        del self._tests[:]
3213
 
        self.addTests(suite)
3214
 
        return iter(self._tests)
3215
 
 
3216
 
 
3217
 
class RandomDecorator(TestDecorator):
3218
 
    """A decorator which randomises the order of its tests."""
3219
 
 
3220
 
    def __init__(self, suite, random_seed, stream):
3221
 
        TestDecorator.__init__(self, suite)
3222
 
        self.random_seed = random_seed
3223
 
        self.randomised = False
3224
 
        self.stream = stream
3225
 
 
3226
 
    def __iter__(self):
3227
 
        if self.randomised:
3228
 
            return iter(self._tests)
3229
 
        self.randomised = True
3230
 
        self.stream.write("Randomizing test order using seed %s\n\n" %
3231
 
            (self.actual_seed()))
3232
 
        # Initialise the random number generator.
3233
 
        random.seed(self.actual_seed())
3234
 
        suite = randomize_suite(self)
3235
 
        del self._tests[:]
3236
 
        self.addTests(suite)
3237
 
        return iter(self._tests)
3238
 
 
3239
 
    def actual_seed(self):
3240
 
        if self.random_seed == "now":
3241
 
            # We convert the seed to a long to make it reuseable across
3242
 
            # invocations (because the user can reenter it).
3243
 
            self.random_seed = long(time.time())
3244
 
        else:
3245
 
            # Convert the seed to a long if we can
3246
 
            try:
3247
 
                self.random_seed = long(self.random_seed)
3248
 
            except:
3249
 
                pass
3250
 
        return self.random_seed
3251
 
 
3252
 
 
3253
 
class TestFirstDecorator(TestDecorator):
3254
 
    """A decorator which moves named tests to the front."""
3255
 
 
3256
 
    def __init__(self, suite, pattern):
3257
 
        TestDecorator.__init__(self, suite)
3258
 
        self.pattern = pattern
3259
 
        self.filtered = False
3260
 
 
3261
 
    def __iter__(self):
3262
 
        if self.filtered:
3263
 
            return iter(self._tests)
3264
 
        self.filtered = True
3265
 
        suites = split_suite_by_re(self, self.pattern)
3266
 
        del self._tests[:]
3267
 
        self.addTests(suites)
3268
 
        return iter(self._tests)
3269
 
 
3270
 
 
3271
 
def partition_tests(suite, count):
3272
 
    """Partition suite into count lists of tests."""
3273
 
    # This just assigns tests in a round-robin fashion.  On one hand this
3274
 
    # splits up blocks of related tests that might run faster if they shared
3275
 
    # resources, but on the other it avoids assigning blocks of slow tests to
3276
 
    # just one partition.  So the slowest partition shouldn't be much slower
3277
 
    # than the fastest.
3278
 
    partitions = [list() for i in range(count)]
3279
 
    tests = iter_suite_tests(suite)
3280
 
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3281
 
        partition.append(test)
3282
 
    return partitions
3283
 
 
3284
 
 
3285
 
def workaround_zealous_crypto_random():
3286
 
    """Crypto.Random want to help us being secure, but we don't care here.
3287
 
 
3288
 
    This workaround some test failure related to the sftp server. Once paramiko
3289
 
    stop using the controversial API in Crypto.Random, we may get rid of it.
3290
 
    """
3291
 
    try:
3292
 
        from Crypto.Random import atfork
3293
 
        atfork()
3294
 
    except ImportError:
3295
 
        pass
3296
 
 
3297
 
 
3298
 
def fork_for_tests(suite):
3299
 
    """Take suite and start up one runner per CPU by forking()
3300
 
 
3301
 
    :return: An iterable of TestCase-like objects which can each have
3302
 
        run(result) called on them to feed tests to result.
3303
 
    """
3304
 
    concurrency = osutils.local_concurrency()
3305
 
    result = []
3306
 
    from subunit import TestProtocolClient, ProtocolTestCase
3307
 
    from subunit.test_results import AutoTimingTestResultDecorator
3308
 
    class TestInOtherProcess(ProtocolTestCase):
3309
 
        # Should be in subunit, I think. RBC.
3310
 
        def __init__(self, stream, pid):
3311
 
            ProtocolTestCase.__init__(self, stream)
3312
 
            self.pid = pid
3313
 
 
3314
 
        def run(self, result):
3315
 
            try:
3316
 
                ProtocolTestCase.run(self, result)
3317
 
            finally:
3318
 
                os.waitpid(self.pid, 0)
3319
 
 
3320
 
    test_blocks = partition_tests(suite, concurrency)
3321
 
    for process_tests in test_blocks:
3322
 
        process_suite = TestUtil.TestSuite()
3323
 
        process_suite.addTests(process_tests)
3324
 
        c2pread, c2pwrite = os.pipe()
3325
 
        pid = os.fork()
3326
 
        if pid == 0:
3327
 
            workaround_zealous_crypto_random()
3328
 
            try:
3329
 
                os.close(c2pread)
3330
 
                # Leave stderr and stdout open so we can see test noise
3331
 
                # Close stdin so that the child goes away if it decides to
3332
 
                # read from stdin (otherwise its a roulette to see what
3333
 
                # child actually gets keystrokes for pdb etc).
3334
 
                sys.stdin.close()
3335
 
                sys.stdin = None
3336
 
                stream = os.fdopen(c2pwrite, 'wb', 1)
3337
 
                subunit_result = AutoTimingTestResultDecorator(
3338
 
                    TestProtocolClient(stream))
3339
 
                process_suite.run(subunit_result)
3340
 
            finally:
3341
 
                os._exit(0)
3342
 
        else:
3343
 
            os.close(c2pwrite)
3344
 
            stream = os.fdopen(c2pread, 'rb', 1)
3345
 
            test = TestInOtherProcess(stream, pid)
3346
 
            result.append(test)
3347
 
    return result
3348
 
 
3349
 
 
3350
 
def reinvoke_for_tests(suite):
3351
 
    """Take suite and start up one runner per CPU using subprocess().
3352
 
 
3353
 
    :return: An iterable of TestCase-like objects which can each have
3354
 
        run(result) called on them to feed tests to result.
3355
 
    """
3356
 
    concurrency = osutils.local_concurrency()
3357
 
    result = []
3358
 
    from subunit import ProtocolTestCase
3359
 
    class TestInSubprocess(ProtocolTestCase):
3360
 
        def __init__(self, process, name):
3361
 
            ProtocolTestCase.__init__(self, process.stdout)
3362
 
            self.process = process
3363
 
            self.process.stdin.close()
3364
 
            self.name = name
3365
 
 
3366
 
        def run(self, result):
3367
 
            try:
3368
 
                ProtocolTestCase.run(self, result)
3369
 
            finally:
3370
 
                self.process.wait()
3371
 
                os.unlink(self.name)
3372
 
            # print "pid %d finished" % finished_process
3373
 
    test_blocks = partition_tests(suite, concurrency)
3374
 
    for process_tests in test_blocks:
3375
 
        # ugly; currently reimplement rather than reuses TestCase methods.
3376
 
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
3377
 
        if not os.path.isfile(bzr_path):
3378
 
            # We are probably installed. Assume sys.argv is the right file
3379
 
            bzr_path = sys.argv[0]
3380
 
        bzr_path = [bzr_path]
3381
 
        if sys.platform == "win32":
3382
 
            # if we're on windows, we can't execute the bzr script directly
3383
 
            bzr_path = [sys.executable] + bzr_path
3384
 
        fd, test_list_file_name = tempfile.mkstemp()
3385
 
        test_list_file = os.fdopen(fd, 'wb', 1)
3386
 
        for test in process_tests:
3387
 
            test_list_file.write(test.id() + '\n')
3388
 
        test_list_file.close()
3389
 
        try:
3390
 
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
3391
 
                '--subunit']
3392
 
            if '--no-plugins' in sys.argv:
3393
 
                argv.append('--no-plugins')
3394
 
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
3395
 
            # noise on stderr it can interrupt the subunit protocol.
3396
 
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3397
 
                                      stdout=subprocess.PIPE,
3398
 
                                      stderr=subprocess.PIPE,
3399
 
                                      bufsize=1)
3400
 
            test = TestInSubprocess(process, test_list_file_name)
3401
 
            result.append(test)
3402
 
        except:
3403
 
            os.unlink(test_list_file_name)
3404
 
            raise
3405
 
    return result
3406
 
 
3407
 
 
3408
 
class ProfileResult(testtools.ExtendedToOriginalDecorator):
3409
 
    """Generate profiling data for all activity between start and success.
3410
 
    
3411
 
    The profile data is appended to the test's _benchcalls attribute and can
3412
 
    be accessed by the forwarded-to TestResult.
3413
 
 
3414
 
    While it might be cleaner do accumulate this in stopTest, addSuccess is
3415
 
    where our existing output support for lsprof is, and this class aims to
3416
 
    fit in with that: while it could be moved it's not necessary to accomplish
3417
 
    test profiling, nor would it be dramatically cleaner.
3418
 
    """
3419
 
 
3420
 
    def startTest(self, test):
3421
 
        self.profiler = bzrlib.lsprof.BzrProfiler()
3422
 
        # Prevent deadlocks in tests that use lsprof: those tests will
3423
 
        # unavoidably fail.
3424
 
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
3425
 
        self.profiler.start()
3426
 
        testtools.ExtendedToOriginalDecorator.startTest(self, test)
3427
 
 
3428
 
    def addSuccess(self, test):
3429
 
        stats = self.profiler.stop()
3430
 
        try:
3431
 
            calls = test._benchcalls
3432
 
        except AttributeError:
3433
 
            test._benchcalls = []
3434
 
            calls = test._benchcalls
3435
 
        calls.append(((test.id(), "", ""), stats))
3436
 
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
3437
 
 
3438
 
    def stopTest(self, test):
3439
 
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
3440
 
        self.profiler = None
3441
 
 
3442
 
 
3443
 
# Controlled by "bzr selftest -E=..." option
3444
 
# Currently supported:
3445
 
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
3446
 
#                           preserves any flags supplied at the command line.
3447
 
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
3448
 
#                           rather than failing tests. And no longer raise
3449
 
#                           LockContention when fctnl locks are not being used
3450
 
#                           with proper exclusion rules.
3451
 
#   -Ethreads               Will display thread ident at creation/join time to
3452
 
#                           help track thread leaks
3453
 
selftest_debug_flags = set()
 
2100
    return result.wasSuccessful()
3454
2101
 
3455
2102
 
3456
2103
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2104
             keep_output=False,
3457
2105
             transport=None,
3458
2106
             test_suite_factory=None,
3459
2107
             lsprof_timed=None,
3460
2108
             bench_history=None,
3461
2109
             matching_tests_first=None,
3462
 
             list_only=False,
3463
 
             random_seed=None,
3464
 
             exclude_pattern=None,
3465
 
             strict=False,
3466
 
             load_list=None,
3467
 
             debug_flags=None,
3468
 
             starting_with=None,
3469
 
             runner_class=None,
3470
 
             suite_decorators=None,
3471
 
             stream=None,
3472
 
             lsprof_tests=False,
3473
 
             ):
 
2110
             numbered_dirs=None):
3474
2111
    """Run the whole test suite under the enhanced runner"""
3475
2112
    # XXX: Very ugly way to do this...
3476
2113
    # Disable warning about old formats because we don't want it to disturb
3483
2120
        transport = default_transport
3484
2121
    old_transport = default_transport
3485
2122
    default_transport = transport
3486
 
    global selftest_debug_flags
3487
 
    old_debug_flags = selftest_debug_flags
3488
 
    if debug_flags is not None:
3489
 
        selftest_debug_flags = set(debug_flags)
3490
2123
    try:
3491
 
        if load_list is None:
3492
 
            keep_only = None
3493
 
        else:
3494
 
            keep_only = load_test_id_list(load_list)
3495
 
        if starting_with:
3496
 
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
3497
 
                             for start in starting_with]
3498
2124
        if test_suite_factory is None:
3499
 
            # Reduce loading time by loading modules based on the starting_with
3500
 
            # patterns.
3501
 
            suite = test_suite(keep_only, starting_with)
 
2125
            suite = test_suite()
3502
2126
        else:
3503
2127
            suite = test_suite_factory()
3504
 
        if starting_with:
3505
 
            # But always filter as requested.
3506
 
            suite = filter_suite_by_id_startswith(suite, starting_with)
3507
 
        result_decorators = []
3508
 
        if lsprof_tests:
3509
 
            result_decorators.append(ProfileResult)
3510
2128
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3511
 
                     stop_on_failure=stop_on_failure,
 
2129
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
3512
2130
                     transport=transport,
3513
2131
                     lsprof_timed=lsprof_timed,
3514
2132
                     bench_history=bench_history,
3515
2133
                     matching_tests_first=matching_tests_first,
3516
 
                     list_only=list_only,
3517
 
                     random_seed=random_seed,
3518
 
                     exclude_pattern=exclude_pattern,
3519
 
                     strict=strict,
3520
 
                     runner_class=runner_class,
3521
 
                     suite_decorators=suite_decorators,
3522
 
                     stream=stream,
3523
 
                     result_decorators=result_decorators,
3524
 
                     )
 
2134
                     numbered_dirs=numbered_dirs)
3525
2135
    finally:
3526
2136
        default_transport = old_transport
3527
 
        selftest_debug_flags = old_debug_flags
3528
 
 
3529
 
 
3530
 
def load_test_id_list(file_name):
3531
 
    """Load a test id list from a text file.
3532
 
 
3533
 
    The format is one test id by line.  No special care is taken to impose
3534
 
    strict rules, these test ids are used to filter the test suite so a test id
3535
 
    that do not match an existing test will do no harm. This allows user to add
3536
 
    comments, leave blank lines, etc.
3537
 
    """
3538
 
    test_list = []
3539
 
    try:
3540
 
        ftest = open(file_name, 'rt')
3541
 
    except IOError, e:
3542
 
        if e.errno != errno.ENOENT:
3543
 
            raise
3544
 
        else:
3545
 
            raise errors.NoSuchFile(file_name)
3546
 
 
3547
 
    for test_name in ftest.readlines():
3548
 
        test_list.append(test_name.strip())
3549
 
    ftest.close()
3550
 
    return test_list
3551
 
 
3552
 
 
3553
 
def suite_matches_id_list(test_suite, id_list):
3554
 
    """Warns about tests not appearing or appearing more than once.
3555
 
 
3556
 
    :param test_suite: A TestSuite object.
3557
 
    :param test_id_list: The list of test ids that should be found in
3558
 
         test_suite.
3559
 
 
3560
 
    :return: (absents, duplicates) absents is a list containing the test found
3561
 
        in id_list but not in test_suite, duplicates is a list containing the
3562
 
        test found multiple times in test_suite.
3563
 
 
3564
 
    When using a prefined test id list, it may occurs that some tests do not
3565
 
    exist anymore or that some tests use the same id. This function warns the
3566
 
    tester about potential problems in his workflow (test lists are volatile)
3567
 
    or in the test suite itself (using the same id for several tests does not
3568
 
    help to localize defects).
3569
 
    """
3570
 
    # Build a dict counting id occurrences
3571
 
    tests = dict()
3572
 
    for test in iter_suite_tests(test_suite):
3573
 
        id = test.id()
3574
 
        tests[id] = tests.get(id, 0) + 1
3575
 
 
3576
 
    not_found = []
3577
 
    duplicates = []
3578
 
    for id in id_list:
3579
 
        occurs = tests.get(id, 0)
3580
 
        if not occurs:
3581
 
            not_found.append(id)
3582
 
        elif occurs > 1:
3583
 
            duplicates.append(id)
3584
 
 
3585
 
    return not_found, duplicates
3586
 
 
3587
 
 
3588
 
class TestIdList(object):
3589
 
    """Test id list to filter a test suite.
3590
 
 
3591
 
    Relying on the assumption that test ids are built as:
3592
 
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
3593
 
    notation, this class offers methods to :
3594
 
    - avoid building a test suite for modules not refered to in the test list,
3595
 
    - keep only the tests listed from the module test suite.
3596
 
    """
3597
 
 
3598
 
    def __init__(self, test_id_list):
3599
 
        # When a test suite needs to be filtered against us we compare test ids
3600
 
        # for equality, so a simple dict offers a quick and simple solution.
3601
 
        self.tests = dict().fromkeys(test_id_list, True)
3602
 
 
3603
 
        # While unittest.TestCase have ids like:
3604
 
        # <module>.<class>.<method>[(<param+)],
3605
 
        # doctest.DocTestCase can have ids like:
3606
 
        # <module>
3607
 
        # <module>.<class>
3608
 
        # <module>.<function>
3609
 
        # <module>.<class>.<method>
3610
 
 
3611
 
        # Since we can't predict a test class from its name only, we settle on
3612
 
        # a simple constraint: a test id always begins with its module name.
3613
 
 
3614
 
        modules = {}
3615
 
        for test_id in test_id_list:
3616
 
            parts = test_id.split('.')
3617
 
            mod_name = parts.pop(0)
3618
 
            modules[mod_name] = True
3619
 
            for part in parts:
3620
 
                mod_name += '.' + part
3621
 
                modules[mod_name] = True
3622
 
        self.modules = modules
3623
 
 
3624
 
    def refers_to(self, module_name):
3625
 
        """Is there tests for the module or one of its sub modules."""
3626
 
        return self.modules.has_key(module_name)
3627
 
 
3628
 
    def includes(self, test_id):
3629
 
        return self.tests.has_key(test_id)
3630
 
 
3631
 
 
3632
 
class TestPrefixAliasRegistry(registry.Registry):
3633
 
    """A registry for test prefix aliases.
3634
 
 
3635
 
    This helps implement shorcuts for the --starting-with selftest
3636
 
    option. Overriding existing prefixes is not allowed but not fatal (a
3637
 
    warning will be emitted).
3638
 
    """
3639
 
 
3640
 
    def register(self, key, obj, help=None, info=None,
3641
 
                 override_existing=False):
3642
 
        """See Registry.register.
3643
 
 
3644
 
        Trying to override an existing alias causes a warning to be emitted,
3645
 
        not a fatal execption.
3646
 
        """
3647
 
        try:
3648
 
            super(TestPrefixAliasRegistry, self).register(
3649
 
                key, obj, help=help, info=info, override_existing=False)
3650
 
        except KeyError:
3651
 
            actual = self.get(key)
3652
 
            trace.note(
3653
 
                'Test prefix alias %s is already used for %s, ignoring %s'
3654
 
                % (key, actual, obj))
3655
 
 
3656
 
    def resolve_alias(self, id_start):
3657
 
        """Replace the alias by the prefix in the given string.
3658
 
 
3659
 
        Using an unknown prefix is an error to help catching typos.
3660
 
        """
3661
 
        parts = id_start.split('.')
3662
 
        try:
3663
 
            parts[0] = self.get(parts[0])
3664
 
        except KeyError:
3665
 
            raise errors.BzrCommandError(
3666
 
                '%s is not a known test prefix alias' % parts[0])
3667
 
        return '.'.join(parts)
3668
 
 
3669
 
 
3670
 
test_prefix_alias_registry = TestPrefixAliasRegistry()
3671
 
"""Registry of test prefix aliases."""
3672
 
 
3673
 
 
3674
 
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
3675
 
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3676
 
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3677
 
 
3678
 
# Obvious highest levels prefixes, feel free to add your own via a plugin
3679
 
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3680
 
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3681
 
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3682
 
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
3683
 
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
3684
 
 
3685
 
 
3686
 
def _test_suite_testmod_names():
3687
 
    """Return the standard list of test module names to test."""
3688
 
    return [
3689
 
        'bzrlib.doc',
3690
 
        'bzrlib.tests.blackbox',
3691
 
        'bzrlib.tests.commands',
3692
 
        'bzrlib.tests.doc_generate',
3693
 
        'bzrlib.tests.per_branch',
3694
 
        'bzrlib.tests.per_bzrdir',
3695
 
        'bzrlib.tests.per_controldir',
3696
 
        'bzrlib.tests.per_controldir_colo',
3697
 
        'bzrlib.tests.per_foreign_vcs',
3698
 
        'bzrlib.tests.per_interrepository',
3699
 
        'bzrlib.tests.per_intertree',
3700
 
        'bzrlib.tests.per_inventory',
3701
 
        'bzrlib.tests.per_interbranch',
3702
 
        'bzrlib.tests.per_lock',
3703
 
        'bzrlib.tests.per_merger',
3704
 
        'bzrlib.tests.per_transport',
3705
 
        'bzrlib.tests.per_tree',
3706
 
        'bzrlib.tests.per_pack_repository',
3707
 
        'bzrlib.tests.per_repository',
3708
 
        'bzrlib.tests.per_repository_chk',
3709
 
        'bzrlib.tests.per_repository_reference',
3710
 
        'bzrlib.tests.per_uifactory',
3711
 
        'bzrlib.tests.per_versionedfile',
3712
 
        'bzrlib.tests.per_workingtree',
3713
 
        'bzrlib.tests.test__annotator',
3714
 
        'bzrlib.tests.test__bencode',
3715
 
        'bzrlib.tests.test__btree_serializer',
3716
 
        'bzrlib.tests.test__chk_map',
3717
 
        'bzrlib.tests.test__dirstate_helpers',
3718
 
        'bzrlib.tests.test__groupcompress',
3719
 
        'bzrlib.tests.test__known_graph',
3720
 
        'bzrlib.tests.test__rio',
3721
 
        'bzrlib.tests.test__simple_set',
3722
 
        'bzrlib.tests.test__static_tuple',
3723
 
        'bzrlib.tests.test__walkdirs_win32',
3724
 
        'bzrlib.tests.test_ancestry',
3725
 
        'bzrlib.tests.test_annotate',
3726
 
        'bzrlib.tests.test_api',
3727
 
        'bzrlib.tests.test_atomicfile',
3728
 
        'bzrlib.tests.test_bad_files',
3729
 
        'bzrlib.tests.test_bisect_multi',
3730
 
        'bzrlib.tests.test_branch',
3731
 
        'bzrlib.tests.test_branchbuilder',
3732
 
        'bzrlib.tests.test_btree_index',
3733
 
        'bzrlib.tests.test_bugtracker',
3734
 
        'bzrlib.tests.test_bundle',
3735
 
        'bzrlib.tests.test_bzrdir',
3736
 
        'bzrlib.tests.test__chunks_to_lines',
3737
 
        'bzrlib.tests.test_cache_utf8',
3738
 
        'bzrlib.tests.test_chk_map',
3739
 
        'bzrlib.tests.test_chk_serializer',
3740
 
        'bzrlib.tests.test_chunk_writer',
3741
 
        'bzrlib.tests.test_clean_tree',
3742
 
        'bzrlib.tests.test_cleanup',
3743
 
        'bzrlib.tests.test_cmdline',
3744
 
        'bzrlib.tests.test_commands',
3745
 
        'bzrlib.tests.test_commit',
3746
 
        'bzrlib.tests.test_commit_merge',
3747
 
        'bzrlib.tests.test_config',
3748
 
        'bzrlib.tests.test_conflicts',
3749
 
        'bzrlib.tests.test_counted_lock',
3750
 
        'bzrlib.tests.test_crash',
3751
 
        'bzrlib.tests.test_decorators',
3752
 
        'bzrlib.tests.test_delta',
3753
 
        'bzrlib.tests.test_debug',
3754
 
        'bzrlib.tests.test_deprecated_graph',
3755
 
        'bzrlib.tests.test_diff',
3756
 
        'bzrlib.tests.test_directory_service',
3757
 
        'bzrlib.tests.test_dirstate',
3758
 
        'bzrlib.tests.test_email_message',
3759
 
        'bzrlib.tests.test_eol_filters',
3760
 
        'bzrlib.tests.test_errors',
3761
 
        'bzrlib.tests.test_export',
3762
 
        'bzrlib.tests.test_extract',
3763
 
        'bzrlib.tests.test_fetch',
3764
 
        'bzrlib.tests.test_fixtures',
3765
 
        'bzrlib.tests.test_fifo_cache',
3766
 
        'bzrlib.tests.test_filters',
3767
 
        'bzrlib.tests.test_ftp_transport',
3768
 
        'bzrlib.tests.test_foreign',
3769
 
        'bzrlib.tests.test_generate_docs',
3770
 
        'bzrlib.tests.test_generate_ids',
3771
 
        'bzrlib.tests.test_globbing',
3772
 
        'bzrlib.tests.test_gpg',
3773
 
        'bzrlib.tests.test_graph',
3774
 
        'bzrlib.tests.test_groupcompress',
3775
 
        'bzrlib.tests.test_hashcache',
3776
 
        'bzrlib.tests.test_help',
3777
 
        'bzrlib.tests.test_hooks',
3778
 
        'bzrlib.tests.test_http',
3779
 
        'bzrlib.tests.test_http_response',
3780
 
        'bzrlib.tests.test_https_ca_bundle',
3781
 
        'bzrlib.tests.test_identitymap',
3782
 
        'bzrlib.tests.test_ignores',
3783
 
        'bzrlib.tests.test_index',
3784
 
        'bzrlib.tests.test_import_tariff',
3785
 
        'bzrlib.tests.test_info',
3786
 
        'bzrlib.tests.test_inv',
3787
 
        'bzrlib.tests.test_inventory_delta',
3788
 
        'bzrlib.tests.test_knit',
3789
 
        'bzrlib.tests.test_lazy_import',
3790
 
        'bzrlib.tests.test_lazy_regex',
3791
 
        'bzrlib.tests.test_library_state',
3792
 
        'bzrlib.tests.test_lock',
3793
 
        'bzrlib.tests.test_lockable_files',
3794
 
        'bzrlib.tests.test_lockdir',
3795
 
        'bzrlib.tests.test_log',
3796
 
        'bzrlib.tests.test_lru_cache',
3797
 
        'bzrlib.tests.test_lsprof',
3798
 
        'bzrlib.tests.test_mail_client',
3799
 
        'bzrlib.tests.test_matchers',
3800
 
        'bzrlib.tests.test_memorytree',
3801
 
        'bzrlib.tests.test_merge',
3802
 
        'bzrlib.tests.test_merge3',
3803
 
        'bzrlib.tests.test_merge_core',
3804
 
        'bzrlib.tests.test_merge_directive',
3805
 
        'bzrlib.tests.test_mergetools',
3806
 
        'bzrlib.tests.test_missing',
3807
 
        'bzrlib.tests.test_msgeditor',
3808
 
        'bzrlib.tests.test_multiparent',
3809
 
        'bzrlib.tests.test_mutabletree',
3810
 
        'bzrlib.tests.test_nonascii',
3811
 
        'bzrlib.tests.test_options',
3812
 
        'bzrlib.tests.test_osutils',
3813
 
        'bzrlib.tests.test_osutils_encodings',
3814
 
        'bzrlib.tests.test_pack',
3815
 
        'bzrlib.tests.test_patch',
3816
 
        'bzrlib.tests.test_patches',
3817
 
        'bzrlib.tests.test_permissions',
3818
 
        'bzrlib.tests.test_plugins',
3819
 
        'bzrlib.tests.test_progress',
3820
 
        'bzrlib.tests.test_pyutils',
3821
 
        'bzrlib.tests.test_read_bundle',
3822
 
        'bzrlib.tests.test_reconcile',
3823
 
        'bzrlib.tests.test_reconfigure',
3824
 
        'bzrlib.tests.test_registry',
3825
 
        'bzrlib.tests.test_remote',
3826
 
        'bzrlib.tests.test_rename_map',
3827
 
        'bzrlib.tests.test_repository',
3828
 
        'bzrlib.tests.test_revert',
3829
 
        'bzrlib.tests.test_revision',
3830
 
        'bzrlib.tests.test_revisionspec',
3831
 
        'bzrlib.tests.test_revisiontree',
3832
 
        'bzrlib.tests.test_rio',
3833
 
        'bzrlib.tests.test_rules',
3834
 
        'bzrlib.tests.test_sampler',
3835
 
        'bzrlib.tests.test_scenarios',
3836
 
        'bzrlib.tests.test_script',
3837
 
        'bzrlib.tests.test_selftest',
3838
 
        'bzrlib.tests.test_serializer',
3839
 
        'bzrlib.tests.test_setup',
3840
 
        'bzrlib.tests.test_sftp_transport',
3841
 
        'bzrlib.tests.test_shelf',
3842
 
        'bzrlib.tests.test_shelf_ui',
3843
 
        'bzrlib.tests.test_smart',
3844
 
        'bzrlib.tests.test_smart_add',
3845
 
        'bzrlib.tests.test_smart_request',
3846
 
        'bzrlib.tests.test_smart_transport',
3847
 
        'bzrlib.tests.test_smtp_connection',
3848
 
        'bzrlib.tests.test_source',
3849
 
        'bzrlib.tests.test_ssh_transport',
3850
 
        'bzrlib.tests.test_status',
3851
 
        'bzrlib.tests.test_store',
3852
 
        'bzrlib.tests.test_strace',
3853
 
        'bzrlib.tests.test_subsume',
3854
 
        'bzrlib.tests.test_switch',
3855
 
        'bzrlib.tests.test_symbol_versioning',
3856
 
        'bzrlib.tests.test_tag',
3857
 
        'bzrlib.tests.test_test_server',
3858
 
        'bzrlib.tests.test_testament',
3859
 
        'bzrlib.tests.test_textfile',
3860
 
        'bzrlib.tests.test_textmerge',
3861
 
        'bzrlib.tests.test_cethread',
3862
 
        'bzrlib.tests.test_timestamp',
3863
 
        'bzrlib.tests.test_trace',
3864
 
        'bzrlib.tests.test_transactions',
3865
 
        'bzrlib.tests.test_transform',
3866
 
        'bzrlib.tests.test_transport',
3867
 
        'bzrlib.tests.test_transport_log',
3868
 
        'bzrlib.tests.test_tree',
3869
 
        'bzrlib.tests.test_treebuilder',
3870
 
        'bzrlib.tests.test_treeshape',
3871
 
        'bzrlib.tests.test_tsort',
3872
 
        'bzrlib.tests.test_tuned_gzip',
3873
 
        'bzrlib.tests.test_ui',
3874
 
        'bzrlib.tests.test_uncommit',
3875
 
        'bzrlib.tests.test_upgrade',
3876
 
        'bzrlib.tests.test_upgrade_stacked',
3877
 
        'bzrlib.tests.test_urlutils',
3878
 
        'bzrlib.tests.test_version',
3879
 
        'bzrlib.tests.test_version_info',
3880
 
        'bzrlib.tests.test_versionedfile',
3881
 
        'bzrlib.tests.test_weave',
3882
 
        'bzrlib.tests.test_whitebox',
3883
 
        'bzrlib.tests.test_win32utils',
3884
 
        'bzrlib.tests.test_workingtree',
3885
 
        'bzrlib.tests.test_workingtree_4',
3886
 
        'bzrlib.tests.test_wsgi',
3887
 
        'bzrlib.tests.test_xml',
3888
 
        ]
3889
 
 
3890
 
 
3891
 
def _test_suite_modules_to_doctest():
3892
 
    """Return the list of modules to doctest."""
3893
 
    if __doc__ is None:
3894
 
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3895
 
        return []
3896
 
    return [
3897
 
        'bzrlib',
3898
 
        'bzrlib.branchbuilder',
3899
 
        'bzrlib.decorators',
3900
 
        'bzrlib.export',
3901
 
        'bzrlib.inventory',
3902
 
        'bzrlib.iterablefile',
3903
 
        'bzrlib.lockdir',
3904
 
        'bzrlib.merge3',
3905
 
        'bzrlib.option',
3906
 
        'bzrlib.pyutils',
3907
 
        'bzrlib.symbol_versioning',
3908
 
        'bzrlib.tests',
3909
 
        'bzrlib.tests.fixtures',
3910
 
        'bzrlib.timestamp',
3911
 
        'bzrlib.transport.http',
3912
 
        'bzrlib.version_info_formats.format_custom',
3913
 
        ]
3914
 
 
3915
 
 
3916
 
def test_suite(keep_only=None, starting_with=None):
 
2137
 
 
2138
 
 
2139
def test_suite():
3917
2140
    """Build and return TestSuite for the whole of bzrlib.
3918
 
 
3919
 
    :param keep_only: A list of test ids limiting the suite returned.
3920
 
 
3921
 
    :param starting_with: An id limiting the suite returned to the tests
3922
 
         starting with it.
3923
 
 
 
2141
    
3924
2142
    This function can be replaced if you need to change the default test
3925
2143
    suite on a global basis, but it is not encouraged.
3926
2144
    """
3927
 
 
 
2145
    testmod_names = [
 
2146
                   'bzrlib.tests.test_ancestry',
 
2147
                   'bzrlib.tests.test_annotate',
 
2148
                   'bzrlib.tests.test_api',
 
2149
                   'bzrlib.tests.test_atomicfile',
 
2150
                   'bzrlib.tests.test_bad_files',
 
2151
                   'bzrlib.tests.test_branch',
 
2152
                   'bzrlib.tests.test_bundle',
 
2153
                   'bzrlib.tests.test_bzrdir',
 
2154
                   'bzrlib.tests.test_cache_utf8',
 
2155
                   'bzrlib.tests.test_commands',
 
2156
                   'bzrlib.tests.test_commit',
 
2157
                   'bzrlib.tests.test_commit_merge',
 
2158
                   'bzrlib.tests.test_config',
 
2159
                   'bzrlib.tests.test_conflicts',
 
2160
                   'bzrlib.tests.test_decorators',
 
2161
                   'bzrlib.tests.test_delta',
 
2162
                   'bzrlib.tests.test_diff',
 
2163
                   'bzrlib.tests.test_dirstate',
 
2164
                   'bzrlib.tests.test_doc_generate',
 
2165
                   'bzrlib.tests.test_errors',
 
2166
                   'bzrlib.tests.test_escaped_store',
 
2167
                   'bzrlib.tests.test_extract',
 
2168
                   'bzrlib.tests.test_fetch',
 
2169
                   'bzrlib.tests.test_ftp_transport',
 
2170
                   'bzrlib.tests.test_generate_docs',
 
2171
                   'bzrlib.tests.test_generate_ids',
 
2172
                   'bzrlib.tests.test_globbing',
 
2173
                   'bzrlib.tests.test_gpg',
 
2174
                   'bzrlib.tests.test_graph',
 
2175
                   'bzrlib.tests.test_hashcache',
 
2176
                   'bzrlib.tests.test_http',
 
2177
                   'bzrlib.tests.test_http_response',
 
2178
                   'bzrlib.tests.test_https_ca_bundle',
 
2179
                   'bzrlib.tests.test_identitymap',
 
2180
                   'bzrlib.tests.test_ignores',
 
2181
                   'bzrlib.tests.test_inv',
 
2182
                   'bzrlib.tests.test_knit',
 
2183
                   'bzrlib.tests.test_lazy_import',
 
2184
                   'bzrlib.tests.test_lazy_regex',
 
2185
                   'bzrlib.tests.test_lockdir',
 
2186
                   'bzrlib.tests.test_lockable_files',
 
2187
                   'bzrlib.tests.test_log',
 
2188
                   'bzrlib.tests.test_memorytree',
 
2189
                   'bzrlib.tests.test_merge',
 
2190
                   'bzrlib.tests.test_merge3',
 
2191
                   'bzrlib.tests.test_merge_core',
 
2192
                   'bzrlib.tests.test_merge_directive',
 
2193
                   'bzrlib.tests.test_missing',
 
2194
                   'bzrlib.tests.test_msgeditor',
 
2195
                   'bzrlib.tests.test_nonascii',
 
2196
                   'bzrlib.tests.test_options',
 
2197
                   'bzrlib.tests.test_osutils',
 
2198
                   'bzrlib.tests.test_osutils_encodings',
 
2199
                   'bzrlib.tests.test_patch',
 
2200
                   'bzrlib.tests.test_patches',
 
2201
                   'bzrlib.tests.test_permissions',
 
2202
                   'bzrlib.tests.test_plugins',
 
2203
                   'bzrlib.tests.test_progress',
 
2204
                   'bzrlib.tests.test_reconcile',
 
2205
                   'bzrlib.tests.test_registry',
 
2206
                   'bzrlib.tests.test_repository',
 
2207
                   'bzrlib.tests.test_revert',
 
2208
                   'bzrlib.tests.test_revision',
 
2209
                   'bzrlib.tests.test_revisionnamespaces',
 
2210
                   'bzrlib.tests.test_revisiontree',
 
2211
                   'bzrlib.tests.test_rio',
 
2212
                   'bzrlib.tests.test_sampler',
 
2213
                   'bzrlib.tests.test_selftest',
 
2214
                   'bzrlib.tests.test_setup',
 
2215
                   'bzrlib.tests.test_sftp_transport',
 
2216
                   'bzrlib.tests.test_smart_add',
 
2217
                   'bzrlib.tests.test_smart_transport',
 
2218
                   'bzrlib.tests.test_source',
 
2219
                   'bzrlib.tests.test_ssh_transport',
 
2220
                   'bzrlib.tests.test_status',
 
2221
                   'bzrlib.tests.test_store',
 
2222
                   'bzrlib.tests.test_strace',
 
2223
                   'bzrlib.tests.test_subsume',
 
2224
                   'bzrlib.tests.test_symbol_versioning',
 
2225
                   'bzrlib.tests.test_tag',
 
2226
                   'bzrlib.tests.test_testament',
 
2227
                   'bzrlib.tests.test_textfile',
 
2228
                   'bzrlib.tests.test_textmerge',
 
2229
                   'bzrlib.tests.test_timestamp',
 
2230
                   'bzrlib.tests.test_trace',
 
2231
                   'bzrlib.tests.test_transactions',
 
2232
                   'bzrlib.tests.test_transform',
 
2233
                   'bzrlib.tests.test_transport',
 
2234
                   'bzrlib.tests.test_tree',
 
2235
                   'bzrlib.tests.test_treebuilder',
 
2236
                   'bzrlib.tests.test_tsort',
 
2237
                   'bzrlib.tests.test_tuned_gzip',
 
2238
                   'bzrlib.tests.test_ui',
 
2239
                   'bzrlib.tests.test_upgrade',
 
2240
                   'bzrlib.tests.test_urlutils',
 
2241
                   'bzrlib.tests.test_versionedfile',
 
2242
                   'bzrlib.tests.test_version',
 
2243
                   'bzrlib.tests.test_version_info',
 
2244
                   'bzrlib.tests.test_weave',
 
2245
                   'bzrlib.tests.test_whitebox',
 
2246
                   'bzrlib.tests.test_workingtree',
 
2247
                   'bzrlib.tests.test_workingtree_4',
 
2248
                   'bzrlib.tests.test_wsgi',
 
2249
                   'bzrlib.tests.test_xml',
 
2250
                   ]
 
2251
    test_transport_implementations = [
 
2252
        'bzrlib.tests.test_transport_implementations',
 
2253
        'bzrlib.tests.test_read_bundle',
 
2254
        ]
 
2255
    suite = TestUtil.TestSuite()
3928
2256
    loader = TestUtil.TestLoader()
3929
 
 
3930
 
    if keep_only is not None:
3931
 
        id_filter = TestIdList(keep_only)
3932
 
    if starting_with:
3933
 
        # We take precedence over keep_only because *at loading time* using
3934
 
        # both options means we will load less tests for the same final result.
3935
 
        def interesting_module(name):
3936
 
            for start in starting_with:
3937
 
                if (
3938
 
                    # Either the module name starts with the specified string
3939
 
                    name.startswith(start)
3940
 
                    # or it may contain tests starting with the specified string
3941
 
                    or start.startswith(name)
3942
 
                    ):
3943
 
                    return True
3944
 
            return False
3945
 
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3946
 
 
3947
 
    elif keep_only is not None:
3948
 
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3949
 
        def interesting_module(name):
3950
 
            return id_filter.refers_to(name)
3951
 
 
3952
 
    else:
3953
 
        loader = TestUtil.TestLoader()
3954
 
        def interesting_module(name):
3955
 
            # No filtering, all modules are interesting
3956
 
            return True
3957
 
 
3958
 
    suite = loader.suiteClass()
3959
 
 
3960
 
    # modules building their suite with loadTestsFromModuleNames
3961
 
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
3962
 
 
3963
 
    for mod in _test_suite_modules_to_doctest():
3964
 
        if not interesting_module(mod):
3965
 
            # No tests to keep here, move along
3966
 
            continue
 
2257
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2258
    from bzrlib.transport import TransportTestProviderAdapter
 
2259
    adapter = TransportTestProviderAdapter()
 
2260
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
2261
    for package in packages_to_test():
 
2262
        suite.addTest(package.test_suite())
 
2263
    for m in MODULES_TO_TEST:
 
2264
        suite.addTest(loader.loadTestsFromModule(m))
 
2265
    for m in MODULES_TO_DOCTEST:
3967
2266
        try:
3968
 
            # note that this really does mean "report only" -- doctest
3969
 
            # still runs the rest of the examples
3970
 
            doc_suite = IsolatedDocTestSuite(
3971
 
                mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
2267
            suite.addTest(doctest.DocTestSuite(m))
3972
2268
        except ValueError, e:
3973
 
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
2269
            print '**failed to get doctest for: %s\n%s' %(m,e)
3974
2270
            raise
3975
 
        if len(doc_suite._tests) == 0:
3976
 
            raise errors.BzrError("no doctests found in %s" % (mod,))
3977
 
        suite.addTest(doc_suite)
3978
 
 
3979
 
    default_encoding = sys.getdefaultencoding()
3980
 
    for name, plugin in _mod_plugin.plugins().items():
3981
 
        if not interesting_module(plugin.module.__name__):
3982
 
            continue
3983
 
        plugin_suite = plugin.test_suite()
3984
 
        # We used to catch ImportError here and turn it into just a warning,
3985
 
        # but really if you don't have --no-plugins this should be a failure.
3986
 
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3987
 
        if plugin_suite is None:
3988
 
            plugin_suite = plugin.load_plugin_tests(loader)
3989
 
        if plugin_suite is not None:
3990
 
            suite.addTest(plugin_suite)
3991
 
        if default_encoding != sys.getdefaultencoding():
3992
 
            trace.warning(
3993
 
                'Plugin "%s" tried to reset default encoding to: %s', name,
3994
 
                sys.getdefaultencoding())
3995
 
            reload(sys)
3996
 
            sys.setdefaultencoding(default_encoding)
3997
 
 
3998
 
    if keep_only is not None:
3999
 
        # Now that the referred modules have loaded their tests, keep only the
4000
 
        # requested ones.
4001
 
        suite = filter_suite_by_id_list(suite, id_filter)
4002
 
        # Do some sanity checks on the id_list filtering
4003
 
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
4004
 
        if starting_with:
4005
 
            # The tester has used both keep_only and starting_with, so he is
4006
 
            # already aware that some tests are excluded from the list, there
4007
 
            # is no need to tell him which.
4008
 
            pass
4009
 
        else:
4010
 
            # Some tests mentioned in the list are not in the test suite. The
4011
 
            # list may be out of date, report to the tester.
4012
 
            for id in not_found:
4013
 
                trace.warning('"%s" not found in the test suite', id)
4014
 
        for id in duplicates:
4015
 
            trace.warning('"%s" is used as an id by several tests', id)
4016
 
 
 
2271
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
2272
        if getattr(plugin, 'test_suite', None) is not None:
 
2273
            default_encoding = sys.getdefaultencoding()
 
2274
            try:
 
2275
                plugin_suite = plugin.test_suite()
 
2276
            except ImportError, e:
 
2277
                bzrlib.trace.warning(
 
2278
                    'Unable to test plugin "%s": %s', name, e)
 
2279
            else:
 
2280
                suite.addTest(plugin_suite)
 
2281
            if default_encoding != sys.getdefaultencoding():
 
2282
                bzrlib.trace.warning(
 
2283
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
2284
                    sys.getdefaultencoding())
 
2285
                reload(sys)
 
2286
                sys.setdefaultencoding(default_encoding)
4017
2287
    return suite
4018
2288
 
4019
2289
 
4020
 
def multiply_scenarios(*scenarios):
4021
 
    """Multiply two or more iterables of scenarios.
4022
 
 
4023
 
    It is safe to pass scenario generators or iterators.
4024
 
 
4025
 
    :returns: A list of compound scenarios: the cross-product of all 
4026
 
        scenarios, with the names concatenated and the parameters
4027
 
        merged together.
4028
 
    """
4029
 
    return reduce(_multiply_two_scenarios, map(list, scenarios))
4030
 
 
4031
 
 
4032
 
def _multiply_two_scenarios(scenarios_left, scenarios_right):
4033
 
    """Multiply two sets of scenarios.
4034
 
 
4035
 
    :returns: the cartesian product of the two sets of scenarios, that is
4036
 
        a scenario for every possible combination of a left scenario and a
4037
 
        right scenario.
4038
 
    """
4039
 
    return [
4040
 
        ('%s,%s' % (left_name, right_name),
4041
 
         dict(left_dict.items() + right_dict.items()))
4042
 
        for left_name, left_dict in scenarios_left
4043
 
        for right_name, right_dict in scenarios_right]
4044
 
 
4045
 
 
4046
 
def multiply_tests(tests, scenarios, result):
4047
 
    """Multiply tests_list by scenarios into result.
4048
 
 
4049
 
    This is the core workhorse for test parameterisation.
4050
 
 
4051
 
    Typically the load_tests() method for a per-implementation test suite will
4052
 
    call multiply_tests and return the result.
4053
 
 
4054
 
    :param tests: The tests to parameterise.
4055
 
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
4056
 
        scenario_param_dict).
4057
 
    :param result: A TestSuite to add created tests to.
4058
 
 
4059
 
    This returns the passed in result TestSuite with the cross product of all
4060
 
    the tests repeated once for each scenario.  Each test is adapted by adding
4061
 
    the scenario name at the end of its id(), and updating the test object's
4062
 
    __dict__ with the scenario_param_dict.
4063
 
 
4064
 
    >>> import bzrlib.tests.test_sampler
4065
 
    >>> r = multiply_tests(
4066
 
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4067
 
    ...     [('one', dict(param=1)),
4068
 
    ...      ('two', dict(param=2))],
4069
 
    ...     TestUtil.TestSuite())
4070
 
    >>> tests = list(iter_suite_tests(r))
4071
 
    >>> len(tests)
4072
 
    2
4073
 
    >>> tests[0].id()
4074
 
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
4075
 
    >>> tests[0].param
4076
 
    1
4077
 
    >>> tests[1].param
4078
 
    2
4079
 
    """
4080
 
    for test in iter_suite_tests(tests):
4081
 
        apply_scenarios(test, scenarios, result)
4082
 
    return result
4083
 
 
4084
 
 
4085
 
def apply_scenarios(test, scenarios, result):
4086
 
    """Apply the scenarios in scenarios to test and add to result.
4087
 
 
4088
 
    :param test: The test to apply scenarios to.
4089
 
    :param scenarios: An iterable of scenarios to apply to test.
4090
 
    :return: result
4091
 
    :seealso: apply_scenario
4092
 
    """
4093
 
    for scenario in scenarios:
4094
 
        result.addTest(apply_scenario(test, scenario))
4095
 
    return result
4096
 
 
4097
 
 
4098
 
def apply_scenario(test, scenario):
4099
 
    """Copy test and apply scenario to it.
4100
 
 
4101
 
    :param test: A test to adapt.
4102
 
    :param scenario: A tuple describing the scenarion.
4103
 
        The first element of the tuple is the new test id.
4104
 
        The second element is a dict containing attributes to set on the
4105
 
        test.
4106
 
    :return: The adapted test.
4107
 
    """
4108
 
    new_id = "%s(%s)" % (test.id(), scenario[0])
4109
 
    new_test = clone_test(test, new_id)
4110
 
    for name, value in scenario[1].items():
4111
 
        setattr(new_test, name, value)
4112
 
    return new_test
4113
 
 
4114
 
 
4115
 
def clone_test(test, new_id):
4116
 
    """Clone a test giving it a new id.
4117
 
 
4118
 
    :param test: The test to clone.
4119
 
    :param new_id: The id to assign to it.
4120
 
    :return: The new test.
4121
 
    """
4122
 
    new_test = copy.copy(test)
4123
 
    new_test.id = lambda: new_id
4124
 
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4125
 
    # causes cloned tests to share the 'details' dict.  This makes it hard to
4126
 
    # read the test output for parameterized tests, because tracebacks will be
4127
 
    # associated with irrelevant tests.
4128
 
    try:
4129
 
        details = new_test._TestCase__details
4130
 
    except AttributeError:
4131
 
        # must be a different version of testtools than expected.  Do nothing.
4132
 
        pass
4133
 
    else:
4134
 
        # Reset the '__details' dict.
4135
 
        new_test._TestCase__details = {}
4136
 
    return new_test
4137
 
 
4138
 
 
4139
 
def permute_tests_for_extension(standard_tests, loader, py_module_name,
4140
 
                                ext_module_name):
4141
 
    """Helper for permutating tests against an extension module.
4142
 
 
4143
 
    This is meant to be used inside a modules 'load_tests()' function. It will
4144
 
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
4145
 
    against both implementations. Setting 'test.module' to the appropriate
4146
 
    module. See bzrlib.tests.test__chk_map.load_tests as an example.
4147
 
 
4148
 
    :param standard_tests: A test suite to permute
4149
 
    :param loader: A TestLoader
4150
 
    :param py_module_name: The python path to a python module that can always
4151
 
        be loaded, and will be considered the 'python' implementation. (eg
4152
 
        'bzrlib._chk_map_py')
4153
 
    :param ext_module_name: The python path to an extension module. If the
4154
 
        module cannot be loaded, a single test will be added, which notes that
4155
 
        the module is not available. If it can be loaded, all standard_tests
4156
 
        will be run against that module.
4157
 
    :return: (suite, feature) suite is a test-suite that has all the permuted
4158
 
        tests. feature is the Feature object that can be used to determine if
4159
 
        the module is available.
4160
 
    """
4161
 
 
4162
 
    py_module = pyutils.get_named_object(py_module_name)
4163
 
    scenarios = [
4164
 
        ('python', {'module': py_module}),
4165
 
    ]
4166
 
    suite = loader.suiteClass()
4167
 
    feature = ModuleAvailableFeature(ext_module_name)
4168
 
    if feature.available():
4169
 
        scenarios.append(('C', {'module': feature.module}))
4170
 
    else:
4171
 
        # the compiled module isn't available, so we add a failing test
4172
 
        class FailWithoutFeature(TestCase):
4173
 
            def test_fail(self):
4174
 
                self.requireFeature(feature)
4175
 
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4176
 
    result = multiply_tests(standard_tests, scenarios, suite)
4177
 
    return result, feature
4178
 
 
4179
 
 
4180
 
def _rmtree_temp_dir(dirname, test_id=None):
4181
 
    # If LANG=C we probably have created some bogus paths
4182
 
    # which rmtree(unicode) will fail to delete
4183
 
    # so make sure we are using rmtree(str) to delete everything
4184
 
    # except on win32, where rmtree(str) will fail
4185
 
    # since it doesn't have the property of byte-stream paths
4186
 
    # (they are either ascii or mbcs)
4187
 
    if sys.platform == 'win32':
4188
 
        # make sure we are using the unicode win32 api
4189
 
        dirname = unicode(dirname)
4190
 
    else:
4191
 
        dirname = dirname.encode(sys.getfilesystemencoding())
 
2290
def adapt_modules(mods_list, adapter, loader, suite):
 
2291
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2292
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2293
        suite.addTests(adapter.adapt(test))
 
2294
 
 
2295
 
 
2296
def _rmtree_temp_dir(dirname):
4192
2297
    try:
4193
2298
        osutils.rmtree(dirname)
4194
2299
    except OSError, e:
4195
 
        # We don't want to fail here because some useful display will be lost
4196
 
        # otherwise. Polluting the tmp dir is bad, but not giving all the
4197
 
        # possible info to the test runner is even worse.
4198
 
        if test_id != None:
4199
 
            ui.ui_factory.clear_term()
4200
 
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4201
 
        # Ugly, but the last thing we want here is fail, so bear with it.
4202
 
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4203
 
                                    ).encode('ascii', 'replace')
4204
 
        sys.stderr.write('Unable to remove testing dir %s\n%s'
4205
 
                         % (os.path.basename(dirname), printable_e))
 
2300
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
2301
            print >>sys.stderr, ('Permission denied: '
 
2302
                                 'unable to remove testing dir '
 
2303
                                 '%s' % os.path.basename(dirname))
 
2304
        else:
 
2305
            raise
 
2306
 
 
2307
 
 
2308
def clean_selftest_output(root=None, quiet=False):
 
2309
    """Remove all selftest output directories from root directory.
 
2310
 
 
2311
    :param  root:   root directory for clean
 
2312
                    (if ommitted or None then clean current directory).
 
2313
    :param  quiet:  suppress report about deleting directories
 
2314
    """
 
2315
    import re
 
2316
    import shutil
 
2317
 
 
2318
    re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
 
2319
    if root is None:
 
2320
        root = u'.'
 
2321
    for i in os.listdir(root):
 
2322
        if os.path.isdir(i) and re_dir.match(i):
 
2323
            if not quiet:
 
2324
                print 'delete directory:', i
 
2325
            _rmtree_temp_dir(i)
4206
2326
 
4207
2327
 
4208
2328
class Feature(object):
4231
2351
        if getattr(self, 'feature_name', None):
4232
2352
            return self.feature_name()
4233
2353
        return self.__class__.__name__
4234
 
 
4235
 
 
4236
 
class _SymlinkFeature(Feature):
4237
 
 
4238
 
    def _probe(self):
4239
 
        return osutils.has_symlinks()
4240
 
 
4241
 
    def feature_name(self):
4242
 
        return 'symlinks'
4243
 
 
4244
 
SymlinkFeature = _SymlinkFeature()
4245
 
 
4246
 
 
4247
 
class _HardlinkFeature(Feature):
4248
 
 
4249
 
    def _probe(self):
4250
 
        return osutils.has_hardlinks()
4251
 
 
4252
 
    def feature_name(self):
4253
 
        return 'hardlinks'
4254
 
 
4255
 
HardlinkFeature = _HardlinkFeature()
4256
 
 
4257
 
 
4258
 
class _OsFifoFeature(Feature):
4259
 
 
4260
 
    def _probe(self):
4261
 
        return getattr(os, 'mkfifo', None)
4262
 
 
4263
 
    def feature_name(self):
4264
 
        return 'filesystem fifos'
4265
 
 
4266
 
OsFifoFeature = _OsFifoFeature()
4267
 
 
4268
 
 
4269
 
class _UnicodeFilenameFeature(Feature):
4270
 
    """Does the filesystem support Unicode filenames?"""
4271
 
 
4272
 
    def _probe(self):
4273
 
        try:
4274
 
            # Check for character combinations unlikely to be covered by any
4275
 
            # single non-unicode encoding. We use the characters
4276
 
            # - greek small letter alpha (U+03B1) and
4277
 
            # - braille pattern dots-123456 (U+283F).
4278
 
            os.stat(u'\u03b1\u283f')
4279
 
        except UnicodeEncodeError:
4280
 
            return False
4281
 
        except (IOError, OSError):
4282
 
            # The filesystem allows the Unicode filename but the file doesn't
4283
 
            # exist.
4284
 
            return True
4285
 
        else:
4286
 
            # The filesystem allows the Unicode filename and the file exists,
4287
 
            # for some reason.
4288
 
            return True
4289
 
 
4290
 
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4291
 
 
4292
 
 
4293
 
class _CompatabilityThunkFeature(Feature):
4294
 
    """This feature is just a thunk to another feature.
4295
 
 
4296
 
    It issues a deprecation warning if it is accessed, to let you know that you
4297
 
    should really use a different feature.
4298
 
    """
4299
 
 
4300
 
    def __init__(self, dep_version, module, name,
4301
 
                 replacement_name, replacement_module=None):
4302
 
        super(_CompatabilityThunkFeature, self).__init__()
4303
 
        self._module = module
4304
 
        if replacement_module is None:
4305
 
            replacement_module = module
4306
 
        self._replacement_module = replacement_module
4307
 
        self._name = name
4308
 
        self._replacement_name = replacement_name
4309
 
        self._dep_version = dep_version
4310
 
        self._feature = None
4311
 
 
4312
 
    def _ensure(self):
4313
 
        if self._feature is None:
4314
 
            depr_msg = self._dep_version % ('%s.%s'
4315
 
                                            % (self._module, self._name))
4316
 
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
4317
 
                                               self._replacement_name)
4318
 
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
4319
 
            # Import the new feature and use it as a replacement for the
4320
 
            # deprecated one.
4321
 
            self._feature = pyutils.get_named_object(
4322
 
                self._replacement_module, self._replacement_name)
4323
 
 
4324
 
    def _probe(self):
4325
 
        self._ensure()
4326
 
        return self._feature._probe()
4327
 
 
4328
 
 
4329
 
class ModuleAvailableFeature(Feature):
4330
 
    """This is a feature than describes a module we want to be available.
4331
 
 
4332
 
    Declare the name of the module in __init__(), and then after probing, the
4333
 
    module will be available as 'self.module'.
4334
 
 
4335
 
    :ivar module: The module if it is available, else None.
4336
 
    """
4337
 
 
4338
 
    def __init__(self, module_name):
4339
 
        super(ModuleAvailableFeature, self).__init__()
4340
 
        self.module_name = module_name
4341
 
 
4342
 
    def _probe(self):
4343
 
        try:
4344
 
            self._module = __import__(self.module_name, {}, {}, [''])
4345
 
            return True
4346
 
        except ImportError:
4347
 
            return False
4348
 
 
4349
 
    @property
4350
 
    def module(self):
4351
 
        if self.available(): # Make sure the probe has been done
4352
 
            return self._module
4353
 
        return None
4354
 
 
4355
 
    def feature_name(self):
4356
 
        return self.module_name
4357
 
 
4358
 
 
4359
 
def probe_unicode_in_user_encoding():
4360
 
    """Try to encode several unicode strings to use in unicode-aware tests.
4361
 
    Return first successfull match.
4362
 
 
4363
 
    :return:  (unicode value, encoded plain string value) or (None, None)
4364
 
    """
4365
 
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
4366
 
    for uni_val in possible_vals:
4367
 
        try:
4368
 
            str_val = uni_val.encode(osutils.get_user_encoding())
4369
 
        except UnicodeEncodeError:
4370
 
            # Try a different character
4371
 
            pass
4372
 
        else:
4373
 
            return uni_val, str_val
4374
 
    return None, None
4375
 
 
4376
 
 
4377
 
def probe_bad_non_ascii(encoding):
4378
 
    """Try to find [bad] character with code [128..255]
4379
 
    that cannot be decoded to unicode in some encoding.
4380
 
    Return None if all non-ascii characters is valid
4381
 
    for given encoding.
4382
 
    """
4383
 
    for i in xrange(128, 256):
4384
 
        char = chr(i)
4385
 
        try:
4386
 
            char.decode(encoding)
4387
 
        except UnicodeDecodeError:
4388
 
            return char
4389
 
    return None
4390
 
 
4391
 
 
4392
 
class _HTTPSServerFeature(Feature):
4393
 
    """Some tests want an https Server, check if one is available.
4394
 
 
4395
 
    Right now, the only way this is available is under python2.6 which provides
4396
 
    an ssl module.
4397
 
    """
4398
 
 
4399
 
    def _probe(self):
4400
 
        try:
4401
 
            import ssl
4402
 
            return True
4403
 
        except ImportError:
4404
 
            return False
4405
 
 
4406
 
    def feature_name(self):
4407
 
        return 'HTTPSServer'
4408
 
 
4409
 
 
4410
 
HTTPSServerFeature = _HTTPSServerFeature()
4411
 
 
4412
 
 
4413
 
class _UnicodeFilename(Feature):
4414
 
    """Does the filesystem support Unicode filenames?"""
4415
 
 
4416
 
    def _probe(self):
4417
 
        try:
4418
 
            os.stat(u'\u03b1')
4419
 
        except UnicodeEncodeError:
4420
 
            return False
4421
 
        except (IOError, OSError):
4422
 
            # The filesystem allows the Unicode filename but the file doesn't
4423
 
            # exist.
4424
 
            return True
4425
 
        else:
4426
 
            # The filesystem allows the Unicode filename and the file exists,
4427
 
            # for some reason.
4428
 
            return True
4429
 
 
4430
 
UnicodeFilename = _UnicodeFilename()
4431
 
 
4432
 
 
4433
 
class _ByteStringNamedFilesystem(Feature):
4434
 
    """Is the filesystem based on bytes?"""
4435
 
 
4436
 
    def _probe(self):
4437
 
        if os.name == "posix":
4438
 
            return True
4439
 
        return False
4440
 
 
4441
 
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4442
 
 
4443
 
 
4444
 
class _UTF8Filesystem(Feature):
4445
 
    """Is the filesystem UTF-8?"""
4446
 
 
4447
 
    def _probe(self):
4448
 
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
4449
 
            return True
4450
 
        return False
4451
 
 
4452
 
UTF8Filesystem = _UTF8Filesystem()
4453
 
 
4454
 
 
4455
 
class _BreakinFeature(Feature):
4456
 
    """Does this platform support the breakin feature?"""
4457
 
 
4458
 
    def _probe(self):
4459
 
        from bzrlib import breakin
4460
 
        if breakin.determine_signal() is None:
4461
 
            return False
4462
 
        if sys.platform == 'win32':
4463
 
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
4464
 
            # We trigger SIGBREAK via a Console api so we need ctypes to
4465
 
            # access the function
4466
 
            try:
4467
 
                import ctypes
4468
 
            except OSError:
4469
 
                return False
4470
 
        return True
4471
 
 
4472
 
    def feature_name(self):
4473
 
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
4474
 
 
4475
 
 
4476
 
BreakinFeature = _BreakinFeature()
4477
 
 
4478
 
 
4479
 
class _CaseInsCasePresFilenameFeature(Feature):
4480
 
    """Is the file-system case insensitive, but case-preserving?"""
4481
 
 
4482
 
    def _probe(self):
4483
 
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
4484
 
        try:
4485
 
            # first check truly case-preserving for created files, then check
4486
 
            # case insensitive when opening existing files.
4487
 
            name = osutils.normpath(name)
4488
 
            base, rel = osutils.split(name)
4489
 
            found_rel = osutils.canonical_relpath(base, name)
4490
 
            return (found_rel == rel
4491
 
                    and os.path.isfile(name.upper())
4492
 
                    and os.path.isfile(name.lower()))
4493
 
        finally:
4494
 
            os.close(fileno)
4495
 
            os.remove(name)
4496
 
 
4497
 
    def feature_name(self):
4498
 
        return "case-insensitive case-preserving filesystem"
4499
 
 
4500
 
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
4501
 
 
4502
 
 
4503
 
class _CaseInsensitiveFilesystemFeature(Feature):
4504
 
    """Check if underlying filesystem is case-insensitive but *not* case
4505
 
    preserving.
4506
 
    """
4507
 
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
4508
 
    # more likely to be case preserving, so this case is rare.
4509
 
 
4510
 
    def _probe(self):
4511
 
        if CaseInsCasePresFilenameFeature.available():
4512
 
            return False
4513
 
 
4514
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
4515
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
4516
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
4517
 
        else:
4518
 
            root = TestCaseWithMemoryTransport.TEST_ROOT
4519
 
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
4520
 
            dir=root)
4521
 
        name_a = osutils.pathjoin(tdir, 'a')
4522
 
        name_A = osutils.pathjoin(tdir, 'A')
4523
 
        os.mkdir(name_a)
4524
 
        result = osutils.isdir(name_A)
4525
 
        _rmtree_temp_dir(tdir)
4526
 
        return result
4527
 
 
4528
 
    def feature_name(self):
4529
 
        return 'case-insensitive filesystem'
4530
 
 
4531
 
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4532
 
 
4533
 
 
4534
 
class _CaseSensitiveFilesystemFeature(Feature):
4535
 
 
4536
 
    def _probe(self):
4537
 
        if CaseInsCasePresFilenameFeature.available():
4538
 
            return False
4539
 
        elif CaseInsensitiveFilesystemFeature.available():
4540
 
            return False
4541
 
        else:
4542
 
            return True
4543
 
 
4544
 
    def feature_name(self):
4545
 
        return 'case-sensitive filesystem'
4546
 
 
4547
 
# new coding style is for feature instances to be lowercase
4548
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4549
 
 
4550
 
 
4551
 
# Only define SubUnitBzrRunner if subunit is available.
4552
 
try:
4553
 
    from subunit import TestProtocolClient
4554
 
    from subunit.test_results import AutoTimingTestResultDecorator
4555
 
    class SubUnitBzrProtocolClient(TestProtocolClient):
4556
 
 
4557
 
        def addSuccess(self, test, details=None):
4558
 
            # The subunit client always includes the details in the subunit
4559
 
            # stream, but we don't want to include it in ours.
4560
 
            if details is not None and 'log' in details:
4561
 
                del details['log']
4562
 
            return super(SubUnitBzrProtocolClient, self).addSuccess(
4563
 
                test, details)
4564
 
 
4565
 
    class SubUnitBzrRunner(TextTestRunner):
4566
 
        def run(self, test):
4567
 
            result = AutoTimingTestResultDecorator(
4568
 
                SubUnitBzrProtocolClient(self.stream))
4569
 
            test.run(result)
4570
 
            return result
4571
 
except ImportError:
4572
 
    pass
4573
 
 
4574
 
class _PosixPermissionsFeature(Feature):
4575
 
 
4576
 
    def _probe(self):
4577
 
        def has_perms():
4578
 
            # create temporary file and check if specified perms are maintained.
4579
 
            import tempfile
4580
 
 
4581
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4582
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4583
 
            fd, name = f
4584
 
            os.close(fd)
4585
 
            os.chmod(name, write_perms)
4586
 
 
4587
 
            read_perms = os.stat(name).st_mode & 0777
4588
 
            os.unlink(name)
4589
 
            return (write_perms == read_perms)
4590
 
 
4591
 
        return (os.name == 'posix') and has_perms()
4592
 
 
4593
 
    def feature_name(self):
4594
 
        return 'POSIX permissions support'
4595
 
 
4596
 
posix_permissions_feature = _PosixPermissionsFeature()