~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Martin Packman
  • Date: 2011-12-08 19:00:14 UTC
  • mto: This revision was merged to the branch mainline in revision 6359.
  • Revision ID: martin.packman@canonical.com-20111208190014-mi8jm6v7jygmhb0r
Use --include-duplicates for make update-pot which already combines multiple msgid strings prettily

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Testing framework extensions"""
 
18
 
 
19
# NOTE: Some classes in here use camelCaseNaming() rather than
 
20
# underscore_naming().  That's for consistency with unittest; it's not the
 
21
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
22
# new assertFoo() methods.
 
23
 
 
24
import atexit
 
25
import codecs
 
26
import copy
 
27
from cStringIO import StringIO
 
28
import difflib
 
29
import doctest
 
30
import errno
 
31
import itertools
 
32
import logging
 
33
import os
 
34
import platform
 
35
import pprint
 
36
import random
 
37
import re
 
38
import shlex
 
39
import site
 
40
import stat
 
41
import subprocess
 
42
import sys
 
43
import tempfile
 
44
import threading
 
45
import time
 
46
import traceback
 
47
import unittest
 
48
import warnings
 
49
 
 
50
import testtools
 
51
# nb: check this before importing anything else from within it
 
52
_testtools_version = getattr(testtools, '__version__', ())
 
53
if _testtools_version < (0, 9, 5):
 
54
    raise ImportError("need at least testtools 0.9.5: %s is %r"
 
55
        % (testtools.__file__, _testtools_version))
 
56
from testtools import content
 
57
 
 
58
import bzrlib
 
59
from bzrlib import (
 
60
    branchbuilder,
 
61
    bzrdir,
 
62
    chk_map,
 
63
    commands as _mod_commands,
 
64
    config,
 
65
    i18n,
 
66
    debug,
 
67
    errors,
 
68
    hooks,
 
69
    lock as _mod_lock,
 
70
    lockdir,
 
71
    memorytree,
 
72
    osutils,
 
73
    plugin as _mod_plugin,
 
74
    pyutils,
 
75
    ui,
 
76
    urlutils,
 
77
    registry,
 
78
    symbol_versioning,
 
79
    trace,
 
80
    transport as _mod_transport,
 
81
    workingtree,
 
82
    )
 
83
try:
 
84
    import bzrlib.lsprof
 
85
except ImportError:
 
86
    # lsprof not available
 
87
    pass
 
88
from bzrlib.smart import client, request
 
89
from bzrlib.transport import (
 
90
    memory,
 
91
    pathfilter,
 
92
    )
 
93
from bzrlib.symbol_versioning import (
 
94
    deprecated_function,
 
95
    deprecated_in,
 
96
    )
 
97
from bzrlib.tests import (
 
98
    fixtures,
 
99
    test_server,
 
100
    TestUtil,
 
101
    treeshape,
 
102
    )
 
103
from bzrlib.ui import NullProgressView
 
104
from bzrlib.ui.text import TextUIFactory
 
105
from bzrlib.tests.features import _CompatabilityThunkFeature
 
106
 
 
107
# Mark this python module as being part of the implementation
 
108
# of unittest: this gives us better tracebacks where the last
 
109
# shown frame is the test code, not our assertXYZ.
 
110
__unittest = 1
 
111
 
 
112
default_transport = test_server.LocalURLServer
 
113
 
 
114
 
 
115
_unitialized_attr = object()
 
116
"""A sentinel needed to act as a default value in a method signature."""
 
117
 
 
118
 
 
119
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
120
SUBUNIT_SEEK_SET = 0
 
121
SUBUNIT_SEEK_CUR = 1
 
122
 
 
123
# These are intentionally brought into this namespace. That way plugins, etc
 
124
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
 
125
TestSuite = TestUtil.TestSuite
 
126
TestLoader = TestUtil.TestLoader
 
127
 
 
128
# Tests should run in a clean and clearly defined environment. The goal is to
 
129
# keep them isolated from the running environment as mush as possible. The test
 
130
# framework ensures the variables defined below are set (or deleted if the
 
131
# value is None) before a test is run and reset to their original value after
 
132
# the test is run. Generally if some code depends on an environment variable,
 
133
# the tests should start without this variable in the environment. There are a
 
134
# few exceptions but you shouldn't violate this rule lightly.
 
135
isolated_environ = {
 
136
    'BZR_HOME': None,
 
137
    'HOME': None,
 
138
    # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
139
    # tests do check our impls match APPDATA
 
140
    'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
141
    'VISUAL': None,
 
142
    'EDITOR': None,
 
143
    'BZR_EMAIL': None,
 
144
    'BZREMAIL': None, # may still be present in the environment
 
145
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
 
146
    'BZR_PROGRESS_BAR': None,
 
147
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
 
148
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
149
    # TestCase should not use disk resources, BZR_LOG is one.
 
150
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
151
    'BZR_PLUGIN_PATH': None,
 
152
    'BZR_DISABLE_PLUGINS': None,
 
153
    'BZR_PLUGINS_AT': None,
 
154
    'BZR_CONCURRENCY': None,
 
155
    # Make sure that any text ui tests are consistent regardless of
 
156
    # the environment the test case is run in; you may want tests that
 
157
    # test other combinations.  'dumb' is a reasonable guess for tests
 
158
    # going to a pipe or a StringIO.
 
159
    'TERM': 'dumb',
 
160
    'LINES': '25',
 
161
    'COLUMNS': '80',
 
162
    'BZR_COLUMNS': '80',
 
163
    # Disable SSH Agent
 
164
    'SSH_AUTH_SOCK': None,
 
165
    # Proxies
 
166
    'http_proxy': None,
 
167
    'HTTP_PROXY': None,
 
168
    'https_proxy': None,
 
169
    'HTTPS_PROXY': None,
 
170
    'no_proxy': None,
 
171
    'NO_PROXY': None,
 
172
    'all_proxy': None,
 
173
    'ALL_PROXY': None,
 
174
    # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
175
    # least. If you do (care), please update this comment
 
176
    # -- vila 20080401
 
177
    'ftp_proxy': None,
 
178
    'FTP_PROXY': None,
 
179
    'BZR_REMOTE_PATH': None,
 
180
    # Generally speaking, we don't want apport reporting on crashes in
 
181
    # the test envirnoment unless we're specifically testing apport,
 
182
    # so that it doesn't leak into the real system environment.  We
 
183
    # use an env var so it propagates to subprocesses.
 
184
    'APPORT_DISABLE': '1',
 
185
    }
 
186
 
 
187
 
 
188
def override_os_environ(test, env=None):
 
189
    """Modify os.environ keeping a copy.
 
190
    
 
191
    :param test: A test instance
 
192
 
 
193
    :param env: A dict containing variable definitions to be installed
 
194
    """
 
195
    if env is None:
 
196
        env = isolated_environ
 
197
    test._original_os_environ = dict([(var, value)
 
198
                                      for var, value in os.environ.iteritems()])
 
199
    for var, value in env.iteritems():
 
200
        osutils.set_or_unset_env(var, value)
 
201
        if var not in test._original_os_environ:
 
202
            # The var is new, add it with a value of None, so
 
203
            # restore_os_environ will delete it
 
204
            test._original_os_environ[var] = None
 
205
 
 
206
 
 
207
def restore_os_environ(test):
 
208
    """Restore os.environ to its original state.
 
209
 
 
210
    :param test: A test instance previously passed to override_os_environ.
 
211
    """
 
212
    for var, value in test._original_os_environ.iteritems():
 
213
        # Restore the original value (or delete it if the value has been set to
 
214
        # None in override_os_environ).
 
215
        osutils.set_or_unset_env(var, value)
 
216
 
 
217
 
 
218
def _clear__type_equality_funcs(test):
 
219
    """Cleanup bound methods stored on TestCase instances
 
220
 
 
221
    Clear the dict breaking a few (mostly) harmless cycles in the affected
 
222
    unittests released with Python 2.6 and initial Python 2.7 versions.
 
223
 
 
224
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
 
225
    shipped in Oneiric, an object with no clear method was used, hence the
 
226
    extra complications, see bug 809048 for details.
 
227
    """
 
228
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
229
    if type_equality_funcs is not None:
 
230
        tef_clear = getattr(type_equality_funcs, "clear", None)
 
231
        if tef_clear is None:
 
232
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
233
            if tef_instance_dict is not None:
 
234
                tef_clear = tef_instance_dict.clear
 
235
        if tef_clear is not None:
 
236
            tef_clear()
 
237
 
 
238
 
 
239
class ExtendedTestResult(testtools.TextTestResult):
 
240
    """Accepts, reports and accumulates the results of running tests.
 
241
 
 
242
    Compared to the unittest version this class adds support for
 
243
    profiling, benchmarking, stopping as soon as a test fails,  and
 
244
    skipping tests.  There are further-specialized subclasses for
 
245
    different types of display.
 
246
 
 
247
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
248
    addFailure or addError classes.  These in turn may redirect to a more
 
249
    specific case for the special test results supported by our extended
 
250
    tests.
 
251
 
 
252
    Note that just one of these objects is fed the results from many tests.
 
253
    """
 
254
 
 
255
    stop_early = False
 
256
 
 
257
    def __init__(self, stream, descriptions, verbosity,
 
258
                 bench_history=None,
 
259
                 strict=False,
 
260
                 ):
 
261
        """Construct new TestResult.
 
262
 
 
263
        :param bench_history: Optionally, a writable file object to accumulate
 
264
            benchmark results.
 
265
        """
 
266
        testtools.TextTestResult.__init__(self, stream)
 
267
        if bench_history is not None:
 
268
            from bzrlib.version import _get_bzr_source_tree
 
269
            src_tree = _get_bzr_source_tree()
 
270
            if src_tree:
 
271
                try:
 
272
                    revision_id = src_tree.get_parent_ids()[0]
 
273
                except IndexError:
 
274
                    # XXX: if this is a brand new tree, do the same as if there
 
275
                    # is no branch.
 
276
                    revision_id = ''
 
277
            else:
 
278
                # XXX: If there's no branch, what should we do?
 
279
                revision_id = ''
 
280
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
281
        self._bench_history = bench_history
 
282
        self.ui = ui.ui_factory
 
283
        self.num_tests = 0
 
284
        self.error_count = 0
 
285
        self.failure_count = 0
 
286
        self.known_failure_count = 0
 
287
        self.skip_count = 0
 
288
        self.not_applicable_count = 0
 
289
        self.unsupported = {}
 
290
        self.count = 0
 
291
        self._overall_start_time = time.time()
 
292
        self._strict = strict
 
293
        self._first_thread_leaker_id = None
 
294
        self._tests_leaking_threads_count = 0
 
295
        self._traceback_from_test = None
 
296
 
 
297
    def stopTestRun(self):
 
298
        run = self.testsRun
 
299
        actionTaken = "Ran"
 
300
        stopTime = time.time()
 
301
        timeTaken = stopTime - self.startTime
 
302
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
 
303
        #                the parent class method is similar have to duplicate
 
304
        self._show_list('ERROR', self.errors)
 
305
        self._show_list('FAIL', self.failures)
 
306
        self.stream.write(self.sep2)
 
307
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
 
308
                            run, run != 1 and "s" or "", timeTaken))
 
309
        if not self.wasSuccessful():
 
310
            self.stream.write("FAILED (")
 
311
            failed, errored = map(len, (self.failures, self.errors))
 
312
            if failed:
 
313
                self.stream.write("failures=%d" % failed)
 
314
            if errored:
 
315
                if failed: self.stream.write(", ")
 
316
                self.stream.write("errors=%d" % errored)
 
317
            if self.known_failure_count:
 
318
                if failed or errored: self.stream.write(", ")
 
319
                self.stream.write("known_failure_count=%d" %
 
320
                    self.known_failure_count)
 
321
            self.stream.write(")\n")
 
322
        else:
 
323
            if self.known_failure_count:
 
324
                self.stream.write("OK (known_failures=%d)\n" %
 
325
                    self.known_failure_count)
 
326
            else:
 
327
                self.stream.write("OK\n")
 
328
        if self.skip_count > 0:
 
329
            skipped = self.skip_count
 
330
            self.stream.write('%d test%s skipped\n' %
 
331
                                (skipped, skipped != 1 and "s" or ""))
 
332
        if self.unsupported:
 
333
            for feature, count in sorted(self.unsupported.items()):
 
334
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
335
                    (feature, count))
 
336
        if self._strict:
 
337
            ok = self.wasStrictlySuccessful()
 
338
        else:
 
339
            ok = self.wasSuccessful()
 
340
        if self._first_thread_leaker_id:
 
341
            self.stream.write(
 
342
                '%s is leaking threads among %d leaking tests.\n' % (
 
343
                self._first_thread_leaker_id,
 
344
                self._tests_leaking_threads_count))
 
345
            # We don't report the main thread as an active one.
 
346
            self.stream.write(
 
347
                '%d non-main threads were left active in the end.\n'
 
348
                % (len(self._active_threads) - 1))
 
349
 
 
350
    def getDescription(self, test):
 
351
        return test.id()
 
352
 
 
353
    def _extractBenchmarkTime(self, testCase, details=None):
 
354
        """Add a benchmark time for the current test case."""
 
355
        if details and 'benchtime' in details:
 
356
            return float(''.join(details['benchtime'].iter_bytes()))
 
357
        return getattr(testCase, "_benchtime", None)
 
358
 
 
359
    def _elapsedTestTimeString(self):
 
360
        """Return a time string for the overall time the current test has taken."""
 
361
        return self._formatTime(self._delta_to_float(
 
362
            self._now() - self._start_datetime))
 
363
 
 
364
    def _testTimeString(self, testCase):
 
365
        benchmark_time = self._extractBenchmarkTime(testCase)
 
366
        if benchmark_time is not None:
 
367
            return self._formatTime(benchmark_time) + "*"
 
368
        else:
 
369
            return self._elapsedTestTimeString()
 
370
 
 
371
    def _formatTime(self, seconds):
 
372
        """Format seconds as milliseconds with leading spaces."""
 
373
        # some benchmarks can take thousands of seconds to run, so we need 8
 
374
        # places
 
375
        return "%8dms" % (1000 * seconds)
 
376
 
 
377
    def _shortened_test_description(self, test):
 
378
        what = test.id()
 
379
        what = re.sub(r'^bzrlib\.tests\.', '', what)
 
380
        return what
 
381
 
 
382
    # GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
 
383
    #                multiple times in a row, because the handler is added for
 
384
    #                each test but the container list is shared between cases.
 
385
    #                See lp:498869 lp:625574 and lp:637725 for background.
 
386
    def _record_traceback_from_test(self, exc_info):
 
387
        """Store the traceback from passed exc_info tuple till"""
 
388
        self._traceback_from_test = exc_info[2]
 
389
 
 
390
    def startTest(self, test):
 
391
        super(ExtendedTestResult, self).startTest(test)
 
392
        if self.count == 0:
 
393
            self.startTests()
 
394
        self.count += 1
 
395
        self.report_test_start(test)
 
396
        test.number = self.count
 
397
        self._recordTestStartTime()
 
398
        # Make testtools cases give us the real traceback on failure
 
399
        addOnException = getattr(test, "addOnException", None)
 
400
        if addOnException is not None:
 
401
            addOnException(self._record_traceback_from_test)
 
402
        # Only check for thread leaks on bzrlib derived test cases
 
403
        if isinstance(test, TestCase):
 
404
            test.addCleanup(self._check_leaked_threads, test)
 
405
 
 
406
    def stopTest(self, test):
 
407
        super(ExtendedTestResult, self).stopTest(test)
 
408
        # Manually break cycles, means touching various private things but hey
 
409
        getDetails = getattr(test, "getDetails", None)
 
410
        if getDetails is not None:
 
411
            getDetails().clear()
 
412
        _clear__type_equality_funcs(test)
 
413
        self._traceback_from_test = None
 
414
 
 
415
    def startTests(self):
 
416
        self.report_tests_starting()
 
417
        self._active_threads = threading.enumerate()
 
418
 
 
419
    def _check_leaked_threads(self, test):
 
420
        """See if any threads have leaked since last call
 
421
 
 
422
        A sample of live threads is stored in the _active_threads attribute,
 
423
        when this method runs it compares the current live threads and any not
 
424
        in the previous sample are treated as having leaked.
 
425
        """
 
426
        now_active_threads = set(threading.enumerate())
 
427
        threads_leaked = now_active_threads.difference(self._active_threads)
 
428
        if threads_leaked:
 
429
            self._report_thread_leak(test, threads_leaked, now_active_threads)
 
430
            self._tests_leaking_threads_count += 1
 
431
            if self._first_thread_leaker_id is None:
 
432
                self._first_thread_leaker_id = test.id()
 
433
            self._active_threads = now_active_threads
 
434
 
 
435
    def _recordTestStartTime(self):
 
436
        """Record that a test has started."""
 
437
        self._start_datetime = self._now()
 
438
 
 
439
    def addError(self, test, err):
 
440
        """Tell result that test finished with an error.
 
441
 
 
442
        Called from the TestCase run() method when the test
 
443
        fails with an unexpected error.
 
444
        """
 
445
        self._post_mortem(self._traceback_from_test)
 
446
        super(ExtendedTestResult, self).addError(test, err)
 
447
        self.error_count += 1
 
448
        self.report_error(test, err)
 
449
        if self.stop_early:
 
450
            self.stop()
 
451
 
 
452
    def addFailure(self, test, err):
 
453
        """Tell result that test failed.
 
454
 
 
455
        Called from the TestCase run() method when the test
 
456
        fails because e.g. an assert() method failed.
 
457
        """
 
458
        self._post_mortem(self._traceback_from_test)
 
459
        super(ExtendedTestResult, self).addFailure(test, err)
 
460
        self.failure_count += 1
 
461
        self.report_failure(test, err)
 
462
        if self.stop_early:
 
463
            self.stop()
 
464
 
 
465
    def addSuccess(self, test, details=None):
 
466
        """Tell result that test completed successfully.
 
467
 
 
468
        Called from the TestCase run()
 
469
        """
 
470
        if self._bench_history is not None:
 
471
            benchmark_time = self._extractBenchmarkTime(test, details)
 
472
            if benchmark_time is not None:
 
473
                self._bench_history.write("%s %s\n" % (
 
474
                    self._formatTime(benchmark_time),
 
475
                    test.id()))
 
476
        self.report_success(test)
 
477
        super(ExtendedTestResult, self).addSuccess(test)
 
478
        test._log_contents = ''
 
479
 
 
480
    def addExpectedFailure(self, test, err):
 
481
        self.known_failure_count += 1
 
482
        self.report_known_failure(test, err)
 
483
 
 
484
    def addUnexpectedSuccess(self, test, details=None):
 
485
        """Tell result the test unexpectedly passed, counting as a failure
 
486
 
 
487
        When the minimum version of testtools required becomes 0.9.8 this
 
488
        can be updated to use the new handling there.
 
489
        """
 
490
        super(ExtendedTestResult, self).addFailure(test, details=details)
 
491
        self.failure_count += 1
 
492
        self.report_unexpected_success(test,
 
493
            "".join(details["reason"].iter_text()))
 
494
        if self.stop_early:
 
495
            self.stop()
 
496
 
 
497
    def addNotSupported(self, test, feature):
 
498
        """The test will not be run because of a missing feature.
 
499
        """
 
500
        # this can be called in two different ways: it may be that the
 
501
        # test started running, and then raised (through requireFeature)
 
502
        # UnavailableFeature.  Alternatively this method can be called
 
503
        # while probing for features before running the test code proper; in
 
504
        # that case we will see startTest and stopTest, but the test will
 
505
        # never actually run.
 
506
        self.unsupported.setdefault(str(feature), 0)
 
507
        self.unsupported[str(feature)] += 1
 
508
        self.report_unsupported(test, feature)
 
509
 
 
510
    def addSkip(self, test, reason):
 
511
        """A test has not run for 'reason'."""
 
512
        self.skip_count += 1
 
513
        self.report_skip(test, reason)
 
514
 
 
515
    def addNotApplicable(self, test, reason):
 
516
        self.not_applicable_count += 1
 
517
        self.report_not_applicable(test, reason)
 
518
 
 
519
    def _count_stored_tests(self):
 
520
        """Count of tests instances kept alive due to not succeeding"""
 
521
        return self.error_count + self.failure_count + self.known_failure_count
 
522
 
 
523
    def _post_mortem(self, tb=None):
 
524
        """Start a PDB post mortem session."""
 
525
        if os.environ.get('BZR_TEST_PDB', None):
 
526
            import pdb
 
527
            pdb.post_mortem(tb)
 
528
 
 
529
    def progress(self, offset, whence):
 
530
        """The test is adjusting the count of tests to run."""
 
531
        if whence == SUBUNIT_SEEK_SET:
 
532
            self.num_tests = offset
 
533
        elif whence == SUBUNIT_SEEK_CUR:
 
534
            self.num_tests += offset
 
535
        else:
 
536
            raise errors.BzrError("Unknown whence %r" % whence)
 
537
 
 
538
    def report_tests_starting(self):
 
539
        """Display information before the test run begins"""
 
540
        if getattr(sys, 'frozen', None) is None:
 
541
            bzr_path = osutils.realpath(sys.argv[0])
 
542
        else:
 
543
            bzr_path = sys.executable
 
544
        self.stream.write(
 
545
            'bzr selftest: %s\n' % (bzr_path,))
 
546
        self.stream.write(
 
547
            '   %s\n' % (
 
548
                    bzrlib.__path__[0],))
 
549
        self.stream.write(
 
550
            '   bzr-%s python-%s %s\n' % (
 
551
                    bzrlib.version_string,
 
552
                    bzrlib._format_version_tuple(sys.version_info),
 
553
                    platform.platform(aliased=1),
 
554
                    ))
 
555
        self.stream.write('\n')
 
556
 
 
557
    def report_test_start(self, test):
 
558
        """Display information on the test just about to be run"""
 
559
 
 
560
    def _report_thread_leak(self, test, leaked_threads, active_threads):
 
561
        """Display information on a test that leaked one or more threads"""
 
562
        # GZ 2010-09-09: A leak summary reported separately from the general
 
563
        #                thread debugging would be nice. Tests under subunit
 
564
        #                need something not using stream, perhaps adding a
 
565
        #                testtools details object would be fitting.
 
566
        if 'threads' in selftest_debug_flags:
 
567
            self.stream.write('%s is leaking, active is now %d\n' %
 
568
                (test.id(), len(active_threads)))
 
569
 
 
570
    def startTestRun(self):
 
571
        self.startTime = time.time()
 
572
 
 
573
    def report_success(self, test):
 
574
        pass
 
575
 
 
576
    def wasStrictlySuccessful(self):
 
577
        if self.unsupported or self.known_failure_count:
 
578
            return False
 
579
        return self.wasSuccessful()
 
580
 
 
581
 
 
582
class TextTestResult(ExtendedTestResult):
 
583
    """Displays progress and results of tests in text form"""
 
584
 
 
585
    def __init__(self, stream, descriptions, verbosity,
 
586
                 bench_history=None,
 
587
                 pb=None,
 
588
                 strict=None,
 
589
                 ):
 
590
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
591
            bench_history, strict)
 
592
        # We no longer pass them around, but just rely on the UIFactory stack
 
593
        # for state
 
594
        if pb is not None:
 
595
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
596
        self.pb = self.ui.nested_progress_bar()
 
597
        self.pb.show_pct = False
 
598
        self.pb.show_spinner = False
 
599
        self.pb.show_eta = False,
 
600
        self.pb.show_count = False
 
601
        self.pb.show_bar = False
 
602
        self.pb.update_latency = 0
 
603
        self.pb.show_transport_activity = False
 
604
 
 
605
    def stopTestRun(self):
 
606
        # called when the tests that are going to run have run
 
607
        self.pb.clear()
 
608
        self.pb.finished()
 
609
        super(TextTestResult, self).stopTestRun()
 
610
 
 
611
    def report_tests_starting(self):
 
612
        super(TextTestResult, self).report_tests_starting()
 
613
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
614
 
 
615
    def _progress_prefix_text(self):
 
616
        # the longer this text, the less space we have to show the test
 
617
        # name...
 
618
        a = '[%d' % self.count              # total that have been run
 
619
        # tests skipped as known not to be relevant are not important enough
 
620
        # to show here
 
621
        ## if self.skip_count:
 
622
        ##     a += ', %d skip' % self.skip_count
 
623
        ## if self.known_failure_count:
 
624
        ##     a += '+%dX' % self.known_failure_count
 
625
        if self.num_tests:
 
626
            a +='/%d' % self.num_tests
 
627
        a += ' in '
 
628
        runtime = time.time() - self._overall_start_time
 
629
        if runtime >= 60:
 
630
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
631
        else:
 
632
            a += '%ds' % runtime
 
633
        total_fail_count = self.error_count + self.failure_count
 
634
        if total_fail_count:
 
635
            a += ', %d failed' % total_fail_count
 
636
        # if self.unsupported:
 
637
        #     a += ', %d missing' % len(self.unsupported)
 
638
        a += ']'
 
639
        return a
 
640
 
 
641
    def report_test_start(self, test):
 
642
        self.pb.update(
 
643
                self._progress_prefix_text()
 
644
                + ' '
 
645
                + self._shortened_test_description(test))
 
646
 
 
647
    def _test_description(self, test):
 
648
        return self._shortened_test_description(test)
 
649
 
 
650
    def report_error(self, test, err):
 
651
        self.stream.write('ERROR: %s\n    %s\n' % (
 
652
            self._test_description(test),
 
653
            err[1],
 
654
            ))
 
655
 
 
656
    def report_failure(self, test, err):
 
657
        self.stream.write('FAIL: %s\n    %s\n' % (
 
658
            self._test_description(test),
 
659
            err[1],
 
660
            ))
 
661
 
 
662
    def report_known_failure(self, test, err):
 
663
        pass
 
664
 
 
665
    def report_unexpected_success(self, test, reason):
 
666
        self.stream.write('FAIL: %s\n    %s: %s\n' % (
 
667
            self._test_description(test),
 
668
            "Unexpected success. Should have failed",
 
669
            reason,
 
670
            ))
 
671
 
 
672
    def report_skip(self, test, reason):
 
673
        pass
 
674
 
 
675
    def report_not_applicable(self, test, reason):
 
676
        pass
 
677
 
 
678
    def report_unsupported(self, test, feature):
 
679
        """test cannot be run because feature is missing."""
 
680
 
 
681
 
 
682
class VerboseTestResult(ExtendedTestResult):
 
683
    """Produce long output, with one line per test run plus times"""
 
684
 
 
685
    def _ellipsize_to_right(self, a_string, final_width):
 
686
        """Truncate and pad a string, keeping the right hand side"""
 
687
        if len(a_string) > final_width:
 
688
            result = '...' + a_string[3-final_width:]
 
689
        else:
 
690
            result = a_string
 
691
        return result.ljust(final_width)
 
692
 
 
693
    def report_tests_starting(self):
 
694
        self.stream.write('running %d tests...\n' % self.num_tests)
 
695
        super(VerboseTestResult, self).report_tests_starting()
 
696
 
 
697
    def report_test_start(self, test):
 
698
        name = self._shortened_test_description(test)
 
699
        width = osutils.terminal_width()
 
700
        if width is not None:
 
701
            # width needs space for 6 char status, plus 1 for slash, plus an
 
702
            # 11-char time string, plus a trailing blank
 
703
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
 
704
            # space
 
705
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
706
        else:
 
707
            self.stream.write(name)
 
708
        self.stream.flush()
 
709
 
 
710
    def _error_summary(self, err):
 
711
        indent = ' ' * 4
 
712
        return '%s%s' % (indent, err[1])
 
713
 
 
714
    def report_error(self, test, err):
 
715
        self.stream.write('ERROR %s\n%s\n'
 
716
                % (self._testTimeString(test),
 
717
                   self._error_summary(err)))
 
718
 
 
719
    def report_failure(self, test, err):
 
720
        self.stream.write(' FAIL %s\n%s\n'
 
721
                % (self._testTimeString(test),
 
722
                   self._error_summary(err)))
 
723
 
 
724
    def report_known_failure(self, test, err):
 
725
        self.stream.write('XFAIL %s\n%s\n'
 
726
                % (self._testTimeString(test),
 
727
                   self._error_summary(err)))
 
728
 
 
729
    def report_unexpected_success(self, test, reason):
 
730
        self.stream.write(' FAIL %s\n%s: %s\n'
 
731
                % (self._testTimeString(test),
 
732
                   "Unexpected success. Should have failed",
 
733
                   reason))
 
734
 
 
735
    def report_success(self, test):
 
736
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
737
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
738
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
739
            stats.pprint(file=self.stream)
 
740
        # flush the stream so that we get smooth output. This verbose mode is
 
741
        # used to show the output in PQM.
 
742
        self.stream.flush()
 
743
 
 
744
    def report_skip(self, test, reason):
 
745
        self.stream.write(' SKIP %s\n%s\n'
 
746
                % (self._testTimeString(test), reason))
 
747
 
 
748
    def report_not_applicable(self, test, reason):
 
749
        self.stream.write('  N/A %s\n    %s\n'
 
750
                % (self._testTimeString(test), reason))
 
751
 
 
752
    def report_unsupported(self, test, feature):
 
753
        """test cannot be run because feature is missing."""
 
754
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
755
                %(self._testTimeString(test), feature))
 
756
 
 
757
 
 
758
class TextTestRunner(object):
 
759
    stop_on_failure = False
 
760
 
 
761
    def __init__(self,
 
762
                 stream=sys.stderr,
 
763
                 descriptions=0,
 
764
                 verbosity=1,
 
765
                 bench_history=None,
 
766
                 strict=False,
 
767
                 result_decorators=None,
 
768
                 ):
 
769
        """Create a TextTestRunner.
 
770
 
 
771
        :param result_decorators: An optional list of decorators to apply
 
772
            to the result object being used by the runner. Decorators are
 
773
            applied left to right - the first element in the list is the 
 
774
            innermost decorator.
 
775
        """
 
776
        # stream may know claim to know to write unicode strings, but in older
 
777
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
778
        # specifically a built in file with encoding 'UTF-8' will still try
 
779
        # to encode using ascii.
 
780
        new_encoding = osutils.get_terminal_encoding()
 
781
        codec = codecs.lookup(new_encoding)
 
782
        if type(codec) is tuple:
 
783
            # Python 2.4
 
784
            encode = codec[0]
 
785
        else:
 
786
            encode = codec.encode
 
787
        # GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
 
788
        #                so should swap to the plain codecs.StreamWriter
 
789
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
 
790
            "backslashreplace")
 
791
        stream.encoding = new_encoding
 
792
        self.stream = stream
 
793
        self.descriptions = descriptions
 
794
        self.verbosity = verbosity
 
795
        self._bench_history = bench_history
 
796
        self._strict = strict
 
797
        self._result_decorators = result_decorators or []
 
798
 
 
799
    def run(self, test):
 
800
        "Run the given test case or test suite."
 
801
        if self.verbosity == 1:
 
802
            result_class = TextTestResult
 
803
        elif self.verbosity >= 2:
 
804
            result_class = VerboseTestResult
 
805
        original_result = result_class(self.stream,
 
806
                              self.descriptions,
 
807
                              self.verbosity,
 
808
                              bench_history=self._bench_history,
 
809
                              strict=self._strict,
 
810
                              )
 
811
        # Signal to result objects that look at stop early policy to stop,
 
812
        original_result.stop_early = self.stop_on_failure
 
813
        result = original_result
 
814
        for decorator in self._result_decorators:
 
815
            result = decorator(result)
 
816
            result.stop_early = self.stop_on_failure
 
817
        result.startTestRun()
 
818
        try:
 
819
            test.run(result)
 
820
        finally:
 
821
            result.stopTestRun()
 
822
        # higher level code uses our extended protocol to determine
 
823
        # what exit code to give.
 
824
        return original_result
 
825
 
 
826
 
 
827
def iter_suite_tests(suite):
 
828
    """Return all tests in a suite, recursing through nested suites"""
 
829
    if isinstance(suite, unittest.TestCase):
 
830
        yield suite
 
831
    elif isinstance(suite, unittest.TestSuite):
 
832
        for item in suite:
 
833
            for r in iter_suite_tests(item):
 
834
                yield r
 
835
    else:
 
836
        raise Exception('unknown type %r for object %r'
 
837
                        % (type(suite), suite))
 
838
 
 
839
 
 
840
TestSkipped = testtools.testcase.TestSkipped
 
841
 
 
842
 
 
843
class TestNotApplicable(TestSkipped):
 
844
    """A test is not applicable to the situation where it was run.
 
845
 
 
846
    This is only normally raised by parameterized tests, if they find that
 
847
    the instance they're constructed upon does not support one aspect
 
848
    of its interface.
 
849
    """
 
850
 
 
851
 
 
852
# traceback._some_str fails to format exceptions that have the default
 
853
# __str__ which does an implicit ascii conversion. However, repr() on those
 
854
# objects works, for all that its not quite what the doctor may have ordered.
 
855
def _clever_some_str(value):
 
856
    try:
 
857
        return str(value)
 
858
    except:
 
859
        try:
 
860
            return repr(value).replace('\\n', '\n')
 
861
        except:
 
862
            return '<unprintable %s object>' % type(value).__name__
 
863
 
 
864
traceback._some_str = _clever_some_str
 
865
 
 
866
 
 
867
# deprecated - use self.knownFailure(), or self.expectFailure.
 
868
KnownFailure = testtools.testcase._ExpectedFailure
 
869
 
 
870
 
 
871
class UnavailableFeature(Exception):
 
872
    """A feature required for this test was not available.
 
873
 
 
874
    This can be considered a specialised form of SkippedTest.
 
875
 
 
876
    The feature should be used to construct the exception.
 
877
    """
 
878
 
 
879
 
 
880
class StringIOWrapper(object):
 
881
    """A wrapper around cStringIO which just adds an encoding attribute.
 
882
 
 
883
    Internally we can check sys.stdout to see what the output encoding
 
884
    should be. However, cStringIO has no encoding attribute that we can
 
885
    set. So we wrap it instead.
 
886
    """
 
887
    encoding='ascii'
 
888
    _cstring = None
 
889
 
 
890
    def __init__(self, s=None):
 
891
        if s is not None:
 
892
            self.__dict__['_cstring'] = StringIO(s)
 
893
        else:
 
894
            self.__dict__['_cstring'] = StringIO()
 
895
 
 
896
    def __getattr__(self, name, getattr=getattr):
 
897
        return getattr(self.__dict__['_cstring'], name)
 
898
 
 
899
    def __setattr__(self, name, val):
 
900
        if name == 'encoding':
 
901
            self.__dict__['encoding'] = val
 
902
        else:
 
903
            return setattr(self._cstring, name, val)
 
904
 
 
905
 
 
906
class TestUIFactory(TextUIFactory):
 
907
    """A UI Factory for testing.
 
908
 
 
909
    Hide the progress bar but emit note()s.
 
910
    Redirect stdin.
 
911
    Allows get_password to be tested without real tty attached.
 
912
 
 
913
    See also CannedInputUIFactory which lets you provide programmatic input in
 
914
    a structured way.
 
915
    """
 
916
    # TODO: Capture progress events at the model level and allow them to be
 
917
    # observed by tests that care.
 
918
    #
 
919
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
920
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
921
    # idea of how they're supposed to be different.
 
922
    # See https://bugs.launchpad.net/bzr/+bug/408213
 
923
 
 
924
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
925
        if stdin is not None:
 
926
            # We use a StringIOWrapper to be able to test various
 
927
            # encodings, but the user is still responsible to
 
928
            # encode the string and to set the encoding attribute
 
929
            # of StringIOWrapper.
 
930
            stdin = StringIOWrapper(stdin)
 
931
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
932
 
 
933
    def get_non_echoed_password(self):
 
934
        """Get password from stdin without trying to handle the echo mode"""
 
935
        password = self.stdin.readline()
 
936
        if not password:
 
937
            raise EOFError
 
938
        if password[-1] == '\n':
 
939
            password = password[:-1]
 
940
        return password
 
941
 
 
942
    def make_progress_view(self):
 
943
        return NullProgressView()
 
944
 
 
945
 
 
946
def isolated_doctest_setUp(test):
 
947
    override_os_environ(test)
 
948
 
 
949
 
 
950
def isolated_doctest_tearDown(test):
 
951
    restore_os_environ(test)
 
952
 
 
953
 
 
954
def IsolatedDocTestSuite(*args, **kwargs):
 
955
    """Overrides doctest.DocTestSuite to handle isolation.
 
956
 
 
957
    The method is really a factory and users are expected to use it as such.
 
958
    """
 
959
 
 
960
    kwargs['setUp'] = isolated_doctest_setUp
 
961
    kwargs['tearDown'] = isolated_doctest_tearDown
 
962
    return doctest.DocTestSuite(*args, **kwargs)
 
963
 
 
964
 
 
965
class TestCase(testtools.TestCase):
 
966
    """Base class for bzr unit tests.
 
967
 
 
968
    Tests that need access to disk resources should subclass
 
969
    TestCaseInTempDir not TestCase.
 
970
 
 
971
    Error and debug log messages are redirected from their usual
 
972
    location into a temporary file, the contents of which can be
 
973
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
974
    so that it can also capture file IO.  When the test completes this file
 
975
    is read into memory and removed from disk.
 
976
 
 
977
    There are also convenience functions to invoke bzr's command-line
 
978
    routine, and to build and check bzr trees.
 
979
 
 
980
    In addition to the usual method of overriding tearDown(), this class also
 
981
    allows subclasses to register cleanup functions via addCleanup, which are
 
982
    run in order as the object is torn down.  It's less likely this will be
 
983
    accidentally overlooked.
 
984
    """
 
985
 
 
986
    _log_file = None
 
987
    # record lsprof data when performing benchmark calls.
 
988
    _gather_lsprof_in_benchmarks = False
 
989
 
 
990
    def __init__(self, methodName='testMethod'):
 
991
        super(TestCase, self).__init__(methodName)
 
992
        self._directory_isolation = True
 
993
        self.exception_handlers.insert(0,
 
994
            (UnavailableFeature, self._do_unsupported_or_skip))
 
995
        self.exception_handlers.insert(0,
 
996
            (TestNotApplicable, self._do_not_applicable))
 
997
 
 
998
    def setUp(self):
 
999
        super(TestCase, self).setUp()
 
1000
 
 
1001
        timeout = config.GlobalStack().get('selftest.timeout')
 
1002
        if timeout:
 
1003
            timeout_fixture = fixtures.TimeoutFixture(timeout)
 
1004
            timeout_fixture.setUp()
 
1005
            self.addCleanup(timeout_fixture.cleanUp)
 
1006
 
 
1007
        for feature in getattr(self, '_test_needs_features', []):
 
1008
            self.requireFeature(feature)
 
1009
        self._cleanEnvironment()
 
1010
 
 
1011
        if bzrlib.global_state is not None:
 
1012
            self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
 
1013
                              config.CommandLineStore())
 
1014
 
 
1015
        self._silenceUI()
 
1016
        self._startLogFile()
 
1017
        self._benchcalls = []
 
1018
        self._benchtime = None
 
1019
        self._clear_hooks()
 
1020
        self._track_transports()
 
1021
        self._track_locks()
 
1022
        self._clear_debug_flags()
 
1023
        # Isolate global verbosity level, to make sure it's reproducible
 
1024
        # between tests.  We should get rid of this altogether: bug 656694. --
 
1025
        # mbp 20101008
 
1026
        self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
 
1027
        # Isolate config option expansion until its default value for bzrlib is
 
1028
        # settled on or a the FIXME associated with _get_expand_default_value
 
1029
        # is addressed -- vila 20110219
 
1030
        self.overrideAttr(config, '_expand_default_value', None)
 
1031
        self._log_files = set()
 
1032
        # Each key in the ``_counters`` dict holds a value for a different
 
1033
        # counter. When the test ends, addDetail() should be used to output the
 
1034
        # counter values. This happens in install_counter_hook().
 
1035
        self._counters = {}
 
1036
        if 'config_stats' in selftest_debug_flags:
 
1037
            self._install_config_stats_hooks()
 
1038
        # Do not use i18n for tests (unless the test reverses this)
 
1039
        i18n.disable_i18n()
 
1040
 
 
1041
    def debug(self):
 
1042
        # debug a frame up.
 
1043
        import pdb
 
1044
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
1045
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
1046
                ).set_trace(sys._getframe().f_back)
 
1047
 
 
1048
    def discardDetail(self, name):
 
1049
        """Extend the addDetail, getDetails api so we can remove a detail.
 
1050
 
 
1051
        eg. bzr always adds the 'log' detail at startup, but we don't want to
 
1052
        include it for skipped, xfail, etc tests.
 
1053
 
 
1054
        It is safe to call this for a detail that doesn't exist, in case this
 
1055
        gets called multiple times.
 
1056
        """
 
1057
        # We cheat. details is stored in __details which means we shouldn't
 
1058
        # touch it. but getDetails() returns the dict directly, so we can
 
1059
        # mutate it.
 
1060
        details = self.getDetails()
 
1061
        if name in details:
 
1062
            del details[name]
 
1063
 
 
1064
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1065
        """Install a counting hook.
 
1066
 
 
1067
        Any hook can be counted as long as it doesn't need to return a value.
 
1068
 
 
1069
        :param hooks: Where the hook should be installed.
 
1070
 
 
1071
        :param name: The hook name that will be counted.
 
1072
 
 
1073
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1074
            to ``name``.
 
1075
        """
 
1076
        _counters = self._counters # Avoid closing over self
 
1077
        if counter_name is None:
 
1078
            counter_name = name
 
1079
        if _counters.has_key(counter_name):
 
1080
            raise AssertionError('%s is already used as a counter name'
 
1081
                                  % (counter_name,))
 
1082
        _counters[counter_name] = 0
 
1083
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1084
            lambda: ['%d' % (_counters[counter_name],)]))
 
1085
        def increment_counter(*args, **kwargs):
 
1086
            _counters[counter_name] += 1
 
1087
        label = 'count %s calls' % (counter_name,)
 
1088
        hooks.install_named_hook(name, increment_counter, label)
 
1089
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1090
 
 
1091
    def _install_config_stats_hooks(self):
 
1092
        """Install config hooks to count hook calls.
 
1093
 
 
1094
        """
 
1095
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1096
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1097
                                       'config.%s' % (hook_name,))
 
1098
 
 
1099
        # The OldConfigHooks are private and need special handling to protect
 
1100
        # against recursive tests (tests that run other tests), so we just do
 
1101
        # manually what registering them into _builtin_known_hooks will provide
 
1102
        # us.
 
1103
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1104
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1105
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1106
                                      'old_config.%s' % (hook_name,))
 
1107
 
 
1108
    def _clear_debug_flags(self):
 
1109
        """Prevent externally set debug flags affecting tests.
 
1110
 
 
1111
        Tests that want to use debug flags can just set them in the
 
1112
        debug_flags set during setup/teardown.
 
1113
        """
 
1114
        # Start with a copy of the current debug flags we can safely modify.
 
1115
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
 
1116
        if 'allow_debug' not in selftest_debug_flags:
 
1117
            debug.debug_flags.clear()
 
1118
        if 'disable_lock_checks' not in selftest_debug_flags:
 
1119
            debug.debug_flags.add('strict_locks')
 
1120
 
 
1121
    def _clear_hooks(self):
 
1122
        # prevent hooks affecting tests
 
1123
        known_hooks = hooks.known_hooks
 
1124
        self._preserved_hooks = {}
 
1125
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1126
            current_hooks = getattr(parent, name)
 
1127
            self._preserved_hooks[parent] = (name, current_hooks)
 
1128
        self._preserved_lazy_hooks = hooks._lazy_hooks
 
1129
        hooks._lazy_hooks = {}
 
1130
        self.addCleanup(self._restoreHooks)
 
1131
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1132
            factory = known_hooks.get(key)
 
1133
            setattr(parent, name, factory())
 
1134
        # this hook should always be installed
 
1135
        request._install_hook()
 
1136
 
 
1137
    def disable_directory_isolation(self):
 
1138
        """Turn off directory isolation checks."""
 
1139
        self._directory_isolation = False
 
1140
 
 
1141
    def enable_directory_isolation(self):
 
1142
        """Enable directory isolation checks."""
 
1143
        self._directory_isolation = True
 
1144
 
 
1145
    def _silenceUI(self):
 
1146
        """Turn off UI for duration of test"""
 
1147
        # by default the UI is off; tests can turn it on if they want it.
 
1148
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
 
1149
 
 
1150
    def _check_locks(self):
 
1151
        """Check that all lock take/release actions have been paired."""
 
1152
        # We always check for mismatched locks. If a mismatch is found, we
 
1153
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
1154
        # case we just print a warning.
 
1155
        # unhook:
 
1156
        acquired_locks = [lock for action, lock in self._lock_actions
 
1157
                          if action == 'acquired']
 
1158
        released_locks = [lock for action, lock in self._lock_actions
 
1159
                          if action == 'released']
 
1160
        broken_locks = [lock for action, lock in self._lock_actions
 
1161
                        if action == 'broken']
 
1162
        # trivially, given the tests for lock acquistion and release, if we
 
1163
        # have as many in each list, it should be ok. Some lock tests also
 
1164
        # break some locks on purpose and should be taken into account by
 
1165
        # considering that breaking a lock is just a dirty way of releasing it.
 
1166
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
1167
            message = (
 
1168
                'Different number of acquired and '
 
1169
                'released or broken locks.\n'
 
1170
                'acquired=%s\n'
 
1171
                'released=%s\n'
 
1172
                'broken=%s\n' %
 
1173
                (acquired_locks, released_locks, broken_locks))
 
1174
            if not self._lock_check_thorough:
 
1175
                # Rather than fail, just warn
 
1176
                print "Broken test %s: %s" % (self, message)
 
1177
                return
 
1178
            self.fail(message)
 
1179
 
 
1180
    def _track_locks(self):
 
1181
        """Track lock activity during tests."""
 
1182
        self._lock_actions = []
 
1183
        if 'disable_lock_checks' in selftest_debug_flags:
 
1184
            self._lock_check_thorough = False
 
1185
        else:
 
1186
            self._lock_check_thorough = True
 
1187
 
 
1188
        self.addCleanup(self._check_locks)
 
1189
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
1190
                                                self._lock_acquired, None)
 
1191
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
1192
                                                self._lock_released, None)
 
1193
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
1194
                                                self._lock_broken, None)
 
1195
 
 
1196
    def _lock_acquired(self, result):
 
1197
        self._lock_actions.append(('acquired', result))
 
1198
 
 
1199
    def _lock_released(self, result):
 
1200
        self._lock_actions.append(('released', result))
 
1201
 
 
1202
    def _lock_broken(self, result):
 
1203
        self._lock_actions.append(('broken', result))
 
1204
 
 
1205
    def permit_dir(self, name):
 
1206
        """Permit a directory to be used by this test. See permit_url."""
 
1207
        name_transport = _mod_transport.get_transport_from_path(name)
 
1208
        self.permit_url(name)
 
1209
        self.permit_url(name_transport.base)
 
1210
 
 
1211
    def permit_url(self, url):
 
1212
        """Declare that url is an ok url to use in this test.
 
1213
        
 
1214
        Do this for memory transports, temporary test directory etc.
 
1215
        
 
1216
        Do not do this for the current working directory, /tmp, or any other
 
1217
        preexisting non isolated url.
 
1218
        """
 
1219
        if not url.endswith('/'):
 
1220
            url += '/'
 
1221
        self._bzr_selftest_roots.append(url)
 
1222
 
 
1223
    def permit_source_tree_branch_repo(self):
 
1224
        """Permit the source tree bzr is running from to be opened.
 
1225
 
 
1226
        Some code such as bzrlib.version attempts to read from the bzr branch
 
1227
        that bzr is executing from (if any). This method permits that directory
 
1228
        to be used in the test suite.
 
1229
        """
 
1230
        path = self.get_source_path()
 
1231
        self.record_directory_isolation()
 
1232
        try:
 
1233
            try:
 
1234
                workingtree.WorkingTree.open(path)
 
1235
            except (errors.NotBranchError, errors.NoWorkingTree):
 
1236
                raise TestSkipped('Needs a working tree of bzr sources')
 
1237
        finally:
 
1238
            self.enable_directory_isolation()
 
1239
 
 
1240
    def _preopen_isolate_transport(self, transport):
 
1241
        """Check that all transport openings are done in the test work area."""
 
1242
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
1243
            # Unwrap pathfiltered transports
 
1244
            transport = transport.server.backing_transport.clone(
 
1245
                transport._filter('.'))
 
1246
        url = transport.base
 
1247
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
1248
        # urls it is given by prepending readonly+. This is appropriate as the
 
1249
        # client shouldn't know that the server is readonly (or not readonly).
 
1250
        # We could register all servers twice, with readonly+ prepending, but
 
1251
        # that makes for a long list; this is about the same but easier to
 
1252
        # read.
 
1253
        if url.startswith('readonly+'):
 
1254
            url = url[len('readonly+'):]
 
1255
        self._preopen_isolate_url(url)
 
1256
 
 
1257
    def _preopen_isolate_url(self, url):
 
1258
        if not self._directory_isolation:
 
1259
            return
 
1260
        if self._directory_isolation == 'record':
 
1261
            self._bzr_selftest_roots.append(url)
 
1262
            return
 
1263
        # This prevents all transports, including e.g. sftp ones backed on disk
 
1264
        # from working unless they are explicitly granted permission. We then
 
1265
        # depend on the code that sets up test transports to check that they are
 
1266
        # appropriately isolated and enable their use by calling
 
1267
        # self.permit_transport()
 
1268
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
1269
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1270
                % (url, self._bzr_selftest_roots))
 
1271
 
 
1272
    def record_directory_isolation(self):
 
1273
        """Gather accessed directories to permit later access.
 
1274
        
 
1275
        This is used for tests that access the branch bzr is running from.
 
1276
        """
 
1277
        self._directory_isolation = "record"
 
1278
 
 
1279
    def start_server(self, transport_server, backing_server=None):
 
1280
        """Start transport_server for this test.
 
1281
 
 
1282
        This starts the server, registers a cleanup for it and permits the
 
1283
        server's urls to be used.
 
1284
        """
 
1285
        if backing_server is None:
 
1286
            transport_server.start_server()
 
1287
        else:
 
1288
            transport_server.start_server(backing_server)
 
1289
        self.addCleanup(transport_server.stop_server)
 
1290
        # Obtain a real transport because if the server supplies a password, it
 
1291
        # will be hidden from the base on the client side.
 
1292
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1293
        # Some transport servers effectively chroot the backing transport;
 
1294
        # others like SFTPServer don't - users of the transport can walk up the
 
1295
        # transport to read the entire backing transport. This wouldn't matter
 
1296
        # except that the workdir tests are given - and that they expect the
 
1297
        # server's url to point at - is one directory under the safety net. So
 
1298
        # Branch operations into the transport will attempt to walk up one
 
1299
        # directory. Chrooting all servers would avoid this but also mean that
 
1300
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1301
        # getting the test framework to start the server with a backing server
 
1302
        # at the actual safety net directory would work too, but this then
 
1303
        # means that the self.get_url/self.get_transport methods would need
 
1304
        # to transform all their results. On balance its cleaner to handle it
 
1305
        # here, and permit a higher url when we have one of these transports.
 
1306
        if t.base.endswith('/work/'):
 
1307
            # we have safety net/test root/work
 
1308
            t = t.clone('../..')
 
1309
        elif isinstance(transport_server,
 
1310
                        test_server.SmartTCPServer_for_testing):
 
1311
            # The smart server adds a path similar to work, which is traversed
 
1312
            # up from by the client. But the server is chrooted - the actual
 
1313
            # backing transport is not escaped from, and VFS requests to the
 
1314
            # root will error (because they try to escape the chroot).
 
1315
            t2 = t.clone('..')
 
1316
            while t2.base != t.base:
 
1317
                t = t2
 
1318
                t2 = t.clone('..')
 
1319
        self.permit_url(t.base)
 
1320
 
 
1321
    def _track_transports(self):
 
1322
        """Install checks for transport usage."""
 
1323
        # TestCase has no safe place it can write to.
 
1324
        self._bzr_selftest_roots = []
 
1325
        # Currently the easiest way to be sure that nothing is going on is to
 
1326
        # hook into bzr dir opening. This leaves a small window of error for
 
1327
        # transport tests, but they are well known, and we can improve on this
 
1328
        # step.
 
1329
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
 
1330
            self._preopen_isolate_transport, "Check bzr directories are safe.")
 
1331
 
 
1332
    def _ndiff_strings(self, a, b):
 
1333
        """Return ndiff between two strings containing lines.
 
1334
 
 
1335
        A trailing newline is added if missing to make the strings
 
1336
        print properly."""
 
1337
        if b and b[-1] != '\n':
 
1338
            b += '\n'
 
1339
        if a and a[-1] != '\n':
 
1340
            a += '\n'
 
1341
        difflines = difflib.ndiff(a.splitlines(True),
 
1342
                                  b.splitlines(True),
 
1343
                                  linejunk=lambda x: False,
 
1344
                                  charjunk=lambda x: False)
 
1345
        return ''.join(difflines)
 
1346
 
 
1347
    def assertEqual(self, a, b, message=''):
 
1348
        try:
 
1349
            if a == b:
 
1350
                return
 
1351
        except UnicodeError, e:
 
1352
            # If we can't compare without getting a UnicodeError, then
 
1353
            # obviously they are different
 
1354
            trace.mutter('UnicodeError: %s', e)
 
1355
        if message:
 
1356
            message += '\n'
 
1357
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
1358
            % (message,
 
1359
               pprint.pformat(a), pprint.pformat(b)))
 
1360
 
 
1361
    assertEquals = assertEqual
 
1362
 
 
1363
    def assertEqualDiff(self, a, b, message=None):
 
1364
        """Assert two texts are equal, if not raise an exception.
 
1365
 
 
1366
        This is intended for use with multi-line strings where it can
 
1367
        be hard to find the differences by eye.
 
1368
        """
 
1369
        # TODO: perhaps override assertEquals to call this for strings?
 
1370
        if a == b:
 
1371
            return
 
1372
        if message is None:
 
1373
            message = "texts not equal:\n"
 
1374
        if a + '\n' == b:
 
1375
            message = 'first string is missing a final newline.\n'
 
1376
        if a == b + '\n':
 
1377
            message = 'second string is missing a final newline.\n'
 
1378
        raise AssertionError(message +
 
1379
                             self._ndiff_strings(a, b))
 
1380
 
 
1381
    def assertEqualMode(self, mode, mode_test):
 
1382
        self.assertEqual(mode, mode_test,
 
1383
                         'mode mismatch %o != %o' % (mode, mode_test))
 
1384
 
 
1385
    def assertEqualStat(self, expected, actual):
 
1386
        """assert that expected and actual are the same stat result.
 
1387
 
 
1388
        :param expected: A stat result.
 
1389
        :param actual: A stat result.
 
1390
        :raises AssertionError: If the expected and actual stat values differ
 
1391
            other than by atime.
 
1392
        """
 
1393
        self.assertEqual(expected.st_size, actual.st_size,
 
1394
                         'st_size did not match')
 
1395
        self.assertEqual(expected.st_mtime, actual.st_mtime,
 
1396
                         'st_mtime did not match')
 
1397
        self.assertEqual(expected.st_ctime, actual.st_ctime,
 
1398
                         'st_ctime did not match')
 
1399
        if sys.platform == 'win32':
 
1400
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
 
1401
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
 
1402
            # odd. We just force it to always be 0 to avoid any problems.
 
1403
            self.assertEqual(0, expected.st_dev)
 
1404
            self.assertEqual(0, actual.st_dev)
 
1405
            self.assertEqual(0, expected.st_ino)
 
1406
            self.assertEqual(0, actual.st_ino)
 
1407
        else:
 
1408
            self.assertEqual(expected.st_dev, actual.st_dev,
 
1409
                             'st_dev did not match')
 
1410
            self.assertEqual(expected.st_ino, actual.st_ino,
 
1411
                             'st_ino did not match')
 
1412
        self.assertEqual(expected.st_mode, actual.st_mode,
 
1413
                         'st_mode did not match')
 
1414
 
 
1415
    def assertLength(self, length, obj_with_len):
 
1416
        """Assert that obj_with_len is of length length."""
 
1417
        if len(obj_with_len) != length:
 
1418
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1419
                length, len(obj_with_len), obj_with_len))
 
1420
 
 
1421
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1422
        """Assert that `func(*args, **kwargs)` quietly logs a specific error.
 
1423
        """
 
1424
        captured = []
 
1425
        orig_log_exception_quietly = trace.log_exception_quietly
 
1426
        try:
 
1427
            def capture():
 
1428
                orig_log_exception_quietly()
 
1429
                captured.append(sys.exc_info()[1])
 
1430
            trace.log_exception_quietly = capture
 
1431
            func(*args, **kwargs)
 
1432
        finally:
 
1433
            trace.log_exception_quietly = orig_log_exception_quietly
 
1434
        self.assertLength(1, captured)
 
1435
        err = captured[0]
 
1436
        self.assertIsInstance(err, exception_class)
 
1437
        return err
 
1438
 
 
1439
    def assertPositive(self, val):
 
1440
        """Assert that val is greater than 0."""
 
1441
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1442
 
 
1443
    def assertNegative(self, val):
 
1444
        """Assert that val is less than 0."""
 
1445
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1446
 
 
1447
    def assertStartsWith(self, s, prefix):
 
1448
        if not s.startswith(prefix):
 
1449
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1450
 
 
1451
    def assertEndsWith(self, s, suffix):
 
1452
        """Asserts that s ends with suffix."""
 
1453
        if not s.endswith(suffix):
 
1454
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1455
 
 
1456
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1457
        """Assert that a contains something matching a regular expression."""
 
1458
        if not re.search(needle_re, haystack, flags):
 
1459
            if '\n' in haystack or len(haystack) > 60:
 
1460
                # a long string, format it in a more readable way
 
1461
                raise AssertionError(
 
1462
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1463
                        % (needle_re, haystack))
 
1464
            else:
 
1465
                raise AssertionError('pattern "%s" not found in "%s"'
 
1466
                        % (needle_re, haystack))
 
1467
 
 
1468
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1469
        """Assert that a does not match a regular expression"""
 
1470
        if re.search(needle_re, haystack, flags):
 
1471
            raise AssertionError('pattern "%s" found in "%s"'
 
1472
                    % (needle_re, haystack))
 
1473
 
 
1474
    def assertContainsString(self, haystack, needle):
 
1475
        if haystack.find(needle) == -1:
 
1476
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1477
 
 
1478
    def assertNotContainsString(self, haystack, needle):
 
1479
        if haystack.find(needle) != -1:
 
1480
            self.fail("string %r found in '''%s'''" % (needle, haystack))
 
1481
 
 
1482
    def assertSubset(self, sublist, superlist):
 
1483
        """Assert that every entry in sublist is present in superlist."""
 
1484
        missing = set(sublist) - set(superlist)
 
1485
        if len(missing) > 0:
 
1486
            raise AssertionError("value(s) %r not present in container %r" %
 
1487
                                 (missing, superlist))
 
1488
 
 
1489
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1490
        """Fail unless excClass is raised when the iterator from func is used.
 
1491
 
 
1492
        Many functions can return generators this makes sure
 
1493
        to wrap them in a list() call to make sure the whole generator
 
1494
        is run, and that the proper exception is raised.
 
1495
        """
 
1496
        try:
 
1497
            list(func(*args, **kwargs))
 
1498
        except excClass, e:
 
1499
            return e
 
1500
        else:
 
1501
            if getattr(excClass,'__name__', None) is not None:
 
1502
                excName = excClass.__name__
 
1503
            else:
 
1504
                excName = str(excClass)
 
1505
            raise self.failureException, "%s not raised" % excName
 
1506
 
 
1507
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1508
        """Assert that a callable raises a particular exception.
 
1509
 
 
1510
        :param excClass: As for the except statement, this may be either an
 
1511
            exception class, or a tuple of classes.
 
1512
        :param callableObj: A callable, will be passed ``*args`` and
 
1513
            ``**kwargs``.
 
1514
 
 
1515
        Returns the exception so that you can examine it.
 
1516
        """
 
1517
        try:
 
1518
            callableObj(*args, **kwargs)
 
1519
        except excClass, e:
 
1520
            return e
 
1521
        else:
 
1522
            if getattr(excClass,'__name__', None) is not None:
 
1523
                excName = excClass.__name__
 
1524
            else:
 
1525
                # probably a tuple
 
1526
                excName = str(excClass)
 
1527
            raise self.failureException, "%s not raised" % excName
 
1528
 
 
1529
    def assertIs(self, left, right, message=None):
 
1530
        if not (left is right):
 
1531
            if message is not None:
 
1532
                raise AssertionError(message)
 
1533
            else:
 
1534
                raise AssertionError("%r is not %r." % (left, right))
 
1535
 
 
1536
    def assertIsNot(self, left, right, message=None):
 
1537
        if (left is right):
 
1538
            if message is not None:
 
1539
                raise AssertionError(message)
 
1540
            else:
 
1541
                raise AssertionError("%r is %r." % (left, right))
 
1542
 
 
1543
    def assertTransportMode(self, transport, path, mode):
 
1544
        """Fail if a path does not have mode "mode".
 
1545
 
 
1546
        If modes are not supported on this transport, the assertion is ignored.
 
1547
        """
 
1548
        if not transport._can_roundtrip_unix_modebits():
 
1549
            return
 
1550
        path_stat = transport.stat(path)
 
1551
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1552
        self.assertEqual(mode, actual_mode,
 
1553
                         'mode of %r incorrect (%s != %s)'
 
1554
                         % (path, oct(mode), oct(actual_mode)))
 
1555
 
 
1556
    def assertIsSameRealPath(self, path1, path2):
 
1557
        """Fail if path1 and path2 points to different files"""
 
1558
        self.assertEqual(osutils.realpath(path1),
 
1559
                         osutils.realpath(path2),
 
1560
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1561
 
 
1562
    def assertIsInstance(self, obj, kls, msg=None):
 
1563
        """Fail if obj is not an instance of kls
 
1564
        
 
1565
        :param msg: Supplementary message to show if the assertion fails.
 
1566
        """
 
1567
        if not isinstance(obj, kls):
 
1568
            m = "%r is an instance of %s rather than %s" % (
 
1569
                obj, obj.__class__, kls)
 
1570
            if msg:
 
1571
                m += ": " + msg
 
1572
            self.fail(m)
 
1573
 
 
1574
    def assertFileEqual(self, content, path):
 
1575
        """Fail if path does not contain 'content'."""
 
1576
        self.assertPathExists(path)
 
1577
        f = file(path, 'rb')
 
1578
        try:
 
1579
            s = f.read()
 
1580
        finally:
 
1581
            f.close()
 
1582
        self.assertEqualDiff(content, s)
 
1583
 
 
1584
    def assertDocstring(self, expected_docstring, obj):
 
1585
        """Fail if obj does not have expected_docstring"""
 
1586
        if __doc__ is None:
 
1587
            # With -OO the docstring should be None instead
 
1588
            self.assertIs(obj.__doc__, None)
 
1589
        else:
 
1590
            self.assertEqual(expected_docstring, obj.__doc__)
 
1591
 
 
1592
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
 
1593
    def failUnlessExists(self, path):
 
1594
        return self.assertPathExists(path)
 
1595
 
 
1596
    def assertPathExists(self, path):
 
1597
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1598
        if not isinstance(path, basestring):
 
1599
            for p in path:
 
1600
                self.assertPathExists(p)
 
1601
        else:
 
1602
            self.assertTrue(osutils.lexists(path),
 
1603
                path + " does not exist")
 
1604
 
 
1605
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
 
1606
    def failIfExists(self, path):
 
1607
        return self.assertPathDoesNotExist(path)
 
1608
 
 
1609
    def assertPathDoesNotExist(self, path):
 
1610
        """Fail if path or paths, which may be abs or relative, exist."""
 
1611
        if not isinstance(path, basestring):
 
1612
            for p in path:
 
1613
                self.assertPathDoesNotExist(p)
 
1614
        else:
 
1615
            self.assertFalse(osutils.lexists(path),
 
1616
                path + " exists")
 
1617
 
 
1618
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1619
        """A helper for callDeprecated and applyDeprecated.
 
1620
 
 
1621
        :param a_callable: A callable to call.
 
1622
        :param args: The positional arguments for the callable
 
1623
        :param kwargs: The keyword arguments for the callable
 
1624
        :return: A tuple (warnings, result). result is the result of calling
 
1625
            a_callable(``*args``, ``**kwargs``).
 
1626
        """
 
1627
        local_warnings = []
 
1628
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1629
            # we've hooked into a deprecation specific callpath,
 
1630
            # only deprecations should getting sent via it.
 
1631
            self.assertEqual(cls, DeprecationWarning)
 
1632
            local_warnings.append(msg)
 
1633
        original_warning_method = symbol_versioning.warn
 
1634
        symbol_versioning.set_warning_method(capture_warnings)
 
1635
        try:
 
1636
            result = a_callable(*args, **kwargs)
 
1637
        finally:
 
1638
            symbol_versioning.set_warning_method(original_warning_method)
 
1639
        return (local_warnings, result)
 
1640
 
 
1641
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1642
        """Call a deprecated callable without warning the user.
 
1643
 
 
1644
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1645
        not other callers that go direct to the warning module.
 
1646
 
 
1647
        To test that a deprecated method raises an error, do something like
 
1648
        this (remember that both assertRaises and applyDeprecated delays *args
 
1649
        and **kwargs passing)::
 
1650
 
 
1651
            self.assertRaises(errors.ReservedId,
 
1652
                self.applyDeprecated,
 
1653
                deprecated_in((1, 5, 0)),
 
1654
                br.append_revision,
 
1655
                'current:')
 
1656
 
 
1657
        :param deprecation_format: The deprecation format that the callable
 
1658
            should have been deprecated with. This is the same type as the
 
1659
            parameter to deprecated_method/deprecated_function. If the
 
1660
            callable is not deprecated with this format, an assertion error
 
1661
            will be raised.
 
1662
        :param a_callable: A callable to call. This may be a bound method or
 
1663
            a regular function. It will be called with ``*args`` and
 
1664
            ``**kwargs``.
 
1665
        :param args: The positional arguments for the callable
 
1666
        :param kwargs: The keyword arguments for the callable
 
1667
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1668
        """
 
1669
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1670
            *args, **kwargs)
 
1671
        expected_first_warning = symbol_versioning.deprecation_string(
 
1672
            a_callable, deprecation_format)
 
1673
        if len(call_warnings) == 0:
 
1674
            self.fail("No deprecation warning generated by call to %s" %
 
1675
                a_callable)
 
1676
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1677
        return result
 
1678
 
 
1679
    def callCatchWarnings(self, fn, *args, **kw):
 
1680
        """Call a callable that raises python warnings.
 
1681
 
 
1682
        The caller's responsible for examining the returned warnings.
 
1683
 
 
1684
        If the callable raises an exception, the exception is not
 
1685
        caught and propagates up to the caller.  In that case, the list
 
1686
        of warnings is not available.
 
1687
 
 
1688
        :returns: ([warning_object, ...], fn_result)
 
1689
        """
 
1690
        # XXX: This is not perfect, because it completely overrides the
 
1691
        # warnings filters, and some code may depend on suppressing particular
 
1692
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1693
        # though.  -- Andrew, 20071062
 
1694
        wlist = []
 
1695
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1696
            # despite the name, 'message' is normally(?) a Warning subclass
 
1697
            # instance
 
1698
            wlist.append(message)
 
1699
        saved_showwarning = warnings.showwarning
 
1700
        saved_filters = warnings.filters
 
1701
        try:
 
1702
            warnings.showwarning = _catcher
 
1703
            warnings.filters = []
 
1704
            result = fn(*args, **kw)
 
1705
        finally:
 
1706
            warnings.showwarning = saved_showwarning
 
1707
            warnings.filters = saved_filters
 
1708
        return wlist, result
 
1709
 
 
1710
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1711
        """Assert that a callable is deprecated in a particular way.
 
1712
 
 
1713
        This is a very precise test for unusual requirements. The
 
1714
        applyDeprecated helper function is probably more suited for most tests
 
1715
        as it allows you to simply specify the deprecation format being used
 
1716
        and will ensure that that is issued for the function being called.
 
1717
 
 
1718
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1719
        not other callers that go direct to the warning module.  To catch
 
1720
        general warnings, use callCatchWarnings.
 
1721
 
 
1722
        :param expected: a list of the deprecation warnings expected, in order
 
1723
        :param callable: The callable to call
 
1724
        :param args: The positional arguments for the callable
 
1725
        :param kwargs: The keyword arguments for the callable
 
1726
        """
 
1727
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1728
            *args, **kwargs)
 
1729
        self.assertEqual(expected, call_warnings)
 
1730
        return result
 
1731
 
 
1732
    def _startLogFile(self):
 
1733
        """Setup a in-memory target for bzr and testcase log messages"""
 
1734
        pseudo_log_file = StringIO()
 
1735
        def _get_log_contents_for_weird_testtools_api():
 
1736
            return [pseudo_log_file.getvalue().decode(
 
1737
                "utf-8", "replace").encode("utf-8")]
 
1738
        self.addDetail("log", content.Content(content.ContentType("text",
 
1739
            "plain", {"charset": "utf8"}),
 
1740
            _get_log_contents_for_weird_testtools_api))
 
1741
        self._log_file = pseudo_log_file
 
1742
        self._log_memento = trace.push_log_file(self._log_file)
 
1743
        self.addCleanup(self._finishLogFile)
 
1744
 
 
1745
    def _finishLogFile(self):
 
1746
        """Flush and dereference the in-memory log for this testcase"""
 
1747
        if trace._trace_file:
 
1748
            # flush the log file, to get all content
 
1749
            trace._trace_file.flush()
 
1750
        trace.pop_log_file(self._log_memento)
 
1751
        # The logging module now tracks references for cleanup so discard ours
 
1752
        del self._log_memento
 
1753
 
 
1754
    def thisFailsStrictLockCheck(self):
 
1755
        """It is known that this test would fail with -Dstrict_locks.
 
1756
 
 
1757
        By default, all tests are run with strict lock checking unless
 
1758
        -Edisable_lock_checks is supplied. However there are some tests which
 
1759
        we know fail strict locks at this point that have not been fixed.
 
1760
        They should call this function to disable the strict checking.
 
1761
 
 
1762
        This should be used sparingly, it is much better to fix the locking
 
1763
        issues rather than papering over the problem by calling this function.
 
1764
        """
 
1765
        debug.debug_flags.discard('strict_locks')
 
1766
 
 
1767
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1768
        """Overrides an object attribute restoring it after the test.
 
1769
 
 
1770
        :note: This should be used with discretion; you should think about
 
1771
        whether it's better to make the code testable without monkey-patching.
 
1772
 
 
1773
        :param obj: The object that will be mutated.
 
1774
 
 
1775
        :param attr_name: The attribute name we want to preserve/override in
 
1776
            the object.
 
1777
 
 
1778
        :param new: The optional value we want to set the attribute to.
 
1779
 
 
1780
        :returns: The actual attr value.
 
1781
        """
 
1782
        value = getattr(obj, attr_name)
 
1783
        # The actual value is captured by the call below
 
1784
        self.addCleanup(setattr, obj, attr_name, value)
 
1785
        if new is not _unitialized_attr:
 
1786
            setattr(obj, attr_name, new)
 
1787
        return value
 
1788
 
 
1789
    def overrideEnv(self, name, new):
 
1790
        """Set an environment variable, and reset it after the test.
 
1791
 
 
1792
        :param name: The environment variable name.
 
1793
 
 
1794
        :param new: The value to set the variable to. If None, the 
 
1795
            variable is deleted from the environment.
 
1796
 
 
1797
        :returns: The actual variable value.
 
1798
        """
 
1799
        value = osutils.set_or_unset_env(name, new)
 
1800
        self.addCleanup(osutils.set_or_unset_env, name, value)
 
1801
        return value
 
1802
 
 
1803
    def recordCalls(self, obj, attr_name):
 
1804
        """Monkeypatch in a wrapper that will record calls.
 
1805
 
 
1806
        The monkeypatch is automatically removed when the test concludes.
 
1807
 
 
1808
        :param obj: The namespace holding the reference to be replaced;
 
1809
            typically a module, class, or object.
 
1810
        :param attr_name: A string for the name of the attribute to 
 
1811
            patch.
 
1812
        :returns: A list that will be extended with one item every time the
 
1813
            function is called, with a tuple of (args, kwargs).
 
1814
        """
 
1815
        calls = []
 
1816
 
 
1817
        def decorator(*args, **kwargs):
 
1818
            calls.append((args, kwargs))
 
1819
            return orig(*args, **kwargs)
 
1820
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1821
        return calls
 
1822
 
 
1823
    def _cleanEnvironment(self):
 
1824
        for name, value in isolated_environ.iteritems():
 
1825
            self.overrideEnv(name, value)
 
1826
 
 
1827
    def _restoreHooks(self):
 
1828
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1829
            setattr(klass, name, hooks)
 
1830
        self._preserved_hooks.clear()
 
1831
        bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
 
1832
        self._preserved_lazy_hooks.clear()
 
1833
 
 
1834
    def knownFailure(self, reason):
 
1835
        """Declare that this test fails for a known reason
 
1836
 
 
1837
        Tests that are known to fail should generally be using expectedFailure
 
1838
        with an appropriate reverse assertion if a change could cause the test
 
1839
        to start passing. Conversely if the test has no immediate prospect of
 
1840
        succeeding then using skip is more suitable.
 
1841
 
 
1842
        When this method is called while an exception is being handled, that
 
1843
        traceback will be used, otherwise a new exception will be thrown to
 
1844
        provide one but won't be reported.
 
1845
        """
 
1846
        self._add_reason(reason)
 
1847
        try:
 
1848
            exc_info = sys.exc_info()
 
1849
            if exc_info != (None, None, None):
 
1850
                self._report_traceback(exc_info)
 
1851
            else:
 
1852
                try:
 
1853
                    raise self.failureException(reason)
 
1854
                except self.failureException:
 
1855
                    exc_info = sys.exc_info()
 
1856
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1857
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1858
        finally:
 
1859
            del exc_info
 
1860
 
 
1861
    def _suppress_log(self):
 
1862
        """Remove the log info from details."""
 
1863
        self.discardDetail('log')
 
1864
 
 
1865
    def _do_skip(self, result, reason):
 
1866
        self._suppress_log()
 
1867
        addSkip = getattr(result, 'addSkip', None)
 
1868
        if not callable(addSkip):
 
1869
            result.addSuccess(result)
 
1870
        else:
 
1871
            addSkip(self, reason)
 
1872
 
 
1873
    @staticmethod
 
1874
    def _do_known_failure(self, result, e):
 
1875
        self._suppress_log()
 
1876
        err = sys.exc_info()
 
1877
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1878
        if addExpectedFailure is not None:
 
1879
            addExpectedFailure(self, err)
 
1880
        else:
 
1881
            result.addSuccess(self)
 
1882
 
 
1883
    @staticmethod
 
1884
    def _do_not_applicable(self, result, e):
 
1885
        if not e.args:
 
1886
            reason = 'No reason given'
 
1887
        else:
 
1888
            reason = e.args[0]
 
1889
        self._suppress_log ()
 
1890
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1891
        if addNotApplicable is not None:
 
1892
            result.addNotApplicable(self, reason)
 
1893
        else:
 
1894
            self._do_skip(result, reason)
 
1895
 
 
1896
    @staticmethod
 
1897
    def _report_skip(self, result, err):
 
1898
        """Override the default _report_skip.
 
1899
 
 
1900
        We want to strip the 'log' detail. If we waint until _do_skip, it has
 
1901
        already been formatted into the 'reason' string, and we can't pull it
 
1902
        out again.
 
1903
        """
 
1904
        self._suppress_log()
 
1905
        super(TestCase, self)._report_skip(self, result, err)
 
1906
 
 
1907
    @staticmethod
 
1908
    def _report_expected_failure(self, result, err):
 
1909
        """Strip the log.
 
1910
 
 
1911
        See _report_skip for motivation.
 
1912
        """
 
1913
        self._suppress_log()
 
1914
        super(TestCase, self)._report_expected_failure(self, result, err)
 
1915
 
 
1916
    @staticmethod
 
1917
    def _do_unsupported_or_skip(self, result, e):
 
1918
        reason = e.args[0]
 
1919
        self._suppress_log()
 
1920
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1921
        if addNotSupported is not None:
 
1922
            result.addNotSupported(self, reason)
 
1923
        else:
 
1924
            self._do_skip(result, reason)
 
1925
 
 
1926
    def time(self, callable, *args, **kwargs):
 
1927
        """Run callable and accrue the time it takes to the benchmark time.
 
1928
 
 
1929
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1930
        this will cause lsprofile statistics to be gathered and stored in
 
1931
        self._benchcalls.
 
1932
        """
 
1933
        if self._benchtime is None:
 
1934
            self.addDetail('benchtime', content.Content(content.ContentType(
 
1935
                "text", "plain"), lambda:[str(self._benchtime)]))
 
1936
            self._benchtime = 0
 
1937
        start = time.time()
 
1938
        try:
 
1939
            if not self._gather_lsprof_in_benchmarks:
 
1940
                return callable(*args, **kwargs)
 
1941
            else:
 
1942
                # record this benchmark
 
1943
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1944
                stats.sort()
 
1945
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1946
                return ret
 
1947
        finally:
 
1948
            self._benchtime += time.time() - start
 
1949
 
 
1950
    def log(self, *args):
 
1951
        trace.mutter(*args)
 
1952
 
 
1953
    def get_log(self):
 
1954
        """Get a unicode string containing the log from bzrlib.trace.
 
1955
 
 
1956
        Undecodable characters are replaced.
 
1957
        """
 
1958
        return u"".join(self.getDetails()['log'].iter_text())
 
1959
 
 
1960
    def requireFeature(self, feature):
 
1961
        """This test requires a specific feature is available.
 
1962
 
 
1963
        :raises UnavailableFeature: When feature is not available.
 
1964
        """
 
1965
        if not feature.available():
 
1966
            raise UnavailableFeature(feature)
 
1967
 
 
1968
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1969
            working_dir):
 
1970
        """Run bazaar command line, splitting up a string command line."""
 
1971
        if isinstance(args, basestring):
 
1972
            # shlex don't understand unicode strings,
 
1973
            # so args should be plain string (bialix 20070906)
 
1974
            args = list(shlex.split(str(args)))
 
1975
        return self._run_bzr_core(args, retcode=retcode,
 
1976
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1977
                )
 
1978
 
 
1979
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1980
            working_dir):
 
1981
        # Clear chk_map page cache, because the contents are likely to mask
 
1982
        # locking errors.
 
1983
        chk_map.clear_cache()
 
1984
        if encoding is None:
 
1985
            encoding = osutils.get_user_encoding()
 
1986
        stdout = StringIOWrapper()
 
1987
        stderr = StringIOWrapper()
 
1988
        stdout.encoding = encoding
 
1989
        stderr.encoding = encoding
 
1990
 
 
1991
        self.log('run bzr: %r', args)
 
1992
        # FIXME: don't call into logging here
 
1993
        handler = trace.EncodedStreamHandler(stderr, errors="replace",
 
1994
            level=logging.INFO)
 
1995
        logger = logging.getLogger('')
 
1996
        logger.addHandler(handler)
 
1997
        old_ui_factory = ui.ui_factory
 
1998
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1999
 
 
2000
        cwd = None
 
2001
        if working_dir is not None:
 
2002
            cwd = osutils.getcwd()
 
2003
            os.chdir(working_dir)
 
2004
 
 
2005
        try:
 
2006
            try:
 
2007
                result = self.apply_redirected(
 
2008
                    ui.ui_factory.stdin,
 
2009
                    stdout, stderr,
 
2010
                    _mod_commands.run_bzr_catch_user_errors,
 
2011
                    args)
 
2012
            except KeyboardInterrupt:
 
2013
                # Reraise KeyboardInterrupt with contents of redirected stdout
 
2014
                # and stderr as arguments, for tests which are interested in
 
2015
                # stdout and stderr and are expecting the exception.
 
2016
                out = stdout.getvalue()
 
2017
                err = stderr.getvalue()
 
2018
                if out:
 
2019
                    self.log('output:\n%r', out)
 
2020
                if err:
 
2021
                    self.log('errors:\n%r', err)
 
2022
                raise KeyboardInterrupt(out, err)
 
2023
        finally:
 
2024
            logger.removeHandler(handler)
 
2025
            ui.ui_factory = old_ui_factory
 
2026
            if cwd is not None:
 
2027
                os.chdir(cwd)
 
2028
 
 
2029
        out = stdout.getvalue()
 
2030
        err = stderr.getvalue()
 
2031
        if out:
 
2032
            self.log('output:\n%r', out)
 
2033
        if err:
 
2034
            self.log('errors:\n%r', err)
 
2035
        if retcode is not None:
 
2036
            self.assertEquals(retcode, result,
 
2037
                              message='Unexpected return code')
 
2038
        return result, out, err
 
2039
 
 
2040
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
2041
                working_dir=None, error_regexes=[], output_encoding=None):
 
2042
        """Invoke bzr, as if it were run from the command line.
 
2043
 
 
2044
        The argument list should not include the bzr program name - the
 
2045
        first argument is normally the bzr command.  Arguments may be
 
2046
        passed in three ways:
 
2047
 
 
2048
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
2049
        when the command contains whitespace or metacharacters, or
 
2050
        is built up at run time.
 
2051
 
 
2052
        2- A single string, eg "add a".  This is the most convenient
 
2053
        for hardcoded commands.
 
2054
 
 
2055
        This runs bzr through the interface that catches and reports
 
2056
        errors, and with logging set to something approximating the
 
2057
        default, so that error reporting can be checked.
 
2058
 
 
2059
        This should be the main method for tests that want to exercise the
 
2060
        overall behavior of the bzr application (rather than a unit test
 
2061
        or a functional test of the library.)
 
2062
 
 
2063
        This sends the stdout/stderr results into the test's log,
 
2064
        where it may be useful for debugging.  See also run_captured.
 
2065
 
 
2066
        :keyword stdin: A string to be used as stdin for the command.
 
2067
        :keyword retcode: The status code the command should return;
 
2068
            default 0.
 
2069
        :keyword working_dir: The directory to run the command in
 
2070
        :keyword error_regexes: A list of expected error messages.  If
 
2071
            specified they must be seen in the error output of the command.
 
2072
        """
 
2073
        retcode, out, err = self._run_bzr_autosplit(
 
2074
            args=args,
 
2075
            retcode=retcode,
 
2076
            encoding=encoding,
 
2077
            stdin=stdin,
 
2078
            working_dir=working_dir,
 
2079
            )
 
2080
        self.assertIsInstance(error_regexes, (list, tuple))
 
2081
        for regex in error_regexes:
 
2082
            self.assertContainsRe(err, regex)
 
2083
        return out, err
 
2084
 
 
2085
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
2086
        """Run bzr, and check that stderr contains the supplied regexes
 
2087
 
 
2088
        :param error_regexes: Sequence of regular expressions which
 
2089
            must each be found in the error output. The relative ordering
 
2090
            is not enforced.
 
2091
        :param args: command-line arguments for bzr
 
2092
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
2093
            This function changes the default value of retcode to be 3,
 
2094
            since in most cases this is run when you expect bzr to fail.
 
2095
 
 
2096
        :return: (out, err) The actual output of running the command (in case
 
2097
            you want to do more inspection)
 
2098
 
 
2099
        Examples of use::
 
2100
 
 
2101
            # Make sure that commit is failing because there is nothing to do
 
2102
            self.run_bzr_error(['no changes to commit'],
 
2103
                               ['commit', '-m', 'my commit comment'])
 
2104
            # Make sure --strict is handling an unknown file, rather than
 
2105
            # giving us the 'nothing to do' error
 
2106
            self.build_tree(['unknown'])
 
2107
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
2108
                               ['commit', --strict', '-m', 'my commit comment'])
 
2109
        """
 
2110
        kwargs.setdefault('retcode', 3)
 
2111
        kwargs['error_regexes'] = error_regexes
 
2112
        out, err = self.run_bzr(*args, **kwargs)
 
2113
        return out, err
 
2114
 
 
2115
    def run_bzr_subprocess(self, *args, **kwargs):
 
2116
        """Run bzr in a subprocess for testing.
 
2117
 
 
2118
        This starts a new Python interpreter and runs bzr in there.
 
2119
        This should only be used for tests that have a justifiable need for
 
2120
        this isolation: e.g. they are testing startup time, or signal
 
2121
        handling, or early startup code, etc.  Subprocess code can't be
 
2122
        profiled or debugged so easily.
 
2123
 
 
2124
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
2125
            None is supplied, the status code is not checked.
 
2126
        :keyword env_changes: A dictionary which lists changes to environment
 
2127
            variables. A value of None will unset the env variable.
 
2128
            The values must be strings. The change will only occur in the
 
2129
            child, so you don't need to fix the environment after running.
 
2130
        :keyword universal_newlines: Convert CRLF => LF
 
2131
        :keyword allow_plugins: By default the subprocess is run with
 
2132
            --no-plugins to ensure test reproducibility. Also, it is possible
 
2133
            for system-wide plugins to create unexpected output on stderr,
 
2134
            which can cause unnecessary test failures.
 
2135
        """
 
2136
        env_changes = kwargs.get('env_changes', {})
 
2137
        working_dir = kwargs.get('working_dir', None)
 
2138
        allow_plugins = kwargs.get('allow_plugins', False)
 
2139
        if len(args) == 1:
 
2140
            if isinstance(args[0], list):
 
2141
                args = args[0]
 
2142
            elif isinstance(args[0], basestring):
 
2143
                args = list(shlex.split(args[0]))
 
2144
        else:
 
2145
            raise ValueError("passing varargs to run_bzr_subprocess")
 
2146
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
2147
                                            working_dir=working_dir,
 
2148
                                            allow_plugins=allow_plugins)
 
2149
        # We distinguish between retcode=None and retcode not passed.
 
2150
        supplied_retcode = kwargs.get('retcode', 0)
 
2151
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
2152
            universal_newlines=kwargs.get('universal_newlines', False),
 
2153
            process_args=args)
 
2154
 
 
2155
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
2156
                             skip_if_plan_to_signal=False,
 
2157
                             working_dir=None,
 
2158
                             allow_plugins=False, stderr=subprocess.PIPE):
 
2159
        """Start bzr in a subprocess for testing.
 
2160
 
 
2161
        This starts a new Python interpreter and runs bzr in there.
 
2162
        This should only be used for tests that have a justifiable need for
 
2163
        this isolation: e.g. they are testing startup time, or signal
 
2164
        handling, or early startup code, etc.  Subprocess code can't be
 
2165
        profiled or debugged so easily.
 
2166
 
 
2167
        :param process_args: a list of arguments to pass to the bzr executable,
 
2168
            for example ``['--version']``.
 
2169
        :param env_changes: A dictionary which lists changes to environment
 
2170
            variables. A value of None will unset the env variable.
 
2171
            The values must be strings. The change will only occur in the
 
2172
            child, so you don't need to fix the environment after running.
 
2173
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
 
2174
            doesn't support signalling subprocesses.
 
2175
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
2176
        :param stderr: file to use for the subprocess's stderr.  Valid values
 
2177
            are those valid for the stderr argument of `subprocess.Popen`.
 
2178
            Default value is ``subprocess.PIPE``.
 
2179
 
 
2180
        :returns: Popen object for the started process.
 
2181
        """
 
2182
        if skip_if_plan_to_signal:
 
2183
            if os.name != "posix":
 
2184
                raise TestSkipped("Sending signals not supported")
 
2185
 
 
2186
        if env_changes is None:
 
2187
            env_changes = {}
 
2188
        # Because $HOME is set to a tempdir for the context of a test, modules
 
2189
        # installed in the user dir will not be found unless $PYTHONUSERBASE
 
2190
        # gets set to the computed directory of this parent process.
 
2191
        if site.USER_BASE is not None:
 
2192
            env_changes["PYTHONUSERBASE"] = site.USER_BASE
 
2193
        old_env = {}
 
2194
 
 
2195
        def cleanup_environment():
 
2196
            for env_var, value in env_changes.iteritems():
 
2197
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
2198
 
 
2199
        def restore_environment():
 
2200
            for env_var, value in old_env.iteritems():
 
2201
                osutils.set_or_unset_env(env_var, value)
 
2202
 
 
2203
        bzr_path = self.get_bzr_path()
 
2204
 
 
2205
        cwd = None
 
2206
        if working_dir is not None:
 
2207
            cwd = osutils.getcwd()
 
2208
            os.chdir(working_dir)
 
2209
 
 
2210
        try:
 
2211
            # win32 subprocess doesn't support preexec_fn
 
2212
            # so we will avoid using it on all platforms, just to
 
2213
            # make sure the code path is used, and we don't break on win32
 
2214
            cleanup_environment()
 
2215
            # Include the subprocess's log file in the test details, in case
 
2216
            # the test fails due to an error in the subprocess.
 
2217
            self._add_subprocess_log(trace._get_bzr_log_filename())
 
2218
            command = [sys.executable]
 
2219
            # frozen executables don't need the path to bzr
 
2220
            if getattr(sys, "frozen", None) is None:
 
2221
                command.append(bzr_path)
 
2222
            if not allow_plugins:
 
2223
                command.append('--no-plugins')
 
2224
            command.extend(process_args)
 
2225
            process = self._popen(command, stdin=subprocess.PIPE,
 
2226
                                  stdout=subprocess.PIPE,
 
2227
                                  stderr=stderr)
 
2228
        finally:
 
2229
            restore_environment()
 
2230
            if cwd is not None:
 
2231
                os.chdir(cwd)
 
2232
 
 
2233
        return process
 
2234
 
 
2235
    def _add_subprocess_log(self, log_file_path):
 
2236
        if len(self._log_files) == 0:
 
2237
            # Register an addCleanup func.  We do this on the first call to
 
2238
            # _add_subprocess_log rather than in TestCase.setUp so that this
 
2239
            # addCleanup is registered after any cleanups for tempdirs that
 
2240
            # subclasses might create, which will probably remove the log file
 
2241
            # we want to read.
 
2242
            self.addCleanup(self._subprocess_log_cleanup)
 
2243
        # self._log_files is a set, so if a log file is reused we won't grab it
 
2244
        # twice.
 
2245
        self._log_files.add(log_file_path)
 
2246
 
 
2247
    def _subprocess_log_cleanup(self):
 
2248
        for count, log_file_path in enumerate(self._log_files):
 
2249
            # We use buffer_now=True to avoid holding the file open beyond
 
2250
            # the life of this function, which might interfere with e.g.
 
2251
            # cleaning tempdirs on Windows.
 
2252
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
 
2253
            #detail_content = content.content_from_file(
 
2254
            #    log_file_path, buffer_now=True)
 
2255
            with open(log_file_path, 'rb') as log_file:
 
2256
                log_file_bytes = log_file.read()
 
2257
            detail_content = content.Content(content.ContentType("text",
 
2258
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
 
2259
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
 
2260
                detail_content)
 
2261
 
 
2262
    def _popen(self, *args, **kwargs):
 
2263
        """Place a call to Popen.
 
2264
 
 
2265
        Allows tests to override this method to intercept the calls made to
 
2266
        Popen for introspection.
 
2267
        """
 
2268
        return subprocess.Popen(*args, **kwargs)
 
2269
 
 
2270
    def get_source_path(self):
 
2271
        """Return the path of the directory containing bzrlib."""
 
2272
        return os.path.dirname(os.path.dirname(bzrlib.__file__))
 
2273
 
 
2274
    def get_bzr_path(self):
 
2275
        """Return the path of the 'bzr' executable for this test suite."""
 
2276
        bzr_path = os.path.join(self.get_source_path(), "bzr")
 
2277
        if not os.path.isfile(bzr_path):
 
2278
            # We are probably installed. Assume sys.argv is the right file
 
2279
            bzr_path = sys.argv[0]
 
2280
        return bzr_path
 
2281
 
 
2282
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
2283
                              universal_newlines=False, process_args=None):
 
2284
        """Finish the execution of process.
 
2285
 
 
2286
        :param process: the Popen object returned from start_bzr_subprocess.
 
2287
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
2288
            None is supplied, the status code is not checked.
 
2289
        :param send_signal: an optional signal to send to the process.
 
2290
        :param universal_newlines: Convert CRLF => LF
 
2291
        :returns: (stdout, stderr)
 
2292
        """
 
2293
        if send_signal is not None:
 
2294
            os.kill(process.pid, send_signal)
 
2295
        out, err = process.communicate()
 
2296
 
 
2297
        if universal_newlines:
 
2298
            out = out.replace('\r\n', '\n')
 
2299
            err = err.replace('\r\n', '\n')
 
2300
 
 
2301
        if retcode is not None and retcode != process.returncode:
 
2302
            if process_args is None:
 
2303
                process_args = "(unknown args)"
 
2304
            trace.mutter('Output of bzr %s:\n%s', process_args, out)
 
2305
            trace.mutter('Error for bzr %s:\n%s', process_args, err)
 
2306
            self.fail('Command bzr %s failed with retcode %s != %s'
 
2307
                      % (process_args, retcode, process.returncode))
 
2308
        return [out, err]
 
2309
 
 
2310
    def check_tree_shape(self, tree, shape):
 
2311
        """Compare a tree to a list of expected names.
 
2312
 
 
2313
        Fail if they are not precisely equal.
 
2314
        """
 
2315
        extras = []
 
2316
        shape = list(shape)             # copy
 
2317
        for path, ie in tree.iter_entries_by_dir():
 
2318
            name = path.replace('\\', '/')
 
2319
            if ie.kind == 'directory':
 
2320
                name = name + '/'
 
2321
            if name == "/":
 
2322
                pass # ignore root entry
 
2323
            elif name in shape:
 
2324
                shape.remove(name)
 
2325
            else:
 
2326
                extras.append(name)
 
2327
        if shape:
 
2328
            self.fail("expected paths not found in inventory: %r" % shape)
 
2329
        if extras:
 
2330
            self.fail("unexpected paths found in inventory: %r" % extras)
 
2331
 
 
2332
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
2333
                         a_callable=None, *args, **kwargs):
 
2334
        """Call callable with redirected std io pipes.
 
2335
 
 
2336
        Returns the return code."""
 
2337
        if not callable(a_callable):
 
2338
            raise ValueError("a_callable must be callable.")
 
2339
        if stdin is None:
 
2340
            stdin = StringIO("")
 
2341
        if stdout is None:
 
2342
            if getattr(self, "_log_file", None) is not None:
 
2343
                stdout = self._log_file
 
2344
            else:
 
2345
                stdout = StringIO()
 
2346
        if stderr is None:
 
2347
            if getattr(self, "_log_file", None is not None):
 
2348
                stderr = self._log_file
 
2349
            else:
 
2350
                stderr = StringIO()
 
2351
        real_stdin = sys.stdin
 
2352
        real_stdout = sys.stdout
 
2353
        real_stderr = sys.stderr
 
2354
        try:
 
2355
            sys.stdout = stdout
 
2356
            sys.stderr = stderr
 
2357
            sys.stdin = stdin
 
2358
            return a_callable(*args, **kwargs)
 
2359
        finally:
 
2360
            sys.stdout = real_stdout
 
2361
            sys.stderr = real_stderr
 
2362
            sys.stdin = real_stdin
 
2363
 
 
2364
    def reduceLockdirTimeout(self):
 
2365
        """Reduce the default lock timeout for the duration of the test, so that
 
2366
        if LockContention occurs during a test, it does so quickly.
 
2367
 
 
2368
        Tests that expect to provoke LockContention errors should call this.
 
2369
        """
 
2370
        self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
 
2371
 
 
2372
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
2373
        """Return a StringIOWrapper instance, that will encode Unicode
 
2374
        input to UTF-8.
 
2375
        """
 
2376
        if encoding_type is None:
 
2377
            encoding_type = 'strict'
 
2378
        sio = StringIO()
 
2379
        output_encoding = 'utf-8'
 
2380
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
2381
        sio.encoding = output_encoding
 
2382
        return sio
 
2383
 
 
2384
    def disable_verb(self, verb):
 
2385
        """Disable a smart server verb for one test."""
 
2386
        from bzrlib.smart import request
 
2387
        request_handlers = request.request_handlers
 
2388
        orig_method = request_handlers.get(verb)
 
2389
        orig_info = request_handlers.get_info(verb)
 
2390
        request_handlers.remove(verb)
 
2391
        self.addCleanup(request_handlers.register, verb, orig_method,
 
2392
            info=orig_info)
 
2393
 
 
2394
 
 
2395
class CapturedCall(object):
 
2396
    """A helper for capturing smart server calls for easy debug analysis."""
 
2397
 
 
2398
    def __init__(self, params, prefix_length):
 
2399
        """Capture the call with params and skip prefix_length stack frames."""
 
2400
        self.call = params
 
2401
        import traceback
 
2402
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2403
        # client frames. Beyond this we could get more clever, but this is good
 
2404
        # enough for now.
 
2405
        stack = traceback.extract_stack()[prefix_length:-5]
 
2406
        self.stack = ''.join(traceback.format_list(stack))
 
2407
 
 
2408
    def __str__(self):
 
2409
        return self.call.method
 
2410
 
 
2411
    def __repr__(self):
 
2412
        return self.call.method
 
2413
 
 
2414
    def stack(self):
 
2415
        return self.stack
 
2416
 
 
2417
 
 
2418
class TestCaseWithMemoryTransport(TestCase):
 
2419
    """Common test class for tests that do not need disk resources.
 
2420
 
 
2421
    Tests that need disk resources should derive from TestCaseInTempDir
 
2422
    orTestCaseWithTransport.
 
2423
 
 
2424
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2425
 
 
2426
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2427
    a directory which does not exist. This serves to help ensure test isolation
 
2428
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2429
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2430
    defaults for the transport in tests, nor does it obey the command line
 
2431
    override, so tests that accidentally write to the common directory should
 
2432
    be rare.
 
2433
 
 
2434
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2435
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2436
    """
 
2437
 
 
2438
    TEST_ROOT = None
 
2439
    _TEST_NAME = 'test'
 
2440
 
 
2441
    def __init__(self, methodName='runTest'):
 
2442
        # allow test parameterization after test construction and before test
 
2443
        # execution. Variables that the parameterizer sets need to be
 
2444
        # ones that are not set by setUp, or setUp will trash them.
 
2445
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2446
        self.vfs_transport_factory = default_transport
 
2447
        self.transport_server = None
 
2448
        self.transport_readonly_server = None
 
2449
        self.__vfs_server = None
 
2450
 
 
2451
    def get_transport(self, relpath=None):
 
2452
        """Return a writeable transport.
 
2453
 
 
2454
        This transport is for the test scratch space relative to
 
2455
        "self._test_root"
 
2456
 
 
2457
        :param relpath: a path relative to the base url.
 
2458
        """
 
2459
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2460
        self.assertFalse(t.is_readonly())
 
2461
        return t
 
2462
 
 
2463
    def get_readonly_transport(self, relpath=None):
 
2464
        """Return a readonly transport for the test scratch space
 
2465
 
 
2466
        This can be used to test that operations which should only need
 
2467
        readonly access in fact do not try to write.
 
2468
 
 
2469
        :param relpath: a path relative to the base url.
 
2470
        """
 
2471
        t = _mod_transport.get_transport_from_url(
 
2472
            self.get_readonly_url(relpath))
 
2473
        self.assertTrue(t.is_readonly())
 
2474
        return t
 
2475
 
 
2476
    def create_transport_readonly_server(self):
 
2477
        """Create a transport server from class defined at init.
 
2478
 
 
2479
        This is mostly a hook for daughter classes.
 
2480
        """
 
2481
        return self.transport_readonly_server()
 
2482
 
 
2483
    def get_readonly_server(self):
 
2484
        """Get the server instance for the readonly transport
 
2485
 
 
2486
        This is useful for some tests with specific servers to do diagnostics.
 
2487
        """
 
2488
        if self.__readonly_server is None:
 
2489
            if self.transport_readonly_server is None:
 
2490
                # readonly decorator requested
 
2491
                self.__readonly_server = test_server.ReadonlyServer()
 
2492
            else:
 
2493
                # explicit readonly transport.
 
2494
                self.__readonly_server = self.create_transport_readonly_server()
 
2495
            self.start_server(self.__readonly_server,
 
2496
                self.get_vfs_only_server())
 
2497
        return self.__readonly_server
 
2498
 
 
2499
    def get_readonly_url(self, relpath=None):
 
2500
        """Get a URL for the readonly transport.
 
2501
 
 
2502
        This will either be backed by '.' or a decorator to the transport
 
2503
        used by self.get_url()
 
2504
        relpath provides for clients to get a path relative to the base url.
 
2505
        These should only be downwards relative, not upwards.
 
2506
        """
 
2507
        base = self.get_readonly_server().get_url()
 
2508
        return self._adjust_url(base, relpath)
 
2509
 
 
2510
    def get_vfs_only_server(self):
 
2511
        """Get the vfs only read/write server instance.
 
2512
 
 
2513
        This is useful for some tests with specific servers that need
 
2514
        diagnostics.
 
2515
 
 
2516
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2517
        is no means to override it.
 
2518
        """
 
2519
        if self.__vfs_server is None:
 
2520
            self.__vfs_server = memory.MemoryServer()
 
2521
            self.start_server(self.__vfs_server)
 
2522
        return self.__vfs_server
 
2523
 
 
2524
    def get_server(self):
 
2525
        """Get the read/write server instance.
 
2526
 
 
2527
        This is useful for some tests with specific servers that need
 
2528
        diagnostics.
 
2529
 
 
2530
        This is built from the self.transport_server factory. If that is None,
 
2531
        then the self.get_vfs_server is returned.
 
2532
        """
 
2533
        if self.__server is None:
 
2534
            if (self.transport_server is None or self.transport_server is
 
2535
                self.vfs_transport_factory):
 
2536
                self.__server = self.get_vfs_only_server()
 
2537
            else:
 
2538
                # bring up a decorated means of access to the vfs only server.
 
2539
                self.__server = self.transport_server()
 
2540
                self.start_server(self.__server, self.get_vfs_only_server())
 
2541
        return self.__server
 
2542
 
 
2543
    def _adjust_url(self, base, relpath):
 
2544
        """Get a URL (or maybe a path) for the readwrite transport.
 
2545
 
 
2546
        This will either be backed by '.' or to an equivalent non-file based
 
2547
        facility.
 
2548
        relpath provides for clients to get a path relative to the base url.
 
2549
        These should only be downwards relative, not upwards.
 
2550
        """
 
2551
        if relpath is not None and relpath != '.':
 
2552
            if not base.endswith('/'):
 
2553
                base = base + '/'
 
2554
            # XXX: Really base should be a url; we did after all call
 
2555
            # get_url()!  But sometimes it's just a path (from
 
2556
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2557
            # to a non-escaped local path.
 
2558
            if base.startswith('./') or base.startswith('/'):
 
2559
                base += relpath
 
2560
            else:
 
2561
                base += urlutils.escape(relpath)
 
2562
        return base
 
2563
 
 
2564
    def get_url(self, relpath=None):
 
2565
        """Get a URL (or maybe a path) for the readwrite transport.
 
2566
 
 
2567
        This will either be backed by '.' or to an equivalent non-file based
 
2568
        facility.
 
2569
        relpath provides for clients to get a path relative to the base url.
 
2570
        These should only be downwards relative, not upwards.
 
2571
        """
 
2572
        base = self.get_server().get_url()
 
2573
        return self._adjust_url(base, relpath)
 
2574
 
 
2575
    def get_vfs_only_url(self, relpath=None):
 
2576
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2577
 
 
2578
        This will never be a smart protocol.  It always has all the
 
2579
        capabilities of the local filesystem, but it might actually be a
 
2580
        MemoryTransport or some other similar virtual filesystem.
 
2581
 
 
2582
        This is the backing transport (if any) of the server returned by
 
2583
        get_url and get_readonly_url.
 
2584
 
 
2585
        :param relpath: provides for clients to get a path relative to the base
 
2586
            url.  These should only be downwards relative, not upwards.
 
2587
        :return: A URL
 
2588
        """
 
2589
        base = self.get_vfs_only_server().get_url()
 
2590
        return self._adjust_url(base, relpath)
 
2591
 
 
2592
    def _create_safety_net(self):
 
2593
        """Make a fake bzr directory.
 
2594
 
 
2595
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2596
        real branch.
 
2597
        """
 
2598
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2599
        # Make sure we get a readable and accessible home for .bzr.log
 
2600
        # and/or config files, and not fallback to weird defaults (see
 
2601
        # http://pad.lv/825027).
 
2602
        self.assertIs(None, os.environ.get('BZR_HOME', None))
 
2603
        os.environ['BZR_HOME'] = root
 
2604
        wt = bzrdir.BzrDir.create_standalone_workingtree(root)
 
2605
        del os.environ['BZR_HOME']
 
2606
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2607
        # we don't need to re-open the wt to check it hasn't changed.
 
2608
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2609
            wt.control_transport.get_bytes('dirstate'))
 
2610
 
 
2611
    def _check_safety_net(self):
 
2612
        """Check that the safety .bzr directory have not been touched.
 
2613
 
 
2614
        _make_test_root have created a .bzr directory to prevent tests from
 
2615
        propagating. This method ensures than a test did not leaked.
 
2616
        """
 
2617
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2618
        t = _mod_transport.get_transport_from_path(root)
 
2619
        self.permit_url(t.base)
 
2620
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2621
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2622
            # The current test have modified the /bzr directory, we need to
 
2623
            # recreate a new one or all the followng tests will fail.
 
2624
            # If you need to inspect its content uncomment the following line
 
2625
            # import pdb; pdb.set_trace()
 
2626
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
 
2627
            self._create_safety_net()
 
2628
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2629
 
 
2630
    def _make_test_root(self):
 
2631
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2632
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2633
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2634
                                                    suffix='.tmp'))
 
2635
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2636
 
 
2637
            self._create_safety_net()
 
2638
 
 
2639
            # The same directory is used by all tests, and we're not
 
2640
            # specifically told when all tests are finished.  This will do.
 
2641
            atexit.register(_rmtree_temp_dir, root)
 
2642
 
 
2643
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2644
        self.addCleanup(self._check_safety_net)
 
2645
 
 
2646
    def makeAndChdirToTestDir(self):
 
2647
        """Create a temporary directories for this one test.
 
2648
 
 
2649
        This must set self.test_home_dir and self.test_dir and chdir to
 
2650
        self.test_dir.
 
2651
 
 
2652
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2653
        """
 
2654
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2655
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2656
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2657
        self.permit_dir(self.test_dir)
 
2658
 
 
2659
    def make_branch(self, relpath, format=None):
 
2660
        """Create a branch on the transport at relpath."""
 
2661
        repo = self.make_repository(relpath, format=format)
 
2662
        return repo.bzrdir.create_branch(append_revisions_only=False)
 
2663
 
 
2664
    def get_default_format(self):
 
2665
        return 'default'
 
2666
 
 
2667
    def resolve_format(self, format):
 
2668
        """Resolve an object to a ControlDir format object.
 
2669
 
 
2670
        The initial format object can either already be
 
2671
        a ControlDirFormat, None (for the default format),
 
2672
        or a string with the name of the control dir format.
 
2673
 
 
2674
        :param format: Object to resolve
 
2675
        :return A ControlDirFormat instance
 
2676
        """
 
2677
        if format is None:
 
2678
            format = self.get_default_format()
 
2679
        if isinstance(format, basestring):
 
2680
            format = bzrdir.format_registry.make_bzrdir(format)
 
2681
        return format
 
2682
 
 
2683
    def make_bzrdir(self, relpath, format=None):
 
2684
        try:
 
2685
            # might be a relative or absolute path
 
2686
            maybe_a_url = self.get_url(relpath)
 
2687
            segments = maybe_a_url.rsplit('/', 1)
 
2688
            t = _mod_transport.get_transport(maybe_a_url)
 
2689
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2690
                t.ensure_base()
 
2691
            format = self.resolve_format(format)
 
2692
            return format.initialize_on_transport(t)
 
2693
        except errors.UninitializableFormat:
 
2694
            raise TestSkipped("Format %s is not initializable." % format)
 
2695
 
 
2696
    def make_repository(self, relpath, shared=None, format=None):
 
2697
        """Create a repository on our default transport at relpath.
 
2698
 
 
2699
        Note that relpath must be a relative path, not a full url.
 
2700
        """
 
2701
        # FIXME: If you create a remoterepository this returns the underlying
 
2702
        # real format, which is incorrect.  Actually we should make sure that
 
2703
        # RemoteBzrDir returns a RemoteRepository.
 
2704
        # maybe  mbp 20070410
 
2705
        made_control = self.make_bzrdir(relpath, format=format)
 
2706
        return made_control.create_repository(shared=shared)
 
2707
 
 
2708
    def make_smart_server(self, path, backing_server=None):
 
2709
        if backing_server is None:
 
2710
            backing_server = self.get_server()
 
2711
        smart_server = test_server.SmartTCPServer_for_testing()
 
2712
        self.start_server(smart_server, backing_server)
 
2713
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2714
                                                   ).clone(path)
 
2715
        return remote_transport
 
2716
 
 
2717
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2718
        """Create a branch on the default transport and a MemoryTree for it."""
 
2719
        b = self.make_branch(relpath, format=format)
 
2720
        return memorytree.MemoryTree.create_on_branch(b)
 
2721
 
 
2722
    def make_branch_builder(self, relpath, format=None):
 
2723
        branch = self.make_branch(relpath, format=format)
 
2724
        return branchbuilder.BranchBuilder(branch=branch)
 
2725
 
 
2726
    def overrideEnvironmentForTesting(self):
 
2727
        test_home_dir = self.test_home_dir
 
2728
        if isinstance(test_home_dir, unicode):
 
2729
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
 
2730
        self.overrideEnv('HOME', test_home_dir)
 
2731
        self.overrideEnv('BZR_HOME', test_home_dir)
 
2732
 
 
2733
    def setUp(self):
 
2734
        super(TestCaseWithMemoryTransport, self).setUp()
 
2735
        # Ensure that ConnectedTransport doesn't leak sockets
 
2736
        def get_transport_from_url_with_cleanup(*args, **kwargs):
 
2737
            t = orig_get_transport_from_url(*args, **kwargs)
 
2738
            if isinstance(t, _mod_transport.ConnectedTransport):
 
2739
                self.addCleanup(t.disconnect)
 
2740
            return t
 
2741
 
 
2742
        orig_get_transport_from_url = self.overrideAttr(
 
2743
            _mod_transport, 'get_transport_from_url',
 
2744
            get_transport_from_url_with_cleanup)
 
2745
        self._make_test_root()
 
2746
        self.addCleanup(os.chdir, os.getcwdu())
 
2747
        self.makeAndChdirToTestDir()
 
2748
        self.overrideEnvironmentForTesting()
 
2749
        self.__readonly_server = None
 
2750
        self.__server = None
 
2751
        self.reduceLockdirTimeout()
 
2752
 
 
2753
    def setup_smart_server_with_call_log(self):
 
2754
        """Sets up a smart server as the transport server with a call log."""
 
2755
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2756
        self.hpss_calls = []
 
2757
        import traceback
 
2758
        # Skip the current stack down to the caller of
 
2759
        # setup_smart_server_with_call_log
 
2760
        prefix_length = len(traceback.extract_stack()) - 2
 
2761
        def capture_hpss_call(params):
 
2762
            self.hpss_calls.append(
 
2763
                CapturedCall(params, prefix_length))
 
2764
        client._SmartClient.hooks.install_named_hook(
 
2765
            'call', capture_hpss_call, None)
 
2766
 
 
2767
    def reset_smart_call_log(self):
 
2768
        self.hpss_calls = []
 
2769
 
 
2770
 
 
2771
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2772
    """Derived class that runs a test within a temporary directory.
 
2773
 
 
2774
    This is useful for tests that need to create a branch, etc.
 
2775
 
 
2776
    The directory is created in a slightly complex way: for each
 
2777
    Python invocation, a new temporary top-level directory is created.
 
2778
    All test cases create their own directory within that.  If the
 
2779
    tests complete successfully, the directory is removed.
 
2780
 
 
2781
    :ivar test_base_dir: The path of the top-level directory for this
 
2782
    test, which contains a home directory and a work directory.
 
2783
 
 
2784
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2785
    which is used as $HOME for this test.
 
2786
 
 
2787
    :ivar test_dir: A directory under test_base_dir used as the current
 
2788
    directory when the test proper is run.
 
2789
    """
 
2790
 
 
2791
    OVERRIDE_PYTHON = 'python'
 
2792
 
 
2793
    def setUp(self):
 
2794
        super(TestCaseInTempDir, self).setUp()
 
2795
        # Remove the protection set in isolated_environ, we have a proper
 
2796
        # access to disk resources now.
 
2797
        self.overrideEnv('BZR_LOG', None)
 
2798
 
 
2799
    def check_file_contents(self, filename, expect):
 
2800
        self.log("check contents of file %s" % filename)
 
2801
        f = file(filename)
 
2802
        try:
 
2803
            contents = f.read()
 
2804
        finally:
 
2805
            f.close()
 
2806
        if contents != expect:
 
2807
            self.log("expected: %r" % expect)
 
2808
            self.log("actually: %r" % contents)
 
2809
            self.fail("contents of %s not as expected" % filename)
 
2810
 
 
2811
    def _getTestDirPrefix(self):
 
2812
        # create a directory within the top level test directory
 
2813
        if sys.platform in ('win32', 'cygwin'):
 
2814
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2815
            # windows is likely to have path-length limits so use a short name
 
2816
            name_prefix = name_prefix[-30:]
 
2817
        else:
 
2818
            name_prefix = re.sub('[/]', '_', self.id())
 
2819
        return name_prefix
 
2820
 
 
2821
    def makeAndChdirToTestDir(self):
 
2822
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2823
 
 
2824
        For TestCaseInTempDir we create a temporary directory based on the test
 
2825
        name and then create two subdirs - test and home under it.
 
2826
        """
 
2827
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2828
            self._getTestDirPrefix())
 
2829
        name = name_prefix
 
2830
        for i in range(100):
 
2831
            if os.path.exists(name):
 
2832
                name = name_prefix + '_' + str(i)
 
2833
            else:
 
2834
                # now create test and home directories within this dir
 
2835
                self.test_base_dir = name
 
2836
                self.addCleanup(self.deleteTestDir)
 
2837
                os.mkdir(self.test_base_dir)
 
2838
                break
 
2839
        self.permit_dir(self.test_base_dir)
 
2840
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2841
        # stacking policy to honour; create a bzr dir with an unshared
 
2842
        # repository (but not a branch - our code would be trying to escape
 
2843
        # then!) to stop them, and permit it to be read.
 
2844
        # control = bzrdir.BzrDir.create(self.test_base_dir)
 
2845
        # control.create_repository()
 
2846
        self.test_home_dir = self.test_base_dir + '/home'
 
2847
        os.mkdir(self.test_home_dir)
 
2848
        self.test_dir = self.test_base_dir + '/work'
 
2849
        os.mkdir(self.test_dir)
 
2850
        os.chdir(self.test_dir)
 
2851
        # put name of test inside
 
2852
        f = file(self.test_base_dir + '/name', 'w')
 
2853
        try:
 
2854
            f.write(self.id())
 
2855
        finally:
 
2856
            f.close()
 
2857
 
 
2858
    def deleteTestDir(self):
 
2859
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2860
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
 
2861
 
 
2862
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2863
        """Build a test tree according to a pattern.
 
2864
 
 
2865
        shape is a sequence of file specifications.  If the final
 
2866
        character is '/', a directory is created.
 
2867
 
 
2868
        This assumes that all the elements in the tree being built are new.
 
2869
 
 
2870
        This doesn't add anything to a branch.
 
2871
 
 
2872
        :type shape:    list or tuple.
 
2873
        :param line_endings: Either 'binary' or 'native'
 
2874
            in binary mode, exact contents are written in native mode, the
 
2875
            line endings match the default platform endings.
 
2876
        :param transport: A transport to write to, for building trees on VFS's.
 
2877
            If the transport is readonly or None, "." is opened automatically.
 
2878
        :return: None
 
2879
        """
 
2880
        if type(shape) not in (list, tuple):
 
2881
            raise AssertionError("Parameter 'shape' should be "
 
2882
                "a list or a tuple. Got %r instead" % (shape,))
 
2883
        # It's OK to just create them using forward slashes on windows.
 
2884
        if transport is None or transport.is_readonly():
 
2885
            transport = _mod_transport.get_transport_from_path(".")
 
2886
        for name in shape:
 
2887
            self.assertIsInstance(name, basestring)
 
2888
            if name[-1] == '/':
 
2889
                transport.mkdir(urlutils.escape(name[:-1]))
 
2890
            else:
 
2891
                if line_endings == 'binary':
 
2892
                    end = '\n'
 
2893
                elif line_endings == 'native':
 
2894
                    end = os.linesep
 
2895
                else:
 
2896
                    raise errors.BzrError(
 
2897
                        'Invalid line ending request %r' % line_endings)
 
2898
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2899
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2900
 
 
2901
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
 
2902
 
 
2903
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2904
        """Assert whether path or paths are in the WorkingTree"""
 
2905
        if tree is None:
 
2906
            tree = workingtree.WorkingTree.open(root_path)
 
2907
        if not isinstance(path, basestring):
 
2908
            for p in path:
 
2909
                self.assertInWorkingTree(p, tree=tree)
 
2910
        else:
 
2911
            self.assertIsNot(tree.path2id(path), None,
 
2912
                path+' not in working tree.')
 
2913
 
 
2914
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2915
        """Assert whether path or paths are not in the WorkingTree"""
 
2916
        if tree is None:
 
2917
            tree = workingtree.WorkingTree.open(root_path)
 
2918
        if not isinstance(path, basestring):
 
2919
            for p in path:
 
2920
                self.assertNotInWorkingTree(p,tree=tree)
 
2921
        else:
 
2922
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2923
 
 
2924
 
 
2925
class TestCaseWithTransport(TestCaseInTempDir):
 
2926
    """A test case that provides get_url and get_readonly_url facilities.
 
2927
 
 
2928
    These back onto two transport servers, one for readonly access and one for
 
2929
    read write access.
 
2930
 
 
2931
    If no explicit class is provided for readonly access, a
 
2932
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2933
    based read write transports.
 
2934
 
 
2935
    If an explicit class is provided for readonly access, that server and the
 
2936
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2937
    """
 
2938
 
 
2939
    def get_vfs_only_server(self):
 
2940
        """See TestCaseWithMemoryTransport.
 
2941
 
 
2942
        This is useful for some tests with specific servers that need
 
2943
        diagnostics.
 
2944
        """
 
2945
        if self.__vfs_server is None:
 
2946
            self.__vfs_server = self.vfs_transport_factory()
 
2947
            self.start_server(self.__vfs_server)
 
2948
        return self.__vfs_server
 
2949
 
 
2950
    def make_branch_and_tree(self, relpath, format=None):
 
2951
        """Create a branch on the transport and a tree locally.
 
2952
 
 
2953
        If the transport is not a LocalTransport, the Tree can't be created on
 
2954
        the transport.  In that case if the vfs_transport_factory is
 
2955
        LocalURLServer the working tree is created in the local
 
2956
        directory backing the transport, and the returned tree's branch and
 
2957
        repository will also be accessed locally. Otherwise a lightweight
 
2958
        checkout is created and returned.
 
2959
 
 
2960
        We do this because we can't physically create a tree in the local
 
2961
        path, with a branch reference to the transport_factory url, and
 
2962
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2963
        namespace is distinct from the local disk - the two branch objects
 
2964
        would collide. While we could construct a tree with its branch object
 
2965
        pointing at the transport_factory transport in memory, reopening it
 
2966
        would behaving unexpectedly, and has in the past caused testing bugs
 
2967
        when we tried to do it that way.
 
2968
 
 
2969
        :param format: The BzrDirFormat.
 
2970
        :returns: the WorkingTree.
 
2971
        """
 
2972
        # TODO: always use the local disk path for the working tree,
 
2973
        # this obviously requires a format that supports branch references
 
2974
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2975
        # RBC 20060208
 
2976
        format = self.resolve_format(format=format)
 
2977
        if not format.supports_workingtrees:
 
2978
            b = self.make_branch(relpath+'.branch', format=format)
 
2979
            return b.create_checkout(relpath, lightweight=True)
 
2980
        b = self.make_branch(relpath, format=format)
 
2981
        try:
 
2982
            return b.bzrdir.create_workingtree()
 
2983
        except errors.NotLocalUrl:
 
2984
            # We can only make working trees locally at the moment.  If the
 
2985
            # transport can't support them, then we keep the non-disk-backed
 
2986
            # branch and create a local checkout.
 
2987
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
2988
                # the branch is colocated on disk, we cannot create a checkout.
 
2989
                # hopefully callers will expect this.
 
2990
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2991
                wt = local_controldir.create_workingtree()
 
2992
                if wt.branch._format != b._format:
 
2993
                    wt._branch = b
 
2994
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2995
                    # in case the implementation details of workingtree objects
 
2996
                    # change.
 
2997
                    self.assertIs(b, wt.branch)
 
2998
                return wt
 
2999
            else:
 
3000
                return b.create_checkout(relpath, lightweight=True)
 
3001
 
 
3002
    def assertIsDirectory(self, relpath, transport):
 
3003
        """Assert that relpath within transport is a directory.
 
3004
 
 
3005
        This may not be possible on all transports; in that case it propagates
 
3006
        a TransportNotPossible.
 
3007
        """
 
3008
        try:
 
3009
            mode = transport.stat(relpath).st_mode
 
3010
        except errors.NoSuchFile:
 
3011
            self.fail("path %s is not a directory; no such file"
 
3012
                      % (relpath))
 
3013
        if not stat.S_ISDIR(mode):
 
3014
            self.fail("path %s is not a directory; has mode %#o"
 
3015
                      % (relpath, mode))
 
3016
 
 
3017
    def assertTreesEqual(self, left, right):
 
3018
        """Check that left and right have the same content and properties."""
 
3019
        # we use a tree delta to check for equality of the content, and we
 
3020
        # manually check for equality of other things such as the parents list.
 
3021
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
3022
        differences = left.changes_from(right)
 
3023
        self.assertFalse(differences.has_changed(),
 
3024
            "Trees %r and %r are different: %r" % (left, right, differences))
 
3025
 
 
3026
    def setUp(self):
 
3027
        super(TestCaseWithTransport, self).setUp()
 
3028
        self.__vfs_server = None
 
3029
 
 
3030
    def disable_missing_extensions_warning(self):
 
3031
        """Some tests expect a precise stderr content.
 
3032
 
 
3033
        There is no point in forcing them to duplicate the extension related
 
3034
        warning.
 
3035
        """
 
3036
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
3037
 
 
3038
 
 
3039
class ChrootedTestCase(TestCaseWithTransport):
 
3040
    """A support class that provides readonly urls outside the local namespace.
 
3041
 
 
3042
    This is done by checking if self.transport_server is a MemoryServer. if it
 
3043
    is then we are chrooted already, if it is not then an HttpServer is used
 
3044
    for readonly urls.
 
3045
 
 
3046
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
3047
                       be used without needed to redo it when a different
 
3048
                       subclass is in use ?
 
3049
    """
 
3050
 
 
3051
    def setUp(self):
 
3052
        from bzrlib.tests import http_server
 
3053
        super(ChrootedTestCase, self).setUp()
 
3054
        if not self.vfs_transport_factory == memory.MemoryServer:
 
3055
            self.transport_readonly_server = http_server.HttpServer
 
3056
 
 
3057
 
 
3058
def condition_id_re(pattern):
 
3059
    """Create a condition filter which performs a re check on a test's id.
 
3060
 
 
3061
    :param pattern: A regular expression string.
 
3062
    :return: A callable that returns True if the re matches.
 
3063
    """
 
3064
    filter_re = re.compile(pattern, 0)
 
3065
    def condition(test):
 
3066
        test_id = test.id()
 
3067
        return filter_re.search(test_id)
 
3068
    return condition
 
3069
 
 
3070
 
 
3071
def condition_isinstance(klass_or_klass_list):
 
3072
    """Create a condition filter which returns isinstance(param, klass).
 
3073
 
 
3074
    :return: A callable which when called with one parameter obj return the
 
3075
        result of isinstance(obj, klass_or_klass_list).
 
3076
    """
 
3077
    def condition(obj):
 
3078
        return isinstance(obj, klass_or_klass_list)
 
3079
    return condition
 
3080
 
 
3081
 
 
3082
def condition_id_in_list(id_list):
 
3083
    """Create a condition filter which verify that test's id in a list.
 
3084
 
 
3085
    :param id_list: A TestIdList object.
 
3086
    :return: A callable that returns True if the test's id appears in the list.
 
3087
    """
 
3088
    def condition(test):
 
3089
        return id_list.includes(test.id())
 
3090
    return condition
 
3091
 
 
3092
 
 
3093
def condition_id_startswith(starts):
 
3094
    """Create a condition filter verifying that test's id starts with a string.
 
3095
 
 
3096
    :param starts: A list of string.
 
3097
    :return: A callable that returns True if the test's id starts with one of
 
3098
        the given strings.
 
3099
    """
 
3100
    def condition(test):
 
3101
        for start in starts:
 
3102
            if test.id().startswith(start):
 
3103
                return True
 
3104
        return False
 
3105
    return condition
 
3106
 
 
3107
 
 
3108
def exclude_tests_by_condition(suite, condition):
 
3109
    """Create a test suite which excludes some tests from suite.
 
3110
 
 
3111
    :param suite: The suite to get tests from.
 
3112
    :param condition: A callable whose result evaluates True when called with a
 
3113
        test case which should be excluded from the result.
 
3114
    :return: A suite which contains the tests found in suite that fail
 
3115
        condition.
 
3116
    """
 
3117
    result = []
 
3118
    for test in iter_suite_tests(suite):
 
3119
        if not condition(test):
 
3120
            result.append(test)
 
3121
    return TestUtil.TestSuite(result)
 
3122
 
 
3123
 
 
3124
def filter_suite_by_condition(suite, condition):
 
3125
    """Create a test suite by filtering another one.
 
3126
 
 
3127
    :param suite: The source suite.
 
3128
    :param condition: A callable whose result evaluates True when called with a
 
3129
        test case which should be included in the result.
 
3130
    :return: A suite which contains the tests found in suite that pass
 
3131
        condition.
 
3132
    """
 
3133
    result = []
 
3134
    for test in iter_suite_tests(suite):
 
3135
        if condition(test):
 
3136
            result.append(test)
 
3137
    return TestUtil.TestSuite(result)
 
3138
 
 
3139
 
 
3140
def filter_suite_by_re(suite, pattern):
 
3141
    """Create a test suite by filtering another one.
 
3142
 
 
3143
    :param suite:           the source suite
 
3144
    :param pattern:         pattern that names must match
 
3145
    :returns: the newly created suite
 
3146
    """
 
3147
    condition = condition_id_re(pattern)
 
3148
    result_suite = filter_suite_by_condition(suite, condition)
 
3149
    return result_suite
 
3150
 
 
3151
 
 
3152
def filter_suite_by_id_list(suite, test_id_list):
 
3153
    """Create a test suite by filtering another one.
 
3154
 
 
3155
    :param suite: The source suite.
 
3156
    :param test_id_list: A list of the test ids to keep as strings.
 
3157
    :returns: the newly created suite
 
3158
    """
 
3159
    condition = condition_id_in_list(test_id_list)
 
3160
    result_suite = filter_suite_by_condition(suite, condition)
 
3161
    return result_suite
 
3162
 
 
3163
 
 
3164
def filter_suite_by_id_startswith(suite, start):
 
3165
    """Create a test suite by filtering another one.
 
3166
 
 
3167
    :param suite: The source suite.
 
3168
    :param start: A list of string the test id must start with one of.
 
3169
    :returns: the newly created suite
 
3170
    """
 
3171
    condition = condition_id_startswith(start)
 
3172
    result_suite = filter_suite_by_condition(suite, condition)
 
3173
    return result_suite
 
3174
 
 
3175
 
 
3176
def exclude_tests_by_re(suite, pattern):
 
3177
    """Create a test suite which excludes some tests from suite.
 
3178
 
 
3179
    :param suite: The suite to get tests from.
 
3180
    :param pattern: A regular expression string. Test ids that match this
 
3181
        pattern will be excluded from the result.
 
3182
    :return: A TestSuite that contains all the tests from suite without the
 
3183
        tests that matched pattern. The order of tests is the same as it was in
 
3184
        suite.
 
3185
    """
 
3186
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
3187
 
 
3188
 
 
3189
def preserve_input(something):
 
3190
    """A helper for performing test suite transformation chains.
 
3191
 
 
3192
    :param something: Anything you want to preserve.
 
3193
    :return: Something.
 
3194
    """
 
3195
    return something
 
3196
 
 
3197
 
 
3198
def randomize_suite(suite):
 
3199
    """Return a new TestSuite with suite's tests in random order.
 
3200
 
 
3201
    The tests in the input suite are flattened into a single suite in order to
 
3202
    accomplish this. Any nested TestSuites are removed to provide global
 
3203
    randomness.
 
3204
    """
 
3205
    tests = list(iter_suite_tests(suite))
 
3206
    random.shuffle(tests)
 
3207
    return TestUtil.TestSuite(tests)
 
3208
 
 
3209
 
 
3210
def split_suite_by_condition(suite, condition):
 
3211
    """Split a test suite into two by a condition.
 
3212
 
 
3213
    :param suite: The suite to split.
 
3214
    :param condition: The condition to match on. Tests that match this
 
3215
        condition are returned in the first test suite, ones that do not match
 
3216
        are in the second suite.
 
3217
    :return: A tuple of two test suites, where the first contains tests from
 
3218
        suite matching the condition, and the second contains the remainder
 
3219
        from suite. The order within each output suite is the same as it was in
 
3220
        suite.
 
3221
    """
 
3222
    matched = []
 
3223
    did_not_match = []
 
3224
    for test in iter_suite_tests(suite):
 
3225
        if condition(test):
 
3226
            matched.append(test)
 
3227
        else:
 
3228
            did_not_match.append(test)
 
3229
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
3230
 
 
3231
 
 
3232
def split_suite_by_re(suite, pattern):
 
3233
    """Split a test suite into two by a regular expression.
 
3234
 
 
3235
    :param suite: The suite to split.
 
3236
    :param pattern: A regular expression string. Test ids that match this
 
3237
        pattern will be in the first test suite returned, and the others in the
 
3238
        second test suite returned.
 
3239
    :return: A tuple of two test suites, where the first contains tests from
 
3240
        suite matching pattern, and the second contains the remainder from
 
3241
        suite. The order within each output suite is the same as it was in
 
3242
        suite.
 
3243
    """
 
3244
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
3245
 
 
3246
 
 
3247
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
3248
              stop_on_failure=False,
 
3249
              transport=None, lsprof_timed=None, bench_history=None,
 
3250
              matching_tests_first=None,
 
3251
              list_only=False,
 
3252
              random_seed=None,
 
3253
              exclude_pattern=None,
 
3254
              strict=False,
 
3255
              runner_class=None,
 
3256
              suite_decorators=None,
 
3257
              stream=None,
 
3258
              result_decorators=None,
 
3259
              ):
 
3260
    """Run a test suite for bzr selftest.
 
3261
 
 
3262
    :param runner_class: The class of runner to use. Must support the
 
3263
        constructor arguments passed by run_suite which are more than standard
 
3264
        python uses.
 
3265
    :return: A boolean indicating success.
 
3266
    """
 
3267
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
3268
    if verbose:
 
3269
        verbosity = 2
 
3270
    else:
 
3271
        verbosity = 1
 
3272
    if runner_class is None:
 
3273
        runner_class = TextTestRunner
 
3274
    if stream is None:
 
3275
        stream = sys.stdout
 
3276
    runner = runner_class(stream=stream,
 
3277
                            descriptions=0,
 
3278
                            verbosity=verbosity,
 
3279
                            bench_history=bench_history,
 
3280
                            strict=strict,
 
3281
                            result_decorators=result_decorators,
 
3282
                            )
 
3283
    runner.stop_on_failure=stop_on_failure
 
3284
    if isinstance(suite, unittest.TestSuite):
 
3285
        # Empty out _tests list of passed suite and populate new TestSuite
 
3286
        suite._tests[:], suite = [], TestSuite(suite)
 
3287
    # built in decorator factories:
 
3288
    decorators = [
 
3289
        random_order(random_seed, runner),
 
3290
        exclude_tests(exclude_pattern),
 
3291
        ]
 
3292
    if matching_tests_first:
 
3293
        decorators.append(tests_first(pattern))
 
3294
    else:
 
3295
        decorators.append(filter_tests(pattern))
 
3296
    if suite_decorators:
 
3297
        decorators.extend(suite_decorators)
 
3298
    # tell the result object how many tests will be running: (except if
 
3299
    # --parallel=fork is being used. Robert said he will provide a better
 
3300
    # progress design later -- vila 20090817)
 
3301
    if fork_decorator not in decorators:
 
3302
        decorators.append(CountingDecorator)
 
3303
    for decorator in decorators:
 
3304
        suite = decorator(suite)
 
3305
    if list_only:
 
3306
        # Done after test suite decoration to allow randomisation etc
 
3307
        # to take effect, though that is of marginal benefit.
 
3308
        if verbosity >= 2:
 
3309
            stream.write("Listing tests only ...\n")
 
3310
        for t in iter_suite_tests(suite):
 
3311
            stream.write("%s\n" % (t.id()))
 
3312
        return True
 
3313
    result = runner.run(suite)
 
3314
    if strict:
 
3315
        return result.wasStrictlySuccessful()
 
3316
    else:
 
3317
        return result.wasSuccessful()
 
3318
 
 
3319
 
 
3320
# A registry where get() returns a suite decorator.
 
3321
parallel_registry = registry.Registry()
 
3322
 
 
3323
 
 
3324
def fork_decorator(suite):
 
3325
    if getattr(os, "fork", None) is None:
 
3326
        raise errors.BzrCommandError("platform does not support fork,"
 
3327
            " try --parallel=subprocess instead.")
 
3328
    concurrency = osutils.local_concurrency()
 
3329
    if concurrency == 1:
 
3330
        return suite
 
3331
    from testtools import ConcurrentTestSuite
 
3332
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3333
parallel_registry.register('fork', fork_decorator)
 
3334
 
 
3335
 
 
3336
def subprocess_decorator(suite):
 
3337
    concurrency = osutils.local_concurrency()
 
3338
    if concurrency == 1:
 
3339
        return suite
 
3340
    from testtools import ConcurrentTestSuite
 
3341
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3342
parallel_registry.register('subprocess', subprocess_decorator)
 
3343
 
 
3344
 
 
3345
def exclude_tests(exclude_pattern):
 
3346
    """Return a test suite decorator that excludes tests."""
 
3347
    if exclude_pattern is None:
 
3348
        return identity_decorator
 
3349
    def decorator(suite):
 
3350
        return ExcludeDecorator(suite, exclude_pattern)
 
3351
    return decorator
 
3352
 
 
3353
 
 
3354
def filter_tests(pattern):
 
3355
    if pattern == '.*':
 
3356
        return identity_decorator
 
3357
    def decorator(suite):
 
3358
        return FilterTestsDecorator(suite, pattern)
 
3359
    return decorator
 
3360
 
 
3361
 
 
3362
def random_order(random_seed, runner):
 
3363
    """Return a test suite decorator factory for randomising tests order.
 
3364
    
 
3365
    :param random_seed: now, a string which casts to a long, or a long.
 
3366
    :param runner: A test runner with a stream attribute to report on.
 
3367
    """
 
3368
    if random_seed is None:
 
3369
        return identity_decorator
 
3370
    def decorator(suite):
 
3371
        return RandomDecorator(suite, random_seed, runner.stream)
 
3372
    return decorator
 
3373
 
 
3374
 
 
3375
def tests_first(pattern):
 
3376
    if pattern == '.*':
 
3377
        return identity_decorator
 
3378
    def decorator(suite):
 
3379
        return TestFirstDecorator(suite, pattern)
 
3380
    return decorator
 
3381
 
 
3382
 
 
3383
def identity_decorator(suite):
 
3384
    """Return suite."""
 
3385
    return suite
 
3386
 
 
3387
 
 
3388
class TestDecorator(TestUtil.TestSuite):
 
3389
    """A decorator for TestCase/TestSuite objects.
 
3390
 
 
3391
    Contains rather than flattening suite passed on construction
 
3392
    """
 
3393
 
 
3394
    def __init__(self, suite=None):
 
3395
        super(TestDecorator, self).__init__()
 
3396
        if suite is not None:
 
3397
            self.addTest(suite)
 
3398
 
 
3399
    # Don't need subclass run method with suite emptying
 
3400
    run = unittest.TestSuite.run
 
3401
 
 
3402
 
 
3403
class CountingDecorator(TestDecorator):
 
3404
    """A decorator which calls result.progress(self.countTestCases)."""
 
3405
 
 
3406
    def run(self, result):
 
3407
        progress_method = getattr(result, 'progress', None)
 
3408
        if callable(progress_method):
 
3409
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
3410
        return super(CountingDecorator, self).run(result)
 
3411
 
 
3412
 
 
3413
class ExcludeDecorator(TestDecorator):
 
3414
    """A decorator which excludes test matching an exclude pattern."""
 
3415
 
 
3416
    def __init__(self, suite, exclude_pattern):
 
3417
        super(ExcludeDecorator, self).__init__(
 
3418
            exclude_tests_by_re(suite, exclude_pattern))
 
3419
 
 
3420
 
 
3421
class FilterTestsDecorator(TestDecorator):
 
3422
    """A decorator which filters tests to those matching a pattern."""
 
3423
 
 
3424
    def __init__(self, suite, pattern):
 
3425
        super(FilterTestsDecorator, self).__init__(
 
3426
            filter_suite_by_re(suite, pattern))
 
3427
 
 
3428
 
 
3429
class RandomDecorator(TestDecorator):
 
3430
    """A decorator which randomises the order of its tests."""
 
3431
 
 
3432
    def __init__(self, suite, random_seed, stream):
 
3433
        random_seed = self.actual_seed(random_seed)
 
3434
        stream.write("Randomizing test order using seed %s\n\n" %
 
3435
            (random_seed,))
 
3436
        # Initialise the random number generator.
 
3437
        random.seed(random_seed)
 
3438
        super(RandomDecorator, self).__init__(randomize_suite(suite))
 
3439
 
 
3440
    @staticmethod
 
3441
    def actual_seed(seed):
 
3442
        if seed == "now":
 
3443
            # We convert the seed to a long to make it reuseable across
 
3444
            # invocations (because the user can reenter it).
 
3445
            return long(time.time())
 
3446
        else:
 
3447
            # Convert the seed to a long if we can
 
3448
            try:
 
3449
                return long(seed)
 
3450
            except (TypeError, ValueError):
 
3451
                pass
 
3452
        return seed
 
3453
 
 
3454
 
 
3455
class TestFirstDecorator(TestDecorator):
 
3456
    """A decorator which moves named tests to the front."""
 
3457
 
 
3458
    def __init__(self, suite, pattern):
 
3459
        super(TestFirstDecorator, self).__init__()
 
3460
        self.addTests(split_suite_by_re(suite, pattern))
 
3461
 
 
3462
 
 
3463
def partition_tests(suite, count):
 
3464
    """Partition suite into count lists of tests."""
 
3465
    # This just assigns tests in a round-robin fashion.  On one hand this
 
3466
    # splits up blocks of related tests that might run faster if they shared
 
3467
    # resources, but on the other it avoids assigning blocks of slow tests to
 
3468
    # just one partition.  So the slowest partition shouldn't be much slower
 
3469
    # than the fastest.
 
3470
    partitions = [list() for i in range(count)]
 
3471
    tests = iter_suite_tests(suite)
 
3472
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
 
3473
        partition.append(test)
 
3474
    return partitions
 
3475
 
 
3476
 
 
3477
def workaround_zealous_crypto_random():
 
3478
    """Crypto.Random want to help us being secure, but we don't care here.
 
3479
 
 
3480
    This workaround some test failure related to the sftp server. Once paramiko
 
3481
    stop using the controversial API in Crypto.Random, we may get rid of it.
 
3482
    """
 
3483
    try:
 
3484
        from Crypto.Random import atfork
 
3485
        atfork()
 
3486
    except ImportError:
 
3487
        pass
 
3488
 
 
3489
 
 
3490
def fork_for_tests(suite):
 
3491
    """Take suite and start up one runner per CPU by forking()
 
3492
 
 
3493
    :return: An iterable of TestCase-like objects which can each have
 
3494
        run(result) called on them to feed tests to result.
 
3495
    """
 
3496
    concurrency = osutils.local_concurrency()
 
3497
    result = []
 
3498
    from subunit import ProtocolTestCase
 
3499
    from subunit.test_results import AutoTimingTestResultDecorator
 
3500
    class TestInOtherProcess(ProtocolTestCase):
 
3501
        # Should be in subunit, I think. RBC.
 
3502
        def __init__(self, stream, pid):
 
3503
            ProtocolTestCase.__init__(self, stream)
 
3504
            self.pid = pid
 
3505
 
 
3506
        def run(self, result):
 
3507
            try:
 
3508
                ProtocolTestCase.run(self, result)
 
3509
            finally:
 
3510
                pid, status = os.waitpid(self.pid, 0)
 
3511
            # GZ 2011-10-18: If status is nonzero, should report to the result
 
3512
            #                that something went wrong.
 
3513
 
 
3514
    test_blocks = partition_tests(suite, concurrency)
 
3515
    # Clear the tests from the original suite so it doesn't keep them alive
 
3516
    suite._tests[:] = []
 
3517
    for process_tests in test_blocks:
 
3518
        process_suite = TestUtil.TestSuite(process_tests)
 
3519
        # Also clear each split list so new suite has only reference
 
3520
        process_tests[:] = []
 
3521
        c2pread, c2pwrite = os.pipe()
 
3522
        pid = os.fork()
 
3523
        if pid == 0:
 
3524
            try:
 
3525
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3526
                workaround_zealous_crypto_random()
 
3527
                os.close(c2pread)
 
3528
                # Leave stderr and stdout open so we can see test noise
 
3529
                # Close stdin so that the child goes away if it decides to
 
3530
                # read from stdin (otherwise its a roulette to see what
 
3531
                # child actually gets keystrokes for pdb etc).
 
3532
                sys.stdin.close()
 
3533
                subunit_result = AutoTimingTestResultDecorator(
 
3534
                    SubUnitBzrProtocolClient(stream))
 
3535
                process_suite.run(subunit_result)
 
3536
            except:
 
3537
                # Try and report traceback on stream, but exit with error even
 
3538
                # if stream couldn't be created or something else goes wrong.
 
3539
                # The traceback is formatted to a string and written in one go
 
3540
                # to avoid interleaving lines from multiple failing children.
 
3541
                try:
 
3542
                    stream.write(traceback.format_exc())
 
3543
                finally:
 
3544
                    os._exit(1)
 
3545
            os._exit(0)
 
3546
        else:
 
3547
            os.close(c2pwrite)
 
3548
            stream = os.fdopen(c2pread, 'rb', 1)
 
3549
            test = TestInOtherProcess(stream, pid)
 
3550
            result.append(test)
 
3551
    return result
 
3552
 
 
3553
 
 
3554
def reinvoke_for_tests(suite):
 
3555
    """Take suite and start up one runner per CPU using subprocess().
 
3556
 
 
3557
    :return: An iterable of TestCase-like objects which can each have
 
3558
        run(result) called on them to feed tests to result.
 
3559
    """
 
3560
    concurrency = osutils.local_concurrency()
 
3561
    result = []
 
3562
    from subunit import ProtocolTestCase
 
3563
    class TestInSubprocess(ProtocolTestCase):
 
3564
        def __init__(self, process, name):
 
3565
            ProtocolTestCase.__init__(self, process.stdout)
 
3566
            self.process = process
 
3567
            self.process.stdin.close()
 
3568
            self.name = name
 
3569
 
 
3570
        def run(self, result):
 
3571
            try:
 
3572
                ProtocolTestCase.run(self, result)
 
3573
            finally:
 
3574
                self.process.wait()
 
3575
                os.unlink(self.name)
 
3576
            # print "pid %d finished" % finished_process
 
3577
    test_blocks = partition_tests(suite, concurrency)
 
3578
    for process_tests in test_blocks:
 
3579
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3580
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3581
        if not os.path.isfile(bzr_path):
 
3582
            # We are probably installed. Assume sys.argv is the right file
 
3583
            bzr_path = sys.argv[0]
 
3584
        bzr_path = [bzr_path]
 
3585
        if sys.platform == "win32":
 
3586
            # if we're on windows, we can't execute the bzr script directly
 
3587
            bzr_path = [sys.executable] + bzr_path
 
3588
        fd, test_list_file_name = tempfile.mkstemp()
 
3589
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3590
        for test in process_tests:
 
3591
            test_list_file.write(test.id() + '\n')
 
3592
        test_list_file.close()
 
3593
        try:
 
3594
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
 
3595
                '--subunit']
 
3596
            if '--no-plugins' in sys.argv:
 
3597
                argv.append('--no-plugins')
 
3598
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
 
3599
            # noise on stderr it can interrupt the subunit protocol.
 
3600
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
 
3601
                                      stdout=subprocess.PIPE,
 
3602
                                      stderr=subprocess.PIPE,
 
3603
                                      bufsize=1)
 
3604
            test = TestInSubprocess(process, test_list_file_name)
 
3605
            result.append(test)
 
3606
        except:
 
3607
            os.unlink(test_list_file_name)
 
3608
            raise
 
3609
    return result
 
3610
 
 
3611
 
 
3612
class ProfileResult(testtools.ExtendedToOriginalDecorator):
 
3613
    """Generate profiling data for all activity between start and success.
 
3614
    
 
3615
    The profile data is appended to the test's _benchcalls attribute and can
 
3616
    be accessed by the forwarded-to TestResult.
 
3617
 
 
3618
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3619
    where our existing output support for lsprof is, and this class aims to
 
3620
    fit in with that: while it could be moved it's not necessary to accomplish
 
3621
    test profiling, nor would it be dramatically cleaner.
 
3622
    """
 
3623
 
 
3624
    def startTest(self, test):
 
3625
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3626
        # Prevent deadlocks in tests that use lsprof: those tests will
 
3627
        # unavoidably fail.
 
3628
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
 
3629
        self.profiler.start()
 
3630
        testtools.ExtendedToOriginalDecorator.startTest(self, test)
 
3631
 
 
3632
    def addSuccess(self, test):
 
3633
        stats = self.profiler.stop()
 
3634
        try:
 
3635
            calls = test._benchcalls
 
3636
        except AttributeError:
 
3637
            test._benchcalls = []
 
3638
            calls = test._benchcalls
 
3639
        calls.append(((test.id(), "", ""), stats))
 
3640
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
 
3641
 
 
3642
    def stopTest(self, test):
 
3643
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
 
3644
        self.profiler = None
 
3645
 
 
3646
 
 
3647
# Controlled by "bzr selftest -E=..." option
 
3648
# Currently supported:
 
3649
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3650
#                           preserves any flags supplied at the command line.
 
3651
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3652
#                           rather than failing tests. And no longer raise
 
3653
#                           LockContention when fctnl locks are not being used
 
3654
#                           with proper exclusion rules.
 
3655
#   -Ethreads               Will display thread ident at creation/join time to
 
3656
#                           help track thread leaks
 
3657
#   -Euncollected_cases     Display the identity of any test cases that weren't
 
3658
#                           deallocated after being completed.
 
3659
#   -Econfig_stats          Will collect statistics using addDetail
 
3660
selftest_debug_flags = set()
 
3661
 
 
3662
 
 
3663
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3664
             transport=None,
 
3665
             test_suite_factory=None,
 
3666
             lsprof_timed=None,
 
3667
             bench_history=None,
 
3668
             matching_tests_first=None,
 
3669
             list_only=False,
 
3670
             random_seed=None,
 
3671
             exclude_pattern=None,
 
3672
             strict=False,
 
3673
             load_list=None,
 
3674
             debug_flags=None,
 
3675
             starting_with=None,
 
3676
             runner_class=None,
 
3677
             suite_decorators=None,
 
3678
             stream=None,
 
3679
             lsprof_tests=False,
 
3680
             ):
 
3681
    """Run the whole test suite under the enhanced runner"""
 
3682
    # XXX: Very ugly way to do this...
 
3683
    # Disable warning about old formats because we don't want it to disturb
 
3684
    # any blackbox tests.
 
3685
    from bzrlib import repository
 
3686
    repository._deprecation_warning_done = True
 
3687
 
 
3688
    global default_transport
 
3689
    if transport is None:
 
3690
        transport = default_transport
 
3691
    old_transport = default_transport
 
3692
    default_transport = transport
 
3693
    global selftest_debug_flags
 
3694
    old_debug_flags = selftest_debug_flags
 
3695
    if debug_flags is not None:
 
3696
        selftest_debug_flags = set(debug_flags)
 
3697
    try:
 
3698
        if load_list is None:
 
3699
            keep_only = None
 
3700
        else:
 
3701
            keep_only = load_test_id_list(load_list)
 
3702
        if starting_with:
 
3703
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3704
                             for start in starting_with]
 
3705
        if test_suite_factory is None:
 
3706
            # Reduce loading time by loading modules based on the starting_with
 
3707
            # patterns.
 
3708
            suite = test_suite(keep_only, starting_with)
 
3709
        else:
 
3710
            suite = test_suite_factory()
 
3711
        if starting_with:
 
3712
            # But always filter as requested.
 
3713
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3714
        result_decorators = []
 
3715
        if lsprof_tests:
 
3716
            result_decorators.append(ProfileResult)
 
3717
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3718
                     stop_on_failure=stop_on_failure,
 
3719
                     transport=transport,
 
3720
                     lsprof_timed=lsprof_timed,
 
3721
                     bench_history=bench_history,
 
3722
                     matching_tests_first=matching_tests_first,
 
3723
                     list_only=list_only,
 
3724
                     random_seed=random_seed,
 
3725
                     exclude_pattern=exclude_pattern,
 
3726
                     strict=strict,
 
3727
                     runner_class=runner_class,
 
3728
                     suite_decorators=suite_decorators,
 
3729
                     stream=stream,
 
3730
                     result_decorators=result_decorators,
 
3731
                     )
 
3732
    finally:
 
3733
        default_transport = old_transport
 
3734
        selftest_debug_flags = old_debug_flags
 
3735
 
 
3736
 
 
3737
def load_test_id_list(file_name):
 
3738
    """Load a test id list from a text file.
 
3739
 
 
3740
    The format is one test id by line.  No special care is taken to impose
 
3741
    strict rules, these test ids are used to filter the test suite so a test id
 
3742
    that do not match an existing test will do no harm. This allows user to add
 
3743
    comments, leave blank lines, etc.
 
3744
    """
 
3745
    test_list = []
 
3746
    try:
 
3747
        ftest = open(file_name, 'rt')
 
3748
    except IOError, e:
 
3749
        if e.errno != errno.ENOENT:
 
3750
            raise
 
3751
        else:
 
3752
            raise errors.NoSuchFile(file_name)
 
3753
 
 
3754
    for test_name in ftest.readlines():
 
3755
        test_list.append(test_name.strip())
 
3756
    ftest.close()
 
3757
    return test_list
 
3758
 
 
3759
 
 
3760
def suite_matches_id_list(test_suite, id_list):
 
3761
    """Warns about tests not appearing or appearing more than once.
 
3762
 
 
3763
    :param test_suite: A TestSuite object.
 
3764
    :param test_id_list: The list of test ids that should be found in
 
3765
         test_suite.
 
3766
 
 
3767
    :return: (absents, duplicates) absents is a list containing the test found
 
3768
        in id_list but not in test_suite, duplicates is a list containing the
 
3769
        test found multiple times in test_suite.
 
3770
 
 
3771
    When using a prefined test id list, it may occurs that some tests do not
 
3772
    exist anymore or that some tests use the same id. This function warns the
 
3773
    tester about potential problems in his workflow (test lists are volatile)
 
3774
    or in the test suite itself (using the same id for several tests does not
 
3775
    help to localize defects).
 
3776
    """
 
3777
    # Build a dict counting id occurrences
 
3778
    tests = dict()
 
3779
    for test in iter_suite_tests(test_suite):
 
3780
        id = test.id()
 
3781
        tests[id] = tests.get(id, 0) + 1
 
3782
 
 
3783
    not_found = []
 
3784
    duplicates = []
 
3785
    for id in id_list:
 
3786
        occurs = tests.get(id, 0)
 
3787
        if not occurs:
 
3788
            not_found.append(id)
 
3789
        elif occurs > 1:
 
3790
            duplicates.append(id)
 
3791
 
 
3792
    return not_found, duplicates
 
3793
 
 
3794
 
 
3795
class TestIdList(object):
 
3796
    """Test id list to filter a test suite.
 
3797
 
 
3798
    Relying on the assumption that test ids are built as:
 
3799
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3800
    notation, this class offers methods to :
 
3801
    - avoid building a test suite for modules not refered to in the test list,
 
3802
    - keep only the tests listed from the module test suite.
 
3803
    """
 
3804
 
 
3805
    def __init__(self, test_id_list):
 
3806
        # When a test suite needs to be filtered against us we compare test ids
 
3807
        # for equality, so a simple dict offers a quick and simple solution.
 
3808
        self.tests = dict().fromkeys(test_id_list, True)
 
3809
 
 
3810
        # While unittest.TestCase have ids like:
 
3811
        # <module>.<class>.<method>[(<param+)],
 
3812
        # doctest.DocTestCase can have ids like:
 
3813
        # <module>
 
3814
        # <module>.<class>
 
3815
        # <module>.<function>
 
3816
        # <module>.<class>.<method>
 
3817
 
 
3818
        # Since we can't predict a test class from its name only, we settle on
 
3819
        # a simple constraint: a test id always begins with its module name.
 
3820
 
 
3821
        modules = {}
 
3822
        for test_id in test_id_list:
 
3823
            parts = test_id.split('.')
 
3824
            mod_name = parts.pop(0)
 
3825
            modules[mod_name] = True
 
3826
            for part in parts:
 
3827
                mod_name += '.' + part
 
3828
                modules[mod_name] = True
 
3829
        self.modules = modules
 
3830
 
 
3831
    def refers_to(self, module_name):
 
3832
        """Is there tests for the module or one of its sub modules."""
 
3833
        return self.modules.has_key(module_name)
 
3834
 
 
3835
    def includes(self, test_id):
 
3836
        return self.tests.has_key(test_id)
 
3837
 
 
3838
 
 
3839
class TestPrefixAliasRegistry(registry.Registry):
 
3840
    """A registry for test prefix aliases.
 
3841
 
 
3842
    This helps implement shorcuts for the --starting-with selftest
 
3843
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3844
    warning will be emitted).
 
3845
    """
 
3846
 
 
3847
    def register(self, key, obj, help=None, info=None,
 
3848
                 override_existing=False):
 
3849
        """See Registry.register.
 
3850
 
 
3851
        Trying to override an existing alias causes a warning to be emitted,
 
3852
        not a fatal execption.
 
3853
        """
 
3854
        try:
 
3855
            super(TestPrefixAliasRegistry, self).register(
 
3856
                key, obj, help=help, info=info, override_existing=False)
 
3857
        except KeyError:
 
3858
            actual = self.get(key)
 
3859
            trace.note(
 
3860
                'Test prefix alias %s is already used for %s, ignoring %s'
 
3861
                % (key, actual, obj))
 
3862
 
 
3863
    def resolve_alias(self, id_start):
 
3864
        """Replace the alias by the prefix in the given string.
 
3865
 
 
3866
        Using an unknown prefix is an error to help catching typos.
 
3867
        """
 
3868
        parts = id_start.split('.')
 
3869
        try:
 
3870
            parts[0] = self.get(parts[0])
 
3871
        except KeyError:
 
3872
            raise errors.BzrCommandError(
 
3873
                '%s is not a known test prefix alias' % parts[0])
 
3874
        return '.'.join(parts)
 
3875
 
 
3876
 
 
3877
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3878
"""Registry of test prefix aliases."""
 
3879
 
 
3880
 
 
3881
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3882
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3883
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3884
 
 
3885
# Obvious highest levels prefixes, feel free to add your own via a plugin
 
3886
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3887
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3888
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3889
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3890
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3891
 
 
3892
 
 
3893
def _test_suite_testmod_names():
 
3894
    """Return the standard list of test module names to test."""
 
3895
    return [
 
3896
        'bzrlib.doc',
 
3897
        'bzrlib.tests.blackbox',
 
3898
        'bzrlib.tests.commands',
 
3899
        'bzrlib.tests.doc_generate',
 
3900
        'bzrlib.tests.per_branch',
 
3901
        'bzrlib.tests.per_bzrdir',
 
3902
        'bzrlib.tests.per_controldir',
 
3903
        'bzrlib.tests.per_controldir_colo',
 
3904
        'bzrlib.tests.per_foreign_vcs',
 
3905
        'bzrlib.tests.per_interrepository',
 
3906
        'bzrlib.tests.per_intertree',
 
3907
        'bzrlib.tests.per_inventory',
 
3908
        'bzrlib.tests.per_interbranch',
 
3909
        'bzrlib.tests.per_lock',
 
3910
        'bzrlib.tests.per_merger',
 
3911
        'bzrlib.tests.per_transport',
 
3912
        'bzrlib.tests.per_tree',
 
3913
        'bzrlib.tests.per_pack_repository',
 
3914
        'bzrlib.tests.per_repository',
 
3915
        'bzrlib.tests.per_repository_chk',
 
3916
        'bzrlib.tests.per_repository_reference',
 
3917
        'bzrlib.tests.per_repository_vf',
 
3918
        'bzrlib.tests.per_uifactory',
 
3919
        'bzrlib.tests.per_versionedfile',
 
3920
        'bzrlib.tests.per_workingtree',
 
3921
        'bzrlib.tests.test__annotator',
 
3922
        'bzrlib.tests.test__bencode',
 
3923
        'bzrlib.tests.test__btree_serializer',
 
3924
        'bzrlib.tests.test__chk_map',
 
3925
        'bzrlib.tests.test__dirstate_helpers',
 
3926
        'bzrlib.tests.test__groupcompress',
 
3927
        'bzrlib.tests.test__known_graph',
 
3928
        'bzrlib.tests.test__rio',
 
3929
        'bzrlib.tests.test__simple_set',
 
3930
        'bzrlib.tests.test__static_tuple',
 
3931
        'bzrlib.tests.test__walkdirs_win32',
 
3932
        'bzrlib.tests.test_ancestry',
 
3933
        'bzrlib.tests.test_annotate',
 
3934
        'bzrlib.tests.test_api',
 
3935
        'bzrlib.tests.test_atomicfile',
 
3936
        'bzrlib.tests.test_bad_files',
 
3937
        'bzrlib.tests.test_bisect_multi',
 
3938
        'bzrlib.tests.test_branch',
 
3939
        'bzrlib.tests.test_branchbuilder',
 
3940
        'bzrlib.tests.test_btree_index',
 
3941
        'bzrlib.tests.test_bugtracker',
 
3942
        'bzrlib.tests.test_bundle',
 
3943
        'bzrlib.tests.test_bzrdir',
 
3944
        'bzrlib.tests.test__chunks_to_lines',
 
3945
        'bzrlib.tests.test_cache_utf8',
 
3946
        'bzrlib.tests.test_chk_map',
 
3947
        'bzrlib.tests.test_chk_serializer',
 
3948
        'bzrlib.tests.test_chunk_writer',
 
3949
        'bzrlib.tests.test_clean_tree',
 
3950
        'bzrlib.tests.test_cleanup',
 
3951
        'bzrlib.tests.test_cmdline',
 
3952
        'bzrlib.tests.test_commands',
 
3953
        'bzrlib.tests.test_commit',
 
3954
        'bzrlib.tests.test_commit_merge',
 
3955
        'bzrlib.tests.test_config',
 
3956
        'bzrlib.tests.test_conflicts',
 
3957
        'bzrlib.tests.test_controldir',
 
3958
        'bzrlib.tests.test_counted_lock',
 
3959
        'bzrlib.tests.test_crash',
 
3960
        'bzrlib.tests.test_decorators',
 
3961
        'bzrlib.tests.test_delta',
 
3962
        'bzrlib.tests.test_debug',
 
3963
        'bzrlib.tests.test_diff',
 
3964
        'bzrlib.tests.test_directory_service',
 
3965
        'bzrlib.tests.test_dirstate',
 
3966
        'bzrlib.tests.test_email_message',
 
3967
        'bzrlib.tests.test_eol_filters',
 
3968
        'bzrlib.tests.test_errors',
 
3969
        'bzrlib.tests.test_estimate_compressed_size',
 
3970
        'bzrlib.tests.test_export',
 
3971
        'bzrlib.tests.test_export_pot',
 
3972
        'bzrlib.tests.test_extract',
 
3973
        'bzrlib.tests.test_features',
 
3974
        'bzrlib.tests.test_fetch',
 
3975
        'bzrlib.tests.test_fixtures',
 
3976
        'bzrlib.tests.test_fifo_cache',
 
3977
        'bzrlib.tests.test_filters',
 
3978
        'bzrlib.tests.test_filter_tree',
 
3979
        'bzrlib.tests.test_ftp_transport',
 
3980
        'bzrlib.tests.test_foreign',
 
3981
        'bzrlib.tests.test_generate_docs',
 
3982
        'bzrlib.tests.test_generate_ids',
 
3983
        'bzrlib.tests.test_globbing',
 
3984
        'bzrlib.tests.test_gpg',
 
3985
        'bzrlib.tests.test_graph',
 
3986
        'bzrlib.tests.test_groupcompress',
 
3987
        'bzrlib.tests.test_hashcache',
 
3988
        'bzrlib.tests.test_help',
 
3989
        'bzrlib.tests.test_hooks',
 
3990
        'bzrlib.tests.test_http',
 
3991
        'bzrlib.tests.test_http_response',
 
3992
        'bzrlib.tests.test_https_ca_bundle',
 
3993
        'bzrlib.tests.test_i18n',
 
3994
        'bzrlib.tests.test_identitymap',
 
3995
        'bzrlib.tests.test_ignores',
 
3996
        'bzrlib.tests.test_index',
 
3997
        'bzrlib.tests.test_import_tariff',
 
3998
        'bzrlib.tests.test_info',
 
3999
        'bzrlib.tests.test_inv',
 
4000
        'bzrlib.tests.test_inventory_delta',
 
4001
        'bzrlib.tests.test_knit',
 
4002
        'bzrlib.tests.test_lazy_import',
 
4003
        'bzrlib.tests.test_lazy_regex',
 
4004
        'bzrlib.tests.test_library_state',
 
4005
        'bzrlib.tests.test_lock',
 
4006
        'bzrlib.tests.test_lockable_files',
 
4007
        'bzrlib.tests.test_lockdir',
 
4008
        'bzrlib.tests.test_log',
 
4009
        'bzrlib.tests.test_lru_cache',
 
4010
        'bzrlib.tests.test_lsprof',
 
4011
        'bzrlib.tests.test_mail_client',
 
4012
        'bzrlib.tests.test_matchers',
 
4013
        'bzrlib.tests.test_memorytree',
 
4014
        'bzrlib.tests.test_merge',
 
4015
        'bzrlib.tests.test_merge3',
 
4016
        'bzrlib.tests.test_merge_core',
 
4017
        'bzrlib.tests.test_merge_directive',
 
4018
        'bzrlib.tests.test_mergetools',
 
4019
        'bzrlib.tests.test_missing',
 
4020
        'bzrlib.tests.test_msgeditor',
 
4021
        'bzrlib.tests.test_multiparent',
 
4022
        'bzrlib.tests.test_mutabletree',
 
4023
        'bzrlib.tests.test_nonascii',
 
4024
        'bzrlib.tests.test_options',
 
4025
        'bzrlib.tests.test_osutils',
 
4026
        'bzrlib.tests.test_osutils_encodings',
 
4027
        'bzrlib.tests.test_pack',
 
4028
        'bzrlib.tests.test_patch',
 
4029
        'bzrlib.tests.test_patches',
 
4030
        'bzrlib.tests.test_permissions',
 
4031
        'bzrlib.tests.test_plugins',
 
4032
        'bzrlib.tests.test_progress',
 
4033
        'bzrlib.tests.test_pyutils',
 
4034
        'bzrlib.tests.test_read_bundle',
 
4035
        'bzrlib.tests.test_reconcile',
 
4036
        'bzrlib.tests.test_reconfigure',
 
4037
        'bzrlib.tests.test_registry',
 
4038
        'bzrlib.tests.test_remote',
 
4039
        'bzrlib.tests.test_rename_map',
 
4040
        'bzrlib.tests.test_repository',
 
4041
        'bzrlib.tests.test_revert',
 
4042
        'bzrlib.tests.test_revision',
 
4043
        'bzrlib.tests.test_revisionspec',
 
4044
        'bzrlib.tests.test_revisiontree',
 
4045
        'bzrlib.tests.test_rio',
 
4046
        'bzrlib.tests.test_rules',
 
4047
        'bzrlib.tests.test_sampler',
 
4048
        'bzrlib.tests.test_scenarios',
 
4049
        'bzrlib.tests.test_script',
 
4050
        'bzrlib.tests.test_selftest',
 
4051
        'bzrlib.tests.test_serializer',
 
4052
        'bzrlib.tests.test_setup',
 
4053
        'bzrlib.tests.test_sftp_transport',
 
4054
        'bzrlib.tests.test_shelf',
 
4055
        'bzrlib.tests.test_shelf_ui',
 
4056
        'bzrlib.tests.test_smart',
 
4057
        'bzrlib.tests.test_smart_add',
 
4058
        'bzrlib.tests.test_smart_request',
 
4059
        'bzrlib.tests.test_smart_signals',
 
4060
        'bzrlib.tests.test_smart_transport',
 
4061
        'bzrlib.tests.test_smtp_connection',
 
4062
        'bzrlib.tests.test_source',
 
4063
        'bzrlib.tests.test_ssh_transport',
 
4064
        'bzrlib.tests.test_status',
 
4065
        'bzrlib.tests.test_store',
 
4066
        'bzrlib.tests.test_strace',
 
4067
        'bzrlib.tests.test_subsume',
 
4068
        'bzrlib.tests.test_switch',
 
4069
        'bzrlib.tests.test_symbol_versioning',
 
4070
        'bzrlib.tests.test_tag',
 
4071
        'bzrlib.tests.test_test_server',
 
4072
        'bzrlib.tests.test_testament',
 
4073
        'bzrlib.tests.test_textfile',
 
4074
        'bzrlib.tests.test_textmerge',
 
4075
        'bzrlib.tests.test_cethread',
 
4076
        'bzrlib.tests.test_timestamp',
 
4077
        'bzrlib.tests.test_trace',
 
4078
        'bzrlib.tests.test_transactions',
 
4079
        'bzrlib.tests.test_transform',
 
4080
        'bzrlib.tests.test_transport',
 
4081
        'bzrlib.tests.test_transport_log',
 
4082
        'bzrlib.tests.test_tree',
 
4083
        'bzrlib.tests.test_treebuilder',
 
4084
        'bzrlib.tests.test_treeshape',
 
4085
        'bzrlib.tests.test_tsort',
 
4086
        'bzrlib.tests.test_tuned_gzip',
 
4087
        'bzrlib.tests.test_ui',
 
4088
        'bzrlib.tests.test_uncommit',
 
4089
        'bzrlib.tests.test_upgrade',
 
4090
        'bzrlib.tests.test_upgrade_stacked',
 
4091
        'bzrlib.tests.test_urlutils',
 
4092
        'bzrlib.tests.test_utextwrap',
 
4093
        'bzrlib.tests.test_version',
 
4094
        'bzrlib.tests.test_version_info',
 
4095
        'bzrlib.tests.test_versionedfile',
 
4096
        'bzrlib.tests.test_vf_search',
 
4097
        'bzrlib.tests.test_weave',
 
4098
        'bzrlib.tests.test_whitebox',
 
4099
        'bzrlib.tests.test_win32utils',
 
4100
        'bzrlib.tests.test_workingtree',
 
4101
        'bzrlib.tests.test_workingtree_4',
 
4102
        'bzrlib.tests.test_wsgi',
 
4103
        'bzrlib.tests.test_xml',
 
4104
        ]
 
4105
 
 
4106
 
 
4107
def _test_suite_modules_to_doctest():
 
4108
    """Return the list of modules to doctest."""
 
4109
    if __doc__ is None:
 
4110
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
 
4111
        return []
 
4112
    return [
 
4113
        'bzrlib',
 
4114
        'bzrlib.branchbuilder',
 
4115
        'bzrlib.decorators',
 
4116
        'bzrlib.inventory',
 
4117
        'bzrlib.iterablefile',
 
4118
        'bzrlib.lockdir',
 
4119
        'bzrlib.merge3',
 
4120
        'bzrlib.option',
 
4121
        'bzrlib.pyutils',
 
4122
        'bzrlib.symbol_versioning',
 
4123
        'bzrlib.tests',
 
4124
        'bzrlib.tests.fixtures',
 
4125
        'bzrlib.timestamp',
 
4126
        'bzrlib.transport.http',
 
4127
        'bzrlib.version_info_formats.format_custom',
 
4128
        ]
 
4129
 
 
4130
 
 
4131
def test_suite(keep_only=None, starting_with=None):
 
4132
    """Build and return TestSuite for the whole of bzrlib.
 
4133
 
 
4134
    :param keep_only: A list of test ids limiting the suite returned.
 
4135
 
 
4136
    :param starting_with: An id limiting the suite returned to the tests
 
4137
         starting with it.
 
4138
 
 
4139
    This function can be replaced if you need to change the default test
 
4140
    suite on a global basis, but it is not encouraged.
 
4141
    """
 
4142
 
 
4143
    loader = TestUtil.TestLoader()
 
4144
 
 
4145
    if keep_only is not None:
 
4146
        id_filter = TestIdList(keep_only)
 
4147
    if starting_with:
 
4148
        # We take precedence over keep_only because *at loading time* using
 
4149
        # both options means we will load less tests for the same final result.
 
4150
        def interesting_module(name):
 
4151
            for start in starting_with:
 
4152
                if (
 
4153
                    # Either the module name starts with the specified string
 
4154
                    name.startswith(start)
 
4155
                    # or it may contain tests starting with the specified string
 
4156
                    or start.startswith(name)
 
4157
                    ):
 
4158
                    return True
 
4159
            return False
 
4160
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
4161
 
 
4162
    elif keep_only is not None:
 
4163
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
4164
        def interesting_module(name):
 
4165
            return id_filter.refers_to(name)
 
4166
 
 
4167
    else:
 
4168
        loader = TestUtil.TestLoader()
 
4169
        def interesting_module(name):
 
4170
            # No filtering, all modules are interesting
 
4171
            return True
 
4172
 
 
4173
    suite = loader.suiteClass()
 
4174
 
 
4175
    # modules building their suite with loadTestsFromModuleNames
 
4176
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
4177
 
 
4178
    for mod in _test_suite_modules_to_doctest():
 
4179
        if not interesting_module(mod):
 
4180
            # No tests to keep here, move along
 
4181
            continue
 
4182
        try:
 
4183
            # note that this really does mean "report only" -- doctest
 
4184
            # still runs the rest of the examples
 
4185
            doc_suite = IsolatedDocTestSuite(
 
4186
                mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
4187
        except ValueError, e:
 
4188
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
4189
            raise
 
4190
        if len(doc_suite._tests) == 0:
 
4191
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
4192
        suite.addTest(doc_suite)
 
4193
 
 
4194
    default_encoding = sys.getdefaultencoding()
 
4195
    for name, plugin in _mod_plugin.plugins().items():
 
4196
        if not interesting_module(plugin.module.__name__):
 
4197
            continue
 
4198
        plugin_suite = plugin.test_suite()
 
4199
        # We used to catch ImportError here and turn it into just a warning,
 
4200
        # but really if you don't have --no-plugins this should be a failure.
 
4201
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
4202
        if plugin_suite is None:
 
4203
            plugin_suite = plugin.load_plugin_tests(loader)
 
4204
        if plugin_suite is not None:
 
4205
            suite.addTest(plugin_suite)
 
4206
        if default_encoding != sys.getdefaultencoding():
 
4207
            trace.warning(
 
4208
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
4209
                sys.getdefaultencoding())
 
4210
            reload(sys)
 
4211
            sys.setdefaultencoding(default_encoding)
 
4212
 
 
4213
    if keep_only is not None:
 
4214
        # Now that the referred modules have loaded their tests, keep only the
 
4215
        # requested ones.
 
4216
        suite = filter_suite_by_id_list(suite, id_filter)
 
4217
        # Do some sanity checks on the id_list filtering
 
4218
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
4219
        if starting_with:
 
4220
            # The tester has used both keep_only and starting_with, so he is
 
4221
            # already aware that some tests are excluded from the list, there
 
4222
            # is no need to tell him which.
 
4223
            pass
 
4224
        else:
 
4225
            # Some tests mentioned in the list are not in the test suite. The
 
4226
            # list may be out of date, report to the tester.
 
4227
            for id in not_found:
 
4228
                trace.warning('"%s" not found in the test suite', id)
 
4229
        for id in duplicates:
 
4230
            trace.warning('"%s" is used as an id by several tests', id)
 
4231
 
 
4232
    return suite
 
4233
 
 
4234
 
 
4235
def multiply_scenarios(*scenarios):
 
4236
    """Multiply two or more iterables of scenarios.
 
4237
 
 
4238
    It is safe to pass scenario generators or iterators.
 
4239
 
 
4240
    :returns: A list of compound scenarios: the cross-product of all 
 
4241
        scenarios, with the names concatenated and the parameters
 
4242
        merged together.
 
4243
    """
 
4244
    return reduce(_multiply_two_scenarios, map(list, scenarios))
 
4245
 
 
4246
 
 
4247
def _multiply_two_scenarios(scenarios_left, scenarios_right):
 
4248
    """Multiply two sets of scenarios.
 
4249
 
 
4250
    :returns: the cartesian product of the two sets of scenarios, that is
 
4251
        a scenario for every possible combination of a left scenario and a
 
4252
        right scenario.
 
4253
    """
 
4254
    return [
 
4255
        ('%s,%s' % (left_name, right_name),
 
4256
         dict(left_dict.items() + right_dict.items()))
 
4257
        for left_name, left_dict in scenarios_left
 
4258
        for right_name, right_dict in scenarios_right]
 
4259
 
 
4260
 
 
4261
def multiply_tests(tests, scenarios, result):
 
4262
    """Multiply tests_list by scenarios into result.
 
4263
 
 
4264
    This is the core workhorse for test parameterisation.
 
4265
 
 
4266
    Typically the load_tests() method for a per-implementation test suite will
 
4267
    call multiply_tests and return the result.
 
4268
 
 
4269
    :param tests: The tests to parameterise.
 
4270
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
4271
        scenario_param_dict).
 
4272
    :param result: A TestSuite to add created tests to.
 
4273
 
 
4274
    This returns the passed in result TestSuite with the cross product of all
 
4275
    the tests repeated once for each scenario.  Each test is adapted by adding
 
4276
    the scenario name at the end of its id(), and updating the test object's
 
4277
    __dict__ with the scenario_param_dict.
 
4278
 
 
4279
    >>> import bzrlib.tests.test_sampler
 
4280
    >>> r = multiply_tests(
 
4281
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
4282
    ...     [('one', dict(param=1)),
 
4283
    ...      ('two', dict(param=2))],
 
4284
    ...     TestUtil.TestSuite())
 
4285
    >>> tests = list(iter_suite_tests(r))
 
4286
    >>> len(tests)
 
4287
    2
 
4288
    >>> tests[0].id()
 
4289
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
4290
    >>> tests[0].param
 
4291
    1
 
4292
    >>> tests[1].param
 
4293
    2
 
4294
    """
 
4295
    for test in iter_suite_tests(tests):
 
4296
        apply_scenarios(test, scenarios, result)
 
4297
    return result
 
4298
 
 
4299
 
 
4300
def apply_scenarios(test, scenarios, result):
 
4301
    """Apply the scenarios in scenarios to test and add to result.
 
4302
 
 
4303
    :param test: The test to apply scenarios to.
 
4304
    :param scenarios: An iterable of scenarios to apply to test.
 
4305
    :return: result
 
4306
    :seealso: apply_scenario
 
4307
    """
 
4308
    for scenario in scenarios:
 
4309
        result.addTest(apply_scenario(test, scenario))
 
4310
    return result
 
4311
 
 
4312
 
 
4313
def apply_scenario(test, scenario):
 
4314
    """Copy test and apply scenario to it.
 
4315
 
 
4316
    :param test: A test to adapt.
 
4317
    :param scenario: A tuple describing the scenarion.
 
4318
        The first element of the tuple is the new test id.
 
4319
        The second element is a dict containing attributes to set on the
 
4320
        test.
 
4321
    :return: The adapted test.
 
4322
    """
 
4323
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
4324
    new_test = clone_test(test, new_id)
 
4325
    for name, value in scenario[1].items():
 
4326
        setattr(new_test, name, value)
 
4327
    return new_test
 
4328
 
 
4329
 
 
4330
def clone_test(test, new_id):
 
4331
    """Clone a test giving it a new id.
 
4332
 
 
4333
    :param test: The test to clone.
 
4334
    :param new_id: The id to assign to it.
 
4335
    :return: The new test.
 
4336
    """
 
4337
    new_test = copy.copy(test)
 
4338
    new_test.id = lambda: new_id
 
4339
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
 
4340
    # causes cloned tests to share the 'details' dict.  This makes it hard to
 
4341
    # read the test output for parameterized tests, because tracebacks will be
 
4342
    # associated with irrelevant tests.
 
4343
    try:
 
4344
        details = new_test._TestCase__details
 
4345
    except AttributeError:
 
4346
        # must be a different version of testtools than expected.  Do nothing.
 
4347
        pass
 
4348
    else:
 
4349
        # Reset the '__details' dict.
 
4350
        new_test._TestCase__details = {}
 
4351
    return new_test
 
4352
 
 
4353
 
 
4354
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
4355
                                ext_module_name):
 
4356
    """Helper for permutating tests against an extension module.
 
4357
 
 
4358
    This is meant to be used inside a modules 'load_tests()' function. It will
 
4359
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
4360
    against both implementations. Setting 'test.module' to the appropriate
 
4361
    module. See bzrlib.tests.test__chk_map.load_tests as an example.
 
4362
 
 
4363
    :param standard_tests: A test suite to permute
 
4364
    :param loader: A TestLoader
 
4365
    :param py_module_name: The python path to a python module that can always
 
4366
        be loaded, and will be considered the 'python' implementation. (eg
 
4367
        'bzrlib._chk_map_py')
 
4368
    :param ext_module_name: The python path to an extension module. If the
 
4369
        module cannot be loaded, a single test will be added, which notes that
 
4370
        the module is not available. If it can be loaded, all standard_tests
 
4371
        will be run against that module.
 
4372
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
4373
        tests. feature is the Feature object that can be used to determine if
 
4374
        the module is available.
 
4375
    """
 
4376
 
 
4377
    from bzrlib.tests.features import ModuleAvailableFeature
 
4378
    py_module = pyutils.get_named_object(py_module_name)
 
4379
    scenarios = [
 
4380
        ('python', {'module': py_module}),
 
4381
    ]
 
4382
    suite = loader.suiteClass()
 
4383
    feature = ModuleAvailableFeature(ext_module_name)
 
4384
    if feature.available():
 
4385
        scenarios.append(('C', {'module': feature.module}))
 
4386
    else:
 
4387
        # the compiled module isn't available, so we add a failing test
 
4388
        class FailWithoutFeature(TestCase):
 
4389
            def test_fail(self):
 
4390
                self.requireFeature(feature)
 
4391
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4392
    result = multiply_tests(standard_tests, scenarios, suite)
 
4393
    return result, feature
 
4394
 
 
4395
 
 
4396
def _rmtree_temp_dir(dirname, test_id=None):
 
4397
    # If LANG=C we probably have created some bogus paths
 
4398
    # which rmtree(unicode) will fail to delete
 
4399
    # so make sure we are using rmtree(str) to delete everything
 
4400
    # except on win32, where rmtree(str) will fail
 
4401
    # since it doesn't have the property of byte-stream paths
 
4402
    # (they are either ascii or mbcs)
 
4403
    if sys.platform == 'win32':
 
4404
        # make sure we are using the unicode win32 api
 
4405
        dirname = unicode(dirname)
 
4406
    else:
 
4407
        dirname = dirname.encode(sys.getfilesystemencoding())
 
4408
    try:
 
4409
        osutils.rmtree(dirname)
 
4410
    except OSError, e:
 
4411
        # We don't want to fail here because some useful display will be lost
 
4412
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
4413
        # possible info to the test runner is even worse.
 
4414
        if test_id != None:
 
4415
            ui.ui_factory.clear_term()
 
4416
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
 
4417
        # Ugly, but the last thing we want here is fail, so bear with it.
 
4418
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
 
4419
                                    ).encode('ascii', 'replace')
 
4420
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
4421
                         % (os.path.basename(dirname), printable_e))
 
4422
 
 
4423
 
 
4424
def probe_unicode_in_user_encoding():
 
4425
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4426
    Return first successfull match.
 
4427
 
 
4428
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4429
    """
 
4430
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4431
    for uni_val in possible_vals:
 
4432
        try:
 
4433
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4434
        except UnicodeEncodeError:
 
4435
            # Try a different character
 
4436
            pass
 
4437
        else:
 
4438
            return uni_val, str_val
 
4439
    return None, None
 
4440
 
 
4441
 
 
4442
def probe_bad_non_ascii(encoding):
 
4443
    """Try to find [bad] character with code [128..255]
 
4444
    that cannot be decoded to unicode in some encoding.
 
4445
    Return None if all non-ascii characters is valid
 
4446
    for given encoding.
 
4447
    """
 
4448
    for i in xrange(128, 256):
 
4449
        char = chr(i)
 
4450
        try:
 
4451
            char.decode(encoding)
 
4452
        except UnicodeDecodeError:
 
4453
            return char
 
4454
    return None
 
4455
 
 
4456
 
 
4457
# Only define SubUnitBzrRunner if subunit is available.
 
4458
try:
 
4459
    from subunit import TestProtocolClient
 
4460
    from subunit.test_results import AutoTimingTestResultDecorator
 
4461
    class SubUnitBzrProtocolClient(TestProtocolClient):
 
4462
 
 
4463
        def stopTest(self, test):
 
4464
            super(SubUnitBzrProtocolClient, self).stopTest(test)
 
4465
            _clear__type_equality_funcs(test)
 
4466
 
 
4467
        def addSuccess(self, test, details=None):
 
4468
            # The subunit client always includes the details in the subunit
 
4469
            # stream, but we don't want to include it in ours.
 
4470
            if details is not None and 'log' in details:
 
4471
                del details['log']
 
4472
            return super(SubUnitBzrProtocolClient, self).addSuccess(
 
4473
                test, details)
 
4474
 
 
4475
    class SubUnitBzrRunner(TextTestRunner):
 
4476
        def run(self, test):
 
4477
            result = AutoTimingTestResultDecorator(
 
4478
                SubUnitBzrProtocolClient(self.stream))
 
4479
            test.run(result)
 
4480
            return result
 
4481
except ImportError:
 
4482
    pass
 
4483
 
 
4484
 
 
4485
# API compatibility for old plugins; see bug 892622.
 
4486
for name in [
 
4487
    'Feature',
 
4488
    'HTTPServerFeature', 
 
4489
    'ModuleAvailableFeature',
 
4490
    'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
 
4491
    'OsFifoFeature', 'UnicodeFilenameFeature',
 
4492
    'ByteStringNamedFilesystem', 'UTF8Filesystem',
 
4493
    'BreakinFeature', 'CaseInsCasePresFilenameFeature',
 
4494
    'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
 
4495
    'posix_permissions_feature',
 
4496
    ]:
 
4497
    globals()[name] = _CompatabilityThunkFeature(
 
4498
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4499
        'bzrlib.tests', name,
 
4500
        name, 'bzrlib.tests.features')
 
4501
 
 
4502
 
 
4503
for (old_name, new_name) in [
 
4504
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
 
4505
    ]:
 
4506
    globals()[name] = _CompatabilityThunkFeature(
 
4507
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4508
        'bzrlib.tests', old_name,
 
4509
        new_name, 'bzrlib.tests.features')