~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

(vila) Calling super() instead of mentioning the base class in setUp avoid
 mistakes. (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005-2011 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
# TODO: Perhaps there should be an API to find out if bzr running under the
19
 
# test suite -- some plugins might want to avoid making intrusive changes if
20
 
# this is the case.  However, we want behaviour under to test to diverge as
21
 
# little as possible, so this should be used rarely if it's added at all.
22
 
# (Suggestion from j-a-meinel, 2005-11-24)
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Testing framework extensions"""
 
18
 
 
19
from __future__ import absolute_import
23
20
 
24
21
# NOTE: Some classes in here use camelCaseNaming() rather than
25
22
# underscore_naming().  That's for consistency with unittest; it's not the
26
23
# general style of bzrlib.  Please continue that consistency when adding e.g.
27
24
# new assertFoo() methods.
28
25
 
 
26
import atexit
29
27
import codecs
 
28
import copy
30
29
from cStringIO import StringIO
31
30
import difflib
 
31
import doctest
32
32
import errno
 
33
import itertools
33
34
import logging
34
35
import os
 
36
import platform
 
37
import pprint
 
38
import random
35
39
import re
36
 
import shutil
 
40
import shlex
 
41
import site
37
42
import stat
 
43
import subprocess
38
44
import sys
39
45
import tempfile
 
46
import threading
 
47
import time
 
48
import traceback
40
49
import unittest
41
 
import time
42
 
 
43
 
 
44
 
import bzrlib.branch
45
 
import bzrlib.bzrdir as bzrdir
46
 
import bzrlib.commands
47
 
import bzrlib.errors as errors
48
 
import bzrlib.inventory
49
 
import bzrlib.iterablefile
50
 
import bzrlib.lockdir
51
 
import bzrlib.merge3
52
 
import bzrlib.osutils
53
 
import bzrlib.osutils as osutils
54
 
import bzrlib.plugin
55
 
import bzrlib.store
56
 
import bzrlib.trace
57
 
from bzrlib.transport import urlescape, get_transport
58
 
import bzrlib.transport
59
 
from bzrlib.transport.local import LocalRelpathServer
60
 
from bzrlib.transport.readonly import ReadonlyServer
61
 
from bzrlib.trace import mutter
62
 
from bzrlib.tests.TestUtil import TestLoader, TestSuite
63
 
from bzrlib.tests.treeshape import build_tree_contents
64
 
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
65
 
 
66
 
default_transport = LocalRelpathServer
67
 
 
68
 
MODULES_TO_TEST = []
69
 
MODULES_TO_DOCTEST = [
70
 
                      bzrlib.branch,
71
 
                      bzrlib.commands,
72
 
                      bzrlib.errors,
73
 
                      bzrlib.inventory,
74
 
                      bzrlib.iterablefile,
75
 
                      bzrlib.lockdir,
76
 
                      bzrlib.merge3,
77
 
                      bzrlib.option,
78
 
                      bzrlib.osutils,
79
 
                      bzrlib.store
80
 
                      ]
81
 
def packages_to_test():
82
 
    """Return a list of packages to test.
83
 
 
84
 
    The packages are not globally imported so that import failures are
85
 
    triggered when running selftest, not when importing the command.
86
 
    """
87
 
    import bzrlib.doc
88
 
    import bzrlib.tests.blackbox
89
 
    import bzrlib.tests.branch_implementations
90
 
    import bzrlib.tests.bzrdir_implementations
91
 
    import bzrlib.tests.interrepository_implementations
92
 
    import bzrlib.tests.interversionedfile_implementations
93
 
    import bzrlib.tests.repository_implementations
94
 
    import bzrlib.tests.revisionstore_implementations
95
 
    import bzrlib.tests.workingtree_implementations
96
 
    return [
97
 
            bzrlib.doc,
98
 
            bzrlib.tests.blackbox,
99
 
            bzrlib.tests.branch_implementations,
100
 
            bzrlib.tests.bzrdir_implementations,
101
 
            bzrlib.tests.interrepository_implementations,
102
 
            bzrlib.tests.interversionedfile_implementations,
103
 
            bzrlib.tests.repository_implementations,
104
 
            bzrlib.tests.revisionstore_implementations,
105
 
            bzrlib.tests.workingtree_implementations,
106
 
            ]
107
 
 
108
 
 
109
 
class _MyResult(unittest._TextTestResult):
110
 
    """Custom TestResult.
111
 
 
112
 
    Shows output in a different format, including displaying runtime for tests.
113
 
    """
 
50
import warnings
 
51
 
 
52
import testtools
 
53
# nb: check this before importing anything else from within it
 
54
_testtools_version = getattr(testtools, '__version__', ())
 
55
if _testtools_version < (0, 9, 5):
 
56
    raise ImportError("need at least testtools 0.9.5: %s is %r"
 
57
        % (testtools.__file__, _testtools_version))
 
58
from testtools import content
 
59
 
 
60
import bzrlib
 
61
from bzrlib import (
 
62
    branchbuilder,
 
63
    controldir,
 
64
    chk_map,
 
65
    commands as _mod_commands,
 
66
    config,
 
67
    i18n,
 
68
    debug,
 
69
    errors,
 
70
    hooks,
 
71
    lock as _mod_lock,
 
72
    lockdir,
 
73
    memorytree,
 
74
    osutils,
 
75
    plugin as _mod_plugin,
 
76
    pyutils,
 
77
    ui,
 
78
    urlutils,
 
79
    registry,
 
80
    symbol_versioning,
 
81
    trace,
 
82
    transport as _mod_transport,
 
83
    workingtree,
 
84
    )
 
85
try:
 
86
    import bzrlib.lsprof
 
87
except ImportError:
 
88
    # lsprof not available
 
89
    pass
 
90
from bzrlib.smart import client, request
 
91
from bzrlib.transport import (
 
92
    memory,
 
93
    pathfilter,
 
94
    )
 
95
from bzrlib.symbol_versioning import (
 
96
    deprecated_function,
 
97
    deprecated_in,
 
98
    )
 
99
from bzrlib.tests import (
 
100
    fixtures,
 
101
    test_server,
 
102
    TestUtil,
 
103
    treeshape,
 
104
    )
 
105
from bzrlib.ui import NullProgressView
 
106
from bzrlib.ui.text import TextUIFactory
 
107
from bzrlib.tests.features import _CompatabilityThunkFeature
 
108
 
 
109
# Mark this python module as being part of the implementation
 
110
# of unittest: this gives us better tracebacks where the last
 
111
# shown frame is the test code, not our assertXYZ.
 
112
__unittest = 1
 
113
 
 
114
default_transport = test_server.LocalURLServer
 
115
 
 
116
 
 
117
_unitialized_attr = object()
 
118
"""A sentinel needed to act as a default value in a method signature."""
 
119
 
 
120
 
 
121
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
122
SUBUNIT_SEEK_SET = 0
 
123
SUBUNIT_SEEK_CUR = 1
 
124
 
 
125
# These are intentionally brought into this namespace. That way plugins, etc
 
126
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
 
127
TestSuite = TestUtil.TestSuite
 
128
TestLoader = TestUtil.TestLoader
 
129
 
 
130
# Tests should run in a clean and clearly defined environment. The goal is to
 
131
# keep them isolated from the running environment as mush as possible. The test
 
132
# framework ensures the variables defined below are set (or deleted if the
 
133
# value is None) before a test is run and reset to their original value after
 
134
# the test is run. Generally if some code depends on an environment variable,
 
135
# the tests should start without this variable in the environment. There are a
 
136
# few exceptions but you shouldn't violate this rule lightly.
 
137
isolated_environ = {
 
138
    'BZR_HOME': None,
 
139
    'HOME': None,
 
140
    'XDG_CONFIG_HOME': None,
 
141
    # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
142
    # tests do check our impls match APPDATA
 
143
    'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
144
    'VISUAL': None,
 
145
    'EDITOR': None,
 
146
    'BZR_EMAIL': None,
 
147
    'BZREMAIL': None, # may still be present in the environment
 
148
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
 
149
    'BZR_PROGRESS_BAR': None,
 
150
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
 
151
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
152
    # TestCase should not use disk resources, BZR_LOG is one.
 
153
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
154
    'BZR_PLUGIN_PATH': None,
 
155
    'BZR_DISABLE_PLUGINS': None,
 
156
    'BZR_PLUGINS_AT': None,
 
157
    'BZR_CONCURRENCY': None,
 
158
    # Make sure that any text ui tests are consistent regardless of
 
159
    # the environment the test case is run in; you may want tests that
 
160
    # test other combinations.  'dumb' is a reasonable guess for tests
 
161
    # going to a pipe or a StringIO.
 
162
    'TERM': 'dumb',
 
163
    'LINES': '25',
 
164
    'COLUMNS': '80',
 
165
    'BZR_COLUMNS': '80',
 
166
    # Disable SSH Agent
 
167
    'SSH_AUTH_SOCK': None,
 
168
    # Proxies
 
169
    'http_proxy': None,
 
170
    'HTTP_PROXY': None,
 
171
    'https_proxy': None,
 
172
    'HTTPS_PROXY': None,
 
173
    'no_proxy': None,
 
174
    'NO_PROXY': None,
 
175
    'all_proxy': None,
 
176
    'ALL_PROXY': None,
 
177
    # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
178
    # least. If you do (care), please update this comment
 
179
    # -- vila 20080401
 
180
    'ftp_proxy': None,
 
181
    'FTP_PROXY': None,
 
182
    'BZR_REMOTE_PATH': None,
 
183
    # Generally speaking, we don't want apport reporting on crashes in
 
184
    # the test envirnoment unless we're specifically testing apport,
 
185
    # so that it doesn't leak into the real system environment.  We
 
186
    # use an env var so it propagates to subprocesses.
 
187
    'APPORT_DISABLE': '1',
 
188
    }
 
189
 
 
190
 
 
191
def override_os_environ(test, env=None):
 
192
    """Modify os.environ keeping a copy.
 
193
    
 
194
    :param test: A test instance
 
195
 
 
196
    :param env: A dict containing variable definitions to be installed
 
197
    """
 
198
    if env is None:
 
199
        env = isolated_environ
 
200
    test._original_os_environ = dict([(var, value)
 
201
                                      for var, value in os.environ.iteritems()])
 
202
    for var, value in env.iteritems():
 
203
        osutils.set_or_unset_env(var, value)
 
204
        if var not in test._original_os_environ:
 
205
            # The var is new, add it with a value of None, so
 
206
            # restore_os_environ will delete it
 
207
            test._original_os_environ[var] = None
 
208
 
 
209
 
 
210
def restore_os_environ(test):
 
211
    """Restore os.environ to its original state.
 
212
 
 
213
    :param test: A test instance previously passed to override_os_environ.
 
214
    """
 
215
    for var, value in test._original_os_environ.iteritems():
 
216
        # Restore the original value (or delete it if the value has been set to
 
217
        # None in override_os_environ).
 
218
        osutils.set_or_unset_env(var, value)
 
219
 
 
220
 
 
221
def _clear__type_equality_funcs(test):
 
222
    """Cleanup bound methods stored on TestCase instances
 
223
 
 
224
    Clear the dict breaking a few (mostly) harmless cycles in the affected
 
225
    unittests released with Python 2.6 and initial Python 2.7 versions.
 
226
 
 
227
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
 
228
    shipped in Oneiric, an object with no clear method was used, hence the
 
229
    extra complications, see bug 809048 for details.
 
230
    """
 
231
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
232
    if type_equality_funcs is not None:
 
233
        tef_clear = getattr(type_equality_funcs, "clear", None)
 
234
        if tef_clear is None:
 
235
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
236
            if tef_instance_dict is not None:
 
237
                tef_clear = tef_instance_dict.clear
 
238
        if tef_clear is not None:
 
239
            tef_clear()
 
240
 
 
241
 
 
242
class ExtendedTestResult(testtools.TextTestResult):
 
243
    """Accepts, reports and accumulates the results of running tests.
 
244
 
 
245
    Compared to the unittest version this class adds support for
 
246
    profiling, benchmarking, stopping as soon as a test fails,  and
 
247
    skipping tests.  There are further-specialized subclasses for
 
248
    different types of display.
 
249
 
 
250
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
251
    addFailure or addError methods.  These in turn may redirect to a more
 
252
    specific case for the special test results supported by our extended
 
253
    tests.
 
254
 
 
255
    Note that just one of these objects is fed the results from many tests.
 
256
    """
 
257
 
114
258
    stop_early = False
115
259
 
116
 
    def _elapsedTime(self):
117
 
        return "%5dms" % (1000 * (time.time() - self._start_time))
 
260
    def __init__(self, stream, descriptions, verbosity,
 
261
                 bench_history=None,
 
262
                 strict=False,
 
263
                 ):
 
264
        """Construct new TestResult.
 
265
 
 
266
        :param bench_history: Optionally, a writable file object to accumulate
 
267
            benchmark results.
 
268
        """
 
269
        testtools.TextTestResult.__init__(self, stream)
 
270
        if bench_history is not None:
 
271
            from bzrlib.version import _get_bzr_source_tree
 
272
            src_tree = _get_bzr_source_tree()
 
273
            if src_tree:
 
274
                try:
 
275
                    revision_id = src_tree.get_parent_ids()[0]
 
276
                except IndexError:
 
277
                    # XXX: if this is a brand new tree, do the same as if there
 
278
                    # is no branch.
 
279
                    revision_id = ''
 
280
            else:
 
281
                # XXX: If there's no branch, what should we do?
 
282
                revision_id = ''
 
283
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
284
        self._bench_history = bench_history
 
285
        self.ui = ui.ui_factory
 
286
        self.num_tests = 0
 
287
        self.error_count = 0
 
288
        self.failure_count = 0
 
289
        self.known_failure_count = 0
 
290
        self.skip_count = 0
 
291
        self.not_applicable_count = 0
 
292
        self.unsupported = {}
 
293
        self.count = 0
 
294
        self._overall_start_time = time.time()
 
295
        self._strict = strict
 
296
        self._first_thread_leaker_id = None
 
297
        self._tests_leaking_threads_count = 0
 
298
        self._traceback_from_test = None
 
299
 
 
300
    def stopTestRun(self):
 
301
        run = self.testsRun
 
302
        actionTaken = "Ran"
 
303
        stopTime = time.time()
 
304
        timeTaken = stopTime - self.startTime
 
305
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
 
306
        #                the parent class method is similar have to duplicate
 
307
        self._show_list('ERROR', self.errors)
 
308
        self._show_list('FAIL', self.failures)
 
309
        self.stream.write(self.sep2)
 
310
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
 
311
                            run, run != 1 and "s" or "", timeTaken))
 
312
        if not self.wasSuccessful():
 
313
            self.stream.write("FAILED (")
 
314
            failed, errored = map(len, (self.failures, self.errors))
 
315
            if failed:
 
316
                self.stream.write("failures=%d" % failed)
 
317
            if errored:
 
318
                if failed: self.stream.write(", ")
 
319
                self.stream.write("errors=%d" % errored)
 
320
            if self.known_failure_count:
 
321
                if failed or errored: self.stream.write(", ")
 
322
                self.stream.write("known_failure_count=%d" %
 
323
                    self.known_failure_count)
 
324
            self.stream.write(")\n")
 
325
        else:
 
326
            if self.known_failure_count:
 
327
                self.stream.write("OK (known_failures=%d)\n" %
 
328
                    self.known_failure_count)
 
329
            else:
 
330
                self.stream.write("OK\n")
 
331
        if self.skip_count > 0:
 
332
            skipped = self.skip_count
 
333
            self.stream.write('%d test%s skipped\n' %
 
334
                                (skipped, skipped != 1 and "s" or ""))
 
335
        if self.unsupported:
 
336
            for feature, count in sorted(self.unsupported.items()):
 
337
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
338
                    (feature, count))
 
339
        if self._strict:
 
340
            ok = self.wasStrictlySuccessful()
 
341
        else:
 
342
            ok = self.wasSuccessful()
 
343
        if self._first_thread_leaker_id:
 
344
            self.stream.write(
 
345
                '%s is leaking threads among %d leaking tests.\n' % (
 
346
                self._first_thread_leaker_id,
 
347
                self._tests_leaking_threads_count))
 
348
            # We don't report the main thread as an active one.
 
349
            self.stream.write(
 
350
                '%d non-main threads were left active in the end.\n'
 
351
                % (len(self._active_threads) - 1))
 
352
 
 
353
    def getDescription(self, test):
 
354
        return test.id()
 
355
 
 
356
    def _extractBenchmarkTime(self, testCase, details=None):
 
357
        """Add a benchmark time for the current test case."""
 
358
        if details and 'benchtime' in details:
 
359
            return float(''.join(details['benchtime'].iter_bytes()))
 
360
        return getattr(testCase, "_benchtime", None)
 
361
 
 
362
    def _elapsedTestTimeString(self):
 
363
        """Return a time string for the overall time the current test has taken."""
 
364
        return self._formatTime(self._delta_to_float(
 
365
            self._now() - self._start_datetime))
 
366
 
 
367
    def _testTimeString(self, testCase):
 
368
        benchmark_time = self._extractBenchmarkTime(testCase)
 
369
        if benchmark_time is not None:
 
370
            return self._formatTime(benchmark_time) + "*"
 
371
        else:
 
372
            return self._elapsedTestTimeString()
 
373
 
 
374
    def _formatTime(self, seconds):
 
375
        """Format seconds as milliseconds with leading spaces."""
 
376
        # some benchmarks can take thousands of seconds to run, so we need 8
 
377
        # places
 
378
        return "%8dms" % (1000 * seconds)
 
379
 
 
380
    def _shortened_test_description(self, test):
 
381
        what = test.id()
 
382
        what = re.sub(r'^bzrlib\.tests\.', '', what)
 
383
        return what
 
384
 
 
385
    # GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
 
386
    #                multiple times in a row, because the handler is added for
 
387
    #                each test but the container list is shared between cases.
 
388
    #                See lp:498869 lp:625574 and lp:637725 for background.
 
389
    def _record_traceback_from_test(self, exc_info):
 
390
        """Store the traceback from passed exc_info tuple till"""
 
391
        self._traceback_from_test = exc_info[2]
118
392
 
119
393
    def startTest(self, test):
120
 
        unittest.TestResult.startTest(self, test)
121
 
        # In a short description, the important words are in
122
 
        # the beginning, but in an id, the important words are
123
 
        # at the end
124
 
        SHOW_DESCRIPTIONS = False
125
 
        if self.showAll:
126
 
            width = osutils.terminal_width()
127
 
            name_width = width - 15
128
 
            what = None
129
 
            if SHOW_DESCRIPTIONS:
130
 
                what = test.shortDescription()
131
 
                if what:
132
 
                    if len(what) > name_width:
133
 
                        what = what[:name_width-3] + '...'
134
 
            if what is None:
135
 
                what = test.id()
136
 
                if what.startswith('bzrlib.tests.'):
137
 
                    what = what[13:]
138
 
                if len(what) > name_width:
139
 
                    what = '...' + what[3-name_width:]
140
 
            what = what.ljust(name_width)
141
 
            self.stream.write(what)
142
 
        self.stream.flush()
143
 
        self._start_time = time.time()
 
394
        super(ExtendedTestResult, self).startTest(test)
 
395
        if self.count == 0:
 
396
            self.startTests()
 
397
        self.count += 1
 
398
        self.report_test_start(test)
 
399
        test.number = self.count
 
400
        self._recordTestStartTime()
 
401
        # Make testtools cases give us the real traceback on failure
 
402
        addOnException = getattr(test, "addOnException", None)
 
403
        if addOnException is not None:
 
404
            addOnException(self._record_traceback_from_test)
 
405
        # Only check for thread leaks on bzrlib derived test cases
 
406
        if isinstance(test, TestCase):
 
407
            test.addCleanup(self._check_leaked_threads, test)
 
408
 
 
409
    def stopTest(self, test):
 
410
        super(ExtendedTestResult, self).stopTest(test)
 
411
        # Manually break cycles, means touching various private things but hey
 
412
        getDetails = getattr(test, "getDetails", None)
 
413
        if getDetails is not None:
 
414
            getDetails().clear()
 
415
        _clear__type_equality_funcs(test)
 
416
        self._traceback_from_test = None
 
417
 
 
418
    def startTests(self):
 
419
        self.report_tests_starting()
 
420
        self._active_threads = threading.enumerate()
 
421
 
 
422
    def _check_leaked_threads(self, test):
 
423
        """See if any threads have leaked since last call
 
424
 
 
425
        A sample of live threads is stored in the _active_threads attribute,
 
426
        when this method runs it compares the current live threads and any not
 
427
        in the previous sample are treated as having leaked.
 
428
        """
 
429
        now_active_threads = set(threading.enumerate())
 
430
        threads_leaked = now_active_threads.difference(self._active_threads)
 
431
        if threads_leaked:
 
432
            self._report_thread_leak(test, threads_leaked, now_active_threads)
 
433
            self._tests_leaking_threads_count += 1
 
434
            if self._first_thread_leaker_id is None:
 
435
                self._first_thread_leaker_id = test.id()
 
436
            self._active_threads = now_active_threads
 
437
 
 
438
    def _recordTestStartTime(self):
 
439
        """Record that a test has started."""
 
440
        self._start_datetime = self._now()
144
441
 
145
442
    def addError(self, test, err):
146
 
        if isinstance(err[1], TestSkipped):
147
 
            return self.addSkipped(test, err)    
148
 
        unittest.TestResult.addError(self, test, err)
149
 
        if self.showAll:
150
 
            self.stream.writeln("ERROR %s" % self._elapsedTime())
151
 
        elif self.dots:
152
 
            self.stream.write('E')
153
 
        self.stream.flush()
 
443
        """Tell result that test finished with an error.
 
444
 
 
445
        Called from the TestCase run() method when the test
 
446
        fails with an unexpected error.
 
447
        """
 
448
        self._post_mortem(self._traceback_from_test)
 
449
        super(ExtendedTestResult, self).addError(test, err)
 
450
        self.error_count += 1
 
451
        self.report_error(test, err)
154
452
        if self.stop_early:
155
453
            self.stop()
156
454
 
157
455
    def addFailure(self, test, err):
158
 
        unittest.TestResult.addFailure(self, test, err)
159
 
        if self.showAll:
160
 
            self.stream.writeln(" FAIL %s" % self._elapsedTime())
161
 
        elif self.dots:
162
 
            self.stream.write('F')
163
 
        self.stream.flush()
164
 
        if self.stop_early:
165
 
            self.stop()
166
 
 
167
 
    def addSuccess(self, test):
168
 
        if self.showAll:
169
 
            self.stream.writeln('   OK %s' % self._elapsedTime())
170
 
        elif self.dots:
171
 
            self.stream.write('~')
172
 
        self.stream.flush()
173
 
        unittest.TestResult.addSuccess(self, test)
174
 
 
175
 
    def addSkipped(self, test, skip_excinfo):
176
 
        if self.showAll:
177
 
            print >>self.stream, ' SKIP %s' % self._elapsedTime()
178
 
            print >>self.stream, '     %s' % skip_excinfo[1]
179
 
        elif self.dots:
180
 
            self.stream.write('S')
181
 
        self.stream.flush()
182
 
        # seems best to treat this as success from point-of-view of unittest
183
 
        # -- it actually does nothing so it barely matters :)
184
 
        unittest.TestResult.addSuccess(self, test)
185
 
 
186
 
    def printErrorList(self, flavour, errors):
187
 
        for test, err in errors:
188
 
            self.stream.writeln(self.separator1)
189
 
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
190
 
            if getattr(test, '_get_log', None) is not None:
191
 
                print >>self.stream
192
 
                print >>self.stream, \
193
 
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
194
 
                print >>self.stream, test._get_log()
195
 
                print >>self.stream, \
196
 
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
197
 
            self.stream.writeln(self.separator2)
198
 
            self.stream.writeln("%s" % err)
199
 
 
200
 
 
201
 
class TextTestRunner(unittest.TextTestRunner):
 
456
        """Tell result that test failed.
 
457
 
 
458
        Called from the TestCase run() method when the test
 
459
        fails because e.g. an assert() method failed.
 
460
        """
 
461
        self._post_mortem(self._traceback_from_test)
 
462
        super(ExtendedTestResult, self).addFailure(test, err)
 
463
        self.failure_count += 1
 
464
        self.report_failure(test, err)
 
465
        if self.stop_early:
 
466
            self.stop()
 
467
 
 
468
    def addSuccess(self, test, details=None):
 
469
        """Tell result that test completed successfully.
 
470
 
 
471
        Called from the TestCase run()
 
472
        """
 
473
        if self._bench_history is not None:
 
474
            benchmark_time = self._extractBenchmarkTime(test, details)
 
475
            if benchmark_time is not None:
 
476
                self._bench_history.write("%s %s\n" % (
 
477
                    self._formatTime(benchmark_time),
 
478
                    test.id()))
 
479
        self.report_success(test)
 
480
        super(ExtendedTestResult, self).addSuccess(test)
 
481
        test._log_contents = ''
 
482
 
 
483
    def addExpectedFailure(self, test, err):
 
484
        self.known_failure_count += 1
 
485
        self.report_known_failure(test, err)
 
486
 
 
487
    def addUnexpectedSuccess(self, test, details=None):
 
488
        """Tell result the test unexpectedly passed, counting as a failure
 
489
 
 
490
        When the minimum version of testtools required becomes 0.9.8 this
 
491
        can be updated to use the new handling there.
 
492
        """
 
493
        super(ExtendedTestResult, self).addFailure(test, details=details)
 
494
        self.failure_count += 1
 
495
        self.report_unexpected_success(test,
 
496
            "".join(details["reason"].iter_text()))
 
497
        if self.stop_early:
 
498
            self.stop()
 
499
 
 
500
    def addNotSupported(self, test, feature):
 
501
        """The test will not be run because of a missing feature.
 
502
        """
 
503
        # this can be called in two different ways: it may be that the
 
504
        # test started running, and then raised (through requireFeature)
 
505
        # UnavailableFeature.  Alternatively this method can be called
 
506
        # while probing for features before running the test code proper; in
 
507
        # that case we will see startTest and stopTest, but the test will
 
508
        # never actually run.
 
509
        self.unsupported.setdefault(str(feature), 0)
 
510
        self.unsupported[str(feature)] += 1
 
511
        self.report_unsupported(test, feature)
 
512
 
 
513
    def addSkip(self, test, reason):
 
514
        """A test has not run for 'reason'."""
 
515
        self.skip_count += 1
 
516
        self.report_skip(test, reason)
 
517
 
 
518
    def addNotApplicable(self, test, reason):
 
519
        self.not_applicable_count += 1
 
520
        self.report_not_applicable(test, reason)
 
521
 
 
522
    def _count_stored_tests(self):
 
523
        """Count of tests instances kept alive due to not succeeding"""
 
524
        return self.error_count + self.failure_count + self.known_failure_count
 
525
 
 
526
    def _post_mortem(self, tb=None):
 
527
        """Start a PDB post mortem session."""
 
528
        if os.environ.get('BZR_TEST_PDB', None):
 
529
            import pdb
 
530
            pdb.post_mortem(tb)
 
531
 
 
532
    def progress(self, offset, whence):
 
533
        """The test is adjusting the count of tests to run."""
 
534
        if whence == SUBUNIT_SEEK_SET:
 
535
            self.num_tests = offset
 
536
        elif whence == SUBUNIT_SEEK_CUR:
 
537
            self.num_tests += offset
 
538
        else:
 
539
            raise errors.BzrError("Unknown whence %r" % whence)
 
540
 
 
541
    def report_tests_starting(self):
 
542
        """Display information before the test run begins"""
 
543
        if getattr(sys, 'frozen', None) is None:
 
544
            bzr_path = osutils.realpath(sys.argv[0])
 
545
        else:
 
546
            bzr_path = sys.executable
 
547
        self.stream.write(
 
548
            'bzr selftest: %s\n' % (bzr_path,))
 
549
        self.stream.write(
 
550
            '   %s\n' % (
 
551
                    bzrlib.__path__[0],))
 
552
        self.stream.write(
 
553
            '   bzr-%s python-%s %s\n' % (
 
554
                    bzrlib.version_string,
 
555
                    bzrlib._format_version_tuple(sys.version_info),
 
556
                    platform.platform(aliased=1),
 
557
                    ))
 
558
        self.stream.write('\n')
 
559
 
 
560
    def report_test_start(self, test):
 
561
        """Display information on the test just about to be run"""
 
562
 
 
563
    def _report_thread_leak(self, test, leaked_threads, active_threads):
 
564
        """Display information on a test that leaked one or more threads"""
 
565
        # GZ 2010-09-09: A leak summary reported separately from the general
 
566
        #                thread debugging would be nice. Tests under subunit
 
567
        #                need something not using stream, perhaps adding a
 
568
        #                testtools details object would be fitting.
 
569
        if 'threads' in selftest_debug_flags:
 
570
            self.stream.write('%s is leaking, active is now %d\n' %
 
571
                (test.id(), len(active_threads)))
 
572
 
 
573
    def startTestRun(self):
 
574
        self.startTime = time.time()
 
575
 
 
576
    def report_success(self, test):
 
577
        pass
 
578
 
 
579
    def wasStrictlySuccessful(self):
 
580
        if self.unsupported or self.known_failure_count:
 
581
            return False
 
582
        return self.wasSuccessful()
 
583
 
 
584
 
 
585
class TextTestResult(ExtendedTestResult):
 
586
    """Displays progress and results of tests in text form"""
 
587
 
 
588
    def __init__(self, stream, descriptions, verbosity,
 
589
                 bench_history=None,
 
590
                 pb=None,
 
591
                 strict=None,
 
592
                 ):
 
593
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
594
            bench_history, strict)
 
595
        # We no longer pass them around, but just rely on the UIFactory stack
 
596
        # for state
 
597
        if pb is not None:
 
598
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
599
        self.pb = self.ui.nested_progress_bar()
 
600
        self.pb.show_pct = False
 
601
        self.pb.show_spinner = False
 
602
        self.pb.show_eta = False,
 
603
        self.pb.show_count = False
 
604
        self.pb.show_bar = False
 
605
        self.pb.update_latency = 0
 
606
        self.pb.show_transport_activity = False
 
607
 
 
608
    def stopTestRun(self):
 
609
        # called when the tests that are going to run have run
 
610
        self.pb.clear()
 
611
        self.pb.finished()
 
612
        super(TextTestResult, self).stopTestRun()
 
613
 
 
614
    def report_tests_starting(self):
 
615
        super(TextTestResult, self).report_tests_starting()
 
616
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
617
 
 
618
    def _progress_prefix_text(self):
 
619
        # the longer this text, the less space we have to show the test
 
620
        # name...
 
621
        a = '[%d' % self.count              # total that have been run
 
622
        # tests skipped as known not to be relevant are not important enough
 
623
        # to show here
 
624
        ## if self.skip_count:
 
625
        ##     a += ', %d skip' % self.skip_count
 
626
        ## if self.known_failure_count:
 
627
        ##     a += '+%dX' % self.known_failure_count
 
628
        if self.num_tests:
 
629
            a +='/%d' % self.num_tests
 
630
        a += ' in '
 
631
        runtime = time.time() - self._overall_start_time
 
632
        if runtime >= 60:
 
633
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
634
        else:
 
635
            a += '%ds' % runtime
 
636
        total_fail_count = self.error_count + self.failure_count
 
637
        if total_fail_count:
 
638
            a += ', %d failed' % total_fail_count
 
639
        # if self.unsupported:
 
640
        #     a += ', %d missing' % len(self.unsupported)
 
641
        a += ']'
 
642
        return a
 
643
 
 
644
    def report_test_start(self, test):
 
645
        self.pb.update(
 
646
                self._progress_prefix_text()
 
647
                + ' '
 
648
                + self._shortened_test_description(test))
 
649
 
 
650
    def _test_description(self, test):
 
651
        return self._shortened_test_description(test)
 
652
 
 
653
    def report_error(self, test, err):
 
654
        self.stream.write('ERROR: %s\n    %s\n' % (
 
655
            self._test_description(test),
 
656
            err[1],
 
657
            ))
 
658
 
 
659
    def report_failure(self, test, err):
 
660
        self.stream.write('FAIL: %s\n    %s\n' % (
 
661
            self._test_description(test),
 
662
            err[1],
 
663
            ))
 
664
 
 
665
    def report_known_failure(self, test, err):
 
666
        pass
 
667
 
 
668
    def report_unexpected_success(self, test, reason):
 
669
        self.stream.write('FAIL: %s\n    %s: %s\n' % (
 
670
            self._test_description(test),
 
671
            "Unexpected success. Should have failed",
 
672
            reason,
 
673
            ))
 
674
 
 
675
    def report_skip(self, test, reason):
 
676
        pass
 
677
 
 
678
    def report_not_applicable(self, test, reason):
 
679
        pass
 
680
 
 
681
    def report_unsupported(self, test, feature):
 
682
        """test cannot be run because feature is missing."""
 
683
 
 
684
 
 
685
class VerboseTestResult(ExtendedTestResult):
 
686
    """Produce long output, with one line per test run plus times"""
 
687
 
 
688
    def _ellipsize_to_right(self, a_string, final_width):
 
689
        """Truncate and pad a string, keeping the right hand side"""
 
690
        if len(a_string) > final_width:
 
691
            result = '...' + a_string[3-final_width:]
 
692
        else:
 
693
            result = a_string
 
694
        return result.ljust(final_width)
 
695
 
 
696
    def report_tests_starting(self):
 
697
        self.stream.write('running %d tests...\n' % self.num_tests)
 
698
        super(VerboseTestResult, self).report_tests_starting()
 
699
 
 
700
    def report_test_start(self, test):
 
701
        name = self._shortened_test_description(test)
 
702
        width = osutils.terminal_width()
 
703
        if width is not None:
 
704
            # width needs space for 6 char status, plus 1 for slash, plus an
 
705
            # 11-char time string, plus a trailing blank
 
706
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
 
707
            # space
 
708
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
709
        else:
 
710
            self.stream.write(name)
 
711
        self.stream.flush()
 
712
 
 
713
    def _error_summary(self, err):
 
714
        indent = ' ' * 4
 
715
        return '%s%s' % (indent, err[1])
 
716
 
 
717
    def report_error(self, test, err):
 
718
        self.stream.write('ERROR %s\n%s\n'
 
719
                % (self._testTimeString(test),
 
720
                   self._error_summary(err)))
 
721
 
 
722
    def report_failure(self, test, err):
 
723
        self.stream.write(' FAIL %s\n%s\n'
 
724
                % (self._testTimeString(test),
 
725
                   self._error_summary(err)))
 
726
 
 
727
    def report_known_failure(self, test, err):
 
728
        self.stream.write('XFAIL %s\n%s\n'
 
729
                % (self._testTimeString(test),
 
730
                   self._error_summary(err)))
 
731
 
 
732
    def report_unexpected_success(self, test, reason):
 
733
        self.stream.write(' FAIL %s\n%s: %s\n'
 
734
                % (self._testTimeString(test),
 
735
                   "Unexpected success. Should have failed",
 
736
                   reason))
 
737
 
 
738
    def report_success(self, test):
 
739
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
740
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
741
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
742
            stats.pprint(file=self.stream)
 
743
        # flush the stream so that we get smooth output. This verbose mode is
 
744
        # used to show the output in PQM.
 
745
        self.stream.flush()
 
746
 
 
747
    def report_skip(self, test, reason):
 
748
        self.stream.write(' SKIP %s\n%s\n'
 
749
                % (self._testTimeString(test), reason))
 
750
 
 
751
    def report_not_applicable(self, test, reason):
 
752
        self.stream.write('  N/A %s\n    %s\n'
 
753
                % (self._testTimeString(test), reason))
 
754
 
 
755
    def report_unsupported(self, test, feature):
 
756
        """test cannot be run because feature is missing."""
 
757
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
758
                %(self._testTimeString(test), feature))
 
759
 
 
760
 
 
761
class TextTestRunner(object):
202
762
    stop_on_failure = False
203
763
 
204
 
    def _makeResult(self):
205
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
206
 
        result.stop_early = self.stop_on_failure
207
 
        return result
 
764
    def __init__(self,
 
765
                 stream=sys.stderr,
 
766
                 descriptions=0,
 
767
                 verbosity=1,
 
768
                 bench_history=None,
 
769
                 strict=False,
 
770
                 result_decorators=None,
 
771
                 ):
 
772
        """Create a TextTestRunner.
 
773
 
 
774
        :param result_decorators: An optional list of decorators to apply
 
775
            to the result object being used by the runner. Decorators are
 
776
            applied left to right - the first element in the list is the 
 
777
            innermost decorator.
 
778
        """
 
779
        # stream may know claim to know to write unicode strings, but in older
 
780
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
781
        # specifically a built in file with encoding 'UTF-8' will still try
 
782
        # to encode using ascii.
 
783
        new_encoding = osutils.get_terminal_encoding()
 
784
        codec = codecs.lookup(new_encoding)
 
785
        if type(codec) is tuple:
 
786
            # Python 2.4
 
787
            encode = codec[0]
 
788
        else:
 
789
            encode = codec.encode
 
790
        # GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
 
791
        #                so should swap to the plain codecs.StreamWriter
 
792
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
 
793
            "backslashreplace")
 
794
        stream.encoding = new_encoding
 
795
        self.stream = stream
 
796
        self.descriptions = descriptions
 
797
        self.verbosity = verbosity
 
798
        self._bench_history = bench_history
 
799
        self._strict = strict
 
800
        self._result_decorators = result_decorators or []
 
801
 
 
802
    def run(self, test):
 
803
        "Run the given test case or test suite."
 
804
        if self.verbosity == 1:
 
805
            result_class = TextTestResult
 
806
        elif self.verbosity >= 2:
 
807
            result_class = VerboseTestResult
 
808
        original_result = result_class(self.stream,
 
809
                              self.descriptions,
 
810
                              self.verbosity,
 
811
                              bench_history=self._bench_history,
 
812
                              strict=self._strict,
 
813
                              )
 
814
        # Signal to result objects that look at stop early policy to stop,
 
815
        original_result.stop_early = self.stop_on_failure
 
816
        result = original_result
 
817
        for decorator in self._result_decorators:
 
818
            result = decorator(result)
 
819
            result.stop_early = self.stop_on_failure
 
820
        result.startTestRun()
 
821
        try:
 
822
            test.run(result)
 
823
        finally:
 
824
            result.stopTestRun()
 
825
        # higher level code uses our extended protocol to determine
 
826
        # what exit code to give.
 
827
        return original_result
208
828
 
209
829
 
210
830
def iter_suite_tests(suite):
211
831
    """Return all tests in a suite, recursing through nested suites"""
212
 
    for item in suite._tests:
213
 
        if isinstance(item, unittest.TestCase):
214
 
            yield item
215
 
        elif isinstance(item, unittest.TestSuite):
 
832
    if isinstance(suite, unittest.TestCase):
 
833
        yield suite
 
834
    elif isinstance(suite, unittest.TestSuite):
 
835
        for item in suite:
216
836
            for r in iter_suite_tests(item):
217
837
                yield r
218
 
        else:
219
 
            raise Exception('unknown object %r inside test suite %r'
220
 
                            % (item, suite))
221
 
 
222
 
 
223
 
class TestSkipped(Exception):
224
 
    """Indicates that a test was intentionally skipped, rather than failing."""
225
 
    # XXX: Not used yet
226
 
 
227
 
 
228
 
class CommandFailed(Exception):
229
 
    pass
230
 
 
231
 
class TestCase(unittest.TestCase):
 
838
    else:
 
839
        raise Exception('unknown type %r for object %r'
 
840
                        % (type(suite), suite))
 
841
 
 
842
 
 
843
TestSkipped = testtools.testcase.TestSkipped
 
844
 
 
845
 
 
846
class TestNotApplicable(TestSkipped):
 
847
    """A test is not applicable to the situation where it was run.
 
848
 
 
849
    This is only normally raised by parameterized tests, if they find that
 
850
    the instance they're constructed upon does not support one aspect
 
851
    of its interface.
 
852
    """
 
853
 
 
854
 
 
855
# traceback._some_str fails to format exceptions that have the default
 
856
# __str__ which does an implicit ascii conversion. However, repr() on those
 
857
# objects works, for all that its not quite what the doctor may have ordered.
 
858
def _clever_some_str(value):
 
859
    try:
 
860
        return str(value)
 
861
    except:
 
862
        try:
 
863
            return repr(value).replace('\\n', '\n')
 
864
        except:
 
865
            return '<unprintable %s object>' % type(value).__name__
 
866
 
 
867
traceback._some_str = _clever_some_str
 
868
 
 
869
 
 
870
# deprecated - use self.knownFailure(), or self.expectFailure.
 
871
KnownFailure = testtools.testcase._ExpectedFailure
 
872
 
 
873
 
 
874
class UnavailableFeature(Exception):
 
875
    """A feature required for this test was not available.
 
876
 
 
877
    This can be considered a specialised form of SkippedTest.
 
878
 
 
879
    The feature should be used to construct the exception.
 
880
    """
 
881
 
 
882
 
 
883
class StringIOWrapper(object):
 
884
    """A wrapper around cStringIO which just adds an encoding attribute.
 
885
 
 
886
    Internally we can check sys.stdout to see what the output encoding
 
887
    should be. However, cStringIO has no encoding attribute that we can
 
888
    set. So we wrap it instead.
 
889
    """
 
890
    encoding='ascii'
 
891
    _cstring = None
 
892
 
 
893
    def __init__(self, s=None):
 
894
        if s is not None:
 
895
            self.__dict__['_cstring'] = StringIO(s)
 
896
        else:
 
897
            self.__dict__['_cstring'] = StringIO()
 
898
 
 
899
    def __getattr__(self, name, getattr=getattr):
 
900
        return getattr(self.__dict__['_cstring'], name)
 
901
 
 
902
    def __setattr__(self, name, val):
 
903
        if name == 'encoding':
 
904
            self.__dict__['encoding'] = val
 
905
        else:
 
906
            return setattr(self._cstring, name, val)
 
907
 
 
908
 
 
909
class TestUIFactory(TextUIFactory):
 
910
    """A UI Factory for testing.
 
911
 
 
912
    Hide the progress bar but emit note()s.
 
913
    Redirect stdin.
 
914
    Allows get_password to be tested without real tty attached.
 
915
 
 
916
    See also CannedInputUIFactory which lets you provide programmatic input in
 
917
    a structured way.
 
918
    """
 
919
    # TODO: Capture progress events at the model level and allow them to be
 
920
    # observed by tests that care.
 
921
    #
 
922
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
923
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
924
    # idea of how they're supposed to be different.
 
925
    # See https://bugs.launchpad.net/bzr/+bug/408213
 
926
 
 
927
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
928
        if stdin is not None:
 
929
            # We use a StringIOWrapper to be able to test various
 
930
            # encodings, but the user is still responsible to
 
931
            # encode the string and to set the encoding attribute
 
932
            # of StringIOWrapper.
 
933
            stdin = StringIOWrapper(stdin)
 
934
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
935
 
 
936
    def get_non_echoed_password(self):
 
937
        """Get password from stdin without trying to handle the echo mode"""
 
938
        password = self.stdin.readline()
 
939
        if not password:
 
940
            raise EOFError
 
941
        if password[-1] == '\n':
 
942
            password = password[:-1]
 
943
        return password
 
944
 
 
945
    def make_progress_view(self):
 
946
        return NullProgressView()
 
947
 
 
948
 
 
949
def isolated_doctest_setUp(test):
 
950
    override_os_environ(test)
 
951
 
 
952
 
 
953
def isolated_doctest_tearDown(test):
 
954
    restore_os_environ(test)
 
955
 
 
956
 
 
957
def IsolatedDocTestSuite(*args, **kwargs):
 
958
    """Overrides doctest.DocTestSuite to handle isolation.
 
959
 
 
960
    The method is really a factory and users are expected to use it as such.
 
961
    """
 
962
 
 
963
    kwargs['setUp'] = isolated_doctest_setUp
 
964
    kwargs['tearDown'] = isolated_doctest_tearDown
 
965
    return doctest.DocTestSuite(*args, **kwargs)
 
966
 
 
967
 
 
968
class TestCase(testtools.TestCase):
232
969
    """Base class for bzr unit tests.
233
 
    
234
 
    Tests that need access to disk resources should subclass 
 
970
 
 
971
    Tests that need access to disk resources should subclass
235
972
    TestCaseInTempDir not TestCase.
236
973
 
237
974
    Error and debug log messages are redirected from their usual
239
976
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
240
977
    so that it can also capture file IO.  When the test completes this file
241
978
    is read into memory and removed from disk.
242
 
       
 
979
 
243
980
    There are also convenience functions to invoke bzr's command-line
244
981
    routine, and to build and check bzr trees.
245
 
   
 
982
 
246
983
    In addition to the usual method of overriding tearDown(), this class also
247
 
    allows subclasses to register functions into the _cleanups list, which is
 
984
    allows subclasses to register cleanup functions via addCleanup, which are
248
985
    run in order as the object is torn down.  It's less likely this will be
249
986
    accidentally overlooked.
250
987
    """
251
988
 
252
 
    BZRPATH = 'bzr'
253
 
    _log_file_name = None
254
 
    _log_contents = ''
 
989
    _log_file = None
 
990
    # record lsprof data when performing benchmark calls.
 
991
    _gather_lsprof_in_benchmarks = False
255
992
 
256
993
    def __init__(self, methodName='testMethod'):
257
994
        super(TestCase, self).__init__(methodName)
258
 
        self._cleanups = []
 
995
        self._directory_isolation = True
 
996
        self.exception_handlers.insert(0,
 
997
            (UnavailableFeature, self._do_unsupported_or_skip))
 
998
        self.exception_handlers.insert(0,
 
999
            (TestNotApplicable, self._do_not_applicable))
259
1000
 
260
1001
    def setUp(self):
261
 
        unittest.TestCase.setUp(self)
 
1002
        super(TestCase, self).setUp()
 
1003
 
 
1004
        timeout = config.GlobalStack().get('selftest.timeout')
 
1005
        if timeout:
 
1006
            timeout_fixture = fixtures.TimeoutFixture(timeout)
 
1007
            timeout_fixture.setUp()
 
1008
            self.addCleanup(timeout_fixture.cleanUp)
 
1009
 
 
1010
        for feature in getattr(self, '_test_needs_features', []):
 
1011
            self.requireFeature(feature)
262
1012
        self._cleanEnvironment()
263
 
        bzrlib.trace.disable_default_logging()
 
1013
 
 
1014
        if bzrlib.global_state is not None:
 
1015
            self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
 
1016
                              config.CommandLineStore())
 
1017
 
 
1018
        self._silenceUI()
264
1019
        self._startLogFile()
 
1020
        self._benchcalls = []
 
1021
        self._benchtime = None
 
1022
        self._clear_hooks()
 
1023
        self._track_transports()
 
1024
        self._track_locks()
 
1025
        self._clear_debug_flags()
 
1026
        # Isolate global verbosity level, to make sure it's reproducible
 
1027
        # between tests.  We should get rid of this altogether: bug 656694. --
 
1028
        # mbp 20101008
 
1029
        self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
 
1030
        self._log_files = set()
 
1031
        # Each key in the ``_counters`` dict holds a value for a different
 
1032
        # counter. When the test ends, addDetail() should be used to output the
 
1033
        # counter values. This happens in install_counter_hook().
 
1034
        self._counters = {}
 
1035
        if 'config_stats' in selftest_debug_flags:
 
1036
            self._install_config_stats_hooks()
 
1037
        # Do not use i18n for tests (unless the test reverses this)
 
1038
        i18n.disable_i18n()
 
1039
 
 
1040
    def debug(self):
 
1041
        # debug a frame up.
 
1042
        import pdb
 
1043
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
1044
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
1045
                ).set_trace(sys._getframe().f_back)
 
1046
 
 
1047
    def discardDetail(self, name):
 
1048
        """Extend the addDetail, getDetails api so we can remove a detail.
 
1049
 
 
1050
        eg. bzr always adds the 'log' detail at startup, but we don't want to
 
1051
        include it for skipped, xfail, etc tests.
 
1052
 
 
1053
        It is safe to call this for a detail that doesn't exist, in case this
 
1054
        gets called multiple times.
 
1055
        """
 
1056
        # We cheat. details is stored in __details which means we shouldn't
 
1057
        # touch it. but getDetails() returns the dict directly, so we can
 
1058
        # mutate it.
 
1059
        details = self.getDetails()
 
1060
        if name in details:
 
1061
            del details[name]
 
1062
 
 
1063
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1064
        """Install a counting hook.
 
1065
 
 
1066
        Any hook can be counted as long as it doesn't need to return a value.
 
1067
 
 
1068
        :param hooks: Where the hook should be installed.
 
1069
 
 
1070
        :param name: The hook name that will be counted.
 
1071
 
 
1072
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1073
            to ``name``.
 
1074
        """
 
1075
        _counters = self._counters # Avoid closing over self
 
1076
        if counter_name is None:
 
1077
            counter_name = name
 
1078
        if _counters.has_key(counter_name):
 
1079
            raise AssertionError('%s is already used as a counter name'
 
1080
                                  % (counter_name,))
 
1081
        _counters[counter_name] = 0
 
1082
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1083
            lambda: ['%d' % (_counters[counter_name],)]))
 
1084
        def increment_counter(*args, **kwargs):
 
1085
            _counters[counter_name] += 1
 
1086
        label = 'count %s calls' % (counter_name,)
 
1087
        hooks.install_named_hook(name, increment_counter, label)
 
1088
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1089
 
 
1090
    def _install_config_stats_hooks(self):
 
1091
        """Install config hooks to count hook calls.
 
1092
 
 
1093
        """
 
1094
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1095
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1096
                                       'config.%s' % (hook_name,))
 
1097
 
 
1098
        # The OldConfigHooks are private and need special handling to protect
 
1099
        # against recursive tests (tests that run other tests), so we just do
 
1100
        # manually what registering them into _builtin_known_hooks will provide
 
1101
        # us.
 
1102
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1103
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1104
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1105
                                      'old_config.%s' % (hook_name,))
 
1106
 
 
1107
    def _clear_debug_flags(self):
 
1108
        """Prevent externally set debug flags affecting tests.
 
1109
 
 
1110
        Tests that want to use debug flags can just set them in the
 
1111
        debug_flags set during setup/teardown.
 
1112
        """
 
1113
        # Start with a copy of the current debug flags we can safely modify.
 
1114
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
 
1115
        if 'allow_debug' not in selftest_debug_flags:
 
1116
            debug.debug_flags.clear()
 
1117
        if 'disable_lock_checks' not in selftest_debug_flags:
 
1118
            debug.debug_flags.add('strict_locks')
 
1119
 
 
1120
    def _clear_hooks(self):
 
1121
        # prevent hooks affecting tests
 
1122
        known_hooks = hooks.known_hooks
 
1123
        self._preserved_hooks = {}
 
1124
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1125
            current_hooks = getattr(parent, name)
 
1126
            self._preserved_hooks[parent] = (name, current_hooks)
 
1127
        self._preserved_lazy_hooks = hooks._lazy_hooks
 
1128
        hooks._lazy_hooks = {}
 
1129
        self.addCleanup(self._restoreHooks)
 
1130
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1131
            factory = known_hooks.get(key)
 
1132
            setattr(parent, name, factory())
 
1133
        # this hook should always be installed
 
1134
        request._install_hook()
 
1135
 
 
1136
    def disable_directory_isolation(self):
 
1137
        """Turn off directory isolation checks."""
 
1138
        self._directory_isolation = False
 
1139
 
 
1140
    def enable_directory_isolation(self):
 
1141
        """Enable directory isolation checks."""
 
1142
        self._directory_isolation = True
 
1143
 
 
1144
    def _silenceUI(self):
 
1145
        """Turn off UI for duration of test"""
 
1146
        # by default the UI is off; tests can turn it on if they want it.
 
1147
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
 
1148
 
 
1149
    def _check_locks(self):
 
1150
        """Check that all lock take/release actions have been paired."""
 
1151
        # We always check for mismatched locks. If a mismatch is found, we
 
1152
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
1153
        # case we just print a warning.
 
1154
        # unhook:
 
1155
        acquired_locks = [lock for action, lock in self._lock_actions
 
1156
                          if action == 'acquired']
 
1157
        released_locks = [lock for action, lock in self._lock_actions
 
1158
                          if action == 'released']
 
1159
        broken_locks = [lock for action, lock in self._lock_actions
 
1160
                        if action == 'broken']
 
1161
        # trivially, given the tests for lock acquistion and release, if we
 
1162
        # have as many in each list, it should be ok. Some lock tests also
 
1163
        # break some locks on purpose and should be taken into account by
 
1164
        # considering that breaking a lock is just a dirty way of releasing it.
 
1165
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
1166
            message = (
 
1167
                'Different number of acquired and '
 
1168
                'released or broken locks.\n'
 
1169
                'acquired=%s\n'
 
1170
                'released=%s\n'
 
1171
                'broken=%s\n' %
 
1172
                (acquired_locks, released_locks, broken_locks))
 
1173
            if not self._lock_check_thorough:
 
1174
                # Rather than fail, just warn
 
1175
                print "Broken test %s: %s" % (self, message)
 
1176
                return
 
1177
            self.fail(message)
 
1178
 
 
1179
    def _track_locks(self):
 
1180
        """Track lock activity during tests."""
 
1181
        self._lock_actions = []
 
1182
        if 'disable_lock_checks' in selftest_debug_flags:
 
1183
            self._lock_check_thorough = False
 
1184
        else:
 
1185
            self._lock_check_thorough = True
 
1186
 
 
1187
        self.addCleanup(self._check_locks)
 
1188
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
1189
                                                self._lock_acquired, None)
 
1190
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
1191
                                                self._lock_released, None)
 
1192
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
1193
                                                self._lock_broken, None)
 
1194
 
 
1195
    def _lock_acquired(self, result):
 
1196
        self._lock_actions.append(('acquired', result))
 
1197
 
 
1198
    def _lock_released(self, result):
 
1199
        self._lock_actions.append(('released', result))
 
1200
 
 
1201
    def _lock_broken(self, result):
 
1202
        self._lock_actions.append(('broken', result))
 
1203
 
 
1204
    def permit_dir(self, name):
 
1205
        """Permit a directory to be used by this test. See permit_url."""
 
1206
        name_transport = _mod_transport.get_transport_from_path(name)
 
1207
        self.permit_url(name)
 
1208
        self.permit_url(name_transport.base)
 
1209
 
 
1210
    def permit_url(self, url):
 
1211
        """Declare that url is an ok url to use in this test.
 
1212
        
 
1213
        Do this for memory transports, temporary test directory etc.
 
1214
        
 
1215
        Do not do this for the current working directory, /tmp, or any other
 
1216
        preexisting non isolated url.
 
1217
        """
 
1218
        if not url.endswith('/'):
 
1219
            url += '/'
 
1220
        self._bzr_selftest_roots.append(url)
 
1221
 
 
1222
    def permit_source_tree_branch_repo(self):
 
1223
        """Permit the source tree bzr is running from to be opened.
 
1224
 
 
1225
        Some code such as bzrlib.version attempts to read from the bzr branch
 
1226
        that bzr is executing from (if any). This method permits that directory
 
1227
        to be used in the test suite.
 
1228
        """
 
1229
        path = self.get_source_path()
 
1230
        self.record_directory_isolation()
 
1231
        try:
 
1232
            try:
 
1233
                workingtree.WorkingTree.open(path)
 
1234
            except (errors.NotBranchError, errors.NoWorkingTree):
 
1235
                raise TestSkipped('Needs a working tree of bzr sources')
 
1236
        finally:
 
1237
            self.enable_directory_isolation()
 
1238
 
 
1239
    def _preopen_isolate_transport(self, transport):
 
1240
        """Check that all transport openings are done in the test work area."""
 
1241
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
1242
            # Unwrap pathfiltered transports
 
1243
            transport = transport.server.backing_transport.clone(
 
1244
                transport._filter('.'))
 
1245
        url = transport.base
 
1246
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
1247
        # urls it is given by prepending readonly+. This is appropriate as the
 
1248
        # client shouldn't know that the server is readonly (or not readonly).
 
1249
        # We could register all servers twice, with readonly+ prepending, but
 
1250
        # that makes for a long list; this is about the same but easier to
 
1251
        # read.
 
1252
        if url.startswith('readonly+'):
 
1253
            url = url[len('readonly+'):]
 
1254
        self._preopen_isolate_url(url)
 
1255
 
 
1256
    def _preopen_isolate_url(self, url):
 
1257
        if not self._directory_isolation:
 
1258
            return
 
1259
        if self._directory_isolation == 'record':
 
1260
            self._bzr_selftest_roots.append(url)
 
1261
            return
 
1262
        # This prevents all transports, including e.g. sftp ones backed on disk
 
1263
        # from working unless they are explicitly granted permission. We then
 
1264
        # depend on the code that sets up test transports to check that they are
 
1265
        # appropriately isolated and enable their use by calling
 
1266
        # self.permit_transport()
 
1267
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
1268
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1269
                % (url, self._bzr_selftest_roots))
 
1270
 
 
1271
    def record_directory_isolation(self):
 
1272
        """Gather accessed directories to permit later access.
 
1273
        
 
1274
        This is used for tests that access the branch bzr is running from.
 
1275
        """
 
1276
        self._directory_isolation = "record"
 
1277
 
 
1278
    def start_server(self, transport_server, backing_server=None):
 
1279
        """Start transport_server for this test.
 
1280
 
 
1281
        This starts the server, registers a cleanup for it and permits the
 
1282
        server's urls to be used.
 
1283
        """
 
1284
        if backing_server is None:
 
1285
            transport_server.start_server()
 
1286
        else:
 
1287
            transport_server.start_server(backing_server)
 
1288
        self.addCleanup(transport_server.stop_server)
 
1289
        # Obtain a real transport because if the server supplies a password, it
 
1290
        # will be hidden from the base on the client side.
 
1291
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1292
        # Some transport servers effectively chroot the backing transport;
 
1293
        # others like SFTPServer don't - users of the transport can walk up the
 
1294
        # transport to read the entire backing transport. This wouldn't matter
 
1295
        # except that the workdir tests are given - and that they expect the
 
1296
        # server's url to point at - is one directory under the safety net. So
 
1297
        # Branch operations into the transport will attempt to walk up one
 
1298
        # directory. Chrooting all servers would avoid this but also mean that
 
1299
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1300
        # getting the test framework to start the server with a backing server
 
1301
        # at the actual safety net directory would work too, but this then
 
1302
        # means that the self.get_url/self.get_transport methods would need
 
1303
        # to transform all their results. On balance its cleaner to handle it
 
1304
        # here, and permit a higher url when we have one of these transports.
 
1305
        if t.base.endswith('/work/'):
 
1306
            # we have safety net/test root/work
 
1307
            t = t.clone('../..')
 
1308
        elif isinstance(transport_server,
 
1309
                        test_server.SmartTCPServer_for_testing):
 
1310
            # The smart server adds a path similar to work, which is traversed
 
1311
            # up from by the client. But the server is chrooted - the actual
 
1312
            # backing transport is not escaped from, and VFS requests to the
 
1313
            # root will error (because they try to escape the chroot).
 
1314
            t2 = t.clone('..')
 
1315
            while t2.base != t.base:
 
1316
                t = t2
 
1317
                t2 = t.clone('..')
 
1318
        self.permit_url(t.base)
 
1319
 
 
1320
    def _track_transports(self):
 
1321
        """Install checks for transport usage."""
 
1322
        # TestCase has no safe place it can write to.
 
1323
        self._bzr_selftest_roots = []
 
1324
        # Currently the easiest way to be sure that nothing is going on is to
 
1325
        # hook into bzr dir opening. This leaves a small window of error for
 
1326
        # transport tests, but they are well known, and we can improve on this
 
1327
        # step.
 
1328
        controldir.ControlDir.hooks.install_named_hook("pre_open",
 
1329
            self._preopen_isolate_transport, "Check bzr directories are safe.")
265
1330
 
266
1331
    def _ndiff_strings(self, a, b):
267
1332
        """Return ndiff between two strings containing lines.
268
 
        
 
1333
 
269
1334
        A trailing newline is added if missing to make the strings
270
1335
        print properly."""
271
1336
        if b and b[-1] != '\n':
278
1343
                                  charjunk=lambda x: False)
279
1344
        return ''.join(difflines)
280
1345
 
 
1346
    def assertEqual(self, a, b, message=''):
 
1347
        try:
 
1348
            if a == b:
 
1349
                return
 
1350
        except UnicodeError, e:
 
1351
            # If we can't compare without getting a UnicodeError, then
 
1352
            # obviously they are different
 
1353
            trace.mutter('UnicodeError: %s', e)
 
1354
        if message:
 
1355
            message += '\n'
 
1356
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
1357
            % (message,
 
1358
               pprint.pformat(a), pprint.pformat(b)))
 
1359
 
 
1360
    assertEquals = assertEqual
 
1361
 
281
1362
    def assertEqualDiff(self, a, b, message=None):
282
1363
        """Assert two texts are equal, if not raise an exception.
283
 
        
284
 
        This is intended for use with multi-line strings where it can 
 
1364
 
 
1365
        This is intended for use with multi-line strings where it can
285
1366
        be hard to find the differences by eye.
286
1367
        """
287
1368
        # TODO: perhaps override assertEquals to call this for strings?
289
1370
            return
290
1371
        if message is None:
291
1372
            message = "texts not equal:\n"
292
 
        raise AssertionError(message + 
293
 
                             self._ndiff_strings(a, b))      
294
 
        
 
1373
        if a + '\n' == b:
 
1374
            message = 'first string is missing a final newline.\n'
 
1375
        if a == b + '\n':
 
1376
            message = 'second string is missing a final newline.\n'
 
1377
        raise AssertionError(message +
 
1378
                             self._ndiff_strings(a, b))
 
1379
 
295
1380
    def assertEqualMode(self, mode, mode_test):
296
1381
        self.assertEqual(mode, mode_test,
297
1382
                         'mode mismatch %o != %o' % (mode, mode_test))
298
1383
 
 
1384
    def assertEqualStat(self, expected, actual):
 
1385
        """assert that expected and actual are the same stat result.
 
1386
 
 
1387
        :param expected: A stat result.
 
1388
        :param actual: A stat result.
 
1389
        :raises AssertionError: If the expected and actual stat values differ
 
1390
            other than by atime.
 
1391
        """
 
1392
        self.assertEqual(expected.st_size, actual.st_size,
 
1393
                         'st_size did not match')
 
1394
        self.assertEqual(expected.st_mtime, actual.st_mtime,
 
1395
                         'st_mtime did not match')
 
1396
        self.assertEqual(expected.st_ctime, actual.st_ctime,
 
1397
                         'st_ctime did not match')
 
1398
        if sys.platform == 'win32':
 
1399
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
 
1400
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
 
1401
            # odd. We just force it to always be 0 to avoid any problems.
 
1402
            self.assertEqual(0, expected.st_dev)
 
1403
            self.assertEqual(0, actual.st_dev)
 
1404
            self.assertEqual(0, expected.st_ino)
 
1405
            self.assertEqual(0, actual.st_ino)
 
1406
        else:
 
1407
            self.assertEqual(expected.st_dev, actual.st_dev,
 
1408
                             'st_dev did not match')
 
1409
            self.assertEqual(expected.st_ino, actual.st_ino,
 
1410
                             'st_ino did not match')
 
1411
        self.assertEqual(expected.st_mode, actual.st_mode,
 
1412
                         'st_mode did not match')
 
1413
 
 
1414
    def assertLength(self, length, obj_with_len):
 
1415
        """Assert that obj_with_len is of length length."""
 
1416
        if len(obj_with_len) != length:
 
1417
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1418
                length, len(obj_with_len), obj_with_len))
 
1419
 
 
1420
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1421
        """Assert that `func(*args, **kwargs)` quietly logs a specific error.
 
1422
        """
 
1423
        captured = []
 
1424
        orig_log_exception_quietly = trace.log_exception_quietly
 
1425
        try:
 
1426
            def capture():
 
1427
                orig_log_exception_quietly()
 
1428
                captured.append(sys.exc_info()[1])
 
1429
            trace.log_exception_quietly = capture
 
1430
            func(*args, **kwargs)
 
1431
        finally:
 
1432
            trace.log_exception_quietly = orig_log_exception_quietly
 
1433
        self.assertLength(1, captured)
 
1434
        err = captured[0]
 
1435
        self.assertIsInstance(err, exception_class)
 
1436
        return err
 
1437
 
 
1438
    def assertPositive(self, val):
 
1439
        """Assert that val is greater than 0."""
 
1440
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1441
 
 
1442
    def assertNegative(self, val):
 
1443
        """Assert that val is less than 0."""
 
1444
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1445
 
299
1446
    def assertStartsWith(self, s, prefix):
300
1447
        if not s.startswith(prefix):
301
1448
            raise AssertionError('string %r does not start with %r' % (s, prefix))
302
1449
 
303
1450
    def assertEndsWith(self, s, suffix):
304
 
        if not s.endswith(prefix):
 
1451
        """Asserts that s ends with suffix."""
 
1452
        if not s.endswith(suffix):
305
1453
            raise AssertionError('string %r does not end with %r' % (s, suffix))
306
1454
 
307
 
    def assertContainsRe(self, haystack, needle_re):
 
1455
    def assertContainsRe(self, haystack, needle_re, flags=0):
308
1456
        """Assert that a contains something matching a regular expression."""
309
 
        if not re.search(needle_re, haystack):
310
 
            raise AssertionError('pattern "%s" not found in "%s"'
 
1457
        if not re.search(needle_re, haystack, flags):
 
1458
            if '\n' in haystack or len(haystack) > 60:
 
1459
                # a long string, format it in a more readable way
 
1460
                raise AssertionError(
 
1461
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1462
                        % (needle_re, haystack))
 
1463
            else:
 
1464
                raise AssertionError('pattern "%s" not found in "%s"'
 
1465
                        % (needle_re, haystack))
 
1466
 
 
1467
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1468
        """Assert that a does not match a regular expression"""
 
1469
        if re.search(needle_re, haystack, flags):
 
1470
            raise AssertionError('pattern "%s" found in "%s"'
311
1471
                    % (needle_re, haystack))
312
1472
 
 
1473
    def assertContainsString(self, haystack, needle):
 
1474
        if haystack.find(needle) == -1:
 
1475
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1476
 
 
1477
    def assertNotContainsString(self, haystack, needle):
 
1478
        if haystack.find(needle) != -1:
 
1479
            self.fail("string %r found in '''%s'''" % (needle, haystack))
 
1480
 
313
1481
    def assertSubset(self, sublist, superlist):
314
1482
        """Assert that every entry in sublist is present in superlist."""
315
 
        missing = []
316
 
        for entry in sublist:
317
 
            if entry not in superlist:
318
 
                missing.append(entry)
 
1483
        missing = set(sublist) - set(superlist)
319
1484
        if len(missing) > 0:
320
 
            raise AssertionError("value(s) %r not present in container %r" % 
 
1485
            raise AssertionError("value(s) %r not present in container %r" %
321
1486
                                 (missing, superlist))
322
1487
 
323
 
    def assertIs(self, left, right):
 
1488
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1489
        """Fail unless excClass is raised when the iterator from func is used.
 
1490
 
 
1491
        Many functions can return generators this makes sure
 
1492
        to wrap them in a list() call to make sure the whole generator
 
1493
        is run, and that the proper exception is raised.
 
1494
        """
 
1495
        try:
 
1496
            list(func(*args, **kwargs))
 
1497
        except excClass, e:
 
1498
            return e
 
1499
        else:
 
1500
            if getattr(excClass,'__name__', None) is not None:
 
1501
                excName = excClass.__name__
 
1502
            else:
 
1503
                excName = str(excClass)
 
1504
            raise self.failureException, "%s not raised" % excName
 
1505
 
 
1506
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1507
        """Assert that a callable raises a particular exception.
 
1508
 
 
1509
        :param excClass: As for the except statement, this may be either an
 
1510
            exception class, or a tuple of classes.
 
1511
        :param callableObj: A callable, will be passed ``*args`` and
 
1512
            ``**kwargs``.
 
1513
 
 
1514
        Returns the exception so that you can examine it.
 
1515
        """
 
1516
        try:
 
1517
            callableObj(*args, **kwargs)
 
1518
        except excClass, e:
 
1519
            return e
 
1520
        else:
 
1521
            if getattr(excClass,'__name__', None) is not None:
 
1522
                excName = excClass.__name__
 
1523
            else:
 
1524
                # probably a tuple
 
1525
                excName = str(excClass)
 
1526
            raise self.failureException, "%s not raised" % excName
 
1527
 
 
1528
    def assertIs(self, left, right, message=None):
324
1529
        if not (left is right):
325
 
            raise AssertionError("%r is not %r." % (left, right))
 
1530
            if message is not None:
 
1531
                raise AssertionError(message)
 
1532
            else:
 
1533
                raise AssertionError("%r is not %r." % (left, right))
 
1534
 
 
1535
    def assertIsNot(self, left, right, message=None):
 
1536
        if (left is right):
 
1537
            if message is not None:
 
1538
                raise AssertionError(message)
 
1539
            else:
 
1540
                raise AssertionError("%r is %r." % (left, right))
326
1541
 
327
1542
    def assertTransportMode(self, transport, path, mode):
328
 
        """Fail if a path does not have mode mode.
329
 
        
330
 
        If modes are not supported on this platform, the test is skipped.
 
1543
        """Fail if a path does not have mode "mode".
 
1544
 
 
1545
        If modes are not supported on this transport, the assertion is ignored.
331
1546
        """
332
 
        if sys.platform == 'win32':
 
1547
        if not transport._can_roundtrip_unix_modebits():
333
1548
            return
334
1549
        path_stat = transport.stat(path)
335
1550
        actual_mode = stat.S_IMODE(path_stat.st_mode)
336
1551
        self.assertEqual(mode, actual_mode,
337
 
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
338
 
 
339
 
    def assertIsInstance(self, obj, kls):
340
 
        """Fail if obj is not an instance of kls"""
 
1552
                         'mode of %r incorrect (%s != %s)'
 
1553
                         % (path, oct(mode), oct(actual_mode)))
 
1554
 
 
1555
    def assertIsSameRealPath(self, path1, path2):
 
1556
        """Fail if path1 and path2 points to different files"""
 
1557
        self.assertEqual(osutils.realpath(path1),
 
1558
                         osutils.realpath(path2),
 
1559
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1560
 
 
1561
    def assertIsInstance(self, obj, kls, msg=None):
 
1562
        """Fail if obj is not an instance of kls
 
1563
        
 
1564
        :param msg: Supplementary message to show if the assertion fails.
 
1565
        """
341
1566
        if not isinstance(obj, kls):
342
 
            self.fail("%r is not an instance of %s" % (obj, kls))
 
1567
            m = "%r is an instance of %s rather than %s" % (
 
1568
                obj, obj.__class__, kls)
 
1569
            if msg:
 
1570
                m += ": " + msg
 
1571
            self.fail(m)
 
1572
 
 
1573
    def assertFileEqual(self, content, path):
 
1574
        """Fail if path does not contain 'content'."""
 
1575
        self.assertPathExists(path)
 
1576
        f = file(path, 'rb')
 
1577
        try:
 
1578
            s = f.read()
 
1579
        finally:
 
1580
            f.close()
 
1581
        self.assertEqualDiff(content, s)
 
1582
 
 
1583
    def assertDocstring(self, expected_docstring, obj):
 
1584
        """Fail if obj does not have expected_docstring"""
 
1585
        if __doc__ is None:
 
1586
            # With -OO the docstring should be None instead
 
1587
            self.assertIs(obj.__doc__, None)
 
1588
        else:
 
1589
            self.assertEqual(expected_docstring, obj.__doc__)
 
1590
 
 
1591
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
 
1592
    def failUnlessExists(self, path):
 
1593
        return self.assertPathExists(path)
 
1594
 
 
1595
    def assertPathExists(self, path):
 
1596
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1597
        if not isinstance(path, basestring):
 
1598
            for p in path:
 
1599
                self.assertPathExists(p)
 
1600
        else:
 
1601
            self.assertTrue(osutils.lexists(path),
 
1602
                path + " does not exist")
 
1603
 
 
1604
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
 
1605
    def failIfExists(self, path):
 
1606
        return self.assertPathDoesNotExist(path)
 
1607
 
 
1608
    def assertPathDoesNotExist(self, path):
 
1609
        """Fail if path or paths, which may be abs or relative, exist."""
 
1610
        if not isinstance(path, basestring):
 
1611
            for p in path:
 
1612
                self.assertPathDoesNotExist(p)
 
1613
        else:
 
1614
            self.assertFalse(osutils.lexists(path),
 
1615
                path + " exists")
 
1616
 
 
1617
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1618
        """A helper for callDeprecated and applyDeprecated.
 
1619
 
 
1620
        :param a_callable: A callable to call.
 
1621
        :param args: The positional arguments for the callable
 
1622
        :param kwargs: The keyword arguments for the callable
 
1623
        :return: A tuple (warnings, result). result is the result of calling
 
1624
            a_callable(``*args``, ``**kwargs``).
 
1625
        """
 
1626
        local_warnings = []
 
1627
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1628
            # we've hooked into a deprecation specific callpath,
 
1629
            # only deprecations should getting sent via it.
 
1630
            self.assertEqual(cls, DeprecationWarning)
 
1631
            local_warnings.append(msg)
 
1632
        original_warning_method = symbol_versioning.warn
 
1633
        symbol_versioning.set_warning_method(capture_warnings)
 
1634
        try:
 
1635
            result = a_callable(*args, **kwargs)
 
1636
        finally:
 
1637
            symbol_versioning.set_warning_method(original_warning_method)
 
1638
        return (local_warnings, result)
 
1639
 
 
1640
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1641
        """Call a deprecated callable without warning the user.
 
1642
 
 
1643
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1644
        not other callers that go direct to the warning module.
 
1645
 
 
1646
        To test that a deprecated method raises an error, do something like
 
1647
        this (remember that both assertRaises and applyDeprecated delays *args
 
1648
        and **kwargs passing)::
 
1649
 
 
1650
            self.assertRaises(errors.ReservedId,
 
1651
                self.applyDeprecated,
 
1652
                deprecated_in((1, 5, 0)),
 
1653
                br.append_revision,
 
1654
                'current:')
 
1655
 
 
1656
        :param deprecation_format: The deprecation format that the callable
 
1657
            should have been deprecated with. This is the same type as the
 
1658
            parameter to deprecated_method/deprecated_function. If the
 
1659
            callable is not deprecated with this format, an assertion error
 
1660
            will be raised.
 
1661
        :param a_callable: A callable to call. This may be a bound method or
 
1662
            a regular function. It will be called with ``*args`` and
 
1663
            ``**kwargs``.
 
1664
        :param args: The positional arguments for the callable
 
1665
        :param kwargs: The keyword arguments for the callable
 
1666
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1667
        """
 
1668
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1669
            *args, **kwargs)
 
1670
        expected_first_warning = symbol_versioning.deprecation_string(
 
1671
            a_callable, deprecation_format)
 
1672
        if len(call_warnings) == 0:
 
1673
            self.fail("No deprecation warning generated by call to %s" %
 
1674
                a_callable)
 
1675
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1676
        return result
 
1677
 
 
1678
    def callCatchWarnings(self, fn, *args, **kw):
 
1679
        """Call a callable that raises python warnings.
 
1680
 
 
1681
        The caller's responsible for examining the returned warnings.
 
1682
 
 
1683
        If the callable raises an exception, the exception is not
 
1684
        caught and propagates up to the caller.  In that case, the list
 
1685
        of warnings is not available.
 
1686
 
 
1687
        :returns: ([warning_object, ...], fn_result)
 
1688
        """
 
1689
        # XXX: This is not perfect, because it completely overrides the
 
1690
        # warnings filters, and some code may depend on suppressing particular
 
1691
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1692
        # though.  -- Andrew, 20071062
 
1693
        wlist = []
 
1694
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1695
            # despite the name, 'message' is normally(?) a Warning subclass
 
1696
            # instance
 
1697
            wlist.append(message)
 
1698
        saved_showwarning = warnings.showwarning
 
1699
        saved_filters = warnings.filters
 
1700
        try:
 
1701
            warnings.showwarning = _catcher
 
1702
            warnings.filters = []
 
1703
            result = fn(*args, **kw)
 
1704
        finally:
 
1705
            warnings.showwarning = saved_showwarning
 
1706
            warnings.filters = saved_filters
 
1707
        return wlist, result
 
1708
 
 
1709
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1710
        """Assert that a callable is deprecated in a particular way.
 
1711
 
 
1712
        This is a very precise test for unusual requirements. The
 
1713
        applyDeprecated helper function is probably more suited for most tests
 
1714
        as it allows you to simply specify the deprecation format being used
 
1715
        and will ensure that that is issued for the function being called.
 
1716
 
 
1717
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1718
        not other callers that go direct to the warning module.  To catch
 
1719
        general warnings, use callCatchWarnings.
 
1720
 
 
1721
        :param expected: a list of the deprecation warnings expected, in order
 
1722
        :param callable: The callable to call
 
1723
        :param args: The positional arguments for the callable
 
1724
        :param kwargs: The keyword arguments for the callable
 
1725
        """
 
1726
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1727
            *args, **kwargs)
 
1728
        self.assertEqual(expected, call_warnings)
 
1729
        return result
343
1730
 
344
1731
    def _startLogFile(self):
345
 
        """Send bzr and test log messages to a temporary file.
346
 
 
347
 
        The file is removed as the test is torn down.
348
 
        """
349
 
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
350
 
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
351
 
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
352
 
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
353
 
        self._log_file_name = name
 
1732
        """Setup a in-memory target for bzr and testcase log messages"""
 
1733
        pseudo_log_file = StringIO()
 
1734
        def _get_log_contents_for_weird_testtools_api():
 
1735
            return [pseudo_log_file.getvalue().decode(
 
1736
                "utf-8", "replace").encode("utf-8")]
 
1737
        self.addDetail("log", content.Content(content.ContentType("text",
 
1738
            "plain", {"charset": "utf8"}),
 
1739
            _get_log_contents_for_weird_testtools_api))
 
1740
        self._log_file = pseudo_log_file
 
1741
        self._log_memento = trace.push_log_file(self._log_file)
354
1742
        self.addCleanup(self._finishLogFile)
355
1743
 
356
1744
    def _finishLogFile(self):
357
 
        """Finished with the log file.
358
 
 
359
 
        Read contents into memory, close, and delete.
360
 
        """
361
 
        bzrlib.trace.disable_test_log(self._log_nonce)
362
 
        self._log_file.seek(0)
363
 
        self._log_contents = self._log_file.read()
364
 
        self._log_file.close()
365
 
        os.remove(self._log_file_name)
366
 
        self._log_file = self._log_file_name = None
367
 
 
368
 
    def addCleanup(self, callable):
369
 
        """Arrange to run a callable when this case is torn down.
370
 
 
371
 
        Callables are run in the reverse of the order they are registered, 
372
 
        ie last-in first-out.
373
 
        """
374
 
        if callable in self._cleanups:
375
 
            raise ValueError("cleanup function %r already registered on %s" 
376
 
                    % (callable, self))
377
 
        self._cleanups.append(callable)
 
1745
        """Flush and dereference the in-memory log for this testcase"""
 
1746
        if trace._trace_file:
 
1747
            # flush the log file, to get all content
 
1748
            trace._trace_file.flush()
 
1749
        trace.pop_log_file(self._log_memento)
 
1750
        # The logging module now tracks references for cleanup so discard ours
 
1751
        del self._log_memento
 
1752
 
 
1753
    def thisFailsStrictLockCheck(self):
 
1754
        """It is known that this test would fail with -Dstrict_locks.
 
1755
 
 
1756
        By default, all tests are run with strict lock checking unless
 
1757
        -Edisable_lock_checks is supplied. However there are some tests which
 
1758
        we know fail strict locks at this point that have not been fixed.
 
1759
        They should call this function to disable the strict checking.
 
1760
 
 
1761
        This should be used sparingly, it is much better to fix the locking
 
1762
        issues rather than papering over the problem by calling this function.
 
1763
        """
 
1764
        debug.debug_flags.discard('strict_locks')
 
1765
 
 
1766
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1767
        """Overrides an object attribute restoring it after the test.
 
1768
 
 
1769
        :note: This should be used with discretion; you should think about
 
1770
        whether it's better to make the code testable without monkey-patching.
 
1771
 
 
1772
        :param obj: The object that will be mutated.
 
1773
 
 
1774
        :param attr_name: The attribute name we want to preserve/override in
 
1775
            the object.
 
1776
 
 
1777
        :param new: The optional value we want to set the attribute to.
 
1778
 
 
1779
        :returns: The actual attr value.
 
1780
        """
 
1781
        value = getattr(obj, attr_name)
 
1782
        # The actual value is captured by the call below
 
1783
        self.addCleanup(setattr, obj, attr_name, value)
 
1784
        if new is not _unitialized_attr:
 
1785
            setattr(obj, attr_name, new)
 
1786
        return value
 
1787
 
 
1788
    def overrideEnv(self, name, new):
 
1789
        """Set an environment variable, and reset it after the test.
 
1790
 
 
1791
        :param name: The environment variable name.
 
1792
 
 
1793
        :param new: The value to set the variable to. If None, the 
 
1794
            variable is deleted from the environment.
 
1795
 
 
1796
        :returns: The actual variable value.
 
1797
        """
 
1798
        value = osutils.set_or_unset_env(name, new)
 
1799
        self.addCleanup(osutils.set_or_unset_env, name, value)
 
1800
        return value
 
1801
 
 
1802
    def recordCalls(self, obj, attr_name):
 
1803
        """Monkeypatch in a wrapper that will record calls.
 
1804
 
 
1805
        The monkeypatch is automatically removed when the test concludes.
 
1806
 
 
1807
        :param obj: The namespace holding the reference to be replaced;
 
1808
            typically a module, class, or object.
 
1809
        :param attr_name: A string for the name of the attribute to 
 
1810
            patch.
 
1811
        :returns: A list that will be extended with one item every time the
 
1812
            function is called, with a tuple of (args, kwargs).
 
1813
        """
 
1814
        calls = []
 
1815
 
 
1816
        def decorator(*args, **kwargs):
 
1817
            calls.append((args, kwargs))
 
1818
            return orig(*args, **kwargs)
 
1819
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1820
        return calls
378
1821
 
379
1822
    def _cleanEnvironment(self):
380
 
        new_env = {
381
 
            'HOME': os.getcwd(),
382
 
            'APPDATA': os.getcwd(),
383
 
            'BZREMAIL': None,
384
 
            'EMAIL': None,
385
 
        }
386
 
        self.__old_env = {}
387
 
        self.addCleanup(self._restoreEnvironment)
388
 
        for name, value in new_env.iteritems():
389
 
            self._captureVar(name, value)
390
 
 
391
 
 
392
 
    def _captureVar(self, name, newvalue):
393
 
        """Set an environment variable, preparing it to be reset when finished."""
394
 
        self.__old_env[name] = os.environ.get(name, None)
395
 
        if newvalue is None:
396
 
            if name in os.environ:
397
 
                del os.environ[name]
398
 
        else:
399
 
            os.environ[name] = newvalue
400
 
 
401
 
    @staticmethod
402
 
    def _restoreVar(name, value):
403
 
        if value is None:
404
 
            if name in os.environ:
405
 
                del os.environ[name]
406
 
        else:
407
 
            os.environ[name] = value
408
 
 
409
 
    def _restoreEnvironment(self):
410
 
        for name, value in self.__old_env.iteritems():
411
 
            self._restoreVar(name, value)
412
 
 
413
 
    def tearDown(self):
414
 
        self._runCleanups()
415
 
        unittest.TestCase.tearDown(self)
416
 
 
417
 
    def _runCleanups(self):
418
 
        """Run registered cleanup functions. 
419
 
 
420
 
        This should only be called from TestCase.tearDown.
421
 
        """
422
 
        # TODO: Perhaps this should keep running cleanups even if 
423
 
        # one of them fails?
424
 
        for cleanup_fn in reversed(self._cleanups):
425
 
            cleanup_fn()
 
1823
        for name, value in isolated_environ.iteritems():
 
1824
            self.overrideEnv(name, value)
 
1825
 
 
1826
    def _restoreHooks(self):
 
1827
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1828
            setattr(klass, name, hooks)
 
1829
        self._preserved_hooks.clear()
 
1830
        bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
 
1831
        self._preserved_lazy_hooks.clear()
 
1832
 
 
1833
    def knownFailure(self, reason):
 
1834
        """Declare that this test fails for a known reason
 
1835
 
 
1836
        Tests that are known to fail should generally be using expectedFailure
 
1837
        with an appropriate reverse assertion if a change could cause the test
 
1838
        to start passing. Conversely if the test has no immediate prospect of
 
1839
        succeeding then using skip is more suitable.
 
1840
 
 
1841
        When this method is called while an exception is being handled, that
 
1842
        traceback will be used, otherwise a new exception will be thrown to
 
1843
        provide one but won't be reported.
 
1844
        """
 
1845
        self._add_reason(reason)
 
1846
        try:
 
1847
            exc_info = sys.exc_info()
 
1848
            if exc_info != (None, None, None):
 
1849
                self._report_traceback(exc_info)
 
1850
            else:
 
1851
                try:
 
1852
                    raise self.failureException(reason)
 
1853
                except self.failureException:
 
1854
                    exc_info = sys.exc_info()
 
1855
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1856
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1857
        finally:
 
1858
            del exc_info
 
1859
 
 
1860
    def _suppress_log(self):
 
1861
        """Remove the log info from details."""
 
1862
        self.discardDetail('log')
 
1863
 
 
1864
    def _do_skip(self, result, reason):
 
1865
        self._suppress_log()
 
1866
        addSkip = getattr(result, 'addSkip', None)
 
1867
        if not callable(addSkip):
 
1868
            result.addSuccess(result)
 
1869
        else:
 
1870
            addSkip(self, reason)
 
1871
 
 
1872
    @staticmethod
 
1873
    def _do_known_failure(self, result, e):
 
1874
        self._suppress_log()
 
1875
        err = sys.exc_info()
 
1876
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1877
        if addExpectedFailure is not None:
 
1878
            addExpectedFailure(self, err)
 
1879
        else:
 
1880
            result.addSuccess(self)
 
1881
 
 
1882
    @staticmethod
 
1883
    def _do_not_applicable(self, result, e):
 
1884
        if not e.args:
 
1885
            reason = 'No reason given'
 
1886
        else:
 
1887
            reason = e.args[0]
 
1888
        self._suppress_log ()
 
1889
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1890
        if addNotApplicable is not None:
 
1891
            result.addNotApplicable(self, reason)
 
1892
        else:
 
1893
            self._do_skip(result, reason)
 
1894
 
 
1895
    @staticmethod
 
1896
    def _report_skip(self, result, err):
 
1897
        """Override the default _report_skip.
 
1898
 
 
1899
        We want to strip the 'log' detail. If we waint until _do_skip, it has
 
1900
        already been formatted into the 'reason' string, and we can't pull it
 
1901
        out again.
 
1902
        """
 
1903
        self._suppress_log()
 
1904
        super(TestCase, self)._report_skip(self, result, err)
 
1905
 
 
1906
    @staticmethod
 
1907
    def _report_expected_failure(self, result, err):
 
1908
        """Strip the log.
 
1909
 
 
1910
        See _report_skip for motivation.
 
1911
        """
 
1912
        self._suppress_log()
 
1913
        super(TestCase, self)._report_expected_failure(self, result, err)
 
1914
 
 
1915
    @staticmethod
 
1916
    def _do_unsupported_or_skip(self, result, e):
 
1917
        reason = e.args[0]
 
1918
        self._suppress_log()
 
1919
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1920
        if addNotSupported is not None:
 
1921
            result.addNotSupported(self, reason)
 
1922
        else:
 
1923
            self._do_skip(result, reason)
 
1924
 
 
1925
    def time(self, callable, *args, **kwargs):
 
1926
        """Run callable and accrue the time it takes to the benchmark time.
 
1927
 
 
1928
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1929
        this will cause lsprofile statistics to be gathered and stored in
 
1930
        self._benchcalls.
 
1931
        """
 
1932
        if self._benchtime is None:
 
1933
            self.addDetail('benchtime', content.Content(content.ContentType(
 
1934
                "text", "plain"), lambda:[str(self._benchtime)]))
 
1935
            self._benchtime = 0
 
1936
        start = time.time()
 
1937
        try:
 
1938
            if not self._gather_lsprof_in_benchmarks:
 
1939
                return callable(*args, **kwargs)
 
1940
            else:
 
1941
                # record this benchmark
 
1942
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1943
                stats.sort()
 
1944
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1945
                return ret
 
1946
        finally:
 
1947
            self._benchtime += time.time() - start
426
1948
 
427
1949
    def log(self, *args):
428
 
        mutter(*args)
429
 
 
430
 
    def _get_log(self):
431
 
        """Return as a string the log for this test"""
432
 
        if self._log_file_name:
433
 
            return open(self._log_file_name).read()
434
 
        else:
435
 
            return self._log_contents
436
 
        # TODO: Delete the log after it's been read in
437
 
 
438
 
    def capture(self, cmd, retcode=0):
439
 
        """Shortcut that splits cmd into words, runs, and returns stdout"""
440
 
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
441
 
 
442
 
    def run_bzr_captured(self, argv, retcode=0):
443
 
        """Invoke bzr and return (stdout, stderr).
444
 
 
445
 
        Useful for code that wants to check the contents of the
446
 
        output, the way error messages are presented, etc.
447
 
 
448
 
        This should be the main method for tests that want to exercise the
449
 
        overall behavior of the bzr application (rather than a unit test
450
 
        or a functional test of the library.)
451
 
 
452
 
        Much of the old code runs bzr by forking a new copy of Python, but
453
 
        that is slower, harder to debug, and generally not necessary.
454
 
 
455
 
        This runs bzr through the interface that catches and reports
456
 
        errors, and with logging set to something approximating the
457
 
        default, so that error reporting can be checked.
458
 
 
459
 
        argv -- arguments to invoke bzr
460
 
        retcode -- expected return code, or None for don't-care.
461
 
        """
462
 
        stdout = StringIO()
463
 
        stderr = StringIO()
464
 
        self.log('run bzr: %s', ' '.join(argv))
 
1950
        trace.mutter(*args)
 
1951
 
 
1952
    def get_log(self):
 
1953
        """Get a unicode string containing the log from bzrlib.trace.
 
1954
 
 
1955
        Undecodable characters are replaced.
 
1956
        """
 
1957
        return u"".join(self.getDetails()['log'].iter_text())
 
1958
 
 
1959
    def requireFeature(self, feature):
 
1960
        """This test requires a specific feature is available.
 
1961
 
 
1962
        :raises UnavailableFeature: When feature is not available.
 
1963
        """
 
1964
        if not feature.available():
 
1965
            raise UnavailableFeature(feature)
 
1966
 
 
1967
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1968
            working_dir):
 
1969
        """Run bazaar command line, splitting up a string command line."""
 
1970
        if isinstance(args, basestring):
 
1971
            # shlex don't understand unicode strings,
 
1972
            # so args should be plain string (bialix 20070906)
 
1973
            args = list(shlex.split(str(args)))
 
1974
        return self._run_bzr_core(args, retcode=retcode,
 
1975
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1976
                )
 
1977
 
 
1978
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1979
            working_dir):
 
1980
        # Clear chk_map page cache, because the contents are likely to mask
 
1981
        # locking errors.
 
1982
        chk_map.clear_cache()
 
1983
        if encoding is None:
 
1984
            encoding = osutils.get_user_encoding()
 
1985
        stdout = StringIOWrapper()
 
1986
        stderr = StringIOWrapper()
 
1987
        stdout.encoding = encoding
 
1988
        stderr.encoding = encoding
 
1989
 
 
1990
        self.log('run bzr: %r', args)
465
1991
        # FIXME: don't call into logging here
466
 
        handler = logging.StreamHandler(stderr)
467
 
        handler.setFormatter(bzrlib.trace.QuietFormatter())
468
 
        handler.setLevel(logging.INFO)
 
1992
        handler = trace.EncodedStreamHandler(stderr, errors="replace",
 
1993
            level=logging.INFO)
469
1994
        logger = logging.getLogger('')
470
1995
        logger.addHandler(handler)
 
1996
        old_ui_factory = ui.ui_factory
 
1997
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1998
 
 
1999
        cwd = None
 
2000
        if working_dir is not None:
 
2001
            cwd = osutils.getcwd()
 
2002
            os.chdir(working_dir)
 
2003
 
471
2004
        try:
472
 
            result = self.apply_redirected(None, stdout, stderr,
473
 
                                           bzrlib.commands.run_bzr_catch_errors,
474
 
                                           argv)
 
2005
            try:
 
2006
                result = self.apply_redirected(
 
2007
                    ui.ui_factory.stdin,
 
2008
                    stdout, stderr,
 
2009
                    _mod_commands.run_bzr_catch_user_errors,
 
2010
                    args)
 
2011
            except KeyboardInterrupt:
 
2012
                # Reraise KeyboardInterrupt with contents of redirected stdout
 
2013
                # and stderr as arguments, for tests which are interested in
 
2014
                # stdout and stderr and are expecting the exception.
 
2015
                out = stdout.getvalue()
 
2016
                err = stderr.getvalue()
 
2017
                if out:
 
2018
                    self.log('output:\n%r', out)
 
2019
                if err:
 
2020
                    self.log('errors:\n%r', err)
 
2021
                raise KeyboardInterrupt(out, err)
475
2022
        finally:
476
2023
            logger.removeHandler(handler)
 
2024
            ui.ui_factory = old_ui_factory
 
2025
            if cwd is not None:
 
2026
                os.chdir(cwd)
 
2027
 
477
2028
        out = stdout.getvalue()
478
2029
        err = stderr.getvalue()
479
2030
        if out:
480
 
            self.log('output:\n%s', out)
 
2031
            self.log('output:\n%r', out)
481
2032
        if err:
482
 
            self.log('errors:\n%s', err)
 
2033
            self.log('errors:\n%r', err)
483
2034
        if retcode is not None:
484
 
            self.assertEquals(result, retcode)
485
 
        return out, err
 
2035
            self.assertEquals(retcode, result,
 
2036
                              message='Unexpected return code')
 
2037
        return result, out, err
486
2038
 
487
 
    def run_bzr(self, *args, **kwargs):
 
2039
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
2040
                working_dir=None, error_regexes=[], output_encoding=None):
488
2041
        """Invoke bzr, as if it were run from the command line.
489
2042
 
 
2043
        The argument list should not include the bzr program name - the
 
2044
        first argument is normally the bzr command.  Arguments may be
 
2045
        passed in three ways:
 
2046
 
 
2047
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
2048
        when the command contains whitespace or metacharacters, or
 
2049
        is built up at run time.
 
2050
 
 
2051
        2- A single string, eg "add a".  This is the most convenient
 
2052
        for hardcoded commands.
 
2053
 
 
2054
        This runs bzr through the interface that catches and reports
 
2055
        errors, and with logging set to something approximating the
 
2056
        default, so that error reporting can be checked.
 
2057
 
490
2058
        This should be the main method for tests that want to exercise the
491
2059
        overall behavior of the bzr application (rather than a unit test
492
2060
        or a functional test of the library.)
493
2061
 
494
2062
        This sends the stdout/stderr results into the test's log,
495
2063
        where it may be useful for debugging.  See also run_captured.
496
 
        """
497
 
        retcode = kwargs.pop('retcode', 0)
498
 
        return self.run_bzr_captured(args, retcode)
499
 
 
500
 
    def check_inventory_shape(self, inv, shape):
501
 
        """Compare an inventory to a list of expected names.
 
2064
 
 
2065
        :keyword stdin: A string to be used as stdin for the command.
 
2066
        :keyword retcode: The status code the command should return;
 
2067
            default 0.
 
2068
        :keyword working_dir: The directory to run the command in
 
2069
        :keyword error_regexes: A list of expected error messages.  If
 
2070
            specified they must be seen in the error output of the command.
 
2071
        """
 
2072
        retcode, out, err = self._run_bzr_autosplit(
 
2073
            args=args,
 
2074
            retcode=retcode,
 
2075
            encoding=encoding,
 
2076
            stdin=stdin,
 
2077
            working_dir=working_dir,
 
2078
            )
 
2079
        self.assertIsInstance(error_regexes, (list, tuple))
 
2080
        for regex in error_regexes:
 
2081
            self.assertContainsRe(err, regex)
 
2082
        return out, err
 
2083
 
 
2084
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
2085
        """Run bzr, and check that stderr contains the supplied regexes
 
2086
 
 
2087
        :param error_regexes: Sequence of regular expressions which
 
2088
            must each be found in the error output. The relative ordering
 
2089
            is not enforced.
 
2090
        :param args: command-line arguments for bzr
 
2091
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
2092
            This function changes the default value of retcode to be 3,
 
2093
            since in most cases this is run when you expect bzr to fail.
 
2094
 
 
2095
        :return: (out, err) The actual output of running the command (in case
 
2096
            you want to do more inspection)
 
2097
 
 
2098
        Examples of use::
 
2099
 
 
2100
            # Make sure that commit is failing because there is nothing to do
 
2101
            self.run_bzr_error(['no changes to commit'],
 
2102
                               ['commit', '-m', 'my commit comment'])
 
2103
            # Make sure --strict is handling an unknown file, rather than
 
2104
            # giving us the 'nothing to do' error
 
2105
            self.build_tree(['unknown'])
 
2106
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
2107
                               ['commit', --strict', '-m', 'my commit comment'])
 
2108
        """
 
2109
        kwargs.setdefault('retcode', 3)
 
2110
        kwargs['error_regexes'] = error_regexes
 
2111
        out, err = self.run_bzr(*args, **kwargs)
 
2112
        return out, err
 
2113
 
 
2114
    def run_bzr_subprocess(self, *args, **kwargs):
 
2115
        """Run bzr in a subprocess for testing.
 
2116
 
 
2117
        This starts a new Python interpreter and runs bzr in there.
 
2118
        This should only be used for tests that have a justifiable need for
 
2119
        this isolation: e.g. they are testing startup time, or signal
 
2120
        handling, or early startup code, etc.  Subprocess code can't be
 
2121
        profiled or debugged so easily.
 
2122
 
 
2123
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
2124
            None is supplied, the status code is not checked.
 
2125
        :keyword env_changes: A dictionary which lists changes to environment
 
2126
            variables. A value of None will unset the env variable.
 
2127
            The values must be strings. The change will only occur in the
 
2128
            child, so you don't need to fix the environment after running.
 
2129
        :keyword universal_newlines: Convert CRLF => LF
 
2130
        :keyword allow_plugins: By default the subprocess is run with
 
2131
            --no-plugins to ensure test reproducibility. Also, it is possible
 
2132
            for system-wide plugins to create unexpected output on stderr,
 
2133
            which can cause unnecessary test failures.
 
2134
        """
 
2135
        env_changes = kwargs.get('env_changes', {})
 
2136
        working_dir = kwargs.get('working_dir', None)
 
2137
        allow_plugins = kwargs.get('allow_plugins', False)
 
2138
        if len(args) == 1:
 
2139
            if isinstance(args[0], list):
 
2140
                args = args[0]
 
2141
            elif isinstance(args[0], basestring):
 
2142
                args = list(shlex.split(args[0]))
 
2143
        else:
 
2144
            raise ValueError("passing varargs to run_bzr_subprocess")
 
2145
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
2146
                                            working_dir=working_dir,
 
2147
                                            allow_plugins=allow_plugins)
 
2148
        # We distinguish between retcode=None and retcode not passed.
 
2149
        supplied_retcode = kwargs.get('retcode', 0)
 
2150
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
2151
            universal_newlines=kwargs.get('universal_newlines', False),
 
2152
            process_args=args)
 
2153
 
 
2154
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
2155
                             skip_if_plan_to_signal=False,
 
2156
                             working_dir=None,
 
2157
                             allow_plugins=False, stderr=subprocess.PIPE):
 
2158
        """Start bzr in a subprocess for testing.
 
2159
 
 
2160
        This starts a new Python interpreter and runs bzr in there.
 
2161
        This should only be used for tests that have a justifiable need for
 
2162
        this isolation: e.g. they are testing startup time, or signal
 
2163
        handling, or early startup code, etc.  Subprocess code can't be
 
2164
        profiled or debugged so easily.
 
2165
 
 
2166
        :param process_args: a list of arguments to pass to the bzr executable,
 
2167
            for example ``['--version']``.
 
2168
        :param env_changes: A dictionary which lists changes to environment
 
2169
            variables. A value of None will unset the env variable.
 
2170
            The values must be strings. The change will only occur in the
 
2171
            child, so you don't need to fix the environment after running.
 
2172
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
 
2173
            doesn't support signalling subprocesses.
 
2174
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
2175
        :param stderr: file to use for the subprocess's stderr.  Valid values
 
2176
            are those valid for the stderr argument of `subprocess.Popen`.
 
2177
            Default value is ``subprocess.PIPE``.
 
2178
 
 
2179
        :returns: Popen object for the started process.
 
2180
        """
 
2181
        if skip_if_plan_to_signal:
 
2182
            if os.name != "posix":
 
2183
                raise TestSkipped("Sending signals not supported")
 
2184
 
 
2185
        if env_changes is None:
 
2186
            env_changes = {}
 
2187
        # Because $HOME is set to a tempdir for the context of a test, modules
 
2188
        # installed in the user dir will not be found unless $PYTHONUSERBASE
 
2189
        # gets set to the computed directory of this parent process.
 
2190
        if site.USER_BASE is not None:
 
2191
            env_changes["PYTHONUSERBASE"] = site.USER_BASE
 
2192
        old_env = {}
 
2193
 
 
2194
        def cleanup_environment():
 
2195
            for env_var, value in env_changes.iteritems():
 
2196
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
2197
 
 
2198
        def restore_environment():
 
2199
            for env_var, value in old_env.iteritems():
 
2200
                osutils.set_or_unset_env(env_var, value)
 
2201
 
 
2202
        bzr_path = self.get_bzr_path()
 
2203
 
 
2204
        cwd = None
 
2205
        if working_dir is not None:
 
2206
            cwd = osutils.getcwd()
 
2207
            os.chdir(working_dir)
 
2208
 
 
2209
        try:
 
2210
            # win32 subprocess doesn't support preexec_fn
 
2211
            # so we will avoid using it on all platforms, just to
 
2212
            # make sure the code path is used, and we don't break on win32
 
2213
            cleanup_environment()
 
2214
            # Include the subprocess's log file in the test details, in case
 
2215
            # the test fails due to an error in the subprocess.
 
2216
            self._add_subprocess_log(trace._get_bzr_log_filename())
 
2217
            command = [sys.executable]
 
2218
            # frozen executables don't need the path to bzr
 
2219
            if getattr(sys, "frozen", None) is None:
 
2220
                command.append(bzr_path)
 
2221
            if not allow_plugins:
 
2222
                command.append('--no-plugins')
 
2223
            command.extend(process_args)
 
2224
            process = self._popen(command, stdin=subprocess.PIPE,
 
2225
                                  stdout=subprocess.PIPE,
 
2226
                                  stderr=stderr)
 
2227
        finally:
 
2228
            restore_environment()
 
2229
            if cwd is not None:
 
2230
                os.chdir(cwd)
 
2231
 
 
2232
        return process
 
2233
 
 
2234
    def _add_subprocess_log(self, log_file_path):
 
2235
        if len(self._log_files) == 0:
 
2236
            # Register an addCleanup func.  We do this on the first call to
 
2237
            # _add_subprocess_log rather than in TestCase.setUp so that this
 
2238
            # addCleanup is registered after any cleanups for tempdirs that
 
2239
            # subclasses might create, which will probably remove the log file
 
2240
            # we want to read.
 
2241
            self.addCleanup(self._subprocess_log_cleanup)
 
2242
        # self._log_files is a set, so if a log file is reused we won't grab it
 
2243
        # twice.
 
2244
        self._log_files.add(log_file_path)
 
2245
 
 
2246
    def _subprocess_log_cleanup(self):
 
2247
        for count, log_file_path in enumerate(self._log_files):
 
2248
            # We use buffer_now=True to avoid holding the file open beyond
 
2249
            # the life of this function, which might interfere with e.g.
 
2250
            # cleaning tempdirs on Windows.
 
2251
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
 
2252
            #detail_content = content.content_from_file(
 
2253
            #    log_file_path, buffer_now=True)
 
2254
            with open(log_file_path, 'rb') as log_file:
 
2255
                log_file_bytes = log_file.read()
 
2256
            detail_content = content.Content(content.ContentType("text",
 
2257
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
 
2258
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
 
2259
                detail_content)
 
2260
 
 
2261
    def _popen(self, *args, **kwargs):
 
2262
        """Place a call to Popen.
 
2263
 
 
2264
        Allows tests to override this method to intercept the calls made to
 
2265
        Popen for introspection.
 
2266
        """
 
2267
        return subprocess.Popen(*args, **kwargs)
 
2268
 
 
2269
    def get_source_path(self):
 
2270
        """Return the path of the directory containing bzrlib."""
 
2271
        return os.path.dirname(os.path.dirname(bzrlib.__file__))
 
2272
 
 
2273
    def get_bzr_path(self):
 
2274
        """Return the path of the 'bzr' executable for this test suite."""
 
2275
        bzr_path = os.path.join(self.get_source_path(), "bzr")
 
2276
        if not os.path.isfile(bzr_path):
 
2277
            # We are probably installed. Assume sys.argv is the right file
 
2278
            bzr_path = sys.argv[0]
 
2279
        return bzr_path
 
2280
 
 
2281
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
2282
                              universal_newlines=False, process_args=None):
 
2283
        """Finish the execution of process.
 
2284
 
 
2285
        :param process: the Popen object returned from start_bzr_subprocess.
 
2286
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
2287
            None is supplied, the status code is not checked.
 
2288
        :param send_signal: an optional signal to send to the process.
 
2289
        :param universal_newlines: Convert CRLF => LF
 
2290
        :returns: (stdout, stderr)
 
2291
        """
 
2292
        if send_signal is not None:
 
2293
            os.kill(process.pid, send_signal)
 
2294
        out, err = process.communicate()
 
2295
 
 
2296
        if universal_newlines:
 
2297
            out = out.replace('\r\n', '\n')
 
2298
            err = err.replace('\r\n', '\n')
 
2299
 
 
2300
        if retcode is not None and retcode != process.returncode:
 
2301
            if process_args is None:
 
2302
                process_args = "(unknown args)"
 
2303
            trace.mutter('Output of bzr %s:\n%s', process_args, out)
 
2304
            trace.mutter('Error for bzr %s:\n%s', process_args, err)
 
2305
            self.fail('Command bzr %s failed with retcode %s != %s'
 
2306
                      % (process_args, retcode, process.returncode))
 
2307
        return [out, err]
 
2308
 
 
2309
    def check_tree_shape(self, tree, shape):
 
2310
        """Compare a tree to a list of expected names.
502
2311
 
503
2312
        Fail if they are not precisely equal.
504
2313
        """
505
2314
        extras = []
506
2315
        shape = list(shape)             # copy
507
 
        for path, ie in inv.entries():
 
2316
        for path, ie in tree.iter_entries_by_dir():
508
2317
            name = path.replace('\\', '/')
509
 
            if ie.kind == 'dir':
 
2318
            if ie.kind == 'directory':
510
2319
                name = name + '/'
511
 
            if name in shape:
 
2320
            if name == "/":
 
2321
                pass # ignore root entry
 
2322
            elif name in shape:
512
2323
                shape.remove(name)
513
2324
            else:
514
2325
                extras.append(name)
549
2360
            sys.stderr = real_stderr
550
2361
            sys.stdin = real_stdin
551
2362
 
552
 
 
553
 
BzrTestBase = TestCase
554
 
 
555
 
     
556
 
class TestCaseInTempDir(TestCase):
 
2363
    def reduceLockdirTimeout(self):
 
2364
        """Reduce the default lock timeout for the duration of the test, so that
 
2365
        if LockContention occurs during a test, it does so quickly.
 
2366
 
 
2367
        Tests that expect to provoke LockContention errors should call this.
 
2368
        """
 
2369
        self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
 
2370
 
 
2371
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
2372
        """Return a StringIOWrapper instance, that will encode Unicode
 
2373
        input to UTF-8.
 
2374
        """
 
2375
        if encoding_type is None:
 
2376
            encoding_type = 'strict'
 
2377
        sio = StringIO()
 
2378
        output_encoding = 'utf-8'
 
2379
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
2380
        sio.encoding = output_encoding
 
2381
        return sio
 
2382
 
 
2383
    def disable_verb(self, verb):
 
2384
        """Disable a smart server verb for one test."""
 
2385
        from bzrlib.smart import request
 
2386
        request_handlers = request.request_handlers
 
2387
        orig_method = request_handlers.get(verb)
 
2388
        orig_info = request_handlers.get_info(verb)
 
2389
        request_handlers.remove(verb)
 
2390
        self.addCleanup(request_handlers.register, verb, orig_method,
 
2391
            info=orig_info)
 
2392
 
 
2393
 
 
2394
class CapturedCall(object):
 
2395
    """A helper for capturing smart server calls for easy debug analysis."""
 
2396
 
 
2397
    def __init__(self, params, prefix_length):
 
2398
        """Capture the call with params and skip prefix_length stack frames."""
 
2399
        self.call = params
 
2400
        import traceback
 
2401
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2402
        # client frames. Beyond this we could get more clever, but this is good
 
2403
        # enough for now.
 
2404
        stack = traceback.extract_stack()[prefix_length:-5]
 
2405
        self.stack = ''.join(traceback.format_list(stack))
 
2406
 
 
2407
    def __str__(self):
 
2408
        return self.call.method
 
2409
 
 
2410
    def __repr__(self):
 
2411
        return self.call.method
 
2412
 
 
2413
    def stack(self):
 
2414
        return self.stack
 
2415
 
 
2416
 
 
2417
class TestCaseWithMemoryTransport(TestCase):
 
2418
    """Common test class for tests that do not need disk resources.
 
2419
 
 
2420
    Tests that need disk resources should derive from TestCaseInTempDir
 
2421
    orTestCaseWithTransport.
 
2422
 
 
2423
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2424
 
 
2425
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2426
    a directory which does not exist. This serves to help ensure test isolation
 
2427
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2428
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2429
    defaults for the transport in tests, nor does it obey the command line
 
2430
    override, so tests that accidentally write to the common directory should
 
2431
    be rare.
 
2432
 
 
2433
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2434
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2435
    """
 
2436
 
 
2437
    TEST_ROOT = None
 
2438
    _TEST_NAME = 'test'
 
2439
 
 
2440
    def __init__(self, methodName='runTest'):
 
2441
        # allow test parameterization after test construction and before test
 
2442
        # execution. Variables that the parameterizer sets need to be
 
2443
        # ones that are not set by setUp, or setUp will trash them.
 
2444
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2445
        self.vfs_transport_factory = default_transport
 
2446
        self.transport_server = None
 
2447
        self.transport_readonly_server = None
 
2448
        self.__vfs_server = None
 
2449
 
 
2450
    def get_transport(self, relpath=None):
 
2451
        """Return a writeable transport.
 
2452
 
 
2453
        This transport is for the test scratch space relative to
 
2454
        "self._test_root"
 
2455
 
 
2456
        :param relpath: a path relative to the base url.
 
2457
        """
 
2458
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2459
        self.assertFalse(t.is_readonly())
 
2460
        return t
 
2461
 
 
2462
    def get_readonly_transport(self, relpath=None):
 
2463
        """Return a readonly transport for the test scratch space
 
2464
 
 
2465
        This can be used to test that operations which should only need
 
2466
        readonly access in fact do not try to write.
 
2467
 
 
2468
        :param relpath: a path relative to the base url.
 
2469
        """
 
2470
        t = _mod_transport.get_transport_from_url(
 
2471
            self.get_readonly_url(relpath))
 
2472
        self.assertTrue(t.is_readonly())
 
2473
        return t
 
2474
 
 
2475
    def create_transport_readonly_server(self):
 
2476
        """Create a transport server from class defined at init.
 
2477
 
 
2478
        This is mostly a hook for daughter classes.
 
2479
        """
 
2480
        return self.transport_readonly_server()
 
2481
 
 
2482
    def get_readonly_server(self):
 
2483
        """Get the server instance for the readonly transport
 
2484
 
 
2485
        This is useful for some tests with specific servers to do diagnostics.
 
2486
        """
 
2487
        if self.__readonly_server is None:
 
2488
            if self.transport_readonly_server is None:
 
2489
                # readonly decorator requested
 
2490
                self.__readonly_server = test_server.ReadonlyServer()
 
2491
            else:
 
2492
                # explicit readonly transport.
 
2493
                self.__readonly_server = self.create_transport_readonly_server()
 
2494
            self.start_server(self.__readonly_server,
 
2495
                self.get_vfs_only_server())
 
2496
        return self.__readonly_server
 
2497
 
 
2498
    def get_readonly_url(self, relpath=None):
 
2499
        """Get a URL for the readonly transport.
 
2500
 
 
2501
        This will either be backed by '.' or a decorator to the transport
 
2502
        used by self.get_url()
 
2503
        relpath provides for clients to get a path relative to the base url.
 
2504
        These should only be downwards relative, not upwards.
 
2505
        """
 
2506
        base = self.get_readonly_server().get_url()
 
2507
        return self._adjust_url(base, relpath)
 
2508
 
 
2509
    def get_vfs_only_server(self):
 
2510
        """Get the vfs only read/write server instance.
 
2511
 
 
2512
        This is useful for some tests with specific servers that need
 
2513
        diagnostics.
 
2514
 
 
2515
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2516
        is no means to override it.
 
2517
        """
 
2518
        if self.__vfs_server is None:
 
2519
            self.__vfs_server = memory.MemoryServer()
 
2520
            self.start_server(self.__vfs_server)
 
2521
        return self.__vfs_server
 
2522
 
 
2523
    def get_server(self):
 
2524
        """Get the read/write server instance.
 
2525
 
 
2526
        This is useful for some tests with specific servers that need
 
2527
        diagnostics.
 
2528
 
 
2529
        This is built from the self.transport_server factory. If that is None,
 
2530
        then the self.get_vfs_server is returned.
 
2531
        """
 
2532
        if self.__server is None:
 
2533
            if (self.transport_server is None or self.transport_server is
 
2534
                self.vfs_transport_factory):
 
2535
                self.__server = self.get_vfs_only_server()
 
2536
            else:
 
2537
                # bring up a decorated means of access to the vfs only server.
 
2538
                self.__server = self.transport_server()
 
2539
                self.start_server(self.__server, self.get_vfs_only_server())
 
2540
        return self.__server
 
2541
 
 
2542
    def _adjust_url(self, base, relpath):
 
2543
        """Get a URL (or maybe a path) for the readwrite transport.
 
2544
 
 
2545
        This will either be backed by '.' or to an equivalent non-file based
 
2546
        facility.
 
2547
        relpath provides for clients to get a path relative to the base url.
 
2548
        These should only be downwards relative, not upwards.
 
2549
        """
 
2550
        if relpath is not None and relpath != '.':
 
2551
            if not base.endswith('/'):
 
2552
                base = base + '/'
 
2553
            # XXX: Really base should be a url; we did after all call
 
2554
            # get_url()!  But sometimes it's just a path (from
 
2555
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2556
            # to a non-escaped local path.
 
2557
            if base.startswith('./') or base.startswith('/'):
 
2558
                base += relpath
 
2559
            else:
 
2560
                base += urlutils.escape(relpath)
 
2561
        return base
 
2562
 
 
2563
    def get_url(self, relpath=None):
 
2564
        """Get a URL (or maybe a path) for the readwrite transport.
 
2565
 
 
2566
        This will either be backed by '.' or to an equivalent non-file based
 
2567
        facility.
 
2568
        relpath provides for clients to get a path relative to the base url.
 
2569
        These should only be downwards relative, not upwards.
 
2570
        """
 
2571
        base = self.get_server().get_url()
 
2572
        return self._adjust_url(base, relpath)
 
2573
 
 
2574
    def get_vfs_only_url(self, relpath=None):
 
2575
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2576
 
 
2577
        This will never be a smart protocol.  It always has all the
 
2578
        capabilities of the local filesystem, but it might actually be a
 
2579
        MemoryTransport or some other similar virtual filesystem.
 
2580
 
 
2581
        This is the backing transport (if any) of the server returned by
 
2582
        get_url and get_readonly_url.
 
2583
 
 
2584
        :param relpath: provides for clients to get a path relative to the base
 
2585
            url.  These should only be downwards relative, not upwards.
 
2586
        :return: A URL
 
2587
        """
 
2588
        base = self.get_vfs_only_server().get_url()
 
2589
        return self._adjust_url(base, relpath)
 
2590
 
 
2591
    def _create_safety_net(self):
 
2592
        """Make a fake bzr directory.
 
2593
 
 
2594
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2595
        real branch.
 
2596
        """
 
2597
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2598
        try:
 
2599
            # Make sure we get a readable and accessible home for .bzr.log
 
2600
            # and/or config files, and not fallback to weird defaults (see
 
2601
            # http://pad.lv/825027).
 
2602
            self.assertIs(None, os.environ.get('BZR_HOME', None))
 
2603
            os.environ['BZR_HOME'] = root
 
2604
            wt = controldir.ControlDir.create_standalone_workingtree(root)
 
2605
            del os.environ['BZR_HOME']
 
2606
        except Exception, e:
 
2607
            self.fail("Fail to initialize the safety net: %r\n" % (e,))
 
2608
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2609
        # we don't need to re-open the wt to check it hasn't changed.
 
2610
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2611
            wt.control_transport.get_bytes('dirstate'))
 
2612
 
 
2613
    def _check_safety_net(self):
 
2614
        """Check that the safety .bzr directory have not been touched.
 
2615
 
 
2616
        _make_test_root have created a .bzr directory to prevent tests from
 
2617
        propagating. This method ensures than a test did not leaked.
 
2618
        """
 
2619
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2620
        t = _mod_transport.get_transport_from_path(root)
 
2621
        self.permit_url(t.base)
 
2622
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2623
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2624
            # The current test have modified the /bzr directory, we need to
 
2625
            # recreate a new one or all the followng tests will fail.
 
2626
            # If you need to inspect its content uncomment the following line
 
2627
            # import pdb; pdb.set_trace()
 
2628
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
 
2629
            self._create_safety_net()
 
2630
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2631
 
 
2632
    def _make_test_root(self):
 
2633
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2634
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2635
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2636
                                                    suffix='.tmp'))
 
2637
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2638
 
 
2639
            self._create_safety_net()
 
2640
 
 
2641
            # The same directory is used by all tests, and we're not
 
2642
            # specifically told when all tests are finished.  This will do.
 
2643
            atexit.register(_rmtree_temp_dir, root)
 
2644
 
 
2645
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2646
        self.addCleanup(self._check_safety_net)
 
2647
 
 
2648
    def makeAndChdirToTestDir(self):
 
2649
        """Create a temporary directories for this one test.
 
2650
 
 
2651
        This must set self.test_home_dir and self.test_dir and chdir to
 
2652
        self.test_dir.
 
2653
 
 
2654
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2655
        """
 
2656
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2657
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2658
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2659
        self.permit_dir(self.test_dir)
 
2660
 
 
2661
    def make_branch(self, relpath, format=None, name=None):
 
2662
        """Create a branch on the transport at relpath."""
 
2663
        repo = self.make_repository(relpath, format=format)
 
2664
        return repo.bzrdir.create_branch(append_revisions_only=False,
 
2665
                                         name=name)
 
2666
 
 
2667
    def get_default_format(self):
 
2668
        return 'default'
 
2669
 
 
2670
    def resolve_format(self, format):
 
2671
        """Resolve an object to a ControlDir format object.
 
2672
 
 
2673
        The initial format object can either already be
 
2674
        a ControlDirFormat, None (for the default format),
 
2675
        or a string with the name of the control dir format.
 
2676
 
 
2677
        :param format: Object to resolve
 
2678
        :return A ControlDirFormat instance
 
2679
        """
 
2680
        if format is None:
 
2681
            format = self.get_default_format()
 
2682
        if isinstance(format, basestring):
 
2683
            format = controldir.format_registry.make_bzrdir(format)
 
2684
        return format
 
2685
 
 
2686
    def make_bzrdir(self, relpath, format=None):
 
2687
        try:
 
2688
            # might be a relative or absolute path
 
2689
            maybe_a_url = self.get_url(relpath)
 
2690
            segments = maybe_a_url.rsplit('/', 1)
 
2691
            t = _mod_transport.get_transport(maybe_a_url)
 
2692
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2693
                t.ensure_base()
 
2694
            format = self.resolve_format(format)
 
2695
            return format.initialize_on_transport(t)
 
2696
        except errors.UninitializableFormat:
 
2697
            raise TestSkipped("Format %s is not initializable." % format)
 
2698
 
 
2699
    def make_repository(self, relpath, shared=None, format=None):
 
2700
        """Create a repository on our default transport at relpath.
 
2701
 
 
2702
        Note that relpath must be a relative path, not a full url.
 
2703
        """
 
2704
        # FIXME: If you create a remoterepository this returns the underlying
 
2705
        # real format, which is incorrect.  Actually we should make sure that
 
2706
        # RemoteBzrDir returns a RemoteRepository.
 
2707
        # maybe  mbp 20070410
 
2708
        made_control = self.make_bzrdir(relpath, format=format)
 
2709
        return made_control.create_repository(shared=shared)
 
2710
 
 
2711
    def make_smart_server(self, path, backing_server=None):
 
2712
        if backing_server is None:
 
2713
            backing_server = self.get_server()
 
2714
        smart_server = test_server.SmartTCPServer_for_testing()
 
2715
        self.start_server(smart_server, backing_server)
 
2716
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2717
                                                   ).clone(path)
 
2718
        return remote_transport
 
2719
 
 
2720
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2721
        """Create a branch on the default transport and a MemoryTree for it."""
 
2722
        b = self.make_branch(relpath, format=format)
 
2723
        return memorytree.MemoryTree.create_on_branch(b)
 
2724
 
 
2725
    def make_branch_builder(self, relpath, format=None):
 
2726
        branch = self.make_branch(relpath, format=format)
 
2727
        return branchbuilder.BranchBuilder(branch=branch)
 
2728
 
 
2729
    def overrideEnvironmentForTesting(self):
 
2730
        test_home_dir = self.test_home_dir
 
2731
        if isinstance(test_home_dir, unicode):
 
2732
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
 
2733
        self.overrideEnv('HOME', test_home_dir)
 
2734
        self.overrideEnv('BZR_HOME', test_home_dir)
 
2735
 
 
2736
    def setUp(self):
 
2737
        super(TestCaseWithMemoryTransport, self).setUp()
 
2738
 
 
2739
        def _add_disconnect_cleanup(transport):
 
2740
            """Schedule disconnection of given transport at test cleanup
 
2741
 
 
2742
            This needs to happen for all connected transports or leaks occur.
 
2743
 
 
2744
            Note reconnections may mean we call disconnect multiple times per
 
2745
            transport which is suboptimal but seems harmless.
 
2746
            """
 
2747
            self.addCleanup(transport.disconnect)
 
2748
 
 
2749
        _mod_transport.Transport.hooks.install_named_hook('post_connect',
 
2750
            _add_disconnect_cleanup, None)
 
2751
 
 
2752
        self._make_test_root()
 
2753
        self.addCleanup(os.chdir, os.getcwdu())
 
2754
        self.makeAndChdirToTestDir()
 
2755
        self.overrideEnvironmentForTesting()
 
2756
        self.__readonly_server = None
 
2757
        self.__server = None
 
2758
        self.reduceLockdirTimeout()
 
2759
 
 
2760
    def setup_smart_server_with_call_log(self):
 
2761
        """Sets up a smart server as the transport server with a call log."""
 
2762
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2763
        self.hpss_connections = []
 
2764
        self.hpss_calls = []
 
2765
        import traceback
 
2766
        # Skip the current stack down to the caller of
 
2767
        # setup_smart_server_with_call_log
 
2768
        prefix_length = len(traceback.extract_stack()) - 2
 
2769
        def capture_hpss_call(params):
 
2770
            self.hpss_calls.append(
 
2771
                CapturedCall(params, prefix_length))
 
2772
        def capture_connect(transport):
 
2773
            self.hpss_connections.append(transport)
 
2774
        client._SmartClient.hooks.install_named_hook(
 
2775
            'call', capture_hpss_call, None)
 
2776
        _mod_transport.Transport.hooks.install_named_hook(
 
2777
            'post_connect', capture_connect, None)
 
2778
 
 
2779
    def reset_smart_call_log(self):
 
2780
        self.hpss_calls = []
 
2781
        self.hpss_connections = []
 
2782
 
 
2783
 
 
2784
class TestCaseInTempDir(TestCaseWithMemoryTransport):
557
2785
    """Derived class that runs a test within a temporary directory.
558
2786
 
559
2787
    This is useful for tests that need to create a branch, etc.
563
2791
    All test cases create their own directory within that.  If the
564
2792
    tests complete successfully, the directory is removed.
565
2793
 
566
 
    InTempDir is an old alias for FunctionalTestCase.
 
2794
    :ivar test_base_dir: The path of the top-level directory for this
 
2795
    test, which contains a home directory and a work directory.
 
2796
 
 
2797
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2798
    which is used as $HOME for this test.
 
2799
 
 
2800
    :ivar test_dir: A directory under test_base_dir used as the current
 
2801
    directory when the test proper is run.
567
2802
    """
568
2803
 
569
 
    TEST_ROOT = None
570
 
    _TEST_NAME = 'test'
571
2804
    OVERRIDE_PYTHON = 'python'
572
2805
 
 
2806
    def setUp(self):
 
2807
        super(TestCaseInTempDir, self).setUp()
 
2808
        # Remove the protection set in isolated_environ, we have a proper
 
2809
        # access to disk resources now.
 
2810
        self.overrideEnv('BZR_LOG', None)
 
2811
 
573
2812
    def check_file_contents(self, filename, expect):
574
2813
        self.log("check contents of file %s" % filename)
575
 
        contents = file(filename, 'r').read()
 
2814
        f = file(filename)
 
2815
        try:
 
2816
            contents = f.read()
 
2817
        finally:
 
2818
            f.close()
576
2819
        if contents != expect:
577
2820
            self.log("expected: %r" % expect)
578
2821
            self.log("actually: %r" % contents)
579
2822
            self.fail("contents of %s not as expected" % filename)
580
2823
 
581
 
    def _make_test_root(self):
582
 
        if TestCaseInTempDir.TEST_ROOT is not None:
583
 
            return
584
 
        i = 0
585
 
        while True:
586
 
            root = u'test%04d.tmp' % i
587
 
            try:
588
 
                os.mkdir(root)
589
 
            except OSError, e:
590
 
                if e.errno == errno.EEXIST:
591
 
                    i += 1
592
 
                    continue
593
 
                else:
594
 
                    raise
595
 
            # successfully created
596
 
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
597
 
            break
598
 
        # make a fake bzr directory there to prevent any tests propagating
599
 
        # up onto the source directory's real branch
600
 
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
601
 
 
602
 
    def setUp(self):
603
 
        super(TestCaseInTempDir, self).setUp()
604
 
        self._make_test_root()
605
 
        _currentdir = os.getcwdu()
606
 
        # shorten the name, to avoid test failures due to path length
607
 
        short_id = self.id().replace('bzrlib.tests.', '') \
608
 
                   .replace('__main__.', '')[-100:]
609
 
        # it's possible the same test class is run several times for
610
 
        # parameterized tests, so make sure the names don't collide.  
611
 
        i = 0
612
 
        while True:
613
 
            if i > 0:
614
 
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
615
 
            else:
616
 
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
617
 
            if os.path.exists(candidate_dir):
618
 
                i = i + 1
619
 
                continue
620
 
            else:
621
 
                self.test_dir = candidate_dir
622
 
                os.mkdir(self.test_dir)
623
 
                os.chdir(self.test_dir)
 
2824
    def _getTestDirPrefix(self):
 
2825
        # create a directory within the top level test directory
 
2826
        if sys.platform in ('win32', 'cygwin'):
 
2827
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2828
            # windows is likely to have path-length limits so use a short name
 
2829
            name_prefix = name_prefix[-30:]
 
2830
        else:
 
2831
            name_prefix = re.sub('[/]', '_', self.id())
 
2832
        return name_prefix
 
2833
 
 
2834
    def makeAndChdirToTestDir(self):
 
2835
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2836
 
 
2837
        For TestCaseInTempDir we create a temporary directory based on the test
 
2838
        name and then create two subdirs - test and home under it.
 
2839
        """
 
2840
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2841
            self._getTestDirPrefix())
 
2842
        name = name_prefix
 
2843
        for i in range(100):
 
2844
            if os.path.exists(name):
 
2845
                name = name_prefix + '_' + str(i)
 
2846
            else:
 
2847
                # now create test and home directories within this dir
 
2848
                self.test_base_dir = name
 
2849
                self.addCleanup(self.deleteTestDir)
 
2850
                os.mkdir(self.test_base_dir)
624
2851
                break
625
 
        os.environ['HOME'] = self.test_dir
626
 
        os.environ['APPDATA'] = self.test_dir
627
 
        def _leaveDirectory():
628
 
            os.chdir(_currentdir)
629
 
        self.addCleanup(_leaveDirectory)
630
 
        
631
 
    def build_tree(self, shape, line_endings='native', transport=None):
 
2852
        self.permit_dir(self.test_base_dir)
 
2853
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2854
        # stacking policy to honour; create a bzr dir with an unshared
 
2855
        # repository (but not a branch - our code would be trying to escape
 
2856
        # then!) to stop them, and permit it to be read.
 
2857
        # control = controldir.ControlDir.create(self.test_base_dir)
 
2858
        # control.create_repository()
 
2859
        self.test_home_dir = self.test_base_dir + '/home'
 
2860
        os.mkdir(self.test_home_dir)
 
2861
        self.test_dir = self.test_base_dir + '/work'
 
2862
        os.mkdir(self.test_dir)
 
2863
        os.chdir(self.test_dir)
 
2864
        # put name of test inside
 
2865
        f = file(self.test_base_dir + '/name', 'w')
 
2866
        try:
 
2867
            f.write(self.id())
 
2868
        finally:
 
2869
            f.close()
 
2870
 
 
2871
    def deleteTestDir(self):
 
2872
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2873
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
 
2874
 
 
2875
    def build_tree(self, shape, line_endings='binary', transport=None):
632
2876
        """Build a test tree according to a pattern.
633
2877
 
634
2878
        shape is a sequence of file specifications.  If the final
635
2879
        character is '/', a directory is created.
636
2880
 
 
2881
        This assumes that all the elements in the tree being built are new.
 
2882
 
637
2883
        This doesn't add anything to a branch.
 
2884
 
 
2885
        :type shape:    list or tuple.
638
2886
        :param line_endings: Either 'binary' or 'native'
639
 
                             in binary mode, exact contents are written
640
 
                             in native mode, the line endings match the
641
 
                             default platform endings.
642
 
 
643
 
        :param transport: A transport to write to, for building trees on 
644
 
                          VFS's. If the transport is readonly or None,
645
 
                          "." is opened automatically.
 
2887
            in binary mode, exact contents are written in native mode, the
 
2888
            line endings match the default platform endings.
 
2889
        :param transport: A transport to write to, for building trees on VFS's.
 
2890
            If the transport is readonly or None, "." is opened automatically.
 
2891
        :return: None
646
2892
        """
647
 
        # XXX: It's OK to just create them using forward slashes on windows?
 
2893
        if type(shape) not in (list, tuple):
 
2894
            raise AssertionError("Parameter 'shape' should be "
 
2895
                "a list or a tuple. Got %r instead" % (shape,))
 
2896
        # It's OK to just create them using forward slashes on windows.
648
2897
        if transport is None or transport.is_readonly():
649
 
            transport = get_transport(".")
 
2898
            transport = _mod_transport.get_transport_from_path(".")
650
2899
        for name in shape:
651
 
            self.assert_(isinstance(name, basestring))
 
2900
            self.assertIsInstance(name, basestring)
652
2901
            if name[-1] == '/':
653
 
                transport.mkdir(urlescape(name[:-1]))
 
2902
                transport.mkdir(urlutils.escape(name[:-1]))
654
2903
            else:
655
2904
                if line_endings == 'binary':
656
2905
                    end = '\n'
657
2906
                elif line_endings == 'native':
658
2907
                    end = os.linesep
659
2908
                else:
660
 
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
661
 
                content = "contents of %s%s" % (name, end)
662
 
                transport.put(urlescape(name), StringIO(content))
663
 
 
664
 
    def build_tree_contents(self, shape):
665
 
        build_tree_contents(shape)
666
 
 
667
 
    def failUnlessExists(self, path):
668
 
        """Fail unless path, which may be abs or relative, exists."""
669
 
        self.failUnless(osutils.lexists(path))
670
 
 
671
 
    def failIfExists(self, path):
672
 
        """Fail if path, which may be abs or relative, exists."""
673
 
        self.failIf(osutils.lexists(path))
674
 
        
675
 
    def assertFileEqual(self, content, path):
676
 
        """Fail if path does not contain 'content'."""
677
 
        self.failUnless(osutils.lexists(path))
678
 
        self.assertEqualDiff(content, open(path, 'r').read())
 
2909
                    raise errors.BzrError(
 
2910
                        'Invalid line ending request %r' % line_endings)
 
2911
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2912
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2913
 
 
2914
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
 
2915
 
 
2916
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2917
        """Assert whether path or paths are in the WorkingTree"""
 
2918
        if tree is None:
 
2919
            tree = workingtree.WorkingTree.open(root_path)
 
2920
        if not isinstance(path, basestring):
 
2921
            for p in path:
 
2922
                self.assertInWorkingTree(p, tree=tree)
 
2923
        else:
 
2924
            self.assertIsNot(tree.path2id(path), None,
 
2925
                path+' not in working tree.')
 
2926
 
 
2927
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2928
        """Assert whether path or paths are not in the WorkingTree"""
 
2929
        if tree is None:
 
2930
            tree = workingtree.WorkingTree.open(root_path)
 
2931
        if not isinstance(path, basestring):
 
2932
            for p in path:
 
2933
                self.assertNotInWorkingTree(p,tree=tree)
 
2934
        else:
 
2935
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
679
2936
 
680
2937
 
681
2938
class TestCaseWithTransport(TestCaseInTempDir):
688
2945
    ReadonlyTransportDecorator is used instead which allows the use of non disk
689
2946
    based read write transports.
690
2947
 
691
 
    If an explicit class is provided for readonly access, that server and the 
 
2948
    If an explicit class is provided for readonly access, that server and the
692
2949
    readwrite one must both define get_url() as resolving to os.getcwd().
693
2950
    """
694
2951
 
695
 
    def __init__(self, methodName='testMethod'):
696
 
        super(TestCaseWithTransport, self).__init__(methodName)
697
 
        self.__readonly_server = None
698
 
        self.__server = None
699
 
        self.transport_server = default_transport
700
 
        self.transport_readonly_server = None
701
 
 
702
 
    def get_readonly_url(self, relpath=None):
703
 
        """Get a URL for the readonly transport.
704
 
 
705
 
        This will either be backed by '.' or a decorator to the transport 
706
 
        used by self.get_url()
707
 
        relpath provides for clients to get a path relative to the base url.
708
 
        These should only be downwards relative, not upwards.
709
 
        """
710
 
        base = self.get_readonly_server().get_url()
711
 
        if relpath is not None:
712
 
            if not base.endswith('/'):
713
 
                base = base + '/'
714
 
            base = base + relpath
715
 
        return base
716
 
 
717
 
    def get_readonly_server(self):
718
 
        """Get the server instance for the readonly transport
719
 
 
720
 
        This is useful for some tests with specific servers to do diagnostics.
721
 
        """
722
 
        if self.__readonly_server is None:
723
 
            if self.transport_readonly_server is None:
724
 
                # readonly decorator requested
725
 
                # bring up the server
726
 
                self.get_url()
727
 
                self.__readonly_server = ReadonlyServer()
728
 
                self.__readonly_server.setUp(self.__server)
729
 
            else:
730
 
                self.__readonly_server = self.transport_readonly_server()
731
 
                self.__readonly_server.setUp()
732
 
            self.addCleanup(self.__readonly_server.tearDown)
733
 
        return self.__readonly_server
734
 
 
735
 
    def get_server(self):
736
 
        """Get the read/write server instance.
 
2952
    def get_vfs_only_server(self):
 
2953
        """See TestCaseWithMemoryTransport.
737
2954
 
738
2955
        This is useful for some tests with specific servers that need
739
2956
        diagnostics.
740
2957
        """
741
 
        if self.__server is None:
742
 
            self.__server = self.transport_server()
743
 
            self.__server.setUp()
744
 
            self.addCleanup(self.__server.tearDown)
745
 
        return self.__server
746
 
 
747
 
    def get_url(self, relpath=None):
748
 
        """Get a URL for the readwrite transport.
749
 
 
750
 
        This will either be backed by '.' or to an equivalent non-file based
751
 
        facility.
752
 
        relpath provides for clients to get a path relative to the base url.
753
 
        These should only be downwards relative, not upwards.
754
 
        """
755
 
        base = self.get_server().get_url()
756
 
        if relpath is not None and relpath != '.':
757
 
            if not base.endswith('/'):
758
 
                base = base + '/'
759
 
            base = base + relpath
760
 
        return base
761
 
 
762
 
    def get_transport(self):
763
 
        """Return a writeable transport for the test scratch space"""
764
 
        t = get_transport(self.get_url())
765
 
        self.assertFalse(t.is_readonly())
766
 
        return t
767
 
 
768
 
    def get_readonly_transport(self):
769
 
        """Return a readonly transport for the test scratch space
770
 
        
771
 
        This can be used to test that operations which should only need
772
 
        readonly access in fact do not try to write.
773
 
        """
774
 
        t = get_transport(self.get_readonly_url())
775
 
        self.assertTrue(t.is_readonly())
776
 
        return t
777
 
 
778
 
    def make_branch(self, relpath):
779
 
        """Create a branch on the transport at relpath."""
780
 
        repo = self.make_repository(relpath)
781
 
        return repo.bzrdir.create_branch()
782
 
 
783
 
    def make_bzrdir(self, relpath):
784
 
        try:
785
 
            url = self.get_url(relpath)
786
 
            segments = relpath.split('/')
787
 
            if segments and segments[-1] not in ('', '.'):
788
 
                parent = self.get_url('/'.join(segments[:-1]))
789
 
                t = get_transport(parent)
790
 
                try:
791
 
                    t.mkdir(segments[-1])
792
 
                except errors.FileExists:
793
 
                    pass
794
 
            return bzrlib.bzrdir.BzrDir.create(url)
795
 
        except errors.UninitializableFormat:
796
 
            raise TestSkipped("Format %s is not initializable.")
797
 
 
798
 
    def make_repository(self, relpath, shared=False):
799
 
        """Create a repository on our default transport at relpath."""
800
 
        made_control = self.make_bzrdir(relpath)
801
 
        return made_control.create_repository(shared=shared)
802
 
 
803
 
    def make_branch_and_tree(self, relpath):
 
2958
        if self.__vfs_server is None:
 
2959
            self.__vfs_server = self.vfs_transport_factory()
 
2960
            self.start_server(self.__vfs_server)
 
2961
        return self.__vfs_server
 
2962
 
 
2963
    def make_branch_and_tree(self, relpath, format=None):
804
2964
        """Create a branch on the transport and a tree locally.
805
2965
 
806
 
        Returns the tree.
 
2966
        If the transport is not a LocalTransport, the Tree can't be created on
 
2967
        the transport.  In that case if the vfs_transport_factory is
 
2968
        LocalURLServer the working tree is created in the local
 
2969
        directory backing the transport, and the returned tree's branch and
 
2970
        repository will also be accessed locally. Otherwise a lightweight
 
2971
        checkout is created and returned.
 
2972
 
 
2973
        We do this because we can't physically create a tree in the local
 
2974
        path, with a branch reference to the transport_factory url, and
 
2975
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2976
        namespace is distinct from the local disk - the two branch objects
 
2977
        would collide. While we could construct a tree with its branch object
 
2978
        pointing at the transport_factory transport in memory, reopening it
 
2979
        would behaving unexpectedly, and has in the past caused testing bugs
 
2980
        when we tried to do it that way.
 
2981
 
 
2982
        :param format: The BzrDirFormat.
 
2983
        :returns: the WorkingTree.
807
2984
        """
808
2985
        # TODO: always use the local disk path for the working tree,
809
2986
        # this obviously requires a format that supports branch references
810
2987
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
811
2988
        # RBC 20060208
812
 
        b = self.make_branch(relpath)
 
2989
        format = self.resolve_format(format=format)
 
2990
        if not format.supports_workingtrees:
 
2991
            b = self.make_branch(relpath+'.branch', format=format)
 
2992
            return b.create_checkout(relpath, lightweight=True)
 
2993
        b = self.make_branch(relpath, format=format)
813
2994
        try:
814
2995
            return b.bzrdir.create_workingtree()
815
2996
        except errors.NotLocalUrl:
816
 
            # new formats - catch No tree error and create
817
 
            # a branch reference and a checkout.
818
 
            # old formats at that point - raise TestSkipped.
819
 
            # TODO: rbc 20060208
820
 
            return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
 
2997
            # We can only make working trees locally at the moment.  If the
 
2998
            # transport can't support them, then we keep the non-disk-backed
 
2999
            # branch and create a local checkout.
 
3000
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
3001
                # the branch is colocated on disk, we cannot create a checkout.
 
3002
                # hopefully callers will expect this.
 
3003
                local_controldir = controldir.ControlDir.open(
 
3004
                    self.get_vfs_only_url(relpath))
 
3005
                wt = local_controldir.create_workingtree()
 
3006
                if wt.branch._format != b._format:
 
3007
                    wt._branch = b
 
3008
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
3009
                    # in case the implementation details of workingtree objects
 
3010
                    # change.
 
3011
                    self.assertIs(b, wt.branch)
 
3012
                return wt
 
3013
            else:
 
3014
                return b.create_checkout(relpath, lightweight=True)
821
3015
 
822
3016
    def assertIsDirectory(self, relpath, transport):
823
3017
        """Assert that relpath within transport is a directory.
834
3028
            self.fail("path %s is not a directory; has mode %#o"
835
3029
                      % (relpath, mode))
836
3030
 
 
3031
    def assertTreesEqual(self, left, right):
 
3032
        """Check that left and right have the same content and properties."""
 
3033
        # we use a tree delta to check for equality of the content, and we
 
3034
        # manually check for equality of other things such as the parents list.
 
3035
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
3036
        differences = left.changes_from(right)
 
3037
        self.assertFalse(differences.has_changed(),
 
3038
            "Trees %r and %r are different: %r" % (left, right, differences))
 
3039
 
 
3040
    def setUp(self):
 
3041
        super(TestCaseWithTransport, self).setUp()
 
3042
        self.__vfs_server = None
 
3043
 
 
3044
    def disable_missing_extensions_warning(self):
 
3045
        """Some tests expect a precise stderr content.
 
3046
 
 
3047
        There is no point in forcing them to duplicate the extension related
 
3048
        warning.
 
3049
        """
 
3050
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
3051
 
837
3052
 
838
3053
class ChrootedTestCase(TestCaseWithTransport):
839
3054
    """A support class that provides readonly urls outside the local namespace.
843
3058
    for readonly urls.
844
3059
 
845
3060
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
846
 
                       be used without needed to redo it when a different 
 
3061
                       be used without needed to redo it when a different
847
3062
                       subclass is in use ?
848
3063
    """
849
3064
 
850
3065
    def setUp(self):
 
3066
        from bzrlib.tests import http_server
851
3067
        super(ChrootedTestCase, self).setUp()
852
 
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
853
 
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
3068
        if not self.vfs_transport_factory == memory.MemoryServer:
 
3069
            self.transport_readonly_server = http_server.HttpServer
 
3070
 
 
3071
 
 
3072
def condition_id_re(pattern):
 
3073
    """Create a condition filter which performs a re check on a test's id.
 
3074
 
 
3075
    :param pattern: A regular expression string.
 
3076
    :return: A callable that returns True if the re matches.
 
3077
    """
 
3078
    filter_re = re.compile(pattern, 0)
 
3079
    def condition(test):
 
3080
        test_id = test.id()
 
3081
        return filter_re.search(test_id)
 
3082
    return condition
 
3083
 
 
3084
 
 
3085
def condition_isinstance(klass_or_klass_list):
 
3086
    """Create a condition filter which returns isinstance(param, klass).
 
3087
 
 
3088
    :return: A callable which when called with one parameter obj return the
 
3089
        result of isinstance(obj, klass_or_klass_list).
 
3090
    """
 
3091
    def condition(obj):
 
3092
        return isinstance(obj, klass_or_klass_list)
 
3093
    return condition
 
3094
 
 
3095
 
 
3096
def condition_id_in_list(id_list):
 
3097
    """Create a condition filter which verify that test's id in a list.
 
3098
 
 
3099
    :param id_list: A TestIdList object.
 
3100
    :return: A callable that returns True if the test's id appears in the list.
 
3101
    """
 
3102
    def condition(test):
 
3103
        return id_list.includes(test.id())
 
3104
    return condition
 
3105
 
 
3106
 
 
3107
def condition_id_startswith(starts):
 
3108
    """Create a condition filter verifying that test's id starts with a string.
 
3109
 
 
3110
    :param starts: A list of string.
 
3111
    :return: A callable that returns True if the test's id starts with one of
 
3112
        the given strings.
 
3113
    """
 
3114
    def condition(test):
 
3115
        for start in starts:
 
3116
            if test.id().startswith(start):
 
3117
                return True
 
3118
        return False
 
3119
    return condition
 
3120
 
 
3121
 
 
3122
def exclude_tests_by_condition(suite, condition):
 
3123
    """Create a test suite which excludes some tests from suite.
 
3124
 
 
3125
    :param suite: The suite to get tests from.
 
3126
    :param condition: A callable whose result evaluates True when called with a
 
3127
        test case which should be excluded from the result.
 
3128
    :return: A suite which contains the tests found in suite that fail
 
3129
        condition.
 
3130
    """
 
3131
    result = []
 
3132
    for test in iter_suite_tests(suite):
 
3133
        if not condition(test):
 
3134
            result.append(test)
 
3135
    return TestUtil.TestSuite(result)
 
3136
 
 
3137
 
 
3138
def filter_suite_by_condition(suite, condition):
 
3139
    """Create a test suite by filtering another one.
 
3140
 
 
3141
    :param suite: The source suite.
 
3142
    :param condition: A callable whose result evaluates True when called with a
 
3143
        test case which should be included in the result.
 
3144
    :return: A suite which contains the tests found in suite that pass
 
3145
        condition.
 
3146
    """
 
3147
    result = []
 
3148
    for test in iter_suite_tests(suite):
 
3149
        if condition(test):
 
3150
            result.append(test)
 
3151
    return TestUtil.TestSuite(result)
854
3152
 
855
3153
 
856
3154
def filter_suite_by_re(suite, pattern):
857
 
    result = TestSuite()
858
 
    filter_re = re.compile(pattern)
 
3155
    """Create a test suite by filtering another one.
 
3156
 
 
3157
    :param suite:           the source suite
 
3158
    :param pattern:         pattern that names must match
 
3159
    :returns: the newly created suite
 
3160
    """
 
3161
    condition = condition_id_re(pattern)
 
3162
    result_suite = filter_suite_by_condition(suite, condition)
 
3163
    return result_suite
 
3164
 
 
3165
 
 
3166
def filter_suite_by_id_list(suite, test_id_list):
 
3167
    """Create a test suite by filtering another one.
 
3168
 
 
3169
    :param suite: The source suite.
 
3170
    :param test_id_list: A list of the test ids to keep as strings.
 
3171
    :returns: the newly created suite
 
3172
    """
 
3173
    condition = condition_id_in_list(test_id_list)
 
3174
    result_suite = filter_suite_by_condition(suite, condition)
 
3175
    return result_suite
 
3176
 
 
3177
 
 
3178
def filter_suite_by_id_startswith(suite, start):
 
3179
    """Create a test suite by filtering another one.
 
3180
 
 
3181
    :param suite: The source suite.
 
3182
    :param start: A list of string the test id must start with one of.
 
3183
    :returns: the newly created suite
 
3184
    """
 
3185
    condition = condition_id_startswith(start)
 
3186
    result_suite = filter_suite_by_condition(suite, condition)
 
3187
    return result_suite
 
3188
 
 
3189
 
 
3190
def exclude_tests_by_re(suite, pattern):
 
3191
    """Create a test suite which excludes some tests from suite.
 
3192
 
 
3193
    :param suite: The suite to get tests from.
 
3194
    :param pattern: A regular expression string. Test ids that match this
 
3195
        pattern will be excluded from the result.
 
3196
    :return: A TestSuite that contains all the tests from suite without the
 
3197
        tests that matched pattern. The order of tests is the same as it was in
 
3198
        suite.
 
3199
    """
 
3200
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
3201
 
 
3202
 
 
3203
def preserve_input(something):
 
3204
    """A helper for performing test suite transformation chains.
 
3205
 
 
3206
    :param something: Anything you want to preserve.
 
3207
    :return: Something.
 
3208
    """
 
3209
    return something
 
3210
 
 
3211
 
 
3212
def randomize_suite(suite):
 
3213
    """Return a new TestSuite with suite's tests in random order.
 
3214
 
 
3215
    The tests in the input suite are flattened into a single suite in order to
 
3216
    accomplish this. Any nested TestSuites are removed to provide global
 
3217
    randomness.
 
3218
    """
 
3219
    tests = list(iter_suite_tests(suite))
 
3220
    random.shuffle(tests)
 
3221
    return TestUtil.TestSuite(tests)
 
3222
 
 
3223
 
 
3224
def split_suite_by_condition(suite, condition):
 
3225
    """Split a test suite into two by a condition.
 
3226
 
 
3227
    :param suite: The suite to split.
 
3228
    :param condition: The condition to match on. Tests that match this
 
3229
        condition are returned in the first test suite, ones that do not match
 
3230
        are in the second suite.
 
3231
    :return: A tuple of two test suites, where the first contains tests from
 
3232
        suite matching the condition, and the second contains the remainder
 
3233
        from suite. The order within each output suite is the same as it was in
 
3234
        suite.
 
3235
    """
 
3236
    matched = []
 
3237
    did_not_match = []
859
3238
    for test in iter_suite_tests(suite):
860
 
        if filter_re.search(test.id()):
861
 
            result.addTest(test)
862
 
    return result
 
3239
        if condition(test):
 
3240
            matched.append(test)
 
3241
        else:
 
3242
            did_not_match.append(test)
 
3243
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
3244
 
 
3245
 
 
3246
def split_suite_by_re(suite, pattern):
 
3247
    """Split a test suite into two by a regular expression.
 
3248
 
 
3249
    :param suite: The suite to split.
 
3250
    :param pattern: A regular expression string. Test ids that match this
 
3251
        pattern will be in the first test suite returned, and the others in the
 
3252
        second test suite returned.
 
3253
    :return: A tuple of two test suites, where the first contains tests from
 
3254
        suite matching pattern, and the second contains the remainder from
 
3255
        suite. The order within each output suite is the same as it was in
 
3256
        suite.
 
3257
    """
 
3258
    return split_suite_by_condition(suite, condition_id_re(pattern))
863
3259
 
864
3260
 
865
3261
def run_suite(suite, name='test', verbose=False, pattern=".*",
866
 
              stop_on_failure=False, keep_output=False,
867
 
              transport=None):
868
 
    TestCaseInTempDir._TEST_NAME = name
 
3262
              stop_on_failure=False,
 
3263
              transport=None, lsprof_timed=None, bench_history=None,
 
3264
              matching_tests_first=None,
 
3265
              list_only=False,
 
3266
              random_seed=None,
 
3267
              exclude_pattern=None,
 
3268
              strict=False,
 
3269
              runner_class=None,
 
3270
              suite_decorators=None,
 
3271
              stream=None,
 
3272
              result_decorators=None,
 
3273
              ):
 
3274
    """Run a test suite for bzr selftest.
 
3275
 
 
3276
    :param runner_class: The class of runner to use. Must support the
 
3277
        constructor arguments passed by run_suite which are more than standard
 
3278
        python uses.
 
3279
    :return: A boolean indicating success.
 
3280
    """
 
3281
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
869
3282
    if verbose:
870
3283
        verbosity = 2
871
3284
    else:
872
3285
        verbosity = 1
873
 
    runner = TextTestRunner(stream=sys.stdout,
 
3286
    if runner_class is None:
 
3287
        runner_class = TextTestRunner
 
3288
    if stream is None:
 
3289
        stream = sys.stdout
 
3290
    runner = runner_class(stream=stream,
874
3291
                            descriptions=0,
875
 
                            verbosity=verbosity)
 
3292
                            verbosity=verbosity,
 
3293
                            bench_history=bench_history,
 
3294
                            strict=strict,
 
3295
                            result_decorators=result_decorators,
 
3296
                            )
876
3297
    runner.stop_on_failure=stop_on_failure
877
 
    if pattern != '.*':
878
 
        suite = filter_suite_by_re(suite, pattern)
 
3298
    if isinstance(suite, unittest.TestSuite):
 
3299
        # Empty out _tests list of passed suite and populate new TestSuite
 
3300
        suite._tests[:], suite = [], TestSuite(suite)
 
3301
    # built in decorator factories:
 
3302
    decorators = [
 
3303
        random_order(random_seed, runner),
 
3304
        exclude_tests(exclude_pattern),
 
3305
        ]
 
3306
    if matching_tests_first:
 
3307
        decorators.append(tests_first(pattern))
 
3308
    else:
 
3309
        decorators.append(filter_tests(pattern))
 
3310
    if suite_decorators:
 
3311
        decorators.extend(suite_decorators)
 
3312
    # tell the result object how many tests will be running: (except if
 
3313
    # --parallel=fork is being used. Robert said he will provide a better
 
3314
    # progress design later -- vila 20090817)
 
3315
    if fork_decorator not in decorators:
 
3316
        decorators.append(CountingDecorator)
 
3317
    for decorator in decorators:
 
3318
        suite = decorator(suite)
 
3319
    if list_only:
 
3320
        # Done after test suite decoration to allow randomisation etc
 
3321
        # to take effect, though that is of marginal benefit.
 
3322
        if verbosity >= 2:
 
3323
            stream.write("Listing tests only ...\n")
 
3324
        for t in iter_suite_tests(suite):
 
3325
            stream.write("%s\n" % (t.id()))
 
3326
        return True
879
3327
    result = runner.run(suite)
880
 
    # This is still a little bogus, 
881
 
    # but only a little. Folk not using our testrunner will
882
 
    # have to delete their temp directories themselves.
883
 
    test_root = TestCaseInTempDir.TEST_ROOT
884
 
    if result.wasSuccessful() or not keep_output:
885
 
        if test_root is not None:
886
 
            print 'Deleting test root %s...' % test_root
887
 
            try:
888
 
                shutil.rmtree(test_root)
889
 
            finally:
890
 
                print
 
3328
    if strict:
 
3329
        return result.wasStrictlySuccessful()
891
3330
    else:
892
 
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
893
 
    return result.wasSuccessful()
 
3331
        return result.wasSuccessful()
 
3332
 
 
3333
 
 
3334
# A registry where get() returns a suite decorator.
 
3335
parallel_registry = registry.Registry()
 
3336
 
 
3337
 
 
3338
def fork_decorator(suite):
 
3339
    if getattr(os, "fork", None) is None:
 
3340
        raise errors.BzrCommandError("platform does not support fork,"
 
3341
            " try --parallel=subprocess instead.")
 
3342
    concurrency = osutils.local_concurrency()
 
3343
    if concurrency == 1:
 
3344
        return suite
 
3345
    from testtools import ConcurrentTestSuite
 
3346
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3347
parallel_registry.register('fork', fork_decorator)
 
3348
 
 
3349
 
 
3350
def subprocess_decorator(suite):
 
3351
    concurrency = osutils.local_concurrency()
 
3352
    if concurrency == 1:
 
3353
        return suite
 
3354
    from testtools import ConcurrentTestSuite
 
3355
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3356
parallel_registry.register('subprocess', subprocess_decorator)
 
3357
 
 
3358
 
 
3359
def exclude_tests(exclude_pattern):
 
3360
    """Return a test suite decorator that excludes tests."""
 
3361
    if exclude_pattern is None:
 
3362
        return identity_decorator
 
3363
    def decorator(suite):
 
3364
        return ExcludeDecorator(suite, exclude_pattern)
 
3365
    return decorator
 
3366
 
 
3367
 
 
3368
def filter_tests(pattern):
 
3369
    if pattern == '.*':
 
3370
        return identity_decorator
 
3371
    def decorator(suite):
 
3372
        return FilterTestsDecorator(suite, pattern)
 
3373
    return decorator
 
3374
 
 
3375
 
 
3376
def random_order(random_seed, runner):
 
3377
    """Return a test suite decorator factory for randomising tests order.
 
3378
    
 
3379
    :param random_seed: now, a string which casts to a long, or a long.
 
3380
    :param runner: A test runner with a stream attribute to report on.
 
3381
    """
 
3382
    if random_seed is None:
 
3383
        return identity_decorator
 
3384
    def decorator(suite):
 
3385
        return RandomDecorator(suite, random_seed, runner.stream)
 
3386
    return decorator
 
3387
 
 
3388
 
 
3389
def tests_first(pattern):
 
3390
    if pattern == '.*':
 
3391
        return identity_decorator
 
3392
    def decorator(suite):
 
3393
        return TestFirstDecorator(suite, pattern)
 
3394
    return decorator
 
3395
 
 
3396
 
 
3397
def identity_decorator(suite):
 
3398
    """Return suite."""
 
3399
    return suite
 
3400
 
 
3401
 
 
3402
class TestDecorator(TestUtil.TestSuite):
 
3403
    """A decorator for TestCase/TestSuite objects.
 
3404
 
 
3405
    Contains rather than flattening suite passed on construction
 
3406
    """
 
3407
 
 
3408
    def __init__(self, suite=None):
 
3409
        super(TestDecorator, self).__init__()
 
3410
        if suite is not None:
 
3411
            self.addTest(suite)
 
3412
 
 
3413
    # Don't need subclass run method with suite emptying
 
3414
    run = unittest.TestSuite.run
 
3415
 
 
3416
 
 
3417
class CountingDecorator(TestDecorator):
 
3418
    """A decorator which calls result.progress(self.countTestCases)."""
 
3419
 
 
3420
    def run(self, result):
 
3421
        progress_method = getattr(result, 'progress', None)
 
3422
        if callable(progress_method):
 
3423
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
3424
        return super(CountingDecorator, self).run(result)
 
3425
 
 
3426
 
 
3427
class ExcludeDecorator(TestDecorator):
 
3428
    """A decorator which excludes test matching an exclude pattern."""
 
3429
 
 
3430
    def __init__(self, suite, exclude_pattern):
 
3431
        super(ExcludeDecorator, self).__init__(
 
3432
            exclude_tests_by_re(suite, exclude_pattern))
 
3433
 
 
3434
 
 
3435
class FilterTestsDecorator(TestDecorator):
 
3436
    """A decorator which filters tests to those matching a pattern."""
 
3437
 
 
3438
    def __init__(self, suite, pattern):
 
3439
        super(FilterTestsDecorator, self).__init__(
 
3440
            filter_suite_by_re(suite, pattern))
 
3441
 
 
3442
 
 
3443
class RandomDecorator(TestDecorator):
 
3444
    """A decorator which randomises the order of its tests."""
 
3445
 
 
3446
    def __init__(self, suite, random_seed, stream):
 
3447
        random_seed = self.actual_seed(random_seed)
 
3448
        stream.write("Randomizing test order using seed %s\n\n" %
 
3449
            (random_seed,))
 
3450
        # Initialise the random number generator.
 
3451
        random.seed(random_seed)
 
3452
        super(RandomDecorator, self).__init__(randomize_suite(suite))
 
3453
 
 
3454
    @staticmethod
 
3455
    def actual_seed(seed):
 
3456
        if seed == "now":
 
3457
            # We convert the seed to a long to make it reuseable across
 
3458
            # invocations (because the user can reenter it).
 
3459
            return long(time.time())
 
3460
        else:
 
3461
            # Convert the seed to a long if we can
 
3462
            try:
 
3463
                return long(seed)
 
3464
            except (TypeError, ValueError):
 
3465
                pass
 
3466
        return seed
 
3467
 
 
3468
 
 
3469
class TestFirstDecorator(TestDecorator):
 
3470
    """A decorator which moves named tests to the front."""
 
3471
 
 
3472
    def __init__(self, suite, pattern):
 
3473
        super(TestFirstDecorator, self).__init__()
 
3474
        self.addTests(split_suite_by_re(suite, pattern))
 
3475
 
 
3476
 
 
3477
def partition_tests(suite, count):
 
3478
    """Partition suite into count lists of tests."""
 
3479
    # This just assigns tests in a round-robin fashion.  On one hand this
 
3480
    # splits up blocks of related tests that might run faster if they shared
 
3481
    # resources, but on the other it avoids assigning blocks of slow tests to
 
3482
    # just one partition.  So the slowest partition shouldn't be much slower
 
3483
    # than the fastest.
 
3484
    partitions = [list() for i in range(count)]
 
3485
    tests = iter_suite_tests(suite)
 
3486
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
 
3487
        partition.append(test)
 
3488
    return partitions
 
3489
 
 
3490
 
 
3491
def workaround_zealous_crypto_random():
 
3492
    """Crypto.Random want to help us being secure, but we don't care here.
 
3493
 
 
3494
    This workaround some test failure related to the sftp server. Once paramiko
 
3495
    stop using the controversial API in Crypto.Random, we may get rid of it.
 
3496
    """
 
3497
    try:
 
3498
        from Crypto.Random import atfork
 
3499
        atfork()
 
3500
    except ImportError:
 
3501
        pass
 
3502
 
 
3503
 
 
3504
def fork_for_tests(suite):
 
3505
    """Take suite and start up one runner per CPU by forking()
 
3506
 
 
3507
    :return: An iterable of TestCase-like objects which can each have
 
3508
        run(result) called on them to feed tests to result.
 
3509
    """
 
3510
    concurrency = osutils.local_concurrency()
 
3511
    result = []
 
3512
    from subunit import ProtocolTestCase
 
3513
    from subunit.test_results import AutoTimingTestResultDecorator
 
3514
    class TestInOtherProcess(ProtocolTestCase):
 
3515
        # Should be in subunit, I think. RBC.
 
3516
        def __init__(self, stream, pid):
 
3517
            ProtocolTestCase.__init__(self, stream)
 
3518
            self.pid = pid
 
3519
 
 
3520
        def run(self, result):
 
3521
            try:
 
3522
                ProtocolTestCase.run(self, result)
 
3523
            finally:
 
3524
                pid, status = os.waitpid(self.pid, 0)
 
3525
            # GZ 2011-10-18: If status is nonzero, should report to the result
 
3526
            #                that something went wrong.
 
3527
 
 
3528
    test_blocks = partition_tests(suite, concurrency)
 
3529
    # Clear the tests from the original suite so it doesn't keep them alive
 
3530
    suite._tests[:] = []
 
3531
    for process_tests in test_blocks:
 
3532
        process_suite = TestUtil.TestSuite(process_tests)
 
3533
        # Also clear each split list so new suite has only reference
 
3534
        process_tests[:] = []
 
3535
        c2pread, c2pwrite = os.pipe()
 
3536
        pid = os.fork()
 
3537
        if pid == 0:
 
3538
            try:
 
3539
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3540
                workaround_zealous_crypto_random()
 
3541
                os.close(c2pread)
 
3542
                # Leave stderr and stdout open so we can see test noise
 
3543
                # Close stdin so that the child goes away if it decides to
 
3544
                # read from stdin (otherwise its a roulette to see what
 
3545
                # child actually gets keystrokes for pdb etc).
 
3546
                sys.stdin.close()
 
3547
                subunit_result = AutoTimingTestResultDecorator(
 
3548
                    SubUnitBzrProtocolClient(stream))
 
3549
                process_suite.run(subunit_result)
 
3550
            except:
 
3551
                # Try and report traceback on stream, but exit with error even
 
3552
                # if stream couldn't be created or something else goes wrong.
 
3553
                # The traceback is formatted to a string and written in one go
 
3554
                # to avoid interleaving lines from multiple failing children.
 
3555
                try:
 
3556
                    stream.write(traceback.format_exc())
 
3557
                finally:
 
3558
                    os._exit(1)
 
3559
            os._exit(0)
 
3560
        else:
 
3561
            os.close(c2pwrite)
 
3562
            stream = os.fdopen(c2pread, 'rb', 1)
 
3563
            test = TestInOtherProcess(stream, pid)
 
3564
            result.append(test)
 
3565
    return result
 
3566
 
 
3567
 
 
3568
def reinvoke_for_tests(suite):
 
3569
    """Take suite and start up one runner per CPU using subprocess().
 
3570
 
 
3571
    :return: An iterable of TestCase-like objects which can each have
 
3572
        run(result) called on them to feed tests to result.
 
3573
    """
 
3574
    concurrency = osutils.local_concurrency()
 
3575
    result = []
 
3576
    from subunit import ProtocolTestCase
 
3577
    class TestInSubprocess(ProtocolTestCase):
 
3578
        def __init__(self, process, name):
 
3579
            ProtocolTestCase.__init__(self, process.stdout)
 
3580
            self.process = process
 
3581
            self.process.stdin.close()
 
3582
            self.name = name
 
3583
 
 
3584
        def run(self, result):
 
3585
            try:
 
3586
                ProtocolTestCase.run(self, result)
 
3587
            finally:
 
3588
                self.process.wait()
 
3589
                os.unlink(self.name)
 
3590
            # print "pid %d finished" % finished_process
 
3591
    test_blocks = partition_tests(suite, concurrency)
 
3592
    for process_tests in test_blocks:
 
3593
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3594
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3595
        if not os.path.isfile(bzr_path):
 
3596
            # We are probably installed. Assume sys.argv is the right file
 
3597
            bzr_path = sys.argv[0]
 
3598
        bzr_path = [bzr_path]
 
3599
        if sys.platform == "win32":
 
3600
            # if we're on windows, we can't execute the bzr script directly
 
3601
            bzr_path = [sys.executable] + bzr_path
 
3602
        fd, test_list_file_name = tempfile.mkstemp()
 
3603
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3604
        for test in process_tests:
 
3605
            test_list_file.write(test.id() + '\n')
 
3606
        test_list_file.close()
 
3607
        try:
 
3608
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
 
3609
                '--subunit']
 
3610
            if '--no-plugins' in sys.argv:
 
3611
                argv.append('--no-plugins')
 
3612
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
 
3613
            # noise on stderr it can interrupt the subunit protocol.
 
3614
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
 
3615
                                      stdout=subprocess.PIPE,
 
3616
                                      stderr=subprocess.PIPE,
 
3617
                                      bufsize=1)
 
3618
            test = TestInSubprocess(process, test_list_file_name)
 
3619
            result.append(test)
 
3620
        except:
 
3621
            os.unlink(test_list_file_name)
 
3622
            raise
 
3623
    return result
 
3624
 
 
3625
 
 
3626
class ProfileResult(testtools.ExtendedToOriginalDecorator):
 
3627
    """Generate profiling data for all activity between start and success.
 
3628
    
 
3629
    The profile data is appended to the test's _benchcalls attribute and can
 
3630
    be accessed by the forwarded-to TestResult.
 
3631
 
 
3632
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3633
    where our existing output support for lsprof is, and this class aims to
 
3634
    fit in with that: while it could be moved it's not necessary to accomplish
 
3635
    test profiling, nor would it be dramatically cleaner.
 
3636
    """
 
3637
 
 
3638
    def startTest(self, test):
 
3639
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3640
        # Prevent deadlocks in tests that use lsprof: those tests will
 
3641
        # unavoidably fail.
 
3642
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
 
3643
        self.profiler.start()
 
3644
        testtools.ExtendedToOriginalDecorator.startTest(self, test)
 
3645
 
 
3646
    def addSuccess(self, test):
 
3647
        stats = self.profiler.stop()
 
3648
        try:
 
3649
            calls = test._benchcalls
 
3650
        except AttributeError:
 
3651
            test._benchcalls = []
 
3652
            calls = test._benchcalls
 
3653
        calls.append(((test.id(), "", ""), stats))
 
3654
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
 
3655
 
 
3656
    def stopTest(self, test):
 
3657
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
 
3658
        self.profiler = None
 
3659
 
 
3660
 
 
3661
# Controlled by "bzr selftest -E=..." option
 
3662
# Currently supported:
 
3663
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3664
#                           preserves any flags supplied at the command line.
 
3665
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3666
#                           rather than failing tests. And no longer raise
 
3667
#                           LockContention when fctnl locks are not being used
 
3668
#                           with proper exclusion rules.
 
3669
#   -Ethreads               Will display thread ident at creation/join time to
 
3670
#                           help track thread leaks
 
3671
#   -Euncollected_cases     Display the identity of any test cases that weren't
 
3672
#                           deallocated after being completed.
 
3673
#   -Econfig_stats          Will collect statistics using addDetail
 
3674
selftest_debug_flags = set()
894
3675
 
895
3676
 
896
3677
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
897
 
             keep_output=False,
898
 
             transport=None):
 
3678
             transport=None,
 
3679
             test_suite_factory=None,
 
3680
             lsprof_timed=None,
 
3681
             bench_history=None,
 
3682
             matching_tests_first=None,
 
3683
             list_only=False,
 
3684
             random_seed=None,
 
3685
             exclude_pattern=None,
 
3686
             strict=False,
 
3687
             load_list=None,
 
3688
             debug_flags=None,
 
3689
             starting_with=None,
 
3690
             runner_class=None,
 
3691
             suite_decorators=None,
 
3692
             stream=None,
 
3693
             lsprof_tests=False,
 
3694
             ):
899
3695
    """Run the whole test suite under the enhanced runner"""
 
3696
    # XXX: Very ugly way to do this...
 
3697
    # Disable warning about old formats because we don't want it to disturb
 
3698
    # any blackbox tests.
 
3699
    from bzrlib import repository
 
3700
    repository._deprecation_warning_done = True
 
3701
 
900
3702
    global default_transport
901
3703
    if transport is None:
902
3704
        transport = default_transport
903
3705
    old_transport = default_transport
904
3706
    default_transport = transport
905
 
    suite = test_suite()
 
3707
    global selftest_debug_flags
 
3708
    old_debug_flags = selftest_debug_flags
 
3709
    if debug_flags is not None:
 
3710
        selftest_debug_flags = set(debug_flags)
906
3711
    try:
 
3712
        if load_list is None:
 
3713
            keep_only = None
 
3714
        else:
 
3715
            keep_only = load_test_id_list(load_list)
 
3716
        if starting_with:
 
3717
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3718
                             for start in starting_with]
 
3719
        if test_suite_factory is None:
 
3720
            # Reduce loading time by loading modules based on the starting_with
 
3721
            # patterns.
 
3722
            suite = test_suite(keep_only, starting_with)
 
3723
        else:
 
3724
            suite = test_suite_factory()
 
3725
        if starting_with:
 
3726
            # But always filter as requested.
 
3727
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3728
        result_decorators = []
 
3729
        if lsprof_tests:
 
3730
            result_decorators.append(ProfileResult)
907
3731
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
908
 
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
909
 
                     transport=transport)
 
3732
                     stop_on_failure=stop_on_failure,
 
3733
                     transport=transport,
 
3734
                     lsprof_timed=lsprof_timed,
 
3735
                     bench_history=bench_history,
 
3736
                     matching_tests_first=matching_tests_first,
 
3737
                     list_only=list_only,
 
3738
                     random_seed=random_seed,
 
3739
                     exclude_pattern=exclude_pattern,
 
3740
                     strict=strict,
 
3741
                     runner_class=runner_class,
 
3742
                     suite_decorators=suite_decorators,
 
3743
                     stream=stream,
 
3744
                     result_decorators=result_decorators,
 
3745
                     )
910
3746
    finally:
911
3747
        default_transport = old_transport
912
 
 
913
 
 
914
 
 
915
 
def test_suite():
916
 
    """Build and return TestSuite for the whole program."""
917
 
    from doctest import DocTestSuite
918
 
 
919
 
    global MODULES_TO_DOCTEST
920
 
 
921
 
    testmod_names = [ \
922
 
                   'bzrlib.tests.test_ancestry',
923
 
                   'bzrlib.tests.test_annotate',
924
 
                   'bzrlib.tests.test_api',
925
 
                   'bzrlib.tests.test_bad_files',
926
 
                   'bzrlib.tests.test_basis_inventory',
927
 
                   'bzrlib.tests.test_branch',
928
 
                   'bzrlib.tests.test_bzrdir',
929
 
                   'bzrlib.tests.test_command',
930
 
                   'bzrlib.tests.test_commit',
931
 
                   'bzrlib.tests.test_commit_merge',
932
 
                   'bzrlib.tests.test_config',
933
 
                   'bzrlib.tests.test_conflicts',
934
 
                   'bzrlib.tests.test_decorators',
935
 
                   'bzrlib.tests.test_diff',
936
 
                   'bzrlib.tests.test_doc_generate',
937
 
                   'bzrlib.tests.test_errors',
938
 
                   'bzrlib.tests.test_fetch',
939
 
                   'bzrlib.tests.test_gpg',
940
 
                   'bzrlib.tests.test_graph',
941
 
                   'bzrlib.tests.test_hashcache',
942
 
                   'bzrlib.tests.test_http',
943
 
                   'bzrlib.tests.test_identitymap',
944
 
                   'bzrlib.tests.test_inv',
945
 
                   'bzrlib.tests.test_knit',
946
 
                   'bzrlib.tests.test_lockdir',
947
 
                   'bzrlib.tests.test_lockable_files',
948
 
                   'bzrlib.tests.test_log',
949
 
                   'bzrlib.tests.test_merge',
950
 
                   'bzrlib.tests.test_merge3',
951
 
                   'bzrlib.tests.test_merge_core',
952
 
                   'bzrlib.tests.test_missing',
953
 
                   'bzrlib.tests.test_msgeditor',
954
 
                   'bzrlib.tests.test_nonascii',
955
 
                   'bzrlib.tests.test_options',
956
 
                   'bzrlib.tests.test_osutils',
957
 
                   'bzrlib.tests.test_permissions',
958
 
                   'bzrlib.tests.test_plugins',
959
 
                   'bzrlib.tests.test_progress',
960
 
                   'bzrlib.tests.test_reconcile',
961
 
                   'bzrlib.tests.test_repository',
962
 
                   'bzrlib.tests.test_revision',
963
 
                   'bzrlib.tests.test_revisionnamespaces',
964
 
                   'bzrlib.tests.test_revprops',
965
 
                   'bzrlib.tests.test_rio',
966
 
                   'bzrlib.tests.test_sampler',
967
 
                   'bzrlib.tests.test_selftest',
968
 
                   'bzrlib.tests.test_setup',
969
 
                   'bzrlib.tests.test_sftp_transport',
970
 
                   'bzrlib.tests.test_smart_add',
971
 
                   'bzrlib.tests.test_source',
972
 
                   'bzrlib.tests.test_store',
973
 
                   'bzrlib.tests.test_symbol_versioning',
974
 
                   'bzrlib.tests.test_testament',
975
 
                   'bzrlib.tests.test_trace',
976
 
                   'bzrlib.tests.test_transactions',
977
 
                   'bzrlib.tests.test_transform',
978
 
                   'bzrlib.tests.test_transport',
979
 
                   'bzrlib.tests.test_tsort',
980
 
                   'bzrlib.tests.test_ui',
981
 
                   'bzrlib.tests.test_uncommit',
982
 
                   'bzrlib.tests.test_upgrade',
983
 
                   'bzrlib.tests.test_versionedfile',
984
 
                   'bzrlib.tests.test_weave',
985
 
                   'bzrlib.tests.test_whitebox',
986
 
                   'bzrlib.tests.test_workingtree',
987
 
                   'bzrlib.tests.test_xml',
988
 
                   ]
989
 
    test_transport_implementations = [
990
 
        'bzrlib.tests.test_transport_implementations']
991
 
 
992
 
    TestCase.BZRPATH = osutils.pathjoin(
993
 
            osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
994
 
    print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
995
 
    print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
996
 
    print
997
 
    suite = TestSuite()
998
 
    # python2.4's TestLoader.loadTestsFromNames gives very poor 
999
 
    # errors if it fails to load a named module - no indication of what's
1000
 
    # actually wrong, just "no such module".  We should probably override that
1001
 
    # class, but for the moment just load them ourselves. (mbp 20051202)
1002
 
    loader = TestLoader()
1003
 
    from bzrlib.transport import TransportTestProviderAdapter
1004
 
    adapter = TransportTestProviderAdapter()
1005
 
    adapt_modules(test_transport_implementations, adapter, loader, suite)
1006
 
    for mod_name in testmod_names:
1007
 
        mod = _load_module_by_name(mod_name)
1008
 
        suite.addTest(loader.loadTestsFromModule(mod))
1009
 
    for package in packages_to_test():
1010
 
        suite.addTest(package.test_suite())
1011
 
    for m in MODULES_TO_TEST:
1012
 
        suite.addTest(loader.loadTestsFromModule(m))
1013
 
    for m in (MODULES_TO_DOCTEST):
1014
 
        suite.addTest(DocTestSuite(m))
1015
 
    for name, plugin in bzrlib.plugin.all_plugins().items():
1016
 
        if getattr(plugin, 'test_suite', None) is not None:
1017
 
            suite.addTest(plugin.test_suite())
 
3748
        selftest_debug_flags = old_debug_flags
 
3749
 
 
3750
 
 
3751
def load_test_id_list(file_name):
 
3752
    """Load a test id list from a text file.
 
3753
 
 
3754
    The format is one test id by line.  No special care is taken to impose
 
3755
    strict rules, these test ids are used to filter the test suite so a test id
 
3756
    that do not match an existing test will do no harm. This allows user to add
 
3757
    comments, leave blank lines, etc.
 
3758
    """
 
3759
    test_list = []
 
3760
    try:
 
3761
        ftest = open(file_name, 'rt')
 
3762
    except IOError, e:
 
3763
        if e.errno != errno.ENOENT:
 
3764
            raise
 
3765
        else:
 
3766
            raise errors.NoSuchFile(file_name)
 
3767
 
 
3768
    for test_name in ftest.readlines():
 
3769
        test_list.append(test_name.strip())
 
3770
    ftest.close()
 
3771
    return test_list
 
3772
 
 
3773
 
 
3774
def suite_matches_id_list(test_suite, id_list):
 
3775
    """Warns about tests not appearing or appearing more than once.
 
3776
 
 
3777
    :param test_suite: A TestSuite object.
 
3778
    :param test_id_list: The list of test ids that should be found in
 
3779
         test_suite.
 
3780
 
 
3781
    :return: (absents, duplicates) absents is a list containing the test found
 
3782
        in id_list but not in test_suite, duplicates is a list containing the
 
3783
        test found multiple times in test_suite.
 
3784
 
 
3785
    When using a prefined test id list, it may occurs that some tests do not
 
3786
    exist anymore or that some tests use the same id. This function warns the
 
3787
    tester about potential problems in his workflow (test lists are volatile)
 
3788
    or in the test suite itself (using the same id for several tests does not
 
3789
    help to localize defects).
 
3790
    """
 
3791
    # Build a dict counting id occurrences
 
3792
    tests = dict()
 
3793
    for test in iter_suite_tests(test_suite):
 
3794
        id = test.id()
 
3795
        tests[id] = tests.get(id, 0) + 1
 
3796
 
 
3797
    not_found = []
 
3798
    duplicates = []
 
3799
    for id in id_list:
 
3800
        occurs = tests.get(id, 0)
 
3801
        if not occurs:
 
3802
            not_found.append(id)
 
3803
        elif occurs > 1:
 
3804
            duplicates.append(id)
 
3805
 
 
3806
    return not_found, duplicates
 
3807
 
 
3808
 
 
3809
class TestIdList(object):
 
3810
    """Test id list to filter a test suite.
 
3811
 
 
3812
    Relying on the assumption that test ids are built as:
 
3813
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3814
    notation, this class offers methods to :
 
3815
    - avoid building a test suite for modules not refered to in the test list,
 
3816
    - keep only the tests listed from the module test suite.
 
3817
    """
 
3818
 
 
3819
    def __init__(self, test_id_list):
 
3820
        # When a test suite needs to be filtered against us we compare test ids
 
3821
        # for equality, so a simple dict offers a quick and simple solution.
 
3822
        self.tests = dict().fromkeys(test_id_list, True)
 
3823
 
 
3824
        # While unittest.TestCase have ids like:
 
3825
        # <module>.<class>.<method>[(<param+)],
 
3826
        # doctest.DocTestCase can have ids like:
 
3827
        # <module>
 
3828
        # <module>.<class>
 
3829
        # <module>.<function>
 
3830
        # <module>.<class>.<method>
 
3831
 
 
3832
        # Since we can't predict a test class from its name only, we settle on
 
3833
        # a simple constraint: a test id always begins with its module name.
 
3834
 
 
3835
        modules = {}
 
3836
        for test_id in test_id_list:
 
3837
            parts = test_id.split('.')
 
3838
            mod_name = parts.pop(0)
 
3839
            modules[mod_name] = True
 
3840
            for part in parts:
 
3841
                mod_name += '.' + part
 
3842
                modules[mod_name] = True
 
3843
        self.modules = modules
 
3844
 
 
3845
    def refers_to(self, module_name):
 
3846
        """Is there tests for the module or one of its sub modules."""
 
3847
        return self.modules.has_key(module_name)
 
3848
 
 
3849
    def includes(self, test_id):
 
3850
        return self.tests.has_key(test_id)
 
3851
 
 
3852
 
 
3853
class TestPrefixAliasRegistry(registry.Registry):
 
3854
    """A registry for test prefix aliases.
 
3855
 
 
3856
    This helps implement shorcuts for the --starting-with selftest
 
3857
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3858
    warning will be emitted).
 
3859
    """
 
3860
 
 
3861
    def register(self, key, obj, help=None, info=None,
 
3862
                 override_existing=False):
 
3863
        """See Registry.register.
 
3864
 
 
3865
        Trying to override an existing alias causes a warning to be emitted,
 
3866
        not a fatal execption.
 
3867
        """
 
3868
        try:
 
3869
            super(TestPrefixAliasRegistry, self).register(
 
3870
                key, obj, help=help, info=info, override_existing=False)
 
3871
        except KeyError:
 
3872
            actual = self.get(key)
 
3873
            trace.note(
 
3874
                'Test prefix alias %s is already used for %s, ignoring %s'
 
3875
                % (key, actual, obj))
 
3876
 
 
3877
    def resolve_alias(self, id_start):
 
3878
        """Replace the alias by the prefix in the given string.
 
3879
 
 
3880
        Using an unknown prefix is an error to help catching typos.
 
3881
        """
 
3882
        parts = id_start.split('.')
 
3883
        try:
 
3884
            parts[0] = self.get(parts[0])
 
3885
        except KeyError:
 
3886
            raise errors.BzrCommandError(
 
3887
                '%s is not a known test prefix alias' % parts[0])
 
3888
        return '.'.join(parts)
 
3889
 
 
3890
 
 
3891
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3892
"""Registry of test prefix aliases."""
 
3893
 
 
3894
 
 
3895
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3896
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3897
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3898
 
 
3899
# Obvious highest levels prefixes, feel free to add your own via a plugin
 
3900
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3901
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3902
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3903
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3904
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3905
 
 
3906
 
 
3907
def _test_suite_testmod_names():
 
3908
    """Return the standard list of test module names to test."""
 
3909
    return [
 
3910
        'bzrlib.doc',
 
3911
        'bzrlib.tests.blackbox',
 
3912
        'bzrlib.tests.commands',
 
3913
        'bzrlib.tests.per_branch',
 
3914
        'bzrlib.tests.per_bzrdir',
 
3915
        'bzrlib.tests.per_controldir',
 
3916
        'bzrlib.tests.per_controldir_colo',
 
3917
        'bzrlib.tests.per_foreign_vcs',
 
3918
        'bzrlib.tests.per_interrepository',
 
3919
        'bzrlib.tests.per_intertree',
 
3920
        'bzrlib.tests.per_inventory',
 
3921
        'bzrlib.tests.per_interbranch',
 
3922
        'bzrlib.tests.per_lock',
 
3923
        'bzrlib.tests.per_merger',
 
3924
        'bzrlib.tests.per_transport',
 
3925
        'bzrlib.tests.per_tree',
 
3926
        'bzrlib.tests.per_pack_repository',
 
3927
        'bzrlib.tests.per_repository',
 
3928
        'bzrlib.tests.per_repository_chk',
 
3929
        'bzrlib.tests.per_repository_reference',
 
3930
        'bzrlib.tests.per_repository_vf',
 
3931
        'bzrlib.tests.per_uifactory',
 
3932
        'bzrlib.tests.per_versionedfile',
 
3933
        'bzrlib.tests.per_workingtree',
 
3934
        'bzrlib.tests.test__annotator',
 
3935
        'bzrlib.tests.test__bencode',
 
3936
        'bzrlib.tests.test__btree_serializer',
 
3937
        'bzrlib.tests.test__chk_map',
 
3938
        'bzrlib.tests.test__dirstate_helpers',
 
3939
        'bzrlib.tests.test__groupcompress',
 
3940
        'bzrlib.tests.test__known_graph',
 
3941
        'bzrlib.tests.test__rio',
 
3942
        'bzrlib.tests.test__simple_set',
 
3943
        'bzrlib.tests.test__static_tuple',
 
3944
        'bzrlib.tests.test__walkdirs_win32',
 
3945
        'bzrlib.tests.test_ancestry',
 
3946
        'bzrlib.tests.test_annotate',
 
3947
        'bzrlib.tests.test_api',
 
3948
        'bzrlib.tests.test_atomicfile',
 
3949
        'bzrlib.tests.test_bad_files',
 
3950
        'bzrlib.tests.test_bisect_multi',
 
3951
        'bzrlib.tests.test_branch',
 
3952
        'bzrlib.tests.test_branchbuilder',
 
3953
        'bzrlib.tests.test_btree_index',
 
3954
        'bzrlib.tests.test_bugtracker',
 
3955
        'bzrlib.tests.test_bundle',
 
3956
        'bzrlib.tests.test_bzrdir',
 
3957
        'bzrlib.tests.test__chunks_to_lines',
 
3958
        'bzrlib.tests.test_cache_utf8',
 
3959
        'bzrlib.tests.test_chk_map',
 
3960
        'bzrlib.tests.test_chk_serializer',
 
3961
        'bzrlib.tests.test_chunk_writer',
 
3962
        'bzrlib.tests.test_clean_tree',
 
3963
        'bzrlib.tests.test_cleanup',
 
3964
        'bzrlib.tests.test_cmdline',
 
3965
        'bzrlib.tests.test_commands',
 
3966
        'bzrlib.tests.test_commit',
 
3967
        'bzrlib.tests.test_commit_merge',
 
3968
        'bzrlib.tests.test_config',
 
3969
        'bzrlib.tests.test_conflicts',
 
3970
        'bzrlib.tests.test_controldir',
 
3971
        'bzrlib.tests.test_counted_lock',
 
3972
        'bzrlib.tests.test_crash',
 
3973
        'bzrlib.tests.test_decorators',
 
3974
        'bzrlib.tests.test_delta',
 
3975
        'bzrlib.tests.test_debug',
 
3976
        'bzrlib.tests.test_diff',
 
3977
        'bzrlib.tests.test_directory_service',
 
3978
        'bzrlib.tests.test_dirstate',
 
3979
        'bzrlib.tests.test_email_message',
 
3980
        'bzrlib.tests.test_eol_filters',
 
3981
        'bzrlib.tests.test_errors',
 
3982
        'bzrlib.tests.test_estimate_compressed_size',
 
3983
        'bzrlib.tests.test_export',
 
3984
        'bzrlib.tests.test_export_pot',
 
3985
        'bzrlib.tests.test_extract',
 
3986
        'bzrlib.tests.test_features',
 
3987
        'bzrlib.tests.test_fetch',
 
3988
        'bzrlib.tests.test_fixtures',
 
3989
        'bzrlib.tests.test_fifo_cache',
 
3990
        'bzrlib.tests.test_filters',
 
3991
        'bzrlib.tests.test_filter_tree',
 
3992
        'bzrlib.tests.test_ftp_transport',
 
3993
        'bzrlib.tests.test_foreign',
 
3994
        'bzrlib.tests.test_generate_docs',
 
3995
        'bzrlib.tests.test_generate_ids',
 
3996
        'bzrlib.tests.test_globbing',
 
3997
        'bzrlib.tests.test_gpg',
 
3998
        'bzrlib.tests.test_graph',
 
3999
        'bzrlib.tests.test_groupcompress',
 
4000
        'bzrlib.tests.test_hashcache',
 
4001
        'bzrlib.tests.test_help',
 
4002
        'bzrlib.tests.test_hooks',
 
4003
        'bzrlib.tests.test_http',
 
4004
        'bzrlib.tests.test_http_response',
 
4005
        'bzrlib.tests.test_https_ca_bundle',
 
4006
        'bzrlib.tests.test_https_urllib',
 
4007
        'bzrlib.tests.test_i18n',
 
4008
        'bzrlib.tests.test_identitymap',
 
4009
        'bzrlib.tests.test_ignores',
 
4010
        'bzrlib.tests.test_index',
 
4011
        'bzrlib.tests.test_import_tariff',
 
4012
        'bzrlib.tests.test_info',
 
4013
        'bzrlib.tests.test_inv',
 
4014
        'bzrlib.tests.test_inventory_delta',
 
4015
        'bzrlib.tests.test_knit',
 
4016
        'bzrlib.tests.test_lazy_import',
 
4017
        'bzrlib.tests.test_lazy_regex',
 
4018
        'bzrlib.tests.test_library_state',
 
4019
        'bzrlib.tests.test_lock',
 
4020
        'bzrlib.tests.test_lockable_files',
 
4021
        'bzrlib.tests.test_lockdir',
 
4022
        'bzrlib.tests.test_log',
 
4023
        'bzrlib.tests.test_lru_cache',
 
4024
        'bzrlib.tests.test_lsprof',
 
4025
        'bzrlib.tests.test_mail_client',
 
4026
        'bzrlib.tests.test_matchers',
 
4027
        'bzrlib.tests.test_memorytree',
 
4028
        'bzrlib.tests.test_merge',
 
4029
        'bzrlib.tests.test_merge3',
 
4030
        'bzrlib.tests.test_merge_core',
 
4031
        'bzrlib.tests.test_merge_directive',
 
4032
        'bzrlib.tests.test_mergetools',
 
4033
        'bzrlib.tests.test_missing',
 
4034
        'bzrlib.tests.test_msgeditor',
 
4035
        'bzrlib.tests.test_multiparent',
 
4036
        'bzrlib.tests.test_mutabletree',
 
4037
        'bzrlib.tests.test_nonascii',
 
4038
        'bzrlib.tests.test_options',
 
4039
        'bzrlib.tests.test_osutils',
 
4040
        'bzrlib.tests.test_osutils_encodings',
 
4041
        'bzrlib.tests.test_pack',
 
4042
        'bzrlib.tests.test_patch',
 
4043
        'bzrlib.tests.test_patches',
 
4044
        'bzrlib.tests.test_permissions',
 
4045
        'bzrlib.tests.test_plugins',
 
4046
        'bzrlib.tests.test_progress',
 
4047
        'bzrlib.tests.test_pyutils',
 
4048
        'bzrlib.tests.test_read_bundle',
 
4049
        'bzrlib.tests.test_reconcile',
 
4050
        'bzrlib.tests.test_reconfigure',
 
4051
        'bzrlib.tests.test_registry',
 
4052
        'bzrlib.tests.test_remote',
 
4053
        'bzrlib.tests.test_rename_map',
 
4054
        'bzrlib.tests.test_repository',
 
4055
        'bzrlib.tests.test_revert',
 
4056
        'bzrlib.tests.test_revision',
 
4057
        'bzrlib.tests.test_revisionspec',
 
4058
        'bzrlib.tests.test_revisiontree',
 
4059
        'bzrlib.tests.test_rio',
 
4060
        'bzrlib.tests.test_rules',
 
4061
        'bzrlib.tests.test_url_policy_open',
 
4062
        'bzrlib.tests.test_sampler',
 
4063
        'bzrlib.tests.test_scenarios',
 
4064
        'bzrlib.tests.test_script',
 
4065
        'bzrlib.tests.test_selftest',
 
4066
        'bzrlib.tests.test_serializer',
 
4067
        'bzrlib.tests.test_setup',
 
4068
        'bzrlib.tests.test_sftp_transport',
 
4069
        'bzrlib.tests.test_shelf',
 
4070
        'bzrlib.tests.test_shelf_ui',
 
4071
        'bzrlib.tests.test_smart',
 
4072
        'bzrlib.tests.test_smart_add',
 
4073
        'bzrlib.tests.test_smart_request',
 
4074
        'bzrlib.tests.test_smart_signals',
 
4075
        'bzrlib.tests.test_smart_transport',
 
4076
        'bzrlib.tests.test_smtp_connection',
 
4077
        'bzrlib.tests.test_source',
 
4078
        'bzrlib.tests.test_ssh_transport',
 
4079
        'bzrlib.tests.test_status',
 
4080
        'bzrlib.tests.test_store',
 
4081
        'bzrlib.tests.test_strace',
 
4082
        'bzrlib.tests.test_subsume',
 
4083
        'bzrlib.tests.test_switch',
 
4084
        'bzrlib.tests.test_symbol_versioning',
 
4085
        'bzrlib.tests.test_tag',
 
4086
        'bzrlib.tests.test_test_server',
 
4087
        'bzrlib.tests.test_testament',
 
4088
        'bzrlib.tests.test_textfile',
 
4089
        'bzrlib.tests.test_textmerge',
 
4090
        'bzrlib.tests.test_cethread',
 
4091
        'bzrlib.tests.test_timestamp',
 
4092
        'bzrlib.tests.test_trace',
 
4093
        'bzrlib.tests.test_transactions',
 
4094
        'bzrlib.tests.test_transform',
 
4095
        'bzrlib.tests.test_transport',
 
4096
        'bzrlib.tests.test_transport_log',
 
4097
        'bzrlib.tests.test_tree',
 
4098
        'bzrlib.tests.test_treebuilder',
 
4099
        'bzrlib.tests.test_treeshape',
 
4100
        'bzrlib.tests.test_tsort',
 
4101
        'bzrlib.tests.test_tuned_gzip',
 
4102
        'bzrlib.tests.test_ui',
 
4103
        'bzrlib.tests.test_uncommit',
 
4104
        'bzrlib.tests.test_upgrade',
 
4105
        'bzrlib.tests.test_upgrade_stacked',
 
4106
        'bzrlib.tests.test_urlutils',
 
4107
        'bzrlib.tests.test_utextwrap',
 
4108
        'bzrlib.tests.test_version',
 
4109
        'bzrlib.tests.test_version_info',
 
4110
        'bzrlib.tests.test_versionedfile',
 
4111
        'bzrlib.tests.test_vf_search',
 
4112
        'bzrlib.tests.test_weave',
 
4113
        'bzrlib.tests.test_whitebox',
 
4114
        'bzrlib.tests.test_win32utils',
 
4115
        'bzrlib.tests.test_workingtree',
 
4116
        'bzrlib.tests.test_workingtree_4',
 
4117
        'bzrlib.tests.test_wsgi',
 
4118
        'bzrlib.tests.test_xml',
 
4119
        ]
 
4120
 
 
4121
 
 
4122
def _test_suite_modules_to_doctest():
 
4123
    """Return the list of modules to doctest."""
 
4124
    if __doc__ is None:
 
4125
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
 
4126
        return []
 
4127
    return [
 
4128
        'bzrlib',
 
4129
        'bzrlib.branchbuilder',
 
4130
        'bzrlib.decorators',
 
4131
        'bzrlib.inventory',
 
4132
        'bzrlib.iterablefile',
 
4133
        'bzrlib.lockdir',
 
4134
        'bzrlib.merge3',
 
4135
        'bzrlib.option',
 
4136
        'bzrlib.pyutils',
 
4137
        'bzrlib.symbol_versioning',
 
4138
        'bzrlib.tests',
 
4139
        'bzrlib.tests.fixtures',
 
4140
        'bzrlib.timestamp',
 
4141
        'bzrlib.transport.http',
 
4142
        'bzrlib.version_info_formats.format_custom',
 
4143
        ]
 
4144
 
 
4145
 
 
4146
def test_suite(keep_only=None, starting_with=None):
 
4147
    """Build and return TestSuite for the whole of bzrlib.
 
4148
 
 
4149
    :param keep_only: A list of test ids limiting the suite returned.
 
4150
 
 
4151
    :param starting_with: An id limiting the suite returned to the tests
 
4152
         starting with it.
 
4153
 
 
4154
    This function can be replaced if you need to change the default test
 
4155
    suite on a global basis, but it is not encouraged.
 
4156
    """
 
4157
 
 
4158
    loader = TestUtil.TestLoader()
 
4159
 
 
4160
    if keep_only is not None:
 
4161
        id_filter = TestIdList(keep_only)
 
4162
    if starting_with:
 
4163
        # We take precedence over keep_only because *at loading time* using
 
4164
        # both options means we will load less tests for the same final result.
 
4165
        def interesting_module(name):
 
4166
            for start in starting_with:
 
4167
                if (
 
4168
                    # Either the module name starts with the specified string
 
4169
                    name.startswith(start)
 
4170
                    # or it may contain tests starting with the specified string
 
4171
                    or start.startswith(name)
 
4172
                    ):
 
4173
                    return True
 
4174
            return False
 
4175
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
4176
 
 
4177
    elif keep_only is not None:
 
4178
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
4179
        def interesting_module(name):
 
4180
            return id_filter.refers_to(name)
 
4181
 
 
4182
    else:
 
4183
        loader = TestUtil.TestLoader()
 
4184
        def interesting_module(name):
 
4185
            # No filtering, all modules are interesting
 
4186
            return True
 
4187
 
 
4188
    suite = loader.suiteClass()
 
4189
 
 
4190
    # modules building their suite with loadTestsFromModuleNames
 
4191
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
4192
 
 
4193
    for mod in _test_suite_modules_to_doctest():
 
4194
        if not interesting_module(mod):
 
4195
            # No tests to keep here, move along
 
4196
            continue
 
4197
        try:
 
4198
            # note that this really does mean "report only" -- doctest
 
4199
            # still runs the rest of the examples
 
4200
            doc_suite = IsolatedDocTestSuite(
 
4201
                mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
4202
        except ValueError, e:
 
4203
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
4204
            raise
 
4205
        if len(doc_suite._tests) == 0:
 
4206
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
4207
        suite.addTest(doc_suite)
 
4208
 
 
4209
    default_encoding = sys.getdefaultencoding()
 
4210
    for name, plugin in _mod_plugin.plugins().items():
 
4211
        if not interesting_module(plugin.module.__name__):
 
4212
            continue
 
4213
        plugin_suite = plugin.test_suite()
 
4214
        # We used to catch ImportError here and turn it into just a warning,
 
4215
        # but really if you don't have --no-plugins this should be a failure.
 
4216
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
4217
        if plugin_suite is None:
 
4218
            plugin_suite = plugin.load_plugin_tests(loader)
 
4219
        if plugin_suite is not None:
 
4220
            suite.addTest(plugin_suite)
 
4221
        if default_encoding != sys.getdefaultencoding():
 
4222
            trace.warning(
 
4223
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
4224
                sys.getdefaultencoding())
 
4225
            reload(sys)
 
4226
            sys.setdefaultencoding(default_encoding)
 
4227
 
 
4228
    if keep_only is not None:
 
4229
        # Now that the referred modules have loaded their tests, keep only the
 
4230
        # requested ones.
 
4231
        suite = filter_suite_by_id_list(suite, id_filter)
 
4232
        # Do some sanity checks on the id_list filtering
 
4233
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
4234
        if starting_with:
 
4235
            # The tester has used both keep_only and starting_with, so he is
 
4236
            # already aware that some tests are excluded from the list, there
 
4237
            # is no need to tell him which.
 
4238
            pass
 
4239
        else:
 
4240
            # Some tests mentioned in the list are not in the test suite. The
 
4241
            # list may be out of date, report to the tester.
 
4242
            for id in not_found:
 
4243
                trace.warning('"%s" not found in the test suite', id)
 
4244
        for id in duplicates:
 
4245
            trace.warning('"%s" is used as an id by several tests', id)
 
4246
 
1018
4247
    return suite
1019
4248
 
1020
4249
 
1021
 
def adapt_modules(mods_list, adapter, loader, suite):
1022
 
    """Adapt the modules in mods_list using adapter and add to suite."""
1023
 
    for mod_name in mods_list:
1024
 
        mod = _load_module_by_name(mod_name)
1025
 
        for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
1026
 
            suite.addTests(adapter.adapt(test))
1027
 
 
1028
 
 
1029
 
def _load_module_by_name(mod_name):
1030
 
    parts = mod_name.split('.')
1031
 
    module = __import__(mod_name)
1032
 
    del parts[0]
1033
 
    # for historical reasons python returns the top-level module even though
1034
 
    # it loads the submodule; we need to walk down to get the one we want.
1035
 
    while parts:
1036
 
        module = getattr(module, parts.pop(0))
1037
 
    return module
 
4250
def multiply_scenarios(*scenarios):
 
4251
    """Multiply two or more iterables of scenarios.
 
4252
 
 
4253
    It is safe to pass scenario generators or iterators.
 
4254
 
 
4255
    :returns: A list of compound scenarios: the cross-product of all 
 
4256
        scenarios, with the names concatenated and the parameters
 
4257
        merged together.
 
4258
    """
 
4259
    return reduce(_multiply_two_scenarios, map(list, scenarios))
 
4260
 
 
4261
 
 
4262
def _multiply_two_scenarios(scenarios_left, scenarios_right):
 
4263
    """Multiply two sets of scenarios.
 
4264
 
 
4265
    :returns: the cartesian product of the two sets of scenarios, that is
 
4266
        a scenario for every possible combination of a left scenario and a
 
4267
        right scenario.
 
4268
    """
 
4269
    return [
 
4270
        ('%s,%s' % (left_name, right_name),
 
4271
         dict(left_dict.items() + right_dict.items()))
 
4272
        for left_name, left_dict in scenarios_left
 
4273
        for right_name, right_dict in scenarios_right]
 
4274
 
 
4275
 
 
4276
def multiply_tests(tests, scenarios, result):
 
4277
    """Multiply tests_list by scenarios into result.
 
4278
 
 
4279
    This is the core workhorse for test parameterisation.
 
4280
 
 
4281
    Typically the load_tests() method for a per-implementation test suite will
 
4282
    call multiply_tests and return the result.
 
4283
 
 
4284
    :param tests: The tests to parameterise.
 
4285
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
4286
        scenario_param_dict).
 
4287
    :param result: A TestSuite to add created tests to.
 
4288
 
 
4289
    This returns the passed in result TestSuite with the cross product of all
 
4290
    the tests repeated once for each scenario.  Each test is adapted by adding
 
4291
    the scenario name at the end of its id(), and updating the test object's
 
4292
    __dict__ with the scenario_param_dict.
 
4293
 
 
4294
    >>> import bzrlib.tests.test_sampler
 
4295
    >>> r = multiply_tests(
 
4296
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
4297
    ...     [('one', dict(param=1)),
 
4298
    ...      ('two', dict(param=2))],
 
4299
    ...     TestUtil.TestSuite())
 
4300
    >>> tests = list(iter_suite_tests(r))
 
4301
    >>> len(tests)
 
4302
    2
 
4303
    >>> tests[0].id()
 
4304
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
4305
    >>> tests[0].param
 
4306
    1
 
4307
    >>> tests[1].param
 
4308
    2
 
4309
    """
 
4310
    for test in iter_suite_tests(tests):
 
4311
        apply_scenarios(test, scenarios, result)
 
4312
    return result
 
4313
 
 
4314
 
 
4315
def apply_scenarios(test, scenarios, result):
 
4316
    """Apply the scenarios in scenarios to test and add to result.
 
4317
 
 
4318
    :param test: The test to apply scenarios to.
 
4319
    :param scenarios: An iterable of scenarios to apply to test.
 
4320
    :return: result
 
4321
    :seealso: apply_scenario
 
4322
    """
 
4323
    for scenario in scenarios:
 
4324
        result.addTest(apply_scenario(test, scenario))
 
4325
    return result
 
4326
 
 
4327
 
 
4328
def apply_scenario(test, scenario):
 
4329
    """Copy test and apply scenario to it.
 
4330
 
 
4331
    :param test: A test to adapt.
 
4332
    :param scenario: A tuple describing the scenarion.
 
4333
        The first element of the tuple is the new test id.
 
4334
        The second element is a dict containing attributes to set on the
 
4335
        test.
 
4336
    :return: The adapted test.
 
4337
    """
 
4338
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
4339
    new_test = clone_test(test, new_id)
 
4340
    for name, value in scenario[1].items():
 
4341
        setattr(new_test, name, value)
 
4342
    return new_test
 
4343
 
 
4344
 
 
4345
def clone_test(test, new_id):
 
4346
    """Clone a test giving it a new id.
 
4347
 
 
4348
    :param test: The test to clone.
 
4349
    :param new_id: The id to assign to it.
 
4350
    :return: The new test.
 
4351
    """
 
4352
    new_test = copy.copy(test)
 
4353
    new_test.id = lambda: new_id
 
4354
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
 
4355
    # causes cloned tests to share the 'details' dict.  This makes it hard to
 
4356
    # read the test output for parameterized tests, because tracebacks will be
 
4357
    # associated with irrelevant tests.
 
4358
    try:
 
4359
        details = new_test._TestCase__details
 
4360
    except AttributeError:
 
4361
        # must be a different version of testtools than expected.  Do nothing.
 
4362
        pass
 
4363
    else:
 
4364
        # Reset the '__details' dict.
 
4365
        new_test._TestCase__details = {}
 
4366
    return new_test
 
4367
 
 
4368
 
 
4369
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
4370
                                ext_module_name):
 
4371
    """Helper for permutating tests against an extension module.
 
4372
 
 
4373
    This is meant to be used inside a modules 'load_tests()' function. It will
 
4374
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
4375
    against both implementations. Setting 'test.module' to the appropriate
 
4376
    module. See bzrlib.tests.test__chk_map.load_tests as an example.
 
4377
 
 
4378
    :param standard_tests: A test suite to permute
 
4379
    :param loader: A TestLoader
 
4380
    :param py_module_name: The python path to a python module that can always
 
4381
        be loaded, and will be considered the 'python' implementation. (eg
 
4382
        'bzrlib._chk_map_py')
 
4383
    :param ext_module_name: The python path to an extension module. If the
 
4384
        module cannot be loaded, a single test will be added, which notes that
 
4385
        the module is not available. If it can be loaded, all standard_tests
 
4386
        will be run against that module.
 
4387
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
4388
        tests. feature is the Feature object that can be used to determine if
 
4389
        the module is available.
 
4390
    """
 
4391
 
 
4392
    from bzrlib.tests.features import ModuleAvailableFeature
 
4393
    py_module = pyutils.get_named_object(py_module_name)
 
4394
    scenarios = [
 
4395
        ('python', {'module': py_module}),
 
4396
    ]
 
4397
    suite = loader.suiteClass()
 
4398
    feature = ModuleAvailableFeature(ext_module_name)
 
4399
    if feature.available():
 
4400
        scenarios.append(('C', {'module': feature.module}))
 
4401
    else:
 
4402
        # the compiled module isn't available, so we add a failing test
 
4403
        class FailWithoutFeature(TestCase):
 
4404
            def test_fail(self):
 
4405
                self.requireFeature(feature)
 
4406
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4407
    result = multiply_tests(standard_tests, scenarios, suite)
 
4408
    return result, feature
 
4409
 
 
4410
 
 
4411
def _rmtree_temp_dir(dirname, test_id=None):
 
4412
    # If LANG=C we probably have created some bogus paths
 
4413
    # which rmtree(unicode) will fail to delete
 
4414
    # so make sure we are using rmtree(str) to delete everything
 
4415
    # except on win32, where rmtree(str) will fail
 
4416
    # since it doesn't have the property of byte-stream paths
 
4417
    # (they are either ascii or mbcs)
 
4418
    if sys.platform == 'win32':
 
4419
        # make sure we are using the unicode win32 api
 
4420
        dirname = unicode(dirname)
 
4421
    else:
 
4422
        dirname = dirname.encode(sys.getfilesystemencoding())
 
4423
    try:
 
4424
        osutils.rmtree(dirname)
 
4425
    except OSError, e:
 
4426
        # We don't want to fail here because some useful display will be lost
 
4427
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
4428
        # possible info to the test runner is even worse.
 
4429
        if test_id != None:
 
4430
            ui.ui_factory.clear_term()
 
4431
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
 
4432
        # Ugly, but the last thing we want here is fail, so bear with it.
 
4433
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
 
4434
                                    ).encode('ascii', 'replace')
 
4435
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
4436
                         % (os.path.basename(dirname), printable_e))
 
4437
 
 
4438
 
 
4439
def probe_unicode_in_user_encoding():
 
4440
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4441
    Return first successfull match.
 
4442
 
 
4443
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4444
    """
 
4445
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4446
    for uni_val in possible_vals:
 
4447
        try:
 
4448
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4449
        except UnicodeEncodeError:
 
4450
            # Try a different character
 
4451
            pass
 
4452
        else:
 
4453
            return uni_val, str_val
 
4454
    return None, None
 
4455
 
 
4456
 
 
4457
def probe_bad_non_ascii(encoding):
 
4458
    """Try to find [bad] character with code [128..255]
 
4459
    that cannot be decoded to unicode in some encoding.
 
4460
    Return None if all non-ascii characters is valid
 
4461
    for given encoding.
 
4462
    """
 
4463
    for i in xrange(128, 256):
 
4464
        char = chr(i)
 
4465
        try:
 
4466
            char.decode(encoding)
 
4467
        except UnicodeDecodeError:
 
4468
            return char
 
4469
    return None
 
4470
 
 
4471
 
 
4472
# Only define SubUnitBzrRunner if subunit is available.
 
4473
try:
 
4474
    from subunit import TestProtocolClient
 
4475
    from subunit.test_results import AutoTimingTestResultDecorator
 
4476
    class SubUnitBzrProtocolClient(TestProtocolClient):
 
4477
 
 
4478
        def stopTest(self, test):
 
4479
            super(SubUnitBzrProtocolClient, self).stopTest(test)
 
4480
            _clear__type_equality_funcs(test)
 
4481
 
 
4482
        def addSuccess(self, test, details=None):
 
4483
            # The subunit client always includes the details in the subunit
 
4484
            # stream, but we don't want to include it in ours.
 
4485
            if details is not None and 'log' in details:
 
4486
                del details['log']
 
4487
            return super(SubUnitBzrProtocolClient, self).addSuccess(
 
4488
                test, details)
 
4489
 
 
4490
    class SubUnitBzrRunner(TextTestRunner):
 
4491
        def run(self, test):
 
4492
            result = AutoTimingTestResultDecorator(
 
4493
                SubUnitBzrProtocolClient(self.stream))
 
4494
            test.run(result)
 
4495
            return result
 
4496
except ImportError:
 
4497
    pass
 
4498
 
 
4499
 
 
4500
# API compatibility for old plugins; see bug 892622.
 
4501
for name in [
 
4502
    'Feature',
 
4503
    'HTTPServerFeature', 
 
4504
    'ModuleAvailableFeature',
 
4505
    'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
 
4506
    'OsFifoFeature', 'UnicodeFilenameFeature',
 
4507
    'ByteStringNamedFilesystem', 'UTF8Filesystem',
 
4508
    'BreakinFeature', 'CaseInsCasePresFilenameFeature',
 
4509
    'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
 
4510
    'posix_permissions_feature',
 
4511
    ]:
 
4512
    globals()[name] = _CompatabilityThunkFeature(
 
4513
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4514
        'bzrlib.tests', name,
 
4515
        name, 'bzrlib.tests.features')
 
4516
 
 
4517
 
 
4518
for (old_name, new_name) in [
 
4519
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
 
4520
    ]:
 
4521
    globals()[name] = _CompatabilityThunkFeature(
 
4522
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4523
        'bzrlib.tests', old_name,
 
4524
        new_name, 'bzrlib.tests.features')