~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Andrew Bennetts
  • Date: 2010-10-08 04:25:10 UTC
  • mto: This revision was merged to the branch mainline in revision 5472.
  • Revision ID: andrew.bennetts@canonical.com-20101008042510-sg9vdhmnggilzxsk
Fix stray TAB in source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Testing framework extensions"""
 
18
 
 
19
# TODO: Perhaps there should be an API to find out if bzr running under the
 
20
# test suite -- some plugins might want to avoid making intrusive changes if
 
21
# this is the case.  However, we want behaviour under to test to diverge as
 
22
# little as possible, so this should be used rarely if it's added at all.
 
23
# (Suggestion from j-a-meinel, 2005-11-24)
 
24
 
 
25
# NOTE: Some classes in here use camelCaseNaming() rather than
 
26
# underscore_naming().  That's for consistency with unittest; it's not the
 
27
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
28
# new assertFoo() methods.
 
29
 
 
30
import atexit
 
31
import codecs
 
32
import copy
 
33
from cStringIO import StringIO
 
34
import difflib
 
35
import doctest
 
36
import errno
 
37
import itertools
 
38
import logging
 
39
import os
 
40
import platform
 
41
import pprint
 
42
import random
 
43
import re
 
44
import shlex
 
45
import stat
 
46
import subprocess
 
47
import sys
 
48
import tempfile
 
49
import threading
 
50
import time
 
51
import traceback
 
52
import unittest
 
53
import warnings
 
54
 
 
55
import testtools
 
56
# nb: check this before importing anything else from within it
 
57
_testtools_version = getattr(testtools, '__version__', ())
 
58
if _testtools_version < (0, 9, 2):
 
59
    raise ImportError("need at least testtools 0.9.2: %s is %r"
 
60
        % (testtools.__file__, _testtools_version))
 
61
from testtools import content
 
62
 
 
63
from bzrlib import (
 
64
    branchbuilder,
 
65
    bzrdir,
 
66
    chk_map,
 
67
    config,
 
68
    debug,
 
69
    errors,
 
70
    hooks,
 
71
    lock as _mod_lock,
 
72
    memorytree,
 
73
    osutils,
 
74
    ui,
 
75
    urlutils,
 
76
    registry,
 
77
    transport as _mod_transport,
 
78
    workingtree,
 
79
    )
 
80
import bzrlib.branch
 
81
import bzrlib.commands
 
82
import bzrlib.timestamp
 
83
import bzrlib.export
 
84
import bzrlib.inventory
 
85
import bzrlib.iterablefile
 
86
import bzrlib.lockdir
 
87
try:
 
88
    import bzrlib.lsprof
 
89
except ImportError:
 
90
    # lsprof not available
 
91
    pass
 
92
from bzrlib.merge import merge_inner
 
93
import bzrlib.merge3
 
94
import bzrlib.plugin
 
95
from bzrlib.smart import client, request, server
 
96
import bzrlib.store
 
97
from bzrlib import symbol_versioning
 
98
from bzrlib.symbol_versioning import (
 
99
    DEPRECATED_PARAMETER,
 
100
    deprecated_function,
 
101
    deprecated_in,
 
102
    deprecated_method,
 
103
    deprecated_passed,
 
104
    )
 
105
import bzrlib.trace
 
106
from bzrlib.transport import (
 
107
    memory,
 
108
    pathfilter,
 
109
    )
 
110
from bzrlib.trace import mutter, note
 
111
from bzrlib.tests import (
 
112
    test_server,
 
113
    TestUtil,
 
114
    treeshape,
 
115
    )
 
116
from bzrlib.ui import NullProgressView
 
117
from bzrlib.ui.text import TextUIFactory
 
118
import bzrlib.version_info_formats.format_custom
 
119
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
120
 
 
121
# Mark this python module as being part of the implementation
 
122
# of unittest: this gives us better tracebacks where the last
 
123
# shown frame is the test code, not our assertXYZ.
 
124
__unittest = 1
 
125
 
 
126
default_transport = test_server.LocalURLServer
 
127
 
 
128
 
 
129
_unitialized_attr = object()
 
130
"""A sentinel needed to act as a default value in a method signature."""
 
131
 
 
132
 
 
133
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
134
SUBUNIT_SEEK_SET = 0
 
135
SUBUNIT_SEEK_CUR = 1
 
136
 
 
137
# These are intentionally brought into this namespace. That way plugins, etc
 
138
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
 
139
TestSuite = TestUtil.TestSuite
 
140
TestLoader = TestUtil.TestLoader
 
141
 
 
142
class ExtendedTestResult(testtools.TextTestResult):
 
143
    """Accepts, reports and accumulates the results of running tests.
 
144
 
 
145
    Compared to the unittest version this class adds support for
 
146
    profiling, benchmarking, stopping as soon as a test fails,  and
 
147
    skipping tests.  There are further-specialized subclasses for
 
148
    different types of display.
 
149
 
 
150
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
151
    addFailure or addError classes.  These in turn may redirect to a more
 
152
    specific case for the special test results supported by our extended
 
153
    tests.
 
154
 
 
155
    Note that just one of these objects is fed the results from many tests.
 
156
    """
 
157
 
 
158
    stop_early = False
 
159
 
 
160
    def __init__(self, stream, descriptions, verbosity,
 
161
                 bench_history=None,
 
162
                 strict=False,
 
163
                 ):
 
164
        """Construct new TestResult.
 
165
 
 
166
        :param bench_history: Optionally, a writable file object to accumulate
 
167
            benchmark results.
 
168
        """
 
169
        testtools.TextTestResult.__init__(self, stream)
 
170
        if bench_history is not None:
 
171
            from bzrlib.version import _get_bzr_source_tree
 
172
            src_tree = _get_bzr_source_tree()
 
173
            if src_tree:
 
174
                try:
 
175
                    revision_id = src_tree.get_parent_ids()[0]
 
176
                except IndexError:
 
177
                    # XXX: if this is a brand new tree, do the same as if there
 
178
                    # is no branch.
 
179
                    revision_id = ''
 
180
            else:
 
181
                # XXX: If there's no branch, what should we do?
 
182
                revision_id = ''
 
183
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
184
        self._bench_history = bench_history
 
185
        self.ui = ui.ui_factory
 
186
        self.num_tests = 0
 
187
        self.error_count = 0
 
188
        self.failure_count = 0
 
189
        self.known_failure_count = 0
 
190
        self.skip_count = 0
 
191
        self.not_applicable_count = 0
 
192
        self.unsupported = {}
 
193
        self.count = 0
 
194
        self._overall_start_time = time.time()
 
195
        self._strict = strict
 
196
        self._first_thread_leaker_id = None
 
197
        self._tests_leaking_threads_count = 0
 
198
 
 
199
    def stopTestRun(self):
 
200
        run = self.testsRun
 
201
        actionTaken = "Ran"
 
202
        stopTime = time.time()
 
203
        timeTaken = stopTime - self.startTime
 
204
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
 
205
        #                the parent class method is similar have to duplicate
 
206
        self._show_list('ERROR', self.errors)
 
207
        self._show_list('FAIL', self.failures)
 
208
        self.stream.write(self.sep2)
 
209
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
 
210
                            run, run != 1 and "s" or "", timeTaken))
 
211
        if not self.wasSuccessful():
 
212
            self.stream.write("FAILED (")
 
213
            failed, errored = map(len, (self.failures, self.errors))
 
214
            if failed:
 
215
                self.stream.write("failures=%d" % failed)
 
216
            if errored:
 
217
                if failed: self.stream.write(", ")
 
218
                self.stream.write("errors=%d" % errored)
 
219
            if self.known_failure_count:
 
220
                if failed or errored: self.stream.write(", ")
 
221
                self.stream.write("known_failure_count=%d" %
 
222
                    self.known_failure_count)
 
223
            self.stream.write(")\n")
 
224
        else:
 
225
            if self.known_failure_count:
 
226
                self.stream.write("OK (known_failures=%d)\n" %
 
227
                    self.known_failure_count)
 
228
            else:
 
229
                self.stream.write("OK\n")
 
230
        if self.skip_count > 0:
 
231
            skipped = self.skip_count
 
232
            self.stream.write('%d test%s skipped\n' %
 
233
                                (skipped, skipped != 1 and "s" or ""))
 
234
        if self.unsupported:
 
235
            for feature, count in sorted(self.unsupported.items()):
 
236
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
237
                    (feature, count))
 
238
        if self._strict:
 
239
            ok = self.wasStrictlySuccessful()
 
240
        else:
 
241
            ok = self.wasSuccessful()
 
242
        if self._first_thread_leaker_id:
 
243
            self.stream.write(
 
244
                '%s is leaking threads among %d leaking tests.\n' % (
 
245
                self._first_thread_leaker_id,
 
246
                self._tests_leaking_threads_count))
 
247
            # We don't report the main thread as an active one.
 
248
            self.stream.write(
 
249
                '%d non-main threads were left active in the end.\n'
 
250
                % (len(self._active_threads) - 1))
 
251
 
 
252
    def getDescription(self, test):
 
253
        return test.id()
 
254
 
 
255
    def _extractBenchmarkTime(self, testCase, details=None):
 
256
        """Add a benchmark time for the current test case."""
 
257
        if details and 'benchtime' in details:
 
258
            return float(''.join(details['benchtime'].iter_bytes()))
 
259
        return getattr(testCase, "_benchtime", None)
 
260
 
 
261
    def _elapsedTestTimeString(self):
 
262
        """Return a time string for the overall time the current test has taken."""
 
263
        return self._formatTime(self._delta_to_float(
 
264
            self._now() - self._start_datetime))
 
265
 
 
266
    def _testTimeString(self, testCase):
 
267
        benchmark_time = self._extractBenchmarkTime(testCase)
 
268
        if benchmark_time is not None:
 
269
            return self._formatTime(benchmark_time) + "*"
 
270
        else:
 
271
            return self._elapsedTestTimeString()
 
272
 
 
273
    def _formatTime(self, seconds):
 
274
        """Format seconds as milliseconds with leading spaces."""
 
275
        # some benchmarks can take thousands of seconds to run, so we need 8
 
276
        # places
 
277
        return "%8dms" % (1000 * seconds)
 
278
 
 
279
    def _shortened_test_description(self, test):
 
280
        what = test.id()
 
281
        what = re.sub(r'^bzrlib\.tests\.', '', what)
 
282
        return what
 
283
 
 
284
    def startTest(self, test):
 
285
        super(ExtendedTestResult, self).startTest(test)
 
286
        if self.count == 0:
 
287
            self.startTests()
 
288
        self.count += 1
 
289
        self.report_test_start(test)
 
290
        test.number = self.count
 
291
        self._recordTestStartTime()
 
292
        # Only check for thread leaks if the test case supports cleanups
 
293
        addCleanup = getattr(test, "addCleanup", None)
 
294
        if addCleanup is not None:
 
295
            addCleanup(self._check_leaked_threads, test)
 
296
 
 
297
    def startTests(self):
 
298
        self.report_tests_starting()
 
299
        self._active_threads = threading.enumerate()
 
300
 
 
301
    def _check_leaked_threads(self, test):
 
302
        """See if any threads have leaked since last call
 
303
 
 
304
        A sample of live threads is stored in the _active_threads attribute,
 
305
        when this method runs it compares the current live threads and any not
 
306
        in the previous sample are treated as having leaked.
 
307
        """
 
308
        now_active_threads = set(threading.enumerate())
 
309
        threads_leaked = now_active_threads.difference(self._active_threads)
 
310
        if threads_leaked:
 
311
            self._report_thread_leak(test, threads_leaked, now_active_threads)
 
312
            self._tests_leaking_threads_count += 1
 
313
            if self._first_thread_leaker_id is None:
 
314
                self._first_thread_leaker_id = test.id()
 
315
            self._active_threads = now_active_threads
 
316
 
 
317
    def _recordTestStartTime(self):
 
318
        """Record that a test has started."""
 
319
        self._start_datetime = self._now()
 
320
 
 
321
    def addError(self, test, err):
 
322
        """Tell result that test finished with an error.
 
323
 
 
324
        Called from the TestCase run() method when the test
 
325
        fails with an unexpected error.
 
326
        """
 
327
        self._post_mortem()
 
328
        super(ExtendedTestResult, self).addError(test, err)
 
329
        self.error_count += 1
 
330
        self.report_error(test, err)
 
331
        if self.stop_early:
 
332
            self.stop()
 
333
 
 
334
    def addFailure(self, test, err):
 
335
        """Tell result that test failed.
 
336
 
 
337
        Called from the TestCase run() method when the test
 
338
        fails because e.g. an assert() method failed.
 
339
        """
 
340
        self._post_mortem()
 
341
        super(ExtendedTestResult, self).addFailure(test, err)
 
342
        self.failure_count += 1
 
343
        self.report_failure(test, err)
 
344
        if self.stop_early:
 
345
            self.stop()
 
346
 
 
347
    def addSuccess(self, test, details=None):
 
348
        """Tell result that test completed successfully.
 
349
 
 
350
        Called from the TestCase run()
 
351
        """
 
352
        if self._bench_history is not None:
 
353
            benchmark_time = self._extractBenchmarkTime(test, details)
 
354
            if benchmark_time is not None:
 
355
                self._bench_history.write("%s %s\n" % (
 
356
                    self._formatTime(benchmark_time),
 
357
                    test.id()))
 
358
        self.report_success(test)
 
359
        super(ExtendedTestResult, self).addSuccess(test)
 
360
        test._log_contents = ''
 
361
 
 
362
    def addExpectedFailure(self, test, err):
 
363
        self.known_failure_count += 1
 
364
        self.report_known_failure(test, err)
 
365
 
 
366
    def addNotSupported(self, test, feature):
 
367
        """The test will not be run because of a missing feature.
 
368
        """
 
369
        # this can be called in two different ways: it may be that the
 
370
        # test started running, and then raised (through requireFeature)
 
371
        # UnavailableFeature.  Alternatively this method can be called
 
372
        # while probing for features before running the test code proper; in
 
373
        # that case we will see startTest and stopTest, but the test will
 
374
        # never actually run.
 
375
        self.unsupported.setdefault(str(feature), 0)
 
376
        self.unsupported[str(feature)] += 1
 
377
        self.report_unsupported(test, feature)
 
378
 
 
379
    def addSkip(self, test, reason):
 
380
        """A test has not run for 'reason'."""
 
381
        self.skip_count += 1
 
382
        self.report_skip(test, reason)
 
383
 
 
384
    def addNotApplicable(self, test, reason):
 
385
        self.not_applicable_count += 1
 
386
        self.report_not_applicable(test, reason)
 
387
 
 
388
    def _post_mortem(self):
 
389
        """Start a PDB post mortem session."""
 
390
        if os.environ.get('BZR_TEST_PDB', None):
 
391
            import pdb;pdb.post_mortem()
 
392
 
 
393
    def progress(self, offset, whence):
 
394
        """The test is adjusting the count of tests to run."""
 
395
        if whence == SUBUNIT_SEEK_SET:
 
396
            self.num_tests = offset
 
397
        elif whence == SUBUNIT_SEEK_CUR:
 
398
            self.num_tests += offset
 
399
        else:
 
400
            raise errors.BzrError("Unknown whence %r" % whence)
 
401
 
 
402
    def report_tests_starting(self):
 
403
        """Display information before the test run begins"""
 
404
        if getattr(sys, 'frozen', None) is None:
 
405
            bzr_path = osutils.realpath(sys.argv[0])
 
406
        else:
 
407
            bzr_path = sys.executable
 
408
        self.stream.write(
 
409
            'bzr selftest: %s\n' % (bzr_path,))
 
410
        self.stream.write(
 
411
            '   %s\n' % (
 
412
                    bzrlib.__path__[0],))
 
413
        self.stream.write(
 
414
            '   bzr-%s python-%s %s\n' % (
 
415
                    bzrlib.version_string,
 
416
                    bzrlib._format_version_tuple(sys.version_info),
 
417
                    platform.platform(aliased=1),
 
418
                    ))
 
419
        self.stream.write('\n')
 
420
 
 
421
    def report_test_start(self, test):
 
422
        """Display information on the test just about to be run"""
 
423
 
 
424
    def _report_thread_leak(self, test, leaked_threads, active_threads):
 
425
        """Display information on a test that leaked one or more threads"""
 
426
        # GZ 2010-09-09: A leak summary reported separately from the general
 
427
        #                thread debugging would be nice. Tests under subunit
 
428
        #                need something not using stream, perhaps adding a
 
429
        #                testtools details object would be fitting.
 
430
        if 'threads' in selftest_debug_flags:
 
431
            self.stream.write('%s is leaking, active is now %d\n' %
 
432
                (test.id(), len(active_threads)))
 
433
 
 
434
    def startTestRun(self):
 
435
        self.startTime = time.time()
 
436
 
 
437
    def report_success(self, test):
 
438
        pass
 
439
 
 
440
    def wasStrictlySuccessful(self):
 
441
        if self.unsupported or self.known_failure_count:
 
442
            return False
 
443
        return self.wasSuccessful()
 
444
 
 
445
 
 
446
class TextTestResult(ExtendedTestResult):
 
447
    """Displays progress and results of tests in text form"""
 
448
 
 
449
    def __init__(self, stream, descriptions, verbosity,
 
450
                 bench_history=None,
 
451
                 pb=None,
 
452
                 strict=None,
 
453
                 ):
 
454
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
455
            bench_history, strict)
 
456
        # We no longer pass them around, but just rely on the UIFactory stack
 
457
        # for state
 
458
        if pb is not None:
 
459
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
460
        self.pb = self.ui.nested_progress_bar()
 
461
        self.pb.show_pct = False
 
462
        self.pb.show_spinner = False
 
463
        self.pb.show_eta = False,
 
464
        self.pb.show_count = False
 
465
        self.pb.show_bar = False
 
466
        self.pb.update_latency = 0
 
467
        self.pb.show_transport_activity = False
 
468
 
 
469
    def stopTestRun(self):
 
470
        # called when the tests that are going to run have run
 
471
        self.pb.clear()
 
472
        self.pb.finished()
 
473
        super(TextTestResult, self).stopTestRun()
 
474
 
 
475
    def report_tests_starting(self):
 
476
        super(TextTestResult, self).report_tests_starting()
 
477
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
478
 
 
479
    def _progress_prefix_text(self):
 
480
        # the longer this text, the less space we have to show the test
 
481
        # name...
 
482
        a = '[%d' % self.count              # total that have been run
 
483
        # tests skipped as known not to be relevant are not important enough
 
484
        # to show here
 
485
        ## if self.skip_count:
 
486
        ##     a += ', %d skip' % self.skip_count
 
487
        ## if self.known_failure_count:
 
488
        ##     a += '+%dX' % self.known_failure_count
 
489
        if self.num_tests:
 
490
            a +='/%d' % self.num_tests
 
491
        a += ' in '
 
492
        runtime = time.time() - self._overall_start_time
 
493
        if runtime >= 60:
 
494
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
495
        else:
 
496
            a += '%ds' % runtime
 
497
        total_fail_count = self.error_count + self.failure_count
 
498
        if total_fail_count:
 
499
            a += ', %d failed' % total_fail_count
 
500
        # if self.unsupported:
 
501
        #     a += ', %d missing' % len(self.unsupported)
 
502
        a += ']'
 
503
        return a
 
504
 
 
505
    def report_test_start(self, test):
 
506
        self.pb.update(
 
507
                self._progress_prefix_text()
 
508
                + ' '
 
509
                + self._shortened_test_description(test))
 
510
 
 
511
    def _test_description(self, test):
 
512
        return self._shortened_test_description(test)
 
513
 
 
514
    def report_error(self, test, err):
 
515
        self.stream.write('ERROR: %s\n    %s\n' % (
 
516
            self._test_description(test),
 
517
            err[1],
 
518
            ))
 
519
 
 
520
    def report_failure(self, test, err):
 
521
        self.stream.write('FAIL: %s\n    %s\n' % (
 
522
            self._test_description(test),
 
523
            err[1],
 
524
            ))
 
525
 
 
526
    def report_known_failure(self, test, err):
 
527
        pass
 
528
 
 
529
    def report_skip(self, test, reason):
 
530
        pass
 
531
 
 
532
    def report_not_applicable(self, test, reason):
 
533
        pass
 
534
 
 
535
    def report_unsupported(self, test, feature):
 
536
        """test cannot be run because feature is missing."""
 
537
 
 
538
 
 
539
class VerboseTestResult(ExtendedTestResult):
 
540
    """Produce long output, with one line per test run plus times"""
 
541
 
 
542
    def _ellipsize_to_right(self, a_string, final_width):
 
543
        """Truncate and pad a string, keeping the right hand side"""
 
544
        if len(a_string) > final_width:
 
545
            result = '...' + a_string[3-final_width:]
 
546
        else:
 
547
            result = a_string
 
548
        return result.ljust(final_width)
 
549
 
 
550
    def report_tests_starting(self):
 
551
        self.stream.write('running %d tests...\n' % self.num_tests)
 
552
        super(VerboseTestResult, self).report_tests_starting()
 
553
 
 
554
    def report_test_start(self, test):
 
555
        name = self._shortened_test_description(test)
 
556
        width = osutils.terminal_width()
 
557
        if width is not None:
 
558
            # width needs space for 6 char status, plus 1 for slash, plus an
 
559
            # 11-char time string, plus a trailing blank
 
560
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
 
561
            # space
 
562
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
563
        else:
 
564
            self.stream.write(name)
 
565
        self.stream.flush()
 
566
 
 
567
    def _error_summary(self, err):
 
568
        indent = ' ' * 4
 
569
        return '%s%s' % (indent, err[1])
 
570
 
 
571
    def report_error(self, test, err):
 
572
        self.stream.write('ERROR %s\n%s\n'
 
573
                % (self._testTimeString(test),
 
574
                   self._error_summary(err)))
 
575
 
 
576
    def report_failure(self, test, err):
 
577
        self.stream.write(' FAIL %s\n%s\n'
 
578
                % (self._testTimeString(test),
 
579
                   self._error_summary(err)))
 
580
 
 
581
    def report_known_failure(self, test, err):
 
582
        self.stream.write('XFAIL %s\n%s\n'
 
583
                % (self._testTimeString(test),
 
584
                   self._error_summary(err)))
 
585
 
 
586
    def report_success(self, test):
 
587
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
588
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
589
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
590
            stats.pprint(file=self.stream)
 
591
        # flush the stream so that we get smooth output. This verbose mode is
 
592
        # used to show the output in PQM.
 
593
        self.stream.flush()
 
594
 
 
595
    def report_skip(self, test, reason):
 
596
        self.stream.write(' SKIP %s\n%s\n'
 
597
                % (self._testTimeString(test), reason))
 
598
 
 
599
    def report_not_applicable(self, test, reason):
 
600
        self.stream.write('  N/A %s\n    %s\n'
 
601
                % (self._testTimeString(test), reason))
 
602
 
 
603
    def report_unsupported(self, test, feature):
 
604
        """test cannot be run because feature is missing."""
 
605
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
606
                %(self._testTimeString(test), feature))
 
607
 
 
608
 
 
609
class TextTestRunner(object):
 
610
    stop_on_failure = False
 
611
 
 
612
    def __init__(self,
 
613
                 stream=sys.stderr,
 
614
                 descriptions=0,
 
615
                 verbosity=1,
 
616
                 bench_history=None,
 
617
                 strict=False,
 
618
                 result_decorators=None,
 
619
                 ):
 
620
        """Create a TextTestRunner.
 
621
 
 
622
        :param result_decorators: An optional list of decorators to apply
 
623
            to the result object being used by the runner. Decorators are
 
624
            applied left to right - the first element in the list is the 
 
625
            innermost decorator.
 
626
        """
 
627
        # stream may know claim to know to write unicode strings, but in older
 
628
        # pythons this goes sufficiently wrong that it is a bad idea. (
 
629
        # specifically a built in file with encoding 'UTF-8' will still try
 
630
        # to encode using ascii.
 
631
        new_encoding = osutils.get_terminal_encoding()
 
632
        codec = codecs.lookup(new_encoding)
 
633
        if type(codec) is tuple:
 
634
            # Python 2.4
 
635
            encode = codec[0]
 
636
        else:
 
637
            encode = codec.encode
 
638
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
 
639
        stream.encoding = new_encoding
 
640
        self.stream = stream
 
641
        self.descriptions = descriptions
 
642
        self.verbosity = verbosity
 
643
        self._bench_history = bench_history
 
644
        self._strict = strict
 
645
        self._result_decorators = result_decorators or []
 
646
 
 
647
    def run(self, test):
 
648
        "Run the given test case or test suite."
 
649
        if self.verbosity == 1:
 
650
            result_class = TextTestResult
 
651
        elif self.verbosity >= 2:
 
652
            result_class = VerboseTestResult
 
653
        original_result = result_class(self.stream,
 
654
                              self.descriptions,
 
655
                              self.verbosity,
 
656
                              bench_history=self._bench_history,
 
657
                              strict=self._strict,
 
658
                              )
 
659
        # Signal to result objects that look at stop early policy to stop,
 
660
        original_result.stop_early = self.stop_on_failure
 
661
        result = original_result
 
662
        for decorator in self._result_decorators:
 
663
            result = decorator(result)
 
664
            result.stop_early = self.stop_on_failure
 
665
        result.startTestRun()
 
666
        try:
 
667
            test.run(result)
 
668
        finally:
 
669
            result.stopTestRun()
 
670
        # higher level code uses our extended protocol to determine
 
671
        # what exit code to give.
 
672
        return original_result
 
673
 
 
674
 
 
675
def iter_suite_tests(suite):
 
676
    """Return all tests in a suite, recursing through nested suites"""
 
677
    if isinstance(suite, unittest.TestCase):
 
678
        yield suite
 
679
    elif isinstance(suite, unittest.TestSuite):
 
680
        for item in suite:
 
681
            for r in iter_suite_tests(item):
 
682
                yield r
 
683
    else:
 
684
        raise Exception('unknown type %r for object %r'
 
685
                        % (type(suite), suite))
 
686
 
 
687
 
 
688
TestSkipped = testtools.testcase.TestSkipped
 
689
 
 
690
 
 
691
class TestNotApplicable(TestSkipped):
 
692
    """A test is not applicable to the situation where it was run.
 
693
 
 
694
    This is only normally raised by parameterized tests, if they find that
 
695
    the instance they're constructed upon does not support one aspect
 
696
    of its interface.
 
697
    """
 
698
 
 
699
 
 
700
# traceback._some_str fails to format exceptions that have the default
 
701
# __str__ which does an implicit ascii conversion. However, repr() on those
 
702
# objects works, for all that its not quite what the doctor may have ordered.
 
703
def _clever_some_str(value):
 
704
    try:
 
705
        return str(value)
 
706
    except:
 
707
        try:
 
708
            return repr(value).replace('\\n', '\n')
 
709
        except:
 
710
            return '<unprintable %s object>' % type(value).__name__
 
711
 
 
712
traceback._some_str = _clever_some_str
 
713
 
 
714
 
 
715
# deprecated - use self.knownFailure(), or self.expectFailure.
 
716
KnownFailure = testtools.testcase._ExpectedFailure
 
717
 
 
718
 
 
719
class UnavailableFeature(Exception):
 
720
    """A feature required for this test was not available.
 
721
 
 
722
    This can be considered a specialised form of SkippedTest.
 
723
 
 
724
    The feature should be used to construct the exception.
 
725
    """
 
726
 
 
727
 
 
728
class StringIOWrapper(object):
 
729
    """A wrapper around cStringIO which just adds an encoding attribute.
 
730
 
 
731
    Internally we can check sys.stdout to see what the output encoding
 
732
    should be. However, cStringIO has no encoding attribute that we can
 
733
    set. So we wrap it instead.
 
734
    """
 
735
    encoding='ascii'
 
736
    _cstring = None
 
737
 
 
738
    def __init__(self, s=None):
 
739
        if s is not None:
 
740
            self.__dict__['_cstring'] = StringIO(s)
 
741
        else:
 
742
            self.__dict__['_cstring'] = StringIO()
 
743
 
 
744
    def __getattr__(self, name, getattr=getattr):
 
745
        return getattr(self.__dict__['_cstring'], name)
 
746
 
 
747
    def __setattr__(self, name, val):
 
748
        if name == 'encoding':
 
749
            self.__dict__['encoding'] = val
 
750
        else:
 
751
            return setattr(self._cstring, name, val)
 
752
 
 
753
 
 
754
class TestUIFactory(TextUIFactory):
 
755
    """A UI Factory for testing.
 
756
 
 
757
    Hide the progress bar but emit note()s.
 
758
    Redirect stdin.
 
759
    Allows get_password to be tested without real tty attached.
 
760
 
 
761
    See also CannedInputUIFactory which lets you provide programmatic input in
 
762
    a structured way.
 
763
    """
 
764
    # TODO: Capture progress events at the model level and allow them to be
 
765
    # observed by tests that care.
 
766
    #
 
767
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
768
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
769
    # idea of how they're supposed to be different.
 
770
    # See https://bugs.launchpad.net/bzr/+bug/408213
 
771
 
 
772
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
773
        if stdin is not None:
 
774
            # We use a StringIOWrapper to be able to test various
 
775
            # encodings, but the user is still responsible to
 
776
            # encode the string and to set the encoding attribute
 
777
            # of StringIOWrapper.
 
778
            stdin = StringIOWrapper(stdin)
 
779
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
780
 
 
781
    def get_non_echoed_password(self):
 
782
        """Get password from stdin without trying to handle the echo mode"""
 
783
        password = self.stdin.readline()
 
784
        if not password:
 
785
            raise EOFError
 
786
        if password[-1] == '\n':
 
787
            password = password[:-1]
 
788
        return password
 
789
 
 
790
    def make_progress_view(self):
 
791
        return NullProgressView()
 
792
 
 
793
 
 
794
class TestCase(testtools.TestCase):
 
795
    """Base class for bzr unit tests.
 
796
 
 
797
    Tests that need access to disk resources should subclass
 
798
    TestCaseInTempDir not TestCase.
 
799
 
 
800
    Error and debug log messages are redirected from their usual
 
801
    location into a temporary file, the contents of which can be
 
802
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
803
    so that it can also capture file IO.  When the test completes this file
 
804
    is read into memory and removed from disk.
 
805
 
 
806
    There are also convenience functions to invoke bzr's command-line
 
807
    routine, and to build and check bzr trees.
 
808
 
 
809
    In addition to the usual method of overriding tearDown(), this class also
 
810
    allows subclasses to register cleanup functions via addCleanup, which are
 
811
    run in order as the object is torn down.  It's less likely this will be
 
812
    accidentally overlooked.
 
813
    """
 
814
 
 
815
    _log_file = None
 
816
    # record lsprof data when performing benchmark calls.
 
817
    _gather_lsprof_in_benchmarks = False
 
818
 
 
819
    def __init__(self, methodName='testMethod'):
 
820
        super(TestCase, self).__init__(methodName)
 
821
        self._directory_isolation = True
 
822
        self.exception_handlers.insert(0,
 
823
            (UnavailableFeature, self._do_unsupported_or_skip))
 
824
        self.exception_handlers.insert(0,
 
825
            (TestNotApplicable, self._do_not_applicable))
 
826
 
 
827
    def setUp(self):
 
828
        super(TestCase, self).setUp()
 
829
        for feature in getattr(self, '_test_needs_features', []):
 
830
            self.requireFeature(feature)
 
831
        self._log_contents = None
 
832
        self.addDetail("log", content.Content(content.ContentType("text",
 
833
            "plain", {"charset": "utf8"}),
 
834
            lambda:[self._get_log(keep_log_file=True)]))
 
835
        self._cleanEnvironment()
 
836
        self._silenceUI()
 
837
        self._startLogFile()
 
838
        self._benchcalls = []
 
839
        self._benchtime = None
 
840
        self._clear_hooks()
 
841
        self._track_transports()
 
842
        self._track_locks()
 
843
        self._clear_debug_flags()
 
844
 
 
845
    def debug(self):
 
846
        # debug a frame up.
 
847
        import pdb
 
848
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
849
 
 
850
    def discardDetail(self, name):
 
851
        """Extend the addDetail, getDetails api so we can remove a detail.
 
852
 
 
853
        eg. bzr always adds the 'log' detail at startup, but we don't want to
 
854
        include it for skipped, xfail, etc tests.
 
855
 
 
856
        It is safe to call this for a detail that doesn't exist, in case this
 
857
        gets called multiple times.
 
858
        """
 
859
        # We cheat. details is stored in __details which means we shouldn't
 
860
        # touch it. but getDetails() returns the dict directly, so we can
 
861
        # mutate it.
 
862
        details = self.getDetails()
 
863
        if name in details:
 
864
            del details[name]
 
865
 
 
866
    def _clear_debug_flags(self):
 
867
        """Prevent externally set debug flags affecting tests.
 
868
 
 
869
        Tests that want to use debug flags can just set them in the
 
870
        debug_flags set during setup/teardown.
 
871
        """
 
872
        # Start with a copy of the current debug flags we can safely modify.
 
873
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
 
874
        if 'allow_debug' not in selftest_debug_flags:
 
875
            debug.debug_flags.clear()
 
876
        if 'disable_lock_checks' not in selftest_debug_flags:
 
877
            debug.debug_flags.add('strict_locks')
 
878
 
 
879
    def _clear_hooks(self):
 
880
        # prevent hooks affecting tests
 
881
        self._preserved_hooks = {}
 
882
        for key, factory in hooks.known_hooks.items():
 
883
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
884
            current_hooks = hooks.known_hooks_key_to_object(key)
 
885
            self._preserved_hooks[parent] = (name, current_hooks)
 
886
        self.addCleanup(self._restoreHooks)
 
887
        for key, factory in hooks.known_hooks.items():
 
888
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
889
            setattr(parent, name, factory())
 
890
        # this hook should always be installed
 
891
        request._install_hook()
 
892
 
 
893
    def disable_directory_isolation(self):
 
894
        """Turn off directory isolation checks."""
 
895
        self._directory_isolation = False
 
896
 
 
897
    def enable_directory_isolation(self):
 
898
        """Enable directory isolation checks."""
 
899
        self._directory_isolation = True
 
900
 
 
901
    def _silenceUI(self):
 
902
        """Turn off UI for duration of test"""
 
903
        # by default the UI is off; tests can turn it on if they want it.
 
904
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
 
905
 
 
906
    def _check_locks(self):
 
907
        """Check that all lock take/release actions have been paired."""
 
908
        # We always check for mismatched locks. If a mismatch is found, we
 
909
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
910
        # case we just print a warning.
 
911
        # unhook:
 
912
        acquired_locks = [lock for action, lock in self._lock_actions
 
913
                          if action == 'acquired']
 
914
        released_locks = [lock for action, lock in self._lock_actions
 
915
                          if action == 'released']
 
916
        broken_locks = [lock for action, lock in self._lock_actions
 
917
                        if action == 'broken']
 
918
        # trivially, given the tests for lock acquistion and release, if we
 
919
        # have as many in each list, it should be ok. Some lock tests also
 
920
        # break some locks on purpose and should be taken into account by
 
921
        # considering that breaking a lock is just a dirty way of releasing it.
 
922
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
923
            message = ('Different number of acquired and '
 
924
                       'released or broken locks. (%s, %s + %s)' %
 
925
                       (acquired_locks, released_locks, broken_locks))
 
926
            if not self._lock_check_thorough:
 
927
                # Rather than fail, just warn
 
928
                print "Broken test %s: %s" % (self, message)
 
929
                return
 
930
            self.fail(message)
 
931
 
 
932
    def _track_locks(self):
 
933
        """Track lock activity during tests."""
 
934
        self._lock_actions = []
 
935
        if 'disable_lock_checks' in selftest_debug_flags:
 
936
            self._lock_check_thorough = False
 
937
        else:
 
938
            self._lock_check_thorough = True
 
939
 
 
940
        self.addCleanup(self._check_locks)
 
941
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
942
                                                self._lock_acquired, None)
 
943
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
944
                                                self._lock_released, None)
 
945
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
946
                                                self._lock_broken, None)
 
947
 
 
948
    def _lock_acquired(self, result):
 
949
        self._lock_actions.append(('acquired', result))
 
950
 
 
951
    def _lock_released(self, result):
 
952
        self._lock_actions.append(('released', result))
 
953
 
 
954
    def _lock_broken(self, result):
 
955
        self._lock_actions.append(('broken', result))
 
956
 
 
957
    def permit_dir(self, name):
 
958
        """Permit a directory to be used by this test. See permit_url."""
 
959
        name_transport = _mod_transport.get_transport(name)
 
960
        self.permit_url(name)
 
961
        self.permit_url(name_transport.base)
 
962
 
 
963
    def permit_url(self, url):
 
964
        """Declare that url is an ok url to use in this test.
 
965
        
 
966
        Do this for memory transports, temporary test directory etc.
 
967
        
 
968
        Do not do this for the current working directory, /tmp, or any other
 
969
        preexisting non isolated url.
 
970
        """
 
971
        if not url.endswith('/'):
 
972
            url += '/'
 
973
        self._bzr_selftest_roots.append(url)
 
974
 
 
975
    def permit_source_tree_branch_repo(self):
 
976
        """Permit the source tree bzr is running from to be opened.
 
977
 
 
978
        Some code such as bzrlib.version attempts to read from the bzr branch
 
979
        that bzr is executing from (if any). This method permits that directory
 
980
        to be used in the test suite.
 
981
        """
 
982
        path = self.get_source_path()
 
983
        self.record_directory_isolation()
 
984
        try:
 
985
            try:
 
986
                workingtree.WorkingTree.open(path)
 
987
            except (errors.NotBranchError, errors.NoWorkingTree):
 
988
                raise TestSkipped('Needs a working tree of bzr sources')
 
989
        finally:
 
990
            self.enable_directory_isolation()
 
991
 
 
992
    def _preopen_isolate_transport(self, transport):
 
993
        """Check that all transport openings are done in the test work area."""
 
994
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
995
            # Unwrap pathfiltered transports
 
996
            transport = transport.server.backing_transport.clone(
 
997
                transport._filter('.'))
 
998
        url = transport.base
 
999
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
1000
        # urls it is given by prepending readonly+. This is appropriate as the
 
1001
        # client shouldn't know that the server is readonly (or not readonly).
 
1002
        # We could register all servers twice, with readonly+ prepending, but
 
1003
        # that makes for a long list; this is about the same but easier to
 
1004
        # read.
 
1005
        if url.startswith('readonly+'):
 
1006
            url = url[len('readonly+'):]
 
1007
        self._preopen_isolate_url(url)
 
1008
 
 
1009
    def _preopen_isolate_url(self, url):
 
1010
        if not self._directory_isolation:
 
1011
            return
 
1012
        if self._directory_isolation == 'record':
 
1013
            self._bzr_selftest_roots.append(url)
 
1014
            return
 
1015
        # This prevents all transports, including e.g. sftp ones backed on disk
 
1016
        # from working unless they are explicitly granted permission. We then
 
1017
        # depend on the code that sets up test transports to check that they are
 
1018
        # appropriately isolated and enable their use by calling
 
1019
        # self.permit_transport()
 
1020
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
1021
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1022
                % (url, self._bzr_selftest_roots))
 
1023
 
 
1024
    def record_directory_isolation(self):
 
1025
        """Gather accessed directories to permit later access.
 
1026
        
 
1027
        This is used for tests that access the branch bzr is running from.
 
1028
        """
 
1029
        self._directory_isolation = "record"
 
1030
 
 
1031
    def start_server(self, transport_server, backing_server=None):
 
1032
        """Start transport_server for this test.
 
1033
 
 
1034
        This starts the server, registers a cleanup for it and permits the
 
1035
        server's urls to be used.
 
1036
        """
 
1037
        if backing_server is None:
 
1038
            transport_server.start_server()
 
1039
        else:
 
1040
            transport_server.start_server(backing_server)
 
1041
        self.addCleanup(transport_server.stop_server)
 
1042
        # Obtain a real transport because if the server supplies a password, it
 
1043
        # will be hidden from the base on the client side.
 
1044
        t = _mod_transport.get_transport(transport_server.get_url())
 
1045
        # Some transport servers effectively chroot the backing transport;
 
1046
        # others like SFTPServer don't - users of the transport can walk up the
 
1047
        # transport to read the entire backing transport. This wouldn't matter
 
1048
        # except that the workdir tests are given - and that they expect the
 
1049
        # server's url to point at - is one directory under the safety net. So
 
1050
        # Branch operations into the transport will attempt to walk up one
 
1051
        # directory. Chrooting all servers would avoid this but also mean that
 
1052
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1053
        # getting the test framework to start the server with a backing server
 
1054
        # at the actual safety net directory would work too, but this then
 
1055
        # means that the self.get_url/self.get_transport methods would need
 
1056
        # to transform all their results. On balance its cleaner to handle it
 
1057
        # here, and permit a higher url when we have one of these transports.
 
1058
        if t.base.endswith('/work/'):
 
1059
            # we have safety net/test root/work
 
1060
            t = t.clone('../..')
 
1061
        elif isinstance(transport_server,
 
1062
                        test_server.SmartTCPServer_for_testing):
 
1063
            # The smart server adds a path similar to work, which is traversed
 
1064
            # up from by the client. But the server is chrooted - the actual
 
1065
            # backing transport is not escaped from, and VFS requests to the
 
1066
            # root will error (because they try to escape the chroot).
 
1067
            t2 = t.clone('..')
 
1068
            while t2.base != t.base:
 
1069
                t = t2
 
1070
                t2 = t.clone('..')
 
1071
        self.permit_url(t.base)
 
1072
 
 
1073
    def _track_transports(self):
 
1074
        """Install checks for transport usage."""
 
1075
        # TestCase has no safe place it can write to.
 
1076
        self._bzr_selftest_roots = []
 
1077
        # Currently the easiest way to be sure that nothing is going on is to
 
1078
        # hook into bzr dir opening. This leaves a small window of error for
 
1079
        # transport tests, but they are well known, and we can improve on this
 
1080
        # step.
 
1081
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
 
1082
            self._preopen_isolate_transport, "Check bzr directories are safe.")
 
1083
 
 
1084
    def _ndiff_strings(self, a, b):
 
1085
        """Return ndiff between two strings containing lines.
 
1086
 
 
1087
        A trailing newline is added if missing to make the strings
 
1088
        print properly."""
 
1089
        if b and b[-1] != '\n':
 
1090
            b += '\n'
 
1091
        if a and a[-1] != '\n':
 
1092
            a += '\n'
 
1093
        difflines = difflib.ndiff(a.splitlines(True),
 
1094
                                  b.splitlines(True),
 
1095
                                  linejunk=lambda x: False,
 
1096
                                  charjunk=lambda x: False)
 
1097
        return ''.join(difflines)
 
1098
 
 
1099
    def assertEqual(self, a, b, message=''):
 
1100
        try:
 
1101
            if a == b:
 
1102
                return
 
1103
        except UnicodeError, e:
 
1104
            # If we can't compare without getting a UnicodeError, then
 
1105
            # obviously they are different
 
1106
            mutter('UnicodeError: %s', e)
 
1107
        if message:
 
1108
            message += '\n'
 
1109
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
1110
            % (message,
 
1111
               pprint.pformat(a), pprint.pformat(b)))
 
1112
 
 
1113
    assertEquals = assertEqual
 
1114
 
 
1115
    def assertEqualDiff(self, a, b, message=None):
 
1116
        """Assert two texts are equal, if not raise an exception.
 
1117
 
 
1118
        This is intended for use with multi-line strings where it can
 
1119
        be hard to find the differences by eye.
 
1120
        """
 
1121
        # TODO: perhaps override assertEquals to call this for strings?
 
1122
        if a == b:
 
1123
            return
 
1124
        if message is None:
 
1125
            message = "texts not equal:\n"
 
1126
        if a + '\n' == b:
 
1127
            message = 'first string is missing a final newline.\n'
 
1128
        if a == b + '\n':
 
1129
            message = 'second string is missing a final newline.\n'
 
1130
        raise AssertionError(message +
 
1131
                             self._ndiff_strings(a, b))
 
1132
 
 
1133
    def assertEqualMode(self, mode, mode_test):
 
1134
        self.assertEqual(mode, mode_test,
 
1135
                         'mode mismatch %o != %o' % (mode, mode_test))
 
1136
 
 
1137
    def assertEqualStat(self, expected, actual):
 
1138
        """assert that expected and actual are the same stat result.
 
1139
 
 
1140
        :param expected: A stat result.
 
1141
        :param actual: A stat result.
 
1142
        :raises AssertionError: If the expected and actual stat values differ
 
1143
            other than by atime.
 
1144
        """
 
1145
        self.assertEqual(expected.st_size, actual.st_size,
 
1146
                         'st_size did not match')
 
1147
        self.assertEqual(expected.st_mtime, actual.st_mtime,
 
1148
                         'st_mtime did not match')
 
1149
        self.assertEqual(expected.st_ctime, actual.st_ctime,
 
1150
                         'st_ctime did not match')
 
1151
        if sys.platform != 'win32':
 
1152
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
 
1153
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
 
1154
            # odd. Regardless we shouldn't actually try to assert anything
 
1155
            # about their values
 
1156
            self.assertEqual(expected.st_dev, actual.st_dev,
 
1157
                             'st_dev did not match')
 
1158
            self.assertEqual(expected.st_ino, actual.st_ino,
 
1159
                             'st_ino did not match')
 
1160
        self.assertEqual(expected.st_mode, actual.st_mode,
 
1161
                         'st_mode did not match')
 
1162
 
 
1163
    def assertLength(self, length, obj_with_len):
 
1164
        """Assert that obj_with_len is of length length."""
 
1165
        if len(obj_with_len) != length:
 
1166
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1167
                length, len(obj_with_len), obj_with_len))
 
1168
 
 
1169
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1170
        """Assert that func(*args, **kwargs) quietly logs a specific exception.
 
1171
        """
 
1172
        from bzrlib import trace
 
1173
        captured = []
 
1174
        orig_log_exception_quietly = trace.log_exception_quietly
 
1175
        try:
 
1176
            def capture():
 
1177
                orig_log_exception_quietly()
 
1178
                captured.append(sys.exc_info())
 
1179
            trace.log_exception_quietly = capture
 
1180
            func(*args, **kwargs)
 
1181
        finally:
 
1182
            trace.log_exception_quietly = orig_log_exception_quietly
 
1183
        self.assertLength(1, captured)
 
1184
        err = captured[0][1]
 
1185
        self.assertIsInstance(err, exception_class)
 
1186
        return err
 
1187
 
 
1188
    def assertPositive(self, val):
 
1189
        """Assert that val is greater than 0."""
 
1190
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1191
 
 
1192
    def assertNegative(self, val):
 
1193
        """Assert that val is less than 0."""
 
1194
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1195
 
 
1196
    def assertStartsWith(self, s, prefix):
 
1197
        if not s.startswith(prefix):
 
1198
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1199
 
 
1200
    def assertEndsWith(self, s, suffix):
 
1201
        """Asserts that s ends with suffix."""
 
1202
        if not s.endswith(suffix):
 
1203
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1204
 
 
1205
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1206
        """Assert that a contains something matching a regular expression."""
 
1207
        if not re.search(needle_re, haystack, flags):
 
1208
            if '\n' in haystack or len(haystack) > 60:
 
1209
                # a long string, format it in a more readable way
 
1210
                raise AssertionError(
 
1211
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1212
                        % (needle_re, haystack))
 
1213
            else:
 
1214
                raise AssertionError('pattern "%s" not found in "%s"'
 
1215
                        % (needle_re, haystack))
 
1216
 
 
1217
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1218
        """Assert that a does not match a regular expression"""
 
1219
        if re.search(needle_re, haystack, flags):
 
1220
            raise AssertionError('pattern "%s" found in "%s"'
 
1221
                    % (needle_re, haystack))
 
1222
 
 
1223
    def assertContainsString(self, haystack, needle):
 
1224
        if haystack.find(needle) == -1:
 
1225
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1226
 
 
1227
    def assertSubset(self, sublist, superlist):
 
1228
        """Assert that every entry in sublist is present in superlist."""
 
1229
        missing = set(sublist) - set(superlist)
 
1230
        if len(missing) > 0:
 
1231
            raise AssertionError("value(s) %r not present in container %r" %
 
1232
                                 (missing, superlist))
 
1233
 
 
1234
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1235
        """Fail unless excClass is raised when the iterator from func is used.
 
1236
 
 
1237
        Many functions can return generators this makes sure
 
1238
        to wrap them in a list() call to make sure the whole generator
 
1239
        is run, and that the proper exception is raised.
 
1240
        """
 
1241
        try:
 
1242
            list(func(*args, **kwargs))
 
1243
        except excClass, e:
 
1244
            return e
 
1245
        else:
 
1246
            if getattr(excClass,'__name__', None) is not None:
 
1247
                excName = excClass.__name__
 
1248
            else:
 
1249
                excName = str(excClass)
 
1250
            raise self.failureException, "%s not raised" % excName
 
1251
 
 
1252
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1253
        """Assert that a callable raises a particular exception.
 
1254
 
 
1255
        :param excClass: As for the except statement, this may be either an
 
1256
            exception class, or a tuple of classes.
 
1257
        :param callableObj: A callable, will be passed ``*args`` and
 
1258
            ``**kwargs``.
 
1259
 
 
1260
        Returns the exception so that you can examine it.
 
1261
        """
 
1262
        try:
 
1263
            callableObj(*args, **kwargs)
 
1264
        except excClass, e:
 
1265
            return e
 
1266
        else:
 
1267
            if getattr(excClass,'__name__', None) is not None:
 
1268
                excName = excClass.__name__
 
1269
            else:
 
1270
                # probably a tuple
 
1271
                excName = str(excClass)
 
1272
            raise self.failureException, "%s not raised" % excName
 
1273
 
 
1274
    def assertIs(self, left, right, message=None):
 
1275
        if not (left is right):
 
1276
            if message is not None:
 
1277
                raise AssertionError(message)
 
1278
            else:
 
1279
                raise AssertionError("%r is not %r." % (left, right))
 
1280
 
 
1281
    def assertIsNot(self, left, right, message=None):
 
1282
        if (left is right):
 
1283
            if message is not None:
 
1284
                raise AssertionError(message)
 
1285
            else:
 
1286
                raise AssertionError("%r is %r." % (left, right))
 
1287
 
 
1288
    def assertTransportMode(self, transport, path, mode):
 
1289
        """Fail if a path does not have mode "mode".
 
1290
 
 
1291
        If modes are not supported on this transport, the assertion is ignored.
 
1292
        """
 
1293
        if not transport._can_roundtrip_unix_modebits():
 
1294
            return
 
1295
        path_stat = transport.stat(path)
 
1296
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1297
        self.assertEqual(mode, actual_mode,
 
1298
                         'mode of %r incorrect (%s != %s)'
 
1299
                         % (path, oct(mode), oct(actual_mode)))
 
1300
 
 
1301
    def assertIsSameRealPath(self, path1, path2):
 
1302
        """Fail if path1 and path2 points to different files"""
 
1303
        self.assertEqual(osutils.realpath(path1),
 
1304
                         osutils.realpath(path2),
 
1305
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1306
 
 
1307
    def assertIsInstance(self, obj, kls, msg=None):
 
1308
        """Fail if obj is not an instance of kls
 
1309
        
 
1310
        :param msg: Supplementary message to show if the assertion fails.
 
1311
        """
 
1312
        if not isinstance(obj, kls):
 
1313
            m = "%r is an instance of %s rather than %s" % (
 
1314
                obj, obj.__class__, kls)
 
1315
            if msg:
 
1316
                m += ": " + msg
 
1317
            self.fail(m)
 
1318
 
 
1319
    def assertFileEqual(self, content, path):
 
1320
        """Fail if path does not contain 'content'."""
 
1321
        self.failUnlessExists(path)
 
1322
        f = file(path, 'rb')
 
1323
        try:
 
1324
            s = f.read()
 
1325
        finally:
 
1326
            f.close()
 
1327
        self.assertEqualDiff(content, s)
 
1328
 
 
1329
    def assertDocstring(self, expected_docstring, obj):
 
1330
        """Fail if obj does not have expected_docstring"""
 
1331
        if __doc__ is None:
 
1332
            # With -OO the docstring should be None instead
 
1333
            self.assertIs(obj.__doc__, None)
 
1334
        else:
 
1335
            self.assertEqual(expected_docstring, obj.__doc__)
 
1336
 
 
1337
    def failUnlessExists(self, path):
 
1338
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1339
        if not isinstance(path, basestring):
 
1340
            for p in path:
 
1341
                self.failUnlessExists(p)
 
1342
        else:
 
1343
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1344
 
 
1345
    def failIfExists(self, path):
 
1346
        """Fail if path or paths, which may be abs or relative, exist."""
 
1347
        if not isinstance(path, basestring):
 
1348
            for p in path:
 
1349
                self.failIfExists(p)
 
1350
        else:
 
1351
            self.failIf(osutils.lexists(path),path+" exists")
 
1352
 
 
1353
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1354
        """A helper for callDeprecated and applyDeprecated.
 
1355
 
 
1356
        :param a_callable: A callable to call.
 
1357
        :param args: The positional arguments for the callable
 
1358
        :param kwargs: The keyword arguments for the callable
 
1359
        :return: A tuple (warnings, result). result is the result of calling
 
1360
            a_callable(``*args``, ``**kwargs``).
 
1361
        """
 
1362
        local_warnings = []
 
1363
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1364
            # we've hooked into a deprecation specific callpath,
 
1365
            # only deprecations should getting sent via it.
 
1366
            self.assertEqual(cls, DeprecationWarning)
 
1367
            local_warnings.append(msg)
 
1368
        original_warning_method = symbol_versioning.warn
 
1369
        symbol_versioning.set_warning_method(capture_warnings)
 
1370
        try:
 
1371
            result = a_callable(*args, **kwargs)
 
1372
        finally:
 
1373
            symbol_versioning.set_warning_method(original_warning_method)
 
1374
        return (local_warnings, result)
 
1375
 
 
1376
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1377
        """Call a deprecated callable without warning the user.
 
1378
 
 
1379
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1380
        not other callers that go direct to the warning module.
 
1381
 
 
1382
        To test that a deprecated method raises an error, do something like
 
1383
        this::
 
1384
 
 
1385
            self.assertRaises(errors.ReservedId,
 
1386
                self.applyDeprecated,
 
1387
                deprecated_in((1, 5, 0)),
 
1388
                br.append_revision,
 
1389
                'current:')
 
1390
 
 
1391
        :param deprecation_format: The deprecation format that the callable
 
1392
            should have been deprecated with. This is the same type as the
 
1393
            parameter to deprecated_method/deprecated_function. If the
 
1394
            callable is not deprecated with this format, an assertion error
 
1395
            will be raised.
 
1396
        :param a_callable: A callable to call. This may be a bound method or
 
1397
            a regular function. It will be called with ``*args`` and
 
1398
            ``**kwargs``.
 
1399
        :param args: The positional arguments for the callable
 
1400
        :param kwargs: The keyword arguments for the callable
 
1401
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1402
        """
 
1403
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1404
            *args, **kwargs)
 
1405
        expected_first_warning = symbol_versioning.deprecation_string(
 
1406
            a_callable, deprecation_format)
 
1407
        if len(call_warnings) == 0:
 
1408
            self.fail("No deprecation warning generated by call to %s" %
 
1409
                a_callable)
 
1410
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1411
        return result
 
1412
 
 
1413
    def callCatchWarnings(self, fn, *args, **kw):
 
1414
        """Call a callable that raises python warnings.
 
1415
 
 
1416
        The caller's responsible for examining the returned warnings.
 
1417
 
 
1418
        If the callable raises an exception, the exception is not
 
1419
        caught and propagates up to the caller.  In that case, the list
 
1420
        of warnings is not available.
 
1421
 
 
1422
        :returns: ([warning_object, ...], fn_result)
 
1423
        """
 
1424
        # XXX: This is not perfect, because it completely overrides the
 
1425
        # warnings filters, and some code may depend on suppressing particular
 
1426
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1427
        # though.  -- Andrew, 20071062
 
1428
        wlist = []
 
1429
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1430
            # despite the name, 'message' is normally(?) a Warning subclass
 
1431
            # instance
 
1432
            wlist.append(message)
 
1433
        saved_showwarning = warnings.showwarning
 
1434
        saved_filters = warnings.filters
 
1435
        try:
 
1436
            warnings.showwarning = _catcher
 
1437
            warnings.filters = []
 
1438
            result = fn(*args, **kw)
 
1439
        finally:
 
1440
            warnings.showwarning = saved_showwarning
 
1441
            warnings.filters = saved_filters
 
1442
        return wlist, result
 
1443
 
 
1444
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1445
        """Assert that a callable is deprecated in a particular way.
 
1446
 
 
1447
        This is a very precise test for unusual requirements. The
 
1448
        applyDeprecated helper function is probably more suited for most tests
 
1449
        as it allows you to simply specify the deprecation format being used
 
1450
        and will ensure that that is issued for the function being called.
 
1451
 
 
1452
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1453
        not other callers that go direct to the warning module.  To catch
 
1454
        general warnings, use callCatchWarnings.
 
1455
 
 
1456
        :param expected: a list of the deprecation warnings expected, in order
 
1457
        :param callable: The callable to call
 
1458
        :param args: The positional arguments for the callable
 
1459
        :param kwargs: The keyword arguments for the callable
 
1460
        """
 
1461
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1462
            *args, **kwargs)
 
1463
        self.assertEqual(expected, call_warnings)
 
1464
        return result
 
1465
 
 
1466
    def _startLogFile(self):
 
1467
        """Send bzr and test log messages to a temporary file.
 
1468
 
 
1469
        The file is removed as the test is torn down.
 
1470
        """
 
1471
        self._log_file = StringIO()
 
1472
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1473
        self.addCleanup(self._finishLogFile)
 
1474
 
 
1475
    def _finishLogFile(self):
 
1476
        """Finished with the log file.
 
1477
 
 
1478
        Close the file and delete it, unless setKeepLogfile was called.
 
1479
        """
 
1480
        if bzrlib.trace._trace_file:
 
1481
            # flush the log file, to get all content
 
1482
            bzrlib.trace._trace_file.flush()
 
1483
        bzrlib.trace.pop_log_file(self._log_memento)
 
1484
        # Cache the log result and delete the file on disk
 
1485
        self._get_log(False)
 
1486
 
 
1487
    def thisFailsStrictLockCheck(self):
 
1488
        """It is known that this test would fail with -Dstrict_locks.
 
1489
 
 
1490
        By default, all tests are run with strict lock checking unless
 
1491
        -Edisable_lock_checks is supplied. However there are some tests which
 
1492
        we know fail strict locks at this point that have not been fixed.
 
1493
        They should call this function to disable the strict checking.
 
1494
 
 
1495
        This should be used sparingly, it is much better to fix the locking
 
1496
        issues rather than papering over the problem by calling this function.
 
1497
        """
 
1498
        debug.debug_flags.discard('strict_locks')
 
1499
 
 
1500
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1501
        """Overrides an object attribute restoring it after the test.
 
1502
 
 
1503
        :param obj: The object that will be mutated.
 
1504
 
 
1505
        :param attr_name: The attribute name we want to preserve/override in
 
1506
            the object.
 
1507
 
 
1508
        :param new: The optional value we want to set the attribute to.
 
1509
 
 
1510
        :returns: The actual attr value.
 
1511
        """
 
1512
        value = getattr(obj, attr_name)
 
1513
        # The actual value is captured by the call below
 
1514
        self.addCleanup(setattr, obj, attr_name, value)
 
1515
        if new is not _unitialized_attr:
 
1516
            setattr(obj, attr_name, new)
 
1517
        return value
 
1518
 
 
1519
    def _cleanEnvironment(self):
 
1520
        new_env = {
 
1521
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1522
            'HOME': os.getcwd(),
 
1523
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1524
            # tests do check our impls match APPDATA
 
1525
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1526
            'VISUAL': None,
 
1527
            'EDITOR': None,
 
1528
            'BZR_EMAIL': None,
 
1529
            'BZREMAIL': None, # may still be present in the environment
 
1530
            'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
 
1531
            'BZR_PROGRESS_BAR': None,
 
1532
            'BZR_LOG': None,
 
1533
            'BZR_PLUGIN_PATH': None,
 
1534
            'BZR_DISABLE_PLUGINS': None,
 
1535
            'BZR_PLUGINS_AT': None,
 
1536
            'BZR_CONCURRENCY': None,
 
1537
            # Make sure that any text ui tests are consistent regardless of
 
1538
            # the environment the test case is run in; you may want tests that
 
1539
            # test other combinations.  'dumb' is a reasonable guess for tests
 
1540
            # going to a pipe or a StringIO.
 
1541
            'TERM': 'dumb',
 
1542
            'LINES': '25',
 
1543
            'COLUMNS': '80',
 
1544
            'BZR_COLUMNS': '80',
 
1545
            # SSH Agent
 
1546
            'SSH_AUTH_SOCK': None,
 
1547
            # Proxies
 
1548
            'http_proxy': None,
 
1549
            'HTTP_PROXY': None,
 
1550
            'https_proxy': None,
 
1551
            'HTTPS_PROXY': None,
 
1552
            'no_proxy': None,
 
1553
            'NO_PROXY': None,
 
1554
            'all_proxy': None,
 
1555
            'ALL_PROXY': None,
 
1556
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
1557
            # least. If you do (care), please update this comment
 
1558
            # -- vila 20080401
 
1559
            'ftp_proxy': None,
 
1560
            'FTP_PROXY': None,
 
1561
            'BZR_REMOTE_PATH': None,
 
1562
            # Generally speaking, we don't want apport reporting on crashes in
 
1563
            # the test envirnoment unless we're specifically testing apport,
 
1564
            # so that it doesn't leak into the real system environment.  We
 
1565
            # use an env var so it propagates to subprocesses.
 
1566
            'APPORT_DISABLE': '1',
 
1567
        }
 
1568
        self._old_env = {}
 
1569
        self.addCleanup(self._restoreEnvironment)
 
1570
        for name, value in new_env.iteritems():
 
1571
            self._captureVar(name, value)
 
1572
 
 
1573
    def _captureVar(self, name, newvalue):
 
1574
        """Set an environment variable, and reset it when finished."""
 
1575
        self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1576
 
 
1577
    def _restoreEnvironment(self):
 
1578
        for name, value in self._old_env.iteritems():
 
1579
            osutils.set_or_unset_env(name, value)
 
1580
 
 
1581
    def _restoreHooks(self):
 
1582
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1583
            setattr(klass, name, hooks)
 
1584
 
 
1585
    def knownFailure(self, reason):
 
1586
        """This test has failed for some known reason."""
 
1587
        raise KnownFailure(reason)
 
1588
 
 
1589
    def _suppress_log(self):
 
1590
        """Remove the log info from details."""
 
1591
        self.discardDetail('log')
 
1592
 
 
1593
    def _do_skip(self, result, reason):
 
1594
        self._suppress_log()
 
1595
        addSkip = getattr(result, 'addSkip', None)
 
1596
        if not callable(addSkip):
 
1597
            result.addSuccess(result)
 
1598
        else:
 
1599
            addSkip(self, reason)
 
1600
 
 
1601
    @staticmethod
 
1602
    def _do_known_failure(self, result, e):
 
1603
        self._suppress_log()
 
1604
        err = sys.exc_info()
 
1605
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1606
        if addExpectedFailure is not None:
 
1607
            addExpectedFailure(self, err)
 
1608
        else:
 
1609
            result.addSuccess(self)
 
1610
 
 
1611
    @staticmethod
 
1612
    def _do_not_applicable(self, result, e):
 
1613
        if not e.args:
 
1614
            reason = 'No reason given'
 
1615
        else:
 
1616
            reason = e.args[0]
 
1617
        self._suppress_log ()
 
1618
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1619
        if addNotApplicable is not None:
 
1620
            result.addNotApplicable(self, reason)
 
1621
        else:
 
1622
            self._do_skip(result, reason)
 
1623
 
 
1624
    @staticmethod
 
1625
    def _report_skip(self, result, err):
 
1626
        """Override the default _report_skip.
 
1627
 
 
1628
        We want to strip the 'log' detail. If we waint until _do_skip, it has
 
1629
        already been formatted into the 'reason' string, and we can't pull it
 
1630
        out again.
 
1631
        """
 
1632
        self._suppress_log()
 
1633
        super(TestCase, self)._report_skip(self, result, err)
 
1634
 
 
1635
    @staticmethod
 
1636
    def _report_expected_failure(self, result, err):
 
1637
        """Strip the log.
 
1638
 
 
1639
        See _report_skip for motivation.
 
1640
        """
 
1641
        self._suppress_log()
 
1642
        super(TestCase, self)._report_expected_failure(self, result, err)
 
1643
 
 
1644
    @staticmethod
 
1645
    def _do_unsupported_or_skip(self, result, e):
 
1646
        reason = e.args[0]
 
1647
        self._suppress_log()
 
1648
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1649
        if addNotSupported is not None:
 
1650
            result.addNotSupported(self, reason)
 
1651
        else:
 
1652
            self._do_skip(result, reason)
 
1653
 
 
1654
    def time(self, callable, *args, **kwargs):
 
1655
        """Run callable and accrue the time it takes to the benchmark time.
 
1656
 
 
1657
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1658
        this will cause lsprofile statistics to be gathered and stored in
 
1659
        self._benchcalls.
 
1660
        """
 
1661
        if self._benchtime is None:
 
1662
            self.addDetail('benchtime', content.Content(content.ContentType(
 
1663
                "text", "plain"), lambda:[str(self._benchtime)]))
 
1664
            self._benchtime = 0
 
1665
        start = time.time()
 
1666
        try:
 
1667
            if not self._gather_lsprof_in_benchmarks:
 
1668
                return callable(*args, **kwargs)
 
1669
            else:
 
1670
                # record this benchmark
 
1671
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1672
                stats.sort()
 
1673
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1674
                return ret
 
1675
        finally:
 
1676
            self._benchtime += time.time() - start
 
1677
 
 
1678
    def log(self, *args):
 
1679
        mutter(*args)
 
1680
 
 
1681
    def _get_log(self, keep_log_file=False):
 
1682
        """Internal helper to get the log from bzrlib.trace for this test.
 
1683
 
 
1684
        Please use self.getDetails, or self.get_log to access this in test case
 
1685
        code.
 
1686
 
 
1687
        :param keep_log_file: When True, if the log is still a file on disk
 
1688
            leave it as a file on disk. When False, if the log is still a file
 
1689
            on disk, the log file is deleted and the log preserved as
 
1690
            self._log_contents.
 
1691
        :return: A string containing the log.
 
1692
        """
 
1693
        if self._log_contents is not None:
 
1694
            try:
 
1695
                self._log_contents.decode('utf8')
 
1696
            except UnicodeDecodeError:
 
1697
                unicodestr = self._log_contents.decode('utf8', 'replace')
 
1698
                self._log_contents = unicodestr.encode('utf8')
 
1699
            return self._log_contents
 
1700
        if self._log_file is not None:
 
1701
            log_contents = self._log_file.getvalue()
 
1702
            try:
 
1703
                log_contents.decode('utf8')
 
1704
            except UnicodeDecodeError:
 
1705
                unicodestr = log_contents.decode('utf8', 'replace')
 
1706
                log_contents = unicodestr.encode('utf8')
 
1707
            if not keep_log_file:
 
1708
                self._log_file = None
 
1709
                # Permit multiple calls to get_log until we clean it up in
 
1710
                # finishLogFile
 
1711
                self._log_contents = log_contents
 
1712
            return log_contents
 
1713
        else:
 
1714
            return "No log file content."
 
1715
 
 
1716
    def get_log(self):
 
1717
        """Get a unicode string containing the log from bzrlib.trace.
 
1718
 
 
1719
        Undecodable characters are replaced.
 
1720
        """
 
1721
        return u"".join(self.getDetails()['log'].iter_text())
 
1722
 
 
1723
    def requireFeature(self, feature):
 
1724
        """This test requires a specific feature is available.
 
1725
 
 
1726
        :raises UnavailableFeature: When feature is not available.
 
1727
        """
 
1728
        if not feature.available():
 
1729
            raise UnavailableFeature(feature)
 
1730
 
 
1731
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1732
            working_dir):
 
1733
        """Run bazaar command line, splitting up a string command line."""
 
1734
        if isinstance(args, basestring):
 
1735
            # shlex don't understand unicode strings,
 
1736
            # so args should be plain string (bialix 20070906)
 
1737
            args = list(shlex.split(str(args)))
 
1738
        return self._run_bzr_core(args, retcode=retcode,
 
1739
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1740
                )
 
1741
 
 
1742
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1743
            working_dir):
 
1744
        # Clear chk_map page cache, because the contents are likely to mask
 
1745
        # locking errors.
 
1746
        chk_map.clear_cache()
 
1747
        if encoding is None:
 
1748
            encoding = osutils.get_user_encoding()
 
1749
        stdout = StringIOWrapper()
 
1750
        stderr = StringIOWrapper()
 
1751
        stdout.encoding = encoding
 
1752
        stderr.encoding = encoding
 
1753
 
 
1754
        self.log('run bzr: %r', args)
 
1755
        # FIXME: don't call into logging here
 
1756
        handler = logging.StreamHandler(stderr)
 
1757
        handler.setLevel(logging.INFO)
 
1758
        logger = logging.getLogger('')
 
1759
        logger.addHandler(handler)
 
1760
        old_ui_factory = ui.ui_factory
 
1761
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1762
 
 
1763
        cwd = None
 
1764
        if working_dir is not None:
 
1765
            cwd = osutils.getcwd()
 
1766
            os.chdir(working_dir)
 
1767
 
 
1768
        try:
 
1769
            try:
 
1770
                result = self.apply_redirected(ui.ui_factory.stdin,
 
1771
                    stdout, stderr,
 
1772
                    bzrlib.commands.run_bzr_catch_user_errors,
 
1773
                    args)
 
1774
            except KeyboardInterrupt:
 
1775
                # Reraise KeyboardInterrupt with contents of redirected stdout
 
1776
                # and stderr as arguments, for tests which are interested in
 
1777
                # stdout and stderr and are expecting the exception.
 
1778
                out = stdout.getvalue()
 
1779
                err = stderr.getvalue()
 
1780
                if out:
 
1781
                    self.log('output:\n%r', out)
 
1782
                if err:
 
1783
                    self.log('errors:\n%r', err)
 
1784
                raise KeyboardInterrupt(out, err)
 
1785
        finally:
 
1786
            logger.removeHandler(handler)
 
1787
            ui.ui_factory = old_ui_factory
 
1788
            if cwd is not None:
 
1789
                os.chdir(cwd)
 
1790
 
 
1791
        out = stdout.getvalue()
 
1792
        err = stderr.getvalue()
 
1793
        if out:
 
1794
            self.log('output:\n%r', out)
 
1795
        if err:
 
1796
            self.log('errors:\n%r', err)
 
1797
        if retcode is not None:
 
1798
            self.assertEquals(retcode, result,
 
1799
                              message='Unexpected return code')
 
1800
        return result, out, err
 
1801
 
 
1802
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1803
                working_dir=None, error_regexes=[], output_encoding=None):
 
1804
        """Invoke bzr, as if it were run from the command line.
 
1805
 
 
1806
        The argument list should not include the bzr program name - the
 
1807
        first argument is normally the bzr command.  Arguments may be
 
1808
        passed in three ways:
 
1809
 
 
1810
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1811
        when the command contains whitespace or metacharacters, or
 
1812
        is built up at run time.
 
1813
 
 
1814
        2- A single string, eg "add a".  This is the most convenient
 
1815
        for hardcoded commands.
 
1816
 
 
1817
        This runs bzr through the interface that catches and reports
 
1818
        errors, and with logging set to something approximating the
 
1819
        default, so that error reporting can be checked.
 
1820
 
 
1821
        This should be the main method for tests that want to exercise the
 
1822
        overall behavior of the bzr application (rather than a unit test
 
1823
        or a functional test of the library.)
 
1824
 
 
1825
        This sends the stdout/stderr results into the test's log,
 
1826
        where it may be useful for debugging.  See also run_captured.
 
1827
 
 
1828
        :keyword stdin: A string to be used as stdin for the command.
 
1829
        :keyword retcode: The status code the command should return;
 
1830
            default 0.
 
1831
        :keyword working_dir: The directory to run the command in
 
1832
        :keyword error_regexes: A list of expected error messages.  If
 
1833
            specified they must be seen in the error output of the command.
 
1834
        """
 
1835
        retcode, out, err = self._run_bzr_autosplit(
 
1836
            args=args,
 
1837
            retcode=retcode,
 
1838
            encoding=encoding,
 
1839
            stdin=stdin,
 
1840
            working_dir=working_dir,
 
1841
            )
 
1842
        self.assertIsInstance(error_regexes, (list, tuple))
 
1843
        for regex in error_regexes:
 
1844
            self.assertContainsRe(err, regex)
 
1845
        return out, err
 
1846
 
 
1847
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1848
        """Run bzr, and check that stderr contains the supplied regexes
 
1849
 
 
1850
        :param error_regexes: Sequence of regular expressions which
 
1851
            must each be found in the error output. The relative ordering
 
1852
            is not enforced.
 
1853
        :param args: command-line arguments for bzr
 
1854
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1855
            This function changes the default value of retcode to be 3,
 
1856
            since in most cases this is run when you expect bzr to fail.
 
1857
 
 
1858
        :return: (out, err) The actual output of running the command (in case
 
1859
            you want to do more inspection)
 
1860
 
 
1861
        Examples of use::
 
1862
 
 
1863
            # Make sure that commit is failing because there is nothing to do
 
1864
            self.run_bzr_error(['no changes to commit'],
 
1865
                               ['commit', '-m', 'my commit comment'])
 
1866
            # Make sure --strict is handling an unknown file, rather than
 
1867
            # giving us the 'nothing to do' error
 
1868
            self.build_tree(['unknown'])
 
1869
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1870
                               ['commit', --strict', '-m', 'my commit comment'])
 
1871
        """
 
1872
        kwargs.setdefault('retcode', 3)
 
1873
        kwargs['error_regexes'] = error_regexes
 
1874
        out, err = self.run_bzr(*args, **kwargs)
 
1875
        return out, err
 
1876
 
 
1877
    def run_bzr_subprocess(self, *args, **kwargs):
 
1878
        """Run bzr in a subprocess for testing.
 
1879
 
 
1880
        This starts a new Python interpreter and runs bzr in there.
 
1881
        This should only be used for tests that have a justifiable need for
 
1882
        this isolation: e.g. they are testing startup time, or signal
 
1883
        handling, or early startup code, etc.  Subprocess code can't be
 
1884
        profiled or debugged so easily.
 
1885
 
 
1886
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1887
            None is supplied, the status code is not checked.
 
1888
        :keyword env_changes: A dictionary which lists changes to environment
 
1889
            variables. A value of None will unset the env variable.
 
1890
            The values must be strings. The change will only occur in the
 
1891
            child, so you don't need to fix the environment after running.
 
1892
        :keyword universal_newlines: Convert CRLF => LF
 
1893
        :keyword allow_plugins: By default the subprocess is run with
 
1894
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1895
            for system-wide plugins to create unexpected output on stderr,
 
1896
            which can cause unnecessary test failures.
 
1897
        """
 
1898
        env_changes = kwargs.get('env_changes', {})
 
1899
        working_dir = kwargs.get('working_dir', None)
 
1900
        allow_plugins = kwargs.get('allow_plugins', False)
 
1901
        if len(args) == 1:
 
1902
            if isinstance(args[0], list):
 
1903
                args = args[0]
 
1904
            elif isinstance(args[0], basestring):
 
1905
                args = list(shlex.split(args[0]))
 
1906
        else:
 
1907
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1908
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1909
                                            working_dir=working_dir,
 
1910
                                            allow_plugins=allow_plugins)
 
1911
        # We distinguish between retcode=None and retcode not passed.
 
1912
        supplied_retcode = kwargs.get('retcode', 0)
 
1913
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1914
            universal_newlines=kwargs.get('universal_newlines', False),
 
1915
            process_args=args)
 
1916
 
 
1917
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1918
                             skip_if_plan_to_signal=False,
 
1919
                             working_dir=None,
 
1920
                             allow_plugins=False):
 
1921
        """Start bzr in a subprocess for testing.
 
1922
 
 
1923
        This starts a new Python interpreter and runs bzr in there.
 
1924
        This should only be used for tests that have a justifiable need for
 
1925
        this isolation: e.g. they are testing startup time, or signal
 
1926
        handling, or early startup code, etc.  Subprocess code can't be
 
1927
        profiled or debugged so easily.
 
1928
 
 
1929
        :param process_args: a list of arguments to pass to the bzr executable,
 
1930
            for example ``['--version']``.
 
1931
        :param env_changes: A dictionary which lists changes to environment
 
1932
            variables. A value of None will unset the env variable.
 
1933
            The values must be strings. The change will only occur in the
 
1934
            child, so you don't need to fix the environment after running.
 
1935
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
 
1936
            doesn't support signalling subprocesses.
 
1937
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1938
 
 
1939
        :returns: Popen object for the started process.
 
1940
        """
 
1941
        if skip_if_plan_to_signal:
 
1942
            if os.name != "posix":
 
1943
                raise TestSkipped("Sending signals not supported")
 
1944
 
 
1945
        if env_changes is None:
 
1946
            env_changes = {}
 
1947
        old_env = {}
 
1948
 
 
1949
        def cleanup_environment():
 
1950
            for env_var, value in env_changes.iteritems():
 
1951
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1952
 
 
1953
        def restore_environment():
 
1954
            for env_var, value in old_env.iteritems():
 
1955
                osutils.set_or_unset_env(env_var, value)
 
1956
 
 
1957
        bzr_path = self.get_bzr_path()
 
1958
 
 
1959
        cwd = None
 
1960
        if working_dir is not None:
 
1961
            cwd = osutils.getcwd()
 
1962
            os.chdir(working_dir)
 
1963
 
 
1964
        try:
 
1965
            # win32 subprocess doesn't support preexec_fn
 
1966
            # so we will avoid using it on all platforms, just to
 
1967
            # make sure the code path is used, and we don't break on win32
 
1968
            cleanup_environment()
 
1969
            command = [sys.executable]
 
1970
            # frozen executables don't need the path to bzr
 
1971
            if getattr(sys, "frozen", None) is None:
 
1972
                command.append(bzr_path)
 
1973
            if not allow_plugins:
 
1974
                command.append('--no-plugins')
 
1975
            command.extend(process_args)
 
1976
            process = self._popen(command, stdin=subprocess.PIPE,
 
1977
                                  stdout=subprocess.PIPE,
 
1978
                                  stderr=subprocess.PIPE)
 
1979
        finally:
 
1980
            restore_environment()
 
1981
            if cwd is not None:
 
1982
                os.chdir(cwd)
 
1983
 
 
1984
        return process
 
1985
 
 
1986
    def _popen(self, *args, **kwargs):
 
1987
        """Place a call to Popen.
 
1988
 
 
1989
        Allows tests to override this method to intercept the calls made to
 
1990
        Popen for introspection.
 
1991
        """
 
1992
        return subprocess.Popen(*args, **kwargs)
 
1993
 
 
1994
    def get_source_path(self):
 
1995
        """Return the path of the directory containing bzrlib."""
 
1996
        return os.path.dirname(os.path.dirname(bzrlib.__file__))
 
1997
 
 
1998
    def get_bzr_path(self):
 
1999
        """Return the path of the 'bzr' executable for this test suite."""
 
2000
        bzr_path = os.path.join(self.get_source_path(), "bzr")
 
2001
        if not os.path.isfile(bzr_path):
 
2002
            # We are probably installed. Assume sys.argv is the right file
 
2003
            bzr_path = sys.argv[0]
 
2004
        return bzr_path
 
2005
 
 
2006
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
2007
                              universal_newlines=False, process_args=None):
 
2008
        """Finish the execution of process.
 
2009
 
 
2010
        :param process: the Popen object returned from start_bzr_subprocess.
 
2011
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
2012
            None is supplied, the status code is not checked.
 
2013
        :param send_signal: an optional signal to send to the process.
 
2014
        :param universal_newlines: Convert CRLF => LF
 
2015
        :returns: (stdout, stderr)
 
2016
        """
 
2017
        if send_signal is not None:
 
2018
            os.kill(process.pid, send_signal)
 
2019
        out, err = process.communicate()
 
2020
 
 
2021
        if universal_newlines:
 
2022
            out = out.replace('\r\n', '\n')
 
2023
            err = err.replace('\r\n', '\n')
 
2024
 
 
2025
        if retcode is not None and retcode != process.returncode:
 
2026
            if process_args is None:
 
2027
                process_args = "(unknown args)"
 
2028
            mutter('Output of bzr %s:\n%s', process_args, out)
 
2029
            mutter('Error for bzr %s:\n%s', process_args, err)
 
2030
            self.fail('Command bzr %s failed with retcode %s != %s'
 
2031
                      % (process_args, retcode, process.returncode))
 
2032
        return [out, err]
 
2033
 
 
2034
    def check_inventory_shape(self, inv, shape):
 
2035
        """Compare an inventory to a list of expected names.
 
2036
 
 
2037
        Fail if they are not precisely equal.
 
2038
        """
 
2039
        extras = []
 
2040
        shape = list(shape)             # copy
 
2041
        for path, ie in inv.entries():
 
2042
            name = path.replace('\\', '/')
 
2043
            if ie.kind == 'directory':
 
2044
                name = name + '/'
 
2045
            if name in shape:
 
2046
                shape.remove(name)
 
2047
            else:
 
2048
                extras.append(name)
 
2049
        if shape:
 
2050
            self.fail("expected paths not found in inventory: %r" % shape)
 
2051
        if extras:
 
2052
            self.fail("unexpected paths found in inventory: %r" % extras)
 
2053
 
 
2054
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
2055
                         a_callable=None, *args, **kwargs):
 
2056
        """Call callable with redirected std io pipes.
 
2057
 
 
2058
        Returns the return code."""
 
2059
        if not callable(a_callable):
 
2060
            raise ValueError("a_callable must be callable.")
 
2061
        if stdin is None:
 
2062
            stdin = StringIO("")
 
2063
        if stdout is None:
 
2064
            if getattr(self, "_log_file", None) is not None:
 
2065
                stdout = self._log_file
 
2066
            else:
 
2067
                stdout = StringIO()
 
2068
        if stderr is None:
 
2069
            if getattr(self, "_log_file", None is not None):
 
2070
                stderr = self._log_file
 
2071
            else:
 
2072
                stderr = StringIO()
 
2073
        real_stdin = sys.stdin
 
2074
        real_stdout = sys.stdout
 
2075
        real_stderr = sys.stderr
 
2076
        try:
 
2077
            sys.stdout = stdout
 
2078
            sys.stderr = stderr
 
2079
            sys.stdin = stdin
 
2080
            return a_callable(*args, **kwargs)
 
2081
        finally:
 
2082
            sys.stdout = real_stdout
 
2083
            sys.stderr = real_stderr
 
2084
            sys.stdin = real_stdin
 
2085
 
 
2086
    def reduceLockdirTimeout(self):
 
2087
        """Reduce the default lock timeout for the duration of the test, so that
 
2088
        if LockContention occurs during a test, it does so quickly.
 
2089
 
 
2090
        Tests that expect to provoke LockContention errors should call this.
 
2091
        """
 
2092
        self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
 
2093
 
 
2094
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
2095
        """Return a StringIOWrapper instance, that will encode Unicode
 
2096
        input to UTF-8.
 
2097
        """
 
2098
        if encoding_type is None:
 
2099
            encoding_type = 'strict'
 
2100
        sio = StringIO()
 
2101
        output_encoding = 'utf-8'
 
2102
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
2103
        sio.encoding = output_encoding
 
2104
        return sio
 
2105
 
 
2106
    def disable_verb(self, verb):
 
2107
        """Disable a smart server verb for one test."""
 
2108
        from bzrlib.smart import request
 
2109
        request_handlers = request.request_handlers
 
2110
        orig_method = request_handlers.get(verb)
 
2111
        request_handlers.remove(verb)
 
2112
        self.addCleanup(request_handlers.register, verb, orig_method)
 
2113
 
 
2114
 
 
2115
class CapturedCall(object):
 
2116
    """A helper for capturing smart server calls for easy debug analysis."""
 
2117
 
 
2118
    def __init__(self, params, prefix_length):
 
2119
        """Capture the call with params and skip prefix_length stack frames."""
 
2120
        self.call = params
 
2121
        import traceback
 
2122
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2123
        # client frames. Beyond this we could get more clever, but this is good
 
2124
        # enough for now.
 
2125
        stack = traceback.extract_stack()[prefix_length:-5]
 
2126
        self.stack = ''.join(traceback.format_list(stack))
 
2127
 
 
2128
    def __str__(self):
 
2129
        return self.call.method
 
2130
 
 
2131
    def __repr__(self):
 
2132
        return self.call.method
 
2133
 
 
2134
    def stack(self):
 
2135
        return self.stack
 
2136
 
 
2137
 
 
2138
class TestCaseWithMemoryTransport(TestCase):
 
2139
    """Common test class for tests that do not need disk resources.
 
2140
 
 
2141
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2142
 
 
2143
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2144
 
 
2145
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2146
    a directory which does not exist. This serves to help ensure test isolation
 
2147
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2148
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2149
    file defaults for the transport in tests, nor does it obey the command line
 
2150
    override, so tests that accidentally write to the common directory should
 
2151
    be rare.
 
2152
 
 
2153
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2154
    a .bzr directory that stops us ascending higher into the filesystem.
 
2155
    """
 
2156
 
 
2157
    TEST_ROOT = None
 
2158
    _TEST_NAME = 'test'
 
2159
 
 
2160
    def __init__(self, methodName='runTest'):
 
2161
        # allow test parameterization after test construction and before test
 
2162
        # execution. Variables that the parameterizer sets need to be
 
2163
        # ones that are not set by setUp, or setUp will trash them.
 
2164
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2165
        self.vfs_transport_factory = default_transport
 
2166
        self.transport_server = None
 
2167
        self.transport_readonly_server = None
 
2168
        self.__vfs_server = None
 
2169
 
 
2170
    def get_transport(self, relpath=None):
 
2171
        """Return a writeable transport.
 
2172
 
 
2173
        This transport is for the test scratch space relative to
 
2174
        "self._test_root"
 
2175
 
 
2176
        :param relpath: a path relative to the base url.
 
2177
        """
 
2178
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2179
        self.assertFalse(t.is_readonly())
 
2180
        return t
 
2181
 
 
2182
    def get_readonly_transport(self, relpath=None):
 
2183
        """Return a readonly transport for the test scratch space
 
2184
 
 
2185
        This can be used to test that operations which should only need
 
2186
        readonly access in fact do not try to write.
 
2187
 
 
2188
        :param relpath: a path relative to the base url.
 
2189
        """
 
2190
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
2191
        self.assertTrue(t.is_readonly())
 
2192
        return t
 
2193
 
 
2194
    def create_transport_readonly_server(self):
 
2195
        """Create a transport server from class defined at init.
 
2196
 
 
2197
        This is mostly a hook for daughter classes.
 
2198
        """
 
2199
        return self.transport_readonly_server()
 
2200
 
 
2201
    def get_readonly_server(self):
 
2202
        """Get the server instance for the readonly transport
 
2203
 
 
2204
        This is useful for some tests with specific servers to do diagnostics.
 
2205
        """
 
2206
        if self.__readonly_server is None:
 
2207
            if self.transport_readonly_server is None:
 
2208
                # readonly decorator requested
 
2209
                self.__readonly_server = test_server.ReadonlyServer()
 
2210
            else:
 
2211
                # explicit readonly transport.
 
2212
                self.__readonly_server = self.create_transport_readonly_server()
 
2213
            self.start_server(self.__readonly_server,
 
2214
                self.get_vfs_only_server())
 
2215
        return self.__readonly_server
 
2216
 
 
2217
    def get_readonly_url(self, relpath=None):
 
2218
        """Get a URL for the readonly transport.
 
2219
 
 
2220
        This will either be backed by '.' or a decorator to the transport
 
2221
        used by self.get_url()
 
2222
        relpath provides for clients to get a path relative to the base url.
 
2223
        These should only be downwards relative, not upwards.
 
2224
        """
 
2225
        base = self.get_readonly_server().get_url()
 
2226
        return self._adjust_url(base, relpath)
 
2227
 
 
2228
    def get_vfs_only_server(self):
 
2229
        """Get the vfs only read/write server instance.
 
2230
 
 
2231
        This is useful for some tests with specific servers that need
 
2232
        diagnostics.
 
2233
 
 
2234
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2235
        is no means to override it.
 
2236
        """
 
2237
        if self.__vfs_server is None:
 
2238
            self.__vfs_server = memory.MemoryServer()
 
2239
            self.start_server(self.__vfs_server)
 
2240
        return self.__vfs_server
 
2241
 
 
2242
    def get_server(self):
 
2243
        """Get the read/write server instance.
 
2244
 
 
2245
        This is useful for some tests with specific servers that need
 
2246
        diagnostics.
 
2247
 
 
2248
        This is built from the self.transport_server factory. If that is None,
 
2249
        then the self.get_vfs_server is returned.
 
2250
        """
 
2251
        if self.__server is None:
 
2252
            if (self.transport_server is None or self.transport_server is
 
2253
                self.vfs_transport_factory):
 
2254
                self.__server = self.get_vfs_only_server()
 
2255
            else:
 
2256
                # bring up a decorated means of access to the vfs only server.
 
2257
                self.__server = self.transport_server()
 
2258
                self.start_server(self.__server, self.get_vfs_only_server())
 
2259
        return self.__server
 
2260
 
 
2261
    def _adjust_url(self, base, relpath):
 
2262
        """Get a URL (or maybe a path) for the readwrite transport.
 
2263
 
 
2264
        This will either be backed by '.' or to an equivalent non-file based
 
2265
        facility.
 
2266
        relpath provides for clients to get a path relative to the base url.
 
2267
        These should only be downwards relative, not upwards.
 
2268
        """
 
2269
        if relpath is not None and relpath != '.':
 
2270
            if not base.endswith('/'):
 
2271
                base = base + '/'
 
2272
            # XXX: Really base should be a url; we did after all call
 
2273
            # get_url()!  But sometimes it's just a path (from
 
2274
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2275
            # to a non-escaped local path.
 
2276
            if base.startswith('./') or base.startswith('/'):
 
2277
                base += relpath
 
2278
            else:
 
2279
                base += urlutils.escape(relpath)
 
2280
        return base
 
2281
 
 
2282
    def get_url(self, relpath=None):
 
2283
        """Get a URL (or maybe a path) for the readwrite transport.
 
2284
 
 
2285
        This will either be backed by '.' or to an equivalent non-file based
 
2286
        facility.
 
2287
        relpath provides for clients to get a path relative to the base url.
 
2288
        These should only be downwards relative, not upwards.
 
2289
        """
 
2290
        base = self.get_server().get_url()
 
2291
        return self._adjust_url(base, relpath)
 
2292
 
 
2293
    def get_vfs_only_url(self, relpath=None):
 
2294
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2295
 
 
2296
        This will never be a smart protocol.  It always has all the
 
2297
        capabilities of the local filesystem, but it might actually be a
 
2298
        MemoryTransport or some other similar virtual filesystem.
 
2299
 
 
2300
        This is the backing transport (if any) of the server returned by
 
2301
        get_url and get_readonly_url.
 
2302
 
 
2303
        :param relpath: provides for clients to get a path relative to the base
 
2304
            url.  These should only be downwards relative, not upwards.
 
2305
        :return: A URL
 
2306
        """
 
2307
        base = self.get_vfs_only_server().get_url()
 
2308
        return self._adjust_url(base, relpath)
 
2309
 
 
2310
    def _create_safety_net(self):
 
2311
        """Make a fake bzr directory.
 
2312
 
 
2313
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2314
        real branch.
 
2315
        """
 
2316
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2317
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2318
 
 
2319
    def _check_safety_net(self):
 
2320
        """Check that the safety .bzr directory have not been touched.
 
2321
 
 
2322
        _make_test_root have created a .bzr directory to prevent tests from
 
2323
        propagating. This method ensures than a test did not leaked.
 
2324
        """
 
2325
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2326
        self.permit_url(_mod_transport.get_transport(root).base)
 
2327
        wt = workingtree.WorkingTree.open(root)
 
2328
        last_rev = wt.last_revision()
 
2329
        if last_rev != 'null:':
 
2330
            # The current test have modified the /bzr directory, we need to
 
2331
            # recreate a new one or all the followng tests will fail.
 
2332
            # If you need to inspect its content uncomment the following line
 
2333
            # import pdb; pdb.set_trace()
 
2334
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
 
2335
            self._create_safety_net()
 
2336
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2337
 
 
2338
    def _make_test_root(self):
 
2339
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2340
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2341
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2342
                                                    suffix='.tmp'))
 
2343
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2344
 
 
2345
            self._create_safety_net()
 
2346
 
 
2347
            # The same directory is used by all tests, and we're not
 
2348
            # specifically told when all tests are finished.  This will do.
 
2349
            atexit.register(_rmtree_temp_dir, root)
 
2350
 
 
2351
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2352
        self.addCleanup(self._check_safety_net)
 
2353
 
 
2354
    def makeAndChdirToTestDir(self):
 
2355
        """Create a temporary directories for this one test.
 
2356
 
 
2357
        This must set self.test_home_dir and self.test_dir and chdir to
 
2358
        self.test_dir.
 
2359
 
 
2360
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2361
        """
 
2362
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2363
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2364
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2365
        self.permit_dir(self.test_dir)
 
2366
 
 
2367
    def make_branch(self, relpath, format=None):
 
2368
        """Create a branch on the transport at relpath."""
 
2369
        repo = self.make_repository(relpath, format=format)
 
2370
        return repo.bzrdir.create_branch()
 
2371
 
 
2372
    def make_bzrdir(self, relpath, format=None):
 
2373
        try:
 
2374
            # might be a relative or absolute path
 
2375
            maybe_a_url = self.get_url(relpath)
 
2376
            segments = maybe_a_url.rsplit('/', 1)
 
2377
            t = _mod_transport.get_transport(maybe_a_url)
 
2378
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2379
                t.ensure_base()
 
2380
            if format is None:
 
2381
                format = 'default'
 
2382
            if isinstance(format, basestring):
 
2383
                format = bzrdir.format_registry.make_bzrdir(format)
 
2384
            return format.initialize_on_transport(t)
 
2385
        except errors.UninitializableFormat:
 
2386
            raise TestSkipped("Format %s is not initializable." % format)
 
2387
 
 
2388
    def make_repository(self, relpath, shared=False, format=None):
 
2389
        """Create a repository on our default transport at relpath.
 
2390
 
 
2391
        Note that relpath must be a relative path, not a full url.
 
2392
        """
 
2393
        # FIXME: If you create a remoterepository this returns the underlying
 
2394
        # real format, which is incorrect.  Actually we should make sure that
 
2395
        # RemoteBzrDir returns a RemoteRepository.
 
2396
        # maybe  mbp 20070410
 
2397
        made_control = self.make_bzrdir(relpath, format=format)
 
2398
        return made_control.create_repository(shared=shared)
 
2399
 
 
2400
    def make_smart_server(self, path, backing_server=None):
 
2401
        if backing_server is None:
 
2402
            backing_server = self.get_server()
 
2403
        smart_server = test_server.SmartTCPServer_for_testing()
 
2404
        self.start_server(smart_server, backing_server)
 
2405
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
 
2406
                                                   ).clone(path)
 
2407
        return remote_transport
 
2408
 
 
2409
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2410
        """Create a branch on the default transport and a MemoryTree for it."""
 
2411
        b = self.make_branch(relpath, format=format)
 
2412
        return memorytree.MemoryTree.create_on_branch(b)
 
2413
 
 
2414
    def make_branch_builder(self, relpath, format=None):
 
2415
        branch = self.make_branch(relpath, format=format)
 
2416
        return branchbuilder.BranchBuilder(branch=branch)
 
2417
 
 
2418
    def overrideEnvironmentForTesting(self):
 
2419
        test_home_dir = self.test_home_dir
 
2420
        if isinstance(test_home_dir, unicode):
 
2421
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
 
2422
        os.environ['HOME'] = test_home_dir
 
2423
        os.environ['BZR_HOME'] = test_home_dir
 
2424
 
 
2425
    def setUp(self):
 
2426
        super(TestCaseWithMemoryTransport, self).setUp()
 
2427
        # Ensure that ConnectedTransport doesn't leak sockets
 
2428
        def get_transport_with_cleanup(*args, **kwargs):
 
2429
            t = orig_get_transport(*args, **kwargs)
 
2430
            if isinstance(t, _mod_transport.ConnectedTransport):
 
2431
                self.addCleanup(t.disconnect)
 
2432
            return t
 
2433
 
 
2434
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
 
2435
                                               get_transport_with_cleanup)
 
2436
        self._make_test_root()
 
2437
        self.addCleanup(os.chdir, os.getcwdu())
 
2438
        self.makeAndChdirToTestDir()
 
2439
        self.overrideEnvironmentForTesting()
 
2440
        self.__readonly_server = None
 
2441
        self.__server = None
 
2442
        self.reduceLockdirTimeout()
 
2443
 
 
2444
    def setup_smart_server_with_call_log(self):
 
2445
        """Sets up a smart server as the transport server with a call log."""
 
2446
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2447
        self.hpss_calls = []
 
2448
        import traceback
 
2449
        # Skip the current stack down to the caller of
 
2450
        # setup_smart_server_with_call_log
 
2451
        prefix_length = len(traceback.extract_stack()) - 2
 
2452
        def capture_hpss_call(params):
 
2453
            self.hpss_calls.append(
 
2454
                CapturedCall(params, prefix_length))
 
2455
        client._SmartClient.hooks.install_named_hook(
 
2456
            'call', capture_hpss_call, None)
 
2457
 
 
2458
    def reset_smart_call_log(self):
 
2459
        self.hpss_calls = []
 
2460
 
 
2461
 
 
2462
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2463
    """Derived class that runs a test within a temporary directory.
 
2464
 
 
2465
    This is useful for tests that need to create a branch, etc.
 
2466
 
 
2467
    The directory is created in a slightly complex way: for each
 
2468
    Python invocation, a new temporary top-level directory is created.
 
2469
    All test cases create their own directory within that.  If the
 
2470
    tests complete successfully, the directory is removed.
 
2471
 
 
2472
    :ivar test_base_dir: The path of the top-level directory for this
 
2473
    test, which contains a home directory and a work directory.
 
2474
 
 
2475
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2476
    which is used as $HOME for this test.
 
2477
 
 
2478
    :ivar test_dir: A directory under test_base_dir used as the current
 
2479
    directory when the test proper is run.
 
2480
    """
 
2481
 
 
2482
    OVERRIDE_PYTHON = 'python'
 
2483
 
 
2484
    def check_file_contents(self, filename, expect):
 
2485
        self.log("check contents of file %s" % filename)
 
2486
        f = file(filename)
 
2487
        try:
 
2488
            contents = f.read()
 
2489
        finally:
 
2490
            f.close()
 
2491
        if contents != expect:
 
2492
            self.log("expected: %r" % expect)
 
2493
            self.log("actually: %r" % contents)
 
2494
            self.fail("contents of %s not as expected" % filename)
 
2495
 
 
2496
    def _getTestDirPrefix(self):
 
2497
        # create a directory within the top level test directory
 
2498
        if sys.platform in ('win32', 'cygwin'):
 
2499
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2500
            # windows is likely to have path-length limits so use a short name
 
2501
            name_prefix = name_prefix[-30:]
 
2502
        else:
 
2503
            name_prefix = re.sub('[/]', '_', self.id())
 
2504
        return name_prefix
 
2505
 
 
2506
    def makeAndChdirToTestDir(self):
 
2507
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2508
 
 
2509
        For TestCaseInTempDir we create a temporary directory based on the test
 
2510
        name and then create two subdirs - test and home under it.
 
2511
        """
 
2512
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2513
            self._getTestDirPrefix())
 
2514
        name = name_prefix
 
2515
        for i in range(100):
 
2516
            if os.path.exists(name):
 
2517
                name = name_prefix + '_' + str(i)
 
2518
            else:
 
2519
                # now create test and home directories within this dir
 
2520
                self.test_base_dir = name
 
2521
                self.addCleanup(self.deleteTestDir)
 
2522
                os.mkdir(self.test_base_dir)
 
2523
                break
 
2524
        self.permit_dir(self.test_base_dir)
 
2525
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2526
        # stacking policy to honour; create a bzr dir with an unshared
 
2527
        # repository (but not a branch - our code would be trying to escape
 
2528
        # then!) to stop them, and permit it to be read.
 
2529
        # control = bzrdir.BzrDir.create(self.test_base_dir)
 
2530
        # control.create_repository()
 
2531
        self.test_home_dir = self.test_base_dir + '/home'
 
2532
        os.mkdir(self.test_home_dir)
 
2533
        self.test_dir = self.test_base_dir + '/work'
 
2534
        os.mkdir(self.test_dir)
 
2535
        os.chdir(self.test_dir)
 
2536
        # put name of test inside
 
2537
        f = file(self.test_base_dir + '/name', 'w')
 
2538
        try:
 
2539
            f.write(self.id())
 
2540
        finally:
 
2541
            f.close()
 
2542
 
 
2543
    def deleteTestDir(self):
 
2544
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2545
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
 
2546
 
 
2547
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2548
        """Build a test tree according to a pattern.
 
2549
 
 
2550
        shape is a sequence of file specifications.  If the final
 
2551
        character is '/', a directory is created.
 
2552
 
 
2553
        This assumes that all the elements in the tree being built are new.
 
2554
 
 
2555
        This doesn't add anything to a branch.
 
2556
 
 
2557
        :type shape:    list or tuple.
 
2558
        :param line_endings: Either 'binary' or 'native'
 
2559
            in binary mode, exact contents are written in native mode, the
 
2560
            line endings match the default platform endings.
 
2561
        :param transport: A transport to write to, for building trees on VFS's.
 
2562
            If the transport is readonly or None, "." is opened automatically.
 
2563
        :return: None
 
2564
        """
 
2565
        if type(shape) not in (list, tuple):
 
2566
            raise AssertionError("Parameter 'shape' should be "
 
2567
                "a list or a tuple. Got %r instead" % (shape,))
 
2568
        # It's OK to just create them using forward slashes on windows.
 
2569
        if transport is None or transport.is_readonly():
 
2570
            transport = _mod_transport.get_transport(".")
 
2571
        for name in shape:
 
2572
            self.assertIsInstance(name, basestring)
 
2573
            if name[-1] == '/':
 
2574
                transport.mkdir(urlutils.escape(name[:-1]))
 
2575
            else:
 
2576
                if line_endings == 'binary':
 
2577
                    end = '\n'
 
2578
                elif line_endings == 'native':
 
2579
                    end = os.linesep
 
2580
                else:
 
2581
                    raise errors.BzrError(
 
2582
                        'Invalid line ending request %r' % line_endings)
 
2583
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2584
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2585
 
 
2586
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
 
2587
 
 
2588
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2589
        """Assert whether path or paths are in the WorkingTree"""
 
2590
        if tree is None:
 
2591
            tree = workingtree.WorkingTree.open(root_path)
 
2592
        if not isinstance(path, basestring):
 
2593
            for p in path:
 
2594
                self.assertInWorkingTree(p, tree=tree)
 
2595
        else:
 
2596
            self.assertIsNot(tree.path2id(path), None,
 
2597
                path+' not in working tree.')
 
2598
 
 
2599
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2600
        """Assert whether path or paths are not in the WorkingTree"""
 
2601
        if tree is None:
 
2602
            tree = workingtree.WorkingTree.open(root_path)
 
2603
        if not isinstance(path, basestring):
 
2604
            for p in path:
 
2605
                self.assertNotInWorkingTree(p,tree=tree)
 
2606
        else:
 
2607
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2608
 
 
2609
 
 
2610
class TestCaseWithTransport(TestCaseInTempDir):
 
2611
    """A test case that provides get_url and get_readonly_url facilities.
 
2612
 
 
2613
    These back onto two transport servers, one for readonly access and one for
 
2614
    read write access.
 
2615
 
 
2616
    If no explicit class is provided for readonly access, a
 
2617
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2618
    based read write transports.
 
2619
 
 
2620
    If an explicit class is provided for readonly access, that server and the
 
2621
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2622
    """
 
2623
 
 
2624
    def get_vfs_only_server(self):
 
2625
        """See TestCaseWithMemoryTransport.
 
2626
 
 
2627
        This is useful for some tests with specific servers that need
 
2628
        diagnostics.
 
2629
        """
 
2630
        if self.__vfs_server is None:
 
2631
            self.__vfs_server = self.vfs_transport_factory()
 
2632
            self.start_server(self.__vfs_server)
 
2633
        return self.__vfs_server
 
2634
 
 
2635
    def make_branch_and_tree(self, relpath, format=None):
 
2636
        """Create a branch on the transport and a tree locally.
 
2637
 
 
2638
        If the transport is not a LocalTransport, the Tree can't be created on
 
2639
        the transport.  In that case if the vfs_transport_factory is
 
2640
        LocalURLServer the working tree is created in the local
 
2641
        directory backing the transport, and the returned tree's branch and
 
2642
        repository will also be accessed locally. Otherwise a lightweight
 
2643
        checkout is created and returned.
 
2644
 
 
2645
        We do this because we can't physically create a tree in the local
 
2646
        path, with a branch reference to the transport_factory url, and
 
2647
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2648
        namespace is distinct from the local disk - the two branch objects
 
2649
        would collide. While we could construct a tree with its branch object
 
2650
        pointing at the transport_factory transport in memory, reopening it
 
2651
        would behaving unexpectedly, and has in the past caused testing bugs
 
2652
        when we tried to do it that way.
 
2653
 
 
2654
        :param format: The BzrDirFormat.
 
2655
        :returns: the WorkingTree.
 
2656
        """
 
2657
        # TODO: always use the local disk path for the working tree,
 
2658
        # this obviously requires a format that supports branch references
 
2659
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2660
        # RBC 20060208
 
2661
        b = self.make_branch(relpath, format=format)
 
2662
        try:
 
2663
            return b.bzrdir.create_workingtree()
 
2664
        except errors.NotLocalUrl:
 
2665
            # We can only make working trees locally at the moment.  If the
 
2666
            # transport can't support them, then we keep the non-disk-backed
 
2667
            # branch and create a local checkout.
 
2668
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
2669
                # the branch is colocated on disk, we cannot create a checkout.
 
2670
                # hopefully callers will expect this.
 
2671
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2672
                wt = local_controldir.create_workingtree()
 
2673
                if wt.branch._format != b._format:
 
2674
                    wt._branch = b
 
2675
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2676
                    # in case the implementation details of workingtree objects
 
2677
                    # change.
 
2678
                    self.assertIs(b, wt.branch)
 
2679
                return wt
 
2680
            else:
 
2681
                return b.create_checkout(relpath, lightweight=True)
 
2682
 
 
2683
    def assertIsDirectory(self, relpath, transport):
 
2684
        """Assert that relpath within transport is a directory.
 
2685
 
 
2686
        This may not be possible on all transports; in that case it propagates
 
2687
        a TransportNotPossible.
 
2688
        """
 
2689
        try:
 
2690
            mode = transport.stat(relpath).st_mode
 
2691
        except errors.NoSuchFile:
 
2692
            self.fail("path %s is not a directory; no such file"
 
2693
                      % (relpath))
 
2694
        if not stat.S_ISDIR(mode):
 
2695
            self.fail("path %s is not a directory; has mode %#o"
 
2696
                      % (relpath, mode))
 
2697
 
 
2698
    def assertTreesEqual(self, left, right):
 
2699
        """Check that left and right have the same content and properties."""
 
2700
        # we use a tree delta to check for equality of the content, and we
 
2701
        # manually check for equality of other things such as the parents list.
 
2702
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2703
        differences = left.changes_from(right)
 
2704
        self.assertFalse(differences.has_changed(),
 
2705
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2706
 
 
2707
    def setUp(self):
 
2708
        super(TestCaseWithTransport, self).setUp()
 
2709
        self.__vfs_server = None
 
2710
 
 
2711
    def disable_missing_extensions_warning(self):
 
2712
        """Some tests expect a precise stderr content.
 
2713
 
 
2714
        There is no point in forcing them to duplicate the extension related
 
2715
        warning.
 
2716
        """
 
2717
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
2718
 
 
2719
 
 
2720
class ChrootedTestCase(TestCaseWithTransport):
 
2721
    """A support class that provides readonly urls outside the local namespace.
 
2722
 
 
2723
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2724
    is then we are chrooted already, if it is not then an HttpServer is used
 
2725
    for readonly urls.
 
2726
 
 
2727
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2728
                       be used without needed to redo it when a different
 
2729
                       subclass is in use ?
 
2730
    """
 
2731
 
 
2732
    def setUp(self):
 
2733
        from bzrlib.tests import http_server
 
2734
        super(ChrootedTestCase, self).setUp()
 
2735
        if not self.vfs_transport_factory == memory.MemoryServer:
 
2736
            self.transport_readonly_server = http_server.HttpServer
 
2737
 
 
2738
 
 
2739
def condition_id_re(pattern):
 
2740
    """Create a condition filter which performs a re check on a test's id.
 
2741
 
 
2742
    :param pattern: A regular expression string.
 
2743
    :return: A callable that returns True if the re matches.
 
2744
    """
 
2745
    filter_re = re.compile(pattern, 0)
 
2746
    def condition(test):
 
2747
        test_id = test.id()
 
2748
        return filter_re.search(test_id)
 
2749
    return condition
 
2750
 
 
2751
 
 
2752
def condition_isinstance(klass_or_klass_list):
 
2753
    """Create a condition filter which returns isinstance(param, klass).
 
2754
 
 
2755
    :return: A callable which when called with one parameter obj return the
 
2756
        result of isinstance(obj, klass_or_klass_list).
 
2757
    """
 
2758
    def condition(obj):
 
2759
        return isinstance(obj, klass_or_klass_list)
 
2760
    return condition
 
2761
 
 
2762
 
 
2763
def condition_id_in_list(id_list):
 
2764
    """Create a condition filter which verify that test's id in a list.
 
2765
 
 
2766
    :param id_list: A TestIdList object.
 
2767
    :return: A callable that returns True if the test's id appears in the list.
 
2768
    """
 
2769
    def condition(test):
 
2770
        return id_list.includes(test.id())
 
2771
    return condition
 
2772
 
 
2773
 
 
2774
def condition_id_startswith(starts):
 
2775
    """Create a condition filter verifying that test's id starts with a string.
 
2776
 
 
2777
    :param starts: A list of string.
 
2778
    :return: A callable that returns True if the test's id starts with one of
 
2779
        the given strings.
 
2780
    """
 
2781
    def condition(test):
 
2782
        for start in starts:
 
2783
            if test.id().startswith(start):
 
2784
                return True
 
2785
        return False
 
2786
    return condition
 
2787
 
 
2788
 
 
2789
def exclude_tests_by_condition(suite, condition):
 
2790
    """Create a test suite which excludes some tests from suite.
 
2791
 
 
2792
    :param suite: The suite to get tests from.
 
2793
    :param condition: A callable whose result evaluates True when called with a
 
2794
        test case which should be excluded from the result.
 
2795
    :return: A suite which contains the tests found in suite that fail
 
2796
        condition.
 
2797
    """
 
2798
    result = []
 
2799
    for test in iter_suite_tests(suite):
 
2800
        if not condition(test):
 
2801
            result.append(test)
 
2802
    return TestUtil.TestSuite(result)
 
2803
 
 
2804
 
 
2805
def filter_suite_by_condition(suite, condition):
 
2806
    """Create a test suite by filtering another one.
 
2807
 
 
2808
    :param suite: The source suite.
 
2809
    :param condition: A callable whose result evaluates True when called with a
 
2810
        test case which should be included in the result.
 
2811
    :return: A suite which contains the tests found in suite that pass
 
2812
        condition.
 
2813
    """
 
2814
    result = []
 
2815
    for test in iter_suite_tests(suite):
 
2816
        if condition(test):
 
2817
            result.append(test)
 
2818
    return TestUtil.TestSuite(result)
 
2819
 
 
2820
 
 
2821
def filter_suite_by_re(suite, pattern):
 
2822
    """Create a test suite by filtering another one.
 
2823
 
 
2824
    :param suite:           the source suite
 
2825
    :param pattern:         pattern that names must match
 
2826
    :returns: the newly created suite
 
2827
    """
 
2828
    condition = condition_id_re(pattern)
 
2829
    result_suite = filter_suite_by_condition(suite, condition)
 
2830
    return result_suite
 
2831
 
 
2832
 
 
2833
def filter_suite_by_id_list(suite, test_id_list):
 
2834
    """Create a test suite by filtering another one.
 
2835
 
 
2836
    :param suite: The source suite.
 
2837
    :param test_id_list: A list of the test ids to keep as strings.
 
2838
    :returns: the newly created suite
 
2839
    """
 
2840
    condition = condition_id_in_list(test_id_list)
 
2841
    result_suite = filter_suite_by_condition(suite, condition)
 
2842
    return result_suite
 
2843
 
 
2844
 
 
2845
def filter_suite_by_id_startswith(suite, start):
 
2846
    """Create a test suite by filtering another one.
 
2847
 
 
2848
    :param suite: The source suite.
 
2849
    :param start: A list of string the test id must start with one of.
 
2850
    :returns: the newly created suite
 
2851
    """
 
2852
    condition = condition_id_startswith(start)
 
2853
    result_suite = filter_suite_by_condition(suite, condition)
 
2854
    return result_suite
 
2855
 
 
2856
 
 
2857
def exclude_tests_by_re(suite, pattern):
 
2858
    """Create a test suite which excludes some tests from suite.
 
2859
 
 
2860
    :param suite: The suite to get tests from.
 
2861
    :param pattern: A regular expression string. Test ids that match this
 
2862
        pattern will be excluded from the result.
 
2863
    :return: A TestSuite that contains all the tests from suite without the
 
2864
        tests that matched pattern. The order of tests is the same as it was in
 
2865
        suite.
 
2866
    """
 
2867
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2868
 
 
2869
 
 
2870
def preserve_input(something):
 
2871
    """A helper for performing test suite transformation chains.
 
2872
 
 
2873
    :param something: Anything you want to preserve.
 
2874
    :return: Something.
 
2875
    """
 
2876
    return something
 
2877
 
 
2878
 
 
2879
def randomize_suite(suite):
 
2880
    """Return a new TestSuite with suite's tests in random order.
 
2881
 
 
2882
    The tests in the input suite are flattened into a single suite in order to
 
2883
    accomplish this. Any nested TestSuites are removed to provide global
 
2884
    randomness.
 
2885
    """
 
2886
    tests = list(iter_suite_tests(suite))
 
2887
    random.shuffle(tests)
 
2888
    return TestUtil.TestSuite(tests)
 
2889
 
 
2890
 
 
2891
def split_suite_by_condition(suite, condition):
 
2892
    """Split a test suite into two by a condition.
 
2893
 
 
2894
    :param suite: The suite to split.
 
2895
    :param condition: The condition to match on. Tests that match this
 
2896
        condition are returned in the first test suite, ones that do not match
 
2897
        are in the second suite.
 
2898
    :return: A tuple of two test suites, where the first contains tests from
 
2899
        suite matching the condition, and the second contains the remainder
 
2900
        from suite. The order within each output suite is the same as it was in
 
2901
        suite.
 
2902
    """
 
2903
    matched = []
 
2904
    did_not_match = []
 
2905
    for test in iter_suite_tests(suite):
 
2906
        if condition(test):
 
2907
            matched.append(test)
 
2908
        else:
 
2909
            did_not_match.append(test)
 
2910
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2911
 
 
2912
 
 
2913
def split_suite_by_re(suite, pattern):
 
2914
    """Split a test suite into two by a regular expression.
 
2915
 
 
2916
    :param suite: The suite to split.
 
2917
    :param pattern: A regular expression string. Test ids that match this
 
2918
        pattern will be in the first test suite returned, and the others in the
 
2919
        second test suite returned.
 
2920
    :return: A tuple of two test suites, where the first contains tests from
 
2921
        suite matching pattern, and the second contains the remainder from
 
2922
        suite. The order within each output suite is the same as it was in
 
2923
        suite.
 
2924
    """
 
2925
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2926
 
 
2927
 
 
2928
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2929
              stop_on_failure=False,
 
2930
              transport=None, lsprof_timed=None, bench_history=None,
 
2931
              matching_tests_first=None,
 
2932
              list_only=False,
 
2933
              random_seed=None,
 
2934
              exclude_pattern=None,
 
2935
              strict=False,
 
2936
              runner_class=None,
 
2937
              suite_decorators=None,
 
2938
              stream=None,
 
2939
              result_decorators=None,
 
2940
              ):
 
2941
    """Run a test suite for bzr selftest.
 
2942
 
 
2943
    :param runner_class: The class of runner to use. Must support the
 
2944
        constructor arguments passed by run_suite which are more than standard
 
2945
        python uses.
 
2946
    :return: A boolean indicating success.
 
2947
    """
 
2948
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2949
    if verbose:
 
2950
        verbosity = 2
 
2951
    else:
 
2952
        verbosity = 1
 
2953
    if runner_class is None:
 
2954
        runner_class = TextTestRunner
 
2955
    if stream is None:
 
2956
        stream = sys.stdout
 
2957
    runner = runner_class(stream=stream,
 
2958
                            descriptions=0,
 
2959
                            verbosity=verbosity,
 
2960
                            bench_history=bench_history,
 
2961
                            strict=strict,
 
2962
                            result_decorators=result_decorators,
 
2963
                            )
 
2964
    runner.stop_on_failure=stop_on_failure
 
2965
    # built in decorator factories:
 
2966
    decorators = [
 
2967
        random_order(random_seed, runner),
 
2968
        exclude_tests(exclude_pattern),
 
2969
        ]
 
2970
    if matching_tests_first:
 
2971
        decorators.append(tests_first(pattern))
 
2972
    else:
 
2973
        decorators.append(filter_tests(pattern))
 
2974
    if suite_decorators:
 
2975
        decorators.extend(suite_decorators)
 
2976
    # tell the result object how many tests will be running: (except if
 
2977
    # --parallel=fork is being used. Robert said he will provide a better
 
2978
    # progress design later -- vila 20090817)
 
2979
    if fork_decorator not in decorators:
 
2980
        decorators.append(CountingDecorator)
 
2981
    for decorator in decorators:
 
2982
        suite = decorator(suite)
 
2983
    if list_only:
 
2984
        # Done after test suite decoration to allow randomisation etc
 
2985
        # to take effect, though that is of marginal benefit.
 
2986
        if verbosity >= 2:
 
2987
            stream.write("Listing tests only ...\n")
 
2988
        for t in iter_suite_tests(suite):
 
2989
            stream.write("%s\n" % (t.id()))
 
2990
        return True
 
2991
    result = runner.run(suite)
 
2992
    if strict:
 
2993
        return result.wasStrictlySuccessful()
 
2994
    else:
 
2995
        return result.wasSuccessful()
 
2996
 
 
2997
 
 
2998
# A registry where get() returns a suite decorator.
 
2999
parallel_registry = registry.Registry()
 
3000
 
 
3001
 
 
3002
def fork_decorator(suite):
 
3003
    if getattr(os, "fork", None) is None:
 
3004
        raise errors.BzrCommandError("platform does not support fork,"
 
3005
            " try --parallel=subprocess instead.")
 
3006
    concurrency = osutils.local_concurrency()
 
3007
    if concurrency == 1:
 
3008
        return suite
 
3009
    from testtools import ConcurrentTestSuite
 
3010
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3011
parallel_registry.register('fork', fork_decorator)
 
3012
 
 
3013
 
 
3014
def subprocess_decorator(suite):
 
3015
    concurrency = osutils.local_concurrency()
 
3016
    if concurrency == 1:
 
3017
        return suite
 
3018
    from testtools import ConcurrentTestSuite
 
3019
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3020
parallel_registry.register('subprocess', subprocess_decorator)
 
3021
 
 
3022
 
 
3023
def exclude_tests(exclude_pattern):
 
3024
    """Return a test suite decorator that excludes tests."""
 
3025
    if exclude_pattern is None:
 
3026
        return identity_decorator
 
3027
    def decorator(suite):
 
3028
        return ExcludeDecorator(suite, exclude_pattern)
 
3029
    return decorator
 
3030
 
 
3031
 
 
3032
def filter_tests(pattern):
 
3033
    if pattern == '.*':
 
3034
        return identity_decorator
 
3035
    def decorator(suite):
 
3036
        return FilterTestsDecorator(suite, pattern)
 
3037
    return decorator
 
3038
 
 
3039
 
 
3040
def random_order(random_seed, runner):
 
3041
    """Return a test suite decorator factory for randomising tests order.
 
3042
    
 
3043
    :param random_seed: now, a string which casts to a long, or a long.
 
3044
    :param runner: A test runner with a stream attribute to report on.
 
3045
    """
 
3046
    if random_seed is None:
 
3047
        return identity_decorator
 
3048
    def decorator(suite):
 
3049
        return RandomDecorator(suite, random_seed, runner.stream)
 
3050
    return decorator
 
3051
 
 
3052
 
 
3053
def tests_first(pattern):
 
3054
    if pattern == '.*':
 
3055
        return identity_decorator
 
3056
    def decorator(suite):
 
3057
        return TestFirstDecorator(suite, pattern)
 
3058
    return decorator
 
3059
 
 
3060
 
 
3061
def identity_decorator(suite):
 
3062
    """Return suite."""
 
3063
    return suite
 
3064
 
 
3065
 
 
3066
class TestDecorator(TestUtil.TestSuite):
 
3067
    """A decorator for TestCase/TestSuite objects.
 
3068
    
 
3069
    Usually, subclasses should override __iter__(used when flattening test
 
3070
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
3071
    debug().
 
3072
    """
 
3073
 
 
3074
    def __init__(self, suite):
 
3075
        TestUtil.TestSuite.__init__(self)
 
3076
        self.addTest(suite)
 
3077
 
 
3078
    def countTestCases(self):
 
3079
        cases = 0
 
3080
        for test in self:
 
3081
            cases += test.countTestCases()
 
3082
        return cases
 
3083
 
 
3084
    def debug(self):
 
3085
        for test in self:
 
3086
            test.debug()
 
3087
 
 
3088
    def run(self, result):
 
3089
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
3090
        # into __iter__.
 
3091
        for test in self:
 
3092
            if result.shouldStop:
 
3093
                break
 
3094
            test.run(result)
 
3095
        return result
 
3096
 
 
3097
 
 
3098
class CountingDecorator(TestDecorator):
 
3099
    """A decorator which calls result.progress(self.countTestCases)."""
 
3100
 
 
3101
    def run(self, result):
 
3102
        progress_method = getattr(result, 'progress', None)
 
3103
        if callable(progress_method):
 
3104
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
3105
        return super(CountingDecorator, self).run(result)
 
3106
 
 
3107
 
 
3108
class ExcludeDecorator(TestDecorator):
 
3109
    """A decorator which excludes test matching an exclude pattern."""
 
3110
 
 
3111
    def __init__(self, suite, exclude_pattern):
 
3112
        TestDecorator.__init__(self, suite)
 
3113
        self.exclude_pattern = exclude_pattern
 
3114
        self.excluded = False
 
3115
 
 
3116
    def __iter__(self):
 
3117
        if self.excluded:
 
3118
            return iter(self._tests)
 
3119
        self.excluded = True
 
3120
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
3121
        del self._tests[:]
 
3122
        self.addTests(suite)
 
3123
        return iter(self._tests)
 
3124
 
 
3125
 
 
3126
class FilterTestsDecorator(TestDecorator):
 
3127
    """A decorator which filters tests to those matching a pattern."""
 
3128
 
 
3129
    def __init__(self, suite, pattern):
 
3130
        TestDecorator.__init__(self, suite)
 
3131
        self.pattern = pattern
 
3132
        self.filtered = False
 
3133
 
 
3134
    def __iter__(self):
 
3135
        if self.filtered:
 
3136
            return iter(self._tests)
 
3137
        self.filtered = True
 
3138
        suite = filter_suite_by_re(self, self.pattern)
 
3139
        del self._tests[:]
 
3140
        self.addTests(suite)
 
3141
        return iter(self._tests)
 
3142
 
 
3143
 
 
3144
class RandomDecorator(TestDecorator):
 
3145
    """A decorator which randomises the order of its tests."""
 
3146
 
 
3147
    def __init__(self, suite, random_seed, stream):
 
3148
        TestDecorator.__init__(self, suite)
 
3149
        self.random_seed = random_seed
 
3150
        self.randomised = False
 
3151
        self.stream = stream
 
3152
 
 
3153
    def __iter__(self):
 
3154
        if self.randomised:
 
3155
            return iter(self._tests)
 
3156
        self.randomised = True
 
3157
        self.stream.write("Randomizing test order using seed %s\n\n" %
 
3158
            (self.actual_seed()))
 
3159
        # Initialise the random number generator.
 
3160
        random.seed(self.actual_seed())
 
3161
        suite = randomize_suite(self)
 
3162
        del self._tests[:]
 
3163
        self.addTests(suite)
 
3164
        return iter(self._tests)
 
3165
 
 
3166
    def actual_seed(self):
 
3167
        if self.random_seed == "now":
 
3168
            # We convert the seed to a long to make it reuseable across
 
3169
            # invocations (because the user can reenter it).
 
3170
            self.random_seed = long(time.time())
 
3171
        else:
 
3172
            # Convert the seed to a long if we can
 
3173
            try:
 
3174
                self.random_seed = long(self.random_seed)
 
3175
            except:
 
3176
                pass
 
3177
        return self.random_seed
 
3178
 
 
3179
 
 
3180
class TestFirstDecorator(TestDecorator):
 
3181
    """A decorator which moves named tests to the front."""
 
3182
 
 
3183
    def __init__(self, suite, pattern):
 
3184
        TestDecorator.__init__(self, suite)
 
3185
        self.pattern = pattern
 
3186
        self.filtered = False
 
3187
 
 
3188
    def __iter__(self):
 
3189
        if self.filtered:
 
3190
            return iter(self._tests)
 
3191
        self.filtered = True
 
3192
        suites = split_suite_by_re(self, self.pattern)
 
3193
        del self._tests[:]
 
3194
        self.addTests(suites)
 
3195
        return iter(self._tests)
 
3196
 
 
3197
 
 
3198
def partition_tests(suite, count):
 
3199
    """Partition suite into count lists of tests."""
 
3200
    # This just assigns tests in a round-robin fashion.  On one hand this
 
3201
    # splits up blocks of related tests that might run faster if they shared
 
3202
    # resources, but on the other it avoids assigning blocks of slow tests to
 
3203
    # just one partition.  So the slowest partition shouldn't be much slower
 
3204
    # than the fastest.
 
3205
    partitions = [list() for i in range(count)]
 
3206
    tests = iter_suite_tests(suite)
 
3207
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
 
3208
        partition.append(test)
 
3209
    return partitions
 
3210
 
 
3211
 
 
3212
def workaround_zealous_crypto_random():
 
3213
    """Crypto.Random want to help us being secure, but we don't care here.
 
3214
 
 
3215
    This workaround some test failure related to the sftp server. Once paramiko
 
3216
    stop using the controversial API in Crypto.Random, we may get rid of it.
 
3217
    """
 
3218
    try:
 
3219
        from Crypto.Random import atfork
 
3220
        atfork()
 
3221
    except ImportError:
 
3222
        pass
 
3223
 
 
3224
 
 
3225
def fork_for_tests(suite):
 
3226
    """Take suite and start up one runner per CPU by forking()
 
3227
 
 
3228
    :return: An iterable of TestCase-like objects which can each have
 
3229
        run(result) called on them to feed tests to result.
 
3230
    """
 
3231
    concurrency = osutils.local_concurrency()
 
3232
    result = []
 
3233
    from subunit import TestProtocolClient, ProtocolTestCase
 
3234
    from subunit.test_results import AutoTimingTestResultDecorator
 
3235
    class TestInOtherProcess(ProtocolTestCase):
 
3236
        # Should be in subunit, I think. RBC.
 
3237
        def __init__(self, stream, pid):
 
3238
            ProtocolTestCase.__init__(self, stream)
 
3239
            self.pid = pid
 
3240
 
 
3241
        def run(self, result):
 
3242
            try:
 
3243
                ProtocolTestCase.run(self, result)
 
3244
            finally:
 
3245
                os.waitpid(self.pid, 0)
 
3246
 
 
3247
    test_blocks = partition_tests(suite, concurrency)
 
3248
    for process_tests in test_blocks:
 
3249
        process_suite = TestUtil.TestSuite()
 
3250
        process_suite.addTests(process_tests)
 
3251
        c2pread, c2pwrite = os.pipe()
 
3252
        pid = os.fork()
 
3253
        if pid == 0:
 
3254
            workaround_zealous_crypto_random()
 
3255
            try:
 
3256
                os.close(c2pread)
 
3257
                # Leave stderr and stdout open so we can see test noise
 
3258
                # Close stdin so that the child goes away if it decides to
 
3259
                # read from stdin (otherwise its a roulette to see what
 
3260
                # child actually gets keystrokes for pdb etc).
 
3261
                sys.stdin.close()
 
3262
                sys.stdin = None
 
3263
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3264
                subunit_result = AutoTimingTestResultDecorator(
 
3265
                    TestProtocolClient(stream))
 
3266
                process_suite.run(subunit_result)
 
3267
            finally:
 
3268
                os._exit(0)
 
3269
        else:
 
3270
            os.close(c2pwrite)
 
3271
            stream = os.fdopen(c2pread, 'rb', 1)
 
3272
            test = TestInOtherProcess(stream, pid)
 
3273
            result.append(test)
 
3274
    return result
 
3275
 
 
3276
 
 
3277
def reinvoke_for_tests(suite):
 
3278
    """Take suite and start up one runner per CPU using subprocess().
 
3279
 
 
3280
    :return: An iterable of TestCase-like objects which can each have
 
3281
        run(result) called on them to feed tests to result.
 
3282
    """
 
3283
    concurrency = osutils.local_concurrency()
 
3284
    result = []
 
3285
    from subunit import ProtocolTestCase
 
3286
    class TestInSubprocess(ProtocolTestCase):
 
3287
        def __init__(self, process, name):
 
3288
            ProtocolTestCase.__init__(self, process.stdout)
 
3289
            self.process = process
 
3290
            self.process.stdin.close()
 
3291
            self.name = name
 
3292
 
 
3293
        def run(self, result):
 
3294
            try:
 
3295
                ProtocolTestCase.run(self, result)
 
3296
            finally:
 
3297
                self.process.wait()
 
3298
                os.unlink(self.name)
 
3299
            # print "pid %d finished" % finished_process
 
3300
    test_blocks = partition_tests(suite, concurrency)
 
3301
    for process_tests in test_blocks:
 
3302
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3303
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3304
        if not os.path.isfile(bzr_path):
 
3305
            # We are probably installed. Assume sys.argv is the right file
 
3306
            bzr_path = sys.argv[0]
 
3307
        bzr_path = [bzr_path]
 
3308
        if sys.platform == "win32":
 
3309
            # if we're on windows, we can't execute the bzr script directly
 
3310
            bzr_path = [sys.executable] + bzr_path
 
3311
        fd, test_list_file_name = tempfile.mkstemp()
 
3312
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3313
        for test in process_tests:
 
3314
            test_list_file.write(test.id() + '\n')
 
3315
        test_list_file.close()
 
3316
        try:
 
3317
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
 
3318
                '--subunit']
 
3319
            if '--no-plugins' in sys.argv:
 
3320
                argv.append('--no-plugins')
 
3321
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
 
3322
            # noise on stderr it can interrupt the subunit protocol.
 
3323
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
 
3324
                                      stdout=subprocess.PIPE,
 
3325
                                      stderr=subprocess.PIPE,
 
3326
                                      bufsize=1)
 
3327
            test = TestInSubprocess(process, test_list_file_name)
 
3328
            result.append(test)
 
3329
        except:
 
3330
            os.unlink(test_list_file_name)
 
3331
            raise
 
3332
    return result
 
3333
 
 
3334
 
 
3335
class ForwardingResult(unittest.TestResult):
 
3336
 
 
3337
    def __init__(self, target):
 
3338
        unittest.TestResult.__init__(self)
 
3339
        self.result = target
 
3340
 
 
3341
    def startTest(self, test):
 
3342
        self.result.startTest(test)
 
3343
 
 
3344
    def stopTest(self, test):
 
3345
        self.result.stopTest(test)
 
3346
 
 
3347
    def startTestRun(self):
 
3348
        self.result.startTestRun()
 
3349
 
 
3350
    def stopTestRun(self):
 
3351
        self.result.stopTestRun()
 
3352
 
 
3353
    def addSkip(self, test, reason):
 
3354
        self.result.addSkip(test, reason)
 
3355
 
 
3356
    def addSuccess(self, test):
 
3357
        self.result.addSuccess(test)
 
3358
 
 
3359
    def addError(self, test, err):
 
3360
        self.result.addError(test, err)
 
3361
 
 
3362
    def addFailure(self, test, err):
 
3363
        self.result.addFailure(test, err)
 
3364
ForwardingResult = testtools.ExtendedToOriginalDecorator
 
3365
 
 
3366
 
 
3367
class ProfileResult(ForwardingResult):
 
3368
    """Generate profiling data for all activity between start and success.
 
3369
    
 
3370
    The profile data is appended to the test's _benchcalls attribute and can
 
3371
    be accessed by the forwarded-to TestResult.
 
3372
 
 
3373
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3374
    where our existing output support for lsprof is, and this class aims to
 
3375
    fit in with that: while it could be moved it's not necessary to accomplish
 
3376
    test profiling, nor would it be dramatically cleaner.
 
3377
    """
 
3378
 
 
3379
    def startTest(self, test):
 
3380
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3381
        # Prevent deadlocks in tests that use lsprof: those tests will
 
3382
        # unavoidably fail.
 
3383
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
 
3384
        self.profiler.start()
 
3385
        ForwardingResult.startTest(self, test)
 
3386
 
 
3387
    def addSuccess(self, test):
 
3388
        stats = self.profiler.stop()
 
3389
        try:
 
3390
            calls = test._benchcalls
 
3391
        except AttributeError:
 
3392
            test._benchcalls = []
 
3393
            calls = test._benchcalls
 
3394
        calls.append(((test.id(), "", ""), stats))
 
3395
        ForwardingResult.addSuccess(self, test)
 
3396
 
 
3397
    def stopTest(self, test):
 
3398
        ForwardingResult.stopTest(self, test)
 
3399
        self.profiler = None
 
3400
 
 
3401
 
 
3402
# Controlled by "bzr selftest -E=..." option
 
3403
# Currently supported:
 
3404
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3405
#                           preserves any flags supplied at the command line.
 
3406
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3407
#                           rather than failing tests. And no longer raise
 
3408
#                           LockContention when fctnl locks are not being used
 
3409
#                           with proper exclusion rules.
 
3410
#   -Ethreads               Will display thread ident at creation/join time to
 
3411
#                           help track thread leaks
 
3412
selftest_debug_flags = set()
 
3413
 
 
3414
 
 
3415
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3416
             transport=None,
 
3417
             test_suite_factory=None,
 
3418
             lsprof_timed=None,
 
3419
             bench_history=None,
 
3420
             matching_tests_first=None,
 
3421
             list_only=False,
 
3422
             random_seed=None,
 
3423
             exclude_pattern=None,
 
3424
             strict=False,
 
3425
             load_list=None,
 
3426
             debug_flags=None,
 
3427
             starting_with=None,
 
3428
             runner_class=None,
 
3429
             suite_decorators=None,
 
3430
             stream=None,
 
3431
             lsprof_tests=False,
 
3432
             ):
 
3433
    """Run the whole test suite under the enhanced runner"""
 
3434
    # XXX: Very ugly way to do this...
 
3435
    # Disable warning about old formats because we don't want it to disturb
 
3436
    # any blackbox tests.
 
3437
    from bzrlib import repository
 
3438
    repository._deprecation_warning_done = True
 
3439
 
 
3440
    global default_transport
 
3441
    if transport is None:
 
3442
        transport = default_transport
 
3443
    old_transport = default_transport
 
3444
    default_transport = transport
 
3445
    global selftest_debug_flags
 
3446
    old_debug_flags = selftest_debug_flags
 
3447
    if debug_flags is not None:
 
3448
        selftest_debug_flags = set(debug_flags)
 
3449
    try:
 
3450
        if load_list is None:
 
3451
            keep_only = None
 
3452
        else:
 
3453
            keep_only = load_test_id_list(load_list)
 
3454
        if starting_with:
 
3455
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3456
                             for start in starting_with]
 
3457
        if test_suite_factory is None:
 
3458
            # Reduce loading time by loading modules based on the starting_with
 
3459
            # patterns.
 
3460
            suite = test_suite(keep_only, starting_with)
 
3461
        else:
 
3462
            suite = test_suite_factory()
 
3463
        if starting_with:
 
3464
            # But always filter as requested.
 
3465
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3466
        result_decorators = []
 
3467
        if lsprof_tests:
 
3468
            result_decorators.append(ProfileResult)
 
3469
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3470
                     stop_on_failure=stop_on_failure,
 
3471
                     transport=transport,
 
3472
                     lsprof_timed=lsprof_timed,
 
3473
                     bench_history=bench_history,
 
3474
                     matching_tests_first=matching_tests_first,
 
3475
                     list_only=list_only,
 
3476
                     random_seed=random_seed,
 
3477
                     exclude_pattern=exclude_pattern,
 
3478
                     strict=strict,
 
3479
                     runner_class=runner_class,
 
3480
                     suite_decorators=suite_decorators,
 
3481
                     stream=stream,
 
3482
                     result_decorators=result_decorators,
 
3483
                     )
 
3484
    finally:
 
3485
        default_transport = old_transport
 
3486
        selftest_debug_flags = old_debug_flags
 
3487
 
 
3488
 
 
3489
def load_test_id_list(file_name):
 
3490
    """Load a test id list from a text file.
 
3491
 
 
3492
    The format is one test id by line.  No special care is taken to impose
 
3493
    strict rules, these test ids are used to filter the test suite so a test id
 
3494
    that do not match an existing test will do no harm. This allows user to add
 
3495
    comments, leave blank lines, etc.
 
3496
    """
 
3497
    test_list = []
 
3498
    try:
 
3499
        ftest = open(file_name, 'rt')
 
3500
    except IOError, e:
 
3501
        if e.errno != errno.ENOENT:
 
3502
            raise
 
3503
        else:
 
3504
            raise errors.NoSuchFile(file_name)
 
3505
 
 
3506
    for test_name in ftest.readlines():
 
3507
        test_list.append(test_name.strip())
 
3508
    ftest.close()
 
3509
    return test_list
 
3510
 
 
3511
 
 
3512
def suite_matches_id_list(test_suite, id_list):
 
3513
    """Warns about tests not appearing or appearing more than once.
 
3514
 
 
3515
    :param test_suite: A TestSuite object.
 
3516
    :param test_id_list: The list of test ids that should be found in
 
3517
         test_suite.
 
3518
 
 
3519
    :return: (absents, duplicates) absents is a list containing the test found
 
3520
        in id_list but not in test_suite, duplicates is a list containing the
 
3521
        test found multiple times in test_suite.
 
3522
 
 
3523
    When using a prefined test id list, it may occurs that some tests do not
 
3524
    exist anymore or that some tests use the same id. This function warns the
 
3525
    tester about potential problems in his workflow (test lists are volatile)
 
3526
    or in the test suite itself (using the same id for several tests does not
 
3527
    help to localize defects).
 
3528
    """
 
3529
    # Build a dict counting id occurrences
 
3530
    tests = dict()
 
3531
    for test in iter_suite_tests(test_suite):
 
3532
        id = test.id()
 
3533
        tests[id] = tests.get(id, 0) + 1
 
3534
 
 
3535
    not_found = []
 
3536
    duplicates = []
 
3537
    for id in id_list:
 
3538
        occurs = tests.get(id, 0)
 
3539
        if not occurs:
 
3540
            not_found.append(id)
 
3541
        elif occurs > 1:
 
3542
            duplicates.append(id)
 
3543
 
 
3544
    return not_found, duplicates
 
3545
 
 
3546
 
 
3547
class TestIdList(object):
 
3548
    """Test id list to filter a test suite.
 
3549
 
 
3550
    Relying on the assumption that test ids are built as:
 
3551
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3552
    notation, this class offers methods to :
 
3553
    - avoid building a test suite for modules not refered to in the test list,
 
3554
    - keep only the tests listed from the module test suite.
 
3555
    """
 
3556
 
 
3557
    def __init__(self, test_id_list):
 
3558
        # When a test suite needs to be filtered against us we compare test ids
 
3559
        # for equality, so a simple dict offers a quick and simple solution.
 
3560
        self.tests = dict().fromkeys(test_id_list, True)
 
3561
 
 
3562
        # While unittest.TestCase have ids like:
 
3563
        # <module>.<class>.<method>[(<param+)],
 
3564
        # doctest.DocTestCase can have ids like:
 
3565
        # <module>
 
3566
        # <module>.<class>
 
3567
        # <module>.<function>
 
3568
        # <module>.<class>.<method>
 
3569
 
 
3570
        # Since we can't predict a test class from its name only, we settle on
 
3571
        # a simple constraint: a test id always begins with its module name.
 
3572
 
 
3573
        modules = {}
 
3574
        for test_id in test_id_list:
 
3575
            parts = test_id.split('.')
 
3576
            mod_name = parts.pop(0)
 
3577
            modules[mod_name] = True
 
3578
            for part in parts:
 
3579
                mod_name += '.' + part
 
3580
                modules[mod_name] = True
 
3581
        self.modules = modules
 
3582
 
 
3583
    def refers_to(self, module_name):
 
3584
        """Is there tests for the module or one of its sub modules."""
 
3585
        return self.modules.has_key(module_name)
 
3586
 
 
3587
    def includes(self, test_id):
 
3588
        return self.tests.has_key(test_id)
 
3589
 
 
3590
 
 
3591
class TestPrefixAliasRegistry(registry.Registry):
 
3592
    """A registry for test prefix aliases.
 
3593
 
 
3594
    This helps implement shorcuts for the --starting-with selftest
 
3595
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3596
    warning will be emitted).
 
3597
    """
 
3598
 
 
3599
    def register(self, key, obj, help=None, info=None,
 
3600
                 override_existing=False):
 
3601
        """See Registry.register.
 
3602
 
 
3603
        Trying to override an existing alias causes a warning to be emitted,
 
3604
        not a fatal execption.
 
3605
        """
 
3606
        try:
 
3607
            super(TestPrefixAliasRegistry, self).register(
 
3608
                key, obj, help=help, info=info, override_existing=False)
 
3609
        except KeyError:
 
3610
            actual = self.get(key)
 
3611
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
3612
                 % (key, actual, obj))
 
3613
 
 
3614
    def resolve_alias(self, id_start):
 
3615
        """Replace the alias by the prefix in the given string.
 
3616
 
 
3617
        Using an unknown prefix is an error to help catching typos.
 
3618
        """
 
3619
        parts = id_start.split('.')
 
3620
        try:
 
3621
            parts[0] = self.get(parts[0])
 
3622
        except KeyError:
 
3623
            raise errors.BzrCommandError(
 
3624
                '%s is not a known test prefix alias' % parts[0])
 
3625
        return '.'.join(parts)
 
3626
 
 
3627
 
 
3628
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3629
"""Registry of test prefix aliases."""
 
3630
 
 
3631
 
 
3632
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3633
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3634
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3635
 
 
3636
# Obvious highest levels prefixes, feel free to add your own via a plugin
 
3637
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3638
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3639
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3640
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3641
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3642
 
 
3643
 
 
3644
def _test_suite_testmod_names():
 
3645
    """Return the standard list of test module names to test."""
 
3646
    return [
 
3647
        'bzrlib.doc',
 
3648
        'bzrlib.tests.blackbox',
 
3649
        'bzrlib.tests.commands',
 
3650
        'bzrlib.tests.doc_generate',
 
3651
        'bzrlib.tests.per_branch',
 
3652
        'bzrlib.tests.per_bzrdir',
 
3653
        'bzrlib.tests.per_controldir',
 
3654
        'bzrlib.tests.per_controldir_colo',
 
3655
        'bzrlib.tests.per_foreign_vcs',
 
3656
        'bzrlib.tests.per_interrepository',
 
3657
        'bzrlib.tests.per_intertree',
 
3658
        'bzrlib.tests.per_inventory',
 
3659
        'bzrlib.tests.per_interbranch',
 
3660
        'bzrlib.tests.per_lock',
 
3661
        'bzrlib.tests.per_merger',
 
3662
        'bzrlib.tests.per_transport',
 
3663
        'bzrlib.tests.per_tree',
 
3664
        'bzrlib.tests.per_pack_repository',
 
3665
        'bzrlib.tests.per_repository',
 
3666
        'bzrlib.tests.per_repository_chk',
 
3667
        'bzrlib.tests.per_repository_reference',
 
3668
        'bzrlib.tests.per_uifactory',
 
3669
        'bzrlib.tests.per_versionedfile',
 
3670
        'bzrlib.tests.per_workingtree',
 
3671
        'bzrlib.tests.test__annotator',
 
3672
        'bzrlib.tests.test__bencode',
 
3673
        'bzrlib.tests.test__btree_serializer',
 
3674
        'bzrlib.tests.test__chk_map',
 
3675
        'bzrlib.tests.test__dirstate_helpers',
 
3676
        'bzrlib.tests.test__groupcompress',
 
3677
        'bzrlib.tests.test__known_graph',
 
3678
        'bzrlib.tests.test__rio',
 
3679
        'bzrlib.tests.test__simple_set',
 
3680
        'bzrlib.tests.test__static_tuple',
 
3681
        'bzrlib.tests.test__walkdirs_win32',
 
3682
        'bzrlib.tests.test_ancestry',
 
3683
        'bzrlib.tests.test_annotate',
 
3684
        'bzrlib.tests.test_api',
 
3685
        'bzrlib.tests.test_atomicfile',
 
3686
        'bzrlib.tests.test_bad_files',
 
3687
        'bzrlib.tests.test_bisect_multi',
 
3688
        'bzrlib.tests.test_branch',
 
3689
        'bzrlib.tests.test_branchbuilder',
 
3690
        'bzrlib.tests.test_btree_index',
 
3691
        'bzrlib.tests.test_bugtracker',
 
3692
        'bzrlib.tests.test_bundle',
 
3693
        'bzrlib.tests.test_bzrdir',
 
3694
        'bzrlib.tests.test__chunks_to_lines',
 
3695
        'bzrlib.tests.test_cache_utf8',
 
3696
        'bzrlib.tests.test_chk_map',
 
3697
        'bzrlib.tests.test_chk_serializer',
 
3698
        'bzrlib.tests.test_chunk_writer',
 
3699
        'bzrlib.tests.test_clean_tree',
 
3700
        'bzrlib.tests.test_cleanup',
 
3701
        'bzrlib.tests.test_cmdline',
 
3702
        'bzrlib.tests.test_commands',
 
3703
        'bzrlib.tests.test_commit',
 
3704
        'bzrlib.tests.test_commit_merge',
 
3705
        'bzrlib.tests.test_config',
 
3706
        'bzrlib.tests.test_conflicts',
 
3707
        'bzrlib.tests.test_counted_lock',
 
3708
        'bzrlib.tests.test_crash',
 
3709
        'bzrlib.tests.test_decorators',
 
3710
        'bzrlib.tests.test_delta',
 
3711
        'bzrlib.tests.test_debug',
 
3712
        'bzrlib.tests.test_deprecated_graph',
 
3713
        'bzrlib.tests.test_diff',
 
3714
        'bzrlib.tests.test_directory_service',
 
3715
        'bzrlib.tests.test_dirstate',
 
3716
        'bzrlib.tests.test_email_message',
 
3717
        'bzrlib.tests.test_eol_filters',
 
3718
        'bzrlib.tests.test_errors',
 
3719
        'bzrlib.tests.test_export',
 
3720
        'bzrlib.tests.test_extract',
 
3721
        'bzrlib.tests.test_fetch',
 
3722
        'bzrlib.tests.test_fixtures',
 
3723
        'bzrlib.tests.test_fifo_cache',
 
3724
        'bzrlib.tests.test_filters',
 
3725
        'bzrlib.tests.test_ftp_transport',
 
3726
        'bzrlib.tests.test_foreign',
 
3727
        'bzrlib.tests.test_generate_docs',
 
3728
        'bzrlib.tests.test_generate_ids',
 
3729
        'bzrlib.tests.test_globbing',
 
3730
        'bzrlib.tests.test_gpg',
 
3731
        'bzrlib.tests.test_graph',
 
3732
        'bzrlib.tests.test_groupcompress',
 
3733
        'bzrlib.tests.test_hashcache',
 
3734
        'bzrlib.tests.test_help',
 
3735
        'bzrlib.tests.test_hooks',
 
3736
        'bzrlib.tests.test_http',
 
3737
        'bzrlib.tests.test_http_response',
 
3738
        'bzrlib.tests.test_https_ca_bundle',
 
3739
        'bzrlib.tests.test_identitymap',
 
3740
        'bzrlib.tests.test_ignores',
 
3741
        'bzrlib.tests.test_index',
 
3742
        'bzrlib.tests.test_import_tariff',
 
3743
        'bzrlib.tests.test_info',
 
3744
        'bzrlib.tests.test_inv',
 
3745
        'bzrlib.tests.test_inventory_delta',
 
3746
        'bzrlib.tests.test_knit',
 
3747
        'bzrlib.tests.test_lazy_import',
 
3748
        'bzrlib.tests.test_lazy_regex',
 
3749
        'bzrlib.tests.test_library_state',
 
3750
        'bzrlib.tests.test_lock',
 
3751
        'bzrlib.tests.test_lockable_files',
 
3752
        'bzrlib.tests.test_lockdir',
 
3753
        'bzrlib.tests.test_log',
 
3754
        'bzrlib.tests.test_lru_cache',
 
3755
        'bzrlib.tests.test_lsprof',
 
3756
        'bzrlib.tests.test_mail_client',
 
3757
        'bzrlib.tests.test_matchers',
 
3758
        'bzrlib.tests.test_memorytree',
 
3759
        'bzrlib.tests.test_merge',
 
3760
        'bzrlib.tests.test_merge3',
 
3761
        'bzrlib.tests.test_merge_core',
 
3762
        'bzrlib.tests.test_merge_directive',
 
3763
        'bzrlib.tests.test_missing',
 
3764
        'bzrlib.tests.test_msgeditor',
 
3765
        'bzrlib.tests.test_multiparent',
 
3766
        'bzrlib.tests.test_mutabletree',
 
3767
        'bzrlib.tests.test_nonascii',
 
3768
        'bzrlib.tests.test_options',
 
3769
        'bzrlib.tests.test_osutils',
 
3770
        'bzrlib.tests.test_osutils_encodings',
 
3771
        'bzrlib.tests.test_pack',
 
3772
        'bzrlib.tests.test_patch',
 
3773
        'bzrlib.tests.test_patches',
 
3774
        'bzrlib.tests.test_permissions',
 
3775
        'bzrlib.tests.test_plugins',
 
3776
        'bzrlib.tests.test_progress',
 
3777
        'bzrlib.tests.test_read_bundle',
 
3778
        'bzrlib.tests.test_reconcile',
 
3779
        'bzrlib.tests.test_reconfigure',
 
3780
        'bzrlib.tests.test_registry',
 
3781
        'bzrlib.tests.test_remote',
 
3782
        'bzrlib.tests.test_rename_map',
 
3783
        'bzrlib.tests.test_repository',
 
3784
        'bzrlib.tests.test_revert',
 
3785
        'bzrlib.tests.test_revision',
 
3786
        'bzrlib.tests.test_revisionspec',
 
3787
        'bzrlib.tests.test_revisiontree',
 
3788
        'bzrlib.tests.test_rio',
 
3789
        'bzrlib.tests.test_rules',
 
3790
        'bzrlib.tests.test_sampler',
 
3791
        'bzrlib.tests.test_script',
 
3792
        'bzrlib.tests.test_selftest',
 
3793
        'bzrlib.tests.test_serializer',
 
3794
        'bzrlib.tests.test_setup',
 
3795
        'bzrlib.tests.test_sftp_transport',
 
3796
        'bzrlib.tests.test_shelf',
 
3797
        'bzrlib.tests.test_shelf_ui',
 
3798
        'bzrlib.tests.test_smart',
 
3799
        'bzrlib.tests.test_smart_add',
 
3800
        'bzrlib.tests.test_smart_request',
 
3801
        'bzrlib.tests.test_smart_transport',
 
3802
        'bzrlib.tests.test_smtp_connection',
 
3803
        'bzrlib.tests.test_source',
 
3804
        'bzrlib.tests.test_ssh_transport',
 
3805
        'bzrlib.tests.test_status',
 
3806
        'bzrlib.tests.test_store',
 
3807
        'bzrlib.tests.test_strace',
 
3808
        'bzrlib.tests.test_subsume',
 
3809
        'bzrlib.tests.test_switch',
 
3810
        'bzrlib.tests.test_symbol_versioning',
 
3811
        'bzrlib.tests.test_tag',
 
3812
        'bzrlib.tests.test_test_server',
 
3813
        'bzrlib.tests.test_testament',
 
3814
        'bzrlib.tests.test_textfile',
 
3815
        'bzrlib.tests.test_textmerge',
 
3816
        'bzrlib.tests.test_timestamp',
 
3817
        'bzrlib.tests.test_trace',
 
3818
        'bzrlib.tests.test_transactions',
 
3819
        'bzrlib.tests.test_transform',
 
3820
        'bzrlib.tests.test_transport',
 
3821
        'bzrlib.tests.test_transport_log',
 
3822
        'bzrlib.tests.test_tree',
 
3823
        'bzrlib.tests.test_treebuilder',
 
3824
        'bzrlib.tests.test_treeshape',
 
3825
        'bzrlib.tests.test_tsort',
 
3826
        'bzrlib.tests.test_tuned_gzip',
 
3827
        'bzrlib.tests.test_ui',
 
3828
        'bzrlib.tests.test_uncommit',
 
3829
        'bzrlib.tests.test_upgrade',
 
3830
        'bzrlib.tests.test_upgrade_stacked',
 
3831
        'bzrlib.tests.test_urlutils',
 
3832
        'bzrlib.tests.test_version',
 
3833
        'bzrlib.tests.test_version_info',
 
3834
        'bzrlib.tests.test_versionedfile',
 
3835
        'bzrlib.tests.test_weave',
 
3836
        'bzrlib.tests.test_whitebox',
 
3837
        'bzrlib.tests.test_win32utils',
 
3838
        'bzrlib.tests.test_workingtree',
 
3839
        'bzrlib.tests.test_workingtree_4',
 
3840
        'bzrlib.tests.test_wsgi',
 
3841
        'bzrlib.tests.test_xml',
 
3842
        ]
 
3843
 
 
3844
 
 
3845
def _test_suite_modules_to_doctest():
 
3846
    """Return the list of modules to doctest."""
 
3847
    if __doc__ is None:
 
3848
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
 
3849
        return []
 
3850
    return [
 
3851
        'bzrlib',
 
3852
        'bzrlib.branchbuilder',
 
3853
        'bzrlib.decorators',
 
3854
        'bzrlib.export',
 
3855
        'bzrlib.inventory',
 
3856
        'bzrlib.iterablefile',
 
3857
        'bzrlib.lockdir',
 
3858
        'bzrlib.merge3',
 
3859
        'bzrlib.option',
 
3860
        'bzrlib.symbol_versioning',
 
3861
        'bzrlib.tests',
 
3862
        'bzrlib.tests.fixtures',
 
3863
        'bzrlib.timestamp',
 
3864
        'bzrlib.version_info_formats.format_custom',
 
3865
        ]
 
3866
 
 
3867
 
 
3868
def test_suite(keep_only=None, starting_with=None):
 
3869
    """Build and return TestSuite for the whole of bzrlib.
 
3870
 
 
3871
    :param keep_only: A list of test ids limiting the suite returned.
 
3872
 
 
3873
    :param starting_with: An id limiting the suite returned to the tests
 
3874
         starting with it.
 
3875
 
 
3876
    This function can be replaced if you need to change the default test
 
3877
    suite on a global basis, but it is not encouraged.
 
3878
    """
 
3879
 
 
3880
    loader = TestUtil.TestLoader()
 
3881
 
 
3882
    if keep_only is not None:
 
3883
        id_filter = TestIdList(keep_only)
 
3884
    if starting_with:
 
3885
        # We take precedence over keep_only because *at loading time* using
 
3886
        # both options means we will load less tests for the same final result.
 
3887
        def interesting_module(name):
 
3888
            for start in starting_with:
 
3889
                if (
 
3890
                    # Either the module name starts with the specified string
 
3891
                    name.startswith(start)
 
3892
                    # or it may contain tests starting with the specified string
 
3893
                    or start.startswith(name)
 
3894
                    ):
 
3895
                    return True
 
3896
            return False
 
3897
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
3898
 
 
3899
    elif keep_only is not None:
 
3900
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3901
        def interesting_module(name):
 
3902
            return id_filter.refers_to(name)
 
3903
 
 
3904
    else:
 
3905
        loader = TestUtil.TestLoader()
 
3906
        def interesting_module(name):
 
3907
            # No filtering, all modules are interesting
 
3908
            return True
 
3909
 
 
3910
    suite = loader.suiteClass()
 
3911
 
 
3912
    # modules building their suite with loadTestsFromModuleNames
 
3913
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
3914
 
 
3915
    for mod in _test_suite_modules_to_doctest():
 
3916
        if not interesting_module(mod):
 
3917
            # No tests to keep here, move along
 
3918
            continue
 
3919
        try:
 
3920
            # note that this really does mean "report only" -- doctest
 
3921
            # still runs the rest of the examples
 
3922
            doc_suite = doctest.DocTestSuite(mod,
 
3923
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
3924
        except ValueError, e:
 
3925
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
3926
            raise
 
3927
        if len(doc_suite._tests) == 0:
 
3928
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
3929
        suite.addTest(doc_suite)
 
3930
 
 
3931
    default_encoding = sys.getdefaultencoding()
 
3932
    for name, plugin in bzrlib.plugin.plugins().items():
 
3933
        if not interesting_module(plugin.module.__name__):
 
3934
            continue
 
3935
        plugin_suite = plugin.test_suite()
 
3936
        # We used to catch ImportError here and turn it into just a warning,
 
3937
        # but really if you don't have --no-plugins this should be a failure.
 
3938
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
3939
        if plugin_suite is None:
 
3940
            plugin_suite = plugin.load_plugin_tests(loader)
 
3941
        if plugin_suite is not None:
 
3942
            suite.addTest(plugin_suite)
 
3943
        if default_encoding != sys.getdefaultencoding():
 
3944
            bzrlib.trace.warning(
 
3945
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
3946
                sys.getdefaultencoding())
 
3947
            reload(sys)
 
3948
            sys.setdefaultencoding(default_encoding)
 
3949
 
 
3950
    if keep_only is not None:
 
3951
        # Now that the referred modules have loaded their tests, keep only the
 
3952
        # requested ones.
 
3953
        suite = filter_suite_by_id_list(suite, id_filter)
 
3954
        # Do some sanity checks on the id_list filtering
 
3955
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
3956
        if starting_with:
 
3957
            # The tester has used both keep_only and starting_with, so he is
 
3958
            # already aware that some tests are excluded from the list, there
 
3959
            # is no need to tell him which.
 
3960
            pass
 
3961
        else:
 
3962
            # Some tests mentioned in the list are not in the test suite. The
 
3963
            # list may be out of date, report to the tester.
 
3964
            for id in not_found:
 
3965
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
3966
        for id in duplicates:
 
3967
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
3968
 
 
3969
    return suite
 
3970
 
 
3971
 
 
3972
def multiply_scenarios(scenarios_left, scenarios_right):
 
3973
    """Multiply two sets of scenarios.
 
3974
 
 
3975
    :returns: the cartesian product of the two sets of scenarios, that is
 
3976
        a scenario for every possible combination of a left scenario and a
 
3977
        right scenario.
 
3978
    """
 
3979
    return [
 
3980
        ('%s,%s' % (left_name, right_name),
 
3981
         dict(left_dict.items() + right_dict.items()))
 
3982
        for left_name, left_dict in scenarios_left
 
3983
        for right_name, right_dict in scenarios_right]
 
3984
 
 
3985
 
 
3986
def multiply_tests(tests, scenarios, result):
 
3987
    """Multiply tests_list by scenarios into result.
 
3988
 
 
3989
    This is the core workhorse for test parameterisation.
 
3990
 
 
3991
    Typically the load_tests() method for a per-implementation test suite will
 
3992
    call multiply_tests and return the result.
 
3993
 
 
3994
    :param tests: The tests to parameterise.
 
3995
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
3996
        scenario_param_dict).
 
3997
    :param result: A TestSuite to add created tests to.
 
3998
 
 
3999
    This returns the passed in result TestSuite with the cross product of all
 
4000
    the tests repeated once for each scenario.  Each test is adapted by adding
 
4001
    the scenario name at the end of its id(), and updating the test object's
 
4002
    __dict__ with the scenario_param_dict.
 
4003
 
 
4004
    >>> import bzrlib.tests.test_sampler
 
4005
    >>> r = multiply_tests(
 
4006
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
4007
    ...     [('one', dict(param=1)),
 
4008
    ...      ('two', dict(param=2))],
 
4009
    ...     TestUtil.TestSuite())
 
4010
    >>> tests = list(iter_suite_tests(r))
 
4011
    >>> len(tests)
 
4012
    2
 
4013
    >>> tests[0].id()
 
4014
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
4015
    >>> tests[0].param
 
4016
    1
 
4017
    >>> tests[1].param
 
4018
    2
 
4019
    """
 
4020
    for test in iter_suite_tests(tests):
 
4021
        apply_scenarios(test, scenarios, result)
 
4022
    return result
 
4023
 
 
4024
 
 
4025
def apply_scenarios(test, scenarios, result):
 
4026
    """Apply the scenarios in scenarios to test and add to result.
 
4027
 
 
4028
    :param test: The test to apply scenarios to.
 
4029
    :param scenarios: An iterable of scenarios to apply to test.
 
4030
    :return: result
 
4031
    :seealso: apply_scenario
 
4032
    """
 
4033
    for scenario in scenarios:
 
4034
        result.addTest(apply_scenario(test, scenario))
 
4035
    return result
 
4036
 
 
4037
 
 
4038
def apply_scenario(test, scenario):
 
4039
    """Copy test and apply scenario to it.
 
4040
 
 
4041
    :param test: A test to adapt.
 
4042
    :param scenario: A tuple describing the scenarion.
 
4043
        The first element of the tuple is the new test id.
 
4044
        The second element is a dict containing attributes to set on the
 
4045
        test.
 
4046
    :return: The adapted test.
 
4047
    """
 
4048
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
4049
    new_test = clone_test(test, new_id)
 
4050
    for name, value in scenario[1].items():
 
4051
        setattr(new_test, name, value)
 
4052
    return new_test
 
4053
 
 
4054
 
 
4055
def clone_test(test, new_id):
 
4056
    """Clone a test giving it a new id.
 
4057
 
 
4058
    :param test: The test to clone.
 
4059
    :param new_id: The id to assign to it.
 
4060
    :return: The new test.
 
4061
    """
 
4062
    new_test = copy.copy(test)
 
4063
    new_test.id = lambda: new_id
 
4064
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
 
4065
    # causes cloned tests to share the 'details' dict.  This makes it hard to
 
4066
    # read the test output for parameterized tests, because tracebacks will be
 
4067
    # associated with irrelevant tests.
 
4068
    try:
 
4069
        details = new_test._TestCase__details
 
4070
    except AttributeError:
 
4071
        # must be a different version of testtools than expected.  Do nothing.
 
4072
        pass
 
4073
    else:
 
4074
        # Reset the '__details' dict.
 
4075
        new_test._TestCase__details = {}
 
4076
    return new_test
 
4077
 
 
4078
 
 
4079
def permute_tests_for_extension(standard_tests, loader, py_module_name,
 
4080
                                ext_module_name):
 
4081
    """Helper for permutating tests against an extension module.
 
4082
 
 
4083
    This is meant to be used inside a modules 'load_tests()' function. It will
 
4084
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
 
4085
    against both implementations. Setting 'test.module' to the appropriate
 
4086
    module. See bzrlib.tests.test__chk_map.load_tests as an example.
 
4087
 
 
4088
    :param standard_tests: A test suite to permute
 
4089
    :param loader: A TestLoader
 
4090
    :param py_module_name: The python path to a python module that can always
 
4091
        be loaded, and will be considered the 'python' implementation. (eg
 
4092
        'bzrlib._chk_map_py')
 
4093
    :param ext_module_name: The python path to an extension module. If the
 
4094
        module cannot be loaded, a single test will be added, which notes that
 
4095
        the module is not available. If it can be loaded, all standard_tests
 
4096
        will be run against that module.
 
4097
    :return: (suite, feature) suite is a test-suite that has all the permuted
 
4098
        tests. feature is the Feature object that can be used to determine if
 
4099
        the module is available.
 
4100
    """
 
4101
 
 
4102
    py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
 
4103
    scenarios = [
 
4104
        ('python', {'module': py_module}),
 
4105
    ]
 
4106
    suite = loader.suiteClass()
 
4107
    feature = ModuleAvailableFeature(ext_module_name)
 
4108
    if feature.available():
 
4109
        scenarios.append(('C', {'module': feature.module}))
 
4110
    else:
 
4111
        # the compiled module isn't available, so we add a failing test
 
4112
        class FailWithoutFeature(TestCase):
 
4113
            def test_fail(self):
 
4114
                self.requireFeature(feature)
 
4115
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
4116
    result = multiply_tests(standard_tests, scenarios, suite)
 
4117
    return result, feature
 
4118
 
 
4119
 
 
4120
def _rmtree_temp_dir(dirname, test_id=None):
 
4121
    # If LANG=C we probably have created some bogus paths
 
4122
    # which rmtree(unicode) will fail to delete
 
4123
    # so make sure we are using rmtree(str) to delete everything
 
4124
    # except on win32, where rmtree(str) will fail
 
4125
    # since it doesn't have the property of byte-stream paths
 
4126
    # (they are either ascii or mbcs)
 
4127
    if sys.platform == 'win32':
 
4128
        # make sure we are using the unicode win32 api
 
4129
        dirname = unicode(dirname)
 
4130
    else:
 
4131
        dirname = dirname.encode(sys.getfilesystemencoding())
 
4132
    try:
 
4133
        osutils.rmtree(dirname)
 
4134
    except OSError, e:
 
4135
        # We don't want to fail here because some useful display will be lost
 
4136
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
4137
        # possible info to the test runner is even worse.
 
4138
        if test_id != None:
 
4139
            ui.ui_factory.clear_term()
 
4140
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
 
4141
        # Ugly, but the last thing we want here is fail, so bear with it.
 
4142
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
 
4143
                                    ).encode('ascii', 'replace')
 
4144
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
4145
                         % (os.path.basename(dirname), printable_e))
 
4146
 
 
4147
 
 
4148
class Feature(object):
 
4149
    """An operating system Feature."""
 
4150
 
 
4151
    def __init__(self):
 
4152
        self._available = None
 
4153
 
 
4154
    def available(self):
 
4155
        """Is the feature available?
 
4156
 
 
4157
        :return: True if the feature is available.
 
4158
        """
 
4159
        if self._available is None:
 
4160
            self._available = self._probe()
 
4161
        return self._available
 
4162
 
 
4163
    def _probe(self):
 
4164
        """Implement this method in concrete features.
 
4165
 
 
4166
        :return: True if the feature is available.
 
4167
        """
 
4168
        raise NotImplementedError
 
4169
 
 
4170
    def __str__(self):
 
4171
        if getattr(self, 'feature_name', None):
 
4172
            return self.feature_name()
 
4173
        return self.__class__.__name__
 
4174
 
 
4175
 
 
4176
class _SymlinkFeature(Feature):
 
4177
 
 
4178
    def _probe(self):
 
4179
        return osutils.has_symlinks()
 
4180
 
 
4181
    def feature_name(self):
 
4182
        return 'symlinks'
 
4183
 
 
4184
SymlinkFeature = _SymlinkFeature()
 
4185
 
 
4186
 
 
4187
class _HardlinkFeature(Feature):
 
4188
 
 
4189
    def _probe(self):
 
4190
        return osutils.has_hardlinks()
 
4191
 
 
4192
    def feature_name(self):
 
4193
        return 'hardlinks'
 
4194
 
 
4195
HardlinkFeature = _HardlinkFeature()
 
4196
 
 
4197
 
 
4198
class _OsFifoFeature(Feature):
 
4199
 
 
4200
    def _probe(self):
 
4201
        return getattr(os, 'mkfifo', None)
 
4202
 
 
4203
    def feature_name(self):
 
4204
        return 'filesystem fifos'
 
4205
 
 
4206
OsFifoFeature = _OsFifoFeature()
 
4207
 
 
4208
 
 
4209
class _UnicodeFilenameFeature(Feature):
 
4210
    """Does the filesystem support Unicode filenames?"""
 
4211
 
 
4212
    def _probe(self):
 
4213
        try:
 
4214
            # Check for character combinations unlikely to be covered by any
 
4215
            # single non-unicode encoding. We use the characters
 
4216
            # - greek small letter alpha (U+03B1) and
 
4217
            # - braille pattern dots-123456 (U+283F).
 
4218
            os.stat(u'\u03b1\u283f')
 
4219
        except UnicodeEncodeError:
 
4220
            return False
 
4221
        except (IOError, OSError):
 
4222
            # The filesystem allows the Unicode filename but the file doesn't
 
4223
            # exist.
 
4224
            return True
 
4225
        else:
 
4226
            # The filesystem allows the Unicode filename and the file exists,
 
4227
            # for some reason.
 
4228
            return True
 
4229
 
 
4230
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4231
 
 
4232
 
 
4233
class _CompatabilityThunkFeature(Feature):
 
4234
    """This feature is just a thunk to another feature.
 
4235
 
 
4236
    It issues a deprecation warning if it is accessed, to let you know that you
 
4237
    should really use a different feature.
 
4238
    """
 
4239
 
 
4240
    def __init__(self, dep_version, module, name,
 
4241
                 replacement_name, replacement_module=None):
 
4242
        super(_CompatabilityThunkFeature, self).__init__()
 
4243
        self._module = module
 
4244
        if replacement_module is None:
 
4245
            replacement_module = module
 
4246
        self._replacement_module = replacement_module
 
4247
        self._name = name
 
4248
        self._replacement_name = replacement_name
 
4249
        self._dep_version = dep_version
 
4250
        self._feature = None
 
4251
 
 
4252
    def _ensure(self):
 
4253
        if self._feature is None:
 
4254
            depr_msg = self._dep_version % ('%s.%s'
 
4255
                                            % (self._module, self._name))
 
4256
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4257
                                               self._replacement_name)
 
4258
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4259
            # Import the new feature and use it as a replacement for the
 
4260
            # deprecated one.
 
4261
            mod = __import__(self._replacement_module, {}, {},
 
4262
                             [self._replacement_name])
 
4263
            self._feature = getattr(mod, self._replacement_name)
 
4264
 
 
4265
    def _probe(self):
 
4266
        self._ensure()
 
4267
        return self._feature._probe()
 
4268
 
 
4269
 
 
4270
class ModuleAvailableFeature(Feature):
 
4271
    """This is a feature than describes a module we want to be available.
 
4272
 
 
4273
    Declare the name of the module in __init__(), and then after probing, the
 
4274
    module will be available as 'self.module'.
 
4275
 
 
4276
    :ivar module: The module if it is available, else None.
 
4277
    """
 
4278
 
 
4279
    def __init__(self, module_name):
 
4280
        super(ModuleAvailableFeature, self).__init__()
 
4281
        self.module_name = module_name
 
4282
 
 
4283
    def _probe(self):
 
4284
        try:
 
4285
            self._module = __import__(self.module_name, {}, {}, [''])
 
4286
            return True
 
4287
        except ImportError:
 
4288
            return False
 
4289
 
 
4290
    @property
 
4291
    def module(self):
 
4292
        if self.available(): # Make sure the probe has been done
 
4293
            return self._module
 
4294
        return None
 
4295
 
 
4296
    def feature_name(self):
 
4297
        return self.module_name
 
4298
 
 
4299
 
 
4300
# This is kept here for compatibility, it is recommended to use
 
4301
# 'bzrlib.tests.feature.paramiko' instead
 
4302
ParamikoFeature = _CompatabilityThunkFeature(
 
4303
    deprecated_in((2,1,0)),
 
4304
    'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
 
4305
 
 
4306
 
 
4307
def probe_unicode_in_user_encoding():
 
4308
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4309
    Return first successfull match.
 
4310
 
 
4311
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4312
    """
 
4313
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4314
    for uni_val in possible_vals:
 
4315
        try:
 
4316
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4317
        except UnicodeEncodeError:
 
4318
            # Try a different character
 
4319
            pass
 
4320
        else:
 
4321
            return uni_val, str_val
 
4322
    return None, None
 
4323
 
 
4324
 
 
4325
def probe_bad_non_ascii(encoding):
 
4326
    """Try to find [bad] character with code [128..255]
 
4327
    that cannot be decoded to unicode in some encoding.
 
4328
    Return None if all non-ascii characters is valid
 
4329
    for given encoding.
 
4330
    """
 
4331
    for i in xrange(128, 256):
 
4332
        char = chr(i)
 
4333
        try:
 
4334
            char.decode(encoding)
 
4335
        except UnicodeDecodeError:
 
4336
            return char
 
4337
    return None
 
4338
 
 
4339
 
 
4340
class _HTTPSServerFeature(Feature):
 
4341
    """Some tests want an https Server, check if one is available.
 
4342
 
 
4343
    Right now, the only way this is available is under python2.6 which provides
 
4344
    an ssl module.
 
4345
    """
 
4346
 
 
4347
    def _probe(self):
 
4348
        try:
 
4349
            import ssl
 
4350
            return True
 
4351
        except ImportError:
 
4352
            return False
 
4353
 
 
4354
    def feature_name(self):
 
4355
        return 'HTTPSServer'
 
4356
 
 
4357
 
 
4358
HTTPSServerFeature = _HTTPSServerFeature()
 
4359
 
 
4360
 
 
4361
class _UnicodeFilename(Feature):
 
4362
    """Does the filesystem support Unicode filenames?"""
 
4363
 
 
4364
    def _probe(self):
 
4365
        try:
 
4366
            os.stat(u'\u03b1')
 
4367
        except UnicodeEncodeError:
 
4368
            return False
 
4369
        except (IOError, OSError):
 
4370
            # The filesystem allows the Unicode filename but the file doesn't
 
4371
            # exist.
 
4372
            return True
 
4373
        else:
 
4374
            # The filesystem allows the Unicode filename and the file exists,
 
4375
            # for some reason.
 
4376
            return True
 
4377
 
 
4378
UnicodeFilename = _UnicodeFilename()
 
4379
 
 
4380
 
 
4381
class _ByteStringNamedFilesystem(Feature):
 
4382
    """Is the filesystem based on bytes?"""
 
4383
 
 
4384
    def _probe(self):
 
4385
        if os.name == "posix":
 
4386
            return True
 
4387
        return False
 
4388
 
 
4389
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
 
4390
 
 
4391
 
 
4392
class _UTF8Filesystem(Feature):
 
4393
    """Is the filesystem UTF-8?"""
 
4394
 
 
4395
    def _probe(self):
 
4396
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4397
            return True
 
4398
        return False
 
4399
 
 
4400
UTF8Filesystem = _UTF8Filesystem()
 
4401
 
 
4402
 
 
4403
class _BreakinFeature(Feature):
 
4404
    """Does this platform support the breakin feature?"""
 
4405
 
 
4406
    def _probe(self):
 
4407
        from bzrlib import breakin
 
4408
        if breakin.determine_signal() is None:
 
4409
            return False
 
4410
        if sys.platform == 'win32':
 
4411
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4412
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4413
            # access the function
 
4414
            try:
 
4415
                import ctypes
 
4416
            except OSError:
 
4417
                return False
 
4418
        return True
 
4419
 
 
4420
    def feature_name(self):
 
4421
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4422
 
 
4423
 
 
4424
BreakinFeature = _BreakinFeature()
 
4425
 
 
4426
 
 
4427
class _CaseInsCasePresFilenameFeature(Feature):
 
4428
    """Is the file-system case insensitive, but case-preserving?"""
 
4429
 
 
4430
    def _probe(self):
 
4431
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4432
        try:
 
4433
            # first check truly case-preserving for created files, then check
 
4434
            # case insensitive when opening existing files.
 
4435
            name = osutils.normpath(name)
 
4436
            base, rel = osutils.split(name)
 
4437
            found_rel = osutils.canonical_relpath(base, name)
 
4438
            return (found_rel == rel
 
4439
                    and os.path.isfile(name.upper())
 
4440
                    and os.path.isfile(name.lower()))
 
4441
        finally:
 
4442
            os.close(fileno)
 
4443
            os.remove(name)
 
4444
 
 
4445
    def feature_name(self):
 
4446
        return "case-insensitive case-preserving filesystem"
 
4447
 
 
4448
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4449
 
 
4450
 
 
4451
class _CaseInsensitiveFilesystemFeature(Feature):
 
4452
    """Check if underlying filesystem is case-insensitive but *not* case
 
4453
    preserving.
 
4454
    """
 
4455
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4456
    # more likely to be case preserving, so this case is rare.
 
4457
 
 
4458
    def _probe(self):
 
4459
        if CaseInsCasePresFilenameFeature.available():
 
4460
            return False
 
4461
 
 
4462
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4463
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4464
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4465
        else:
 
4466
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4467
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4468
            dir=root)
 
4469
        name_a = osutils.pathjoin(tdir, 'a')
 
4470
        name_A = osutils.pathjoin(tdir, 'A')
 
4471
        os.mkdir(name_a)
 
4472
        result = osutils.isdir(name_A)
 
4473
        _rmtree_temp_dir(tdir)
 
4474
        return result
 
4475
 
 
4476
    def feature_name(self):
 
4477
        return 'case-insensitive filesystem'
 
4478
 
 
4479
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4480
 
 
4481
 
 
4482
class _CaseSensitiveFilesystemFeature(Feature):
 
4483
 
 
4484
    def _probe(self):
 
4485
        if CaseInsCasePresFilenameFeature.available():
 
4486
            return False
 
4487
        elif CaseInsensitiveFilesystemFeature.available():
 
4488
            return False
 
4489
        else:
 
4490
            return True
 
4491
 
 
4492
    def feature_name(self):
 
4493
        return 'case-sensitive filesystem'
 
4494
 
 
4495
# new coding style is for feature instances to be lowercase
 
4496
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
 
4497
 
 
4498
 
 
4499
# Kept for compatibility, use bzrlib.tests.features.subunit instead
 
4500
SubUnitFeature = _CompatabilityThunkFeature(
 
4501
    deprecated_in((2,1,0)),
 
4502
    'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
 
4503
# Only define SubUnitBzrRunner if subunit is available.
 
4504
try:
 
4505
    from subunit import TestProtocolClient
 
4506
    from subunit.test_results import AutoTimingTestResultDecorator
 
4507
    class SubUnitBzrProtocolClient(TestProtocolClient):
 
4508
 
 
4509
        def addSuccess(self, test, details=None):
 
4510
            # The subunit client always includes the details in the subunit
 
4511
            # stream, but we don't want to include it in ours.
 
4512
            if details is not None and 'log' in details:
 
4513
                del details['log']
 
4514
            return super(SubUnitBzrProtocolClient, self).addSuccess(
 
4515
                test, details)
 
4516
 
 
4517
    class SubUnitBzrRunner(TextTestRunner):
 
4518
        def run(self, test):
 
4519
            result = AutoTimingTestResultDecorator(
 
4520
                SubUnitBzrProtocolClient(self.stream))
 
4521
            test.run(result)
 
4522
            return result
 
4523
except ImportError:
 
4524
    pass
 
4525
 
 
4526
class _PosixPermissionsFeature(Feature):
 
4527
 
 
4528
    def _probe(self):
 
4529
        def has_perms():
 
4530
            # create temporary file and check if specified perms are maintained.
 
4531
            import tempfile
 
4532
 
 
4533
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
 
4534
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
 
4535
            fd, name = f
 
4536
            os.close(fd)
 
4537
            os.chmod(name, write_perms)
 
4538
 
 
4539
            read_perms = os.stat(name).st_mode & 0777
 
4540
            os.unlink(name)
 
4541
            return (write_perms == read_perms)
 
4542
 
 
4543
        return (os.name == 'posix') and has_perms()
 
4544
 
 
4545
    def feature_name(self):
 
4546
        return 'POSIX permissions support'
 
4547
 
 
4548
posix_permissions_feature = _PosixPermissionsFeature()