~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Jelmer Vernooij
  • Date: 2011-11-30 20:02:16 UTC
  • mto: This revision was merged to the branch mainline in revision 6333.
  • Revision ID: jelmer@samba.org-20111130200216-aoju21pdl20d1gkd
Consistently pass tree path when exporting.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Testing framework extensions"""
 
18
 
 
19
# NOTE: Some classes in here use camelCaseNaming() rather than
 
20
# underscore_naming().  That's for consistency with unittest; it's not the
 
21
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
22
# new assertFoo() methods.
 
23
 
 
24
import atexit
 
25
import codecs
 
26
import copy
 
27
from cStringIO import StringIO
 
28
import difflib
 
29
import doctest
 
30
import errno
 
31
import itertools
 
32
import logging
 
33
import os
 
34
import platform
 
35
import pprint
 
36
import random
 
37
import re
 
38
import shlex
 
39
import stat
 
40
import subprocess
 
41
import sys
 
42
import tempfile
 
43
import threading
 
44
import time
 
45
import traceback
 
46
import unittest
 
47
import warnings
 
48
 
 
49
import testtools
 
50
# nb: check this before importing anything else from within it
 
51
_testtools_version = getattr(testtools, '__version__', ())
 
52
if _testtools_version < (0, 9, 5):
 
53
    raise ImportError("need at least testtools 0.9.5: %s is %r"
 
54
        % (testtools.__file__, _testtools_version))
 
55
from testtools import content
 
56
 
 
57
import bzrlib
 
58
from bzrlib import (
 
59
    branchbuilder,
 
60
    bzrdir,
 
61
    chk_map,
 
62
    commands as _mod_commands,
 
63
    config,
 
64
    i18n,
 
65
    debug,
 
66
    errors,
 
67
    hooks,
 
68
    lock as _mod_lock,
 
69
    lockdir,
 
70
    memorytree,
 
71
    osutils,
 
72
    plugin as _mod_plugin,
 
73
    pyutils,
 
74
    ui,
 
75
    urlutils,
 
76
    registry,
 
77
    symbol_versioning,
 
78
    trace,
 
79
    transport as _mod_transport,
 
80
    workingtree,
 
81
    )
 
82
try:
 
83
    import bzrlib.lsprof
 
84
except ImportError:
 
85
    # lsprof not available
 
86
    pass
 
87
from bzrlib.smart import client, request
 
88
from bzrlib.transport import (
 
89
    memory,
 
90
    pathfilter,
 
91
    )
 
92
from bzrlib.symbol_versioning import (
 
93
    deprecated_function,
 
94
    deprecated_in,
 
95
    )
 
96
from bzrlib.tests import (
 
97
    fixtures,
 
98
    test_server,
 
99
    TestUtil,
 
100
    treeshape,
 
101
    )
 
102
from bzrlib.ui import NullProgressView
 
103
from bzrlib.ui.text import TextUIFactory
 
104
from bzrlib.tests.features import _CompatabilityThunkFeature
 
105
 
 
106
# Mark this python module as being part of the implementation
 
107
# of unittest: this gives us better tracebacks where the last
 
108
# shown frame is the test code, not our assertXYZ.
 
109
__unittest = 1
 
110
 
 
111
default_transport = test_server.LocalURLServer
 
112
 
 
113
 
 
114
_unitialized_attr = object()
 
115
"""A sentinel needed to act as a default value in a method signature."""
 
116
 
 
117
 
 
118
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
119
SUBUNIT_SEEK_SET = 0
 
120
SUBUNIT_SEEK_CUR = 1
 
121
 
 
122
# These are intentionally brought into this namespace. That way plugins, etc
 
123
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
 
124
TestSuite = TestUtil.TestSuite
 
125
TestLoader = TestUtil.TestLoader
 
126
 
 
127
# Tests should run in a clean and clearly defined environment. The goal is to
 
128
# keep them isolated from the running environment as mush as possible. The test
 
129
# framework ensures the variables defined below are set (or deleted if the
 
130
# value is None) before a test is run and reset to their original value after
 
131
# the test is run. Generally if some code depends on an environment variable,
 
132
# the tests should start without this variable in the environment. There are a
 
133
# few exceptions but you shouldn't violate this rule lightly.
 
134
isolated_environ = {
 
135
    'BZR_HOME': None,
 
136
    'HOME': None,
 
137
    # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
138
    # tests do check our impls match APPDATA
 
139
    'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
140
    'VISUAL': None,
 
141
    'EDITOR': None,
 
142
    'BZR_EMAIL': None,
 
143
    'BZREMAIL': None, # may still be present in the environment
 
144
    'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
 
145
    'BZR_PROGRESS_BAR': None,
 
146
    # This should trap leaks to ~/.bzr.log. This occurs when tests use TestCase
 
147
    # as a base class instead of TestCaseInTempDir. Tests inheriting from
 
148
    # TestCase should not use disk resources, BZR_LOG is one.
 
149
    'BZR_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
 
150
    'BZR_PLUGIN_PATH': None,
 
151
    'BZR_DISABLE_PLUGINS': None,
 
152
    'BZR_PLUGINS_AT': None,
 
153
    'BZR_CONCURRENCY': None,
 
154
    # Make sure that any text ui tests are consistent regardless of
 
155
    # the environment the test case is run in; you may want tests that
 
156
    # test other combinations.  'dumb' is a reasonable guess for tests
 
157
    # going to a pipe or a StringIO.
 
158
    'TERM': 'dumb',
 
159
    'LINES': '25',
 
160
    'COLUMNS': '80',
 
161
    'BZR_COLUMNS': '80',
 
162
    # Disable SSH Agent
 
163
    'SSH_AUTH_SOCK': None,
 
164
    # Proxies
 
165
    'http_proxy': None,
 
166
    'HTTP_PROXY': None,
 
167
    'https_proxy': None,
 
168
    'HTTPS_PROXY': None,
 
169
    'no_proxy': None,
 
170
    'NO_PROXY': None,
 
171
    'all_proxy': None,
 
172
    'ALL_PROXY': None,
 
173
    # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
174
    # least. If you do (care), please update this comment
 
175
    # -- vila 20080401
 
176
    'ftp_proxy': None,
 
177
    'FTP_PROXY': None,
 
178
    'BZR_REMOTE_PATH': None,
 
179
    # Generally speaking, we don't want apport reporting on crashes in
 
180
    # the test envirnoment unless we're specifically testing apport,
 
181
    # so that it doesn't leak into the real system environment.  We
 
182
    # use an env var so it propagates to subprocesses.
 
183
    'APPORT_DISABLE': '1',
 
184
    }
 
185
 
 
186
 
 
187
def override_os_environ(test, env=None):
 
188
    """Modify os.environ keeping a copy.
 
189
    
 
190
    :param test: A test instance
 
191
 
 
192
    :param env: A dict containing variable definitions to be installed
 
193
    """
 
194
    if env is None:
 
195
        env = isolated_environ
 
196
    test._original_os_environ = dict([(var, value)
 
197
                                      for var, value in os.environ.iteritems()])
 
198
    for var, value in env.iteritems():
 
199
        osutils.set_or_unset_env(var, value)
 
200
        if var not in test._original_os_environ:
 
201
            # The var is new, add it with a value of None, so
 
202
            # restore_os_environ will delete it
 
203
            test._original_os_environ[var] = None
 
204
 
 
205
 
 
206
def restore_os_environ(test):
 
207
    """Restore os.environ to its original state.
 
208
 
 
209
    :param test: A test instance previously passed to override_os_environ.
 
210
    """
 
211
    for var, value in test._original_os_environ.iteritems():
 
212
        # Restore the original value (or delete it if the value has been set to
 
213
        # None in override_os_environ).
 
214
        osutils.set_or_unset_env(var, value)
 
215
 
 
216
 
 
217
def _clear__type_equality_funcs(test):
 
218
    """Cleanup bound methods stored on TestCase instances
 
219
 
 
220
    Clear the dict breaking a few (mostly) harmless cycles in the affected
 
221
    unittests released with Python 2.6 and initial Python 2.7 versions.
 
222
 
 
223
    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
 
224
    shipped in Oneiric, an object with no clear method was used, hence the
 
225
    extra complications, see bug 809048 for details.
 
226
    """
 
227
    type_equality_funcs = getattr(test, "_type_equality_funcs", None)
 
228
    if type_equality_funcs is not None:
 
229
        tef_clear = getattr(type_equality_funcs, "clear", None)
 
230
        if tef_clear is None:
 
231
            tef_instance_dict = getattr(type_equality_funcs, "__dict__", None)
 
232
            if tef_instance_dict is not None:
 
233
                tef_clear = tef_instance_dict.clear
 
234
        if tef_clear is not None:
 
235
            tef_clear()
 
236
 
 
237
 
 
238
class ExtendedTestResult(testtools.TextTestResult):
 
239
    """Accepts, reports and accumulates the results of running tests.
 
240
 
 
241
    Compared to the unittest version this class adds support for
 
242
    profiling, benchmarking, stopping as soon as a test fails,  and
 
243
    skipping tests.  There are further-specialized subclasses for
 
244
    different types of display.
 
245
 
 
246
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
247
    addFailure or addError classes.  These in turn may redirect to a more
 
248
    specific case for the special test results supported by our extended
 
249
    tests.
 
250
 
 
251
    Note that just one of these objects is fed the results from many tests.
 
252
    """
 
253
 
 
254
    stop_early = False
 
255
 
 
256
    def __init__(self, stream, descriptions, verbosity,
 
257
                 bench_history=None,
 
258
                 strict=False,
 
259
                 ):
 
260
        """Construct new TestResult.
 
261
 
 
262
        :param bench_history: Optionally, a writable file object to accumulate
 
263
            benchmark results.
 
264
        """
 
265
        testtools.TextTestResult.__init__(self, stream)
 
266
        if bench_history is not None:
 
267
            from bzrlib.version import _get_bzr_source_tree
 
268
            src_tree = _get_bzr_source_tree()
 
269
            if src_tree:
 
270
                try:
 
271
                    revision_id = src_tree.get_parent_ids()[0]
 
272
                except IndexError:
 
273
                    # XXX: if this is a brand new tree, do the same as if there
 
274
                    # is no branch.
 
275
                    revision_id = ''
 
276
            else:
 
277
                # XXX: If there's no branch, what should we do?
 
278
                revision_id = ''
 
279
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
280
        self._bench_history = bench_history
 
281
        self.ui = ui.ui_factory
 
282
        self.num_tests = 0
 
283
        self.error_count = 0
 
284
        self.failure_count = 0
 
285
        self.known_failure_count = 0
 
286
        self.skip_count = 0
 
287
        self.not_applicable_count = 0
 
288
        self.unsupported = {}
 
289
        self.count = 0
 
290
        self._overall_start_time = time.time()
 
291
        self._strict = strict
 
292
        self._first_thread_leaker_id = None
 
293
        self._tests_leaking_threads_count = 0
 
294
        self._traceback_from_test = None
 
295
 
 
296
    def stopTestRun(self):
 
297
        run = self.testsRun
 
298
        actionTaken = "Ran"
 
299
        stopTime = time.time()
 
300
        timeTaken = stopTime - self.startTime
 
301
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
 
302
        #                the parent class method is similar have to duplicate
 
303
        self._show_list('ERROR', self.errors)
 
304
        self._show_list('FAIL', self.failures)
 
305
        self.stream.write(self.sep2)
 
306
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
 
307
                            run, run != 1 and "s" or "", timeTaken))
 
308
        if not self.wasSuccessful():
 
309
            self.stream.write("FAILED (")
 
310
            failed, errored = map(len, (self.failures, self.errors))
 
311
            if failed:
 
312
                self.stream.write("failures=%d" % failed)
 
313
            if errored:
 
314
                if failed: self.stream.write(", ")
 
315
                self.stream.write("errors=%d" % errored)
 
316
            if self.known_failure_count:
 
317
                if failed or errored: self.stream.write(", ")
 
318
                self.stream.write("known_failure_count=%d" %
 
319
                    self.known_failure_count)
 
320
            self.stream.write(")\n")
 
321
        else:
 
322
            if self.known_failure_count:
 
323
                self.stream.write("OK (known_failures=%d)\n" %
 
324
                    self.known_failure_count)
 
325
            else:
 
326
                self.stream.write("OK\n")
 
327
        if self.skip_count > 0:
 
328
            skipped = self.skip_count
 
329
            self.stream.write('%d test%s skipped\n' %
 
330
                                (skipped, skipped != 1 and "s" or ""))
 
331
        if self.unsupported:
 
332
            for feature, count in sorted(self.unsupported.items()):
 
333
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
334
                    (feature, count))
 
335
        if self._strict:
 
336
            ok = self.wasStrictlySuccessful()
 
337
        else:
 
338
            ok = self.wasSuccessful()
 
339
        if self._first_thread_leaker_id:
 
340
            self.stream.write(
 
341
                '%s is leaking threads among %d leaking tests.\n' % (
 
342
                self._first_thread_leaker_id,
 
343
                self._tests_leaking_threads_count))
 
344
            # We don't report the main thread as an active one.
 
345
            self.stream.write(
 
346
                '%d non-main threads were left active in the end.\n'
 
347
                % (len(self._active_threads) - 1))
 
348
 
 
349
    def getDescription(self, test):
 
350
        return test.id()
 
351
 
 
352
    def _extractBenchmarkTime(self, testCase, details=None):
 
353
        """Add a benchmark time for the current test case."""
 
354
        if details and 'benchtime' in details:
 
355
            return float(''.join(details['benchtime'].iter_bytes()))
 
356
        return getattr(testCase, "_benchtime", None)
 
357
 
 
358
    def _elapsedTestTimeString(self):
 
359
        """Return a time string for the overall time the current test has taken."""
 
360
        return self._formatTime(self._delta_to_float(
 
361
            self._now() - self._start_datetime))
 
362
 
 
363
    def _testTimeString(self, testCase):
 
364
        benchmark_time = self._extractBenchmarkTime(testCase)
 
365
        if benchmark_time is not None:
 
366
            return self._formatTime(benchmark_time) + "*"
 
367
        else:
 
368
            return self._elapsedTestTimeString()
 
369
 
 
370
    def _formatTime(self, seconds):
 
371
        """Format seconds as milliseconds with leading spaces."""
 
372
        # some benchmarks can take thousands of seconds to run, so we need 8
 
373
        # places
 
374
        return "%8dms" % (1000 * seconds)
 
375
 
 
376
    def _shortened_test_description(self, test):
 
377
        what = test.id()
 
378
        what = re.sub(r'^bzrlib\.tests\.', '', what)
 
379
        return what
 
380
 
 
381
    # GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
 
382
    #                multiple times in a row, because the handler is added for
 
383
    #                each test but the container list is shared between cases.
 
384
    #                See lp:498869 lp:625574 and lp:637725 for background.
 
385
    def _record_traceback_from_test(self, exc_info):
 
386
        """Store the traceback from passed exc_info tuple till"""
 
387
        self._traceback_from_test = exc_info[2]
 
388
 
 
389
    def startTest(self, test):
 
390
        super(ExtendedTestResult, self).startTest(test)
 
391
        if self.count == 0:
 
392
            self.startTests()
 
393
        self.count += 1
 
394
        self.report_test_start(test)
 
395
        test.number = self.count
 
396
        self._recordTestStartTime()
 
397
        # Make testtools cases give us the real traceback on failure
 
398
        addOnException = getattr(test, "addOnException", None)
 
399
        if addOnException is not None:
 
400
            addOnException(self._record_traceback_from_test)
 
401
        # Only check for thread leaks on bzrlib derived test cases
 
402
        if isinstance(test, TestCase):
 
403
            test.addCleanup(self._check_leaked_threads, test)
 
404
 
 
405
    def stopTest(self, test):
 
406
        super(ExtendedTestResult, self).stopTest(test)
 
407
        # Manually break cycles, means touching various private things but hey
 
408
        getDetails = getattr(test, "getDetails", None)
 
409
        if getDetails is not None:
 
410
            getDetails().clear()
 
411
        _clear__type_equality_funcs(test)
 
412
        self._traceback_from_test = None
 
413
 
 
414
    def startTests(self):
 
415
        self.report_tests_starting()
 
416
        self._active_threads = threading.enumerate()
 
417
 
 
418
    def _check_leaked_threads(self, test):
 
419
        """See if any threads have leaked since last call
 
420
 
 
421
        A sample of live threads is stored in the _active_threads attribute,
 
422
        when this method runs it compares the current live threads and any not
 
423
        in the previous sample are treated as having leaked.
 
424
        """
 
425
        now_active_threads = set(threading.enumerate())
 
426
        threads_leaked = now_active_threads.difference(self._active_threads)
 
427
        if threads_leaked:
 
428
            self._report_thread_leak(test, threads_leaked, now_active_threads)
 
429
            self._tests_leaking_threads_count += 1
 
430
            if self._first_thread_leaker_id is None:
 
431
                self._first_thread_leaker_id = test.id()
 
432
            self._active_threads = now_active_threads
 
433
 
 
434
    def _recordTestStartTime(self):
 
435
        """Record that a test has started."""
 
436
        self._start_datetime = self._now()
 
437
 
 
438
    def addError(self, test, err):
 
439
        """Tell result that test finished with an error.
 
440
 
 
441
        Called from the TestCase run() method when the test
 
442
        fails with an unexpected error.
 
443
        """
 
444
        self._post_mortem(self._traceback_from_test)
 
445
        super(ExtendedTestResult, self).addError(test, err)
 
446
        self.error_count += 1
 
447
        self.report_error(test, err)
 
448
        if self.stop_early:
 
449
            self.stop()
 
450
 
 
451
    def addFailure(self, test, err):
 
452
        """Tell result that test failed.
 
453
 
 
454
        Called from the TestCase run() method when the test
 
455
        fails because e.g. an assert() method failed.
 
456
        """
 
457
        self._post_mortem(self._traceback_from_test)
 
458
        super(ExtendedTestResult, self).addFailure(test, err)
 
459
        self.failure_count += 1
 
460
        self.report_failure(test, err)
 
461
        if self.stop_early:
 
462
            self.stop()
 
463
 
 
464
    def addSuccess(self, test, details=None):
 
465
        """Tell result that test completed successfully.
 
466
 
 
467
        Called from the TestCase run()
 
468
        """
 
469
        if self._bench_history is not None:
 
470
            benchmark_time = self._extractBenchmarkTime(test, details)
 
471
            if benchmark_time is not None:
 
472
                self._bench_history.write("%s %s\n" % (
 
473
                    self._formatTime(benchmark_time),
 
474
                    test.id()))
 
475
        self.report_success(test)
 
476
        super(ExtendedTestResult, self).addSuccess(test)
 
477
        test._log_contents = ''
 
478
 
 
479
    def addExpectedFailure(self, test, err):
 
480
        self.known_failure_count += 1
 
481
        self.report_known_failure(test, err)
 
482
 
 
483
    def addUnexpectedSuccess(self, test, details=None):
 
484
        """Tell result the test unexpectedly passed, counting as a failure
 
485
 
 
486
        When the minimum version of testtools required becomes 0.9.8 this
 
487
        can be updated to use the new handling there.
 
488
        """
 
489
        super(ExtendedTestResult, self).addFailure(test, details=details)
 
490
        self.failure_count += 1
 
491
        self.report_unexpected_success(test,
 
492
            "".join(details["reason"].iter_text()))
 
493
        if self.stop_early:
 
494
            self.stop()
 
495
 
 
496
    def addNotSupported(self, test, feature):
 
497
        """The test will not be run because of a missing feature.
 
498
        """
 
499
        # this can be called in two different ways: it may be that the
 
500
        # test started running, and then raised (through requireFeature)
 
501
        # UnavailableFeature.  Alternatively this method can be called
 
502
        # while probing for features before running the test code proper; in
 
503
        # that case we will see startTest and stopTest, but the test will
 
504
        # never actually run.
 
505
        self.unsupported.setdefault(str(feature), 0)
 
506
        self.unsupported[str(feature)] += 1
 
507
        self.report_unsupported(test, feature)
 
508
 
 
509
    def addSkip(self, test, reason):
 
510
        """A test has not run for 'reason'."""
 
511
        self.skip_count += 1
 
512
        self.report_skip(test, reason)
 
513
 
 
514
    def addNotApplicable(self, test, reason):
 
515
        self.not_applicable_count += 1
 
516
        self.report_not_applicable(test, reason)
 
517
 
 
518
    def _count_stored_tests(self):
 
519
        """Count of tests instances kept alive due to not succeeding"""
 
520
        return self.error_count + self.failure_count + self.known_failure_count
 
521
 
 
522
    def _post_mortem(self, tb=None):
 
523
        """Start a PDB post mortem session."""
 
524
        if os.environ.get('BZR_TEST_PDB', None):
 
525
            import pdb
 
526
            pdb.post_mortem(tb)
 
527
 
 
528
    def progress(self, offset, whence):
 
529
        """The test is adjusting the count of tests to run."""
 
530
        if whence == SUBUNIT_SEEK_SET:
 
531
            self.num_tests = offset
 
532
        elif whence == SUBUNIT_SEEK_CUR:
 
533
            self.num_tests += offset
 
534
        else:
 
535
            raise errors.BzrError("Unknown whence %r" % whence)
 
536
 
 
537
    def report_tests_starting(self):
 
538
        """Display information before the test run begins"""
 
539
        if getattr(sys, 'frozen', None) is None:
 
540
            bzr_path = osutils.realpath(sys.argv[0])
 
541
        else:
 
542
            bzr_path = sys.executable
 
543
        self.stream.write(
 
544
            'bzr selftest: %s\n' % (bzr_path,))
 
545
        self.stream.write(
 
546
            '   %s\n' % (
 
547
                    bzrlib.__path__[0],))
 
548
        self.stream.write(
 
549
            '   bzr-%s python-%s %s\n' % (
 
550
                    bzrlib.version_string,
 
551
                    bzrlib._format_version_tuple(sys.version_info),
 
552
                    platform.platform(aliased=1),
 
553
                    ))
 
554
        self.stream.write('\n')
 
555
 
 
556
    def report_test_start(self, test):
 
557
        """Display information on the test just about to be run"""
 
558
 
 
559
    def _report_thread_leak(self, test, leaked_threads, active_threads):
 
560
        """Display information on a test that leaked one or more threads"""
 
561
        # GZ 2010-09-09: A leak summary reported separately from the general
 
562
        #                thread debugging would be nice. Tests under subunit
 
563
        #                need something not using stream, perhaps adding a
 
564
        #                testtools details object would be fitting.
 
565
        if 'threads' in selftest_debug_flags:
 
566
            self.stream.write('%s is leaking, active is now %d\n' %
 
567
                (test.id(), len(active_threads)))
 
568
 
 
569
    def startTestRun(self):
 
570
        self.startTime = time.time()
 
571
 
 
572
    def report_success(self, test):
 
573
        pass
 
574
 
 
575
    def wasStrictlySuccessful(self):
 
576
        if self.unsupported or self.known_failure_count:
 
577
            return False
 
578
        return self.wasSuccessful()
 
579
 
 
580
 
 
581
class TextTestResult(ExtendedTestResult):
 
582
    """Displays progress and results of tests in text form"""
 
583
 
 
584
    def __init__(self, stream, descriptions, verbosity,
 
585
                 bench_history=None,
 
586
                 pb=None,
 
587
                 strict=None,
 
588
                 ):
 
589
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
590
            bench_history, strict)
 
591
        # We no longer pass them around, but just rely on the UIFactory stack
 
592
        # for state
 
593
        if pb is not None:
 
594
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
595
        self.pb = self.ui.nested_progress_bar()
 
596
        self.pb.show_pct = False
 
597
        self.pb.show_spinner = False
 
598
        self.pb.show_eta = False,
 
599
        self.pb.show_count = False
 
600
        self.pb.show_bar = False
 
601
        self.pb.update_latency = 0
 
602
        self.pb.show_transport_activity = False
 
603
 
 
604
    def stopTestRun(self):
 
605
        # called when the tests that are going to run have run
 
606
        self.pb.clear()
 
607
        self.pb.finished()
 
608
        super(TextTestResult, self).stopTestRun()
 
609
 
 
610
    def report_tests_starting(self):
 
611
        super(TextTestResult, self).report_tests_starting()
 
612
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
613
 
 
614
    def _progress_prefix_text(self):
 
615
        # the longer this text, the less space we have to show the test
 
616
        # name...
 
617
        a = '[%d' % self.count              # total that have been run
 
618
        # tests skipped as known not to be relevant are not important enough
 
619
        # to show here
 
620
        ## if self.skip_count:
 
621
        ##     a += ', %d skip' % self.skip_count
 
622
        ## if self.known_failure_count:
 
623
        ##     a += '+%dX' % self.known_failure_count
 
624
        if self.num_tests:
 
625
            a +='/%d' % self.num_tests
 
626
        a += ' in '
 
627
        runtime = time.time() - self._overall_start_time
 
628
        if runtime >= 60:
 
629
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
630
        else:
 
631
            a += '%ds' % runtime
 
632
        total_fail_count = self.error_count + self.failure_count
 
633
        if total_fail_count:
 
634
            a += ', %d failed' % total_fail_count
 
635
        # if self.unsupported:
 
636
        #     a += ', %d missing' % len(self.unsupported)
 
637
        a += ']'
 
638
        return a
 
639
 
 
640
    def report_test_start(self, test):
 
641
        self.pb.update(
 
642
                self._progress_prefix_text()
 
643
                + ' '
 
644
                + self._shortened_test_description(test))
 
645
 
 
646
    def _test_description(self, test):
 
647
        return self._shortened_test_description(test)
 
648
 
 
649
    def report_error(self, test, err):
 
650
        self.stream.write('ERROR: %s\n    %s\n' % (
 
651
            self._test_description(test),
 
652
            err[1],
 
653
            ))
 
654
 
 
655
    def report_failure(self, test, err):
 
656
        self.stream.write('FAIL: %s\n    %s\n' % (
 
657
            self._test_description(test),
 
658
            err[1],
 
659
            ))
 
660
 
 
661
    def report_known_failure(self, test, err):
 
662
        pass
 
663
 
 
664
    def report_unexpected_success(self, test, reason):
 
665
        self.stream.write('FAIL: %s\n    %s: %s\n' % (
 
666
            self._test_description(test),
 
667
            "Unexpected success. Should have failed",
 
668
            reason,
 
669
            ))
 
670
 
 
671
    def report_skip(self, test, reason):
 
672
        pass
 
673
 
 
674
    def report_not_applicable(self, test, reason):
 
675
        pass
 
676
 
 
677
    def report_unsupported(self, test, feature):
 
678
        """test cannot be run because feature is missing."""
 
679
 
 
680
 
 
681
class VerboseTestResult(ExtendedTestResult):
 
682
    """Produce long output, with one line per test run plus times"""
 
683
 
 
684
    def _ellipsize_to_right(self, a_string, final_width):
 
685
        """Truncate and pad a string, keeping the right hand side"""
 
686
        if len(a_string) > final_width:
 
687
            result = '...' + a_string[3-final_width:]
 
688
        else:
 
689
            result = a_string
 
690
        return result.ljust(final_width)
 
691
 
 
692
    def report_tests_starting(self):
 
693
        self.stream.write('running %d tests...\n' % self.num_tests)
 
694
        super(VerboseTestResult, self).report_tests_starting()
 
695
 
 
696
    def report_test_start(self, test):
 
697
        name = self._shortened_test_description(test)
 
698
        width = osutils.terminal_width()
 
699
        if width is not None:
 
700
            # width needs space for 6 char status, plus 1 for slash, plus an
 
701
            # 11-char time string, plus a trailing blank
 
702
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
 
703
            # space
 
704
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
705
        else:
 
706
            self.stream.write(name)
 
707
        self.stream.flush()
 
708
 
 
709
    def _error_summary(self, err):
 
710
        indent = ' ' * 4
 
711
        return '%s%s' % (indent, err[1])
 
712
 
 
713
    def report_error(self, test, err):
 
714
        self.stream.write('ERROR %s\n%s\n'
 
715
                % (self._testTimeString(test),
 
716
                   self._error_summary(err)))
 
717
 
 
718
    def report_failure(self, test, err):
 
719
        self.stream.write(' FAIL %s\n%s\n'
 
720
                % (self._testTimeString(test),
 
721
                   self._error_summary(err)))
 
722
 
 
723
    def report_known_failure(self, test, err):
 
724
        self.stream.write('XFAIL %s\n%s\n'
 
725
                % (self._testTimeString(test),
 
726
                   self._error_summary(err)))
 
727
 
 
728
    def report_unexpected_success(self, test, reason):
 
729
        self.stream.write(' FAIL %s\n%s: %s\n'
 
730
                % (self._testTimeString(test),
 
731
                   "Unexpected success. Should have failed",
 
732
                   reason))
 
733
 
 
734
    def report_success(self, test):
 
735
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
736
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
737
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
738
            stats.pprint(file=self.stream)
 
739
        # flush the stream so that we get smooth output. This verbose mode is
 
740
        # used to show the output in PQM.
 
741
        self.stream.flush()
 
742
 
 
743
    def report_skip(self, test, reason):
 
744
        self.stream.write(' SKIP %s\n%s\n'
 
745
                % (self._testTimeString(test), reason))
 
746
 
 
747
    def report_not_applicable(self, test, reason):
 
748
        self.stream.write('  N/A %s\n    %s\n'
 
749
                % (self._testTimeString(test), reason))
 
750
 
 
751
    def report_unsupported(self, test, feature):
 
752
        """test cannot be run because feature is missing."""
 
753
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
754
                %(self._testTimeString(test), feature))
 
755
 
 
756
 
 
757
class TextTestRunner(object):
 
758
    stop_on_failure = False
 
759
 
 
760
    def __init__(self,
 
761
                 stream=sys.stderr,
 
762
                 descriptions=0,
 
763
                 verbosity=1,
 
764
                 bench_history=None,
 
765
                 strict=False,
 
766
                 result_decorators=None,
 
767
                 ):
 
768
        """Create a TextTestRunner.
 
769
 
 
770
        :param result_decorators: An optional list of decorators to apply
 
771
            to the result object being used by the runner. Decorators are
 
772
            applied left to right - the first element in the list is the 
 
773
            innermost decorator.
 
774
        """
 
775
        # stream may know claim to know to write unicode strings, but in older
 
776
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
777
        # specifically a built in file with encoding 'UTF-8' will still try
 
778
        # to encode using ascii.
 
779
        new_encoding = osutils.get_terminal_encoding()
 
780
        codec = codecs.lookup(new_encoding)
 
781
        if type(codec) is tuple:
 
782
            # Python 2.4
 
783
            encode = codec[0]
 
784
        else:
 
785
            encode = codec.encode
 
786
        # GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
 
787
        #                so should swap to the plain codecs.StreamWriter
 
788
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
 
789
            "backslashreplace")
 
790
        stream.encoding = new_encoding
 
791
        self.stream = stream
 
792
        self.descriptions = descriptions
 
793
        self.verbosity = verbosity
 
794
        self._bench_history = bench_history
 
795
        self._strict = strict
 
796
        self._result_decorators = result_decorators or []
 
797
 
 
798
    def run(self, test):
 
799
        "Run the given test case or test suite."
 
800
        if self.verbosity == 1:
 
801
            result_class = TextTestResult
 
802
        elif self.verbosity >= 2:
 
803
            result_class = VerboseTestResult
 
804
        original_result = result_class(self.stream,
 
805
                              self.descriptions,
 
806
                              self.verbosity,
 
807
                              bench_history=self._bench_history,
 
808
                              strict=self._strict,
 
809
                              )
 
810
        # Signal to result objects that look at stop early policy to stop,
 
811
        original_result.stop_early = self.stop_on_failure
 
812
        result = original_result
 
813
        for decorator in self._result_decorators:
 
814
            result = decorator(result)
 
815
            result.stop_early = self.stop_on_failure
 
816
        result.startTestRun()
 
817
        try:
 
818
            test.run(result)
 
819
        finally:
 
820
            result.stopTestRun()
 
821
        # higher level code uses our extended protocol to determine
 
822
        # what exit code to give.
 
823
        return original_result
 
824
 
 
825
 
 
826
def iter_suite_tests(suite):
 
827
    """Return all tests in a suite, recursing through nested suites"""
 
828
    if isinstance(suite, unittest.TestCase):
 
829
        yield suite
 
830
    elif isinstance(suite, unittest.TestSuite):
 
831
        for item in suite:
 
832
            for r in iter_suite_tests(item):
 
833
                yield r
 
834
    else:
 
835
        raise Exception('unknown type %r for object %r'
 
836
                        % (type(suite), suite))
 
837
 
 
838
 
 
839
TestSkipped = testtools.testcase.TestSkipped
 
840
 
 
841
 
 
842
class TestNotApplicable(TestSkipped):
 
843
    """A test is not applicable to the situation where it was run.
 
844
 
 
845
    This is only normally raised by parameterized tests, if they find that
 
846
    the instance they're constructed upon does not support one aspect
 
847
    of its interface.
 
848
    """
 
849
 
 
850
 
 
851
# traceback._some_str fails to format exceptions that have the default
 
852
# __str__ which does an implicit ascii conversion. However, repr() on those
 
853
# objects works, for all that its not quite what the doctor may have ordered.
 
854
def _clever_some_str(value):
 
855
    try:
 
856
        return str(value)
 
857
    except:
 
858
        try:
 
859
            return repr(value).replace('\\n', '\n')
 
860
        except:
 
861
            return '<unprintable %s object>' % type(value).__name__
 
862
 
 
863
traceback._some_str = _clever_some_str
 
864
 
 
865
 
 
866
# deprecated - use self.knownFailure(), or self.expectFailure.
 
867
KnownFailure = testtools.testcase._ExpectedFailure
 
868
 
 
869
 
 
870
class UnavailableFeature(Exception):
 
871
    """A feature required for this test was not available.
 
872
 
 
873
    This can be considered a specialised form of SkippedTest.
 
874
 
 
875
    The feature should be used to construct the exception.
 
876
    """
 
877
 
 
878
 
 
879
class StringIOWrapper(object):
 
880
    """A wrapper around cStringIO which just adds an encoding attribute.
 
881
 
 
882
    Internally we can check sys.stdout to see what the output encoding
 
883
    should be. However, cStringIO has no encoding attribute that we can
 
884
    set. So we wrap it instead.
 
885
    """
 
886
    encoding='ascii'
 
887
    _cstring = None
 
888
 
 
889
    def __init__(self, s=None):
 
890
        if s is not None:
 
891
            self.__dict__['_cstring'] = StringIO(s)
 
892
        else:
 
893
            self.__dict__['_cstring'] = StringIO()
 
894
 
 
895
    def __getattr__(self, name, getattr=getattr):
 
896
        return getattr(self.__dict__['_cstring'], name)
 
897
 
 
898
    def __setattr__(self, name, val):
 
899
        if name == 'encoding':
 
900
            self.__dict__['encoding'] = val
 
901
        else:
 
902
            return setattr(self._cstring, name, val)
 
903
 
 
904
 
 
905
class TestUIFactory(TextUIFactory):
 
906
    """A UI Factory for testing.
 
907
 
 
908
    Hide the progress bar but emit note()s.
 
909
    Redirect stdin.
 
910
    Allows get_password to be tested without real tty attached.
 
911
 
 
912
    See also CannedInputUIFactory which lets you provide programmatic input in
 
913
    a structured way.
 
914
    """
 
915
    # TODO: Capture progress events at the model level and allow them to be
 
916
    # observed by tests that care.
 
917
    #
 
918
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
919
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
920
    # idea of how they're supposed to be different.
 
921
    # See https://bugs.launchpad.net/bzr/+bug/408213
 
922
 
 
923
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
924
        if stdin is not None:
 
925
            # We use a StringIOWrapper to be able to test various
 
926
            # encodings, but the user is still responsible to
 
927
            # encode the string and to set the encoding attribute
 
928
            # of StringIOWrapper.
 
929
            stdin = StringIOWrapper(stdin)
 
930
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
931
 
 
932
    def get_non_echoed_password(self):
 
933
        """Get password from stdin without trying to handle the echo mode"""
 
934
        password = self.stdin.readline()
 
935
        if not password:
 
936
            raise EOFError
 
937
        if password[-1] == '\n':
 
938
            password = password[:-1]
 
939
        return password
 
940
 
 
941
    def make_progress_view(self):
 
942
        return NullProgressView()
 
943
 
 
944
 
 
945
def isolated_doctest_setUp(test):
 
946
    override_os_environ(test)
 
947
 
 
948
 
 
949
def isolated_doctest_tearDown(test):
 
950
    restore_os_environ(test)
 
951
 
 
952
 
 
953
def IsolatedDocTestSuite(*args, **kwargs):
 
954
    """Overrides doctest.DocTestSuite to handle isolation.
 
955
 
 
956
    The method is really a factory and users are expected to use it as such.
 
957
    """
 
958
 
 
959
    kwargs['setUp'] = isolated_doctest_setUp
 
960
    kwargs['tearDown'] = isolated_doctest_tearDown
 
961
    return doctest.DocTestSuite(*args, **kwargs)
 
962
 
 
963
 
 
964
class TestCase(testtools.TestCase):
 
965
    """Base class for bzr unit tests.
 
966
 
 
967
    Tests that need access to disk resources should subclass
 
968
    TestCaseInTempDir not TestCase.
 
969
 
 
970
    Error and debug log messages are redirected from their usual
 
971
    location into a temporary file, the contents of which can be
 
972
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
973
    so that it can also capture file IO.  When the test completes this file
 
974
    is read into memory and removed from disk.
 
975
 
 
976
    There are also convenience functions to invoke bzr's command-line
 
977
    routine, and to build and check bzr trees.
 
978
 
 
979
    In addition to the usual method of overriding tearDown(), this class also
 
980
    allows subclasses to register cleanup functions via addCleanup, which are
 
981
    run in order as the object is torn down.  It's less likely this will be
 
982
    accidentally overlooked.
 
983
    """
 
984
 
 
985
    _log_file = None
 
986
    # record lsprof data when performing benchmark calls.
 
987
    _gather_lsprof_in_benchmarks = False
 
988
 
 
989
    def __init__(self, methodName='testMethod'):
 
990
        super(TestCase, self).__init__(methodName)
 
991
        self._directory_isolation = True
 
992
        self.exception_handlers.insert(0,
 
993
            (UnavailableFeature, self._do_unsupported_or_skip))
 
994
        self.exception_handlers.insert(0,
 
995
            (TestNotApplicable, self._do_not_applicable))
 
996
 
 
997
    def setUp(self):
 
998
        super(TestCase, self).setUp()
 
999
 
 
1000
        timeout = config.GlobalStack().get('selftest.timeout')
 
1001
        if timeout:
 
1002
            timeout_fixture = fixtures.TimeoutFixture(timeout)
 
1003
            timeout_fixture.setUp()
 
1004
            self.addCleanup(timeout_fixture.cleanUp)
 
1005
 
 
1006
        for feature in getattr(self, '_test_needs_features', []):
 
1007
            self.requireFeature(feature)
 
1008
        self._cleanEnvironment()
 
1009
 
 
1010
        if bzrlib.global_state is not None:
 
1011
            self.overrideAttr(bzrlib.global_state, 'cmdline_overrides',
 
1012
                              config.CommandLineStore())
 
1013
 
 
1014
        self._silenceUI()
 
1015
        self._startLogFile()
 
1016
        self._benchcalls = []
 
1017
        self._benchtime = None
 
1018
        self._clear_hooks()
 
1019
        self._track_transports()
 
1020
        self._track_locks()
 
1021
        self._clear_debug_flags()
 
1022
        # Isolate global verbosity level, to make sure it's reproducible
 
1023
        # between tests.  We should get rid of this altogether: bug 656694. --
 
1024
        # mbp 20101008
 
1025
        self.overrideAttr(bzrlib.trace, '_verbosity_level', 0)
 
1026
        # Isolate config option expansion until its default value for bzrlib is
 
1027
        # settled on or a the FIXME associated with _get_expand_default_value
 
1028
        # is addressed -- vila 20110219
 
1029
        self.overrideAttr(config, '_expand_default_value', None)
 
1030
        self._log_files = set()
 
1031
        # Each key in the ``_counters`` dict holds a value for a different
 
1032
        # counter. When the test ends, addDetail() should be used to output the
 
1033
        # counter values. This happens in install_counter_hook().
 
1034
        self._counters = {}
 
1035
        if 'config_stats' in selftest_debug_flags:
 
1036
            self._install_config_stats_hooks()
 
1037
        # Do not use i18n for tests (unless the test reverses this)
 
1038
        i18n.disable_i18n()
 
1039
 
 
1040
    def debug(self):
 
1041
        # debug a frame up.
 
1042
        import pdb
 
1043
        # The sys preserved stdin/stdout should allow blackbox tests debugging
 
1044
        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
 
1045
                ).set_trace(sys._getframe().f_back)
 
1046
 
 
1047
    def discardDetail(self, name):
 
1048
        """Extend the addDetail, getDetails api so we can remove a detail.
 
1049
 
 
1050
        eg. bzr always adds the 'log' detail at startup, but we don't want to
 
1051
        include it for skipped, xfail, etc tests.
 
1052
 
 
1053
        It is safe to call this for a detail that doesn't exist, in case this
 
1054
        gets called multiple times.
 
1055
        """
 
1056
        # We cheat. details is stored in __details which means we shouldn't
 
1057
        # touch it. but getDetails() returns the dict directly, so we can
 
1058
        # mutate it.
 
1059
        details = self.getDetails()
 
1060
        if name in details:
 
1061
            del details[name]
 
1062
 
 
1063
    def install_counter_hook(self, hooks, name, counter_name=None):
 
1064
        """Install a counting hook.
 
1065
 
 
1066
        Any hook can be counted as long as it doesn't need to return a value.
 
1067
 
 
1068
        :param hooks: Where the hook should be installed.
 
1069
 
 
1070
        :param name: The hook name that will be counted.
 
1071
 
 
1072
        :param counter_name: The counter identifier in ``_counters``, defaults
 
1073
            to ``name``.
 
1074
        """
 
1075
        _counters = self._counters # Avoid closing over self
 
1076
        if counter_name is None:
 
1077
            counter_name = name
 
1078
        if _counters.has_key(counter_name):
 
1079
            raise AssertionError('%s is already used as a counter name'
 
1080
                                  % (counter_name,))
 
1081
        _counters[counter_name] = 0
 
1082
        self.addDetail(counter_name, content.Content(content.UTF8_TEXT,
 
1083
            lambda: ['%d' % (_counters[counter_name],)]))
 
1084
        def increment_counter(*args, **kwargs):
 
1085
            _counters[counter_name] += 1
 
1086
        label = 'count %s calls' % (counter_name,)
 
1087
        hooks.install_named_hook(name, increment_counter, label)
 
1088
        self.addCleanup(hooks.uninstall_named_hook, name, label)
 
1089
 
 
1090
    def _install_config_stats_hooks(self):
 
1091
        """Install config hooks to count hook calls.
 
1092
 
 
1093
        """
 
1094
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1095
            self.install_counter_hook(config.ConfigHooks, hook_name,
 
1096
                                       'config.%s' % (hook_name,))
 
1097
 
 
1098
        # The OldConfigHooks are private and need special handling to protect
 
1099
        # against recursive tests (tests that run other tests), so we just do
 
1100
        # manually what registering them into _builtin_known_hooks will provide
 
1101
        # us.
 
1102
        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
 
1103
        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
 
1104
            self.install_counter_hook(config.OldConfigHooks, hook_name,
 
1105
                                      'old_config.%s' % (hook_name,))
 
1106
 
 
1107
    def _clear_debug_flags(self):
 
1108
        """Prevent externally set debug flags affecting tests.
 
1109
 
 
1110
        Tests that want to use debug flags can just set them in the
 
1111
        debug_flags set during setup/teardown.
 
1112
        """
 
1113
        # Start with a copy of the current debug flags we can safely modify.
 
1114
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
 
1115
        if 'allow_debug' not in selftest_debug_flags:
 
1116
            debug.debug_flags.clear()
 
1117
        if 'disable_lock_checks' not in selftest_debug_flags:
 
1118
            debug.debug_flags.add('strict_locks')
 
1119
 
 
1120
    def _clear_hooks(self):
 
1121
        # prevent hooks affecting tests
 
1122
        known_hooks = hooks.known_hooks
 
1123
        self._preserved_hooks = {}
 
1124
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1125
            current_hooks = getattr(parent, name)
 
1126
            self._preserved_hooks[parent] = (name, current_hooks)
 
1127
        self._preserved_lazy_hooks = hooks._lazy_hooks
 
1128
        hooks._lazy_hooks = {}
 
1129
        self.addCleanup(self._restoreHooks)
 
1130
        for key, (parent, name) in known_hooks.iter_parent_objects():
 
1131
            factory = known_hooks.get(key)
 
1132
            setattr(parent, name, factory())
 
1133
        # this hook should always be installed
 
1134
        request._install_hook()
 
1135
 
 
1136
    def disable_directory_isolation(self):
 
1137
        """Turn off directory isolation checks."""
 
1138
        self._directory_isolation = False
 
1139
 
 
1140
    def enable_directory_isolation(self):
 
1141
        """Enable directory isolation checks."""
 
1142
        self._directory_isolation = True
 
1143
 
 
1144
    def _silenceUI(self):
 
1145
        """Turn off UI for duration of test"""
 
1146
        # by default the UI is off; tests can turn it on if they want it.
 
1147
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
 
1148
 
 
1149
    def _check_locks(self):
 
1150
        """Check that all lock take/release actions have been paired."""
 
1151
        # We always check for mismatched locks. If a mismatch is found, we
 
1152
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
1153
        # case we just print a warning.
 
1154
        # unhook:
 
1155
        acquired_locks = [lock for action, lock in self._lock_actions
 
1156
                          if action == 'acquired']
 
1157
        released_locks = [lock for action, lock in self._lock_actions
 
1158
                          if action == 'released']
 
1159
        broken_locks = [lock for action, lock in self._lock_actions
 
1160
                        if action == 'broken']
 
1161
        # trivially, given the tests for lock acquistion and release, if we
 
1162
        # have as many in each list, it should be ok. Some lock tests also
 
1163
        # break some locks on purpose and should be taken into account by
 
1164
        # considering that breaking a lock is just a dirty way of releasing it.
 
1165
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
1166
            message = (
 
1167
                'Different number of acquired and '
 
1168
                'released or broken locks.\n'
 
1169
                'acquired=%s\n'
 
1170
                'released=%s\n'
 
1171
                'broken=%s\n' %
 
1172
                (acquired_locks, released_locks, broken_locks))
 
1173
            if not self._lock_check_thorough:
 
1174
                # Rather than fail, just warn
 
1175
                print "Broken test %s: %s" % (self, message)
 
1176
                return
 
1177
            self.fail(message)
 
1178
 
 
1179
    def _track_locks(self):
 
1180
        """Track lock activity during tests."""
 
1181
        self._lock_actions = []
 
1182
        if 'disable_lock_checks' in selftest_debug_flags:
 
1183
            self._lock_check_thorough = False
 
1184
        else:
 
1185
            self._lock_check_thorough = True
 
1186
 
 
1187
        self.addCleanup(self._check_locks)
 
1188
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
1189
                                                self._lock_acquired, None)
 
1190
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
1191
                                                self._lock_released, None)
 
1192
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
1193
                                                self._lock_broken, None)
 
1194
 
 
1195
    def _lock_acquired(self, result):
 
1196
        self._lock_actions.append(('acquired', result))
 
1197
 
 
1198
    def _lock_released(self, result):
 
1199
        self._lock_actions.append(('released', result))
 
1200
 
 
1201
    def _lock_broken(self, result):
 
1202
        self._lock_actions.append(('broken', result))
 
1203
 
 
1204
    def permit_dir(self, name):
 
1205
        """Permit a directory to be used by this test. See permit_url."""
 
1206
        name_transport = _mod_transport.get_transport_from_path(name)
 
1207
        self.permit_url(name)
 
1208
        self.permit_url(name_transport.base)
 
1209
 
 
1210
    def permit_url(self, url):
 
1211
        """Declare that url is an ok url to use in this test.
 
1212
        
 
1213
        Do this for memory transports, temporary test directory etc.
 
1214
        
 
1215
        Do not do this for the current working directory, /tmp, or any other
 
1216
        preexisting non isolated url.
 
1217
        """
 
1218
        if not url.endswith('/'):
 
1219
            url += '/'
 
1220
        self._bzr_selftest_roots.append(url)
 
1221
 
 
1222
    def permit_source_tree_branch_repo(self):
 
1223
        """Permit the source tree bzr is running from to be opened.
 
1224
 
 
1225
        Some code such as bzrlib.version attempts to read from the bzr branch
 
1226
        that bzr is executing from (if any). This method permits that directory
 
1227
        to be used in the test suite.
 
1228
        """
 
1229
        path = self.get_source_path()
 
1230
        self.record_directory_isolation()
 
1231
        try:
 
1232
            try:
 
1233
                workingtree.WorkingTree.open(path)
 
1234
            except (errors.NotBranchError, errors.NoWorkingTree):
 
1235
                raise TestSkipped('Needs a working tree of bzr sources')
 
1236
        finally:
 
1237
            self.enable_directory_isolation()
 
1238
 
 
1239
    def _preopen_isolate_transport(self, transport):
 
1240
        """Check that all transport openings are done in the test work area."""
 
1241
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
1242
            # Unwrap pathfiltered transports
 
1243
            transport = transport.server.backing_transport.clone(
 
1244
                transport._filter('.'))
 
1245
        url = transport.base
 
1246
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
1247
        # urls it is given by prepending readonly+. This is appropriate as the
 
1248
        # client shouldn't know that the server is readonly (or not readonly).
 
1249
        # We could register all servers twice, with readonly+ prepending, but
 
1250
        # that makes for a long list; this is about the same but easier to
 
1251
        # read.
 
1252
        if url.startswith('readonly+'):
 
1253
            url = url[len('readonly+'):]
 
1254
        self._preopen_isolate_url(url)
 
1255
 
 
1256
    def _preopen_isolate_url(self, url):
 
1257
        if not self._directory_isolation:
 
1258
            return
 
1259
        if self._directory_isolation == 'record':
 
1260
            self._bzr_selftest_roots.append(url)
 
1261
            return
 
1262
        # This prevents all transports, including e.g. sftp ones backed on disk
 
1263
        # from working unless they are explicitly granted permission. We then
 
1264
        # depend on the code that sets up test transports to check that they are
 
1265
        # appropriately isolated and enable their use by calling
 
1266
        # self.permit_transport()
 
1267
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
1268
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1269
                % (url, self._bzr_selftest_roots))
 
1270
 
 
1271
    def record_directory_isolation(self):
 
1272
        """Gather accessed directories to permit later access.
 
1273
        
 
1274
        This is used for tests that access the branch bzr is running from.
 
1275
        """
 
1276
        self._directory_isolation = "record"
 
1277
 
 
1278
    def start_server(self, transport_server, backing_server=None):
 
1279
        """Start transport_server for this test.
 
1280
 
 
1281
        This starts the server, registers a cleanup for it and permits the
 
1282
        server's urls to be used.
 
1283
        """
 
1284
        if backing_server is None:
 
1285
            transport_server.start_server()
 
1286
        else:
 
1287
            transport_server.start_server(backing_server)
 
1288
        self.addCleanup(transport_server.stop_server)
 
1289
        # Obtain a real transport because if the server supplies a password, it
 
1290
        # will be hidden from the base on the client side.
 
1291
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
 
1292
        # Some transport servers effectively chroot the backing transport;
 
1293
        # others like SFTPServer don't - users of the transport can walk up the
 
1294
        # transport to read the entire backing transport. This wouldn't matter
 
1295
        # except that the workdir tests are given - and that they expect the
 
1296
        # server's url to point at - is one directory under the safety net. So
 
1297
        # Branch operations into the transport will attempt to walk up one
 
1298
        # directory. Chrooting all servers would avoid this but also mean that
 
1299
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1300
        # getting the test framework to start the server with a backing server
 
1301
        # at the actual safety net directory would work too, but this then
 
1302
        # means that the self.get_url/self.get_transport methods would need
 
1303
        # to transform all their results. On balance its cleaner to handle it
 
1304
        # here, and permit a higher url when we have one of these transports.
 
1305
        if t.base.endswith('/work/'):
 
1306
            # we have safety net/test root/work
 
1307
            t = t.clone('../..')
 
1308
        elif isinstance(transport_server,
 
1309
                        test_server.SmartTCPServer_for_testing):
 
1310
            # The smart server adds a path similar to work, which is traversed
 
1311
            # up from by the client. But the server is chrooted - the actual
 
1312
            # backing transport is not escaped from, and VFS requests to the
 
1313
            # root will error (because they try to escape the chroot).
 
1314
            t2 = t.clone('..')
 
1315
            while t2.base != t.base:
 
1316
                t = t2
 
1317
                t2 = t.clone('..')
 
1318
        self.permit_url(t.base)
 
1319
 
 
1320
    def _track_transports(self):
 
1321
        """Install checks for transport usage."""
 
1322
        # TestCase has no safe place it can write to.
 
1323
        self._bzr_selftest_roots = []
 
1324
        # Currently the easiest way to be sure that nothing is going on is to
 
1325
        # hook into bzr dir opening. This leaves a small window of error for
 
1326
        # transport tests, but they are well known, and we can improve on this
 
1327
        # step.
 
1328
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
 
1329
            self._preopen_isolate_transport, "Check bzr directories are safe.")
 
1330
 
 
1331
    def _ndiff_strings(self, a, b):
 
1332
        """Return ndiff between two strings containing lines.
 
1333
 
 
1334
        A trailing newline is added if missing to make the strings
 
1335
        print properly."""
 
1336
        if b and b[-1] != '\n':
 
1337
            b += '\n'
 
1338
        if a and a[-1] != '\n':
 
1339
            a += '\n'
 
1340
        difflines = difflib.ndiff(a.splitlines(True),
 
1341
                                  b.splitlines(True),
 
1342
                                  linejunk=lambda x: False,
 
1343
                                  charjunk=lambda x: False)
 
1344
        return ''.join(difflines)
 
1345
 
 
1346
    def assertEqual(self, a, b, message=''):
 
1347
        try:
 
1348
            if a == b:
 
1349
                return
 
1350
        except UnicodeError, e:
 
1351
            # If we can't compare without getting a UnicodeError, then
 
1352
            # obviously they are different
 
1353
            trace.mutter('UnicodeError: %s', e)
 
1354
        if message:
 
1355
            message += '\n'
 
1356
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
1357
            % (message,
 
1358
               pprint.pformat(a), pprint.pformat(b)))
 
1359
 
 
1360
    assertEquals = assertEqual
 
1361
 
 
1362
    def assertEqualDiff(self, a, b, message=None):
 
1363
        """Assert two texts are equal, if not raise an exception.
 
1364
 
 
1365
        This is intended for use with multi-line strings where it can
 
1366
        be hard to find the differences by eye.
 
1367
        """
 
1368
        # TODO: perhaps override assertEquals to call this for strings?
 
1369
        if a == b:
 
1370
            return
 
1371
        if message is None:
 
1372
            message = "texts not equal:\n"
 
1373
        if a + '\n' == b:
 
1374
            message = 'first string is missing a final newline.\n'
 
1375
        if a == b + '\n':
 
1376
            message = 'second string is missing a final newline.\n'
 
1377
        raise AssertionError(message +
 
1378
                             self._ndiff_strings(a, b))
 
1379
 
 
1380
    def assertEqualMode(self, mode, mode_test):
 
1381
        self.assertEqual(mode, mode_test,
 
1382
                         'mode mismatch %o != %o' % (mode, mode_test))
 
1383
 
 
1384
    def assertEqualStat(self, expected, actual):
 
1385
        """assert that expected and actual are the same stat result.
 
1386
 
 
1387
        :param expected: A stat result.
 
1388
        :param actual: A stat result.
 
1389
        :raises AssertionError: If the expected and actual stat values differ
 
1390
            other than by atime.
 
1391
        """
 
1392
        self.assertEqual(expected.st_size, actual.st_size,
 
1393
                         'st_size did not match')
 
1394
        self.assertEqual(expected.st_mtime, actual.st_mtime,
 
1395
                         'st_mtime did not match')
 
1396
        self.assertEqual(expected.st_ctime, actual.st_ctime,
 
1397
                         'st_ctime did not match')
 
1398
        if sys.platform == 'win32':
 
1399
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
 
1400
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
 
1401
            # odd. We just force it to always be 0 to avoid any problems.
 
1402
            self.assertEqual(0, expected.st_dev)
 
1403
            self.assertEqual(0, actual.st_dev)
 
1404
            self.assertEqual(0, expected.st_ino)
 
1405
            self.assertEqual(0, actual.st_ino)
 
1406
        else:
 
1407
            self.assertEqual(expected.st_dev, actual.st_dev,
 
1408
                             'st_dev did not match')
 
1409
            self.assertEqual(expected.st_ino, actual.st_ino,
 
1410
                             'st_ino did not match')
 
1411
        self.assertEqual(expected.st_mode, actual.st_mode,
 
1412
                         'st_mode did not match')
 
1413
 
 
1414
    def assertLength(self, length, obj_with_len):
 
1415
        """Assert that obj_with_len is of length length."""
 
1416
        if len(obj_with_len) != length:
 
1417
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1418
                length, len(obj_with_len), obj_with_len))
 
1419
 
 
1420
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1421
        """Assert that `func(*args, **kwargs)` quietly logs a specific error.
 
1422
        """
 
1423
        captured = []
 
1424
        orig_log_exception_quietly = trace.log_exception_quietly
 
1425
        try:
 
1426
            def capture():
 
1427
                orig_log_exception_quietly()
 
1428
                captured.append(sys.exc_info()[1])
 
1429
            trace.log_exception_quietly = capture
 
1430
            func(*args, **kwargs)
 
1431
        finally:
 
1432
            trace.log_exception_quietly = orig_log_exception_quietly
 
1433
        self.assertLength(1, captured)
 
1434
        err = captured[0]
 
1435
        self.assertIsInstance(err, exception_class)
 
1436
        return err
 
1437
 
 
1438
    def assertPositive(self, val):
 
1439
        """Assert that val is greater than 0."""
 
1440
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1441
 
 
1442
    def assertNegative(self, val):
 
1443
        """Assert that val is less than 0."""
 
1444
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1445
 
 
1446
    def assertStartsWith(self, s, prefix):
 
1447
        if not s.startswith(prefix):
 
1448
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1449
 
 
1450
    def assertEndsWith(self, s, suffix):
 
1451
        """Asserts that s ends with suffix."""
 
1452
        if not s.endswith(suffix):
 
1453
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1454
 
 
1455
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1456
        """Assert that a contains something matching a regular expression."""
 
1457
        if not re.search(needle_re, haystack, flags):
 
1458
            if '\n' in haystack or len(haystack) > 60:
 
1459
                # a long string, format it in a more readable way
 
1460
                raise AssertionError(
 
1461
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1462
                        % (needle_re, haystack))
 
1463
            else:
 
1464
                raise AssertionError('pattern "%s" not found in "%s"'
 
1465
                        % (needle_re, haystack))
 
1466
 
 
1467
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1468
        """Assert that a does not match a regular expression"""
 
1469
        if re.search(needle_re, haystack, flags):
 
1470
            raise AssertionError('pattern "%s" found in "%s"'
 
1471
                    % (needle_re, haystack))
 
1472
 
 
1473
    def assertContainsString(self, haystack, needle):
 
1474
        if haystack.find(needle) == -1:
 
1475
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1476
 
 
1477
    def assertNotContainsString(self, haystack, needle):
 
1478
        if haystack.find(needle) != -1:
 
1479
            self.fail("string %r found in '''%s'''" % (needle, haystack))
 
1480
 
 
1481
    def assertSubset(self, sublist, superlist):
 
1482
        """Assert that every entry in sublist is present in superlist."""
 
1483
        missing = set(sublist) - set(superlist)
 
1484
        if len(missing) > 0:
 
1485
            raise AssertionError("value(s) %r not present in container %r" %
 
1486
                                 (missing, superlist))
 
1487
 
 
1488
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1489
        """Fail unless excClass is raised when the iterator from func is used.
 
1490
 
 
1491
        Many functions can return generators this makes sure
 
1492
        to wrap them in a list() call to make sure the whole generator
 
1493
        is run, and that the proper exception is raised.
 
1494
        """
 
1495
        try:
 
1496
            list(func(*args, **kwargs))
 
1497
        except excClass, e:
 
1498
            return e
 
1499
        else:
 
1500
            if getattr(excClass,'__name__', None) is not None:
 
1501
                excName = excClass.__name__
 
1502
            else:
 
1503
                excName = str(excClass)
 
1504
            raise self.failureException, "%s not raised" % excName
 
1505
 
 
1506
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1507
        """Assert that a callable raises a particular exception.
 
1508
 
 
1509
        :param excClass: As for the except statement, this may be either an
 
1510
            exception class, or a tuple of classes.
 
1511
        :param callableObj: A callable, will be passed ``*args`` and
 
1512
            ``**kwargs``.
 
1513
 
 
1514
        Returns the exception so that you can examine it.
 
1515
        """
 
1516
        try:
 
1517
            callableObj(*args, **kwargs)
 
1518
        except excClass, e:
 
1519
            return e
 
1520
        else:
 
1521
            if getattr(excClass,'__name__', None) is not None:
 
1522
                excName = excClass.__name__
 
1523
            else:
 
1524
                # probably a tuple
 
1525
                excName = str(excClass)
 
1526
            raise self.failureException, "%s not raised" % excName
 
1527
 
 
1528
    def assertIs(self, left, right, message=None):
 
1529
        if not (left is right):
 
1530
            if message is not None:
 
1531
                raise AssertionError(message)
 
1532
            else:
 
1533
                raise AssertionError("%r is not %r." % (left, right))
 
1534
 
 
1535
    def assertIsNot(self, left, right, message=None):
 
1536
        if (left is right):
 
1537
            if message is not None:
 
1538
                raise AssertionError(message)
 
1539
            else:
 
1540
                raise AssertionError("%r is %r." % (left, right))
 
1541
 
 
1542
    def assertTransportMode(self, transport, path, mode):
 
1543
        """Fail if a path does not have mode "mode".
 
1544
 
 
1545
        If modes are not supported on this transport, the assertion is ignored.
 
1546
        """
 
1547
        if not transport._can_roundtrip_unix_modebits():
 
1548
            return
 
1549
        path_stat = transport.stat(path)
 
1550
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1551
        self.assertEqual(mode, actual_mode,
 
1552
                         'mode of %r incorrect (%s != %s)'
 
1553
                         % (path, oct(mode), oct(actual_mode)))
 
1554
 
 
1555
    def assertIsSameRealPath(self, path1, path2):
 
1556
        """Fail if path1 and path2 points to different files"""
 
1557
        self.assertEqual(osutils.realpath(path1),
 
1558
                         osutils.realpath(path2),
 
1559
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1560
 
 
1561
    def assertIsInstance(self, obj, kls, msg=None):
 
1562
        """Fail if obj is not an instance of kls
 
1563
        
 
1564
        :param msg: Supplementary message to show if the assertion fails.
 
1565
        """
 
1566
        if not isinstance(obj, kls):
 
1567
            m = "%r is an instance of %s rather than %s" % (
 
1568
                obj, obj.__class__, kls)
 
1569
            if msg:
 
1570
                m += ": " + msg
 
1571
            self.fail(m)
 
1572
 
 
1573
    def assertFileEqual(self, content, path):
 
1574
        """Fail if path does not contain 'content'."""
 
1575
        self.assertPathExists(path)
 
1576
        f = file(path, 'rb')
 
1577
        try:
 
1578
            s = f.read()
 
1579
        finally:
 
1580
            f.close()
 
1581
        self.assertEqualDiff(content, s)
 
1582
 
 
1583
    def assertDocstring(self, expected_docstring, obj):
 
1584
        """Fail if obj does not have expected_docstring"""
 
1585
        if __doc__ is None:
 
1586
            # With -OO the docstring should be None instead
 
1587
            self.assertIs(obj.__doc__, None)
 
1588
        else:
 
1589
            self.assertEqual(expected_docstring, obj.__doc__)
 
1590
 
 
1591
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
 
1592
    def failUnlessExists(self, path):
 
1593
        return self.assertPathExists(path)
 
1594
 
 
1595
    def assertPathExists(self, path):
 
1596
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1597
        if not isinstance(path, basestring):
 
1598
            for p in path:
 
1599
                self.assertPathExists(p)
 
1600
        else:
 
1601
            self.assertTrue(osutils.lexists(path),
 
1602
                path + " does not exist")
 
1603
 
 
1604
    @symbol_versioning.deprecated_method(symbol_versioning.deprecated_in((2, 4)))
 
1605
    def failIfExists(self, path):
 
1606
        return self.assertPathDoesNotExist(path)
 
1607
 
 
1608
    def assertPathDoesNotExist(self, path):
 
1609
        """Fail if path or paths, which may be abs or relative, exist."""
 
1610
        if not isinstance(path, basestring):
 
1611
            for p in path:
 
1612
                self.assertPathDoesNotExist(p)
 
1613
        else:
 
1614
            self.assertFalse(osutils.lexists(path),
 
1615
                path + " exists")
 
1616
 
 
1617
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1618
        """A helper for callDeprecated and applyDeprecated.
 
1619
 
 
1620
        :param a_callable: A callable to call.
 
1621
        :param args: The positional arguments for the callable
 
1622
        :param kwargs: The keyword arguments for the callable
 
1623
        :return: A tuple (warnings, result). result is the result of calling
 
1624
            a_callable(``*args``, ``**kwargs``).
 
1625
        """
 
1626
        local_warnings = []
 
1627
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1628
            # we've hooked into a deprecation specific callpath,
 
1629
            # only deprecations should getting sent via it.
 
1630
            self.assertEqual(cls, DeprecationWarning)
 
1631
            local_warnings.append(msg)
 
1632
        original_warning_method = symbol_versioning.warn
 
1633
        symbol_versioning.set_warning_method(capture_warnings)
 
1634
        try:
 
1635
            result = a_callable(*args, **kwargs)
 
1636
        finally:
 
1637
            symbol_versioning.set_warning_method(original_warning_method)
 
1638
        return (local_warnings, result)
 
1639
 
 
1640
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1641
        """Call a deprecated callable without warning the user.
 
1642
 
 
1643
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1644
        not other callers that go direct to the warning module.
 
1645
 
 
1646
        To test that a deprecated method raises an error, do something like
 
1647
        this (remember that both assertRaises and applyDeprecated delays *args
 
1648
        and **kwargs passing)::
 
1649
 
 
1650
            self.assertRaises(errors.ReservedId,
 
1651
                self.applyDeprecated,
 
1652
                deprecated_in((1, 5, 0)),
 
1653
                br.append_revision,
 
1654
                'current:')
 
1655
 
 
1656
        :param deprecation_format: The deprecation format that the callable
 
1657
            should have been deprecated with. This is the same type as the
 
1658
            parameter to deprecated_method/deprecated_function. If the
 
1659
            callable is not deprecated with this format, an assertion error
 
1660
            will be raised.
 
1661
        :param a_callable: A callable to call. This may be a bound method or
 
1662
            a regular function. It will be called with ``*args`` and
 
1663
            ``**kwargs``.
 
1664
        :param args: The positional arguments for the callable
 
1665
        :param kwargs: The keyword arguments for the callable
 
1666
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1667
        """
 
1668
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1669
            *args, **kwargs)
 
1670
        expected_first_warning = symbol_versioning.deprecation_string(
 
1671
            a_callable, deprecation_format)
 
1672
        if len(call_warnings) == 0:
 
1673
            self.fail("No deprecation warning generated by call to %s" %
 
1674
                a_callable)
 
1675
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1676
        return result
 
1677
 
 
1678
    def callCatchWarnings(self, fn, *args, **kw):
 
1679
        """Call a callable that raises python warnings.
 
1680
 
 
1681
        The caller's responsible for examining the returned warnings.
 
1682
 
 
1683
        If the callable raises an exception, the exception is not
 
1684
        caught and propagates up to the caller.  In that case, the list
 
1685
        of warnings is not available.
 
1686
 
 
1687
        :returns: ([warning_object, ...], fn_result)
 
1688
        """
 
1689
        # XXX: This is not perfect, because it completely overrides the
 
1690
        # warnings filters, and some code may depend on suppressing particular
 
1691
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1692
        # though.  -- Andrew, 20071062
 
1693
        wlist = []
 
1694
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1695
            # despite the name, 'message' is normally(?) a Warning subclass
 
1696
            # instance
 
1697
            wlist.append(message)
 
1698
        saved_showwarning = warnings.showwarning
 
1699
        saved_filters = warnings.filters
 
1700
        try:
 
1701
            warnings.showwarning = _catcher
 
1702
            warnings.filters = []
 
1703
            result = fn(*args, **kw)
 
1704
        finally:
 
1705
            warnings.showwarning = saved_showwarning
 
1706
            warnings.filters = saved_filters
 
1707
        return wlist, result
 
1708
 
 
1709
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1710
        """Assert that a callable is deprecated in a particular way.
 
1711
 
 
1712
        This is a very precise test for unusual requirements. The
 
1713
        applyDeprecated helper function is probably more suited for most tests
 
1714
        as it allows you to simply specify the deprecation format being used
 
1715
        and will ensure that that is issued for the function being called.
 
1716
 
 
1717
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1718
        not other callers that go direct to the warning module.  To catch
 
1719
        general warnings, use callCatchWarnings.
 
1720
 
 
1721
        :param expected: a list of the deprecation warnings expected, in order
 
1722
        :param callable: The callable to call
 
1723
        :param args: The positional arguments for the callable
 
1724
        :param kwargs: The keyword arguments for the callable
 
1725
        """
 
1726
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1727
            *args, **kwargs)
 
1728
        self.assertEqual(expected, call_warnings)
 
1729
        return result
 
1730
 
 
1731
    def _startLogFile(self):
 
1732
        """Setup a in-memory target for bzr and testcase log messages"""
 
1733
        pseudo_log_file = StringIO()
 
1734
        def _get_log_contents_for_weird_testtools_api():
 
1735
            return [pseudo_log_file.getvalue().decode(
 
1736
                "utf-8", "replace").encode("utf-8")]
 
1737
        self.addDetail("log", content.Content(content.ContentType("text",
 
1738
            "plain", {"charset": "utf8"}),
 
1739
            _get_log_contents_for_weird_testtools_api))
 
1740
        self._log_file = pseudo_log_file
 
1741
        self._log_memento = trace.push_log_file(self._log_file)
 
1742
        self.addCleanup(self._finishLogFile)
 
1743
 
 
1744
    def _finishLogFile(self):
 
1745
        """Flush and dereference the in-memory log for this testcase"""
 
1746
        if trace._trace_file:
 
1747
            # flush the log file, to get all content
 
1748
            trace._trace_file.flush()
 
1749
        trace.pop_log_file(self._log_memento)
 
1750
        # The logging module now tracks references for cleanup so discard ours
 
1751
        del self._log_memento
 
1752
 
 
1753
    def thisFailsStrictLockCheck(self):
 
1754
        """It is known that this test would fail with -Dstrict_locks.
 
1755
 
 
1756
        By default, all tests are run with strict lock checking unless
 
1757
        -Edisable_lock_checks is supplied. However there are some tests which
 
1758
        we know fail strict locks at this point that have not been fixed.
 
1759
        They should call this function to disable the strict checking.
 
1760
 
 
1761
        This should be used sparingly, it is much better to fix the locking
 
1762
        issues rather than papering over the problem by calling this function.
 
1763
        """
 
1764
        debug.debug_flags.discard('strict_locks')
 
1765
 
 
1766
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1767
        """Overrides an object attribute restoring it after the test.
 
1768
 
 
1769
        :note: This should be used with discretion; you should think about
 
1770
        whether it's better to make the code testable without monkey-patching.
 
1771
 
 
1772
        :param obj: The object that will be mutated.
 
1773
 
 
1774
        :param attr_name: The attribute name we want to preserve/override in
 
1775
            the object.
 
1776
 
 
1777
        :param new: The optional value we want to set the attribute to.
 
1778
 
 
1779
        :returns: The actual attr value.
 
1780
        """
 
1781
        value = getattr(obj, attr_name)
 
1782
        # The actual value is captured by the call below
 
1783
        self.addCleanup(setattr, obj, attr_name, value)
 
1784
        if new is not _unitialized_attr:
 
1785
            setattr(obj, attr_name, new)
 
1786
        return value
 
1787
 
 
1788
    def overrideEnv(self, name, new):
 
1789
        """Set an environment variable, and reset it after the test.
 
1790
 
 
1791
        :param name: The environment variable name.
 
1792
 
 
1793
        :param new: The value to set the variable to. If None, the 
 
1794
            variable is deleted from the environment.
 
1795
 
 
1796
        :returns: The actual variable value.
 
1797
        """
 
1798
        value = osutils.set_or_unset_env(name, new)
 
1799
        self.addCleanup(osutils.set_or_unset_env, name, value)
 
1800
        return value
 
1801
 
 
1802
    def recordCalls(self, obj, attr_name):
 
1803
        """Monkeypatch in a wrapper that will record calls.
 
1804
 
 
1805
        The monkeypatch is automatically removed when the test concludes.
 
1806
 
 
1807
        :param obj: The namespace holding the reference to be replaced;
 
1808
            typically a module, class, or object.
 
1809
        :param attr_name: A string for the name of the attribute to 
 
1810
            patch.
 
1811
        :returns: A list that will be extended with one item every time the
 
1812
            function is called, with a tuple of (args, kwargs).
 
1813
        """
 
1814
        calls = []
 
1815
 
 
1816
        def decorator(*args, **kwargs):
 
1817
            calls.append((args, kwargs))
 
1818
            return orig(*args, **kwargs)
 
1819
        orig = self.overrideAttr(obj, attr_name, decorator)
 
1820
        return calls
 
1821
 
 
1822
    def _cleanEnvironment(self):
 
1823
        for name, value in isolated_environ.iteritems():
 
1824
            self.overrideEnv(name, value)
 
1825
 
 
1826
    def _restoreHooks(self):
 
1827
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1828
            setattr(klass, name, hooks)
 
1829
        self._preserved_hooks.clear()
 
1830
        bzrlib.hooks._lazy_hooks = self._preserved_lazy_hooks
 
1831
        self._preserved_lazy_hooks.clear()
 
1832
 
 
1833
    def knownFailure(self, reason):
 
1834
        """Declare that this test fails for a known reason
 
1835
 
 
1836
        Tests that are known to fail should generally be using expectedFailure
 
1837
        with an appropriate reverse assertion if a change could cause the test
 
1838
        to start passing. Conversely if the test has no immediate prospect of
 
1839
        succeeding then using skip is more suitable.
 
1840
 
 
1841
        When this method is called while an exception is being handled, that
 
1842
        traceback will be used, otherwise a new exception will be thrown to
 
1843
        provide one but won't be reported.
 
1844
        """
 
1845
        self._add_reason(reason)
 
1846
        try:
 
1847
            exc_info = sys.exc_info()
 
1848
            if exc_info != (None, None, None):
 
1849
                self._report_traceback(exc_info)
 
1850
            else:
 
1851
                try:
 
1852
                    raise self.failureException(reason)
 
1853
                except self.failureException:
 
1854
                    exc_info = sys.exc_info()
 
1855
            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
 
1856
            raise testtools.testcase._ExpectedFailure(exc_info)
 
1857
        finally:
 
1858
            del exc_info
 
1859
 
 
1860
    def _suppress_log(self):
 
1861
        """Remove the log info from details."""
 
1862
        self.discardDetail('log')
 
1863
 
 
1864
    def _do_skip(self, result, reason):
 
1865
        self._suppress_log()
 
1866
        addSkip = getattr(result, 'addSkip', None)
 
1867
        if not callable(addSkip):
 
1868
            result.addSuccess(result)
 
1869
        else:
 
1870
            addSkip(self, reason)
 
1871
 
 
1872
    @staticmethod
 
1873
    def _do_known_failure(self, result, e):
 
1874
        self._suppress_log()
 
1875
        err = sys.exc_info()
 
1876
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1877
        if addExpectedFailure is not None:
 
1878
            addExpectedFailure(self, err)
 
1879
        else:
 
1880
            result.addSuccess(self)
 
1881
 
 
1882
    @staticmethod
 
1883
    def _do_not_applicable(self, result, e):
 
1884
        if not e.args:
 
1885
            reason = 'No reason given'
 
1886
        else:
 
1887
            reason = e.args[0]
 
1888
        self._suppress_log ()
 
1889
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1890
        if addNotApplicable is not None:
 
1891
            result.addNotApplicable(self, reason)
 
1892
        else:
 
1893
            self._do_skip(result, reason)
 
1894
 
 
1895
    @staticmethod
 
1896
    def _report_skip(self, result, err):
 
1897
        """Override the default _report_skip.
 
1898
 
 
1899
        We want to strip the 'log' detail. If we waint until _do_skip, it has
 
1900
        already been formatted into the 'reason' string, and we can't pull it
 
1901
        out again.
 
1902
        """
 
1903
        self._suppress_log()
 
1904
        super(TestCase, self)._report_skip(self, result, err)
 
1905
 
 
1906
    @staticmethod
 
1907
    def _report_expected_failure(self, result, err):
 
1908
        """Strip the log.
 
1909
 
 
1910
        See _report_skip for motivation.
 
1911
        """
 
1912
        self._suppress_log()
 
1913
        super(TestCase, self)._report_expected_failure(self, result, err)
 
1914
 
 
1915
    @staticmethod
 
1916
    def _do_unsupported_or_skip(self, result, e):
 
1917
        reason = e.args[0]
 
1918
        self._suppress_log()
 
1919
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1920
        if addNotSupported is not None:
 
1921
            result.addNotSupported(self, reason)
 
1922
        else:
 
1923
            self._do_skip(result, reason)
 
1924
 
 
1925
    def time(self, callable, *args, **kwargs):
 
1926
        """Run callable and accrue the time it takes to the benchmark time.
 
1927
 
 
1928
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1929
        this will cause lsprofile statistics to be gathered and stored in
 
1930
        self._benchcalls.
 
1931
        """
 
1932
        if self._benchtime is None:
 
1933
            self.addDetail('benchtime', content.Content(content.ContentType(
 
1934
                "text", "plain"), lambda:[str(self._benchtime)]))
 
1935
            self._benchtime = 0
 
1936
        start = time.time()
 
1937
        try:
 
1938
            if not self._gather_lsprof_in_benchmarks:
 
1939
                return callable(*args, **kwargs)
 
1940
            else:
 
1941
                # record this benchmark
 
1942
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1943
                stats.sort()
 
1944
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1945
                return ret
 
1946
        finally:
 
1947
            self._benchtime += time.time() - start
 
1948
 
 
1949
    def log(self, *args):
 
1950
        trace.mutter(*args)
 
1951
 
 
1952
    def get_log(self):
 
1953
        """Get a unicode string containing the log from bzrlib.trace.
 
1954
 
 
1955
        Undecodable characters are replaced.
 
1956
        """
 
1957
        return u"".join(self.getDetails()['log'].iter_text())
 
1958
 
 
1959
    def requireFeature(self, feature):
 
1960
        """This test requires a specific feature is available.
 
1961
 
 
1962
        :raises UnavailableFeature: When feature is not available.
 
1963
        """
 
1964
        if not feature.available():
 
1965
            raise UnavailableFeature(feature)
 
1966
 
 
1967
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1968
            working_dir):
 
1969
        """Run bazaar command line, splitting up a string command line."""
 
1970
        if isinstance(args, basestring):
 
1971
            # shlex don't understand unicode strings,
 
1972
            # so args should be plain string (bialix 20070906)
 
1973
            args = list(shlex.split(str(args)))
 
1974
        return self._run_bzr_core(args, retcode=retcode,
 
1975
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1976
                )
 
1977
 
 
1978
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1979
            working_dir):
 
1980
        # Clear chk_map page cache, because the contents are likely to mask
 
1981
        # locking errors.
 
1982
        chk_map.clear_cache()
 
1983
        if encoding is None:
 
1984
            encoding = osutils.get_user_encoding()
 
1985
        stdout = StringIOWrapper()
 
1986
        stderr = StringIOWrapper()
 
1987
        stdout.encoding = encoding
 
1988
        stderr.encoding = encoding
 
1989
 
 
1990
        self.log('run bzr: %r', args)
 
1991
        # FIXME: don't call into logging here
 
1992
        handler = logging.StreamHandler(stderr)
 
1993
        handler.setLevel(logging.INFO)
 
1994
        logger = logging.getLogger('')
 
1995
        logger.addHandler(handler)
 
1996
        old_ui_factory = ui.ui_factory
 
1997
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1998
 
 
1999
        cwd = None
 
2000
        if working_dir is not None:
 
2001
            cwd = osutils.getcwd()
 
2002
            os.chdir(working_dir)
 
2003
 
 
2004
        try:
 
2005
            try:
 
2006
                result = self.apply_redirected(
 
2007
                    ui.ui_factory.stdin,
 
2008
                    stdout, stderr,
 
2009
                    _mod_commands.run_bzr_catch_user_errors,
 
2010
                    args)
 
2011
            except KeyboardInterrupt:
 
2012
                # Reraise KeyboardInterrupt with contents of redirected stdout
 
2013
                # and stderr as arguments, for tests which are interested in
 
2014
                # stdout and stderr and are expecting the exception.
 
2015
                out = stdout.getvalue()
 
2016
                err = stderr.getvalue()
 
2017
                if out:
 
2018
                    self.log('output:\n%r', out)
 
2019
                if err:
 
2020
                    self.log('errors:\n%r', err)
 
2021
                raise KeyboardInterrupt(out, err)
 
2022
        finally:
 
2023
            logger.removeHandler(handler)
 
2024
            ui.ui_factory = old_ui_factory
 
2025
            if cwd is not None:
 
2026
                os.chdir(cwd)
 
2027
 
 
2028
        out = stdout.getvalue()
 
2029
        err = stderr.getvalue()
 
2030
        if out:
 
2031
            self.log('output:\n%r', out)
 
2032
        if err:
 
2033
            self.log('errors:\n%r', err)
 
2034
        if retcode is not None:
 
2035
            self.assertEquals(retcode, result,
 
2036
                              message='Unexpected return code')
 
2037
        return result, out, err
 
2038
 
 
2039
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
2040
                working_dir=None, error_regexes=[], output_encoding=None):
 
2041
        """Invoke bzr, as if it were run from the command line.
 
2042
 
 
2043
        The argument list should not include the bzr program name - the
 
2044
        first argument is normally the bzr command.  Arguments may be
 
2045
        passed in three ways:
 
2046
 
 
2047
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
2048
        when the command contains whitespace or metacharacters, or
 
2049
        is built up at run time.
 
2050
 
 
2051
        2- A single string, eg "add a".  This is the most convenient
 
2052
        for hardcoded commands.
 
2053
 
 
2054
        This runs bzr through the interface that catches and reports
 
2055
        errors, and with logging set to something approximating the
 
2056
        default, so that error reporting can be checked.
 
2057
 
 
2058
        This should be the main method for tests that want to exercise the
 
2059
        overall behavior of the bzr application (rather than a unit test
 
2060
        or a functional test of the library.)
 
2061
 
 
2062
        This sends the stdout/stderr results into the test's log,
 
2063
        where it may be useful for debugging.  See also run_captured.
 
2064
 
 
2065
        :keyword stdin: A string to be used as stdin for the command.
 
2066
        :keyword retcode: The status code the command should return;
 
2067
            default 0.
 
2068
        :keyword working_dir: The directory to run the command in
 
2069
        :keyword error_regexes: A list of expected error messages.  If
 
2070
            specified they must be seen in the error output of the command.
 
2071
        """
 
2072
        retcode, out, err = self._run_bzr_autosplit(
 
2073
            args=args,
 
2074
            retcode=retcode,
 
2075
            encoding=encoding,
 
2076
            stdin=stdin,
 
2077
            working_dir=working_dir,
 
2078
            )
 
2079
        self.assertIsInstance(error_regexes, (list, tuple))
 
2080
        for regex in error_regexes:
 
2081
            self.assertContainsRe(err, regex)
 
2082
        return out, err
 
2083
 
 
2084
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
2085
        """Run bzr, and check that stderr contains the supplied regexes
 
2086
 
 
2087
        :param error_regexes: Sequence of regular expressions which
 
2088
            must each be found in the error output. The relative ordering
 
2089
            is not enforced.
 
2090
        :param args: command-line arguments for bzr
 
2091
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
2092
            This function changes the default value of retcode to be 3,
 
2093
            since in most cases this is run when you expect bzr to fail.
 
2094
 
 
2095
        :return: (out, err) The actual output of running the command (in case
 
2096
            you want to do more inspection)
 
2097
 
 
2098
        Examples of use::
 
2099
 
 
2100
            # Make sure that commit is failing because there is nothing to do
 
2101
            self.run_bzr_error(['no changes to commit'],
 
2102
                               ['commit', '-m', 'my commit comment'])
 
2103
            # Make sure --strict is handling an unknown file, rather than
 
2104
            # giving us the 'nothing to do' error
 
2105
            self.build_tree(['unknown'])
 
2106
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
2107
                               ['commit', --strict', '-m', 'my commit comment'])
 
2108
        """
 
2109
        kwargs.setdefault('retcode', 3)
 
2110
        kwargs['error_regexes'] = error_regexes
 
2111
        out, err = self.run_bzr(*args, **kwargs)
 
2112
        return out, err
 
2113
 
 
2114
    def run_bzr_subprocess(self, *args, **kwargs):
 
2115
        """Run bzr in a subprocess for testing.
 
2116
 
 
2117
        This starts a new Python interpreter and runs bzr in there.
 
2118
        This should only be used for tests that have a justifiable need for
 
2119
        this isolation: e.g. they are testing startup time, or signal
 
2120
        handling, or early startup code, etc.  Subprocess code can't be
 
2121
        profiled or debugged so easily.
 
2122
 
 
2123
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
2124
            None is supplied, the status code is not checked.
 
2125
        :keyword env_changes: A dictionary which lists changes to environment
 
2126
            variables. A value of None will unset the env variable.
 
2127
            The values must be strings. The change will only occur in the
 
2128
            child, so you don't need to fix the environment after running.
 
2129
        :keyword universal_newlines: Convert CRLF => LF
 
2130
        :keyword allow_plugins: By default the subprocess is run with
 
2131
            --no-plugins to ensure test reproducibility. Also, it is possible
 
2132
            for system-wide plugins to create unexpected output on stderr,
 
2133
            which can cause unnecessary test failures.
 
2134
        """
 
2135
        env_changes = kwargs.get('env_changes', {})
 
2136
        working_dir = kwargs.get('working_dir', None)
 
2137
        allow_plugins = kwargs.get('allow_plugins', False)
 
2138
        if len(args) == 1:
 
2139
            if isinstance(args[0], list):
 
2140
                args = args[0]
 
2141
            elif isinstance(args[0], basestring):
 
2142
                args = list(shlex.split(args[0]))
 
2143
        else:
 
2144
            raise ValueError("passing varargs to run_bzr_subprocess")
 
2145
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
2146
                                            working_dir=working_dir,
 
2147
                                            allow_plugins=allow_plugins)
 
2148
        # We distinguish between retcode=None and retcode not passed.
 
2149
        supplied_retcode = kwargs.get('retcode', 0)
 
2150
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
2151
            universal_newlines=kwargs.get('universal_newlines', False),
 
2152
            process_args=args)
 
2153
 
 
2154
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
2155
                             skip_if_plan_to_signal=False,
 
2156
                             working_dir=None,
 
2157
                             allow_plugins=False, stderr=subprocess.PIPE):
 
2158
        """Start bzr in a subprocess for testing.
 
2159
 
 
2160
        This starts a new Python interpreter and runs bzr in there.
 
2161
        This should only be used for tests that have a justifiable need for
 
2162
        this isolation: e.g. they are testing startup time, or signal
 
2163
        handling, or early startup code, etc.  Subprocess code can't be
 
2164
        profiled or debugged so easily.
 
2165
 
 
2166
        :param process_args: a list of arguments to pass to the bzr executable,
 
2167
            for example ``['--version']``.
 
2168
        :param env_changes: A dictionary which lists changes to environment
 
2169
            variables. A value of None will unset the env variable.
 
2170
            The values must be strings. The change will only occur in the
 
2171
            child, so you don't need to fix the environment after running.
 
2172
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
 
2173
            doesn't support signalling subprocesses.
 
2174
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
2175
        :param stderr: file to use for the subprocess's stderr.  Valid values
 
2176
            are those valid for the stderr argument of `subprocess.Popen`.
 
2177
            Default value is ``subprocess.PIPE``.
 
2178
 
 
2179
        :returns: Popen object for the started process.
 
2180
        """
 
2181
        if skip_if_plan_to_signal:
 
2182
            if os.name != "posix":
 
2183
                raise TestSkipped("Sending signals not supported")
 
2184
 
 
2185
        if env_changes is None:
 
2186
            env_changes = {}
 
2187
        old_env = {}
 
2188
 
 
2189
        def cleanup_environment():
 
2190
            for env_var, value in env_changes.iteritems():
 
2191
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
2192
 
 
2193
        def restore_environment():
 
2194
            for env_var, value in old_env.iteritems():
 
2195
                osutils.set_or_unset_env(env_var, value)
 
2196
 
 
2197
        bzr_path = self.get_bzr_path()
 
2198
 
 
2199
        cwd = None
 
2200
        if working_dir is not None:
 
2201
            cwd = osutils.getcwd()
 
2202
            os.chdir(working_dir)
 
2203
 
 
2204
        try:
 
2205
            # win32 subprocess doesn't support preexec_fn
 
2206
            # so we will avoid using it on all platforms, just to
 
2207
            # make sure the code path is used, and we don't break on win32
 
2208
            cleanup_environment()
 
2209
            # Include the subprocess's log file in the test details, in case
 
2210
            # the test fails due to an error in the subprocess.
 
2211
            self._add_subprocess_log(trace._get_bzr_log_filename())
 
2212
            command = [sys.executable]
 
2213
            # frozen executables don't need the path to bzr
 
2214
            if getattr(sys, "frozen", None) is None:
 
2215
                command.append(bzr_path)
 
2216
            if not allow_plugins:
 
2217
                command.append('--no-plugins')
 
2218
            command.extend(process_args)
 
2219
            process = self._popen(command, stdin=subprocess.PIPE,
 
2220
                                  stdout=subprocess.PIPE,
 
2221
                                  stderr=stderr)
 
2222
        finally:
 
2223
            restore_environment()
 
2224
            if cwd is not None:
 
2225
                os.chdir(cwd)
 
2226
 
 
2227
        return process
 
2228
 
 
2229
    def _add_subprocess_log(self, log_file_path):
 
2230
        if len(self._log_files) == 0:
 
2231
            # Register an addCleanup func.  We do this on the first call to
 
2232
            # _add_subprocess_log rather than in TestCase.setUp so that this
 
2233
            # addCleanup is registered after any cleanups for tempdirs that
 
2234
            # subclasses might create, which will probably remove the log file
 
2235
            # we want to read.
 
2236
            self.addCleanup(self._subprocess_log_cleanup)
 
2237
        # self._log_files is a set, so if a log file is reused we won't grab it
 
2238
        # twice.
 
2239
        self._log_files.add(log_file_path)
 
2240
 
 
2241
    def _subprocess_log_cleanup(self):
 
2242
        for count, log_file_path in enumerate(self._log_files):
 
2243
            # We use buffer_now=True to avoid holding the file open beyond
 
2244
            # the life of this function, which might interfere with e.g.
 
2245
            # cleaning tempdirs on Windows.
 
2246
            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
 
2247
            #detail_content = content.content_from_file(
 
2248
            #    log_file_path, buffer_now=True)
 
2249
            with open(log_file_path, 'rb') as log_file:
 
2250
                log_file_bytes = log_file.read()
 
2251
            detail_content = content.Content(content.ContentType("text",
 
2252
                "plain", {"charset": "utf8"}), lambda: [log_file_bytes])
 
2253
            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
 
2254
                detail_content)
 
2255
 
 
2256
    def _popen(self, *args, **kwargs):
 
2257
        """Place a call to Popen.
 
2258
 
 
2259
        Allows tests to override this method to intercept the calls made to
 
2260
        Popen for introspection.
 
2261
        """
 
2262
        return subprocess.Popen(*args, **kwargs)
 
2263
 
 
2264
    def get_source_path(self):
 
2265
        """Return the path of the directory containing bzrlib."""
 
2266
        return os.path.dirname(os.path.dirname(bzrlib.__file__))
 
2267
 
 
2268
    def get_bzr_path(self):
 
2269
        """Return the path of the 'bzr' executable for this test suite."""
 
2270
        bzr_path = os.path.join(self.get_source_path(), "bzr")
 
2271
        if not os.path.isfile(bzr_path):
 
2272
            # We are probably installed. Assume sys.argv is the right file
 
2273
            bzr_path = sys.argv[0]
 
2274
        return bzr_path
 
2275
 
 
2276
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
2277
                              universal_newlines=False, process_args=None):
 
2278
        """Finish the execution of process.
 
2279
 
 
2280
        :param process: the Popen object returned from start_bzr_subprocess.
 
2281
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
2282
            None is supplied, the status code is not checked.
 
2283
        :param send_signal: an optional signal to send to the process.
 
2284
        :param universal_newlines: Convert CRLF => LF
 
2285
        :returns: (stdout, stderr)
 
2286
        """
 
2287
        if send_signal is not None:
 
2288
            os.kill(process.pid, send_signal)
 
2289
        out, err = process.communicate()
 
2290
 
 
2291
        if universal_newlines:
 
2292
            out = out.replace('\r\n', '\n')
 
2293
            err = err.replace('\r\n', '\n')
 
2294
 
 
2295
        if retcode is not None and retcode != process.returncode:
 
2296
            if process_args is None:
 
2297
                process_args = "(unknown args)"
 
2298
            trace.mutter('Output of bzr %s:\n%s', process_args, out)
 
2299
            trace.mutter('Error for bzr %s:\n%s', process_args, err)
 
2300
            self.fail('Command bzr %s failed with retcode %s != %s'
 
2301
                      % (process_args, retcode, process.returncode))
 
2302
        return [out, err]
 
2303
 
 
2304
    def check_tree_shape(self, tree, shape):
 
2305
        """Compare a tree to a list of expected names.
 
2306
 
 
2307
        Fail if they are not precisely equal.
 
2308
        """
 
2309
        extras = []
 
2310
        shape = list(shape)             # copy
 
2311
        for path, ie in tree.iter_entries_by_dir():
 
2312
            name = path.replace('\\', '/')
 
2313
            if ie.kind == 'directory':
 
2314
                name = name + '/'
 
2315
            if name == "/":
 
2316
                pass # ignore root entry
 
2317
            elif name in shape:
 
2318
                shape.remove(name)
 
2319
            else:
 
2320
                extras.append(name)
 
2321
        if shape:
 
2322
            self.fail("expected paths not found in inventory: %r" % shape)
 
2323
        if extras:
 
2324
            self.fail("unexpected paths found in inventory: %r" % extras)
 
2325
 
 
2326
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
2327
                         a_callable=None, *args, **kwargs):
 
2328
        """Call callable with redirected std io pipes.
 
2329
 
 
2330
        Returns the return code."""
 
2331
        if not callable(a_callable):
 
2332
            raise ValueError("a_callable must be callable.")
 
2333
        if stdin is None:
 
2334
            stdin = StringIO("")
 
2335
        if stdout is None:
 
2336
            if getattr(self, "_log_file", None) is not None:
 
2337
                stdout = self._log_file
 
2338
            else:
 
2339
                stdout = StringIO()
 
2340
        if stderr is None:
 
2341
            if getattr(self, "_log_file", None is not None):
 
2342
                stderr = self._log_file
 
2343
            else:
 
2344
                stderr = StringIO()
 
2345
        real_stdin = sys.stdin
 
2346
        real_stdout = sys.stdout
 
2347
        real_stderr = sys.stderr
 
2348
        try:
 
2349
            sys.stdout = stdout
 
2350
            sys.stderr = stderr
 
2351
            sys.stdin = stdin
 
2352
            return a_callable(*args, **kwargs)
 
2353
        finally:
 
2354
            sys.stdout = real_stdout
 
2355
            sys.stderr = real_stderr
 
2356
            sys.stdin = real_stdin
 
2357
 
 
2358
    def reduceLockdirTimeout(self):
 
2359
        """Reduce the default lock timeout for the duration of the test, so that
 
2360
        if LockContention occurs during a test, it does so quickly.
 
2361
 
 
2362
        Tests that expect to provoke LockContention errors should call this.
 
2363
        """
 
2364
        self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
 
2365
 
 
2366
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
2367
        """Return a StringIOWrapper instance, that will encode Unicode
 
2368
        input to UTF-8.
 
2369
        """
 
2370
        if encoding_type is None:
 
2371
            encoding_type = 'strict'
 
2372
        sio = StringIO()
 
2373
        output_encoding = 'utf-8'
 
2374
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
2375
        sio.encoding = output_encoding
 
2376
        return sio
 
2377
 
 
2378
    def disable_verb(self, verb):
 
2379
        """Disable a smart server verb for one test."""
 
2380
        from bzrlib.smart import request
 
2381
        request_handlers = request.request_handlers
 
2382
        orig_method = request_handlers.get(verb)
 
2383
        orig_info = request_handlers.get_info(verb)
 
2384
        request_handlers.remove(verb)
 
2385
        self.addCleanup(request_handlers.register, verb, orig_method,
 
2386
            info=orig_info)
 
2387
 
 
2388
 
 
2389
class CapturedCall(object):
 
2390
    """A helper for capturing smart server calls for easy debug analysis."""
 
2391
 
 
2392
    def __init__(self, params, prefix_length):
 
2393
        """Capture the call with params and skip prefix_length stack frames."""
 
2394
        self.call = params
 
2395
        import traceback
 
2396
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2397
        # client frames. Beyond this we could get more clever, but this is good
 
2398
        # enough for now.
 
2399
        stack = traceback.extract_stack()[prefix_length:-5]
 
2400
        self.stack = ''.join(traceback.format_list(stack))
 
2401
 
 
2402
    def __str__(self):
 
2403
        return self.call.method
 
2404
 
 
2405
    def __repr__(self):
 
2406
        return self.call.method
 
2407
 
 
2408
    def stack(self):
 
2409
        return self.stack
 
2410
 
 
2411
 
 
2412
class TestCaseWithMemoryTransport(TestCase):
 
2413
    """Common test class for tests that do not need disk resources.
 
2414
 
 
2415
    Tests that need disk resources should derive from TestCaseInTempDir
 
2416
    orTestCaseWithTransport.
 
2417
 
 
2418
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2419
 
 
2420
    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
 
2421
    a directory which does not exist. This serves to help ensure test isolation
 
2422
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
 
2423
    must exist. However, TestCaseWithMemoryTransport does not offer local file
 
2424
    defaults for the transport in tests, nor does it obey the command line
 
2425
    override, so tests that accidentally write to the common directory should
 
2426
    be rare.
 
2427
 
 
2428
    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
 
2429
        ``.bzr`` directory that stops us ascending higher into the filesystem.
 
2430
    """
 
2431
 
 
2432
    TEST_ROOT = None
 
2433
    _TEST_NAME = 'test'
 
2434
 
 
2435
    def __init__(self, methodName='runTest'):
 
2436
        # allow test parameterization after test construction and before test
 
2437
        # execution. Variables that the parameterizer sets need to be
 
2438
        # ones that are not set by setUp, or setUp will trash them.
 
2439
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2440
        self.vfs_transport_factory = default_transport
 
2441
        self.transport_server = None
 
2442
        self.transport_readonly_server = None
 
2443
        self.__vfs_server = None
 
2444
 
 
2445
    def get_transport(self, relpath=None):
 
2446
        """Return a writeable transport.
 
2447
 
 
2448
        This transport is for the test scratch space relative to
 
2449
        "self._test_root"
 
2450
 
 
2451
        :param relpath: a path relative to the base url.
 
2452
        """
 
2453
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
 
2454
        self.assertFalse(t.is_readonly())
 
2455
        return t
 
2456
 
 
2457
    def get_readonly_transport(self, relpath=None):
 
2458
        """Return a readonly transport for the test scratch space
 
2459
 
 
2460
        This can be used to test that operations which should only need
 
2461
        readonly access in fact do not try to write.
 
2462
 
 
2463
        :param relpath: a path relative to the base url.
 
2464
        """
 
2465
        t = _mod_transport.get_transport_from_url(
 
2466
            self.get_readonly_url(relpath))
 
2467
        self.assertTrue(t.is_readonly())
 
2468
        return t
 
2469
 
 
2470
    def create_transport_readonly_server(self):
 
2471
        """Create a transport server from class defined at init.
 
2472
 
 
2473
        This is mostly a hook for daughter classes.
 
2474
        """
 
2475
        return self.transport_readonly_server()
 
2476
 
 
2477
    def get_readonly_server(self):
 
2478
        """Get the server instance for the readonly transport
 
2479
 
 
2480
        This is useful for some tests with specific servers to do diagnostics.
 
2481
        """
 
2482
        if self.__readonly_server is None:
 
2483
            if self.transport_readonly_server is None:
 
2484
                # readonly decorator requested
 
2485
                self.__readonly_server = test_server.ReadonlyServer()
 
2486
            else:
 
2487
                # explicit readonly transport.
 
2488
                self.__readonly_server = self.create_transport_readonly_server()
 
2489
            self.start_server(self.__readonly_server,
 
2490
                self.get_vfs_only_server())
 
2491
        return self.__readonly_server
 
2492
 
 
2493
    def get_readonly_url(self, relpath=None):
 
2494
        """Get a URL for the readonly transport.
 
2495
 
 
2496
        This will either be backed by '.' or a decorator to the transport
 
2497
        used by self.get_url()
 
2498
        relpath provides for clients to get a path relative to the base url.
 
2499
        These should only be downwards relative, not upwards.
 
2500
        """
 
2501
        base = self.get_readonly_server().get_url()
 
2502
        return self._adjust_url(base, relpath)
 
2503
 
 
2504
    def get_vfs_only_server(self):
 
2505
        """Get the vfs only read/write server instance.
 
2506
 
 
2507
        This is useful for some tests with specific servers that need
 
2508
        diagnostics.
 
2509
 
 
2510
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2511
        is no means to override it.
 
2512
        """
 
2513
        if self.__vfs_server is None:
 
2514
            self.__vfs_server = memory.MemoryServer()
 
2515
            self.start_server(self.__vfs_server)
 
2516
        return self.__vfs_server
 
2517
 
 
2518
    def get_server(self):
 
2519
        """Get the read/write server instance.
 
2520
 
 
2521
        This is useful for some tests with specific servers that need
 
2522
        diagnostics.
 
2523
 
 
2524
        This is built from the self.transport_server factory. If that is None,
 
2525
        then the self.get_vfs_server is returned.
 
2526
        """
 
2527
        if self.__server is None:
 
2528
            if (self.transport_server is None or self.transport_server is
 
2529
                self.vfs_transport_factory):
 
2530
                self.__server = self.get_vfs_only_server()
 
2531
            else:
 
2532
                # bring up a decorated means of access to the vfs only server.
 
2533
                self.__server = self.transport_server()
 
2534
                self.start_server(self.__server, self.get_vfs_only_server())
 
2535
        return self.__server
 
2536
 
 
2537
    def _adjust_url(self, base, relpath):
 
2538
        """Get a URL (or maybe a path) for the readwrite transport.
 
2539
 
 
2540
        This will either be backed by '.' or to an equivalent non-file based
 
2541
        facility.
 
2542
        relpath provides for clients to get a path relative to the base url.
 
2543
        These should only be downwards relative, not upwards.
 
2544
        """
 
2545
        if relpath is not None and relpath != '.':
 
2546
            if not base.endswith('/'):
 
2547
                base = base + '/'
 
2548
            # XXX: Really base should be a url; we did after all call
 
2549
            # get_url()!  But sometimes it's just a path (from
 
2550
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2551
            # to a non-escaped local path.
 
2552
            if base.startswith('./') or base.startswith('/'):
 
2553
                base += relpath
 
2554
            else:
 
2555
                base += urlutils.escape(relpath)
 
2556
        return base
 
2557
 
 
2558
    def get_url(self, relpath=None):
 
2559
        """Get a URL (or maybe a path) for the readwrite transport.
 
2560
 
 
2561
        This will either be backed by '.' or to an equivalent non-file based
 
2562
        facility.
 
2563
        relpath provides for clients to get a path relative to the base url.
 
2564
        These should only be downwards relative, not upwards.
 
2565
        """
 
2566
        base = self.get_server().get_url()
 
2567
        return self._adjust_url(base, relpath)
 
2568
 
 
2569
    def get_vfs_only_url(self, relpath=None):
 
2570
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2571
 
 
2572
        This will never be a smart protocol.  It always has all the
 
2573
        capabilities of the local filesystem, but it might actually be a
 
2574
        MemoryTransport or some other similar virtual filesystem.
 
2575
 
 
2576
        This is the backing transport (if any) of the server returned by
 
2577
        get_url and get_readonly_url.
 
2578
 
 
2579
        :param relpath: provides for clients to get a path relative to the base
 
2580
            url.  These should only be downwards relative, not upwards.
 
2581
        :return: A URL
 
2582
        """
 
2583
        base = self.get_vfs_only_server().get_url()
 
2584
        return self._adjust_url(base, relpath)
 
2585
 
 
2586
    def _create_safety_net(self):
 
2587
        """Make a fake bzr directory.
 
2588
 
 
2589
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2590
        real branch.
 
2591
        """
 
2592
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2593
        # Make sure we get a readable and accessible home for .bzr.log
 
2594
        # and/or config files, and not fallback to weird defaults (see
 
2595
        # http://pad.lv/825027).
 
2596
        self.assertIs(None, os.environ.get('BZR_HOME', None))
 
2597
        os.environ['BZR_HOME'] = root
 
2598
        wt = bzrdir.BzrDir.create_standalone_workingtree(root)
 
2599
        del os.environ['BZR_HOME']
 
2600
        # Hack for speed: remember the raw bytes of the dirstate file so that
 
2601
        # we don't need to re-open the wt to check it hasn't changed.
 
2602
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
 
2603
            wt.control_transport.get_bytes('dirstate'))
 
2604
 
 
2605
    def _check_safety_net(self):
 
2606
        """Check that the safety .bzr directory have not been touched.
 
2607
 
 
2608
        _make_test_root have created a .bzr directory to prevent tests from
 
2609
        propagating. This method ensures than a test did not leaked.
 
2610
        """
 
2611
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2612
        t = _mod_transport.get_transport_from_path(root)
 
2613
        self.permit_url(t.base)
 
2614
        if (t.get_bytes('.bzr/checkout/dirstate') != 
 
2615
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
 
2616
            # The current test have modified the /bzr directory, we need to
 
2617
            # recreate a new one or all the followng tests will fail.
 
2618
            # If you need to inspect its content uncomment the following line
 
2619
            # import pdb; pdb.set_trace()
 
2620
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
 
2621
            self._create_safety_net()
 
2622
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2623
 
 
2624
    def _make_test_root(self):
 
2625
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2626
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2627
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2628
                                                    suffix='.tmp'))
 
2629
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2630
 
 
2631
            self._create_safety_net()
 
2632
 
 
2633
            # The same directory is used by all tests, and we're not
 
2634
            # specifically told when all tests are finished.  This will do.
 
2635
            atexit.register(_rmtree_temp_dir, root)
 
2636
 
 
2637
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2638
        self.addCleanup(self._check_safety_net)
 
2639
 
 
2640
    def makeAndChdirToTestDir(self):
 
2641
        """Create a temporary directories for this one test.
 
2642
 
 
2643
        This must set self.test_home_dir and self.test_dir and chdir to
 
2644
        self.test_dir.
 
2645
 
 
2646
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2647
        """
 
2648
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2649
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2650
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2651
        self.permit_dir(self.test_dir)
 
2652
 
 
2653
    def make_branch(self, relpath, format=None):
 
2654
        """Create a branch on the transport at relpath."""
 
2655
        repo = self.make_repository(relpath, format=format)
 
2656
        return repo.bzrdir.create_branch(append_revisions_only=False)
 
2657
 
 
2658
    def get_default_format(self):
 
2659
        return 'default'
 
2660
 
 
2661
    def resolve_format(self, format):
 
2662
        """Resolve an object to a ControlDir format object.
 
2663
 
 
2664
        The initial format object can either already be
 
2665
        a ControlDirFormat, None (for the default format),
 
2666
        or a string with the name of the control dir format.
 
2667
 
 
2668
        :param format: Object to resolve
 
2669
        :return A ControlDirFormat instance
 
2670
        """
 
2671
        if format is None:
 
2672
            format = self.get_default_format()
 
2673
        if isinstance(format, basestring):
 
2674
            format = bzrdir.format_registry.make_bzrdir(format)
 
2675
        return format
 
2676
 
 
2677
    def make_bzrdir(self, relpath, format=None):
 
2678
        try:
 
2679
            # might be a relative or absolute path
 
2680
            maybe_a_url = self.get_url(relpath)
 
2681
            segments = maybe_a_url.rsplit('/', 1)
 
2682
            t = _mod_transport.get_transport(maybe_a_url)
 
2683
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2684
                t.ensure_base()
 
2685
            format = self.resolve_format(format)
 
2686
            return format.initialize_on_transport(t)
 
2687
        except errors.UninitializableFormat:
 
2688
            raise TestSkipped("Format %s is not initializable." % format)
 
2689
 
 
2690
    def make_repository(self, relpath, shared=None, format=None):
 
2691
        """Create a repository on our default transport at relpath.
 
2692
 
 
2693
        Note that relpath must be a relative path, not a full url.
 
2694
        """
 
2695
        # FIXME: If you create a remoterepository this returns the underlying
 
2696
        # real format, which is incorrect.  Actually we should make sure that
 
2697
        # RemoteBzrDir returns a RemoteRepository.
 
2698
        # maybe  mbp 20070410
 
2699
        made_control = self.make_bzrdir(relpath, format=format)
 
2700
        return made_control.create_repository(shared=shared)
 
2701
 
 
2702
    def make_smart_server(self, path, backing_server=None):
 
2703
        if backing_server is None:
 
2704
            backing_server = self.get_server()
 
2705
        smart_server = test_server.SmartTCPServer_for_testing()
 
2706
        self.start_server(smart_server, backing_server)
 
2707
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
 
2708
                                                   ).clone(path)
 
2709
        return remote_transport
 
2710
 
 
2711
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2712
        """Create a branch on the default transport and a MemoryTree for it."""
 
2713
        b = self.make_branch(relpath, format=format)
 
2714
        return memorytree.MemoryTree.create_on_branch(b)
 
2715
 
 
2716
    def make_branch_builder(self, relpath, format=None):
 
2717
        branch = self.make_branch(relpath, format=format)
 
2718
        return branchbuilder.BranchBuilder(branch=branch)
 
2719
 
 
2720
    def overrideEnvironmentForTesting(self):
 
2721
        test_home_dir = self.test_home_dir
 
2722
        if isinstance(test_home_dir, unicode):
 
2723
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
 
2724
        self.overrideEnv('HOME', test_home_dir)
 
2725
        self.overrideEnv('BZR_HOME', test_home_dir)
 
2726
 
 
2727
    def setUp(self):
 
2728
        super(TestCaseWithMemoryTransport, self).setUp()
 
2729
        # Ensure that ConnectedTransport doesn't leak sockets
 
2730
        def get_transport_from_url_with_cleanup(*args, **kwargs):
 
2731
            t = orig_get_transport_from_url(*args, **kwargs)
 
2732
            if isinstance(t, _mod_transport.ConnectedTransport):
 
2733
                self.addCleanup(t.disconnect)
 
2734
            return t
 
2735
 
 
2736
        orig_get_transport_from_url = self.overrideAttr(
 
2737
            _mod_transport, 'get_transport_from_url',
 
2738
            get_transport_from_url_with_cleanup)
 
2739
        self._make_test_root()
 
2740
        self.addCleanup(os.chdir, os.getcwdu())
 
2741
        self.makeAndChdirToTestDir()
 
2742
        self.overrideEnvironmentForTesting()
 
2743
        self.__readonly_server = None
 
2744
        self.__server = None
 
2745
        self.reduceLockdirTimeout()
 
2746
 
 
2747
    def setup_smart_server_with_call_log(self):
 
2748
        """Sets up a smart server as the transport server with a call log."""
 
2749
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2750
        self.hpss_calls = []
 
2751
        import traceback
 
2752
        # Skip the current stack down to the caller of
 
2753
        # setup_smart_server_with_call_log
 
2754
        prefix_length = len(traceback.extract_stack()) - 2
 
2755
        def capture_hpss_call(params):
 
2756
            self.hpss_calls.append(
 
2757
                CapturedCall(params, prefix_length))
 
2758
        client._SmartClient.hooks.install_named_hook(
 
2759
            'call', capture_hpss_call, None)
 
2760
 
 
2761
    def reset_smart_call_log(self):
 
2762
        self.hpss_calls = []
 
2763
 
 
2764
 
 
2765
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2766
    """Derived class that runs a test within a temporary directory.
 
2767
 
 
2768
    This is useful for tests that need to create a branch, etc.
 
2769
 
 
2770
    The directory is created in a slightly complex way: for each
 
2771
    Python invocation, a new temporary top-level directory is created.
 
2772
    All test cases create their own directory within that.  If the
 
2773
    tests complete successfully, the directory is removed.
 
2774
 
 
2775
    :ivar test_base_dir: The path of the top-level directory for this
 
2776
    test, which contains a home directory and a work directory.
 
2777
 
 
2778
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2779
    which is used as $HOME for this test.
 
2780
 
 
2781
    :ivar test_dir: A directory under test_base_dir used as the current
 
2782
    directory when the test proper is run.
 
2783
    """
 
2784
 
 
2785
    OVERRIDE_PYTHON = 'python'
 
2786
 
 
2787
    def setUp(self):
 
2788
        super(TestCaseInTempDir, self).setUp()
 
2789
        # Remove the protection set in isolated_environ, we have a proper
 
2790
        # access to disk resources now.
 
2791
        self.overrideEnv('BZR_LOG', None)
 
2792
 
 
2793
    def check_file_contents(self, filename, expect):
 
2794
        self.log("check contents of file %s" % filename)
 
2795
        f = file(filename)
 
2796
        try:
 
2797
            contents = f.read()
 
2798
        finally:
 
2799
            f.close()
 
2800
        if contents != expect:
 
2801
            self.log("expected: %r" % expect)
 
2802
            self.log("actually: %r" % contents)
 
2803
            self.fail("contents of %s not as expected" % filename)
 
2804
 
 
2805
    def _getTestDirPrefix(self):
 
2806
        # create a directory within the top level test directory
 
2807
        if sys.platform in ('win32', 'cygwin'):
 
2808
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2809
            # windows is likely to have path-length limits so use a short name
 
2810
            name_prefix = name_prefix[-30:]
 
2811
        else:
 
2812
            name_prefix = re.sub('[/]', '_', self.id())
 
2813
        return name_prefix
 
2814
 
 
2815
    def makeAndChdirToTestDir(self):
 
2816
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2817
 
 
2818
        For TestCaseInTempDir we create a temporary directory based on the test
 
2819
        name and then create two subdirs - test and home under it.
 
2820
        """
 
2821
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2822
            self._getTestDirPrefix())
 
2823
        name = name_prefix
 
2824
        for i in range(100):
 
2825
            if os.path.exists(name):
 
2826
                name = name_prefix + '_' + str(i)
 
2827
            else:
 
2828
                # now create test and home directories within this dir
 
2829
                self.test_base_dir = name
 
2830
                self.addCleanup(self.deleteTestDir)
 
2831
                os.mkdir(self.test_base_dir)
 
2832
                break
 
2833
        self.permit_dir(self.test_base_dir)
 
2834
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2835
        # stacking policy to honour; create a bzr dir with an unshared
 
2836
        # repository (but not a branch - our code would be trying to escape
 
2837
        # then!) to stop them, and permit it to be read.
 
2838
        # control = bzrdir.BzrDir.create(self.test_base_dir)
 
2839
        # control.create_repository()
 
2840
        self.test_home_dir = self.test_base_dir + '/home'
 
2841
        os.mkdir(self.test_home_dir)
 
2842
        self.test_dir = self.test_base_dir + '/work'
 
2843
        os.mkdir(self.test_dir)
 
2844
        os.chdir(self.test_dir)
 
2845
        # put name of test inside
 
2846
        f = file(self.test_base_dir + '/name', 'w')
 
2847
        try:
 
2848
            f.write(self.id())
 
2849
        finally:
 
2850
            f.close()
 
2851
 
 
2852
    def deleteTestDir(self):
 
2853
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2854
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
 
2855
 
 
2856
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2857
        """Build a test tree according to a pattern.
 
2858
 
 
2859
        shape is a sequence of file specifications.  If the final
 
2860
        character is '/', a directory is created.
 
2861
 
 
2862
        This assumes that all the elements in the tree being built are new.
 
2863
 
 
2864
        This doesn't add anything to a branch.
 
2865
 
 
2866
        :type shape:    list or tuple.
 
2867
        :param line_endings: Either 'binary' or 'native'
 
2868
            in binary mode, exact contents are written in native mode, the
 
2869
            line endings match the default platform endings.
 
2870
        :param transport: A transport to write to, for building trees on VFS's.
 
2871
            If the transport is readonly or None, "." is opened automatically.
 
2872
        :return: None
 
2873
        """
 
2874
        if type(shape) not in (list, tuple):
 
2875
            raise AssertionError("Parameter 'shape' should be "
 
2876
                "a list or a tuple. Got %r instead" % (shape,))
 
2877
        # It's OK to just create them using forward slashes on windows.
 
2878
        if transport is None or transport.is_readonly():
 
2879
            transport = _mod_transport.get_transport_from_path(".")
 
2880
        for name in shape:
 
2881
            self.assertIsInstance(name, basestring)
 
2882
            if name[-1] == '/':
 
2883
                transport.mkdir(urlutils.escape(name[:-1]))
 
2884
            else:
 
2885
                if line_endings == 'binary':
 
2886
                    end = '\n'
 
2887
                elif line_endings == 'native':
 
2888
                    end = os.linesep
 
2889
                else:
 
2890
                    raise errors.BzrError(
 
2891
                        'Invalid line ending request %r' % line_endings)
 
2892
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2893
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2894
 
 
2895
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
 
2896
 
 
2897
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2898
        """Assert whether path or paths are in the WorkingTree"""
 
2899
        if tree is None:
 
2900
            tree = workingtree.WorkingTree.open(root_path)
 
2901
        if not isinstance(path, basestring):
 
2902
            for p in path:
 
2903
                self.assertInWorkingTree(p, tree=tree)
 
2904
        else:
 
2905
            self.assertIsNot(tree.path2id(path), None,
 
2906
                path+' not in working tree.')
 
2907
 
 
2908
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2909
        """Assert whether path or paths are not in the WorkingTree"""
 
2910
        if tree is None:
 
2911
            tree = workingtree.WorkingTree.open(root_path)
 
2912
        if not isinstance(path, basestring):
 
2913
            for p in path:
 
2914
                self.assertNotInWorkingTree(p,tree=tree)
 
2915
        else:
 
2916
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2917
 
 
2918
 
 
2919
class TestCaseWithTransport(TestCaseInTempDir):
 
2920
    """A test case that provides get_url and get_readonly_url facilities.
 
2921
 
 
2922
    These back onto two transport servers, one for readonly access and one for
 
2923
    read write access.
 
2924
 
 
2925
    If no explicit class is provided for readonly access, a
 
2926
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2927
    based read write transports.
 
2928
 
 
2929
    If an explicit class is provided for readonly access, that server and the
 
2930
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2931
    """
 
2932
 
 
2933
    def get_vfs_only_server(self):
 
2934
        """See TestCaseWithMemoryTransport.
 
2935
 
 
2936
        This is useful for some tests with specific servers that need
 
2937
        diagnostics.
 
2938
        """
 
2939
        if self.__vfs_server is None:
 
2940
            self.__vfs_server = self.vfs_transport_factory()
 
2941
            self.start_server(self.__vfs_server)
 
2942
        return self.__vfs_server
 
2943
 
 
2944
    def make_branch_and_tree(self, relpath, format=None):
 
2945
        """Create a branch on the transport and a tree locally.
 
2946
 
 
2947
        If the transport is not a LocalTransport, the Tree can't be created on
 
2948
        the transport.  In that case if the vfs_transport_factory is
 
2949
        LocalURLServer the working tree is created in the local
 
2950
        directory backing the transport, and the returned tree's branch and
 
2951
        repository will also be accessed locally. Otherwise a lightweight
 
2952
        checkout is created and returned.
 
2953
 
 
2954
        We do this because we can't physically create a tree in the local
 
2955
        path, with a branch reference to the transport_factory url, and
 
2956
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2957
        namespace is distinct from the local disk - the two branch objects
 
2958
        would collide. While we could construct a tree with its branch object
 
2959
        pointing at the transport_factory transport in memory, reopening it
 
2960
        would behaving unexpectedly, and has in the past caused testing bugs
 
2961
        when we tried to do it that way.
 
2962
 
 
2963
        :param format: The BzrDirFormat.
 
2964
        :returns: the WorkingTree.
 
2965
        """
 
2966
        # TODO: always use the local disk path for the working tree,
 
2967
        # this obviously requires a format that supports branch references
 
2968
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2969
        # RBC 20060208
 
2970
        format = self.resolve_format(format=format)
 
2971
        if not format.supports_workingtrees:
 
2972
            b = self.make_branch(relpath+'.branch', format=format)
 
2973
            return b.create_checkout(relpath, lightweight=True)
 
2974
        b = self.make_branch(relpath, format=format)
 
2975
        try:
 
2976
            return b.bzrdir.create_workingtree()
 
2977
        except errors.NotLocalUrl:
 
2978
            # We can only make working trees locally at the moment.  If the
 
2979
            # transport can't support them, then we keep the non-disk-backed
 
2980
            # branch and create a local checkout.
 
2981
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
2982
                # the branch is colocated on disk, we cannot create a checkout.
 
2983
                # hopefully callers will expect this.
 
2984
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2985
                wt = local_controldir.create_workingtree()
 
2986
                if wt.branch._format != b._format:
 
2987
                    wt._branch = b
 
2988
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2989
                    # in case the implementation details of workingtree objects
 
2990
                    # change.
 
2991
                    self.assertIs(b, wt.branch)
 
2992
                return wt
 
2993
            else:
 
2994
                return b.create_checkout(relpath, lightweight=True)
 
2995
 
 
2996
    def assertIsDirectory(self, relpath, transport):
 
2997
        """Assert that relpath within transport is a directory.
 
2998
 
 
2999
        This may not be possible on all transports; in that case it propagates
 
3000
        a TransportNotPossible.
 
3001
        """
 
3002
        try:
 
3003
            mode = transport.stat(relpath).st_mode
 
3004
        except errors.NoSuchFile:
 
3005
            self.fail("path %s is not a directory; no such file"
 
3006
                      % (relpath))
 
3007
        if not stat.S_ISDIR(mode):
 
3008
            self.fail("path %s is not a directory; has mode %#o"
 
3009
                      % (relpath, mode))
 
3010
 
 
3011
    def assertTreesEqual(self, left, right):
 
3012
        """Check that left and right have the same content and properties."""
 
3013
        # we use a tree delta to check for equality of the content, and we
 
3014
        # manually check for equality of other things such as the parents list.
 
3015
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
3016
        differences = left.changes_from(right)
 
3017
        self.assertFalse(differences.has_changed(),
 
3018
            "Trees %r and %r are different: %r" % (left, right, differences))
 
3019
 
 
3020
    def setUp(self):
 
3021
        super(TestCaseWithTransport, self).setUp()
 
3022
        self.__vfs_server = None
 
3023
 
 
3024
    def disable_missing_extensions_warning(self):
 
3025
        """Some tests expect a precise stderr content.
 
3026
 
 
3027
        There is no point in forcing them to duplicate the extension related
 
3028
        warning.
 
3029
        """
 
3030
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
3031
 
 
3032
 
 
3033
class ChrootedTestCase(TestCaseWithTransport):
 
3034
    """A support class that provides readonly urls outside the local namespace.
 
3035
 
 
3036
    This is done by checking if self.transport_server is a MemoryServer. if it
 
3037
    is then we are chrooted already, if it is not then an HttpServer is used
 
3038
    for readonly urls.
 
3039
 
 
3040
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
3041
                       be used without needed to redo it when a different
 
3042
                       subclass is in use ?
 
3043
    """
 
3044
 
 
3045
    def setUp(self):
 
3046
        from bzrlib.tests import http_server
 
3047
        super(ChrootedTestCase, self).setUp()
 
3048
        if not self.vfs_transport_factory == memory.MemoryServer:
 
3049
            self.transport_readonly_server = http_server.HttpServer
 
3050
 
 
3051
 
 
3052
def condition_id_re(pattern):
 
3053
    """Create a condition filter which performs a re check on a test's id.
 
3054
 
 
3055
    :param pattern: A regular expression string.
 
3056
    :return: A callable that returns True if the re matches.
 
3057
    """
 
3058
    filter_re = re.compile(pattern, 0)
 
3059
    def condition(test):
 
3060
        test_id = test.id()
 
3061
        return filter_re.search(test_id)
 
3062
    return condition
 
3063
 
 
3064
 
 
3065
def condition_isinstance(klass_or_klass_list):
 
3066
    """Create a condition filter which returns isinstance(param, klass).
 
3067
 
 
3068
    :return: A callable which when called with one parameter obj return the
 
3069
        result of isinstance(obj, klass_or_klass_list).
 
3070
    """
 
3071
    def condition(obj):
 
3072
        return isinstance(obj, klass_or_klass_list)
 
3073
    return condition
 
3074
 
 
3075
 
 
3076
def condition_id_in_list(id_list):
 
3077
    """Create a condition filter which verify that test's id in a list.
 
3078
 
 
3079
    :param id_list: A TestIdList object.
 
3080
    :return: A callable that returns True if the test's id appears in the list.
 
3081
    """
 
3082
    def condition(test):
 
3083
        return id_list.includes(test.id())
 
3084
    return condition
 
3085
 
 
3086
 
 
3087
def condition_id_startswith(starts):
 
3088
    """Create a condition filter verifying that test's id starts with a string.
 
3089
 
 
3090
    :param starts: A list of string.
 
3091
    :return: A callable that returns True if the test's id starts with one of
 
3092
        the given strings.
 
3093
    """
 
3094
    def condition(test):
 
3095
        for start in starts:
 
3096
            if test.id().startswith(start):
 
3097
                return True
 
3098
        return False
 
3099
    return condition
 
3100
 
 
3101
 
 
3102
def exclude_tests_by_condition(suite, condition):
 
3103
    """Create a test suite which excludes some tests from suite.
 
3104
 
 
3105
    :param suite: The suite to get tests from.
 
3106
    :param condition: A callable whose result evaluates True when called with a
 
3107
        test case which should be excluded from the result.
 
3108
    :return: A suite which contains the tests found in suite that fail
 
3109
        condition.
 
3110
    """
 
3111
    result = []
 
3112
    for test in iter_suite_tests(suite):
 
3113
        if not condition(test):
 
3114
            result.append(test)
 
3115
    return TestUtil.TestSuite(result)
 
3116
 
 
3117
 
 
3118
def filter_suite_by_condition(suite, condition):
 
3119
    """Create a test suite by filtering another one.
 
3120
 
 
3121
    :param suite: The source suite.
 
3122
    :param condition: A callable whose result evaluates True when called with a
 
3123
        test case which should be included in the result.
 
3124
    :return: A suite which contains the tests found in suite that pass
 
3125
        condition.
 
3126
    """
 
3127
    result = []
 
3128
    for test in iter_suite_tests(suite):
 
3129
        if condition(test):
 
3130
            result.append(test)
 
3131
    return TestUtil.TestSuite(result)
 
3132
 
 
3133
 
 
3134
def filter_suite_by_re(suite, pattern):
 
3135
    """Create a test suite by filtering another one.
 
3136
 
 
3137
    :param suite:           the source suite
 
3138
    :param pattern:         pattern that names must match
 
3139
    :returns: the newly created suite
 
3140
    """
 
3141
    condition = condition_id_re(pattern)
 
3142
    result_suite = filter_suite_by_condition(suite, condition)
 
3143
    return result_suite
 
3144
 
 
3145
 
 
3146
def filter_suite_by_id_list(suite, test_id_list):
 
3147
    """Create a test suite by filtering another one.
 
3148
 
 
3149
    :param suite: The source suite.
 
3150
    :param test_id_list: A list of the test ids to keep as strings.
 
3151
    :returns: the newly created suite
 
3152
    """
 
3153
    condition = condition_id_in_list(test_id_list)
 
3154
    result_suite = filter_suite_by_condition(suite, condition)
 
3155
    return result_suite
 
3156
 
 
3157
 
 
3158
def filter_suite_by_id_startswith(suite, start):
 
3159
    """Create a test suite by filtering another one.
 
3160
 
 
3161
    :param suite: The source suite.
 
3162
    :param start: A list of string the test id must start with one of.
 
3163
    :returns: the newly created suite
 
3164
    """
 
3165
    condition = condition_id_startswith(start)
 
3166
    result_suite = filter_suite_by_condition(suite, condition)
 
3167
    return result_suite
 
3168
 
 
3169
 
 
3170
def exclude_tests_by_re(suite, pattern):
 
3171
    """Create a test suite which excludes some tests from suite.
 
3172
 
 
3173
    :param suite: The suite to get tests from.
 
3174
    :param pattern: A regular expression string. Test ids that match this
 
3175
        pattern will be excluded from the result.
 
3176
    :return: A TestSuite that contains all the tests from suite without the
 
3177
        tests that matched pattern. The order of tests is the same as it was in
 
3178
        suite.
 
3179
    """
 
3180
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
3181
 
 
3182
 
 
3183
def preserve_input(something):
 
3184
    """A helper for performing test suite transformation chains.
 
3185
 
 
3186
    :param something: Anything you want to preserve.
 
3187
    :return: Something.
 
3188
    """
 
3189
    return something
 
3190
 
 
3191
 
 
3192
def randomize_suite(suite):
 
3193
    """Return a new TestSuite with suite's tests in random order.
 
3194
 
 
3195
    The tests in the input suite are flattened into a single suite in order to
 
3196
    accomplish this. Any nested TestSuites are removed to provide global
 
3197
    randomness.
 
3198
    """
 
3199
    tests = list(iter_suite_tests(suite))
 
3200
    random.shuffle(tests)
 
3201
    return TestUtil.TestSuite(tests)
 
3202
 
 
3203
 
 
3204
def split_suite_by_condition(suite, condition):
 
3205
    """Split a test suite into two by a condition.
 
3206
 
 
3207
    :param suite: The suite to split.
 
3208
    :param condition: The condition to match on. Tests that match this
 
3209
        condition are returned in the first test suite, ones that do not match
 
3210
        are in the second suite.
 
3211
    :return: A tuple of two test suites, where the first contains tests from
 
3212
        suite matching the condition, and the second contains the remainder
 
3213
        from suite. The order within each output suite is the same as it was in
 
3214
        suite.
 
3215
    """
 
3216
    matched = []
 
3217
    did_not_match = []
 
3218
    for test in iter_suite_tests(suite):
 
3219
        if condition(test):
 
3220
            matched.append(test)
 
3221
        else:
 
3222
            did_not_match.append(test)
 
3223
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
3224
 
 
3225
 
 
3226
def split_suite_by_re(suite, pattern):
 
3227
    """Split a test suite into two by a regular expression.
 
3228
 
 
3229
    :param suite: The suite to split.
 
3230
    :param pattern: A regular expression string. Test ids that match this
 
3231
        pattern will be in the first test suite returned, and the others in the
 
3232
        second test suite returned.
 
3233
    :return: A tuple of two test suites, where the first contains tests from
 
3234
        suite matching pattern, and the second contains the remainder from
 
3235
        suite. The order within each output suite is the same as it was in
 
3236
        suite.
 
3237
    """
 
3238
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
3239
 
 
3240
 
 
3241
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
3242
              stop_on_failure=False,
 
3243
              transport=None, lsprof_timed=None, bench_history=None,
 
3244
              matching_tests_first=None,
 
3245
              list_only=False,
 
3246
              random_seed=None,
 
3247
              exclude_pattern=None,
 
3248
              strict=False,
 
3249
              runner_class=None,
 
3250
              suite_decorators=None,
 
3251
              stream=None,
 
3252
              result_decorators=None,
 
3253
              ):
 
3254
    """Run a test suite for bzr selftest.
 
3255
 
 
3256
    :param runner_class: The class of runner to use. Must support the
 
3257
        constructor arguments passed by run_suite which are more than standard
 
3258
        python uses.
 
3259
    :return: A boolean indicating success.
 
3260
    """
 
3261
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
3262
    if verbose:
 
3263
        verbosity = 2
 
3264
    else:
 
3265
        verbosity = 1
 
3266
    if runner_class is None:
 
3267
        runner_class = TextTestRunner
 
3268
    if stream is None:
 
3269
        stream = sys.stdout
 
3270
    runner = runner_class(stream=stream,
 
3271
                            descriptions=0,
 
3272
                            verbosity=verbosity,
 
3273
                            bench_history=bench_history,
 
3274
                            strict=strict,
 
3275
                            result_decorators=result_decorators,
 
3276
                            )
 
3277
    runner.stop_on_failure=stop_on_failure
 
3278
    if isinstance(suite, unittest.TestSuite):
 
3279
        # Empty out _tests list of passed suite and populate new TestSuite
 
3280
        suite._tests[:], suite = [], TestSuite(suite)
 
3281
    # built in decorator factories:
 
3282
    decorators = [
 
3283
        random_order(random_seed, runner),
 
3284
        exclude_tests(exclude_pattern),
 
3285
        ]
 
3286
    if matching_tests_first:
 
3287
        decorators.append(tests_first(pattern))
 
3288
    else:
 
3289
        decorators.append(filter_tests(pattern))
 
3290
    if suite_decorators:
 
3291
        decorators.extend(suite_decorators)
 
3292
    # tell the result object how many tests will be running: (except if
 
3293
    # --parallel=fork is being used. Robert said he will provide a better
 
3294
    # progress design later -- vila 20090817)
 
3295
    if fork_decorator not in decorators:
 
3296
        decorators.append(CountingDecorator)
 
3297
    for decorator in decorators:
 
3298
        suite = decorator(suite)
 
3299
    if list_only:
 
3300
        # Done after test suite decoration to allow randomisation etc
 
3301
        # to take effect, though that is of marginal benefit.
 
3302
        if verbosity >= 2:
 
3303
            stream.write("Listing tests only ...\n")
 
3304
        for t in iter_suite_tests(suite):
 
3305
            stream.write("%s\n" % (t.id()))
 
3306
        return True
 
3307
    result = runner.run(suite)
 
3308
    if strict:
 
3309
        return result.wasStrictlySuccessful()
 
3310
    else:
 
3311
        return result.wasSuccessful()
 
3312
 
 
3313
 
 
3314
# A registry where get() returns a suite decorator.
 
3315
parallel_registry = registry.Registry()
 
3316
 
 
3317
 
 
3318
def fork_decorator(suite):
 
3319
    if getattr(os, "fork", None) is None:
 
3320
        raise errors.BzrCommandError("platform does not support fork,"
 
3321
            " try --parallel=subprocess instead.")
 
3322
    concurrency = osutils.local_concurrency()
 
3323
    if concurrency == 1:
 
3324
        return suite
 
3325
    from testtools import ConcurrentTestSuite
 
3326
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3327
parallel_registry.register('fork', fork_decorator)
 
3328
 
 
3329
 
 
3330
def subprocess_decorator(suite):
 
3331
    concurrency = osutils.local_concurrency()
 
3332
    if concurrency == 1:
 
3333
        return suite
 
3334
    from testtools import ConcurrentTestSuite
 
3335
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3336
parallel_registry.register('subprocess', subprocess_decorator)
 
3337
 
 
3338
 
 
3339
def exclude_tests(exclude_pattern):
 
3340
    """Return a test suite decorator that excludes tests."""
 
3341
    if exclude_pattern is None:
 
3342
        return identity_decorator
 
3343
    def decorator(suite):
 
3344
        return ExcludeDecorator(suite, exclude_pattern)
 
3345
    return decorator
 
3346
 
 
3347
 
 
3348
def filter_tests(pattern):
 
3349
    if pattern == '.*':
 
3350
        return identity_decorator
 
3351
    def decorator(suite):
 
3352
        return FilterTestsDecorator(suite, pattern)
 
3353
    return decorator
 
3354
 
 
3355
 
 
3356
def random_order(random_seed, runner):
 
3357
    """Return a test suite decorator factory for randomising tests order.
 
3358
    
 
3359
    :param random_seed: now, a string which casts to a long, or a long.
 
3360
    :param runner: A test runner with a stream attribute to report on.
 
3361
    """
 
3362
    if random_seed is None:
 
3363
        return identity_decorator
 
3364
    def decorator(suite):
 
3365
        return RandomDecorator(suite, random_seed, runner.stream)
 
3366
    return decorator
 
3367
 
 
3368
 
 
3369
def tests_first(pattern):
 
3370
    if pattern == '.*':
 
3371
        return identity_decorator
 
3372
    def decorator(suite):
 
3373
        return TestFirstDecorator(suite, pattern)
 
3374
    return decorator
 
3375
 
 
3376
 
 
3377
def identity_decorator(suite):
 
3378
    """Return suite."""
 
3379
    return suite
 
3380
 
 
3381
 
 
3382
class TestDecorator(TestUtil.TestSuite):
 
3383
    """A decorator for TestCase/TestSuite objects.
 
3384
 
 
3385
    Contains rather than flattening suite passed on construction
 
3386
    """
 
3387
 
 
3388
    def __init__(self, suite=None):
 
3389
        super(TestDecorator, self).__init__()
 
3390
        if suite is not None:
 
3391
            self.addTest(suite)
 
3392
 
 
3393
    # Don't need subclass run method with suite emptying
 
3394
    run = unittest.TestSuite.run
 
3395
 
 
3396
 
 
3397
class CountingDecorator(TestDecorator):
 
3398
    """A decorator which calls result.progress(self.countTestCases)."""
 
3399
 
 
3400
    def run(self, result):
 
3401
        progress_method = getattr(result, 'progress', None)
 
3402
        if callable(progress_method):
 
3403
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
3404
        return super(CountingDecorator, self).run(result)
 
3405
 
 
3406
 
 
3407
class ExcludeDecorator(TestDecorator):
 
3408
    """A decorator which excludes test matching an exclude pattern."""
 
3409
 
 
3410
    def __init__(self, suite, exclude_pattern):
 
3411
        super(ExcludeDecorator, self).__init__(
 
3412
            exclude_tests_by_re(suite, exclude_pattern))
 
3413
 
 
3414
 
 
3415
class FilterTestsDecorator(TestDecorator):
 
3416
    """A decorator which filters tests to those matching a pattern."""
 
3417
 
 
3418
    def __init__(self, suite, pattern):
 
3419
        super(FilterTestsDecorator, self).__init__(
 
3420
            filter_suite_by_re(suite, pattern))
 
3421
 
 
3422
 
 
3423
class RandomDecorator(TestDecorator):
 
3424
    """A decorator which randomises the order of its tests."""
 
3425
 
 
3426
    def __init__(self, suite, random_seed, stream):
 
3427
        random_seed = self.actual_seed(random_seed)
 
3428
        stream.write("Randomizing test order using seed %s\n\n" %
 
3429
            (random_seed,))
 
3430
        # Initialise the random number generator.
 
3431
        random.seed(random_seed)
 
3432
        super(RandomDecorator, self).__init__(randomize_suite(suite))
 
3433
 
 
3434
    @staticmethod
 
3435
    def actual_seed(seed):
 
3436
        if seed == "now":
 
3437
            # We convert the seed to a long to make it reuseable across
 
3438
            # invocations (because the user can reenter it).
 
3439
            return long(time.time())
 
3440
        else:
 
3441
            # Convert the seed to a long if we can
 
3442
            try:
 
3443
                return long(seed)
 
3444
            except (TypeError, ValueError):
 
3445
                pass
 
3446
        return seed
 
3447
 
 
3448
 
 
3449
class TestFirstDecorator(TestDecorator):
 
3450
    """A decorator which moves named tests to the front."""
 
3451
 
 
3452
    def __init__(self, suite, pattern):
 
3453
        super(TestFirstDecorator, self).__init__()
 
3454
        self.addTests(split_suite_by_re(suite, pattern))
 
3455
 
 
3456
 
 
3457
def partition_tests(suite, count):
 
3458
    """Partition suite into count lists of tests."""
 
3459
    # This just assigns tests in a round-robin fashion.  On one hand this
 
3460
    # splits up blocks of related tests that might run faster if they shared
 
3461
    # resources, but on the other it avoids assigning blocks of slow tests to
 
3462
    # just one partition.  So the slowest partition shouldn't be much slower
 
3463
    # than the fastest.
 
3464
    partitions = [list() for i in range(count)]
 
3465
    tests = iter_suite_tests(suite)
 
3466
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
 
3467
        partition.append(test)
 
3468
    return partitions
 
3469
 
 
3470
 
 
3471
def workaround_zealous_crypto_random():
 
3472
    """Crypto.Random want to help us being secure, but we don't care here.
 
3473
 
 
3474
    This workaround some test failure related to the sftp server. Once paramiko
 
3475
    stop using the controversial API in Crypto.Random, we may get rid of it.
 
3476
    """
 
3477
    try:
 
3478
        from Crypto.Random import atfork
 
3479
        atfork()
 
3480
    except ImportError:
 
3481
        pass
 
3482
 
 
3483
 
 
3484
def fork_for_tests(suite):
 
3485
    """Take suite and start up one runner per CPU by forking()
 
3486
 
 
3487
    :return: An iterable of TestCase-like objects which can each have
 
3488
        run(result) called on them to feed tests to result.
 
3489
    """
 
3490
    concurrency = osutils.local_concurrency()
 
3491
    result = []
 
3492
    from subunit import ProtocolTestCase
 
3493
    from subunit.test_results import AutoTimingTestResultDecorator
 
3494
    class TestInOtherProcess(ProtocolTestCase):
 
3495
        # Should be in subunit, I think. RBC.
 
3496
        def __init__(self, stream, pid):
 
3497
            ProtocolTestCase.__init__(self, stream)
 
3498
            self.pid = pid
 
3499
 
 
3500
        def run(self, result):
 
3501
            try:
 
3502
                ProtocolTestCase.run(self, result)
 
3503
            finally:
 
3504
                pid, status = os.waitpid(self.pid, 0)
 
3505
            # GZ 2011-10-18: If status is nonzero, should report to the result
 
3506
            #                that something went wrong.
 
3507
 
 
3508
    test_blocks = partition_tests(suite, concurrency)
 
3509
    # Clear the tests from the original suite so it doesn't keep them alive
 
3510
    suite._tests[:] = []
 
3511
    for process_tests in test_blocks:
 
3512
        process_suite = TestUtil.TestSuite(process_tests)
 
3513
        # Also clear each split list so new suite has only reference
 
3514
        process_tests[:] = []
 
3515
        c2pread, c2pwrite = os.pipe()
 
3516
        pid = os.fork()
 
3517
        if pid == 0:
 
3518
            try:
 
3519
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3520
                workaround_zealous_crypto_random()
 
3521
                os.close(c2pread)
 
3522
                # Leave stderr and stdout open so we can see test noise
 
3523
                # Close stdin so that the child goes away if it decides to
 
3524
                # read from stdin (otherwise its a roulette to see what
 
3525
                # child actually gets keystrokes for pdb etc).
 
3526
                sys.stdin.close()
 
3527
                subunit_result = AutoTimingTestResultDecorator(
 
3528
                    SubUnitBzrProtocolClient(stream))
 
3529
                process_suite.run(subunit_result)
 
3530
            except:
 
3531
                # Try and report traceback on stream, but exit with error even
 
3532
                # if stream couldn't be created or something else goes wrong.
 
3533
                # The traceback is formatted to a string and written in one go
 
3534
                # to avoid interleaving lines from multiple failing children.
 
3535
                try:
 
3536
                    stream.write(traceback.format_exc())
 
3537
                finally:
 
3538
                    os._exit(1)
 
3539
            os._exit(0)
 
3540
        else:
 
3541
            os.close(c2pwrite)
 
3542
            stream = os.fdopen(c2pread, 'rb', 1)
 
3543
            test = TestInOtherProcess(stream, pid)
 
3544
            result.append(test)
 
3545
    return result
 
3546
 
 
3547
 
 
3548
def reinvoke_for_tests(suite):
 
3549
    """Take suite and start up one runner per CPU using subprocess().
 
3550
 
 
3551
    :return: An iterable of TestCase-like objects which can each have
 
3552
        run(result) called on them to feed tests to result.
 
3553
    """
 
3554
    concurrency = osutils.local_concurrency()
 
3555
    result = []
 
3556
    from subunit import ProtocolTestCase
 
3557
    class TestInSubprocess(ProtocolTestCase):
 
3558
        def __init__(self, process, name):
 
3559
            ProtocolTestCase.__init__(self, process.stdout)
 
3560
            self.process = process
 
3561
            self.process.stdin.close()
 
3562
            self.name = name
 
3563
 
 
3564
        def run(self, result):
 
3565
            try:
 
3566
                ProtocolTestCase.run(self, result)
 
3567
            finally:
 
3568
                self.process.wait()
 
3569
                os.unlink(self.name)
 
3570
            # print "pid %d finished" % finished_process
 
3571
    test_blocks = partition_tests(suite, concurrency)
 
3572
    for process_tests in test_blocks:
 
3573
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3574
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3575
        if not os.path.isfile(bzr_path):
 
3576
            # We are probably installed. Assume sys.argv is the right file
 
3577
            bzr_path = sys.argv[0]
 
3578
        bzr_path = [bzr_path]
 
3579
        if sys.platform == "win32":
 
3580
            # if we're on windows, we can't execute the bzr script directly
 
3581
            bzr_path = [sys.executable] + bzr_path
 
3582
        fd, test_list_file_name = tempfile.mkstemp()
 
3583
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3584
        for test in process_tests:
 
3585
            test_list_file.write(test.id() + '\n')
 
3586
        test_list_file.close()
 
3587
        try:
 
3588
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
 
3589
                '--subunit']
 
3590
            if '--no-plugins' in sys.argv:
 
3591
                argv.append('--no-plugins')
 
3592
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
 
3593
            # noise on stderr it can interrupt the subunit protocol.
 
3594
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
 
3595
                                      stdout=subprocess.PIPE,
 
3596
                                      stderr=subprocess.PIPE,
 
3597
                                      bufsize=1)
 
3598
            test = TestInSubprocess(process, test_list_file_name)
 
3599
            result.append(test)
 
3600
        except:
 
3601
            os.unlink(test_list_file_name)
 
3602
            raise
 
3603
    return result
 
3604
 
 
3605
 
 
3606
class ProfileResult(testtools.ExtendedToOriginalDecorator):
 
3607
    """Generate profiling data for all activity between start and success.
 
3608
    
 
3609
    The profile data is appended to the test's _benchcalls attribute and can
 
3610
    be accessed by the forwarded-to TestResult.
 
3611
 
 
3612
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3613
    where our existing output support for lsprof is, and this class aims to
 
3614
    fit in with that: while it could be moved it's not necessary to accomplish
 
3615
    test profiling, nor would it be dramatically cleaner.
 
3616
    """
 
3617
 
 
3618
    def startTest(self, test):
 
3619
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3620
        # Prevent deadlocks in tests that use lsprof: those tests will
 
3621
        # unavoidably fail.
 
3622
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
 
3623
        self.profiler.start()
 
3624
        testtools.ExtendedToOriginalDecorator.startTest(self, test)
 
3625
 
 
3626
    def addSuccess(self, test):
 
3627
        stats = self.profiler.stop()
 
3628
        try:
 
3629
            calls = test._benchcalls
 
3630
        except AttributeError:
 
3631
            test._benchcalls = []
 
3632
            calls = test._benchcalls
 
3633
        calls.append(((test.id(), "", ""), stats))
 
3634
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)
 
3635
 
 
3636
    def stopTest(self, test):
 
3637
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
 
3638
        self.profiler = None
 
3639
 
 
3640
 
 
3641
# Controlled by "bzr selftest -E=..." option
 
3642
# Currently supported:
 
3643
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3644
#                           preserves any flags supplied at the command line.
 
3645
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3646
#                           rather than failing tests. And no longer raise
 
3647
#                           LockContention when fctnl locks are not being used
 
3648
#                           with proper exclusion rules.
 
3649
#   -Ethreads               Will display thread ident at creation/join time to
 
3650
#                           help track thread leaks
 
3651
#   -Euncollected_cases     Display the identity of any test cases that weren't
 
3652
#                           deallocated after being completed.
 
3653
#   -Econfig_stats          Will collect statistics using addDetail
 
3654
selftest_debug_flags = set()
 
3655
 
 
3656
 
 
3657
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3658
             transport=None,
 
3659
             test_suite_factory=None,
 
3660
             lsprof_timed=None,
 
3661
             bench_history=None,
 
3662
             matching_tests_first=None,
 
3663
             list_only=False,
 
3664
             random_seed=None,
 
3665
             exclude_pattern=None,
 
3666
             strict=False,
 
3667
             load_list=None,
 
3668
             debug_flags=None,
 
3669
             starting_with=None,
 
3670
             runner_class=None,
 
3671
             suite_decorators=None,
 
3672
             stream=None,
 
3673
             lsprof_tests=False,
 
3674
             ):
 
3675
    """Run the whole test suite under the enhanced runner"""
 
3676
    # XXX: Very ugly way to do this...
 
3677
    # Disable warning about old formats because we don't want it to disturb
 
3678
    # any blackbox tests.
 
3679
    from bzrlib import repository
 
3680
    repository._deprecation_warning_done = True
 
3681
 
 
3682
    global default_transport
 
3683
    if transport is None:
 
3684
        transport = default_transport
 
3685
    old_transport = default_transport
 
3686
    default_transport = transport
 
3687
    global selftest_debug_flags
 
3688
    old_debug_flags = selftest_debug_flags
 
3689
    if debug_flags is not None:
 
3690
        selftest_debug_flags = set(debug_flags)
 
3691
    try:
 
3692
        if load_list is None:
 
3693
            keep_only = None
 
3694
        else:
 
3695
            keep_only = load_test_id_list(load_list)
 
3696
        if starting_with:
 
3697
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3698
                             for start in starting_with]
 
3699
        if test_suite_factory is None:
 
3700
            # Reduce loading time by loading modules based on the starting_with
 
3701
            # patterns.
 
3702
            suite = test_suite(keep_only, starting_with)
 
3703
        else:
 
3704
            suite = test_suite_factory()
 
3705
        if starting_with:
 
3706
            # But always filter as requested.
 
3707
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3708
        result_decorators = []
 
3709
        if lsprof_tests:
 
3710
            result_decorators.append(ProfileResult)
 
3711
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3712
                     stop_on_failure=stop_on_failure,
 
3713
                     transport=transport,
 
3714
                     lsprof_timed=lsprof_timed,
 
3715
                     bench_history=bench_history,
 
3716
                     matching_tests_first=matching_tests_first,
 
3717
                     list_only=list_only,
 
3718
                     random_seed=random_seed,
 
3719
                     exclude_pattern=exclude_pattern,
 
3720
                     strict=strict,
 
3721
                     runner_class=runner_class,
 
3722
                     suite_decorators=suite_decorators,
 
3723
                     stream=stream,
 
3724
                     result_decorators=result_decorators,
 
3725
                     )
 
3726
    finally:
 
3727
        default_transport = old_transport
 
3728
        selftest_debug_flags = old_debug_flags
 
3729
 
 
3730
 
 
3731
def load_test_id_list(file_name):
 
3732
    """Load a test id list from a text file.
 
3733
 
 
3734
    The format is one test id by line.  No special care is taken to impose
 
3735
    strict rules, these test ids are used to filter the test suite so a test id
 
3736
    that do not match an existing test will do no harm. This allows user to add
 
3737
    comments, leave blank lines, etc.
 
3738
    """
 
3739
    test_list = []
 
3740
    try:
 
3741
        ftest = open(file_name, 'rt')
 
3742
    except IOError, e:
 
3743
        if e.errno != errno.ENOENT:
 
3744
            raise
 
3745
        else:
 
3746
            raise errors.NoSuchFile(file_name)
 
3747
 
 
3748
    for test_name in ftest.readlines():
 
3749
        test_list.append(test_name.strip())
 
3750
    ftest.close()
 
3751
    return test_list
 
3752
 
 
3753
 
 
3754
def suite_matches_id_list(test_suite, id_list):
 
3755
    """Warns about tests not appearing or appearing more than once.
 
3756
 
 
3757
    :param test_suite: A TestSuite object.
 
3758
    :param test_id_list: The list of test ids that should be found in
 
3759
         test_suite.
 
3760
 
 
3761
    :return: (absents, duplicates) absents is a list containing the test found
 
3762
        in id_list but not in test_suite, duplicates is a list containing the
 
3763
        test found multiple times in test_suite.
 
3764
 
 
3765
    When using a prefined test id list, it may occurs that some tests do not
 
3766
    exist anymore or that some tests use the same id. This function warns the
 
3767
    tester about potential problems in his workflow (test lists are volatile)
 
3768
    or in the test suite itself (using the same id for several tests does not
 
3769
    help to localize defects).
 
3770
    """
 
3771
    # Build a dict counting id occurrences
 
3772
    tests = dict()
 
3773
    for test in iter_suite_tests(test_suite):
 
3774
        id = test.id()
 
3775
        tests[id] = tests.get(id, 0) + 1
 
3776
 
 
3777
    not_found = []
 
3778
    duplicates = []
 
3779
    for id in id_list:
 
3780
        occurs = tests.get(id, 0)
 
3781
        if not occurs:
 
3782
            not_found.append(id)
 
3783
        elif occurs > 1:
 
3784
            duplicates.append(id)
 
3785
 
 
3786
    return not_found, duplicates
 
3787
 
 
3788
 
 
3789
class TestIdList(object):
 
3790
    """Test id list to filter a test suite.
 
3791
 
 
3792
    Relying on the assumption that test ids are built as:
 
3793
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3794
    notation, this class offers methods to :
 
3795
    - avoid building a test suite for modules not refered to in the test list,
 
3796
    - keep only the tests listed from the module test suite.
 
3797
    """
 
3798
 
 
3799
    def __init__(self, test_id_list):
 
3800
        # When a test suite needs to be filtered against us we compare test ids
 
3801
        # for equality, so a simple dict offers a quick and simple solution.
 
3802
        self.tests = dict().fromkeys(test_id_list, True)
 
3803
 
 
3804
        # While unittest.TestCase have ids like:
 
3805
        # <module>.<class>.<method>[(<param+)],
 
3806
        # doctest.DocTestCase can have ids like:
 
3807
        # <module>
 
3808
        # <module>.<class>
 
3809
        # <module>.<function>
 
3810
        # <module>.<class>.<method>
 
3811
 
 
3812
        # Since we can't predict a test class from its name only, we settle on
 
3813
        # a simple constraint: a test id always begins with its module name.
 
3814
 
 
3815
        modules = {}
 
3816
        for test_id in test_id_list:
 
3817
            parts = test_id.split('.')
 
3818
            mod_name = parts.pop(0)
 
3819
            modules[mod_name] = True
 
3820
            for part in parts:
 
3821
                mod_name += '.' + part
 
3822
                modules[mod_name] = True
 
3823
        self.modules = modules
 
3824
 
 
3825
    def refers_to(self, module_name):
 
3826
        """Is there tests for the module or one of its sub modules."""
 
3827
        return self.modules.has_key(module_name)
 
3828
 
 
3829
    def includes(self, test_id):
 
3830
        return self.tests.has_key(test_id)
 
3831
 
 
3832
 
 
3833
class TestPrefixAliasRegistry(registry.Registry):
 
3834
    """A registry for test prefix aliases.
 
3835
 
 
3836
    This helps implement shorcuts for the --starting-with selftest
 
3837
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3838
    warning will be emitted).
 
3839
    """
 
3840
 
 
3841
    def register(self, key, obj, help=None, info=None,
 
3842
                 override_existing=False):
 
3843
        """See Registry.register.
 
3844
 
 
3845
        Trying to override an existing alias causes a warning to be emitted,
 
3846
        not a fatal execption.
 
3847
        """
 
3848
        try:
 
3849
            super(TestPrefixAliasRegistry, self).register(
 
3850
                key, obj, help=help, info=info, override_existing=False)
 
3851
        except KeyError:
 
3852
            actual = self.get(key)
 
3853
            trace.note(
 
3854
                'Test prefix alias %s is already used for %s, ignoring %s'
 
3855
                % (key, actual, obj))
 
3856
 
 
3857
    def resolve_alias(self, id_start):
 
3858
        """Replace the alias by the prefix in the given string.
 
3859
 
 
3860
        Using an unknown prefix is an error to help catching typos.
 
3861
        """
 
3862
        parts = id_start.split('.')
 
3863
        try:
 
3864
            parts[0] = self.get(parts[0])
 
3865
        except KeyError:
 
3866
            raise errors.BzrCommandError(
 
3867
                '%s is not a known test prefix alias' % parts[0])
 
3868
        return '.'.join(parts)
 
3869
 
 
3870
 
 
3871
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3872
"""Registry of test prefix aliases."""
 
3873
 
 
3874
 
 
3875
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3876
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3877
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3878
 
 
3879
# Obvious highest levels prefixes, feel free to add your own via a plugin
 
3880
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3881
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3882
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3883
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3884
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3885
 
 
3886
 
 
3887
def _test_suite_testmod_names():
 
3888
    """Return the standard list of test module names to test."""
 
3889
    return [
 
3890
        'bzrlib.doc',
 
3891
        'bzrlib.tests.blackbox',
 
3892
        'bzrlib.tests.commands',
 
3893
        'bzrlib.tests.doc_generate',
 
3894
        'bzrlib.tests.per_branch',
 
3895
        'bzrlib.tests.per_bzrdir',
 
3896
        'bzrlib.tests.per_controldir',
 
3897
        'bzrlib.tests.per_controldir_colo',
 
3898
        'bzrlib.tests.per_foreign_vcs',
 
3899
        'bzrlib.tests.per_interrepository',
 
3900
        'bzrlib.tests.per_intertree',
 
3901
        'bzrlib.tests.per_inventory',
 
3902
        'bzrlib.tests.per_interbranch',
 
3903
        'bzrlib.tests.per_lock',
 
3904
        'bzrlib.tests.per_merger',
 
3905
        'bzrlib.tests.per_transport',
 
3906
        'bzrlib.tests.per_tree',
 
3907
        'bzrlib.tests.per_pack_repository',
 
3908
        'bzrlib.tests.per_repository',
 
3909
        'bzrlib.tests.per_repository_chk',
 
3910
        'bzrlib.tests.per_repository_reference',
 
3911
        'bzrlib.tests.per_repository_vf',
 
3912
        'bzrlib.tests.per_uifactory',
 
3913
        'bzrlib.tests.per_versionedfile',
 
3914
        'bzrlib.tests.per_workingtree',
 
3915
        'bzrlib.tests.test__annotator',
 
3916
        'bzrlib.tests.test__bencode',
 
3917
        'bzrlib.tests.test__btree_serializer',
 
3918
        'bzrlib.tests.test__chk_map',
 
3919
        'bzrlib.tests.test__dirstate_helpers',
 
3920
        'bzrlib.tests.test__groupcompress',
 
3921
        'bzrlib.tests.test__known_graph',
 
3922
        'bzrlib.tests.test__rio',
 
3923
        'bzrlib.tests.test__simple_set',
 
3924
        'bzrlib.tests.test__static_tuple',
 
3925
        'bzrlib.tests.test__walkdirs_win32',
 
3926
        'bzrlib.tests.test_ancestry',
 
3927
        'bzrlib.tests.test_annotate',
 
3928
        'bzrlib.tests.test_api',
 
3929
        'bzrlib.tests.test_atomicfile',
 
3930
        'bzrlib.tests.test_bad_files',
 
3931
        'bzrlib.tests.test_bisect_multi',
 
3932
        'bzrlib.tests.test_branch',
 
3933
        'bzrlib.tests.test_branchbuilder',
 
3934
        'bzrlib.tests.test_btree_index',
 
3935
        'bzrlib.tests.test_bugtracker',
 
3936
        'bzrlib.tests.test_bundle',
 
3937
        'bzrlib.tests.test_bzrdir',
 
3938
        'bzrlib.tests.test__chunks_to_lines',
 
3939
        'bzrlib.tests.test_cache_utf8',
 
3940
        'bzrlib.tests.test_chk_map',
 
3941
        'bzrlib.tests.test_chk_serializer',
 
3942
        'bzrlib.tests.test_chunk_writer',
 
3943
        'bzrlib.tests.test_clean_tree',
 
3944
        'bzrlib.tests.test_cleanup',
 
3945
        'bzrlib.tests.test_cmdline',
 
3946
        'bzrlib.tests.test_commands',
 
3947
        'bzrlib.tests.test_commit',
 
3948
        'bzrlib.tests.test_commit_merge',
 
3949
        'bzrlib.tests.test_config',
 
3950
        'bzrlib.tests.test_conflicts',
 
3951
        'bzrlib.tests.test_controldir',
 
3952
        'bzrlib.tests.test_counted_lock',
 
3953
        'bzrlib.tests.test_crash',
 
3954
        'bzrlib.tests.test_decorators',
 
3955
        'bzrlib.tests.test_delta',
 
3956
        'bzrlib.tests.test_debug',
 
3957
        'bzrlib.tests.test_diff',
 
3958
        'bzrlib.tests.test_directory_service',
 
3959
        'bzrlib.tests.test_dirstate',
 
3960
        'bzrlib.tests.test_email_message',
 
3961
        'bzrlib.tests.test_eol_filters',
 
3962
        'bzrlib.tests.test_errors',
 
3963
        'bzrlib.tests.test_estimate_compressed_size',
 
3964
        'bzrlib.tests.test_export',
 
3965
        'bzrlib.tests.test_export_pot',
 
3966
        'bzrlib.tests.test_extract',
 
3967
        'bzrlib.tests.test_features',
 
3968
        'bzrlib.tests.test_fetch',
 
3969
        'bzrlib.tests.test_fixtures',
 
3970
        'bzrlib.tests.test_fifo_cache',
 
3971
        'bzrlib.tests.test_filters',
 
3972
        'bzrlib.tests.test_filter_tree',
 
3973
        'bzrlib.tests.test_ftp_transport',
 
3974
        'bzrlib.tests.test_foreign',
 
3975
        'bzrlib.tests.test_generate_docs',
 
3976
        'bzrlib.tests.test_generate_ids',
 
3977
        'bzrlib.tests.test_globbing',
 
3978
        'bzrlib.tests.test_gpg',
 
3979
        'bzrlib.tests.test_graph',
 
3980
        'bzrlib.tests.test_groupcompress',
 
3981
        'bzrlib.tests.test_hashcache',
 
3982
        'bzrlib.tests.test_help',
 
3983
        'bzrlib.tests.test_hooks',
 
3984
        'bzrlib.tests.test_http',
 
3985
        'bzrlib.tests.test_http_response',
 
3986
        'bzrlib.tests.test_https_ca_bundle',
 
3987
        'bzrlib.tests.test_i18n',
 
3988
        'bzrlib.tests.test_identitymap',
 
3989
        'bzrlib.tests.test_ignores',
 
3990
        'bzrlib.tests.test_index',
 
3991
        'bzrlib.tests.test_import_tariff',
 
3992
        'bzrlib.tests.test_info',
 
3993
        'bzrlib.tests.test_inv',
 
3994
        'bzrlib.tests.test_inventory_delta',
 
3995
        'bzrlib.tests.test_knit',
 
3996
        'bzrlib.tests.test_lazy_import',
 
3997
        'bzrlib.tests.test_lazy_regex',
 
3998
        'bzrlib.tests.test_library_state',
 
3999
        'bzrlib.tests.test_lock',
 
4000
        'bzrlib.tests.test_lockable_files',
 
4001
        'bzrlib.tests.test_lockdir',
 
4002
        'bzrlib.tests.test_log',
 
4003
        'bzrlib.tests.test_lru_cache',
 
4004
        'bzrlib.tests.test_lsprof',
 
4005
        'bzrlib.tests.test_mail_client',
 
4006
        'bzrlib.tests.test_matchers',
 
4007
        'bzrlib.tests.test_memorytree',
 
4008
        'bzrlib.tests.test_merge',
 
4009
        'bzrlib.tests.test_merge3',
 
4010
        'bzrlib.tests.test_merge_core',
 
4011
        'bzrlib.tests.test_merge_directive',
 
4012
        'bzrlib.tests.test_mergetools',
 
4013
        'bzrlib.tests.test_missing',
 
4014
        'bzrlib.tests.test_msgeditor',
 
4015
        'bzrlib.tests.test_multiparent',
 
4016
        'bzrlib.tests.test_mutabletree',
 
4017
        'bzrlib.tests.test_nonascii',
 
4018
        'bzrlib.tests.test_options',
 
4019
        'bzrlib.tests.test_osutils',
 
4020
        'bzrlib.tests.test_osutils_encodings',
 
4021
        'bzrlib.tests.test_pack',
 
4022
        'bzrlib.tests.test_patch',
 
4023
        'bzrlib.tests.test_patches',
 
4024
        'bzrlib.tests.test_permissions',
 
4025
        'bzrlib.tests.test_plugins',
 
4026
        'bzrlib.tests.test_progress',
 
4027
        'bzrlib.tests.test_pyutils',
 
4028
        'bzrlib.tests.test_read_bundle',
 
4029
        'bzrlib.tests.test_reconcile',
 
4030
        'bzrlib.tests.test_reconfigure',
 
4031
        'bzrlib.tests.test_registry',
 
4032
        'bzrlib.tests.test_remote',
 
4033
        'bzrlib.tests.test_rename_map',
 
4034
        'bzrlib.tests.test_repository',
 
4035
        'bzrlib.tests.test_revert',
 
4036
        'bzrlib.tests.test_revision',
 
4037
        'bzrlib.tests.test_revisionspec',
 
4038
        'bzrlib.tests.test_revisiontree',
 
4039
        'bzrlib.tests.test_rio',
 
4040
        'bzrlib.tests.test_rules',
 
4041
        'bzrlib.tests.test_sampler',
 
4042
        'bzrlib.tests.test_scenarios',
 
4043
        'bzrlib.tests.test_script',
 
4044
        'bzrlib.tests.test_selftest',
 
4045
        'bzrlib.tests.test_serializer',
 
4046
        'bzrlib.tests.test_setup',
 
4047
        'bzrlib.tests.test_sftp_transport',
 
4048
        'bzrlib.tests.test_shelf',
 
4049
        'bzrlib.tests.test_shelf_ui',
 
4050
        'bzrlib.tests.test_smart',
 
4051
        'bzrlib.tests.test_smart_add',
 
4052
        'bzrlib.tests.test_smart_request',
 
4053
        'bzrlib.tests.test_smart_signals',
 
4054
        'bzrlib.tests.test_smart_transport',
 
4055
        'bzrlib.tests.test_smtp_connection',
 
4056
        'bzrlib.tests.test_source',
 
4057
        'bzrlib.tests.test_ssh_transport',
 
4058
        'bzrlib.tests.test_status',
 
4059
        'bzrlib.tests.test_store',
 
4060
        'bzrlib.tests.test_strace',
 
4061
        'bzrlib.tests.test_subsume',
 
4062
        'bzrlib.tests.test_switch',
 
4063
        'bzrlib.tests.test_symbol_versioning',
 
4064
        'bzrlib.tests.test_tag',
 
4065
        'bzrlib.tests.test_test_server',
 
4066
        'bzrlib.tests.test_testament',
 
4067
        'bzrlib.tests.test_textfile',
 
4068
        'bzrlib.tests.test_textmerge',
 
4069
        'bzrlib.tests.test_cethread',
 
4070
        'bzrlib.tests.test_timestamp',
 
4071
        'bzrlib.tests.test_trace',
 
4072
        'bzrlib.tests.test_transactions',
 
4073
        'bzrlib.tests.test_transform',
 
4074
        'bzrlib.tests.test_transport',
 
4075
        'bzrlib.tests.test_transport_log',
 
4076
        'bzrlib.tests.test_tree',
 
4077
        'bzrlib.tests.test_treebuilder',
 
4078
        'bzrlib.tests.test_treeshape',
 
4079
        'bzrlib.tests.test_tsort',
 
4080
        'bzrlib.tests.test_tuned_gzip',
 
4081
        'bzrlib.tests.test_ui',
 
4082
        'bzrlib.tests.test_uncommit',
 
4083
        'bzrlib.tests.test_upgrade',
 
4084
        'bzrlib.tests.test_upgrade_stacked',
 
4085
        'bzrlib.tests.test_urlutils',
 
4086
        'bzrlib.tests.test_utextwrap',
 
4087
        'bzrlib.tests.test_version',
 
4088
        'bzrlib.tests.test_version_info',
 
4089
        'bzrlib.tests.test_versionedfile',
 
4090
        'bzrlib.tests.test_weave',
 
4091
        'bzrlib.tests.test_whitebox',
 
4092
        'bzrlib.tests.test_win32utils',
 
4093
        'bzrlib.tests.test_workingtree',
 
4094
        'bzrlib.tests.test_workingtree_4',
 
4095
        'bzrlib.tests.test_wsgi',
 
4096
        'bzrlib.tests.test_xml',
 
4097
        ]
 
4098
 
 
4099
 
 
4100
def _test_suite_modules_to_doctest():
 
4101
    """Return the list of modules to doctest."""
 
4102
    if __doc__ is None:
 
4103
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
 
4104
        return []
 
4105
    return [
 
4106
        'bzrlib',
 
4107
        'bzrlib.branchbuilder',
 
4108
        'bzrlib.decorators',
 
4109
        'bzrlib.inventory',
 
4110
        'bzrlib.iterablefile',
 
4111
        'bzrlib.lockdir',
 
4112
        'bzrlib.merge3',
 
4113
        'bzrlib.option',
 
4114
        'bzrlib.pyutils',
 
4115
        'bzrlib.symbol_versioning',
 
4116
        'bzrlib.tests',
 
4117
        'bzrlib.tests.fixtures',
 
4118
        'bzrlib.timestamp',
 
4119
        'bzrlib.transport.http',
 
4120
        'bzrlib.version_info_formats.format_custom',
 
4121
        ]
 
4122
 
 
4123
 
 
4124
def test_suite(keep_only=None, starting_with=None):
 
4125
    """Build and return TestSuite for the whole of bzrlib.
 
4126
 
 
4127
    :param keep_only: A list of test ids limiting the suite returned.
 
4128
 
 
4129
    :param starting_with: An id limiting the suite returned to the tests
 
4130
         starting with it.
 
4131
 
 
4132
    This function can be replaced if you need to change the default test
 
4133
    suite on a global basis, but it is not encouraged.
 
4134
    """
 
4135
 
 
4136
    loader = TestUtil.TestLoader()
 
4137
 
 
4138
    if keep_only is not None:
 
4139
        id_filter = TestIdList(keep_only)
 
4140
    if starting_with:
 
4141
        # We take precedence over keep_only because *at loading time* using
 
4142
        # both options means we will load less tests for the same final result.
 
4143
        def interesting_module(name):
 
4144
            for start in starting_with:
 
4145
                if (
 
4146
                    # Either the module name starts with the specified string
 
4147
                    name.startswith(start)
 
4148
                    # or it may contain tests starting with the specified string
 
4149
                    or start.startswith(name)
 
4150
                    ):
 
4151
                    return True
 
4152
            return False
 
4153
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
4154
 
 
4155
    elif keep_only is not None:
 
4156
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
4157
        def interesting_module(name):
 
4158
            return id_filter.refers_to(name)
 
4159
 
 
4160
    else:
 
4161
        loader = TestUtil.TestLoader()
 
4162
        def interesting_module(name):
 
4163
            # No filtering, all modules are interesting
 
4164
            return True
 
4165
 
 
4166
    suite = loader.suiteClass()
 
4167
 
 
4168
    # modules building their suite with loadTestsFromModuleNames
 
4169
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
4170
 
 
4171
    for mod in _test_suite_modules_to_doctest():
 
4172
        if not interesting_module(mod):
 
4173
            # No tests to keep here, move along
 
4174
            continue
 
4175
        try:
 
4176
            # note that this really does mean "report only" -- doctest
 
4177
            # still runs the rest of the examples
 
4178
            doc_suite = IsolatedDocTestSuite(
 
4179
                mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
4180
        except ValueError, e:
 
4181
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
4182
            raise
 
4183
        if len(doc_suite._tests) == 0:
 
4184
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
4185
        suite.addTest(doc_suite)
 
4186
 
 
4187
    default_encoding = sys.getdefaultencoding()
 
4188
    for name, plugin in _mod_plugin.plugins().items():
 
4189
        if not interesting_module(plugin.module.__name__):
 
4190
            continue
 
4191
        plugin_suite = plugin.test_suite()
 
4192
        # We used to catch ImportError here and turn it into just a warning,
 
4193
        # but really if you don't have --no-plugins this should be a failure.
 
4194
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
4195
        if plugin_suite is None:
 
4196
            plugin_suite = plugin.load_plugin_tests(loader)
 
4197
        if plugin_suite is not None:
 
4198
            suite.addTest(plugin_suite)
 
4199
        if default_encoding != sys.getdefaultencoding():
 
4200
            trace.warning(
 
4201
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
4202
                sys.getdefaultencoding())
 
4203
            reload(sys)
 
4204
            sys.setdefaultencoding(default_encoding)
 
4205
 
 
4206
    if keep_only is not None:
 
4207
        # Now that the referred modules have loaded their tests, keep only the
 
4208
        # requested ones.
 
4209
        suite = filter_suite_by_id_list(suite, id_filter)
 
4210
        # Do some sanity checks on the id_list filtering
 
4211
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
4212
        if starting_with:
 
4213
            # The tester has used both keep_only and starting_with, so he is
 
4214
            # already aware that some tests are excluded from the list, there
 
4215
            # is no need to tell him which.
 
4216
            pass
 
4217
        else:
 
4218
            # Some tests mentioned in the list are not in the test suite. The
 
4219
            # list may be out of date, report to the tester.
 
4220
            for id in not_found:
 
4221
                trace.warning('"%s" not found in the test suite', id)
 
4222
        for id in duplicates:
 
4223
            trace.warning('"%s" is used as an id by several tests', id)
 
4224
 
 
4225
    return suite
 
4226
 
 
4227
 
 
4228
def multiply_scenarios(*scenarios):
 
4229
    """Multiply two or more iterables of scenarios.
 
4230
 
 
4231
    It is safe to pass scenario generators or iterators.
 
4232
 
 
4233
    :returns: A list of compound scenarios: the cross-product of all 
 
4234
        scenarios, with the names concatenated and the parameters
 
4235
        merged together.
 
4236
    """
 
4237
    return reduce(_multiply_two_scenarios, map(list, scenarios))
 
4238
 
 
4239
 
 
4240
def _multiply_two_scenarios(scenarios_left, scenarios_right):
 
4241
    """Multiply two sets of scenarios.
 
4242
 
 
4243
    :returns: the cartesian product of the two sets of scenarios, that is
 
4244
        a scenario for every possible combination of a left scenario and a
 
4245
        right scenario.
 
4246
    """
 
4247
    return [
 
4248
        ('%s,%s' % (left_name, right_name),
 
4249
         dict(left_dict.items() + right_dict.items()))
 
4250
        for left_name, left_dict in scenarios_left
 
4251
        for right_name, right_dict in scenarios_right]
 
4252
 
 
4253
 
 
4254
def multiply_tests(tests, scenarios, result):
 
4255
    """Multiply tests_list by scenarios into result.
 
4256
 
 
4257
    This is the core workhorse for test parameterisation.
 
4258
 
 
4259
    Typically the load_tests() method for a per-implementation test suite will
 
4260
    call multiply_tests and return the result.
 
4261
 
 
4262
    :param tests: The tests to parameterise.
 
4263
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
4264
        scenario_param_dict).
 
4265
    :param result: A TestSuite to add created tests to.
 
4266
 
 
4267
    This returns the passed in result TestSuite with the cross product of all
 
4268
    the tests repeated once for each scenario.  Each test is adapted by adding
 
4269
    the scenario name at the end of its id(), and updating the test object's
 
4270
    __dict__ with the scenario_param_dict.
 
4271
 
 
4272
    >>> import bzrlib.tests.test_sampler
 
4273
    >>> r = multiply_tests(
 
4274
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
4275
    ...     [('one', dict(param=1)),
 
4276
    ...      ('two', dict(param=2))],
 
4277
    ...     TestUtil.TestSuite())
 
4278
    >>> tests = list(iter_suite_tests(r))
 
4279
    >>> len(tests)
 
4280
    2
 
4281
    >>> tests[0].id()
 
4282
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
4283
    >>> tests[0].param
 
4284
    1
 
4285
    >>> tests[1].param
 
4286
    2
 
4287
    """
 
4288
    for test in iter_suite_tests(tests):
 
4289
        apply_scenarios(test, scenarios, result)
 
4290
    return result
 
4291
 
 
4292
 
 
4293
def apply_scenarios(test, scenarios, result):
 
4294
    """Apply the scenarios in scenarios to test and add to result.
 
4295
 
 
4296
    :param test: The test to apply scenarios to.
 
4297
    :param scenarios: An iterable of scenarios to apply to test.
 
4298
    :return: result
 
4299
    :seealso: apply_scenario
 
4300
    """
 
4301
    for scenario in scenarios:
 
4302
        result.addTest(apply_scenario(test, scenario))
 
4303
    return result
 
4304
 
 
4305
 
 
4306
def apply_scenario(test, scenario):
 
4307
    """Copy test and apply scenario to it.
 
4308
 
 
4309
    :param test: A test to adapt.
 
4310
    :param scenario: A tuple describing the scenarion.
 
4311
        The first element of the tuple is the new test id.
 
4312
        The second element is a dict containing attributes to set on the
 
4313
        test.
 
4314
    :return: The adapted test.
 
4315
    """
 
4316
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
4317
    new_test = clone_test(test, new_id)
 
4318
    for name, value in scenario[1].items():
 
4319
        setattr(new_test, name, value)
 
4320
    return new_test
 
4321
 
 
4322
 
 
4323
def clone_test(test, new_id):
 
4324
    """Clone a test giving it a new id.
 
4325
 
 
4326
    :param test: The test to clone.
 
4327
    :param new_id: The id to assign to it.
 
4328
    :return: The new test.
 
4329
    """
 
4330
    new_test = copy.copy(test)
 
4331
    new_test.id = lambda: new_id
 
4332
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
 
4333
    # causes cloned tests to share the 'details' dict.  This makes it hard to
 
4334
    # read the test output for parameterized tests, because tracebacks will be
 
4335
    # associated with irrelevant tests.
 
4336
    try:
 
4337
        details = new_test._TestCase__details
 
4338
    except AttributeError:
 
4339
        # must be a different version of testtools than expected.  Do nothing.
 
4340
        pass
 
4341
    else:
 
4342
        # Reset the '__details' dict.
 
4343
        new_test._TestCase__details = {}
 
4344
    return new_test
 
4345
 
 
4346
 
 
4347
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
4348
                                ext_module_name):
 
4349
    """Helper for permutating tests against an extension module.
 
4350
 
 
4351
    This is meant to be used inside a modules 'load_tests()' function. It will
 
4352
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
4353
    against both implementations. Setting 'test.module' to the appropriate
 
4354
    module. See bzrlib.tests.test__chk_map.load_tests as an example.
 
4355
 
 
4356
    :param standard_tests: A test suite to permute
 
4357
    :param loader: A TestLoader
 
4358
    :param py_module_name: The python path to a python module that can always
 
4359
        be loaded, and will be considered the 'python' implementation. (eg
 
4360
        'bzrlib._chk_map_py')
 
4361
    :param ext_module_name: The python path to an extension module. If the
 
4362
        module cannot be loaded, a single test will be added, which notes that
 
4363
        the module is not available. If it can be loaded, all standard_tests
 
4364
        will be run against that module.
 
4365
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
4366
        tests. feature is the Feature object that can be used to determine if
 
4367
        the module is available.
 
4368
    """
 
4369
 
 
4370
    from bzrlib.tests.features import ModuleAvailableFeature
 
4371
    py_module = pyutils.get_named_object(py_module_name)
 
4372
    scenarios = [
 
4373
        ('python', {'module': py_module}),
 
4374
    ]
 
4375
    suite = loader.suiteClass()
 
4376
    feature = ModuleAvailableFeature(ext_module_name)
 
4377
    if feature.available():
 
4378
        scenarios.append(('C', {'module': feature.module}))
 
4379
    else:
 
4380
        # the compiled module isn't available, so we add a failing test
 
4381
        class FailWithoutFeature(TestCase):
 
4382
            def test_fail(self):
 
4383
                self.requireFeature(feature)
 
4384
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4385
    result = multiply_tests(standard_tests, scenarios, suite)
 
4386
    return result, feature
 
4387
 
 
4388
 
 
4389
def _rmtree_temp_dir(dirname, test_id=None):
 
4390
    # If LANG=C we probably have created some bogus paths
 
4391
    # which rmtree(unicode) will fail to delete
 
4392
    # so make sure we are using rmtree(str) to delete everything
 
4393
    # except on win32, where rmtree(str) will fail
 
4394
    # since it doesn't have the property of byte-stream paths
 
4395
    # (they are either ascii or mbcs)
 
4396
    if sys.platform == 'win32':
 
4397
        # make sure we are using the unicode win32 api
 
4398
        dirname = unicode(dirname)
 
4399
    else:
 
4400
        dirname = dirname.encode(sys.getfilesystemencoding())
 
4401
    try:
 
4402
        osutils.rmtree(dirname)
 
4403
    except OSError, e:
 
4404
        # We don't want to fail here because some useful display will be lost
 
4405
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
4406
        # possible info to the test runner is even worse.
 
4407
        if test_id != None:
 
4408
            ui.ui_factory.clear_term()
 
4409
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
 
4410
        # Ugly, but the last thing we want here is fail, so bear with it.
 
4411
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
 
4412
                                    ).encode('ascii', 'replace')
 
4413
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
4414
                         % (os.path.basename(dirname), printable_e))
 
4415
 
 
4416
 
 
4417
def probe_unicode_in_user_encoding():
 
4418
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4419
    Return first successfull match.
 
4420
 
 
4421
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4422
    """
 
4423
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4424
    for uni_val in possible_vals:
 
4425
        try:
 
4426
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4427
        except UnicodeEncodeError:
 
4428
            # Try a different character
 
4429
            pass
 
4430
        else:
 
4431
            return uni_val, str_val
 
4432
    return None, None
 
4433
 
 
4434
 
 
4435
def probe_bad_non_ascii(encoding):
 
4436
    """Try to find [bad] character with code [128..255]
 
4437
    that cannot be decoded to unicode in some encoding.
 
4438
    Return None if all non-ascii characters is valid
 
4439
    for given encoding.
 
4440
    """
 
4441
    for i in xrange(128, 256):
 
4442
        char = chr(i)
 
4443
        try:
 
4444
            char.decode(encoding)
 
4445
        except UnicodeDecodeError:
 
4446
            return char
 
4447
    return None
 
4448
 
 
4449
 
 
4450
# Only define SubUnitBzrRunner if subunit is available.
 
4451
try:
 
4452
    from subunit import TestProtocolClient
 
4453
    from subunit.test_results import AutoTimingTestResultDecorator
 
4454
    class SubUnitBzrProtocolClient(TestProtocolClient):
 
4455
 
 
4456
        def stopTest(self, test):
 
4457
            super(SubUnitBzrProtocolClient, self).stopTest(test)
 
4458
            _clear__type_equality_funcs(test)
 
4459
 
 
4460
        def addSuccess(self, test, details=None):
 
4461
            # The subunit client always includes the details in the subunit
 
4462
            # stream, but we don't want to include it in ours.
 
4463
            if details is not None and 'log' in details:
 
4464
                del details['log']
 
4465
            return super(SubUnitBzrProtocolClient, self).addSuccess(
 
4466
                test, details)
 
4467
 
 
4468
    class SubUnitBzrRunner(TextTestRunner):
 
4469
        def run(self, test):
 
4470
            result = AutoTimingTestResultDecorator(
 
4471
                SubUnitBzrProtocolClient(self.stream))
 
4472
            test.run(result)
 
4473
            return result
 
4474
except ImportError:
 
4475
    pass
 
4476
 
 
4477
 
 
4478
# API compatibility for old plugins; see bug 892622.
 
4479
for name in [
 
4480
    'Feature',
 
4481
    'HTTPServerFeature', 
 
4482
    'ModuleAvailableFeature',
 
4483
    'HTTPSServerFeature', 'SymlinkFeature', 'HardlinkFeature',
 
4484
    'OsFifoFeature', 'UnicodeFilenameFeature',
 
4485
    'ByteStringNamedFilesystem', 'UTF8Filesystem',
 
4486
    'BreakinFeature', 'CaseInsCasePresFilenameFeature',
 
4487
    'CaseInsensitiveFilesystemFeature', 'case_sensitive_filesystem_feature',
 
4488
    'posix_permissions_feature',
 
4489
    ]:
 
4490
    globals()[name] = _CompatabilityThunkFeature(
 
4491
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4492
        'bzrlib.tests', name,
 
4493
        name, 'bzrlib.tests.features')
 
4494
 
 
4495
 
 
4496
for (old_name, new_name) in [
 
4497
    ('UnicodeFilename', 'UnicodeFilenameFeature'),
 
4498
    ]:
 
4499
    globals()[name] = _CompatabilityThunkFeature(
 
4500
        symbol_versioning.deprecated_in((2, 5, 0)),
 
4501
        'bzrlib.tests', old_name,
 
4502
        new_name, 'bzrlib.tests.features')