~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: John Arbash Meinel
  • Date: 2010-01-13 16:23:07 UTC
  • mto: (4634.119.7 2.0)
  • mto: This revision was merged to the branch mainline in revision 4959.
  • Revision ID: john@arbash-meinel.com-20100113162307-0bs82td16gzih827
Update the MANIFEST.in file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
from unittest import TestResult, TestCase
19
 
 
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import math
 
37
import os
 
38
from pprint import pformat
 
39
import random
 
40
import re
 
41
import shlex
 
42
import stat
 
43
from subprocess import Popen, PIPE, STDOUT
 
44
import sys
 
45
import tempfile
 
46
import threading
 
47
import time
 
48
import unittest
 
49
import warnings
 
50
 
 
51
 
 
52
from bzrlib import (
 
53
    branchbuilder,
 
54
    bzrdir,
 
55
    chk_map,
 
56
    debug,
 
57
    errors,
 
58
    hooks,
 
59
    lock as _mod_lock,
 
60
    memorytree,
 
61
    osutils,
 
62
    progress,
 
63
    ui,
 
64
    urlutils,
 
65
    registry,
 
66
    workingtree,
 
67
    )
 
68
import bzrlib.branch
 
69
import bzrlib.commands
 
70
import bzrlib.timestamp
 
71
import bzrlib.export
 
72
import bzrlib.inventory
 
73
import bzrlib.iterablefile
 
74
import bzrlib.lockdir
20
75
try:
21
 
    import shutil
22
 
    from subprocess import call, Popen, PIPE
23
 
except ImportError, e:
24
 
    sys.stderr.write("testbzr: sorry, this test suite requires the subprocess module\n"
25
 
                     "this is shipped with python2.4 and available separately for 2.3\n")
26
 
    raise
 
76
    import bzrlib.lsprof
 
77
except ImportError:
 
78
    # lsprof not available
 
79
    pass
 
80
from bzrlib.merge import merge_inner
 
81
import bzrlib.merge3
 
82
import bzrlib.plugin
 
83
from bzrlib.smart import client, request, server
 
84
import bzrlib.store
 
85
from bzrlib import symbol_versioning
 
86
from bzrlib.symbol_versioning import (
 
87
    DEPRECATED_PARAMETER,
 
88
    deprecated_function,
 
89
    deprecated_method,
 
90
    deprecated_passed,
 
91
    )
 
92
import bzrlib.trace
 
93
from bzrlib.transport import get_transport
 
94
import bzrlib.transport
 
95
from bzrlib.transport.local import LocalURLServer
 
96
from bzrlib.transport.memory import MemoryServer
 
97
from bzrlib.transport.readonly import ReadonlyServer
 
98
from bzrlib.trace import mutter, note
 
99
from bzrlib.tests import TestUtil
 
100
from bzrlib.tests.http_server import HttpServer
 
101
from bzrlib.tests.TestUtil import (
 
102
                          TestSuite,
 
103
                          TestLoader,
 
104
                          )
 
105
from bzrlib.tests.treeshape import build_tree_contents
 
106
from bzrlib.ui import NullProgressView
 
107
from bzrlib.ui.text import TextUIFactory
 
108
import bzrlib.version_info_formats.format_custom
 
109
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
110
 
 
111
# Mark this python module as being part of the implementation
 
112
# of unittest: this gives us better tracebacks where the last
 
113
# shown frame is the test code, not our assertXYZ.
 
114
__unittest = 1
 
115
 
 
116
default_transport = LocalURLServer
 
117
 
 
118
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
119
SUBUNIT_SEEK_SET = 0
 
120
SUBUNIT_SEEK_CUR = 1
 
121
 
 
122
 
 
123
class ExtendedTestResult(unittest._TextTestResult):
 
124
    """Accepts, reports and accumulates the results of running tests.
 
125
 
 
126
    Compared to the unittest version this class adds support for
 
127
    profiling, benchmarking, stopping as soon as a test fails,  and
 
128
    skipping tests.  There are further-specialized subclasses for
 
129
    different types of display.
 
130
 
 
131
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
132
    addFailure or addError classes.  These in turn may redirect to a more
 
133
    specific case for the special test results supported by our extended
 
134
    tests.
 
135
 
 
136
    Note that just one of these objects is fed the results from many tests.
 
137
    """
 
138
 
 
139
    stop_early = False
 
140
 
 
141
    def __init__(self, stream, descriptions, verbosity,
 
142
                 bench_history=None,
 
143
                 strict=False,
 
144
                 ):
 
145
        """Construct new TestResult.
 
146
 
 
147
        :param bench_history: Optionally, a writable file object to accumulate
 
148
            benchmark results.
 
149
        """
 
150
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
151
        if bench_history is not None:
 
152
            from bzrlib.version import _get_bzr_source_tree
 
153
            src_tree = _get_bzr_source_tree()
 
154
            if src_tree:
 
155
                try:
 
156
                    revision_id = src_tree.get_parent_ids()[0]
 
157
                except IndexError:
 
158
                    # XXX: if this is a brand new tree, do the same as if there
 
159
                    # is no branch.
 
160
                    revision_id = ''
 
161
            else:
 
162
                # XXX: If there's no branch, what should we do?
 
163
                revision_id = ''
 
164
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
165
        self._bench_history = bench_history
 
166
        self.ui = ui.ui_factory
 
167
        self.num_tests = 0
 
168
        self.error_count = 0
 
169
        self.failure_count = 0
 
170
        self.known_failure_count = 0
 
171
        self.skip_count = 0
 
172
        self.not_applicable_count = 0
 
173
        self.unsupported = {}
 
174
        self.count = 0
 
175
        self._overall_start_time = time.time()
 
176
        self._strict = strict
 
177
 
 
178
    def done(self):
 
179
        # nb: called stopTestRun in the version of this that Python merged
 
180
        # upstream, according to lifeless 20090803
 
181
        if self._strict:
 
182
            ok = self.wasStrictlySuccessful()
 
183
        else:
 
184
            ok = self.wasSuccessful()
 
185
        if ok:
 
186
            self.stream.write('tests passed\n')
 
187
        else:
 
188
            self.stream.write('tests failed\n')
 
189
        if TestCase._first_thread_leaker_id:
 
190
            self.stream.write(
 
191
                '%s is leaking threads among %d leaking tests.\n' % (
 
192
                TestCase._first_thread_leaker_id,
 
193
                TestCase._leaking_threads_tests))
 
194
 
 
195
    def _extractBenchmarkTime(self, testCase):
 
196
        """Add a benchmark time for the current test case."""
 
197
        return getattr(testCase, "_benchtime", None)
 
198
 
 
199
    def _elapsedTestTimeString(self):
 
200
        """Return a time string for the overall time the current test has taken."""
 
201
        return self._formatTime(time.time() - self._start_time)
 
202
 
 
203
    def _testTimeString(self, testCase):
 
204
        benchmark_time = self._extractBenchmarkTime(testCase)
 
205
        if benchmark_time is not None:
 
206
            return self._formatTime(benchmark_time) + "*"
 
207
        else:
 
208
            return self._elapsedTestTimeString()
 
209
 
 
210
    def _formatTime(self, seconds):
 
211
        """Format seconds as milliseconds with leading spaces."""
 
212
        # some benchmarks can take thousands of seconds to run, so we need 8
 
213
        # places
 
214
        return "%8dms" % (1000 * seconds)
 
215
 
 
216
    def _shortened_test_description(self, test):
 
217
        what = test.id()
 
218
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
219
        return what
 
220
 
 
221
    def startTest(self, test):
 
222
        unittest.TestResult.startTest(self, test)
 
223
        if self.count == 0:
 
224
            self.startTests()
 
225
        self.report_test_start(test)
 
226
        test.number = self.count
 
227
        self._recordTestStartTime()
 
228
 
 
229
    def startTests(self):
 
230
        import platform
 
231
        if getattr(sys, 'frozen', None) is None:
 
232
            bzr_path = osutils.realpath(sys.argv[0])
 
233
        else:
 
234
            bzr_path = sys.executable
 
235
        self.stream.write(
 
236
            'testing: %s\n' % (bzr_path,))
 
237
        self.stream.write(
 
238
            '   %s\n' % (
 
239
                    bzrlib.__path__[0],))
 
240
        self.stream.write(
 
241
            '   bzr-%s python-%s %s\n' % (
 
242
                    bzrlib.version_string,
 
243
                    bzrlib._format_version_tuple(sys.version_info),
 
244
                    platform.platform(aliased=1),
 
245
                    ))
 
246
        self.stream.write('\n')
 
247
 
 
248
    def _recordTestStartTime(self):
 
249
        """Record that a test has started."""
 
250
        self._start_time = time.time()
 
251
 
 
252
    def _cleanupLogFile(self, test):
 
253
        # We can only do this if we have one of our TestCases, not if
 
254
        # we have a doctest.
 
255
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
256
        if setKeepLogfile is not None:
 
257
            setKeepLogfile()
 
258
 
 
259
    def addError(self, test, err):
 
260
        """Tell result that test finished with an error.
 
261
 
 
262
        Called from the TestCase run() method when the test
 
263
        fails with an unexpected error.
 
264
        """
 
265
        self._testConcluded(test)
 
266
        if isinstance(err[1], TestNotApplicable):
 
267
            return self._addNotApplicable(test, err)
 
268
        elif isinstance(err[1], UnavailableFeature):
 
269
            return self.addNotSupported(test, err[1].args[0])
 
270
        else:
 
271
            unittest.TestResult.addError(self, test, err)
 
272
            self.error_count += 1
 
273
            self.report_error(test, err)
 
274
            if self.stop_early:
 
275
                self.stop()
 
276
            self._cleanupLogFile(test)
 
277
 
 
278
    def addFailure(self, test, err):
 
279
        """Tell result that test failed.
 
280
 
 
281
        Called from the TestCase run() method when the test
 
282
        fails because e.g. an assert() method failed.
 
283
        """
 
284
        self._testConcluded(test)
 
285
        if isinstance(err[1], KnownFailure):
 
286
            return self._addKnownFailure(test, err)
 
287
        else:
 
288
            unittest.TestResult.addFailure(self, test, err)
 
289
            self.failure_count += 1
 
290
            self.report_failure(test, err)
 
291
            if self.stop_early:
 
292
                self.stop()
 
293
            self._cleanupLogFile(test)
 
294
 
 
295
    def addSuccess(self, test):
 
296
        """Tell result that test completed successfully.
 
297
 
 
298
        Called from the TestCase run()
 
299
        """
 
300
        self._testConcluded(test)
 
301
        if self._bench_history is not None:
 
302
            benchmark_time = self._extractBenchmarkTime(test)
 
303
            if benchmark_time is not None:
 
304
                self._bench_history.write("%s %s\n" % (
 
305
                    self._formatTime(benchmark_time),
 
306
                    test.id()))
 
307
        self.report_success(test)
 
308
        self._cleanupLogFile(test)
 
309
        unittest.TestResult.addSuccess(self, test)
 
310
        test._log_contents = ''
 
311
 
 
312
    def _testConcluded(self, test):
 
313
        """Common code when a test has finished.
 
314
 
 
315
        Called regardless of whether it succeded, failed, etc.
 
316
        """
 
317
        pass
 
318
 
 
319
    def _addKnownFailure(self, test, err):
 
320
        self.known_failure_count += 1
 
321
        self.report_known_failure(test, err)
 
322
 
 
323
    def addNotSupported(self, test, feature):
 
324
        """The test will not be run because of a missing feature.
 
325
        """
 
326
        # this can be called in two different ways: it may be that the
 
327
        # test started running, and then raised (through addError)
 
328
        # UnavailableFeature.  Alternatively this method can be called
 
329
        # while probing for features before running the tests; in that
 
330
        # case we will see startTest and stopTest, but the test will never
 
331
        # actually run.
 
332
        self.unsupported.setdefault(str(feature), 0)
 
333
        self.unsupported[str(feature)] += 1
 
334
        self.report_unsupported(test, feature)
 
335
 
 
336
    def addSkip(self, test, reason):
 
337
        """A test has not run for 'reason'."""
 
338
        self.skip_count += 1
 
339
        self.report_skip(test, reason)
 
340
 
 
341
    def _addNotApplicable(self, test, skip_excinfo):
 
342
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
343
            self.not_applicable_count += 1
 
344
            self.report_not_applicable(test, skip_excinfo)
 
345
        try:
 
346
            test.tearDown()
 
347
        except KeyboardInterrupt:
 
348
            raise
 
349
        except:
 
350
            self.addError(test, test.exc_info())
 
351
        else:
 
352
            # seems best to treat this as success from point-of-view of unittest
 
353
            # -- it actually does nothing so it barely matters :)
 
354
            unittest.TestResult.addSuccess(self, test)
 
355
            test._log_contents = ''
 
356
 
 
357
    def printErrorList(self, flavour, errors):
 
358
        for test, err in errors:
 
359
            self.stream.writeln(self.separator1)
 
360
            self.stream.write("%s: " % flavour)
 
361
            self.stream.writeln(self.getDescription(test))
 
362
            if getattr(test, '_get_log', None) is not None:
 
363
                log_contents = test._get_log()
 
364
                if log_contents:
 
365
                    self.stream.write('\n')
 
366
                    self.stream.write(
 
367
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
368
                    self.stream.write('\n')
 
369
                    self.stream.write(log_contents)
 
370
                    self.stream.write('\n')
 
371
                    self.stream.write(
 
372
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
373
                    self.stream.write('\n')
 
374
            self.stream.writeln(self.separator2)
 
375
            self.stream.writeln("%s" % err)
 
376
 
 
377
    def progress(self, offset, whence):
 
378
        """The test is adjusting the count of tests to run."""
 
379
        if whence == SUBUNIT_SEEK_SET:
 
380
            self.num_tests = offset
 
381
        elif whence == SUBUNIT_SEEK_CUR:
 
382
            self.num_tests += offset
 
383
        else:
 
384
            raise errors.BzrError("Unknown whence %r" % whence)
 
385
 
 
386
    def finished(self):
 
387
        pass
 
388
 
 
389
    def report_cleaning_up(self):
 
390
        pass
 
391
 
 
392
    def report_success(self, test):
 
393
        pass
 
394
 
 
395
    def wasStrictlySuccessful(self):
 
396
        if self.unsupported or self.known_failure_count:
 
397
            return False
 
398
        return self.wasSuccessful()
 
399
 
 
400
 
 
401
class TextTestResult(ExtendedTestResult):
 
402
    """Displays progress and results of tests in text form"""
 
403
 
 
404
    def __init__(self, stream, descriptions, verbosity,
 
405
                 bench_history=None,
 
406
                 pb=None,
 
407
                 strict=None,
 
408
                 ):
 
409
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
410
            bench_history, strict)
 
411
        # We no longer pass them around, but just rely on the UIFactory stack
 
412
        # for state
 
413
        if pb is not None:
 
414
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
415
        self.pb = self.ui.nested_progress_bar()
 
416
        self.pb.show_pct = False
 
417
        self.pb.show_spinner = False
 
418
        self.pb.show_eta = False,
 
419
        self.pb.show_count = False
 
420
        self.pb.show_bar = False
 
421
        self.pb.update_latency = 0
 
422
        self.pb.show_transport_activity = False
 
423
 
 
424
    def done(self):
 
425
        # called when the tests that are going to run have run
 
426
        self.pb.clear()
 
427
        super(TextTestResult, self).done()
 
428
 
 
429
    def finished(self):
 
430
        self.pb.finished()
 
431
 
 
432
    def report_starting(self):
 
433
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
434
 
 
435
    def printErrors(self):
 
436
        # clear the pb to make room for the error listing
 
437
        self.pb.clear()
 
438
        super(TextTestResult, self).printErrors()
 
439
 
 
440
    def _progress_prefix_text(self):
 
441
        # the longer this text, the less space we have to show the test
 
442
        # name...
 
443
        a = '[%d' % self.count              # total that have been run
 
444
        # tests skipped as known not to be relevant are not important enough
 
445
        # to show here
 
446
        ## if self.skip_count:
 
447
        ##     a += ', %d skip' % self.skip_count
 
448
        ## if self.known_failure_count:
 
449
        ##     a += '+%dX' % self.known_failure_count
 
450
        if self.num_tests:
 
451
            a +='/%d' % self.num_tests
 
452
        a += ' in '
 
453
        runtime = time.time() - self._overall_start_time
 
454
        if runtime >= 60:
 
455
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
456
        else:
 
457
            a += '%ds' % runtime
 
458
        if self.error_count:
 
459
            a += ', %d err' % self.error_count
 
460
        if self.failure_count:
 
461
            a += ', %d fail' % self.failure_count
 
462
        if self.unsupported:
 
463
            a += ', %d missing' % len(self.unsupported)
 
464
        a += ']'
 
465
        return a
 
466
 
 
467
    def report_test_start(self, test):
 
468
        self.count += 1
 
469
        self.pb.update(
 
470
                self._progress_prefix_text()
 
471
                + ' '
 
472
                + self._shortened_test_description(test))
 
473
 
 
474
    def _test_description(self, test):
 
475
        return self._shortened_test_description(test)
 
476
 
 
477
    def report_error(self, test, err):
 
478
        self.pb.note('ERROR: %s\n    %s\n',
 
479
            self._test_description(test),
 
480
            err[1],
 
481
            )
 
482
 
 
483
    def report_failure(self, test, err):
 
484
        self.pb.note('FAIL: %s\n    %s\n',
 
485
            self._test_description(test),
 
486
            err[1],
 
487
            )
 
488
 
 
489
    def report_known_failure(self, test, err):
 
490
        self.pb.note('XFAIL: %s\n%s\n',
 
491
            self._test_description(test), err[1])
 
492
 
 
493
    def report_skip(self, test, reason):
 
494
        pass
 
495
 
 
496
    def report_not_applicable(self, test, skip_excinfo):
 
497
        pass
 
498
 
 
499
    def report_unsupported(self, test, feature):
 
500
        """test cannot be run because feature is missing."""
 
501
 
 
502
    def report_cleaning_up(self):
 
503
        self.pb.update('Cleaning up')
 
504
 
 
505
 
 
506
class VerboseTestResult(ExtendedTestResult):
 
507
    """Produce long output, with one line per test run plus times"""
 
508
 
 
509
    def _ellipsize_to_right(self, a_string, final_width):
 
510
        """Truncate and pad a string, keeping the right hand side"""
 
511
        if len(a_string) > final_width:
 
512
            result = '...' + a_string[3-final_width:]
 
513
        else:
 
514
            result = a_string
 
515
        return result.ljust(final_width)
 
516
 
 
517
    def report_starting(self):
 
518
        self.stream.write('running %d tests...\n' % self.num_tests)
 
519
 
 
520
    def report_test_start(self, test):
 
521
        self.count += 1
 
522
        name = self._shortened_test_description(test)
 
523
        # width needs space for 6 char status, plus 1 for slash, plus an
 
524
        # 11-char time string, plus a trailing blank
 
525
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
526
        self.stream.write(self._ellipsize_to_right(name,
 
527
                          osutils.terminal_width()-18))
 
528
        self.stream.flush()
 
529
 
 
530
    def _error_summary(self, err):
 
531
        indent = ' ' * 4
 
532
        return '%s%s' % (indent, err[1])
 
533
 
 
534
    def report_error(self, test, err):
 
535
        self.stream.writeln('ERROR %s\n%s'
 
536
                % (self._testTimeString(test),
 
537
                   self._error_summary(err)))
 
538
 
 
539
    def report_failure(self, test, err):
 
540
        self.stream.writeln(' FAIL %s\n%s'
 
541
                % (self._testTimeString(test),
 
542
                   self._error_summary(err)))
 
543
 
 
544
    def report_known_failure(self, test, err):
 
545
        self.stream.writeln('XFAIL %s\n%s'
 
546
                % (self._testTimeString(test),
 
547
                   self._error_summary(err)))
 
548
 
 
549
    def report_success(self, test):
 
550
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
551
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
552
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
553
            stats.pprint(file=self.stream)
 
554
        # flush the stream so that we get smooth output. This verbose mode is
 
555
        # used to show the output in PQM.
 
556
        self.stream.flush()
 
557
 
 
558
    def report_skip(self, test, reason):
 
559
        self.stream.writeln(' SKIP %s\n%s'
 
560
                % (self._testTimeString(test), reason))
 
561
 
 
562
    def report_not_applicable(self, test, skip_excinfo):
 
563
        self.stream.writeln('  N/A %s\n%s'
 
564
                % (self._testTimeString(test),
 
565
                   self._error_summary(skip_excinfo)))
 
566
 
 
567
    def report_unsupported(self, test, feature):
 
568
        """test cannot be run because feature is missing."""
 
569
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
570
                %(self._testTimeString(test), feature))
 
571
 
 
572
 
 
573
class TextTestRunner(object):
 
574
    stop_on_failure = False
 
575
 
 
576
    def __init__(self,
 
577
                 stream=sys.stderr,
 
578
                 descriptions=0,
 
579
                 verbosity=1,
 
580
                 bench_history=None,
 
581
                 list_only=False,
 
582
                 strict=False,
 
583
                 ):
 
584
        self.stream = unittest._WritelnDecorator(stream)
 
585
        self.descriptions = descriptions
 
586
        self.verbosity = verbosity
 
587
        self._bench_history = bench_history
 
588
        self.list_only = list_only
 
589
        self._strict = strict
 
590
 
 
591
    def run(self, test):
 
592
        "Run the given test case or test suite."
 
593
        startTime = time.time()
 
594
        if self.verbosity == 1:
 
595
            result_class = TextTestResult
 
596
        elif self.verbosity >= 2:
 
597
            result_class = VerboseTestResult
 
598
        result = result_class(self.stream,
 
599
                              self.descriptions,
 
600
                              self.verbosity,
 
601
                              bench_history=self._bench_history,
 
602
                              strict=self._strict,
 
603
                              )
 
604
        result.stop_early = self.stop_on_failure
 
605
        result.report_starting()
 
606
        if self.list_only:
 
607
            if self.verbosity >= 2:
 
608
                self.stream.writeln("Listing tests only ...\n")
 
609
            run = 0
 
610
            for t in iter_suite_tests(test):
 
611
                self.stream.writeln("%s" % (t.id()))
 
612
                run += 1
 
613
            return None
 
614
        else:
 
615
            try:
 
616
                import testtools
 
617
            except ImportError:
 
618
                test.run(result)
 
619
            else:
 
620
                if isinstance(test, testtools.ConcurrentTestSuite):
 
621
                    # We need to catch bzr specific behaviors
 
622
                    test.run(BZRTransformingResult(result))
 
623
                else:
 
624
                    test.run(result)
 
625
            run = result.testsRun
 
626
            actionTaken = "Ran"
 
627
        stopTime = time.time()
 
628
        timeTaken = stopTime - startTime
 
629
        result.printErrors()
 
630
        self.stream.writeln(result.separator2)
 
631
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
632
                            run, run != 1 and "s" or "", timeTaken))
 
633
        self.stream.writeln()
 
634
        if not result.wasSuccessful():
 
635
            self.stream.write("FAILED (")
 
636
            failed, errored = map(len, (result.failures, result.errors))
 
637
            if failed:
 
638
                self.stream.write("failures=%d" % failed)
 
639
            if errored:
 
640
                if failed: self.stream.write(", ")
 
641
                self.stream.write("errors=%d" % errored)
 
642
            if result.known_failure_count:
 
643
                if failed or errored: self.stream.write(", ")
 
644
                self.stream.write("known_failure_count=%d" %
 
645
                    result.known_failure_count)
 
646
            self.stream.writeln(")")
 
647
        else:
 
648
            if result.known_failure_count:
 
649
                self.stream.writeln("OK (known_failures=%d)" %
 
650
                    result.known_failure_count)
 
651
            else:
 
652
                self.stream.writeln("OK")
 
653
        if result.skip_count > 0:
 
654
            skipped = result.skip_count
 
655
            self.stream.writeln('%d test%s skipped' %
 
656
                                (skipped, skipped != 1 and "s" or ""))
 
657
        if result.unsupported:
 
658
            for feature, count in sorted(result.unsupported.items()):
 
659
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
660
                    (feature, count))
 
661
        result.finished()
 
662
        return result
 
663
 
 
664
 
 
665
def iter_suite_tests(suite):
 
666
    """Return all tests in a suite, recursing through nested suites"""
 
667
    if isinstance(suite, unittest.TestCase):
 
668
        yield suite
 
669
    elif isinstance(suite, unittest.TestSuite):
 
670
        for item in suite:
 
671
            for r in iter_suite_tests(item):
 
672
                yield r
 
673
    else:
 
674
        raise Exception('unknown type %r for object %r'
 
675
                        % (type(suite), suite))
 
676
 
 
677
 
 
678
class TestSkipped(Exception):
 
679
    """Indicates that a test was intentionally skipped, rather than failing."""
 
680
 
 
681
 
 
682
class TestNotApplicable(TestSkipped):
 
683
    """A test is not applicable to the situation where it was run.
 
684
 
 
685
    This is only normally raised by parameterized tests, if they find that
 
686
    the instance they're constructed upon does not support one aspect
 
687
    of its interface.
 
688
    """
 
689
 
 
690
 
 
691
class KnownFailure(AssertionError):
 
692
    """Indicates that a test failed in a precisely expected manner.
 
693
 
 
694
    Such failures dont block the whole test suite from passing because they are
 
695
    indicators of partially completed code or of future work. We have an
 
696
    explicit error for them so that we can ensure that they are always visible:
 
697
    KnownFailures are always shown in the output of bzr selftest.
 
698
    """
 
699
 
 
700
 
 
701
class UnavailableFeature(Exception):
 
702
    """A feature required for this test was not available.
 
703
 
 
704
    The feature should be used to construct the exception.
 
705
    """
27
706
 
28
707
 
29
708
class CommandFailed(Exception):
30
709
    pass
31
710
 
32
711
 
33
 
class TestBase(TestCase):
34
 
    """Base class for bzr test cases.
35
 
 
36
 
    Just defines some useful helper functions; doesn't actually test
37
 
    anything.
38
 
    """
39
 
    
40
 
    # TODO: Special methods to invoke bzr, so that we can run it
41
 
    # through a specified Python intepreter
42
 
 
43
 
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
44
 
    BZRPATH = 'bzr'
45
 
 
46
 
    _log_buf = ""
47
 
 
48
 
 
49
 
    def formcmd(self, cmd):
50
 
        if isinstance(cmd, basestring):
51
 
            cmd = cmd.split()
52
 
 
53
 
        if cmd[0] == 'bzr':
54
 
            cmd[0] = self.BZRPATH
55
 
            if self.OVERRIDE_PYTHON:
56
 
                cmd.insert(0, self.OVERRIDE_PYTHON)
57
 
 
58
 
        self.log('$ %r' % cmd)
59
 
 
60
 
        return cmd
61
 
 
62
 
 
63
 
    def runcmd(self, cmd, retcode=0):
64
 
        """Run one command and check the return code.
65
 
 
66
 
        Returns a tuple of (stdout,stderr) strings.
67
 
 
68
 
        If a single string is based, it is split into words.
69
 
        For commands that are not simple space-separated words, please
70
 
        pass a list instead."""
71
 
        cmd = self.formcmd(cmd)
72
 
 
73
 
        self.log('$ ' + ' '.join(cmd))
74
 
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
75
 
 
76
 
        if retcode != actual_retcode:
77
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
78
 
                                % (cmd, actual_retcode, retcode))
79
 
 
80
 
 
81
 
    def backtick(self, cmd, retcode=0):
82
 
        """Run a command and return its output"""
83
 
        cmd = self.formcmd(cmd)
84
 
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
85
 
        outd, errd = child.communicate()
86
 
        self.log(outd)
87
 
        actual_retcode = child.wait()
88
 
 
89
 
        outd = outd.replace('\r', '')
90
 
 
91
 
        if retcode != actual_retcode:
92
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
93
 
                                % (cmd, actual_retcode, retcode))
94
 
 
95
 
        return outd
96
 
 
97
 
 
98
 
 
99
 
    def build_tree(self, shape):
100
 
        """Build a test tree according to a pattern.
101
 
 
102
 
        shape is a sequence of file specifications.  If the final
103
 
        character is '/', a directory is created.
104
 
 
105
 
        This doesn't add anything to a branch.
106
 
        """
107
 
        # XXX: It's OK to just create them using forward slashes on windows?
108
 
        import os
109
 
        for name in shape:
110
 
            assert isinstance(name, basestring)
111
 
            if name[-1] == '/':
112
 
                os.mkdir(name[:-1])
113
 
            else:
114
 
                f = file(name, 'wt')
115
 
                print >>f, "contents of", name
116
 
                f.close()
117
 
 
118
 
 
119
 
    def log(self, msg):
120
 
        """Log a message to a progress file"""
121
 
        self._log_buf = self._log_buf + str(msg) + '\n'
122
 
        print >>self.TEST_LOG, msg
123
 
 
 
712
class StringIOWrapper(object):
 
713
    """A wrapper around cStringIO which just adds an encoding attribute.
 
714
 
 
715
    Internally we can check sys.stdout to see what the output encoding
 
716
    should be. However, cStringIO has no encoding attribute that we can
 
717
    set. So we wrap it instead.
 
718
    """
 
719
    encoding='ascii'
 
720
    _cstring = None
 
721
 
 
722
    def __init__(self, s=None):
 
723
        if s is not None:
 
724
            self.__dict__['_cstring'] = StringIO(s)
 
725
        else:
 
726
            self.__dict__['_cstring'] = StringIO()
 
727
 
 
728
    def __getattr__(self, name, getattr=getattr):
 
729
        return getattr(self.__dict__['_cstring'], name)
 
730
 
 
731
    def __setattr__(self, name, val):
 
732
        if name == 'encoding':
 
733
            self.__dict__['encoding'] = val
 
734
        else:
 
735
            return setattr(self._cstring, name, val)
 
736
 
 
737
 
 
738
class TestUIFactory(TextUIFactory):
 
739
    """A UI Factory for testing.
 
740
 
 
741
    Hide the progress bar but emit note()s.
 
742
    Redirect stdin.
 
743
    Allows get_password to be tested without real tty attached.
 
744
 
 
745
    See also CannedInputUIFactory which lets you provide programmatic input in
 
746
    a structured way.
 
747
    """
 
748
    # TODO: Capture progress events at the model level and allow them to be
 
749
    # observed by tests that care.
 
750
    #
 
751
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
752
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
753
    # idea of how they're supposed to be different.
 
754
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
 
755
 
 
756
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
757
        if stdin is not None:
 
758
            # We use a StringIOWrapper to be able to test various
 
759
            # encodings, but the user is still responsible to
 
760
            # encode the string and to set the encoding attribute
 
761
            # of StringIOWrapper.
 
762
            stdin = StringIOWrapper(stdin)
 
763
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
764
 
 
765
    def get_non_echoed_password(self):
 
766
        """Get password from stdin without trying to handle the echo mode"""
 
767
        password = self.stdin.readline()
 
768
        if not password:
 
769
            raise EOFError
 
770
        if password[-1] == '\n':
 
771
            password = password[:-1]
 
772
        return password
 
773
 
 
774
    def make_progress_view(self):
 
775
        return NullProgressView()
 
776
 
 
777
 
 
778
class TestCase(unittest.TestCase):
 
779
    """Base class for bzr unit tests.
 
780
 
 
781
    Tests that need access to disk resources should subclass
 
782
    TestCaseInTempDir not TestCase.
 
783
 
 
784
    Error and debug log messages are redirected from their usual
 
785
    location into a temporary file, the contents of which can be
 
786
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
787
    so that it can also capture file IO.  When the test completes this file
 
788
    is read into memory and removed from disk.
 
789
 
 
790
    There are also convenience functions to invoke bzr's command-line
 
791
    routine, and to build and check bzr trees.
 
792
 
 
793
    In addition to the usual method of overriding tearDown(), this class also
 
794
    allows subclasses to register functions into the _cleanups list, which is
 
795
    run in order as the object is torn down.  It's less likely this will be
 
796
    accidentally overlooked.
 
797
    """
 
798
 
 
799
    _active_threads = None
 
800
    _leaking_threads_tests = 0
 
801
    _first_thread_leaker_id = None
 
802
    _log_file_name = None
 
803
    _log_contents = ''
 
804
    _keep_log_file = False
 
805
    # record lsprof data when performing benchmark calls.
 
806
    _gather_lsprof_in_benchmarks = False
 
807
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
808
                     '_log_contents', '_log_file_name', '_benchtime',
 
809
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
 
810
 
 
811
    def __init__(self, methodName='testMethod'):
 
812
        super(TestCase, self).__init__(methodName)
 
813
        self._cleanups = []
 
814
        self._bzr_test_setUp_run = False
 
815
        self._bzr_test_tearDown_run = False
 
816
 
 
817
    def setUp(self):
 
818
        unittest.TestCase.setUp(self)
 
819
        self._bzr_test_setUp_run = True
 
820
        self._cleanEnvironment()
 
821
        self._silenceUI()
 
822
        self._startLogFile()
 
823
        self._benchcalls = []
 
824
        self._benchtime = None
 
825
        self._clear_hooks()
 
826
        self._track_locks()
 
827
        self._clear_debug_flags()
 
828
        TestCase._active_threads = threading.activeCount()
 
829
        self.addCleanup(self._check_leaked_threads)
 
830
 
 
831
    def debug(self):
 
832
        # debug a frame up.
 
833
        import pdb
 
834
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
835
 
 
836
    def _check_leaked_threads(self):
 
837
        active = threading.activeCount()
 
838
        leaked_threads = active - TestCase._active_threads
 
839
        TestCase._active_threads = active
 
840
        if leaked_threads:
 
841
            TestCase._leaking_threads_tests += 1
 
842
            if TestCase._first_thread_leaker_id is None:
 
843
                TestCase._first_thread_leaker_id = self.id()
 
844
 
 
845
    def _clear_debug_flags(self):
 
846
        """Prevent externally set debug flags affecting tests.
 
847
 
 
848
        Tests that want to use debug flags can just set them in the
 
849
        debug_flags set during setup/teardown.
 
850
        """
 
851
        self._preserved_debug_flags = set(debug.debug_flags)
 
852
        if 'allow_debug' not in selftest_debug_flags:
 
853
            debug.debug_flags.clear()
 
854
        if 'disable_lock_checks' not in selftest_debug_flags:
 
855
            debug.debug_flags.add('strict_locks')
 
856
        self.addCleanup(self._restore_debug_flags)
 
857
 
 
858
    def _clear_hooks(self):
 
859
        # prevent hooks affecting tests
 
860
        self._preserved_hooks = {}
 
861
        for key, factory in hooks.known_hooks.items():
 
862
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
863
            current_hooks = hooks.known_hooks_key_to_object(key)
 
864
            self._preserved_hooks[parent] = (name, current_hooks)
 
865
        self.addCleanup(self._restoreHooks)
 
866
        for key, factory in hooks.known_hooks.items():
 
867
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
868
            setattr(parent, name, factory())
 
869
        # this hook should always be installed
 
870
        request._install_hook()
 
871
 
 
872
    def _silenceUI(self):
 
873
        """Turn off UI for duration of test"""
 
874
        # by default the UI is off; tests can turn it on if they want it.
 
875
        saved = ui.ui_factory
 
876
        def _restore():
 
877
            ui.ui_factory = saved
 
878
        ui.ui_factory = ui.SilentUIFactory()
 
879
        self.addCleanup(_restore)
 
880
 
 
881
    def _check_locks(self):
 
882
        """Check that all lock take/release actions have been paired."""
 
883
        # We always check for mismatched locks. If a mismatch is found, we
 
884
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
885
        # case we just print a warning.
 
886
        # unhook:
 
887
        acquired_locks = [lock for action, lock in self._lock_actions
 
888
                          if action == 'acquired']
 
889
        released_locks = [lock for action, lock in self._lock_actions
 
890
                          if action == 'released']
 
891
        broken_locks = [lock for action, lock in self._lock_actions
 
892
                        if action == 'broken']
 
893
        # trivially, given the tests for lock acquistion and release, if we
 
894
        # have as many in each list, it should be ok. Some lock tests also
 
895
        # break some locks on purpose and should be taken into account by
 
896
        # considering that breaking a lock is just a dirty way of releasing it.
 
897
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
898
            message = ('Different number of acquired and '
 
899
                       'released or broken locks. (%s, %s + %s)' %
 
900
                       (acquired_locks, released_locks, broken_locks))
 
901
            if not self._lock_check_thorough:
 
902
                # Rather than fail, just warn
 
903
                print "Broken test %s: %s" % (self, message)
 
904
                return
 
905
            self.fail(message)
 
906
 
 
907
    def _track_locks(self):
 
908
        """Track lock activity during tests."""
 
909
        self._lock_actions = []
 
910
        if 'disable_lock_checks' in selftest_debug_flags:
 
911
            self._lock_check_thorough = False
 
912
        else:
 
913
            self._lock_check_thorough = True
 
914
            
 
915
        self.addCleanup(self._check_locks)
 
916
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
917
                                                self._lock_acquired, None)
 
918
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
919
                                                self._lock_released, None)
 
920
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
921
                                                self._lock_broken, None)
 
922
 
 
923
    def _lock_acquired(self, result):
 
924
        self._lock_actions.append(('acquired', result))
 
925
 
 
926
    def _lock_released(self, result):
 
927
        self._lock_actions.append(('released', result))
 
928
 
 
929
    def _lock_broken(self, result):
 
930
        self._lock_actions.append(('broken', result))
 
931
 
 
932
    def _ndiff_strings(self, a, b):
 
933
        """Return ndiff between two strings containing lines.
 
934
 
 
935
        A trailing newline is added if missing to make the strings
 
936
        print properly."""
 
937
        if b and b[-1] != '\n':
 
938
            b += '\n'
 
939
        if a and a[-1] != '\n':
 
940
            a += '\n'
 
941
        difflines = difflib.ndiff(a.splitlines(True),
 
942
                                  b.splitlines(True),
 
943
                                  linejunk=lambda x: False,
 
944
                                  charjunk=lambda x: False)
 
945
        return ''.join(difflines)
 
946
 
 
947
    def assertEqual(self, a, b, message=''):
 
948
        try:
 
949
            if a == b:
 
950
                return
 
951
        except UnicodeError, e:
 
952
            # If we can't compare without getting a UnicodeError, then
 
953
            # obviously they are different
 
954
            mutter('UnicodeError: %s', e)
 
955
        if message:
 
956
            message += '\n'
 
957
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
958
            % (message,
 
959
               pformat(a), pformat(b)))
 
960
 
 
961
    assertEquals = assertEqual
 
962
 
 
963
    def assertEqualDiff(self, a, b, message=None):
 
964
        """Assert two texts are equal, if not raise an exception.
 
965
 
 
966
        This is intended for use with multi-line strings where it can
 
967
        be hard to find the differences by eye.
 
968
        """
 
969
        # TODO: perhaps override assertEquals to call this for strings?
 
970
        if a == b:
 
971
            return
 
972
        if message is None:
 
973
            message = "texts not equal:\n"
 
974
        if a == b + '\n':
 
975
            message = 'first string is missing a final newline.\n'
 
976
        if a + '\n' == b:
 
977
            message = 'second string is missing a final newline.\n'
 
978
        raise AssertionError(message +
 
979
                             self._ndiff_strings(a, b))
 
980
 
 
981
    def assertEqualMode(self, mode, mode_test):
 
982
        self.assertEqual(mode, mode_test,
 
983
                         'mode mismatch %o != %o' % (mode, mode_test))
 
984
 
 
985
    def assertEqualStat(self, expected, actual):
 
986
        """assert that expected and actual are the same stat result.
 
987
 
 
988
        :param expected: A stat result.
 
989
        :param actual: A stat result.
 
990
        :raises AssertionError: If the expected and actual stat values differ
 
991
            other than by atime.
 
992
        """
 
993
        self.assertEqual(expected.st_size, actual.st_size)
 
994
        self.assertEqual(expected.st_mtime, actual.st_mtime)
 
995
        self.assertEqual(expected.st_ctime, actual.st_ctime)
 
996
        self.assertEqual(expected.st_dev, actual.st_dev)
 
997
        self.assertEqual(expected.st_ino, actual.st_ino)
 
998
        self.assertEqual(expected.st_mode, actual.st_mode)
 
999
 
 
1000
    def assertLength(self, length, obj_with_len):
 
1001
        """Assert that obj_with_len is of length length."""
 
1002
        if len(obj_with_len) != length:
 
1003
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1004
                length, len(obj_with_len), obj_with_len))
 
1005
 
 
1006
    def assertPositive(self, val):
 
1007
        """Assert that val is greater than 0."""
 
1008
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1009
 
 
1010
    def assertNegative(self, val):
 
1011
        """Assert that val is less than 0."""
 
1012
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1013
 
 
1014
    def assertStartsWith(self, s, prefix):
 
1015
        if not s.startswith(prefix):
 
1016
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1017
 
 
1018
    def assertEndsWith(self, s, suffix):
 
1019
        """Asserts that s ends with suffix."""
 
1020
        if not s.endswith(suffix):
 
1021
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1022
 
 
1023
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1024
        """Assert that a contains something matching a regular expression."""
 
1025
        if not re.search(needle_re, haystack, flags):
 
1026
            if '\n' in haystack or len(haystack) > 60:
 
1027
                # a long string, format it in a more readable way
 
1028
                raise AssertionError(
 
1029
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1030
                        % (needle_re, haystack))
 
1031
            else:
 
1032
                raise AssertionError('pattern "%s" not found in "%s"'
 
1033
                        % (needle_re, haystack))
 
1034
 
 
1035
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1036
        """Assert that a does not match a regular expression"""
 
1037
        if re.search(needle_re, haystack, flags):
 
1038
            raise AssertionError('pattern "%s" found in "%s"'
 
1039
                    % (needle_re, haystack))
 
1040
 
 
1041
    def assertSubset(self, sublist, superlist):
 
1042
        """Assert that every entry in sublist is present in superlist."""
 
1043
        missing = set(sublist) - set(superlist)
 
1044
        if len(missing) > 0:
 
1045
            raise AssertionError("value(s) %r not present in container %r" %
 
1046
                                 (missing, superlist))
 
1047
 
 
1048
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1049
        """Fail unless excClass is raised when the iterator from func is used.
 
1050
 
 
1051
        Many functions can return generators this makes sure
 
1052
        to wrap them in a list() call to make sure the whole generator
 
1053
        is run, and that the proper exception is raised.
 
1054
        """
 
1055
        try:
 
1056
            list(func(*args, **kwargs))
 
1057
        except excClass, e:
 
1058
            return e
 
1059
        else:
 
1060
            if getattr(excClass,'__name__', None) is not None:
 
1061
                excName = excClass.__name__
 
1062
            else:
 
1063
                excName = str(excClass)
 
1064
            raise self.failureException, "%s not raised" % excName
 
1065
 
 
1066
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1067
        """Assert that a callable raises a particular exception.
 
1068
 
 
1069
        :param excClass: As for the except statement, this may be either an
 
1070
            exception class, or a tuple of classes.
 
1071
        :param callableObj: A callable, will be passed ``*args`` and
 
1072
            ``**kwargs``.
 
1073
 
 
1074
        Returns the exception so that you can examine it.
 
1075
        """
 
1076
        try:
 
1077
            callableObj(*args, **kwargs)
 
1078
        except excClass, e:
 
1079
            return e
 
1080
        else:
 
1081
            if getattr(excClass,'__name__', None) is not None:
 
1082
                excName = excClass.__name__
 
1083
            else:
 
1084
                # probably a tuple
 
1085
                excName = str(excClass)
 
1086
            raise self.failureException, "%s not raised" % excName
 
1087
 
 
1088
    def assertIs(self, left, right, message=None):
 
1089
        if not (left is right):
 
1090
            if message is not None:
 
1091
                raise AssertionError(message)
 
1092
            else:
 
1093
                raise AssertionError("%r is not %r." % (left, right))
 
1094
 
 
1095
    def assertIsNot(self, left, right, message=None):
 
1096
        if (left is right):
 
1097
            if message is not None:
 
1098
                raise AssertionError(message)
 
1099
            else:
 
1100
                raise AssertionError("%r is %r." % (left, right))
 
1101
 
 
1102
    def assertTransportMode(self, transport, path, mode):
 
1103
        """Fail if a path does not have mode "mode".
 
1104
 
 
1105
        If modes are not supported on this transport, the assertion is ignored.
 
1106
        """
 
1107
        if not transport._can_roundtrip_unix_modebits():
 
1108
            return
 
1109
        path_stat = transport.stat(path)
 
1110
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1111
        self.assertEqual(mode, actual_mode,
 
1112
                         'mode of %r incorrect (%s != %s)'
 
1113
                         % (path, oct(mode), oct(actual_mode)))
 
1114
 
 
1115
    def assertIsSameRealPath(self, path1, path2):
 
1116
        """Fail if path1 and path2 points to different files"""
 
1117
        self.assertEqual(osutils.realpath(path1),
 
1118
                         osutils.realpath(path2),
 
1119
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1120
 
 
1121
    def assertIsInstance(self, obj, kls, msg=None):
 
1122
        """Fail if obj is not an instance of kls
 
1123
        
 
1124
        :param msg: Supplementary message to show if the assertion fails.
 
1125
        """
 
1126
        if not isinstance(obj, kls):
 
1127
            m = "%r is an instance of %s rather than %s" % (
 
1128
                obj, obj.__class__, kls)
 
1129
            if msg:
 
1130
                m += ": " + msg
 
1131
            self.fail(m)
 
1132
 
 
1133
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1134
        """Invoke a test, expecting it to fail for the given reason.
 
1135
 
 
1136
        This is for assertions that ought to succeed, but currently fail.
 
1137
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1138
        about the failure you're expecting.  If a new bug is introduced,
 
1139
        AssertionError should be raised, not KnownFailure.
 
1140
 
 
1141
        Frequently, expectFailure should be followed by an opposite assertion.
 
1142
        See example below.
 
1143
 
 
1144
        Intended to be used with a callable that raises AssertionError as the
 
1145
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1146
 
 
1147
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1148
        test succeeds.
 
1149
 
 
1150
        example usage::
 
1151
 
 
1152
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1153
                             dynamic_val)
 
1154
          self.assertEqual(42, dynamic_val)
 
1155
 
 
1156
          This means that a dynamic_val of 54 will cause the test to raise
 
1157
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1158
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1159
          than 54 or 42 will cause an AssertionError.
 
1160
        """
 
1161
        try:
 
1162
            assertion(*args, **kwargs)
 
1163
        except AssertionError:
 
1164
            raise KnownFailure(reason)
 
1165
        else:
 
1166
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1167
 
 
1168
    def assertFileEqual(self, content, path):
 
1169
        """Fail if path does not contain 'content'."""
 
1170
        self.failUnlessExists(path)
 
1171
        f = file(path, 'rb')
 
1172
        try:
 
1173
            s = f.read()
 
1174
        finally:
 
1175
            f.close()
 
1176
        self.assertEqualDiff(content, s)
 
1177
 
 
1178
    def failUnlessExists(self, path):
 
1179
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1180
        if not isinstance(path, basestring):
 
1181
            for p in path:
 
1182
                self.failUnlessExists(p)
 
1183
        else:
 
1184
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1185
 
 
1186
    def failIfExists(self, path):
 
1187
        """Fail if path or paths, which may be abs or relative, exist."""
 
1188
        if not isinstance(path, basestring):
 
1189
            for p in path:
 
1190
                self.failIfExists(p)
 
1191
        else:
 
1192
            self.failIf(osutils.lexists(path),path+" exists")
 
1193
 
 
1194
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1195
        """A helper for callDeprecated and applyDeprecated.
 
1196
 
 
1197
        :param a_callable: A callable to call.
 
1198
        :param args: The positional arguments for the callable
 
1199
        :param kwargs: The keyword arguments for the callable
 
1200
        :return: A tuple (warnings, result). result is the result of calling
 
1201
            a_callable(``*args``, ``**kwargs``).
 
1202
        """
 
1203
        local_warnings = []
 
1204
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1205
            # we've hooked into a deprecation specific callpath,
 
1206
            # only deprecations should getting sent via it.
 
1207
            self.assertEqual(cls, DeprecationWarning)
 
1208
            local_warnings.append(msg)
 
1209
        original_warning_method = symbol_versioning.warn
 
1210
        symbol_versioning.set_warning_method(capture_warnings)
 
1211
        try:
 
1212
            result = a_callable(*args, **kwargs)
 
1213
        finally:
 
1214
            symbol_versioning.set_warning_method(original_warning_method)
 
1215
        return (local_warnings, result)
 
1216
 
 
1217
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1218
        """Call a deprecated callable without warning the user.
 
1219
 
 
1220
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1221
        not other callers that go direct to the warning module.
 
1222
 
 
1223
        To test that a deprecated method raises an error, do something like
 
1224
        this::
 
1225
 
 
1226
            self.assertRaises(errors.ReservedId,
 
1227
                self.applyDeprecated,
 
1228
                deprecated_in((1, 5, 0)),
 
1229
                br.append_revision,
 
1230
                'current:')
 
1231
 
 
1232
        :param deprecation_format: The deprecation format that the callable
 
1233
            should have been deprecated with. This is the same type as the
 
1234
            parameter to deprecated_method/deprecated_function. If the
 
1235
            callable is not deprecated with this format, an assertion error
 
1236
            will be raised.
 
1237
        :param a_callable: A callable to call. This may be a bound method or
 
1238
            a regular function. It will be called with ``*args`` and
 
1239
            ``**kwargs``.
 
1240
        :param args: The positional arguments for the callable
 
1241
        :param kwargs: The keyword arguments for the callable
 
1242
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1243
        """
 
1244
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1245
            *args, **kwargs)
 
1246
        expected_first_warning = symbol_versioning.deprecation_string(
 
1247
            a_callable, deprecation_format)
 
1248
        if len(call_warnings) == 0:
 
1249
            self.fail("No deprecation warning generated by call to %s" %
 
1250
                a_callable)
 
1251
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1252
        return result
 
1253
 
 
1254
    def callCatchWarnings(self, fn, *args, **kw):
 
1255
        """Call a callable that raises python warnings.
 
1256
 
 
1257
        The caller's responsible for examining the returned warnings.
 
1258
 
 
1259
        If the callable raises an exception, the exception is not
 
1260
        caught and propagates up to the caller.  In that case, the list
 
1261
        of warnings is not available.
 
1262
 
 
1263
        :returns: ([warning_object, ...], fn_result)
 
1264
        """
 
1265
        # XXX: This is not perfect, because it completely overrides the
 
1266
        # warnings filters, and some code may depend on suppressing particular
 
1267
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1268
        # though.  -- Andrew, 20071062
 
1269
        wlist = []
 
1270
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1271
            # despite the name, 'message' is normally(?) a Warning subclass
 
1272
            # instance
 
1273
            wlist.append(message)
 
1274
        saved_showwarning = warnings.showwarning
 
1275
        saved_filters = warnings.filters
 
1276
        try:
 
1277
            warnings.showwarning = _catcher
 
1278
            warnings.filters = []
 
1279
            result = fn(*args, **kw)
 
1280
        finally:
 
1281
            warnings.showwarning = saved_showwarning
 
1282
            warnings.filters = saved_filters
 
1283
        return wlist, result
 
1284
 
 
1285
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1286
        """Assert that a callable is deprecated in a particular way.
 
1287
 
 
1288
        This is a very precise test for unusual requirements. The
 
1289
        applyDeprecated helper function is probably more suited for most tests
 
1290
        as it allows you to simply specify the deprecation format being used
 
1291
        and will ensure that that is issued for the function being called.
 
1292
 
 
1293
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1294
        not other callers that go direct to the warning module.  To catch
 
1295
        general warnings, use callCatchWarnings.
 
1296
 
 
1297
        :param expected: a list of the deprecation warnings expected, in order
 
1298
        :param callable: The callable to call
 
1299
        :param args: The positional arguments for the callable
 
1300
        :param kwargs: The keyword arguments for the callable
 
1301
        """
 
1302
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1303
            *args, **kwargs)
 
1304
        self.assertEqual(expected, call_warnings)
 
1305
        return result
 
1306
 
 
1307
    def _startLogFile(self):
 
1308
        """Send bzr and test log messages to a temporary file.
 
1309
 
 
1310
        The file is removed as the test is torn down.
 
1311
        """
 
1312
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1313
        self._log_file = os.fdopen(fileno, 'w+')
 
1314
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1315
        self._log_file_name = name
 
1316
        self.addCleanup(self._finishLogFile)
 
1317
 
 
1318
    def _finishLogFile(self):
 
1319
        """Finished with the log file.
 
1320
 
 
1321
        Close the file and delete it, unless setKeepLogfile was called.
 
1322
        """
 
1323
        if self._log_file is None:
 
1324
            return
 
1325
        bzrlib.trace.pop_log_file(self._log_memento)
 
1326
        self._log_file.close()
 
1327
        self._log_file = None
 
1328
        if not self._keep_log_file:
 
1329
            os.remove(self._log_file_name)
 
1330
            self._log_file_name = None
 
1331
 
 
1332
    def setKeepLogfile(self):
 
1333
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1334
        self._keep_log_file = True
 
1335
 
 
1336
    def thisFailsStrictLockCheck(self):
 
1337
        """It is known that this test would fail with -Dstrict_locks.
 
1338
 
 
1339
        By default, all tests are run with strict lock checking unless
 
1340
        -Edisable_lock_checks is supplied. However there are some tests which
 
1341
        we know fail strict locks at this point that have not been fixed.
 
1342
        They should call this function to disable the strict checking.
 
1343
 
 
1344
        This should be used sparingly, it is much better to fix the locking
 
1345
        issues rather than papering over the problem by calling this function.
 
1346
        """
 
1347
        debug.debug_flags.discard('strict_locks')
 
1348
 
 
1349
    def addCleanup(self, callable, *args, **kwargs):
 
1350
        """Arrange to run a callable when this case is torn down.
 
1351
 
 
1352
        Callables are run in the reverse of the order they are registered,
 
1353
        ie last-in first-out.
 
1354
        """
 
1355
        self._cleanups.append((callable, args, kwargs))
 
1356
 
 
1357
    def _cleanEnvironment(self):
 
1358
        new_env = {
 
1359
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1360
            'HOME': os.getcwd(),
 
1361
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1362
            # tests do check our impls match APPDATA
 
1363
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1364
            'VISUAL': None,
 
1365
            'EDITOR': None,
 
1366
            'BZR_EMAIL': None,
 
1367
            'BZREMAIL': None, # may still be present in the environment
 
1368
            'EMAIL': None,
 
1369
            'BZR_PROGRESS_BAR': None,
 
1370
            'BZR_LOG': None,
 
1371
            'BZR_PLUGIN_PATH': None,
 
1372
            # Make sure that any text ui tests are consistent regardless of
 
1373
            # the environment the test case is run in; you may want tests that
 
1374
            # test other combinations.  'dumb' is a reasonable guess for tests
 
1375
            # going to a pipe or a StringIO.
 
1376
            'TERM': 'dumb',
 
1377
            'LINES': '25',
 
1378
            'COLUMNS': '80',
 
1379
            # SSH Agent
 
1380
            'SSH_AUTH_SOCK': None,
 
1381
            # Proxies
 
1382
            'http_proxy': None,
 
1383
            'HTTP_PROXY': None,
 
1384
            'https_proxy': None,
 
1385
            'HTTPS_PROXY': None,
 
1386
            'no_proxy': None,
 
1387
            'NO_PROXY': None,
 
1388
            'all_proxy': None,
 
1389
            'ALL_PROXY': None,
 
1390
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
1391
            # least. If you do (care), please update this comment
 
1392
            # -- vila 20080401
 
1393
            'ftp_proxy': None,
 
1394
            'FTP_PROXY': None,
 
1395
            'BZR_REMOTE_PATH': None,
 
1396
        }
 
1397
        self.__old_env = {}
 
1398
        self.addCleanup(self._restoreEnvironment)
 
1399
        for name, value in new_env.iteritems():
 
1400
            self._captureVar(name, value)
 
1401
 
 
1402
    def _captureVar(self, name, newvalue):
 
1403
        """Set an environment variable, and reset it when finished."""
 
1404
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1405
 
 
1406
    def _restore_debug_flags(self):
 
1407
        debug.debug_flags.clear()
 
1408
        debug.debug_flags.update(self._preserved_debug_flags)
 
1409
 
 
1410
    def _restoreEnvironment(self):
 
1411
        for name, value in self.__old_env.iteritems():
 
1412
            osutils.set_or_unset_env(name, value)
 
1413
 
 
1414
    def _restoreHooks(self):
 
1415
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1416
            setattr(klass, name, hooks)
 
1417
 
 
1418
    def knownFailure(self, reason):
 
1419
        """This test has failed for some known reason."""
 
1420
        raise KnownFailure(reason)
 
1421
 
 
1422
    def _do_skip(self, result, reason):
 
1423
        addSkip = getattr(result, 'addSkip', None)
 
1424
        if not callable(addSkip):
 
1425
            result.addError(self, sys.exc_info())
 
1426
        else:
 
1427
            addSkip(self, reason)
 
1428
 
 
1429
    def run(self, result=None):
 
1430
        if result is None: result = self.defaultTestResult()
 
1431
        for feature in getattr(self, '_test_needs_features', []):
 
1432
            if not feature.available():
 
1433
                result.startTest(self)
 
1434
                if getattr(result, 'addNotSupported', None):
 
1435
                    result.addNotSupported(self, feature)
 
1436
                else:
 
1437
                    result.addSuccess(self)
 
1438
                result.stopTest(self)
 
1439
                return result
 
1440
        try:
 
1441
            try:
 
1442
                result.startTest(self)
 
1443
                absent_attr = object()
 
1444
                # Python 2.5
 
1445
                method_name = getattr(self, '_testMethodName', absent_attr)
 
1446
                if method_name is absent_attr:
 
1447
                    # Python 2.4
 
1448
                    method_name = getattr(self, '_TestCase__testMethodName')
 
1449
                testMethod = getattr(self, method_name)
 
1450
                try:
 
1451
                    try:
 
1452
                        self.setUp()
 
1453
                        if not self._bzr_test_setUp_run:
 
1454
                            self.fail(
 
1455
                                "test setUp did not invoke "
 
1456
                                "bzrlib.tests.TestCase's setUp")
 
1457
                    except KeyboardInterrupt:
 
1458
                        self._runCleanups()
 
1459
                        raise
 
1460
                    except TestSkipped, e:
 
1461
                        self._do_skip(result, e.args[0])
 
1462
                        self.tearDown()
 
1463
                        return result
 
1464
                    except:
 
1465
                        result.addError(self, sys.exc_info())
 
1466
                        self._runCleanups()
 
1467
                        return result
 
1468
 
 
1469
                    ok = False
 
1470
                    try:
 
1471
                        testMethod()
 
1472
                        ok = True
 
1473
                    except self.failureException:
 
1474
                        result.addFailure(self, sys.exc_info())
 
1475
                    except TestSkipped, e:
 
1476
                        if not e.args:
 
1477
                            reason = "No reason given."
 
1478
                        else:
 
1479
                            reason = e.args[0]
 
1480
                        self._do_skip(result, reason)
 
1481
                    except KeyboardInterrupt:
 
1482
                        self._runCleanups()
 
1483
                        raise
 
1484
                    except:
 
1485
                        result.addError(self, sys.exc_info())
 
1486
 
 
1487
                    try:
 
1488
                        self.tearDown()
 
1489
                        if not self._bzr_test_tearDown_run:
 
1490
                            self.fail(
 
1491
                                "test tearDown did not invoke "
 
1492
                                "bzrlib.tests.TestCase's tearDown")
 
1493
                    except KeyboardInterrupt:
 
1494
                        self._runCleanups()
 
1495
                        raise
 
1496
                    except:
 
1497
                        result.addError(self, sys.exc_info())
 
1498
                        self._runCleanups()
 
1499
                        ok = False
 
1500
                    if ok: result.addSuccess(self)
 
1501
                finally:
 
1502
                    result.stopTest(self)
 
1503
                return result
 
1504
            except TestNotApplicable:
 
1505
                # Not moved from the result [yet].
 
1506
                self._runCleanups()
 
1507
                raise
 
1508
            except KeyboardInterrupt:
 
1509
                self._runCleanups()
 
1510
                raise
 
1511
        finally:
 
1512
            saved_attrs = {}
 
1513
            for attr_name in self.attrs_to_keep:
 
1514
                if attr_name in self.__dict__:
 
1515
                    saved_attrs[attr_name] = self.__dict__[attr_name]
 
1516
            self.__dict__ = saved_attrs
 
1517
 
 
1518
    def tearDown(self):
 
1519
        self._runCleanups()
 
1520
        self._log_contents = ''
 
1521
        self._bzr_test_tearDown_run = True
 
1522
        unittest.TestCase.tearDown(self)
 
1523
 
 
1524
    def time(self, callable, *args, **kwargs):
 
1525
        """Run callable and accrue the time it takes to the benchmark time.
 
1526
 
 
1527
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1528
        this will cause lsprofile statistics to be gathered and stored in
 
1529
        self._benchcalls.
 
1530
        """
 
1531
        if self._benchtime is None:
 
1532
            self._benchtime = 0
 
1533
        start = time.time()
 
1534
        try:
 
1535
            if not self._gather_lsprof_in_benchmarks:
 
1536
                return callable(*args, **kwargs)
 
1537
            else:
 
1538
                # record this benchmark
 
1539
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1540
                stats.sort()
 
1541
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1542
                return ret
 
1543
        finally:
 
1544
            self._benchtime += time.time() - start
 
1545
 
 
1546
    def _runCleanups(self):
 
1547
        """Run registered cleanup functions.
 
1548
 
 
1549
        This should only be called from TestCase.tearDown.
 
1550
        """
 
1551
        # TODO: Perhaps this should keep running cleanups even if
 
1552
        # one of them fails?
 
1553
 
 
1554
        # Actually pop the cleanups from the list so tearDown running
 
1555
        # twice is safe (this happens for skipped tests).
 
1556
        while self._cleanups:
 
1557
            cleanup, args, kwargs = self._cleanups.pop()
 
1558
            cleanup(*args, **kwargs)
 
1559
 
 
1560
    def log(self, *args):
 
1561
        mutter(*args)
 
1562
 
 
1563
    def _get_log(self, keep_log_file=False):
 
1564
        """Get the log from bzrlib.trace calls from this test.
 
1565
 
 
1566
        :param keep_log_file: When True, if the log is still a file on disk
 
1567
            leave it as a file on disk. When False, if the log is still a file
 
1568
            on disk, the log file is deleted and the log preserved as
 
1569
            self._log_contents.
 
1570
        :return: A string containing the log.
 
1571
        """
 
1572
        # flush the log file, to get all content
 
1573
        import bzrlib.trace
 
1574
        if bzrlib.trace._trace_file:
 
1575
            bzrlib.trace._trace_file.flush()
 
1576
        if self._log_contents:
 
1577
            # XXX: this can hardly contain the content flushed above --vila
 
1578
            # 20080128
 
1579
            return self._log_contents
 
1580
        if self._log_file_name is not None:
 
1581
            logfile = open(self._log_file_name)
 
1582
            try:
 
1583
                log_contents = logfile.read()
 
1584
            finally:
 
1585
                logfile.close()
 
1586
            if not keep_log_file:
 
1587
                self._log_contents = log_contents
 
1588
                try:
 
1589
                    os.remove(self._log_file_name)
 
1590
                except OSError, e:
 
1591
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1592
                        sys.stderr.write(('Unable to delete log file '
 
1593
                                             ' %r\n' % self._log_file_name))
 
1594
                    else:
 
1595
                        raise
 
1596
            return log_contents
 
1597
        else:
 
1598
            return "DELETED log file to reduce memory footprint"
 
1599
 
 
1600
    def requireFeature(self, feature):
 
1601
        """This test requires a specific feature is available.
 
1602
 
 
1603
        :raises UnavailableFeature: When feature is not available.
 
1604
        """
 
1605
        if not feature.available():
 
1606
            raise UnavailableFeature(feature)
 
1607
 
 
1608
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1609
            working_dir):
 
1610
        """Run bazaar command line, splitting up a string command line."""
 
1611
        if isinstance(args, basestring):
 
1612
            # shlex don't understand unicode strings,
 
1613
            # so args should be plain string (bialix 20070906)
 
1614
            args = list(shlex.split(str(args)))
 
1615
        return self._run_bzr_core(args, retcode=retcode,
 
1616
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1617
                )
 
1618
 
 
1619
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1620
            working_dir):
 
1621
        # Clear chk_map page cache, because the contents are likely to mask
 
1622
        # locking errors.
 
1623
        chk_map.clear_cache()
 
1624
        if encoding is None:
 
1625
            encoding = osutils.get_user_encoding()
 
1626
        stdout = StringIOWrapper()
 
1627
        stderr = StringIOWrapper()
 
1628
        stdout.encoding = encoding
 
1629
        stderr.encoding = encoding
 
1630
 
 
1631
        self.log('run bzr: %r', args)
 
1632
        # FIXME: don't call into logging here
 
1633
        handler = logging.StreamHandler(stderr)
 
1634
        handler.setLevel(logging.INFO)
 
1635
        logger = logging.getLogger('')
 
1636
        logger.addHandler(handler)
 
1637
        old_ui_factory = ui.ui_factory
 
1638
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1639
 
 
1640
        cwd = None
 
1641
        if working_dir is not None:
 
1642
            cwd = osutils.getcwd()
 
1643
            os.chdir(working_dir)
 
1644
 
 
1645
        try:
 
1646
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1647
                stdout, stderr,
 
1648
                bzrlib.commands.run_bzr_catch_user_errors,
 
1649
                args)
 
1650
        finally:
 
1651
            logger.removeHandler(handler)
 
1652
            ui.ui_factory = old_ui_factory
 
1653
            if cwd is not None:
 
1654
                os.chdir(cwd)
 
1655
 
 
1656
        out = stdout.getvalue()
 
1657
        err = stderr.getvalue()
 
1658
        if out:
 
1659
            self.log('output:\n%r', out)
 
1660
        if err:
 
1661
            self.log('errors:\n%r', err)
 
1662
        if retcode is not None:
 
1663
            self.assertEquals(retcode, result,
 
1664
                              message='Unexpected return code')
 
1665
        return out, err
 
1666
 
 
1667
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1668
                working_dir=None, error_regexes=[], output_encoding=None):
 
1669
        """Invoke bzr, as if it were run from the command line.
 
1670
 
 
1671
        The argument list should not include the bzr program name - the
 
1672
        first argument is normally the bzr command.  Arguments may be
 
1673
        passed in three ways:
 
1674
 
 
1675
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1676
        when the command contains whitespace or metacharacters, or
 
1677
        is built up at run time.
 
1678
 
 
1679
        2- A single string, eg "add a".  This is the most convenient
 
1680
        for hardcoded commands.
 
1681
 
 
1682
        This runs bzr through the interface that catches and reports
 
1683
        errors, and with logging set to something approximating the
 
1684
        default, so that error reporting can be checked.
 
1685
 
 
1686
        This should be the main method for tests that want to exercise the
 
1687
        overall behavior of the bzr application (rather than a unit test
 
1688
        or a functional test of the library.)
 
1689
 
 
1690
        This sends the stdout/stderr results into the test's log,
 
1691
        where it may be useful for debugging.  See also run_captured.
 
1692
 
 
1693
        :keyword stdin: A string to be used as stdin for the command.
 
1694
        :keyword retcode: The status code the command should return;
 
1695
            default 0.
 
1696
        :keyword working_dir: The directory to run the command in
 
1697
        :keyword error_regexes: A list of expected error messages.  If
 
1698
            specified they must be seen in the error output of the command.
 
1699
        """
 
1700
        out, err = self._run_bzr_autosplit(
 
1701
            args=args,
 
1702
            retcode=retcode,
 
1703
            encoding=encoding,
 
1704
            stdin=stdin,
 
1705
            working_dir=working_dir,
 
1706
            )
 
1707
        self.assertIsInstance(error_regexes, (list, tuple))
 
1708
        for regex in error_regexes:
 
1709
            self.assertContainsRe(err, regex)
 
1710
        return out, err
 
1711
 
 
1712
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1713
        """Run bzr, and check that stderr contains the supplied regexes
 
1714
 
 
1715
        :param error_regexes: Sequence of regular expressions which
 
1716
            must each be found in the error output. The relative ordering
 
1717
            is not enforced.
 
1718
        :param args: command-line arguments for bzr
 
1719
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1720
            This function changes the default value of retcode to be 3,
 
1721
            since in most cases this is run when you expect bzr to fail.
 
1722
 
 
1723
        :return: (out, err) The actual output of running the command (in case
 
1724
            you want to do more inspection)
 
1725
 
 
1726
        Examples of use::
 
1727
 
 
1728
            # Make sure that commit is failing because there is nothing to do
 
1729
            self.run_bzr_error(['no changes to commit'],
 
1730
                               ['commit', '-m', 'my commit comment'])
 
1731
            # Make sure --strict is handling an unknown file, rather than
 
1732
            # giving us the 'nothing to do' error
 
1733
            self.build_tree(['unknown'])
 
1734
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1735
                               ['commit', --strict', '-m', 'my commit comment'])
 
1736
        """
 
1737
        kwargs.setdefault('retcode', 3)
 
1738
        kwargs['error_regexes'] = error_regexes
 
1739
        out, err = self.run_bzr(*args, **kwargs)
 
1740
        return out, err
 
1741
 
 
1742
    def run_bzr_subprocess(self, *args, **kwargs):
 
1743
        """Run bzr in a subprocess for testing.
 
1744
 
 
1745
        This starts a new Python interpreter and runs bzr in there.
 
1746
        This should only be used for tests that have a justifiable need for
 
1747
        this isolation: e.g. they are testing startup time, or signal
 
1748
        handling, or early startup code, etc.  Subprocess code can't be
 
1749
        profiled or debugged so easily.
 
1750
 
 
1751
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1752
            None is supplied, the status code is not checked.
 
1753
        :keyword env_changes: A dictionary which lists changes to environment
 
1754
            variables. A value of None will unset the env variable.
 
1755
            The values must be strings. The change will only occur in the
 
1756
            child, so you don't need to fix the environment after running.
 
1757
        :keyword universal_newlines: Convert CRLF => LF
 
1758
        :keyword allow_plugins: By default the subprocess is run with
 
1759
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1760
            for system-wide plugins to create unexpected output on stderr,
 
1761
            which can cause unnecessary test failures.
 
1762
        """
 
1763
        env_changes = kwargs.get('env_changes', {})
 
1764
        working_dir = kwargs.get('working_dir', None)
 
1765
        allow_plugins = kwargs.get('allow_plugins', False)
 
1766
        if len(args) == 1:
 
1767
            if isinstance(args[0], list):
 
1768
                args = args[0]
 
1769
            elif isinstance(args[0], basestring):
 
1770
                args = list(shlex.split(args[0]))
 
1771
        else:
 
1772
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1773
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1774
                                            working_dir=working_dir,
 
1775
                                            allow_plugins=allow_plugins)
 
1776
        # We distinguish between retcode=None and retcode not passed.
 
1777
        supplied_retcode = kwargs.get('retcode', 0)
 
1778
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1779
            universal_newlines=kwargs.get('universal_newlines', False),
 
1780
            process_args=args)
 
1781
 
 
1782
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1783
                             skip_if_plan_to_signal=False,
 
1784
                             working_dir=None,
 
1785
                             allow_plugins=False):
 
1786
        """Start bzr in a subprocess for testing.
 
1787
 
 
1788
        This starts a new Python interpreter and runs bzr in there.
 
1789
        This should only be used for tests that have a justifiable need for
 
1790
        this isolation: e.g. they are testing startup time, or signal
 
1791
        handling, or early startup code, etc.  Subprocess code can't be
 
1792
        profiled or debugged so easily.
 
1793
 
 
1794
        :param process_args: a list of arguments to pass to the bzr executable,
 
1795
            for example ``['--version']``.
 
1796
        :param env_changes: A dictionary which lists changes to environment
 
1797
            variables. A value of None will unset the env variable.
 
1798
            The values must be strings. The change will only occur in the
 
1799
            child, so you don't need to fix the environment after running.
 
1800
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1801
            is not available.
 
1802
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1803
 
 
1804
        :returns: Popen object for the started process.
 
1805
        """
 
1806
        if skip_if_plan_to_signal:
 
1807
            if not getattr(os, 'kill', None):
 
1808
                raise TestSkipped("os.kill not available.")
 
1809
 
 
1810
        if env_changes is None:
 
1811
            env_changes = {}
 
1812
        old_env = {}
 
1813
 
 
1814
        def cleanup_environment():
 
1815
            for env_var, value in env_changes.iteritems():
 
1816
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1817
 
 
1818
        def restore_environment():
 
1819
            for env_var, value in old_env.iteritems():
 
1820
                osutils.set_or_unset_env(env_var, value)
 
1821
 
 
1822
        bzr_path = self.get_bzr_path()
 
1823
 
 
1824
        cwd = None
 
1825
        if working_dir is not None:
 
1826
            cwd = osutils.getcwd()
 
1827
            os.chdir(working_dir)
 
1828
 
 
1829
        try:
 
1830
            # win32 subprocess doesn't support preexec_fn
 
1831
            # so we will avoid using it on all platforms, just to
 
1832
            # make sure the code path is used, and we don't break on win32
 
1833
            cleanup_environment()
 
1834
            command = [sys.executable]
 
1835
            # frozen executables don't need the path to bzr
 
1836
            if getattr(sys, "frozen", None) is None:
 
1837
                command.append(bzr_path)
 
1838
            if not allow_plugins:
 
1839
                command.append('--no-plugins')
 
1840
            command.extend(process_args)
 
1841
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1842
        finally:
 
1843
            restore_environment()
 
1844
            if cwd is not None:
 
1845
                os.chdir(cwd)
 
1846
 
 
1847
        return process
 
1848
 
 
1849
    def _popen(self, *args, **kwargs):
 
1850
        """Place a call to Popen.
 
1851
 
 
1852
        Allows tests to override this method to intercept the calls made to
 
1853
        Popen for introspection.
 
1854
        """
 
1855
        return Popen(*args, **kwargs)
 
1856
 
 
1857
    def get_bzr_path(self):
 
1858
        """Return the path of the 'bzr' executable for this test suite."""
 
1859
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1860
        if not os.path.isfile(bzr_path):
 
1861
            # We are probably installed. Assume sys.argv is the right file
 
1862
            bzr_path = sys.argv[0]
 
1863
        return bzr_path
 
1864
 
 
1865
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1866
                              universal_newlines=False, process_args=None):
 
1867
        """Finish the execution of process.
 
1868
 
 
1869
        :param process: the Popen object returned from start_bzr_subprocess.
 
1870
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1871
            None is supplied, the status code is not checked.
 
1872
        :param send_signal: an optional signal to send to the process.
 
1873
        :param universal_newlines: Convert CRLF => LF
 
1874
        :returns: (stdout, stderr)
 
1875
        """
 
1876
        if send_signal is not None:
 
1877
            os.kill(process.pid, send_signal)
 
1878
        out, err = process.communicate()
 
1879
 
 
1880
        if universal_newlines:
 
1881
            out = out.replace('\r\n', '\n')
 
1882
            err = err.replace('\r\n', '\n')
 
1883
 
 
1884
        if retcode is not None and retcode != process.returncode:
 
1885
            if process_args is None:
 
1886
                process_args = "(unknown args)"
 
1887
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1888
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1889
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1890
                      % (process_args, retcode, process.returncode))
 
1891
        return [out, err]
124
1892
 
125
1893
    def check_inventory_shape(self, inv, shape):
126
 
        """
127
 
        Compare an inventory to a list of expected names.
 
1894
        """Compare an inventory to a list of expected names.
128
1895
 
129
1896
        Fail if they are not precisely equal.
130
1897
        """
132
1899
        shape = list(shape)             # copy
133
1900
        for path, ie in inv.entries():
134
1901
            name = path.replace('\\', '/')
135
 
            if ie.kind == 'dir':
 
1902
            if ie.kind == 'directory':
136
1903
                name = name + '/'
137
1904
            if name in shape:
138
1905
                shape.remove(name)
143
1910
        if extras:
144
1911
            self.fail("unexpected paths found in inventory: %r" % extras)
145
1912
 
 
1913
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1914
                         a_callable=None, *args, **kwargs):
 
1915
        """Call callable with redirected std io pipes.
 
1916
 
 
1917
        Returns the return code."""
 
1918
        if not callable(a_callable):
 
1919
            raise ValueError("a_callable must be callable.")
 
1920
        if stdin is None:
 
1921
            stdin = StringIO("")
 
1922
        if stdout is None:
 
1923
            if getattr(self, "_log_file", None) is not None:
 
1924
                stdout = self._log_file
 
1925
            else:
 
1926
                stdout = StringIO()
 
1927
        if stderr is None:
 
1928
            if getattr(self, "_log_file", None is not None):
 
1929
                stderr = self._log_file
 
1930
            else:
 
1931
                stderr = StringIO()
 
1932
        real_stdin = sys.stdin
 
1933
        real_stdout = sys.stdout
 
1934
        real_stderr = sys.stderr
 
1935
        try:
 
1936
            sys.stdout = stdout
 
1937
            sys.stderr = stderr
 
1938
            sys.stdin = stdin
 
1939
            return a_callable(*args, **kwargs)
 
1940
        finally:
 
1941
            sys.stdout = real_stdout
 
1942
            sys.stderr = real_stderr
 
1943
            sys.stdin = real_stdin
 
1944
 
 
1945
    def reduceLockdirTimeout(self):
 
1946
        """Reduce the default lock timeout for the duration of the test, so that
 
1947
        if LockContention occurs during a test, it does so quickly.
 
1948
 
 
1949
        Tests that expect to provoke LockContention errors should call this.
 
1950
        """
 
1951
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1952
        def resetTimeout():
 
1953
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1954
        self.addCleanup(resetTimeout)
 
1955
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1956
 
 
1957
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
1958
        """Return a StringIOWrapper instance, that will encode Unicode
 
1959
        input to UTF-8.
 
1960
        """
 
1961
        if encoding_type is None:
 
1962
            encoding_type = 'strict'
 
1963
        sio = StringIO()
 
1964
        output_encoding = 'utf-8'
 
1965
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
1966
        sio.encoding = output_encoding
 
1967
        return sio
 
1968
 
 
1969
    def disable_verb(self, verb):
 
1970
        """Disable a smart server verb for one test."""
 
1971
        from bzrlib.smart import request
 
1972
        request_handlers = request.request_handlers
 
1973
        orig_method = request_handlers.get(verb)
 
1974
        request_handlers.remove(verb)
 
1975
        def restoreVerb():
 
1976
            request_handlers.register(verb, orig_method)
 
1977
        self.addCleanup(restoreVerb)
 
1978
 
 
1979
 
 
1980
class CapturedCall(object):
 
1981
    """A helper for capturing smart server calls for easy debug analysis."""
 
1982
 
 
1983
    def __init__(self, params, prefix_length):
 
1984
        """Capture the call with params and skip prefix_length stack frames."""
 
1985
        self.call = params
 
1986
        import traceback
 
1987
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
1988
        # client frames. Beyond this we could get more clever, but this is good
 
1989
        # enough for now.
 
1990
        stack = traceback.extract_stack()[prefix_length:-5]
 
1991
        self.stack = ''.join(traceback.format_list(stack))
 
1992
 
 
1993
    def __str__(self):
 
1994
        return self.call.method
 
1995
 
 
1996
    def __repr__(self):
 
1997
        return self.call.method
 
1998
 
 
1999
    def stack(self):
 
2000
        return self.stack
 
2001
 
 
2002
 
 
2003
class TestCaseWithMemoryTransport(TestCase):
 
2004
    """Common test class for tests that do not need disk resources.
 
2005
 
 
2006
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2007
 
 
2008
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2009
 
 
2010
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2011
    a directory which does not exist. This serves to help ensure test isolation
 
2012
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2013
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2014
    file defaults for the transport in tests, nor does it obey the command line
 
2015
    override, so tests that accidentally write to the common directory should
 
2016
    be rare.
 
2017
 
 
2018
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2019
    a .bzr directory that stops us ascending higher into the filesystem.
 
2020
    """
 
2021
 
 
2022
    TEST_ROOT = None
 
2023
    _TEST_NAME = 'test'
 
2024
 
 
2025
    def __init__(self, methodName='runTest'):
 
2026
        # allow test parameterization after test construction and before test
 
2027
        # execution. Variables that the parameterizer sets need to be
 
2028
        # ones that are not set by setUp, or setUp will trash them.
 
2029
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2030
        self.vfs_transport_factory = default_transport
 
2031
        self.transport_server = None
 
2032
        self.transport_readonly_server = None
 
2033
        self.__vfs_server = None
 
2034
 
 
2035
    def get_transport(self, relpath=None):
 
2036
        """Return a writeable transport.
 
2037
 
 
2038
        This transport is for the test scratch space relative to
 
2039
        "self._test_root"
 
2040
 
 
2041
        :param relpath: a path relative to the base url.
 
2042
        """
 
2043
        t = get_transport(self.get_url(relpath))
 
2044
        self.assertFalse(t.is_readonly())
 
2045
        return t
 
2046
 
 
2047
    def get_readonly_transport(self, relpath=None):
 
2048
        """Return a readonly transport for the test scratch space
 
2049
 
 
2050
        This can be used to test that operations which should only need
 
2051
        readonly access in fact do not try to write.
 
2052
 
 
2053
        :param relpath: a path relative to the base url.
 
2054
        """
 
2055
        t = get_transport(self.get_readonly_url(relpath))
 
2056
        self.assertTrue(t.is_readonly())
 
2057
        return t
 
2058
 
 
2059
    def create_transport_readonly_server(self):
 
2060
        """Create a transport server from class defined at init.
 
2061
 
 
2062
        This is mostly a hook for daughter classes.
 
2063
        """
 
2064
        return self.transport_readonly_server()
 
2065
 
 
2066
    def get_readonly_server(self):
 
2067
        """Get the server instance for the readonly transport
 
2068
 
 
2069
        This is useful for some tests with specific servers to do diagnostics.
 
2070
        """
 
2071
        if self.__readonly_server is None:
 
2072
            if self.transport_readonly_server is None:
 
2073
                # readonly decorator requested
 
2074
                # bring up the server
 
2075
                self.__readonly_server = ReadonlyServer()
 
2076
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
2077
            else:
 
2078
                self.__readonly_server = self.create_transport_readonly_server()
 
2079
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
2080
            self.addCleanup(self.__readonly_server.tearDown)
 
2081
        return self.__readonly_server
 
2082
 
 
2083
    def get_readonly_url(self, relpath=None):
 
2084
        """Get a URL for the readonly transport.
 
2085
 
 
2086
        This will either be backed by '.' or a decorator to the transport
 
2087
        used by self.get_url()
 
2088
        relpath provides for clients to get a path relative to the base url.
 
2089
        These should only be downwards relative, not upwards.
 
2090
        """
 
2091
        base = self.get_readonly_server().get_url()
 
2092
        return self._adjust_url(base, relpath)
 
2093
 
 
2094
    def get_vfs_only_server(self):
 
2095
        """Get the vfs only read/write server instance.
 
2096
 
 
2097
        This is useful for some tests with specific servers that need
 
2098
        diagnostics.
 
2099
 
 
2100
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2101
        is no means to override it.
 
2102
        """
 
2103
        if self.__vfs_server is None:
 
2104
            self.__vfs_server = MemoryServer()
 
2105
            self.__vfs_server.setUp()
 
2106
            self.addCleanup(self.__vfs_server.tearDown)
 
2107
        return self.__vfs_server
 
2108
 
 
2109
    def get_server(self):
 
2110
        """Get the read/write server instance.
 
2111
 
 
2112
        This is useful for some tests with specific servers that need
 
2113
        diagnostics.
 
2114
 
 
2115
        This is built from the self.transport_server factory. If that is None,
 
2116
        then the self.get_vfs_server is returned.
 
2117
        """
 
2118
        if self.__server is None:
 
2119
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
2120
                return self.get_vfs_only_server()
 
2121
            else:
 
2122
                # bring up a decorated means of access to the vfs only server.
 
2123
                self.__server = self.transport_server()
 
2124
                try:
 
2125
                    self.__server.setUp(self.get_vfs_only_server())
 
2126
                except TypeError, e:
 
2127
                    # This should never happen; the try:Except here is to assist
 
2128
                    # developers having to update code rather than seeing an
 
2129
                    # uninformative TypeError.
 
2130
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
2131
            self.addCleanup(self.__server.tearDown)
 
2132
        return self.__server
 
2133
 
 
2134
    def _adjust_url(self, base, relpath):
 
2135
        """Get a URL (or maybe a path) for the readwrite transport.
 
2136
 
 
2137
        This will either be backed by '.' or to an equivalent non-file based
 
2138
        facility.
 
2139
        relpath provides for clients to get a path relative to the base url.
 
2140
        These should only be downwards relative, not upwards.
 
2141
        """
 
2142
        if relpath is not None and relpath != '.':
 
2143
            if not base.endswith('/'):
 
2144
                base = base + '/'
 
2145
            # XXX: Really base should be a url; we did after all call
 
2146
            # get_url()!  But sometimes it's just a path (from
 
2147
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2148
            # to a non-escaped local path.
 
2149
            if base.startswith('./') or base.startswith('/'):
 
2150
                base += relpath
 
2151
            else:
 
2152
                base += urlutils.escape(relpath)
 
2153
        return base
 
2154
 
 
2155
    def get_url(self, relpath=None):
 
2156
        """Get a URL (or maybe a path) for the readwrite transport.
 
2157
 
 
2158
        This will either be backed by '.' or to an equivalent non-file based
 
2159
        facility.
 
2160
        relpath provides for clients to get a path relative to the base url.
 
2161
        These should only be downwards relative, not upwards.
 
2162
        """
 
2163
        base = self.get_server().get_url()
 
2164
        return self._adjust_url(base, relpath)
 
2165
 
 
2166
    def get_vfs_only_url(self, relpath=None):
 
2167
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2168
 
 
2169
        This will never be a smart protocol.  It always has all the
 
2170
        capabilities of the local filesystem, but it might actually be a
 
2171
        MemoryTransport or some other similar virtual filesystem.
 
2172
 
 
2173
        This is the backing transport (if any) of the server returned by
 
2174
        get_url and get_readonly_url.
 
2175
 
 
2176
        :param relpath: provides for clients to get a path relative to the base
 
2177
            url.  These should only be downwards relative, not upwards.
 
2178
        :return: A URL
 
2179
        """
 
2180
        base = self.get_vfs_only_server().get_url()
 
2181
        return self._adjust_url(base, relpath)
 
2182
 
 
2183
    def _create_safety_net(self):
 
2184
        """Make a fake bzr directory.
 
2185
 
 
2186
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2187
        real branch.
 
2188
        """
 
2189
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2190
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2191
 
 
2192
    def _check_safety_net(self):
 
2193
        """Check that the safety .bzr directory have not been touched.
 
2194
 
 
2195
        _make_test_root have created a .bzr directory to prevent tests from
 
2196
        propagating. This method ensures than a test did not leaked.
 
2197
        """
 
2198
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2199
        wt = workingtree.WorkingTree.open(root)
 
2200
        last_rev = wt.last_revision()
 
2201
        if last_rev != 'null:':
 
2202
            # The current test have modified the /bzr directory, we need to
 
2203
            # recreate a new one or all the followng tests will fail.
 
2204
            # If you need to inspect its content uncomment the following line
 
2205
            # import pdb; pdb.set_trace()
 
2206
            _rmtree_temp_dir(root + '/.bzr')
 
2207
            self._create_safety_net()
 
2208
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2209
 
 
2210
    def _make_test_root(self):
 
2211
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2212
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
2213
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2214
 
 
2215
            self._create_safety_net()
 
2216
 
 
2217
            # The same directory is used by all tests, and we're not
 
2218
            # specifically told when all tests are finished.  This will do.
 
2219
            atexit.register(_rmtree_temp_dir, root)
 
2220
 
 
2221
        self.addCleanup(self._check_safety_net)
 
2222
 
 
2223
    def makeAndChdirToTestDir(self):
 
2224
        """Create a temporary directories for this one test.
 
2225
 
 
2226
        This must set self.test_home_dir and self.test_dir and chdir to
 
2227
        self.test_dir.
 
2228
 
 
2229
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2230
        """
 
2231
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2232
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2233
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2234
 
 
2235
    def make_branch(self, relpath, format=None):
 
2236
        """Create a branch on the transport at relpath."""
 
2237
        repo = self.make_repository(relpath, format=format)
 
2238
        return repo.bzrdir.create_branch()
 
2239
 
 
2240
    def make_bzrdir(self, relpath, format=None):
 
2241
        try:
 
2242
            # might be a relative or absolute path
 
2243
            maybe_a_url = self.get_url(relpath)
 
2244
            segments = maybe_a_url.rsplit('/', 1)
 
2245
            t = get_transport(maybe_a_url)
 
2246
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2247
                t.ensure_base()
 
2248
            if format is None:
 
2249
                format = 'default'
 
2250
            if isinstance(format, basestring):
 
2251
                format = bzrdir.format_registry.make_bzrdir(format)
 
2252
            return format.initialize_on_transport(t)
 
2253
        except errors.UninitializableFormat:
 
2254
            raise TestSkipped("Format %s is not initializable." % format)
 
2255
 
 
2256
    def make_repository(self, relpath, shared=False, format=None):
 
2257
        """Create a repository on our default transport at relpath.
 
2258
 
 
2259
        Note that relpath must be a relative path, not a full url.
 
2260
        """
 
2261
        # FIXME: If you create a remoterepository this returns the underlying
 
2262
        # real format, which is incorrect.  Actually we should make sure that
 
2263
        # RemoteBzrDir returns a RemoteRepository.
 
2264
        # maybe  mbp 20070410
 
2265
        made_control = self.make_bzrdir(relpath, format=format)
 
2266
        return made_control.create_repository(shared=shared)
 
2267
 
 
2268
    def make_smart_server(self, path):
 
2269
        smart_server = server.SmartTCPServer_for_testing()
 
2270
        smart_server.setUp(self.get_server())
 
2271
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2272
        self.addCleanup(smart_server.tearDown)
 
2273
        return remote_transport
 
2274
 
 
2275
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2276
        """Create a branch on the default transport and a MemoryTree for it."""
 
2277
        b = self.make_branch(relpath, format=format)
 
2278
        return memorytree.MemoryTree.create_on_branch(b)
 
2279
 
 
2280
    def make_branch_builder(self, relpath, format=None):
 
2281
        branch = self.make_branch(relpath, format=format)
 
2282
        return branchbuilder.BranchBuilder(branch=branch)
 
2283
 
 
2284
    def overrideEnvironmentForTesting(self):
 
2285
        os.environ['HOME'] = self.test_home_dir
 
2286
        os.environ['BZR_HOME'] = self.test_home_dir
 
2287
 
 
2288
    def setUp(self):
 
2289
        super(TestCaseWithMemoryTransport, self).setUp()
 
2290
        self._make_test_root()
 
2291
        _currentdir = os.getcwdu()
 
2292
        def _leaveDirectory():
 
2293
            os.chdir(_currentdir)
 
2294
        self.addCleanup(_leaveDirectory)
 
2295
        self.makeAndChdirToTestDir()
 
2296
        self.overrideEnvironmentForTesting()
 
2297
        self.__readonly_server = None
 
2298
        self.__server = None
 
2299
        self.reduceLockdirTimeout()
 
2300
 
 
2301
    def setup_smart_server_with_call_log(self):
 
2302
        """Sets up a smart server as the transport server with a call log."""
 
2303
        self.transport_server = server.SmartTCPServer_for_testing
 
2304
        self.hpss_calls = []
 
2305
        import traceback
 
2306
        # Skip the current stack down to the caller of
 
2307
        # setup_smart_server_with_call_log
 
2308
        prefix_length = len(traceback.extract_stack()) - 2
 
2309
        def capture_hpss_call(params):
 
2310
            self.hpss_calls.append(
 
2311
                CapturedCall(params, prefix_length))
 
2312
        client._SmartClient.hooks.install_named_hook(
 
2313
            'call', capture_hpss_call, None)
 
2314
 
 
2315
    def reset_smart_call_log(self):
 
2316
        self.hpss_calls = []
 
2317
 
 
2318
 
 
2319
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
2320
    """Derived class that runs a test within a temporary directory.
 
2321
 
 
2322
    This is useful for tests that need to create a branch, etc.
 
2323
 
 
2324
    The directory is created in a slightly complex way: for each
 
2325
    Python invocation, a new temporary top-level directory is created.
 
2326
    All test cases create their own directory within that.  If the
 
2327
    tests complete successfully, the directory is removed.
 
2328
 
 
2329
    :ivar test_base_dir: The path of the top-level directory for this
 
2330
    test, which contains a home directory and a work directory.
 
2331
 
 
2332
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2333
    which is used as $HOME for this test.
 
2334
 
 
2335
    :ivar test_dir: A directory under test_base_dir used as the current
 
2336
    directory when the test proper is run.
 
2337
    """
 
2338
 
 
2339
    OVERRIDE_PYTHON = 'python'
146
2340
 
147
2341
    def check_file_contents(self, filename, expect):
148
2342
        self.log("check contents of file %s" % filename)
149
2343
        contents = file(filename, 'r').read()
150
2344
        if contents != expect:
151
 
            self.log("expected: %r" % expected)
 
2345
            self.log("expected: %r" % expect)
152
2346
            self.log("actually: %r" % contents)
153
 
            self.fail("contents of %s not as expected")
154
 
            
155
 
 
156
 
 
157
 
class InTempDir(TestBase):
158
 
    """Base class for tests run in a temporary branch."""
159
 
    def setUp(self):
160
 
        import os
161
 
        self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
 
2347
            self.fail("contents of %s not as expected" % filename)
 
2348
 
 
2349
    def _getTestDirPrefix(self):
 
2350
        # create a directory within the top level test directory
 
2351
        if sys.platform in ('win32', 'cygwin'):
 
2352
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2353
            # windows is likely to have path-length limits so use a short name
 
2354
            name_prefix = name_prefix[-30:]
 
2355
        else:
 
2356
            name_prefix = re.sub('[/]', '_', self.id())
 
2357
        return name_prefix
 
2358
 
 
2359
    def makeAndChdirToTestDir(self):
 
2360
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2361
 
 
2362
        For TestCaseInTempDir we create a temporary directory based on the test
 
2363
        name and then create two subdirs - test and home under it.
 
2364
        """
 
2365
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2366
            self._getTestDirPrefix())
 
2367
        name = name_prefix
 
2368
        for i in range(100):
 
2369
            if os.path.exists(name):
 
2370
                name = name_prefix + '_' + str(i)
 
2371
            else:
 
2372
                os.mkdir(name)
 
2373
                break
 
2374
        # now create test and home directories within this dir
 
2375
        self.test_base_dir = name
 
2376
        self.test_home_dir = self.test_base_dir + '/home'
 
2377
        os.mkdir(self.test_home_dir)
 
2378
        self.test_dir = self.test_base_dir + '/work'
162
2379
        os.mkdir(self.test_dir)
163
2380
        os.chdir(self.test_dir)
164
 
        
165
 
    def tearDown(self):
166
 
        import os
167
 
        os.chdir(self.TEST_ROOT)
168
 
 
169
 
 
170
 
 
171
 
 
172
 
 
173
 
class _MyResult(TestResult):
174
 
    """
175
 
    Custom TestResult.
176
 
 
177
 
    No special behaviour for now.
178
 
    """
179
 
    def __init__(self, out):
180
 
        self.out = out
181
 
        TestResult.__init__(self)
 
2381
        # put name of test inside
 
2382
        f = file(self.test_base_dir + '/name', 'w')
 
2383
        try:
 
2384
            f.write(self.id())
 
2385
        finally:
 
2386
            f.close()
 
2387
        self.addCleanup(self.deleteTestDir)
 
2388
 
 
2389
    def deleteTestDir(self):
 
2390
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2391
        _rmtree_temp_dir(self.test_base_dir)
 
2392
 
 
2393
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2394
        """Build a test tree according to a pattern.
 
2395
 
 
2396
        shape is a sequence of file specifications.  If the final
 
2397
        character is '/', a directory is created.
 
2398
 
 
2399
        This assumes that all the elements in the tree being built are new.
 
2400
 
 
2401
        This doesn't add anything to a branch.
 
2402
 
 
2403
        :type shape:    list or tuple.
 
2404
        :param line_endings: Either 'binary' or 'native'
 
2405
            in binary mode, exact contents are written in native mode, the
 
2406
            line endings match the default platform endings.
 
2407
        :param transport: A transport to write to, for building trees on VFS's.
 
2408
            If the transport is readonly or None, "." is opened automatically.
 
2409
        :return: None
 
2410
        """
 
2411
        if type(shape) not in (list, tuple):
 
2412
            raise AssertionError("Parameter 'shape' should be "
 
2413
                "a list or a tuple. Got %r instead" % (shape,))
 
2414
        # It's OK to just create them using forward slashes on windows.
 
2415
        if transport is None or transport.is_readonly():
 
2416
            transport = get_transport(".")
 
2417
        for name in shape:
 
2418
            self.assertIsInstance(name, basestring)
 
2419
            if name[-1] == '/':
 
2420
                transport.mkdir(urlutils.escape(name[:-1]))
 
2421
            else:
 
2422
                if line_endings == 'binary':
 
2423
                    end = '\n'
 
2424
                elif line_endings == 'native':
 
2425
                    end = os.linesep
 
2426
                else:
 
2427
                    raise errors.BzrError(
 
2428
                        'Invalid line ending request %r' % line_endings)
 
2429
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2430
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2431
 
 
2432
    def build_tree_contents(self, shape):
 
2433
        build_tree_contents(shape)
 
2434
 
 
2435
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2436
        """Assert whether path or paths are in the WorkingTree"""
 
2437
        if tree is None:
 
2438
            tree = workingtree.WorkingTree.open(root_path)
 
2439
        if not isinstance(path, basestring):
 
2440
            for p in path:
 
2441
                self.assertInWorkingTree(p, tree=tree)
 
2442
        else:
 
2443
            self.assertIsNot(tree.path2id(path), None,
 
2444
                path+' not in working tree.')
 
2445
 
 
2446
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2447
        """Assert whether path or paths are not in the WorkingTree"""
 
2448
        if tree is None:
 
2449
            tree = workingtree.WorkingTree.open(root_path)
 
2450
        if not isinstance(path, basestring):
 
2451
            for p in path:
 
2452
                self.assertNotInWorkingTree(p,tree=tree)
 
2453
        else:
 
2454
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2455
 
 
2456
 
 
2457
class TestCaseWithTransport(TestCaseInTempDir):
 
2458
    """A test case that provides get_url and get_readonly_url facilities.
 
2459
 
 
2460
    These back onto two transport servers, one for readonly access and one for
 
2461
    read write access.
 
2462
 
 
2463
    If no explicit class is provided for readonly access, a
 
2464
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2465
    based read write transports.
 
2466
 
 
2467
    If an explicit class is provided for readonly access, that server and the
 
2468
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2469
    """
 
2470
 
 
2471
    def get_vfs_only_server(self):
 
2472
        """See TestCaseWithMemoryTransport.
 
2473
 
 
2474
        This is useful for some tests with specific servers that need
 
2475
        diagnostics.
 
2476
        """
 
2477
        if self.__vfs_server is None:
 
2478
            self.__vfs_server = self.vfs_transport_factory()
 
2479
            self.__vfs_server.setUp()
 
2480
            self.addCleanup(self.__vfs_server.tearDown)
 
2481
        return self.__vfs_server
 
2482
 
 
2483
    def make_branch_and_tree(self, relpath, format=None):
 
2484
        """Create a branch on the transport and a tree locally.
 
2485
 
 
2486
        If the transport is not a LocalTransport, the Tree can't be created on
 
2487
        the transport.  In that case if the vfs_transport_factory is
 
2488
        LocalURLServer the working tree is created in the local
 
2489
        directory backing the transport, and the returned tree's branch and
 
2490
        repository will also be accessed locally. Otherwise a lightweight
 
2491
        checkout is created and returned.
 
2492
 
 
2493
        :param format: The BzrDirFormat.
 
2494
        :returns: the WorkingTree.
 
2495
        """
 
2496
        # TODO: always use the local disk path for the working tree,
 
2497
        # this obviously requires a format that supports branch references
 
2498
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2499
        # RBC 20060208
 
2500
        b = self.make_branch(relpath, format=format)
 
2501
        try:
 
2502
            return b.bzrdir.create_workingtree()
 
2503
        except errors.NotLocalUrl:
 
2504
            # We can only make working trees locally at the moment.  If the
 
2505
            # transport can't support them, then we keep the non-disk-backed
 
2506
            # branch and create a local checkout.
 
2507
            if self.vfs_transport_factory is LocalURLServer:
 
2508
                # the branch is colocated on disk, we cannot create a checkout.
 
2509
                # hopefully callers will expect this.
 
2510
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2511
                wt = local_controldir.create_workingtree()
 
2512
                if wt.branch._format != b._format:
 
2513
                    wt._branch = b
 
2514
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2515
                    # in case the implementation details of workingtree objects
 
2516
                    # change.
 
2517
                    self.assertIs(b, wt.branch)
 
2518
                return wt
 
2519
            else:
 
2520
                return b.create_checkout(relpath, lightweight=True)
 
2521
 
 
2522
    def assertIsDirectory(self, relpath, transport):
 
2523
        """Assert that relpath within transport is a directory.
 
2524
 
 
2525
        This may not be possible on all transports; in that case it propagates
 
2526
        a TransportNotPossible.
 
2527
        """
 
2528
        try:
 
2529
            mode = transport.stat(relpath).st_mode
 
2530
        except errors.NoSuchFile:
 
2531
            self.fail("path %s is not a directory; no such file"
 
2532
                      % (relpath))
 
2533
        if not stat.S_ISDIR(mode):
 
2534
            self.fail("path %s is not a directory; has mode %#o"
 
2535
                      % (relpath, mode))
 
2536
 
 
2537
    def assertTreesEqual(self, left, right):
 
2538
        """Check that left and right have the same content and properties."""
 
2539
        # we use a tree delta to check for equality of the content, and we
 
2540
        # manually check for equality of other things such as the parents list.
 
2541
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2542
        differences = left.changes_from(right)
 
2543
        self.assertFalse(differences.has_changed(),
 
2544
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2545
 
 
2546
    def setUp(self):
 
2547
        super(TestCaseWithTransport, self).setUp()
 
2548
        self.__vfs_server = None
 
2549
 
 
2550
 
 
2551
class ChrootedTestCase(TestCaseWithTransport):
 
2552
    """A support class that provides readonly urls outside the local namespace.
 
2553
 
 
2554
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2555
    is then we are chrooted already, if it is not then an HttpServer is used
 
2556
    for readonly urls.
 
2557
 
 
2558
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2559
                       be used without needed to redo it when a different
 
2560
                       subclass is in use ?
 
2561
    """
 
2562
 
 
2563
    def setUp(self):
 
2564
        super(ChrootedTestCase, self).setUp()
 
2565
        if not self.vfs_transport_factory == MemoryServer:
 
2566
            self.transport_readonly_server = HttpServer
 
2567
 
 
2568
 
 
2569
def condition_id_re(pattern):
 
2570
    """Create a condition filter which performs a re check on a test's id.
 
2571
 
 
2572
    :param pattern: A regular expression string.
 
2573
    :return: A callable that returns True if the re matches.
 
2574
    """
 
2575
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2576
        'test filter')
 
2577
    def condition(test):
 
2578
        test_id = test.id()
 
2579
        return filter_re.search(test_id)
 
2580
    return condition
 
2581
 
 
2582
 
 
2583
def condition_isinstance(klass_or_klass_list):
 
2584
    """Create a condition filter which returns isinstance(param, klass).
 
2585
 
 
2586
    :return: A callable which when called with one parameter obj return the
 
2587
        result of isinstance(obj, klass_or_klass_list).
 
2588
    """
 
2589
    def condition(obj):
 
2590
        return isinstance(obj, klass_or_klass_list)
 
2591
    return condition
 
2592
 
 
2593
 
 
2594
def condition_id_in_list(id_list):
 
2595
    """Create a condition filter which verify that test's id in a list.
 
2596
 
 
2597
    :param id_list: A TestIdList object.
 
2598
    :return: A callable that returns True if the test's id appears in the list.
 
2599
    """
 
2600
    def condition(test):
 
2601
        return id_list.includes(test.id())
 
2602
    return condition
 
2603
 
 
2604
 
 
2605
def condition_id_startswith(starts):
 
2606
    """Create a condition filter verifying that test's id starts with a string.
 
2607
 
 
2608
    :param starts: A list of string.
 
2609
    :return: A callable that returns True if the test's id starts with one of
 
2610
        the given strings.
 
2611
    """
 
2612
    def condition(test):
 
2613
        for start in starts:
 
2614
            if test.id().startswith(start):
 
2615
                return True
 
2616
        return False
 
2617
    return condition
 
2618
 
 
2619
 
 
2620
def exclude_tests_by_condition(suite, condition):
 
2621
    """Create a test suite which excludes some tests from suite.
 
2622
 
 
2623
    :param suite: The suite to get tests from.
 
2624
    :param condition: A callable whose result evaluates True when called with a
 
2625
        test case which should be excluded from the result.
 
2626
    :return: A suite which contains the tests found in suite that fail
 
2627
        condition.
 
2628
    """
 
2629
    result = []
 
2630
    for test in iter_suite_tests(suite):
 
2631
        if not condition(test):
 
2632
            result.append(test)
 
2633
    return TestUtil.TestSuite(result)
 
2634
 
 
2635
 
 
2636
def filter_suite_by_condition(suite, condition):
 
2637
    """Create a test suite by filtering another one.
 
2638
 
 
2639
    :param suite: The source suite.
 
2640
    :param condition: A callable whose result evaluates True when called with a
 
2641
        test case which should be included in the result.
 
2642
    :return: A suite which contains the tests found in suite that pass
 
2643
        condition.
 
2644
    """
 
2645
    result = []
 
2646
    for test in iter_suite_tests(suite):
 
2647
        if condition(test):
 
2648
            result.append(test)
 
2649
    return TestUtil.TestSuite(result)
 
2650
 
 
2651
 
 
2652
def filter_suite_by_re(suite, pattern):
 
2653
    """Create a test suite by filtering another one.
 
2654
 
 
2655
    :param suite:           the source suite
 
2656
    :param pattern:         pattern that names must match
 
2657
    :returns: the newly created suite
 
2658
    """
 
2659
    condition = condition_id_re(pattern)
 
2660
    result_suite = filter_suite_by_condition(suite, condition)
 
2661
    return result_suite
 
2662
 
 
2663
 
 
2664
def filter_suite_by_id_list(suite, test_id_list):
 
2665
    """Create a test suite by filtering another one.
 
2666
 
 
2667
    :param suite: The source suite.
 
2668
    :param test_id_list: A list of the test ids to keep as strings.
 
2669
    :returns: the newly created suite
 
2670
    """
 
2671
    condition = condition_id_in_list(test_id_list)
 
2672
    result_suite = filter_suite_by_condition(suite, condition)
 
2673
    return result_suite
 
2674
 
 
2675
 
 
2676
def filter_suite_by_id_startswith(suite, start):
 
2677
    """Create a test suite by filtering another one.
 
2678
 
 
2679
    :param suite: The source suite.
 
2680
    :param start: A list of string the test id must start with one of.
 
2681
    :returns: the newly created suite
 
2682
    """
 
2683
    condition = condition_id_startswith(start)
 
2684
    result_suite = filter_suite_by_condition(suite, condition)
 
2685
    return result_suite
 
2686
 
 
2687
 
 
2688
def exclude_tests_by_re(suite, pattern):
 
2689
    """Create a test suite which excludes some tests from suite.
 
2690
 
 
2691
    :param suite: The suite to get tests from.
 
2692
    :param pattern: A regular expression string. Test ids that match this
 
2693
        pattern will be excluded from the result.
 
2694
    :return: A TestSuite that contains all the tests from suite without the
 
2695
        tests that matched pattern. The order of tests is the same as it was in
 
2696
        suite.
 
2697
    """
 
2698
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2699
 
 
2700
 
 
2701
def preserve_input(something):
 
2702
    """A helper for performing test suite transformation chains.
 
2703
 
 
2704
    :param something: Anything you want to preserve.
 
2705
    :return: Something.
 
2706
    """
 
2707
    return something
 
2708
 
 
2709
 
 
2710
def randomize_suite(suite):
 
2711
    """Return a new TestSuite with suite's tests in random order.
 
2712
 
 
2713
    The tests in the input suite are flattened into a single suite in order to
 
2714
    accomplish this. Any nested TestSuites are removed to provide global
 
2715
    randomness.
 
2716
    """
 
2717
    tests = list(iter_suite_tests(suite))
 
2718
    random.shuffle(tests)
 
2719
    return TestUtil.TestSuite(tests)
 
2720
 
 
2721
 
 
2722
def split_suite_by_condition(suite, condition):
 
2723
    """Split a test suite into two by a condition.
 
2724
 
 
2725
    :param suite: The suite to split.
 
2726
    :param condition: The condition to match on. Tests that match this
 
2727
        condition are returned in the first test suite, ones that do not match
 
2728
        are in the second suite.
 
2729
    :return: A tuple of two test suites, where the first contains tests from
 
2730
        suite matching the condition, and the second contains the remainder
 
2731
        from suite. The order within each output suite is the same as it was in
 
2732
        suite.
 
2733
    """
 
2734
    matched = []
 
2735
    did_not_match = []
 
2736
    for test in iter_suite_tests(suite):
 
2737
        if condition(test):
 
2738
            matched.append(test)
 
2739
        else:
 
2740
            did_not_match.append(test)
 
2741
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2742
 
 
2743
 
 
2744
def split_suite_by_re(suite, pattern):
 
2745
    """Split a test suite into two by a regular expression.
 
2746
 
 
2747
    :param suite: The suite to split.
 
2748
    :param pattern: A regular expression string. Test ids that match this
 
2749
        pattern will be in the first test suite returned, and the others in the
 
2750
        second test suite returned.
 
2751
    :return: A tuple of two test suites, where the first contains tests from
 
2752
        suite matching pattern, and the second contains the remainder from
 
2753
        suite. The order within each output suite is the same as it was in
 
2754
        suite.
 
2755
    """
 
2756
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2757
 
 
2758
 
 
2759
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2760
              stop_on_failure=False,
 
2761
              transport=None, lsprof_timed=None, bench_history=None,
 
2762
              matching_tests_first=None,
 
2763
              list_only=False,
 
2764
              random_seed=None,
 
2765
              exclude_pattern=None,
 
2766
              strict=False,
 
2767
              runner_class=None,
 
2768
              suite_decorators=None,
 
2769
              stream=None):
 
2770
    """Run a test suite for bzr selftest.
 
2771
 
 
2772
    :param runner_class: The class of runner to use. Must support the
 
2773
        constructor arguments passed by run_suite which are more than standard
 
2774
        python uses.
 
2775
    :return: A boolean indicating success.
 
2776
    """
 
2777
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2778
    if verbose:
 
2779
        verbosity = 2
 
2780
    else:
 
2781
        verbosity = 1
 
2782
    if runner_class is None:
 
2783
        runner_class = TextTestRunner
 
2784
    if stream is None:
 
2785
        stream = sys.stdout
 
2786
    runner = runner_class(stream=stream,
 
2787
                            descriptions=0,
 
2788
                            verbosity=verbosity,
 
2789
                            bench_history=bench_history,
 
2790
                            list_only=list_only,
 
2791
                            strict=strict,
 
2792
                            )
 
2793
    runner.stop_on_failure=stop_on_failure
 
2794
    # built in decorator factories:
 
2795
    decorators = [
 
2796
        random_order(random_seed, runner),
 
2797
        exclude_tests(exclude_pattern),
 
2798
        ]
 
2799
    if matching_tests_first:
 
2800
        decorators.append(tests_first(pattern))
 
2801
    else:
 
2802
        decorators.append(filter_tests(pattern))
 
2803
    if suite_decorators:
 
2804
        decorators.extend(suite_decorators)
 
2805
    # tell the result object how many tests will be running: (except if
 
2806
    # --parallel=fork is being used. Robert said he will provide a better
 
2807
    # progress design later -- vila 20090817)
 
2808
    if fork_decorator not in decorators:
 
2809
        decorators.append(CountingDecorator)
 
2810
    for decorator in decorators:
 
2811
        suite = decorator(suite)
 
2812
    result = runner.run(suite)
 
2813
    if list_only:
 
2814
        return True
 
2815
    result.done()
 
2816
    if strict:
 
2817
        return result.wasStrictlySuccessful()
 
2818
    else:
 
2819
        return result.wasSuccessful()
 
2820
 
 
2821
 
 
2822
# A registry where get() returns a suite decorator.
 
2823
parallel_registry = registry.Registry()
 
2824
 
 
2825
 
 
2826
def fork_decorator(suite):
 
2827
    concurrency = osutils.local_concurrency()
 
2828
    if concurrency == 1:
 
2829
        return suite
 
2830
    from testtools import ConcurrentTestSuite
 
2831
    return ConcurrentTestSuite(suite, fork_for_tests)
 
2832
parallel_registry.register('fork', fork_decorator)
 
2833
 
 
2834
 
 
2835
def subprocess_decorator(suite):
 
2836
    concurrency = osutils.local_concurrency()
 
2837
    if concurrency == 1:
 
2838
        return suite
 
2839
    from testtools import ConcurrentTestSuite
 
2840
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
2841
parallel_registry.register('subprocess', subprocess_decorator)
 
2842
 
 
2843
 
 
2844
def exclude_tests(exclude_pattern):
 
2845
    """Return a test suite decorator that excludes tests."""
 
2846
    if exclude_pattern is None:
 
2847
        return identity_decorator
 
2848
    def decorator(suite):
 
2849
        return ExcludeDecorator(suite, exclude_pattern)
 
2850
    return decorator
 
2851
 
 
2852
 
 
2853
def filter_tests(pattern):
 
2854
    if pattern == '.*':
 
2855
        return identity_decorator
 
2856
    def decorator(suite):
 
2857
        return FilterTestsDecorator(suite, pattern)
 
2858
    return decorator
 
2859
 
 
2860
 
 
2861
def random_order(random_seed, runner):
 
2862
    """Return a test suite decorator factory for randomising tests order.
 
2863
    
 
2864
    :param random_seed: now, a string which casts to a long, or a long.
 
2865
    :param runner: A test runner with a stream attribute to report on.
 
2866
    """
 
2867
    if random_seed is None:
 
2868
        return identity_decorator
 
2869
    def decorator(suite):
 
2870
        return RandomDecorator(suite, random_seed, runner.stream)
 
2871
    return decorator
 
2872
 
 
2873
 
 
2874
def tests_first(pattern):
 
2875
    if pattern == '.*':
 
2876
        return identity_decorator
 
2877
    def decorator(suite):
 
2878
        return TestFirstDecorator(suite, pattern)
 
2879
    return decorator
 
2880
 
 
2881
 
 
2882
def identity_decorator(suite):
 
2883
    """Return suite."""
 
2884
    return suite
 
2885
 
 
2886
 
 
2887
class TestDecorator(TestSuite):
 
2888
    """A decorator for TestCase/TestSuite objects.
 
2889
    
 
2890
    Usually, subclasses should override __iter__(used when flattening test
 
2891
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
2892
    debug().
 
2893
    """
 
2894
 
 
2895
    def __init__(self, suite):
 
2896
        TestSuite.__init__(self)
 
2897
        self.addTest(suite)
 
2898
 
 
2899
    def countTestCases(self):
 
2900
        cases = 0
 
2901
        for test in self:
 
2902
            cases += test.countTestCases()
 
2903
        return cases
 
2904
 
 
2905
    def debug(self):
 
2906
        for test in self:
 
2907
            test.debug()
 
2908
 
 
2909
    def run(self, result):
 
2910
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
2911
        # into __iter__.
 
2912
        for test in self:
 
2913
            if result.shouldStop:
 
2914
                break
 
2915
            test.run(result)
 
2916
        return result
 
2917
 
 
2918
 
 
2919
class CountingDecorator(TestDecorator):
 
2920
    """A decorator which calls result.progress(self.countTestCases)."""
 
2921
 
 
2922
    def run(self, result):
 
2923
        progress_method = getattr(result, 'progress', None)
 
2924
        if callable(progress_method):
 
2925
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
2926
        return super(CountingDecorator, self).run(result)
 
2927
 
 
2928
 
 
2929
class ExcludeDecorator(TestDecorator):
 
2930
    """A decorator which excludes test matching an exclude pattern."""
 
2931
 
 
2932
    def __init__(self, suite, exclude_pattern):
 
2933
        TestDecorator.__init__(self, suite)
 
2934
        self.exclude_pattern = exclude_pattern
 
2935
        self.excluded = False
 
2936
 
 
2937
    def __iter__(self):
 
2938
        if self.excluded:
 
2939
            return iter(self._tests)
 
2940
        self.excluded = True
 
2941
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
2942
        del self._tests[:]
 
2943
        self.addTests(suite)
 
2944
        return iter(self._tests)
 
2945
 
 
2946
 
 
2947
class FilterTestsDecorator(TestDecorator):
 
2948
    """A decorator which filters tests to those matching a pattern."""
 
2949
 
 
2950
    def __init__(self, suite, pattern):
 
2951
        TestDecorator.__init__(self, suite)
 
2952
        self.pattern = pattern
 
2953
        self.filtered = False
 
2954
 
 
2955
    def __iter__(self):
 
2956
        if self.filtered:
 
2957
            return iter(self._tests)
 
2958
        self.filtered = True
 
2959
        suite = filter_suite_by_re(self, self.pattern)
 
2960
        del self._tests[:]
 
2961
        self.addTests(suite)
 
2962
        return iter(self._tests)
 
2963
 
 
2964
 
 
2965
class RandomDecorator(TestDecorator):
 
2966
    """A decorator which randomises the order of its tests."""
 
2967
 
 
2968
    def __init__(self, suite, random_seed, stream):
 
2969
        TestDecorator.__init__(self, suite)
 
2970
        self.random_seed = random_seed
 
2971
        self.randomised = False
 
2972
        self.stream = stream
 
2973
 
 
2974
    def __iter__(self):
 
2975
        if self.randomised:
 
2976
            return iter(self._tests)
 
2977
        self.randomised = True
 
2978
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
2979
            (self.actual_seed()))
 
2980
        # Initialise the random number generator.
 
2981
        random.seed(self.actual_seed())
 
2982
        suite = randomize_suite(self)
 
2983
        del self._tests[:]
 
2984
        self.addTests(suite)
 
2985
        return iter(self._tests)
 
2986
 
 
2987
    def actual_seed(self):
 
2988
        if self.random_seed == "now":
 
2989
            # We convert the seed to a long to make it reuseable across
 
2990
            # invocations (because the user can reenter it).
 
2991
            self.random_seed = long(time.time())
 
2992
        else:
 
2993
            # Convert the seed to a long if we can
 
2994
            try:
 
2995
                self.random_seed = long(self.random_seed)
 
2996
            except:
 
2997
                pass
 
2998
        return self.random_seed
 
2999
 
 
3000
 
 
3001
class TestFirstDecorator(TestDecorator):
 
3002
    """A decorator which moves named tests to the front."""
 
3003
 
 
3004
    def __init__(self, suite, pattern):
 
3005
        TestDecorator.__init__(self, suite)
 
3006
        self.pattern = pattern
 
3007
        self.filtered = False
 
3008
 
 
3009
    def __iter__(self):
 
3010
        if self.filtered:
 
3011
            return iter(self._tests)
 
3012
        self.filtered = True
 
3013
        suites = split_suite_by_re(self, self.pattern)
 
3014
        del self._tests[:]
 
3015
        self.addTests(suites)
 
3016
        return iter(self._tests)
 
3017
 
 
3018
 
 
3019
def partition_tests(suite, count):
 
3020
    """Partition suite into count lists of tests."""
 
3021
    result = []
 
3022
    tests = list(iter_suite_tests(suite))
 
3023
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3024
    for block in range(count):
 
3025
        low_test = block * tests_per_process
 
3026
        high_test = low_test + tests_per_process
 
3027
        process_tests = tests[low_test:high_test]
 
3028
        result.append(process_tests)
 
3029
    return result
 
3030
 
 
3031
 
 
3032
def fork_for_tests(suite):
 
3033
    """Take suite and start up one runner per CPU by forking()
 
3034
 
 
3035
    :return: An iterable of TestCase-like objects which can each have
 
3036
        run(result) called on them to feed tests to result.
 
3037
    """
 
3038
    concurrency = osutils.local_concurrency()
 
3039
    result = []
 
3040
    from subunit import TestProtocolClient, ProtocolTestCase
 
3041
    try:
 
3042
        from subunit.test_results import AutoTimingTestResultDecorator
 
3043
    except ImportError:
 
3044
        AutoTimingTestResultDecorator = lambda x:x
 
3045
    class TestInOtherProcess(ProtocolTestCase):
 
3046
        # Should be in subunit, I think. RBC.
 
3047
        def __init__(self, stream, pid):
 
3048
            ProtocolTestCase.__init__(self, stream)
 
3049
            self.pid = pid
 
3050
 
 
3051
        def run(self, result):
 
3052
            try:
 
3053
                ProtocolTestCase.run(self, result)
 
3054
            finally:
 
3055
                os.waitpid(self.pid, os.WNOHANG)
 
3056
 
 
3057
    test_blocks = partition_tests(suite, concurrency)
 
3058
    for process_tests in test_blocks:
 
3059
        process_suite = TestSuite()
 
3060
        process_suite.addTests(process_tests)
 
3061
        c2pread, c2pwrite = os.pipe()
 
3062
        pid = os.fork()
 
3063
        if pid == 0:
 
3064
            try:
 
3065
                os.close(c2pread)
 
3066
                # Leave stderr and stdout open so we can see test noise
 
3067
                # Close stdin so that the child goes away if it decides to
 
3068
                # read from stdin (otherwise its a roulette to see what
 
3069
                # child actually gets keystrokes for pdb etc).
 
3070
                sys.stdin.close()
 
3071
                sys.stdin = None
 
3072
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3073
                subunit_result = AutoTimingTestResultDecorator(
 
3074
                    TestProtocolClient(stream))
 
3075
                process_suite.run(subunit_result)
 
3076
            finally:
 
3077
                os._exit(0)
 
3078
        else:
 
3079
            os.close(c2pwrite)
 
3080
            stream = os.fdopen(c2pread, 'rb', 1)
 
3081
            test = TestInOtherProcess(stream, pid)
 
3082
            result.append(test)
 
3083
    return result
 
3084
 
 
3085
 
 
3086
def reinvoke_for_tests(suite):
 
3087
    """Take suite and start up one runner per CPU using subprocess().
 
3088
 
 
3089
    :return: An iterable of TestCase-like objects which can each have
 
3090
        run(result) called on them to feed tests to result.
 
3091
    """
 
3092
    concurrency = osutils.local_concurrency()
 
3093
    result = []
 
3094
    from subunit import ProtocolTestCase
 
3095
    class TestInSubprocess(ProtocolTestCase):
 
3096
        def __init__(self, process, name):
 
3097
            ProtocolTestCase.__init__(self, process.stdout)
 
3098
            self.process = process
 
3099
            self.process.stdin.close()
 
3100
            self.name = name
 
3101
 
 
3102
        def run(self, result):
 
3103
            try:
 
3104
                ProtocolTestCase.run(self, result)
 
3105
            finally:
 
3106
                self.process.wait()
 
3107
                os.unlink(self.name)
 
3108
            # print "pid %d finished" % finished_process
 
3109
    test_blocks = partition_tests(suite, concurrency)
 
3110
    for process_tests in test_blocks:
 
3111
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3112
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3113
        if not os.path.isfile(bzr_path):
 
3114
            # We are probably installed. Assume sys.argv is the right file
 
3115
            bzr_path = sys.argv[0]
 
3116
        fd, test_list_file_name = tempfile.mkstemp()
 
3117
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3118
        for test in process_tests:
 
3119
            test_list_file.write(test.id() + '\n')
 
3120
        test_list_file.close()
 
3121
        try:
 
3122
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
 
3123
                '--subunit']
 
3124
            if '--no-plugins' in sys.argv:
 
3125
                argv.append('--no-plugins')
 
3126
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3127
            # stderr it can interrupt the subunit protocol.
 
3128
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3129
                bufsize=1)
 
3130
            test = TestInSubprocess(process, test_list_file_name)
 
3131
            result.append(test)
 
3132
        except:
 
3133
            os.unlink(test_list_file_name)
 
3134
            raise
 
3135
    return result
 
3136
 
 
3137
 
 
3138
class BZRTransformingResult(unittest.TestResult):
 
3139
 
 
3140
    def __init__(self, target):
 
3141
        unittest.TestResult.__init__(self)
 
3142
        self.result = target
182
3143
 
183
3144
    def startTest(self, test):
184
 
        # TODO: Maybe show test.shortDescription somewhere?
185
 
        print >>self.out, '%-60.60s' % test.id(),
186
 
        self.out.flush()
187
 
        TestResult.startTest(self, test)
 
3145
        self.result.startTest(test)
188
3146
 
189
3147
    def stopTest(self, test):
190
 
        # print
191
 
        TestResult.stopTest(self, test)
192
 
 
 
3148
        self.result.stopTest(test)
193
3149
 
194
3150
    def addError(self, test, err):
195
 
        print >>self.out, 'ERROR'
196
 
        TestResult.addError(self, test, err)
197
 
        _show_test_failure('error', test, err, self.out)
 
3151
        feature = self._error_looks_like('UnavailableFeature: ', err)
 
3152
        if feature is not None:
 
3153
            self.result.addNotSupported(test, feature)
 
3154
        else:
 
3155
            self.result.addError(test, err)
198
3156
 
199
3157
    def addFailure(self, test, err):
200
 
        print >>self.out, 'FAILURE'
201
 
        TestResult.addFailure(self, test, err)
202
 
        _show_test_failure('failure', test, err, self.out)
 
3158
        known = self._error_looks_like('KnownFailure: ', err)
 
3159
        if known is not None:
 
3160
            self.result._addKnownFailure(test, [KnownFailure,
 
3161
                                                KnownFailure(known), None])
 
3162
        else:
 
3163
            self.result.addFailure(test, err)
 
3164
 
 
3165
    def addSkip(self, test, reason):
 
3166
        self.result.addSkip(test, reason)
203
3167
 
204
3168
    def addSuccess(self, test):
205
 
        print >>self.out, 'OK'
206
 
        TestResult.addSuccess(self, test)
207
 
 
208
 
 
209
 
 
210
 
def selftest():
211
 
    from unittest import TestLoader, TestSuite
212
 
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch, bzrlib.osutils, bzrlib.commands
213
 
 
214
 
    import bzrlib.selftest.whitebox
215
 
    import bzrlib.selftest.blackbox
216
 
    import bzrlib.selftest.versioning
217
 
    from doctest import DocTestSuite
218
 
    import os
219
 
    import shutil
220
 
    import time
221
 
    import sys
222
 
 
223
 
    TestBase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
224
 
    print '%-30s %s' % ('bzr binary', TestBase.BZRPATH)
225
 
 
226
 
    _setup_test_log()
227
 
    _setup_test_dir()
228
 
    print
229
 
 
230
 
    suite = TestSuite()
231
 
    tl = TestLoader()
232
 
 
233
 
    for m in bzrlib.selftest.whitebox, \
234
 
            bzrlib.selftest.versioning:
235
 
        suite.addTest(tl.loadTestsFromModule(m))
236
 
 
237
 
    for m in bzrlib.store, bzrlib.inventory, bzrlib.branch, bzrlib.osutils, \
238
 
            bzrlib.commands:
239
 
        suite.addTest(DocTestSuite(m))
240
 
 
241
 
    suite.addTest(bzrlib.selftest.blackbox.suite())
242
 
 
243
 
    # save stdout & stderr so there's no leakage from code-under-test
244
 
    real_stdout = sys.stdout
245
 
    real_stderr = sys.stderr
246
 
    sys.stdout = sys.stderr = TestBase.TEST_LOG
 
3169
        self.result.addSuccess(test)
 
3170
 
 
3171
    def _error_looks_like(self, prefix, err):
 
3172
        """Deserialize exception and returns the stringify value."""
 
3173
        import subunit
 
3174
        value = None
 
3175
        typ, exc, _ = err
 
3176
        if isinstance(exc, subunit.RemoteException):
 
3177
            # stringify the exception gives access to the remote traceback
 
3178
            # We search the last line for 'prefix'
 
3179
            lines = str(exc).split('\n')
 
3180
            while lines and not lines[-1]:
 
3181
                lines.pop(-1)
 
3182
            if lines:
 
3183
                if lines[-1].startswith(prefix):
 
3184
                    value = lines[-1][len(prefix):]
 
3185
        return value
 
3186
 
 
3187
 
 
3188
# Controlled by "bzr selftest -E=..." option
 
3189
# Currently supported:
 
3190
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3191
#                           preserves any flags supplied at the command line.
 
3192
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3193
#                           rather than failing tests. And no longer raise
 
3194
#                           LockContention when fctnl locks are not being used
 
3195
#                           with proper exclusion rules.
 
3196
selftest_debug_flags = set()
 
3197
 
 
3198
 
 
3199
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3200
             transport=None,
 
3201
             test_suite_factory=None,
 
3202
             lsprof_timed=None,
 
3203
             bench_history=None,
 
3204
             matching_tests_first=None,
 
3205
             list_only=False,
 
3206
             random_seed=None,
 
3207
             exclude_pattern=None,
 
3208
             strict=False,
 
3209
             load_list=None,
 
3210
             debug_flags=None,
 
3211
             starting_with=None,
 
3212
             runner_class=None,
 
3213
             suite_decorators=None,
 
3214
             stream=None,
 
3215
             ):
 
3216
    """Run the whole test suite under the enhanced runner"""
 
3217
    # XXX: Very ugly way to do this...
 
3218
    # Disable warning about old formats because we don't want it to disturb
 
3219
    # any blackbox tests.
 
3220
    from bzrlib import repository
 
3221
    repository._deprecation_warning_done = True
 
3222
 
 
3223
    global default_transport
 
3224
    if transport is None:
 
3225
        transport = default_transport
 
3226
    old_transport = default_transport
 
3227
    default_transport = transport
 
3228
    global selftest_debug_flags
 
3229
    old_debug_flags = selftest_debug_flags
 
3230
    if debug_flags is not None:
 
3231
        selftest_debug_flags = set(debug_flags)
247
3232
    try:
248
 
        result = _MyResult(real_stdout)
249
 
        suite.run(result)
 
3233
        if load_list is None:
 
3234
            keep_only = None
 
3235
        else:
 
3236
            keep_only = load_test_id_list(load_list)
 
3237
        if starting_with:
 
3238
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3239
                             for start in starting_with]
 
3240
        if test_suite_factory is None:
 
3241
            # Reduce loading time by loading modules based on the starting_with
 
3242
            # patterns.
 
3243
            suite = test_suite(keep_only, starting_with)
 
3244
        else:
 
3245
            suite = test_suite_factory()
 
3246
        if starting_with:
 
3247
            # But always filter as requested.
 
3248
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3249
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3250
                     stop_on_failure=stop_on_failure,
 
3251
                     transport=transport,
 
3252
                     lsprof_timed=lsprof_timed,
 
3253
                     bench_history=bench_history,
 
3254
                     matching_tests_first=matching_tests_first,
 
3255
                     list_only=list_only,
 
3256
                     random_seed=random_seed,
 
3257
                     exclude_pattern=exclude_pattern,
 
3258
                     strict=strict,
 
3259
                     runner_class=runner_class,
 
3260
                     suite_decorators=suite_decorators,
 
3261
                     stream=stream,
 
3262
                     )
250
3263
    finally:
251
 
        sys.stdout = real_stdout
252
 
        sys.stderr = real_stderr
253
 
 
254
 
    _show_results(result)
255
 
 
256
 
    return result.wasSuccessful()
257
 
 
258
 
 
259
 
 
260
 
 
261
 
def _setup_test_log():
262
 
    import time
263
 
    import os
264
 
    
265
 
    log_filename = os.path.abspath('testbzr.log')
266
 
    TestBase.TEST_LOG = open(log_filename, 'wt', buffering=1) # line buffered
267
 
 
268
 
    print >>TestBase.TEST_LOG, "bzr tests run at " + time.ctime()
269
 
    print '%-30s %s' % ('test log', log_filename)
270
 
 
271
 
 
272
 
def _setup_test_dir():
273
 
    import os
274
 
    import shutil
275
 
    
276
 
    TestBase.ORIG_DIR = os.getcwdu()
277
 
    TestBase.TEST_ROOT = os.path.abspath("testbzr.tmp")
278
 
 
279
 
    print '%-30s %s' % ('running tests in', TestBase.TEST_ROOT)
280
 
 
281
 
    if os.path.exists(TestBase.TEST_ROOT):
282
 
        shutil.rmtree(TestBase.TEST_ROOT)
283
 
    os.mkdir(TestBase.TEST_ROOT)
284
 
    os.chdir(TestBase.TEST_ROOT)
285
 
 
286
 
    # make a fake bzr directory there to prevent any tests propagating
287
 
    # up onto the source directory's real branch
288
 
    os.mkdir(os.path.join(TestBase.TEST_ROOT, '.bzr'))
289
 
 
290
 
    
291
 
 
292
 
def _show_results(result):
293
 
     print
294
 
     print '%4d tests run' % result.testsRun
295
 
     print '%4d errors' % len(result.errors)
296
 
     print '%4d failures' % len(result.failures)
297
 
 
298
 
 
299
 
 
300
 
def _show_test_failure(kind, case, exc_info, out):
301
 
    from traceback import print_exception
302
 
    
303
 
    print >>out, '-' * 60
304
 
    print >>out, case
305
 
    
306
 
    desc = case.shortDescription()
307
 
    if desc:
308
 
        print >>out, '   (%s)' % desc
309
 
         
310
 
    print_exception(exc_info[0], exc_info[1], exc_info[2], None, out)
311
 
        
312
 
    if isinstance(case, TestBase):
313
 
        print >>out
314
 
        print >>out, 'log from this test:'
315
 
        print >>out, case._log_buf
316
 
         
317
 
    print >>out, '-' * 60
318
 
    
319
 
 
 
3264
        default_transport = old_transport
 
3265
        selftest_debug_flags = old_debug_flags
 
3266
 
 
3267
 
 
3268
def load_test_id_list(file_name):
 
3269
    """Load a test id list from a text file.
 
3270
 
 
3271
    The format is one test id by line.  No special care is taken to impose
 
3272
    strict rules, these test ids are used to filter the test suite so a test id
 
3273
    that do not match an existing test will do no harm. This allows user to add
 
3274
    comments, leave blank lines, etc.
 
3275
    """
 
3276
    test_list = []
 
3277
    try:
 
3278
        ftest = open(file_name, 'rt')
 
3279
    except IOError, e:
 
3280
        if e.errno != errno.ENOENT:
 
3281
            raise
 
3282
        else:
 
3283
            raise errors.NoSuchFile(file_name)
 
3284
 
 
3285
    for test_name in ftest.readlines():
 
3286
        test_list.append(test_name.strip())
 
3287
    ftest.close()
 
3288
    return test_list
 
3289
 
 
3290
 
 
3291
def suite_matches_id_list(test_suite, id_list):
 
3292
    """Warns about tests not appearing or appearing more than once.
 
3293
 
 
3294
    :param test_suite: A TestSuite object.
 
3295
    :param test_id_list: The list of test ids that should be found in
 
3296
         test_suite.
 
3297
 
 
3298
    :return: (absents, duplicates) absents is a list containing the test found
 
3299
        in id_list but not in test_suite, duplicates is a list containing the
 
3300
        test found multiple times in test_suite.
 
3301
 
 
3302
    When using a prefined test id list, it may occurs that some tests do not
 
3303
    exist anymore or that some tests use the same id. This function warns the
 
3304
    tester about potential problems in his workflow (test lists are volatile)
 
3305
    or in the test suite itself (using the same id for several tests does not
 
3306
    help to localize defects).
 
3307
    """
 
3308
    # Build a dict counting id occurrences
 
3309
    tests = dict()
 
3310
    for test in iter_suite_tests(test_suite):
 
3311
        id = test.id()
 
3312
        tests[id] = tests.get(id, 0) + 1
 
3313
 
 
3314
    not_found = []
 
3315
    duplicates = []
 
3316
    for id in id_list:
 
3317
        occurs = tests.get(id, 0)
 
3318
        if not occurs:
 
3319
            not_found.append(id)
 
3320
        elif occurs > 1:
 
3321
            duplicates.append(id)
 
3322
 
 
3323
    return not_found, duplicates
 
3324
 
 
3325
 
 
3326
class TestIdList(object):
 
3327
    """Test id list to filter a test suite.
 
3328
 
 
3329
    Relying on the assumption that test ids are built as:
 
3330
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3331
    notation, this class offers methods to :
 
3332
    - avoid building a test suite for modules not refered to in the test list,
 
3333
    - keep only the tests listed from the module test suite.
 
3334
    """
 
3335
 
 
3336
    def __init__(self, test_id_list):
 
3337
        # When a test suite needs to be filtered against us we compare test ids
 
3338
        # for equality, so a simple dict offers a quick and simple solution.
 
3339
        self.tests = dict().fromkeys(test_id_list, True)
 
3340
 
 
3341
        # While unittest.TestCase have ids like:
 
3342
        # <module>.<class>.<method>[(<param+)],
 
3343
        # doctest.DocTestCase can have ids like:
 
3344
        # <module>
 
3345
        # <module>.<class>
 
3346
        # <module>.<function>
 
3347
        # <module>.<class>.<method>
 
3348
 
 
3349
        # Since we can't predict a test class from its name only, we settle on
 
3350
        # a simple constraint: a test id always begins with its module name.
 
3351
 
 
3352
        modules = {}
 
3353
        for test_id in test_id_list:
 
3354
            parts = test_id.split('.')
 
3355
            mod_name = parts.pop(0)
 
3356
            modules[mod_name] = True
 
3357
            for part in parts:
 
3358
                mod_name += '.' + part
 
3359
                modules[mod_name] = True
 
3360
        self.modules = modules
 
3361
 
 
3362
    def refers_to(self, module_name):
 
3363
        """Is there tests for the module or one of its sub modules."""
 
3364
        return self.modules.has_key(module_name)
 
3365
 
 
3366
    def includes(self, test_id):
 
3367
        return self.tests.has_key(test_id)
 
3368
 
 
3369
 
 
3370
class TestPrefixAliasRegistry(registry.Registry):
 
3371
    """A registry for test prefix aliases.
 
3372
 
 
3373
    This helps implement shorcuts for the --starting-with selftest
 
3374
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3375
    warning will be emitted).
 
3376
    """
 
3377
 
 
3378
    def register(self, key, obj, help=None, info=None,
 
3379
                 override_existing=False):
 
3380
        """See Registry.register.
 
3381
 
 
3382
        Trying to override an existing alias causes a warning to be emitted,
 
3383
        not a fatal execption.
 
3384
        """
 
3385
        try:
 
3386
            super(TestPrefixAliasRegistry, self).register(
 
3387
                key, obj, help=help, info=info, override_existing=False)
 
3388
        except KeyError:
 
3389
            actual = self.get(key)
 
3390
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
3391
                 % (key, actual, obj))
 
3392
 
 
3393
    def resolve_alias(self, id_start):
 
3394
        """Replace the alias by the prefix in the given string.
 
3395
 
 
3396
        Using an unknown prefix is an error to help catching typos.
 
3397
        """
 
3398
        parts = id_start.split('.')
 
3399
        try:
 
3400
            parts[0] = self.get(parts[0])
 
3401
        except KeyError:
 
3402
            raise errors.BzrCommandError(
 
3403
                '%s is not a known test prefix alias' % parts[0])
 
3404
        return '.'.join(parts)
 
3405
 
 
3406
 
 
3407
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3408
"""Registry of test prefix aliases."""
 
3409
 
 
3410
 
 
3411
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3412
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3413
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3414
 
 
3415
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3416
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3417
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3418
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3419
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3420
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3421
 
 
3422
 
 
3423
def test_suite(keep_only=None, starting_with=None):
 
3424
    """Build and return TestSuite for the whole of bzrlib.
 
3425
 
 
3426
    :param keep_only: A list of test ids limiting the suite returned.
 
3427
 
 
3428
    :param starting_with: An id limiting the suite returned to the tests
 
3429
         starting with it.
 
3430
 
 
3431
    This function can be replaced if you need to change the default test
 
3432
    suite on a global basis, but it is not encouraged.
 
3433
    """
 
3434
    testmod_names = [
 
3435
                   'bzrlib.doc',
 
3436
                   'bzrlib.tests.blackbox',
 
3437
                   'bzrlib.tests.commands',
 
3438
                   'bzrlib.tests.per_branch',
 
3439
                   'bzrlib.tests.per_bzrdir',
 
3440
                   'bzrlib.tests.per_interrepository',
 
3441
                   'bzrlib.tests.per_intertree',
 
3442
                   'bzrlib.tests.per_inventory',
 
3443
                   'bzrlib.tests.per_interbranch',
 
3444
                   'bzrlib.tests.per_lock',
 
3445
                   'bzrlib.tests.per_transport',
 
3446
                   'bzrlib.tests.per_tree',
 
3447
                   'bzrlib.tests.per_pack_repository',
 
3448
                   'bzrlib.tests.per_repository',
 
3449
                   'bzrlib.tests.per_repository_chk',
 
3450
                   'bzrlib.tests.per_repository_reference',
 
3451
                   'bzrlib.tests.per_versionedfile',
 
3452
                   'bzrlib.tests.per_workingtree',
 
3453
                   'bzrlib.tests.test__annotator',
 
3454
                   'bzrlib.tests.test__chk_map',
 
3455
                   'bzrlib.tests.test__dirstate_helpers',
 
3456
                   'bzrlib.tests.test__groupcompress',
 
3457
                   'bzrlib.tests.test__known_graph',
 
3458
                   'bzrlib.tests.test__rio',
 
3459
                   'bzrlib.tests.test__walkdirs_win32',
 
3460
                   'bzrlib.tests.test_ancestry',
 
3461
                   'bzrlib.tests.test_annotate',
 
3462
                   'bzrlib.tests.test_api',
 
3463
                   'bzrlib.tests.test_atomicfile',
 
3464
                   'bzrlib.tests.test_bad_files',
 
3465
                   'bzrlib.tests.test_bencode',
 
3466
                   'bzrlib.tests.test_bisect_multi',
 
3467
                   'bzrlib.tests.test_branch',
 
3468
                   'bzrlib.tests.test_branchbuilder',
 
3469
                   'bzrlib.tests.test_btree_index',
 
3470
                   'bzrlib.tests.test_bugtracker',
 
3471
                   'bzrlib.tests.test_bundle',
 
3472
                   'bzrlib.tests.test_bzrdir',
 
3473
                   'bzrlib.tests.test__chunks_to_lines',
 
3474
                   'bzrlib.tests.test_cache_utf8',
 
3475
                   'bzrlib.tests.test_chk_map',
 
3476
                   'bzrlib.tests.test_chk_serializer',
 
3477
                   'bzrlib.tests.test_chunk_writer',
 
3478
                   'bzrlib.tests.test_clean_tree',
 
3479
                   'bzrlib.tests.test_commands',
 
3480
                   'bzrlib.tests.test_commit',
 
3481
                   'bzrlib.tests.test_commit_merge',
 
3482
                   'bzrlib.tests.test_config',
 
3483
                   'bzrlib.tests.test_conflicts',
 
3484
                   'bzrlib.tests.test_counted_lock',
 
3485
                   'bzrlib.tests.test_crash',
 
3486
                   'bzrlib.tests.test_decorators',
 
3487
                   'bzrlib.tests.test_delta',
 
3488
                   'bzrlib.tests.test_debug',
 
3489
                   'bzrlib.tests.test_deprecated_graph',
 
3490
                   'bzrlib.tests.test_diff',
 
3491
                   'bzrlib.tests.test_directory_service',
 
3492
                   'bzrlib.tests.test_dirstate',
 
3493
                   'bzrlib.tests.test_email_message',
 
3494
                   'bzrlib.tests.test_eol_filters',
 
3495
                   'bzrlib.tests.test_errors',
 
3496
                   'bzrlib.tests.test_export',
 
3497
                   'bzrlib.tests.test_extract',
 
3498
                   'bzrlib.tests.test_fetch',
 
3499
                   'bzrlib.tests.test_fifo_cache',
 
3500
                   'bzrlib.tests.test_filters',
 
3501
                   'bzrlib.tests.test_ftp_transport',
 
3502
                   'bzrlib.tests.test_foreign',
 
3503
                   'bzrlib.tests.test_generate_docs',
 
3504
                   'bzrlib.tests.test_generate_ids',
 
3505
                   'bzrlib.tests.test_globbing',
 
3506
                   'bzrlib.tests.test_gpg',
 
3507
                   'bzrlib.tests.test_graph',
 
3508
                   'bzrlib.tests.test_groupcompress',
 
3509
                   'bzrlib.tests.test_hashcache',
 
3510
                   'bzrlib.tests.test_help',
 
3511
                   'bzrlib.tests.test_hooks',
 
3512
                   'bzrlib.tests.test_http',
 
3513
                   'bzrlib.tests.test_http_response',
 
3514
                   'bzrlib.tests.test_https_ca_bundle',
 
3515
                   'bzrlib.tests.test_identitymap',
 
3516
                   'bzrlib.tests.test_ignores',
 
3517
                   'bzrlib.tests.test_index',
 
3518
                   'bzrlib.tests.test_info',
 
3519
                   'bzrlib.tests.test_inv',
 
3520
                   'bzrlib.tests.test_inventory_delta',
 
3521
                   'bzrlib.tests.test_knit',
 
3522
                   'bzrlib.tests.test_lazy_import',
 
3523
                   'bzrlib.tests.test_lazy_regex',
 
3524
                   'bzrlib.tests.test_lock',
 
3525
                   'bzrlib.tests.test_lockable_files',
 
3526
                   'bzrlib.tests.test_lockdir',
 
3527
                   'bzrlib.tests.test_log',
 
3528
                   'bzrlib.tests.test_lru_cache',
 
3529
                   'bzrlib.tests.test_lsprof',
 
3530
                   'bzrlib.tests.test_mail_client',
 
3531
                   'bzrlib.tests.test_memorytree',
 
3532
                   'bzrlib.tests.test_merge',
 
3533
                   'bzrlib.tests.test_merge3',
 
3534
                   'bzrlib.tests.test_merge_core',
 
3535
                   'bzrlib.tests.test_merge_directive',
 
3536
                   'bzrlib.tests.test_missing',
 
3537
                   'bzrlib.tests.test_msgeditor',
 
3538
                   'bzrlib.tests.test_multiparent',
 
3539
                   'bzrlib.tests.test_mutabletree',
 
3540
                   'bzrlib.tests.test_nonascii',
 
3541
                   'bzrlib.tests.test_options',
 
3542
                   'bzrlib.tests.test_osutils',
 
3543
                   'bzrlib.tests.test_osutils_encodings',
 
3544
                   'bzrlib.tests.test_pack',
 
3545
                   'bzrlib.tests.test_patch',
 
3546
                   'bzrlib.tests.test_patches',
 
3547
                   'bzrlib.tests.test_permissions',
 
3548
                   'bzrlib.tests.test_plugins',
 
3549
                   'bzrlib.tests.test_progress',
 
3550
                   'bzrlib.tests.test_read_bundle',
 
3551
                   'bzrlib.tests.test_reconcile',
 
3552
                   'bzrlib.tests.test_reconfigure',
 
3553
                   'bzrlib.tests.test_registry',
 
3554
                   'bzrlib.tests.test_remote',
 
3555
                   'bzrlib.tests.test_rename_map',
 
3556
                   'bzrlib.tests.test_repository',
 
3557
                   'bzrlib.tests.test_revert',
 
3558
                   'bzrlib.tests.test_revision',
 
3559
                   'bzrlib.tests.test_revisionspec',
 
3560
                   'bzrlib.tests.test_revisiontree',
 
3561
                   'bzrlib.tests.test_rio',
 
3562
                   'bzrlib.tests.test_rules',
 
3563
                   'bzrlib.tests.test_sampler',
 
3564
                   'bzrlib.tests.test_selftest',
 
3565
                   'bzrlib.tests.test_serializer',
 
3566
                   'bzrlib.tests.test_setup',
 
3567
                   'bzrlib.tests.test_sftp_transport',
 
3568
                   'bzrlib.tests.test_shelf',
 
3569
                   'bzrlib.tests.test_shelf_ui',
 
3570
                   'bzrlib.tests.test_smart',
 
3571
                   'bzrlib.tests.test_smart_add',
 
3572
                   'bzrlib.tests.test_smart_request',
 
3573
                   'bzrlib.tests.test_smart_transport',
 
3574
                   'bzrlib.tests.test_smtp_connection',
 
3575
                   'bzrlib.tests.test_source',
 
3576
                   'bzrlib.tests.test_ssh_transport',
 
3577
                   'bzrlib.tests.test_status',
 
3578
                   'bzrlib.tests.test_store',
 
3579
                   'bzrlib.tests.test_strace',
 
3580
                   'bzrlib.tests.test_subsume',
 
3581
                   'bzrlib.tests.test_switch',
 
3582
                   'bzrlib.tests.test_symbol_versioning',
 
3583
                   'bzrlib.tests.test_tag',
 
3584
                   'bzrlib.tests.test_testament',
 
3585
                   'bzrlib.tests.test_textfile',
 
3586
                   'bzrlib.tests.test_textmerge',
 
3587
                   'bzrlib.tests.test_timestamp',
 
3588
                   'bzrlib.tests.test_trace',
 
3589
                   'bzrlib.tests.test_transactions',
 
3590
                   'bzrlib.tests.test_transform',
 
3591
                   'bzrlib.tests.test_transport',
 
3592
                   'bzrlib.tests.test_transport_log',
 
3593
                   'bzrlib.tests.test_tree',
 
3594
                   'bzrlib.tests.test_treebuilder',
 
3595
                   'bzrlib.tests.test_tsort',
 
3596
                   'bzrlib.tests.test_tuned_gzip',
 
3597
                   'bzrlib.tests.test_ui',
 
3598
                   'bzrlib.tests.test_uncommit',
 
3599
                   'bzrlib.tests.test_upgrade',
 
3600
                   'bzrlib.tests.test_upgrade_stacked',
 
3601
                   'bzrlib.tests.test_urlutils',
 
3602
                   'bzrlib.tests.test_version',
 
3603
                   'bzrlib.tests.test_version_info',
 
3604
                   'bzrlib.tests.test_weave',
 
3605
                   'bzrlib.tests.test_whitebox',
 
3606
                   'bzrlib.tests.test_win32utils',
 
3607
                   'bzrlib.tests.test_workingtree',
 
3608
                   'bzrlib.tests.test_workingtree_4',
 
3609
                   'bzrlib.tests.test_wsgi',
 
3610
                   'bzrlib.tests.test_xml',
 
3611
                   ]
 
3612
 
 
3613
    loader = TestUtil.TestLoader()
 
3614
 
 
3615
    if keep_only is not None:
 
3616
        id_filter = TestIdList(keep_only)
 
3617
    if starting_with:
 
3618
        # We take precedence over keep_only because *at loading time* using
 
3619
        # both options means we will load less tests for the same final result.
 
3620
        def interesting_module(name):
 
3621
            for start in starting_with:
 
3622
                if (
 
3623
                    # Either the module name starts with the specified string
 
3624
                    name.startswith(start)
 
3625
                    # or it may contain tests starting with the specified string
 
3626
                    or start.startswith(name)
 
3627
                    ):
 
3628
                    return True
 
3629
            return False
 
3630
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
3631
 
 
3632
    elif keep_only is not None:
 
3633
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3634
        def interesting_module(name):
 
3635
            return id_filter.refers_to(name)
 
3636
 
 
3637
    else:
 
3638
        loader = TestUtil.TestLoader()
 
3639
        def interesting_module(name):
 
3640
            # No filtering, all modules are interesting
 
3641
            return True
 
3642
 
 
3643
    suite = loader.suiteClass()
 
3644
 
 
3645
    # modules building their suite with loadTestsFromModuleNames
 
3646
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
3647
 
 
3648
    modules_to_doctest = [
 
3649
        'bzrlib',
 
3650
        'bzrlib.branchbuilder',
 
3651
        'bzrlib.export',
 
3652
        'bzrlib.inventory',
 
3653
        'bzrlib.iterablefile',
 
3654
        'bzrlib.lockdir',
 
3655
        'bzrlib.merge3',
 
3656
        'bzrlib.option',
 
3657
        'bzrlib.symbol_versioning',
 
3658
        'bzrlib.tests',
 
3659
        'bzrlib.timestamp',
 
3660
        'bzrlib.version_info_formats.format_custom',
 
3661
        ]
 
3662
 
 
3663
    for mod in modules_to_doctest:
 
3664
        if not interesting_module(mod):
 
3665
            # No tests to keep here, move along
 
3666
            continue
 
3667
        try:
 
3668
            # note that this really does mean "report only" -- doctest
 
3669
            # still runs the rest of the examples
 
3670
            doc_suite = doctest.DocTestSuite(mod,
 
3671
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
3672
        except ValueError, e:
 
3673
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
3674
            raise
 
3675
        if len(doc_suite._tests) == 0:
 
3676
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
3677
        suite.addTest(doc_suite)
 
3678
 
 
3679
    default_encoding = sys.getdefaultencoding()
 
3680
    for name, plugin in bzrlib.plugin.plugins().items():
 
3681
        if not interesting_module(plugin.module.__name__):
 
3682
            continue
 
3683
        plugin_suite = plugin.test_suite()
 
3684
        # We used to catch ImportError here and turn it into just a warning,
 
3685
        # but really if you don't have --no-plugins this should be a failure.
 
3686
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
3687
        if plugin_suite is None:
 
3688
            plugin_suite = plugin.load_plugin_tests(loader)
 
3689
        if plugin_suite is not None:
 
3690
            suite.addTest(plugin_suite)
 
3691
        if default_encoding != sys.getdefaultencoding():
 
3692
            bzrlib.trace.warning(
 
3693
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
3694
                sys.getdefaultencoding())
 
3695
            reload(sys)
 
3696
            sys.setdefaultencoding(default_encoding)
 
3697
 
 
3698
    if keep_only is not None:
 
3699
        # Now that the referred modules have loaded their tests, keep only the
 
3700
        # requested ones.
 
3701
        suite = filter_suite_by_id_list(suite, id_filter)
 
3702
        # Do some sanity checks on the id_list filtering
 
3703
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
3704
        if starting_with:
 
3705
            # The tester has used both keep_only and starting_with, so he is
 
3706
            # already aware that some tests are excluded from the list, there
 
3707
            # is no need to tell him which.
 
3708
            pass
 
3709
        else:
 
3710
            # Some tests mentioned in the list are not in the test suite. The
 
3711
            # list may be out of date, report to the tester.
 
3712
            for id in not_found:
 
3713
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
3714
        for id in duplicates:
 
3715
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
3716
 
 
3717
    return suite
 
3718
 
 
3719
 
 
3720
def multiply_scenarios(scenarios_left, scenarios_right):
 
3721
    """Multiply two sets of scenarios.
 
3722
 
 
3723
    :returns: the cartesian product of the two sets of scenarios, that is
 
3724
        a scenario for every possible combination of a left scenario and a
 
3725
        right scenario.
 
3726
    """
 
3727
    return [
 
3728
        ('%s,%s' % (left_name, right_name),
 
3729
         dict(left_dict.items() + right_dict.items()))
 
3730
        for left_name, left_dict in scenarios_left
 
3731
        for right_name, right_dict in scenarios_right]
 
3732
 
 
3733
 
 
3734
def multiply_tests(tests, scenarios, result):
 
3735
    """Multiply tests_list by scenarios into result.
 
3736
 
 
3737
    This is the core workhorse for test parameterisation.
 
3738
 
 
3739
    Typically the load_tests() method for a per-implementation test suite will
 
3740
    call multiply_tests and return the result.
 
3741
 
 
3742
    :param tests: The tests to parameterise.
 
3743
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
3744
        scenario_param_dict).
 
3745
    :param result: A TestSuite to add created tests to.
 
3746
 
 
3747
    This returns the passed in result TestSuite with the cross product of all
 
3748
    the tests repeated once for each scenario.  Each test is adapted by adding
 
3749
    the scenario name at the end of its id(), and updating the test object's
 
3750
    __dict__ with the scenario_param_dict.
 
3751
 
 
3752
    >>> import bzrlib.tests.test_sampler
 
3753
    >>> r = multiply_tests(
 
3754
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
3755
    ...     [('one', dict(param=1)),
 
3756
    ...      ('two', dict(param=2))],
 
3757
    ...     TestSuite())
 
3758
    >>> tests = list(iter_suite_tests(r))
 
3759
    >>> len(tests)
 
3760
    2
 
3761
    >>> tests[0].id()
 
3762
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
3763
    >>> tests[0].param
 
3764
    1
 
3765
    >>> tests[1].param
 
3766
    2
 
3767
    """
 
3768
    for test in iter_suite_tests(tests):
 
3769
        apply_scenarios(test, scenarios, result)
 
3770
    return result
 
3771
 
 
3772
 
 
3773
def apply_scenarios(test, scenarios, result):
 
3774
    """Apply the scenarios in scenarios to test and add to result.
 
3775
 
 
3776
    :param test: The test to apply scenarios to.
 
3777
    :param scenarios: An iterable of scenarios to apply to test.
 
3778
    :return: result
 
3779
    :seealso: apply_scenario
 
3780
    """
 
3781
    for scenario in scenarios:
 
3782
        result.addTest(apply_scenario(test, scenario))
 
3783
    return result
 
3784
 
 
3785
 
 
3786
def apply_scenario(test, scenario):
 
3787
    """Copy test and apply scenario to it.
 
3788
 
 
3789
    :param test: A test to adapt.
 
3790
    :param scenario: A tuple describing the scenarion.
 
3791
        The first element of the tuple is the new test id.
 
3792
        The second element is a dict containing attributes to set on the
 
3793
        test.
 
3794
    :return: The adapted test.
 
3795
    """
 
3796
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
3797
    new_test = clone_test(test, new_id)
 
3798
    for name, value in scenario[1].items():
 
3799
        setattr(new_test, name, value)
 
3800
    return new_test
 
3801
 
 
3802
 
 
3803
def clone_test(test, new_id):
 
3804
    """Clone a test giving it a new id.
 
3805
 
 
3806
    :param test: The test to clone.
 
3807
    :param new_id: The id to assign to it.
 
3808
    :return: The new test.
 
3809
    """
 
3810
    from copy import deepcopy
 
3811
    new_test = deepcopy(test)
 
3812
    new_test.id = lambda: new_id
 
3813
    return new_test
 
3814
 
 
3815
 
 
3816
def _rmtree_temp_dir(dirname):
 
3817
    # If LANG=C we probably have created some bogus paths
 
3818
    # which rmtree(unicode) will fail to delete
 
3819
    # so make sure we are using rmtree(str) to delete everything
 
3820
    # except on win32, where rmtree(str) will fail
 
3821
    # since it doesn't have the property of byte-stream paths
 
3822
    # (they are either ascii or mbcs)
 
3823
    if sys.platform == 'win32':
 
3824
        # make sure we are using the unicode win32 api
 
3825
        dirname = unicode(dirname)
 
3826
    else:
 
3827
        dirname = dirname.encode(sys.getfilesystemencoding())
 
3828
    try:
 
3829
        osutils.rmtree(dirname)
 
3830
    except OSError, e:
 
3831
        # We don't want to fail here because some useful display will be lost
 
3832
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
3833
        # possible info to the test runner is even worse.
 
3834
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
3835
                         % (os.path.basename(dirname), e))
 
3836
 
 
3837
 
 
3838
class Feature(object):
 
3839
    """An operating system Feature."""
 
3840
 
 
3841
    def __init__(self):
 
3842
        self._available = None
 
3843
 
 
3844
    def available(self):
 
3845
        """Is the feature available?
 
3846
 
 
3847
        :return: True if the feature is available.
 
3848
        """
 
3849
        if self._available is None:
 
3850
            self._available = self._probe()
 
3851
        return self._available
 
3852
 
 
3853
    def _probe(self):
 
3854
        """Implement this method in concrete features.
 
3855
 
 
3856
        :return: True if the feature is available.
 
3857
        """
 
3858
        raise NotImplementedError
 
3859
 
 
3860
    def __str__(self):
 
3861
        if getattr(self, 'feature_name', None):
 
3862
            return self.feature_name()
 
3863
        return self.__class__.__name__
 
3864
 
 
3865
 
 
3866
class _SymlinkFeature(Feature):
 
3867
 
 
3868
    def _probe(self):
 
3869
        return osutils.has_symlinks()
 
3870
 
 
3871
    def feature_name(self):
 
3872
        return 'symlinks'
 
3873
 
 
3874
SymlinkFeature = _SymlinkFeature()
 
3875
 
 
3876
 
 
3877
class _HardlinkFeature(Feature):
 
3878
 
 
3879
    def _probe(self):
 
3880
        return osutils.has_hardlinks()
 
3881
 
 
3882
    def feature_name(self):
 
3883
        return 'hardlinks'
 
3884
 
 
3885
HardlinkFeature = _HardlinkFeature()
 
3886
 
 
3887
 
 
3888
class _OsFifoFeature(Feature):
 
3889
 
 
3890
    def _probe(self):
 
3891
        return getattr(os, 'mkfifo', None)
 
3892
 
 
3893
    def feature_name(self):
 
3894
        return 'filesystem fifos'
 
3895
 
 
3896
OsFifoFeature = _OsFifoFeature()
 
3897
 
 
3898
 
 
3899
class _UnicodeFilenameFeature(Feature):
 
3900
    """Does the filesystem support Unicode filenames?"""
 
3901
 
 
3902
    def _probe(self):
 
3903
        try:
 
3904
            # Check for character combinations unlikely to be covered by any
 
3905
            # single non-unicode encoding. We use the characters
 
3906
            # - greek small letter alpha (U+03B1) and
 
3907
            # - braille pattern dots-123456 (U+283F).
 
3908
            os.stat(u'\u03b1\u283f')
 
3909
        except UnicodeEncodeError:
 
3910
            return False
 
3911
        except (IOError, OSError):
 
3912
            # The filesystem allows the Unicode filename but the file doesn't
 
3913
            # exist.
 
3914
            return True
 
3915
        else:
 
3916
            # The filesystem allows the Unicode filename and the file exists,
 
3917
            # for some reason.
 
3918
            return True
 
3919
 
 
3920
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
3921
 
 
3922
 
 
3923
def probe_unicode_in_user_encoding():
 
3924
    """Try to encode several unicode strings to use in unicode-aware tests.
 
3925
    Return first successfull match.
 
3926
 
 
3927
    :return:  (unicode value, encoded plain string value) or (None, None)
 
3928
    """
 
3929
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
3930
    for uni_val in possible_vals:
 
3931
        try:
 
3932
            str_val = uni_val.encode(osutils.get_user_encoding())
 
3933
        except UnicodeEncodeError:
 
3934
            # Try a different character
 
3935
            pass
 
3936
        else:
 
3937
            return uni_val, str_val
 
3938
    return None, None
 
3939
 
 
3940
 
 
3941
def probe_bad_non_ascii(encoding):
 
3942
    """Try to find [bad] character with code [128..255]
 
3943
    that cannot be decoded to unicode in some encoding.
 
3944
    Return None if all non-ascii characters is valid
 
3945
    for given encoding.
 
3946
    """
 
3947
    for i in xrange(128, 256):
 
3948
        char = chr(i)
 
3949
        try:
 
3950
            char.decode(encoding)
 
3951
        except UnicodeDecodeError:
 
3952
            return char
 
3953
    return None
 
3954
 
 
3955
 
 
3956
class _HTTPSServerFeature(Feature):
 
3957
    """Some tests want an https Server, check if one is available.
 
3958
 
 
3959
    Right now, the only way this is available is under python2.6 which provides
 
3960
    an ssl module.
 
3961
    """
 
3962
 
 
3963
    def _probe(self):
 
3964
        try:
 
3965
            import ssl
 
3966
            return True
 
3967
        except ImportError:
 
3968
            return False
 
3969
 
 
3970
    def feature_name(self):
 
3971
        return 'HTTPSServer'
 
3972
 
 
3973
 
 
3974
HTTPSServerFeature = _HTTPSServerFeature()
 
3975
 
 
3976
 
 
3977
class _UnicodeFilename(Feature):
 
3978
    """Does the filesystem support Unicode filenames?"""
 
3979
 
 
3980
    def _probe(self):
 
3981
        try:
 
3982
            os.stat(u'\u03b1')
 
3983
        except UnicodeEncodeError:
 
3984
            return False
 
3985
        except (IOError, OSError):
 
3986
            # The filesystem allows the Unicode filename but the file doesn't
 
3987
            # exist.
 
3988
            return True
 
3989
        else:
 
3990
            # The filesystem allows the Unicode filename and the file exists,
 
3991
            # for some reason.
 
3992
            return True
 
3993
 
 
3994
UnicodeFilename = _UnicodeFilename()
 
3995
 
 
3996
 
 
3997
class _UTF8Filesystem(Feature):
 
3998
    """Is the filesystem UTF-8?"""
 
3999
 
 
4000
    def _probe(self):
 
4001
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4002
            return True
 
4003
        return False
 
4004
 
 
4005
UTF8Filesystem = _UTF8Filesystem()
 
4006
 
 
4007
 
 
4008
class _CaseInsCasePresFilenameFeature(Feature):
 
4009
    """Is the file-system case insensitive, but case-preserving?"""
 
4010
 
 
4011
    def _probe(self):
 
4012
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4013
        try:
 
4014
            # first check truly case-preserving for created files, then check
 
4015
            # case insensitive when opening existing files.
 
4016
            name = osutils.normpath(name)
 
4017
            base, rel = osutils.split(name)
 
4018
            found_rel = osutils.canonical_relpath(base, name)
 
4019
            return (found_rel == rel
 
4020
                    and os.path.isfile(name.upper())
 
4021
                    and os.path.isfile(name.lower()))
 
4022
        finally:
 
4023
            os.close(fileno)
 
4024
            os.remove(name)
 
4025
 
 
4026
    def feature_name(self):
 
4027
        return "case-insensitive case-preserving filesystem"
 
4028
 
 
4029
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4030
 
 
4031
 
 
4032
class _CaseInsensitiveFilesystemFeature(Feature):
 
4033
    """Check if underlying filesystem is case-insensitive but *not* case
 
4034
    preserving.
 
4035
    """
 
4036
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4037
    # more likely to be case preserving, so this case is rare.
 
4038
 
 
4039
    def _probe(self):
 
4040
        if CaseInsCasePresFilenameFeature.available():
 
4041
            return False
 
4042
 
 
4043
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4044
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4045
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4046
        else:
 
4047
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4048
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4049
            dir=root)
 
4050
        name_a = osutils.pathjoin(tdir, 'a')
 
4051
        name_A = osutils.pathjoin(tdir, 'A')
 
4052
        os.mkdir(name_a)
 
4053
        result = osutils.isdir(name_A)
 
4054
        _rmtree_temp_dir(tdir)
 
4055
        return result
 
4056
 
 
4057
    def feature_name(self):
 
4058
        return 'case-insensitive filesystem'
 
4059
 
 
4060
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4061
 
 
4062
 
 
4063
class _SubUnitFeature(Feature):
 
4064
    """Check if subunit is available."""
 
4065
 
 
4066
    def _probe(self):
 
4067
        try:
 
4068
            import subunit
 
4069
            return True
 
4070
        except ImportError:
 
4071
            return False
 
4072
 
 
4073
    def feature_name(self):
 
4074
        return 'subunit'
 
4075
 
 
4076
SubUnitFeature = _SubUnitFeature()
 
4077
# Only define SubUnitBzrRunner if subunit is available.
 
4078
try:
 
4079
    from subunit import TestProtocolClient
 
4080
    try:
 
4081
        from subunit.test_results import AutoTimingTestResultDecorator
 
4082
    except ImportError:
 
4083
        AutoTimingTestResultDecorator = lambda x:x
 
4084
    class SubUnitBzrRunner(TextTestRunner):
 
4085
        def run(self, test):
 
4086
            result = AutoTimingTestResultDecorator(
 
4087
                TestProtocolClient(self.stream))
 
4088
            test.run(result)
 
4089
            return result
 
4090
except ImportError:
 
4091
    pass