~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Vincent Ladeuil
  • Date: 2009-12-14 15:51:36 UTC
  • mto: (4894.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4895.
  • Revision ID: v.ladeuil+lp@free.fr-20091214155136-rf4nkqvxda9oiw4u
Cleanup tests and tweak the text displayed.

* bzrlib/tests/blackbox/test_update.py:
Fix imports and replace the assertContainsRe with assertEqualDiff
to make the test clearer, more robust and easier to debug.

* bzrlib/tests/commands/test_update.py: 
Fix imports.

* bzrlib/tests/blackbox/test_filtered_view_ops.py: 
Fix imports and strange accesses to base class methods.
(TestViewTreeOperations.test_view_on_update): Avoid os.chdir()
call, simplify string matching assertions.

* bzrlib/builtins.py:
(cmd_update.run): Fix spurious space, get rid of the final '/' for
the base path, don't add a final period (it's a legal char in a
path and would be annoying for people that like to copy/paste).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from copy import copy
 
32
from cStringIO import StringIO
 
33
import difflib
 
34
import doctest
 
35
import errno
18
36
import logging
19
 
import unittest
20
 
import tempfile
 
37
import math
21
38
import os
 
39
from pprint import pformat
 
40
import random
 
41
import re
 
42
import shlex
 
43
import stat
 
44
from subprocess import Popen, PIPE, STDOUT
22
45
import sys
23
 
import errno
24
 
import subprocess
25
 
import shutil
26
 
 
27
 
import testsweet
 
46
import tempfile
 
47
import threading
 
48
import time
 
49
import unittest
 
50
import warnings
 
51
 
 
52
 
 
53
from bzrlib import (
 
54
    branchbuilder,
 
55
    bzrdir,
 
56
    chk_map,
 
57
    config,
 
58
    debug,
 
59
    errors,
 
60
    hooks,
 
61
    lock as _mod_lock,
 
62
    memorytree,
 
63
    osutils,
 
64
    progress,
 
65
    ui,
 
66
    urlutils,
 
67
    registry,
 
68
    workingtree,
 
69
    )
 
70
import bzrlib.branch
28
71
import bzrlib.commands
29
 
 
 
72
import bzrlib.timestamp
 
73
import bzrlib.export
 
74
import bzrlib.inventory
 
75
import bzrlib.iterablefile
 
76
import bzrlib.lockdir
 
77
try:
 
78
    import bzrlib.lsprof
 
79
except ImportError:
 
80
    # lsprof not available
 
81
    pass
 
82
from bzrlib.merge import merge_inner
 
83
import bzrlib.merge3
 
84
import bzrlib.plugin
 
85
from bzrlib.smart import client, request, server
 
86
import bzrlib.store
 
87
from bzrlib import symbol_versioning
 
88
from bzrlib.symbol_versioning import (
 
89
    DEPRECATED_PARAMETER,
 
90
    deprecated_function,
 
91
    deprecated_method,
 
92
    deprecated_passed,
 
93
    )
30
94
import bzrlib.trace
31
 
import bzrlib.fetch
32
 
 
33
 
 
34
 
MODULES_TO_TEST = []
35
 
MODULES_TO_DOCTEST = []
36
 
 
37
 
from logging import debug, warning, error
 
95
from bzrlib.transport import get_transport, pathfilter
 
96
import bzrlib.transport
 
97
from bzrlib.transport.local import LocalURLServer
 
98
from bzrlib.transport.memory import MemoryServer
 
99
from bzrlib.transport.readonly import ReadonlyServer
 
100
from bzrlib.trace import mutter, note
 
101
from bzrlib.tests import TestUtil
 
102
from bzrlib.tests.http_server import HttpServer
 
103
from bzrlib.tests.TestUtil import (
 
104
                          TestSuite,
 
105
                          TestLoader,
 
106
                          )
 
107
from bzrlib.tests.treeshape import build_tree_contents
 
108
from bzrlib.ui import NullProgressView
 
109
from bzrlib.ui.text import TextUIFactory
 
110
import bzrlib.version_info_formats.format_custom
 
111
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
112
 
 
113
# Mark this python module as being part of the implementation
 
114
# of unittest: this gives us better tracebacks where the last
 
115
# shown frame is the test code, not our assertXYZ.
 
116
__unittest = 1
 
117
 
 
118
default_transport = LocalURLServer
 
119
 
 
120
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
121
SUBUNIT_SEEK_SET = 0
 
122
SUBUNIT_SEEK_CUR = 1
 
123
 
 
124
 
 
125
class ExtendedTestResult(unittest._TextTestResult):
 
126
    """Accepts, reports and accumulates the results of running tests.
 
127
 
 
128
    Compared to the unittest version this class adds support for
 
129
    profiling, benchmarking, stopping as soon as a test fails,  and
 
130
    skipping tests.  There are further-specialized subclasses for
 
131
    different types of display.
 
132
 
 
133
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
134
    addFailure or addError classes.  These in turn may redirect to a more
 
135
    specific case for the special test results supported by our extended
 
136
    tests.
 
137
 
 
138
    Note that just one of these objects is fed the results from many tests.
 
139
    """
 
140
 
 
141
    stop_early = False
 
142
 
 
143
    def __init__(self, stream, descriptions, verbosity,
 
144
                 bench_history=None,
 
145
                 strict=False,
 
146
                 ):
 
147
        """Construct new TestResult.
 
148
 
 
149
        :param bench_history: Optionally, a writable file object to accumulate
 
150
            benchmark results.
 
151
        """
 
152
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
153
        if bench_history is not None:
 
154
            from bzrlib.version import _get_bzr_source_tree
 
155
            src_tree = _get_bzr_source_tree()
 
156
            if src_tree:
 
157
                try:
 
158
                    revision_id = src_tree.get_parent_ids()[0]
 
159
                except IndexError:
 
160
                    # XXX: if this is a brand new tree, do the same as if there
 
161
                    # is no branch.
 
162
                    revision_id = ''
 
163
            else:
 
164
                # XXX: If there's no branch, what should we do?
 
165
                revision_id = ''
 
166
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
167
        self._bench_history = bench_history
 
168
        self.ui = ui.ui_factory
 
169
        self.num_tests = 0
 
170
        self.error_count = 0
 
171
        self.failure_count = 0
 
172
        self.known_failure_count = 0
 
173
        self.skip_count = 0
 
174
        self.not_applicable_count = 0
 
175
        self.unsupported = {}
 
176
        self.count = 0
 
177
        self._overall_start_time = time.time()
 
178
        self._strict = strict
 
179
 
 
180
    def stopTestRun(self):
 
181
        run = self.testsRun
 
182
        actionTaken = "Ran"
 
183
        stopTime = time.time()
 
184
        timeTaken = stopTime - self.startTime
 
185
        self.printErrors()
 
186
        self.stream.writeln(self.separator2)
 
187
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
188
                            run, run != 1 and "s" or "", timeTaken))
 
189
        self.stream.writeln()
 
190
        if not self.wasSuccessful():
 
191
            self.stream.write("FAILED (")
 
192
            failed, errored = map(len, (self.failures, self.errors))
 
193
            if failed:
 
194
                self.stream.write("failures=%d" % failed)
 
195
            if errored:
 
196
                if failed: self.stream.write(", ")
 
197
                self.stream.write("errors=%d" % errored)
 
198
            if self.known_failure_count:
 
199
                if failed or errored: self.stream.write(", ")
 
200
                self.stream.write("known_failure_count=%d" %
 
201
                    self.known_failure_count)
 
202
            self.stream.writeln(")")
 
203
        else:
 
204
            if self.known_failure_count:
 
205
                self.stream.writeln("OK (known_failures=%d)" %
 
206
                    self.known_failure_count)
 
207
            else:
 
208
                self.stream.writeln("OK")
 
209
        if self.skip_count > 0:
 
210
            skipped = self.skip_count
 
211
            self.stream.writeln('%d test%s skipped' %
 
212
                                (skipped, skipped != 1 and "s" or ""))
 
213
        if self.unsupported:
 
214
            for feature, count in sorted(self.unsupported.items()):
 
215
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
216
                    (feature, count))
 
217
        if self._strict:
 
218
            ok = self.wasStrictlySuccessful()
 
219
        else:
 
220
            ok = self.wasSuccessful()
 
221
        if TestCase._first_thread_leaker_id:
 
222
            self.stream.write(
 
223
                '%s is leaking threads among %d leaking tests.\n' % (
 
224
                TestCase._first_thread_leaker_id,
 
225
                TestCase._leaking_threads_tests))
 
226
            # We don't report the main thread as an active one.
 
227
            self.stream.write(
 
228
                '%d non-main threads were left active in the end.\n'
 
229
                % (TestCase._active_threads - 1))
 
230
 
 
231
    def _extractBenchmarkTime(self, testCase):
 
232
        """Add a benchmark time for the current test case."""
 
233
        return getattr(testCase, "_benchtime", None)
 
234
 
 
235
    def _elapsedTestTimeString(self):
 
236
        """Return a time string for the overall time the current test has taken."""
 
237
        return self._formatTime(time.time() - self._start_time)
 
238
 
 
239
    def _testTimeString(self, testCase):
 
240
        benchmark_time = self._extractBenchmarkTime(testCase)
 
241
        if benchmark_time is not None:
 
242
            return self._formatTime(benchmark_time) + "*"
 
243
        else:
 
244
            return self._elapsedTestTimeString()
 
245
 
 
246
    def _formatTime(self, seconds):
 
247
        """Format seconds as milliseconds with leading spaces."""
 
248
        # some benchmarks can take thousands of seconds to run, so we need 8
 
249
        # places
 
250
        return "%8dms" % (1000 * seconds)
 
251
 
 
252
    def _shortened_test_description(self, test):
 
253
        what = test.id()
 
254
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
255
        return what
 
256
 
 
257
    def startTest(self, test):
 
258
        unittest.TestResult.startTest(self, test)
 
259
        if self.count == 0:
 
260
            self.startTests()
 
261
        self.report_test_start(test)
 
262
        test.number = self.count
 
263
        self._recordTestStartTime()
 
264
 
 
265
    def startTests(self):
 
266
        import platform
 
267
        if getattr(sys, 'frozen', None) is None:
 
268
            bzr_path = osutils.realpath(sys.argv[0])
 
269
        else:
 
270
            bzr_path = sys.executable
 
271
        self.stream.write(
 
272
            'testing: %s\n' % (bzr_path,))
 
273
        self.stream.write(
 
274
            '   %s\n' % (
 
275
                    bzrlib.__path__[0],))
 
276
        self.stream.write(
 
277
            '   bzr-%s python-%s %s\n' % (
 
278
                    bzrlib.version_string,
 
279
                    bzrlib._format_version_tuple(sys.version_info),
 
280
                    platform.platform(aliased=1),
 
281
                    ))
 
282
        self.stream.write('\n')
 
283
 
 
284
    def _recordTestStartTime(self):
 
285
        """Record that a test has started."""
 
286
        self._start_time = time.time()
 
287
 
 
288
    def _cleanupLogFile(self, test):
 
289
        # We can only do this if we have one of our TestCases, not if
 
290
        # we have a doctest.
 
291
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
292
        if setKeepLogfile is not None:
 
293
            setKeepLogfile()
 
294
 
 
295
    def addError(self, test, err):
 
296
        """Tell result that test finished with an error.
 
297
 
 
298
        Called from the TestCase run() method when the test
 
299
        fails with an unexpected error.
 
300
        """
 
301
        self._post_mortem()
 
302
        unittest.TestResult.addError(self, test, err)
 
303
        self.error_count += 1
 
304
        self.report_error(test, err)
 
305
        if self.stop_early:
 
306
            self.stop()
 
307
        self._cleanupLogFile(test)
 
308
 
 
309
    def addFailure(self, test, err):
 
310
        """Tell result that test failed.
 
311
 
 
312
        Called from the TestCase run() method when the test
 
313
        fails because e.g. an assert() method failed.
 
314
        """
 
315
        self._post_mortem()
 
316
        unittest.TestResult.addFailure(self, test, err)
 
317
        self.failure_count += 1
 
318
        self.report_failure(test, err)
 
319
        if self.stop_early:
 
320
            self.stop()
 
321
        self._cleanupLogFile(test)
 
322
 
 
323
    def addSuccess(self, test):
 
324
        """Tell result that test completed successfully.
 
325
 
 
326
        Called from the TestCase run()
 
327
        """
 
328
        if self._bench_history is not None:
 
329
            benchmark_time = self._extractBenchmarkTime(test)
 
330
            if benchmark_time is not None:
 
331
                self._bench_history.write("%s %s\n" % (
 
332
                    self._formatTime(benchmark_time),
 
333
                    test.id()))
 
334
        self.report_success(test)
 
335
        self._cleanupLogFile(test)
 
336
        unittest.TestResult.addSuccess(self, test)
 
337
        test._log_contents = ''
 
338
 
 
339
    def addExpectedFailure(self, test, err):
 
340
        self.known_failure_count += 1
 
341
        self.report_known_failure(test, err)
 
342
 
 
343
    def addNotSupported(self, test, feature):
 
344
        """The test will not be run because of a missing feature.
 
345
        """
 
346
        # this can be called in two different ways: it may be that the
 
347
        # test started running, and then raised (through requireFeature)
 
348
        # UnavailableFeature.  Alternatively this method can be called
 
349
        # while probing for features before running the test code proper; in
 
350
        # that case we will see startTest and stopTest, but the test will
 
351
        # never actually run.
 
352
        self.unsupported.setdefault(str(feature), 0)
 
353
        self.unsupported[str(feature)] += 1
 
354
        self.report_unsupported(test, feature)
 
355
 
 
356
    def addSkip(self, test, reason):
 
357
        """A test has not run for 'reason'."""
 
358
        self.skip_count += 1
 
359
        self.report_skip(test, reason)
 
360
 
 
361
    def addNotApplicable(self, test, reason):
 
362
        self.not_applicable_count += 1
 
363
        self.report_not_applicable(test, reason)
 
364
 
 
365
    def printErrorList(self, flavour, errors):
 
366
        for test, err in errors:
 
367
            self.stream.writeln(self.separator1)
 
368
            self.stream.write("%s: " % flavour)
 
369
            self.stream.writeln(self.getDescription(test))
 
370
            if getattr(test, '_get_log', None) is not None:
 
371
                log_contents = test._get_log()
 
372
                if log_contents:
 
373
                    self.stream.write('\n')
 
374
                    self.stream.write(
 
375
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
376
                    self.stream.write('\n')
 
377
                    self.stream.write(log_contents)
 
378
                    self.stream.write('\n')
 
379
                    self.stream.write(
 
380
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
381
                    self.stream.write('\n')
 
382
            self.stream.writeln(self.separator2)
 
383
            self.stream.writeln("%s" % err)
 
384
 
 
385
    def _post_mortem(self):
 
386
        """Start a PDB post mortem session."""
 
387
        if os.environ.get('BZR_TEST_PDB', None):
 
388
            import pdb;pdb.post_mortem()
 
389
 
 
390
    def progress(self, offset, whence):
 
391
        """The test is adjusting the count of tests to run."""
 
392
        if whence == SUBUNIT_SEEK_SET:
 
393
            self.num_tests = offset
 
394
        elif whence == SUBUNIT_SEEK_CUR:
 
395
            self.num_tests += offset
 
396
        else:
 
397
            raise errors.BzrError("Unknown whence %r" % whence)
 
398
 
 
399
    def report_cleaning_up(self):
 
400
        pass
 
401
 
 
402
    def startTestRun(self):
 
403
        self.startTime = time.time()
 
404
 
 
405
    def report_success(self, test):
 
406
        pass
 
407
 
 
408
    def wasStrictlySuccessful(self):
 
409
        if self.unsupported or self.known_failure_count:
 
410
            return False
 
411
        return self.wasSuccessful()
 
412
 
 
413
 
 
414
class TextTestResult(ExtendedTestResult):
 
415
    """Displays progress and results of tests in text form"""
 
416
 
 
417
    def __init__(self, stream, descriptions, verbosity,
 
418
                 bench_history=None,
 
419
                 pb=None,
 
420
                 strict=None,
 
421
                 ):
 
422
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
423
            bench_history, strict)
 
424
        # We no longer pass them around, but just rely on the UIFactory stack
 
425
        # for state
 
426
        if pb is not None:
 
427
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
428
        self.pb = self.ui.nested_progress_bar()
 
429
        self.pb.show_pct = False
 
430
        self.pb.show_spinner = False
 
431
        self.pb.show_eta = False,
 
432
        self.pb.show_count = False
 
433
        self.pb.show_bar = False
 
434
        self.pb.update_latency = 0
 
435
        self.pb.show_transport_activity = False
 
436
 
 
437
    def stopTestRun(self):
 
438
        # called when the tests that are going to run have run
 
439
        self.pb.clear()
 
440
        self.pb.finished()
 
441
        super(TextTestResult, self).stopTestRun()
 
442
 
 
443
    def startTestRun(self):
 
444
        super(TextTestResult, self).startTestRun()
 
445
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
446
 
 
447
    def printErrors(self):
 
448
        # clear the pb to make room for the error listing
 
449
        self.pb.clear()
 
450
        super(TextTestResult, self).printErrors()
 
451
 
 
452
    def _progress_prefix_text(self):
 
453
        # the longer this text, the less space we have to show the test
 
454
        # name...
 
455
        a = '[%d' % self.count              # total that have been run
 
456
        # tests skipped as known not to be relevant are not important enough
 
457
        # to show here
 
458
        ## if self.skip_count:
 
459
        ##     a += ', %d skip' % self.skip_count
 
460
        ## if self.known_failure_count:
 
461
        ##     a += '+%dX' % self.known_failure_count
 
462
        if self.num_tests:
 
463
            a +='/%d' % self.num_tests
 
464
        a += ' in '
 
465
        runtime = time.time() - self._overall_start_time
 
466
        if runtime >= 60:
 
467
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
468
        else:
 
469
            a += '%ds' % runtime
 
470
        if self.error_count:
 
471
            a += ', %d err' % self.error_count
 
472
        if self.failure_count:
 
473
            a += ', %d fail' % self.failure_count
 
474
        # if self.unsupported:
 
475
        #     a += ', %d missing' % len(self.unsupported)
 
476
        a += ']'
 
477
        return a
 
478
 
 
479
    def report_test_start(self, test):
 
480
        self.count += 1
 
481
        self.pb.update(
 
482
                self._progress_prefix_text()
 
483
                + ' '
 
484
                + self._shortened_test_description(test))
 
485
 
 
486
    def _test_description(self, test):
 
487
        return self._shortened_test_description(test)
 
488
 
 
489
    def report_error(self, test, err):
 
490
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
 
491
            self._test_description(test),
 
492
            err[1],
 
493
            ))
 
494
 
 
495
    def report_failure(self, test, err):
 
496
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
 
497
            self._test_description(test),
 
498
            err[1],
 
499
            ))
 
500
 
 
501
    def report_known_failure(self, test, err):
 
502
        ui.ui_factory.note('XFAIL: %s\n%s\n' % (
 
503
            self._test_description(test), err[1]))
 
504
 
 
505
    def report_skip(self, test, reason):
 
506
        pass
 
507
 
 
508
    def report_not_applicable(self, test, reason):
 
509
        pass
 
510
 
 
511
    def report_unsupported(self, test, feature):
 
512
        """test cannot be run because feature is missing."""
 
513
 
 
514
    def report_cleaning_up(self):
 
515
        self.pb.update('Cleaning up')
 
516
 
 
517
 
 
518
class VerboseTestResult(ExtendedTestResult):
 
519
    """Produce long output, with one line per test run plus times"""
 
520
 
 
521
    def _ellipsize_to_right(self, a_string, final_width):
 
522
        """Truncate and pad a string, keeping the right hand side"""
 
523
        if len(a_string) > final_width:
 
524
            result = '...' + a_string[3-final_width:]
 
525
        else:
 
526
            result = a_string
 
527
        return result.ljust(final_width)
 
528
 
 
529
    def startTestRun(self):
 
530
        super(VerboseTestResult, self).startTestRun()
 
531
        self.stream.write('running %d tests...\n' % self.num_tests)
 
532
 
 
533
    def report_test_start(self, test):
 
534
        self.count += 1
 
535
        name = self._shortened_test_description(test)
 
536
        width = osutils.terminal_width()
 
537
        if width is not None:
 
538
            # width needs space for 6 char status, plus 1 for slash, plus an
 
539
            # 11-char time string, plus a trailing blank
 
540
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
 
541
            # space
 
542
            self.stream.write(self._ellipsize_to_right(name, width-18))
 
543
        else:
 
544
            self.stream.write(name)
 
545
        self.stream.flush()
 
546
 
 
547
    def _error_summary(self, err):
 
548
        indent = ' ' * 4
 
549
        return '%s%s' % (indent, err[1])
 
550
 
 
551
    def report_error(self, test, err):
 
552
        self.stream.writeln('ERROR %s\n%s'
 
553
                % (self._testTimeString(test),
 
554
                   self._error_summary(err)))
 
555
 
 
556
    def report_failure(self, test, err):
 
557
        self.stream.writeln(' FAIL %s\n%s'
 
558
                % (self._testTimeString(test),
 
559
                   self._error_summary(err)))
 
560
 
 
561
    def report_known_failure(self, test, err):
 
562
        self.stream.writeln('XFAIL %s\n%s'
 
563
                % (self._testTimeString(test),
 
564
                   self._error_summary(err)))
 
565
 
 
566
    def report_success(self, test):
 
567
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
568
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
569
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
570
            stats.pprint(file=self.stream)
 
571
        # flush the stream so that we get smooth output. This verbose mode is
 
572
        # used to show the output in PQM.
 
573
        self.stream.flush()
 
574
 
 
575
    def report_skip(self, test, reason):
 
576
        self.stream.writeln(' SKIP %s\n%s'
 
577
                % (self._testTimeString(test), reason))
 
578
 
 
579
    def report_not_applicable(self, test, reason):
 
580
        self.stream.writeln('  N/A %s\n    %s'
 
581
                % (self._testTimeString(test), reason))
 
582
 
 
583
    def report_unsupported(self, test, feature):
 
584
        """test cannot be run because feature is missing."""
 
585
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
586
                %(self._testTimeString(test), feature))
 
587
 
 
588
 
 
589
class TextTestRunner(object):
 
590
    stop_on_failure = False
 
591
 
 
592
    def __init__(self,
 
593
                 stream=sys.stderr,
 
594
                 descriptions=0,
 
595
                 verbosity=1,
 
596
                 bench_history=None,
 
597
                 strict=False,
 
598
                 result_decorators=None,
 
599
                 ):
 
600
        """Create a TextTestRunner.
 
601
 
 
602
        :param result_decorators: An optional list of decorators to apply
 
603
            to the result object being used by the runner. Decorators are
 
604
            applied left to right - the first element in the list is the 
 
605
            innermost decorator.
 
606
        """
 
607
        self.stream = unittest._WritelnDecorator(stream)
 
608
        self.descriptions = descriptions
 
609
        self.verbosity = verbosity
 
610
        self._bench_history = bench_history
 
611
        self._strict = strict
 
612
        self._result_decorators = result_decorators or []
 
613
 
 
614
    def run(self, test):
 
615
        "Run the given test case or test suite."
 
616
        if self.verbosity == 1:
 
617
            result_class = TextTestResult
 
618
        elif self.verbosity >= 2:
 
619
            result_class = VerboseTestResult
 
620
        original_result = result_class(self.stream,
 
621
                              self.descriptions,
 
622
                              self.verbosity,
 
623
                              bench_history=self._bench_history,
 
624
                              strict=self._strict,
 
625
                              )
 
626
        # Signal to result objects that look at stop early policy to stop,
 
627
        original_result.stop_early = self.stop_on_failure
 
628
        result = original_result
 
629
        for decorator in self._result_decorators:
 
630
            result = decorator(result)
 
631
            result.stop_early = self.stop_on_failure
 
632
        try:
 
633
            import testtools
 
634
        except ImportError:
 
635
            pass
 
636
        else:
 
637
            if isinstance(test, testtools.ConcurrentTestSuite):
 
638
                # We need to catch bzr specific behaviors
 
639
                result = BZRTransformingResult(result)
 
640
        result.startTestRun()
 
641
        try:
 
642
            test.run(result)
 
643
        finally:
 
644
            result.stopTestRun()
 
645
        # higher level code uses our extended protocol to determine
 
646
        # what exit code to give.
 
647
        return original_result
 
648
 
 
649
 
 
650
def iter_suite_tests(suite):
 
651
    """Return all tests in a suite, recursing through nested suites"""
 
652
    if isinstance(suite, unittest.TestCase):
 
653
        yield suite
 
654
    elif isinstance(suite, unittest.TestSuite):
 
655
        for item in suite:
 
656
            for r in iter_suite_tests(item):
 
657
                yield r
 
658
    else:
 
659
        raise Exception('unknown type %r for object %r'
 
660
                        % (type(suite), suite))
 
661
 
 
662
 
 
663
class TestSkipped(Exception):
 
664
    """Indicates that a test was intentionally skipped, rather than failing."""
 
665
 
 
666
 
 
667
class TestNotApplicable(TestSkipped):
 
668
    """A test is not applicable to the situation where it was run.
 
669
 
 
670
    This is only normally raised by parameterized tests, if they find that
 
671
    the instance they're constructed upon does not support one aspect
 
672
    of its interface.
 
673
    """
 
674
 
 
675
 
 
676
class KnownFailure(AssertionError):
 
677
    """Indicates that a test failed in a precisely expected manner.
 
678
 
 
679
    Such failures dont block the whole test suite from passing because they are
 
680
    indicators of partially completed code or of future work. We have an
 
681
    explicit error for them so that we can ensure that they are always visible:
 
682
    KnownFailures are always shown in the output of bzr selftest.
 
683
    """
 
684
 
 
685
 
 
686
class UnavailableFeature(Exception):
 
687
    """A feature required for this test was not available.
 
688
 
 
689
    This can be considered a specialised form of SkippedTest.
 
690
 
 
691
    The feature should be used to construct the exception.
 
692
    """
 
693
 
38
694
 
39
695
class CommandFailed(Exception):
40
696
    pass
41
697
 
 
698
 
 
699
class StringIOWrapper(object):
 
700
    """A wrapper around cStringIO which just adds an encoding attribute.
 
701
 
 
702
    Internally we can check sys.stdout to see what the output encoding
 
703
    should be. However, cStringIO has no encoding attribute that we can
 
704
    set. So we wrap it instead.
 
705
    """
 
706
    encoding='ascii'
 
707
    _cstring = None
 
708
 
 
709
    def __init__(self, s=None):
 
710
        if s is not None:
 
711
            self.__dict__['_cstring'] = StringIO(s)
 
712
        else:
 
713
            self.__dict__['_cstring'] = StringIO()
 
714
 
 
715
    def __getattr__(self, name, getattr=getattr):
 
716
        return getattr(self.__dict__['_cstring'], name)
 
717
 
 
718
    def __setattr__(self, name, val):
 
719
        if name == 'encoding':
 
720
            self.__dict__['encoding'] = val
 
721
        else:
 
722
            return setattr(self._cstring, name, val)
 
723
 
 
724
 
 
725
class TestUIFactory(TextUIFactory):
 
726
    """A UI Factory for testing.
 
727
 
 
728
    Hide the progress bar but emit note()s.
 
729
    Redirect stdin.
 
730
    Allows get_password to be tested without real tty attached.
 
731
 
 
732
    See also CannedInputUIFactory which lets you provide programmatic input in
 
733
    a structured way.
 
734
    """
 
735
    # TODO: Capture progress events at the model level and allow them to be
 
736
    # observed by tests that care.
 
737
    #
 
738
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
739
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
740
    # idea of how they're supposed to be different.
 
741
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
 
742
 
 
743
    def __init__(self, stdout=None, stderr=None, stdin=None):
 
744
        if stdin is not None:
 
745
            # We use a StringIOWrapper to be able to test various
 
746
            # encodings, but the user is still responsible to
 
747
            # encode the string and to set the encoding attribute
 
748
            # of StringIOWrapper.
 
749
            stdin = StringIOWrapper(stdin)
 
750
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
751
 
 
752
    def get_non_echoed_password(self):
 
753
        """Get password from stdin without trying to handle the echo mode"""
 
754
        password = self.stdin.readline()
 
755
        if not password:
 
756
            raise EOFError
 
757
        if password[-1] == '\n':
 
758
            password = password[:-1]
 
759
        return password
 
760
 
 
761
    def make_progress_view(self):
 
762
        return NullProgressView()
 
763
 
 
764
 
42
765
class TestCase(unittest.TestCase):
43
766
    """Base class for bzr unit tests.
44
 
    
45
 
    Tests that need access to disk resources should subclass 
 
767
 
 
768
    Tests that need access to disk resources should subclass
46
769
    TestCaseInTempDir not TestCase.
47
770
 
48
771
    Error and debug log messages are redirected from their usual
49
772
    location into a temporary file, the contents of which can be
50
 
    retrieved by _get_log().
51
 
       
 
773
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
774
    so that it can also capture file IO.  When the test completes this file
 
775
    is read into memory and removed from disk.
 
776
 
52
777
    There are also convenience functions to invoke bzr's command-line
53
 
    routine, and to build and check bzr trees."""
54
 
 
55
 
    BZRPATH = 'bzr'
 
778
    routine, and to build and check bzr trees.
 
779
 
 
780
    In addition to the usual method of overriding tearDown(), this class also
 
781
    allows subclasses to register functions into the _cleanups list, which is
 
782
    run in order as the object is torn down.  It's less likely this will be
 
783
    accidentally overlooked.
 
784
    """
 
785
 
 
786
    _active_threads = None
 
787
    _leaking_threads_tests = 0
 
788
    _first_thread_leaker_id = None
 
789
    _log_file_name = None
 
790
    _log_contents = ''
 
791
    _keep_log_file = False
 
792
    # record lsprof data when performing benchmark calls.
 
793
    _gather_lsprof_in_benchmarks = False
 
794
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
795
                     '_log_contents', '_log_file_name', '_benchtime',
 
796
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
 
797
 
 
798
    def __init__(self, methodName='testMethod'):
 
799
        super(TestCase, self).__init__(methodName)
 
800
        self._cleanups = []
 
801
        self._bzr_test_setUp_run = False
 
802
        self._bzr_test_tearDown_run = False
 
803
        self._directory_isolation = True
56
804
 
57
805
    def setUp(self):
58
 
        # this replaces the default testsweet.TestCase; we don't want logging changed
59
806
        unittest.TestCase.setUp(self)
60
 
        bzrlib.trace.disable_default_logging()
61
 
        self._enable_file_logging()
62
 
 
63
 
 
64
 
    def _enable_file_logging(self):
 
807
        self._bzr_test_setUp_run = True
 
808
        self._cleanEnvironment()
 
809
        self._silenceUI()
 
810
        self._startLogFile()
 
811
        self._benchcalls = []
 
812
        self._benchtime = None
 
813
        self._clear_hooks()
 
814
        self._track_transports()
 
815
        self._track_locks()
 
816
        self._clear_debug_flags()
 
817
        TestCase._active_threads = threading.activeCount()
 
818
        self.addCleanup(self._check_leaked_threads)
 
819
 
 
820
    def debug(self):
 
821
        # debug a frame up.
 
822
        import pdb
 
823
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
824
 
 
825
    def _check_leaked_threads(self):
 
826
        active = threading.activeCount()
 
827
        leaked_threads = active - TestCase._active_threads
 
828
        TestCase._active_threads = active
 
829
        # If some tests make the number of threads *decrease*, we'll consider
 
830
        # that they are just observing old threads dieing, not agressively kill
 
831
        # random threads. So we don't report these tests as leaking. The risk
 
832
        # is that we have false positives that way (the test see 2 threads
 
833
        # going away but leak one) but it seems less likely than the actual
 
834
        # false positives (the test see threads going away and does not leak).
 
835
        if leaked_threads > 0:
 
836
            TestCase._leaking_threads_tests += 1
 
837
            if TestCase._first_thread_leaker_id is None:
 
838
                TestCase._first_thread_leaker_id = self.id()
 
839
 
 
840
    def _clear_debug_flags(self):
 
841
        """Prevent externally set debug flags affecting tests.
 
842
 
 
843
        Tests that want to use debug flags can just set them in the
 
844
        debug_flags set during setup/teardown.
 
845
        """
 
846
        self._preserved_debug_flags = set(debug.debug_flags)
 
847
        if 'allow_debug' not in selftest_debug_flags:
 
848
            debug.debug_flags.clear()
 
849
        if 'disable_lock_checks' not in selftest_debug_flags:
 
850
            debug.debug_flags.add('strict_locks')
 
851
        self.addCleanup(self._restore_debug_flags)
 
852
 
 
853
    def _clear_hooks(self):
 
854
        # prevent hooks affecting tests
 
855
        self._preserved_hooks = {}
 
856
        for key, factory in hooks.known_hooks.items():
 
857
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
858
            current_hooks = hooks.known_hooks_key_to_object(key)
 
859
            self._preserved_hooks[parent] = (name, current_hooks)
 
860
        self.addCleanup(self._restoreHooks)
 
861
        for key, factory in hooks.known_hooks.items():
 
862
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
863
            setattr(parent, name, factory())
 
864
        # this hook should always be installed
 
865
        request._install_hook()
 
866
 
 
867
    def disable_directory_isolation(self):
 
868
        """Turn off directory isolation checks."""
 
869
        self._directory_isolation = False
 
870
 
 
871
    def enable_directory_isolation(self):
 
872
        """Enable directory isolation checks."""
 
873
        self._directory_isolation = True
 
874
 
 
875
    def _silenceUI(self):
 
876
        """Turn off UI for duration of test"""
 
877
        # by default the UI is off; tests can turn it on if they want it.
 
878
        saved = ui.ui_factory
 
879
        def _restore():
 
880
            ui.ui_factory = saved
 
881
        ui.ui_factory = ui.SilentUIFactory()
 
882
        self.addCleanup(_restore)
 
883
 
 
884
    def _check_locks(self):
 
885
        """Check that all lock take/release actions have been paired."""
 
886
        # We always check for mismatched locks. If a mismatch is found, we
 
887
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
888
        # case we just print a warning.
 
889
        # unhook:
 
890
        acquired_locks = [lock for action, lock in self._lock_actions
 
891
                          if action == 'acquired']
 
892
        released_locks = [lock for action, lock in self._lock_actions
 
893
                          if action == 'released']
 
894
        broken_locks = [lock for action, lock in self._lock_actions
 
895
                        if action == 'broken']
 
896
        # trivially, given the tests for lock acquistion and release, if we
 
897
        # have as many in each list, it should be ok. Some lock tests also
 
898
        # break some locks on purpose and should be taken into account by
 
899
        # considering that breaking a lock is just a dirty way of releasing it.
 
900
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
901
            message = ('Different number of acquired and '
 
902
                       'released or broken locks. (%s, %s + %s)' %
 
903
                       (acquired_locks, released_locks, broken_locks))
 
904
            if not self._lock_check_thorough:
 
905
                # Rather than fail, just warn
 
906
                print "Broken test %s: %s" % (self, message)
 
907
                return
 
908
            self.fail(message)
 
909
 
 
910
    def _track_locks(self):
 
911
        """Track lock activity during tests."""
 
912
        self._lock_actions = []
 
913
        if 'disable_lock_checks' in selftest_debug_flags:
 
914
            self._lock_check_thorough = False
 
915
        else:
 
916
            self._lock_check_thorough = True
 
917
            
 
918
        self.addCleanup(self._check_locks)
 
919
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
920
                                                self._lock_acquired, None)
 
921
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
922
                                                self._lock_released, None)
 
923
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
924
                                                self._lock_broken, None)
 
925
 
 
926
    def _lock_acquired(self, result):
 
927
        self._lock_actions.append(('acquired', result))
 
928
 
 
929
    def _lock_released(self, result):
 
930
        self._lock_actions.append(('released', result))
 
931
 
 
932
    def _lock_broken(self, result):
 
933
        self._lock_actions.append(('broken', result))
 
934
 
 
935
    def permit_dir(self, name):
 
936
        """Permit a directory to be used by this test. See permit_url."""
 
937
        name_transport = get_transport(name)
 
938
        self.permit_url(name)
 
939
        self.permit_url(name_transport.base)
 
940
 
 
941
    def permit_url(self, url):
 
942
        """Declare that url is an ok url to use in this test.
 
943
        
 
944
        Do this for memory transports, temporary test directory etc.
 
945
        
 
946
        Do not do this for the current working directory, /tmp, or any other
 
947
        preexisting non isolated url.
 
948
        """
 
949
        if not url.endswith('/'):
 
950
            url += '/'
 
951
        self._bzr_selftest_roots.append(url)
 
952
 
 
953
    def permit_source_tree_branch_repo(self):
 
954
        """Permit the source tree bzr is running from to be opened.
 
955
 
 
956
        Some code such as bzrlib.version attempts to read from the bzr branch
 
957
        that bzr is executing from (if any). This method permits that directory
 
958
        to be used in the test suite.
 
959
        """
 
960
        path = self.get_source_path()
 
961
        self.record_directory_isolation()
 
962
        try:
 
963
            try:
 
964
                workingtree.WorkingTree.open(path)
 
965
            except (errors.NotBranchError, errors.NoWorkingTree):
 
966
                return
 
967
        finally:
 
968
            self.enable_directory_isolation()
 
969
 
 
970
    def _preopen_isolate_transport(self, transport):
 
971
        """Check that all transport openings are done in the test work area."""
 
972
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
973
            # Unwrap pathfiltered transports
 
974
            transport = transport.server.backing_transport.clone(
 
975
                transport._filter('.'))
 
976
        url = transport.base
 
977
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
978
        # urls it is given by prepending readonly+. This is appropriate as the
 
979
        # client shouldn't know that the server is readonly (or not readonly).
 
980
        # We could register all servers twice, with readonly+ prepending, but
 
981
        # that makes for a long list; this is about the same but easier to
 
982
        # read.
 
983
        if url.startswith('readonly+'):
 
984
            url = url[len('readonly+'):]
 
985
        self._preopen_isolate_url(url)
 
986
 
 
987
    def _preopen_isolate_url(self, url):
 
988
        if not self._directory_isolation:
 
989
            return
 
990
        if self._directory_isolation == 'record':
 
991
            self._bzr_selftest_roots.append(url)
 
992
            return
 
993
        # This prevents all transports, including e.g. sftp ones backed on disk
 
994
        # from working unless they are explicitly granted permission. We then
 
995
        # depend on the code that sets up test transports to check that they are
 
996
        # appropriately isolated and enable their use by calling
 
997
        # self.permit_transport()
 
998
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
999
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
1000
                % (url, self._bzr_selftest_roots))
 
1001
 
 
1002
    def record_directory_isolation(self):
 
1003
        """Gather accessed directories to permit later access.
 
1004
        
 
1005
        This is used for tests that access the branch bzr is running from.
 
1006
        """
 
1007
        self._directory_isolation = "record"
 
1008
 
 
1009
    def start_server(self, transport_server, backing_server=None):
 
1010
        """Start transport_server for this test.
 
1011
 
 
1012
        This starts the server, registers a cleanup for it and permits the
 
1013
        server's urls to be used.
 
1014
        """
 
1015
        if backing_server is None:
 
1016
            transport_server.setUp()
 
1017
        else:
 
1018
            transport_server.setUp(backing_server)
 
1019
        self.addCleanup(transport_server.tearDown)
 
1020
        # Obtain a real transport because if the server supplies a password, it
 
1021
        # will be hidden from the base on the client side.
 
1022
        t = get_transport(transport_server.get_url())
 
1023
        # Some transport servers effectively chroot the backing transport;
 
1024
        # others like SFTPServer don't - users of the transport can walk up the
 
1025
        # transport to read the entire backing transport. This wouldn't matter
 
1026
        # except that the workdir tests are given - and that they expect the
 
1027
        # server's url to point at - is one directory under the safety net. So
 
1028
        # Branch operations into the transport will attempt to walk up one
 
1029
        # directory. Chrooting all servers would avoid this but also mean that
 
1030
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1031
        # getting the test framework to start the server with a backing server
 
1032
        # at the actual safety net directory would work too, but this then
 
1033
        # means that the self.get_url/self.get_transport methods would need
 
1034
        # to transform all their results. On balance its cleaner to handle it
 
1035
        # here, and permit a higher url when we have one of these transports.
 
1036
        if t.base.endswith('/work/'):
 
1037
            # we have safety net/test root/work
 
1038
            t = t.clone('../..')
 
1039
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
 
1040
            # The smart server adds a path similar to work, which is traversed
 
1041
            # up from by the client. But the server is chrooted - the actual
 
1042
            # backing transport is not escaped from, and VFS requests to the
 
1043
            # root will error (because they try to escape the chroot).
 
1044
            t2 = t.clone('..')
 
1045
            while t2.base != t.base:
 
1046
                t = t2
 
1047
                t2 = t.clone('..')
 
1048
        self.permit_url(t.base)
 
1049
 
 
1050
    def _track_transports(self):
 
1051
        """Install checks for transport usage."""
 
1052
        # TestCase has no safe place it can write to.
 
1053
        self._bzr_selftest_roots = []
 
1054
        # Currently the easiest way to be sure that nothing is going on is to
 
1055
        # hook into bzr dir opening. This leaves a small window of error for
 
1056
        # transport tests, but they are well known, and we can improve on this
 
1057
        # step.
 
1058
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
 
1059
            self._preopen_isolate_transport, "Check bzr directories are safe.")
 
1060
 
 
1061
    def _ndiff_strings(self, a, b):
 
1062
        """Return ndiff between two strings containing lines.
 
1063
 
 
1064
        A trailing newline is added if missing to make the strings
 
1065
        print properly."""
 
1066
        if b and b[-1] != '\n':
 
1067
            b += '\n'
 
1068
        if a and a[-1] != '\n':
 
1069
            a += '\n'
 
1070
        difflines = difflib.ndiff(a.splitlines(True),
 
1071
                                  b.splitlines(True),
 
1072
                                  linejunk=lambda x: False,
 
1073
                                  charjunk=lambda x: False)
 
1074
        return ''.join(difflines)
 
1075
 
 
1076
    def assertEqual(self, a, b, message=''):
 
1077
        try:
 
1078
            if a == b:
 
1079
                return
 
1080
        except UnicodeError, e:
 
1081
            # If we can't compare without getting a UnicodeError, then
 
1082
            # obviously they are different
 
1083
            mutter('UnicodeError: %s', e)
 
1084
        if message:
 
1085
            message += '\n'
 
1086
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
1087
            % (message,
 
1088
               pformat(a), pformat(b)))
 
1089
 
 
1090
    assertEquals = assertEqual
 
1091
 
 
1092
    def assertEqualDiff(self, a, b, message=None):
 
1093
        """Assert two texts are equal, if not raise an exception.
 
1094
 
 
1095
        This is intended for use with multi-line strings where it can
 
1096
        be hard to find the differences by eye.
 
1097
        """
 
1098
        # TODO: perhaps override assertEquals to call this for strings?
 
1099
        if a == b:
 
1100
            return
 
1101
        if message is None:
 
1102
            message = "texts not equal:\n"
 
1103
        if a + '\n' == b:
 
1104
            message = 'first string is missing a final newline.\n'
 
1105
        if a == b + '\n':
 
1106
            message = 'second string is missing a final newline.\n'
 
1107
        raise AssertionError(message +
 
1108
                             self._ndiff_strings(a, b))
 
1109
 
 
1110
    def assertEqualMode(self, mode, mode_test):
 
1111
        self.assertEqual(mode, mode_test,
 
1112
                         'mode mismatch %o != %o' % (mode, mode_test))
 
1113
 
 
1114
    def assertEqualStat(self, expected, actual):
 
1115
        """assert that expected and actual are the same stat result.
 
1116
 
 
1117
        :param expected: A stat result.
 
1118
        :param actual: A stat result.
 
1119
        :raises AssertionError: If the expected and actual stat values differ
 
1120
            other than by atime.
 
1121
        """
 
1122
        self.assertEqual(expected.st_size, actual.st_size,
 
1123
                         'st_size did not match')
 
1124
        self.assertEqual(expected.st_mtime, actual.st_mtime,
 
1125
                         'st_mtime did not match')
 
1126
        self.assertEqual(expected.st_ctime, actual.st_ctime,
 
1127
                         'st_ctime did not match')
 
1128
        if sys.platform != 'win32':
 
1129
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
 
1130
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
 
1131
            # odd. Regardless we shouldn't actually try to assert anything
 
1132
            # about their values
 
1133
            self.assertEqual(expected.st_dev, actual.st_dev,
 
1134
                             'st_dev did not match')
 
1135
            self.assertEqual(expected.st_ino, actual.st_ino,
 
1136
                             'st_ino did not match')
 
1137
        self.assertEqual(expected.st_mode, actual.st_mode,
 
1138
                         'st_mode did not match')
 
1139
 
 
1140
    def assertLength(self, length, obj_with_len):
 
1141
        """Assert that obj_with_len is of length length."""
 
1142
        if len(obj_with_len) != length:
 
1143
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1144
                length, len(obj_with_len), obj_with_len))
 
1145
 
 
1146
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1147
        """Assert that func(*args, **kwargs) quietly logs a specific exception.
 
1148
        """
 
1149
        from bzrlib import trace
 
1150
        captured = []
 
1151
        orig_log_exception_quietly = trace.log_exception_quietly
 
1152
        try:
 
1153
            def capture():
 
1154
                orig_log_exception_quietly()
 
1155
                captured.append(sys.exc_info())
 
1156
            trace.log_exception_quietly = capture
 
1157
            func(*args, **kwargs)
 
1158
        finally:
 
1159
            trace.log_exception_quietly = orig_log_exception_quietly
 
1160
        self.assertLength(1, captured)
 
1161
        err = captured[0][1]
 
1162
        self.assertIsInstance(err, exception_class)
 
1163
        return err
 
1164
 
 
1165
    def assertPositive(self, val):
 
1166
        """Assert that val is greater than 0."""
 
1167
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
1168
 
 
1169
    def assertNegative(self, val):
 
1170
        """Assert that val is less than 0."""
 
1171
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
1172
 
 
1173
    def assertStartsWith(self, s, prefix):
 
1174
        if not s.startswith(prefix):
 
1175
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
1176
 
 
1177
    def assertEndsWith(self, s, suffix):
 
1178
        """Asserts that s ends with suffix."""
 
1179
        if not s.endswith(suffix):
 
1180
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
1181
 
 
1182
    def assertContainsRe(self, haystack, needle_re, flags=0):
 
1183
        """Assert that a contains something matching a regular expression."""
 
1184
        if not re.search(needle_re, haystack, flags):
 
1185
            if '\n' in haystack or len(haystack) > 60:
 
1186
                # a long string, format it in a more readable way
 
1187
                raise AssertionError(
 
1188
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
1189
                        % (needle_re, haystack))
 
1190
            else:
 
1191
                raise AssertionError('pattern "%s" not found in "%s"'
 
1192
                        % (needle_re, haystack))
 
1193
 
 
1194
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
 
1195
        """Assert that a does not match a regular expression"""
 
1196
        if re.search(needle_re, haystack, flags):
 
1197
            raise AssertionError('pattern "%s" found in "%s"'
 
1198
                    % (needle_re, haystack))
 
1199
 
 
1200
    def assertSubset(self, sublist, superlist):
 
1201
        """Assert that every entry in sublist is present in superlist."""
 
1202
        missing = set(sublist) - set(superlist)
 
1203
        if len(missing) > 0:
 
1204
            raise AssertionError("value(s) %r not present in container %r" %
 
1205
                                 (missing, superlist))
 
1206
 
 
1207
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
1208
        """Fail unless excClass is raised when the iterator from func is used.
 
1209
 
 
1210
        Many functions can return generators this makes sure
 
1211
        to wrap them in a list() call to make sure the whole generator
 
1212
        is run, and that the proper exception is raised.
 
1213
        """
 
1214
        try:
 
1215
            list(func(*args, **kwargs))
 
1216
        except excClass, e:
 
1217
            return e
 
1218
        else:
 
1219
            if getattr(excClass,'__name__', None) is not None:
 
1220
                excName = excClass.__name__
 
1221
            else:
 
1222
                excName = str(excClass)
 
1223
            raise self.failureException, "%s not raised" % excName
 
1224
 
 
1225
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
1226
        """Assert that a callable raises a particular exception.
 
1227
 
 
1228
        :param excClass: As for the except statement, this may be either an
 
1229
            exception class, or a tuple of classes.
 
1230
        :param callableObj: A callable, will be passed ``*args`` and
 
1231
            ``**kwargs``.
 
1232
 
 
1233
        Returns the exception so that you can examine it.
 
1234
        """
 
1235
        try:
 
1236
            callableObj(*args, **kwargs)
 
1237
        except excClass, e:
 
1238
            return e
 
1239
        else:
 
1240
            if getattr(excClass,'__name__', None) is not None:
 
1241
                excName = excClass.__name__
 
1242
            else:
 
1243
                # probably a tuple
 
1244
                excName = str(excClass)
 
1245
            raise self.failureException, "%s not raised" % excName
 
1246
 
 
1247
    def assertIs(self, left, right, message=None):
 
1248
        if not (left is right):
 
1249
            if message is not None:
 
1250
                raise AssertionError(message)
 
1251
            else:
 
1252
                raise AssertionError("%r is not %r." % (left, right))
 
1253
 
 
1254
    def assertIsNot(self, left, right, message=None):
 
1255
        if (left is right):
 
1256
            if message is not None:
 
1257
                raise AssertionError(message)
 
1258
            else:
 
1259
                raise AssertionError("%r is %r." % (left, right))
 
1260
 
 
1261
    def assertTransportMode(self, transport, path, mode):
 
1262
        """Fail if a path does not have mode "mode".
 
1263
 
 
1264
        If modes are not supported on this transport, the assertion is ignored.
 
1265
        """
 
1266
        if not transport._can_roundtrip_unix_modebits():
 
1267
            return
 
1268
        path_stat = transport.stat(path)
 
1269
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
1270
        self.assertEqual(mode, actual_mode,
 
1271
                         'mode of %r incorrect (%s != %s)'
 
1272
                         % (path, oct(mode), oct(actual_mode)))
 
1273
 
 
1274
    def assertIsSameRealPath(self, path1, path2):
 
1275
        """Fail if path1 and path2 points to different files"""
 
1276
        self.assertEqual(osutils.realpath(path1),
 
1277
                         osutils.realpath(path2),
 
1278
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1279
 
 
1280
    def assertIsInstance(self, obj, kls, msg=None):
 
1281
        """Fail if obj is not an instance of kls
 
1282
        
 
1283
        :param msg: Supplementary message to show if the assertion fails.
 
1284
        """
 
1285
        if not isinstance(obj, kls):
 
1286
            m = "%r is an instance of %s rather than %s" % (
 
1287
                obj, obj.__class__, kls)
 
1288
            if msg:
 
1289
                m += ": " + msg
 
1290
            self.fail(m)
 
1291
 
 
1292
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1293
        """Invoke a test, expecting it to fail for the given reason.
 
1294
 
 
1295
        This is for assertions that ought to succeed, but currently fail.
 
1296
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1297
        about the failure you're expecting.  If a new bug is introduced,
 
1298
        AssertionError should be raised, not KnownFailure.
 
1299
 
 
1300
        Frequently, expectFailure should be followed by an opposite assertion.
 
1301
        See example below.
 
1302
 
 
1303
        Intended to be used with a callable that raises AssertionError as the
 
1304
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1305
 
 
1306
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1307
        test succeeds.
 
1308
 
 
1309
        example usage::
 
1310
 
 
1311
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1312
                             dynamic_val)
 
1313
          self.assertEqual(42, dynamic_val)
 
1314
 
 
1315
          This means that a dynamic_val of 54 will cause the test to raise
 
1316
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1317
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1318
          than 54 or 42 will cause an AssertionError.
 
1319
        """
 
1320
        try:
 
1321
            assertion(*args, **kwargs)
 
1322
        except AssertionError:
 
1323
            raise KnownFailure(reason)
 
1324
        else:
 
1325
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1326
 
 
1327
    def assertFileEqual(self, content, path):
 
1328
        """Fail if path does not contain 'content'."""
 
1329
        self.failUnlessExists(path)
 
1330
        f = file(path, 'rb')
 
1331
        try:
 
1332
            s = f.read()
 
1333
        finally:
 
1334
            f.close()
 
1335
        self.assertEqualDiff(content, s)
 
1336
 
 
1337
    def failUnlessExists(self, path):
 
1338
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1339
        if not isinstance(path, basestring):
 
1340
            for p in path:
 
1341
                self.failUnlessExists(p)
 
1342
        else:
 
1343
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1344
 
 
1345
    def failIfExists(self, path):
 
1346
        """Fail if path or paths, which may be abs or relative, exist."""
 
1347
        if not isinstance(path, basestring):
 
1348
            for p in path:
 
1349
                self.failIfExists(p)
 
1350
        else:
 
1351
            self.failIf(osutils.lexists(path),path+" exists")
 
1352
 
 
1353
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1354
        """A helper for callDeprecated and applyDeprecated.
 
1355
 
 
1356
        :param a_callable: A callable to call.
 
1357
        :param args: The positional arguments for the callable
 
1358
        :param kwargs: The keyword arguments for the callable
 
1359
        :return: A tuple (warnings, result). result is the result of calling
 
1360
            a_callable(``*args``, ``**kwargs``).
 
1361
        """
 
1362
        local_warnings = []
 
1363
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1364
            # we've hooked into a deprecation specific callpath,
 
1365
            # only deprecations should getting sent via it.
 
1366
            self.assertEqual(cls, DeprecationWarning)
 
1367
            local_warnings.append(msg)
 
1368
        original_warning_method = symbol_versioning.warn
 
1369
        symbol_versioning.set_warning_method(capture_warnings)
 
1370
        try:
 
1371
            result = a_callable(*args, **kwargs)
 
1372
        finally:
 
1373
            symbol_versioning.set_warning_method(original_warning_method)
 
1374
        return (local_warnings, result)
 
1375
 
 
1376
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1377
        """Call a deprecated callable without warning the user.
 
1378
 
 
1379
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1380
        not other callers that go direct to the warning module.
 
1381
 
 
1382
        To test that a deprecated method raises an error, do something like
 
1383
        this::
 
1384
 
 
1385
            self.assertRaises(errors.ReservedId,
 
1386
                self.applyDeprecated,
 
1387
                deprecated_in((1, 5, 0)),
 
1388
                br.append_revision,
 
1389
                'current:')
 
1390
 
 
1391
        :param deprecation_format: The deprecation format that the callable
 
1392
            should have been deprecated with. This is the same type as the
 
1393
            parameter to deprecated_method/deprecated_function. If the
 
1394
            callable is not deprecated with this format, an assertion error
 
1395
            will be raised.
 
1396
        :param a_callable: A callable to call. This may be a bound method or
 
1397
            a regular function. It will be called with ``*args`` and
 
1398
            ``**kwargs``.
 
1399
        :param args: The positional arguments for the callable
 
1400
        :param kwargs: The keyword arguments for the callable
 
1401
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1402
        """
 
1403
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1404
            *args, **kwargs)
 
1405
        expected_first_warning = symbol_versioning.deprecation_string(
 
1406
            a_callable, deprecation_format)
 
1407
        if len(call_warnings) == 0:
 
1408
            self.fail("No deprecation warning generated by call to %s" %
 
1409
                a_callable)
 
1410
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1411
        return result
 
1412
 
 
1413
    def callCatchWarnings(self, fn, *args, **kw):
 
1414
        """Call a callable that raises python warnings.
 
1415
 
 
1416
        The caller's responsible for examining the returned warnings.
 
1417
 
 
1418
        If the callable raises an exception, the exception is not
 
1419
        caught and propagates up to the caller.  In that case, the list
 
1420
        of warnings is not available.
 
1421
 
 
1422
        :returns: ([warning_object, ...], fn_result)
 
1423
        """
 
1424
        # XXX: This is not perfect, because it completely overrides the
 
1425
        # warnings filters, and some code may depend on suppressing particular
 
1426
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1427
        # though.  -- Andrew, 20071062
 
1428
        wlist = []
 
1429
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1430
            # despite the name, 'message' is normally(?) a Warning subclass
 
1431
            # instance
 
1432
            wlist.append(message)
 
1433
        saved_showwarning = warnings.showwarning
 
1434
        saved_filters = warnings.filters
 
1435
        try:
 
1436
            warnings.showwarning = _catcher
 
1437
            warnings.filters = []
 
1438
            result = fn(*args, **kw)
 
1439
        finally:
 
1440
            warnings.showwarning = saved_showwarning
 
1441
            warnings.filters = saved_filters
 
1442
        return wlist, result
 
1443
 
 
1444
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1445
        """Assert that a callable is deprecated in a particular way.
 
1446
 
 
1447
        This is a very precise test for unusual requirements. The
 
1448
        applyDeprecated helper function is probably more suited for most tests
 
1449
        as it allows you to simply specify the deprecation format being used
 
1450
        and will ensure that that is issued for the function being called.
 
1451
 
 
1452
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1453
        not other callers that go direct to the warning module.  To catch
 
1454
        general warnings, use callCatchWarnings.
 
1455
 
 
1456
        :param expected: a list of the deprecation warnings expected, in order
 
1457
        :param callable: The callable to call
 
1458
        :param args: The positional arguments for the callable
 
1459
        :param kwargs: The keyword arguments for the callable
 
1460
        """
 
1461
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1462
            *args, **kwargs)
 
1463
        self.assertEqual(expected, call_warnings)
 
1464
        return result
 
1465
 
 
1466
    def _startLogFile(self):
 
1467
        """Send bzr and test log messages to a temporary file.
 
1468
 
 
1469
        The file is removed as the test is torn down.
 
1470
        """
65
1471
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
66
 
 
67
1472
        self._log_file = os.fdopen(fileno, 'w+')
68
 
 
69
 
        hdlr = logging.StreamHandler(self._log_file)
70
 
        hdlr.setLevel(logging.DEBUG)
71
 
        hdlr.setFormatter(logging.Formatter('%(levelname)8s  %(message)s'))
72
 
        logging.getLogger('').addHandler(hdlr)
73
 
        logging.getLogger('').setLevel(logging.DEBUG)
74
 
        self._log_hdlr = hdlr
75
 
        debug('opened log file %s', name)
76
 
        
 
1473
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
77
1474
        self._log_file_name = name
78
 
 
79
 
        
80
 
    def tearDown(self):
81
 
        logging.getLogger('').removeHandler(self._log_hdlr)
82
 
        bzrlib.trace.enable_default_logging()
83
 
        logging.debug('%s teardown', self.id())
 
1475
        self.addCleanup(self._finishLogFile)
 
1476
 
 
1477
    def _finishLogFile(self):
 
1478
        """Finished with the log file.
 
1479
 
 
1480
        Close the file and delete it, unless setKeepLogfile was called.
 
1481
        """
 
1482
        if self._log_file is None:
 
1483
            return
 
1484
        bzrlib.trace.pop_log_file(self._log_memento)
84
1485
        self._log_file.close()
 
1486
        self._log_file = None
 
1487
        if not self._keep_log_file:
 
1488
            os.remove(self._log_file_name)
 
1489
            self._log_file_name = None
 
1490
 
 
1491
    def setKeepLogfile(self):
 
1492
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1493
        self._keep_log_file = True
 
1494
 
 
1495
    def thisFailsStrictLockCheck(self):
 
1496
        """It is known that this test would fail with -Dstrict_locks.
 
1497
 
 
1498
        By default, all tests are run with strict lock checking unless
 
1499
        -Edisable_lock_checks is supplied. However there are some tests which
 
1500
        we know fail strict locks at this point that have not been fixed.
 
1501
        They should call this function to disable the strict checking.
 
1502
 
 
1503
        This should be used sparingly, it is much better to fix the locking
 
1504
        issues rather than papering over the problem by calling this function.
 
1505
        """
 
1506
        debug.debug_flags.discard('strict_locks')
 
1507
 
 
1508
    def addCleanup(self, callable, *args, **kwargs):
 
1509
        """Arrange to run a callable when this case is torn down.
 
1510
 
 
1511
        Callables are run in the reverse of the order they are registered,
 
1512
        ie last-in first-out.
 
1513
        """
 
1514
        self._cleanups.append((callable, args, kwargs))
 
1515
 
 
1516
    def _cleanEnvironment(self):
 
1517
        new_env = {
 
1518
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1519
            'HOME': os.getcwd(),
 
1520
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1521
            # tests do check our impls match APPDATA
 
1522
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1523
            'VISUAL': None,
 
1524
            'EDITOR': None,
 
1525
            'BZR_EMAIL': None,
 
1526
            'BZREMAIL': None, # may still be present in the environment
 
1527
            'EMAIL': None,
 
1528
            'BZR_PROGRESS_BAR': None,
 
1529
            'BZR_LOG': None,
 
1530
            'BZR_PLUGIN_PATH': None,
 
1531
            'BZR_CONCURRENCY': None,
 
1532
            # Make sure that any text ui tests are consistent regardless of
 
1533
            # the environment the test case is run in; you may want tests that
 
1534
            # test other combinations.  'dumb' is a reasonable guess for tests
 
1535
            # going to a pipe or a StringIO.
 
1536
            'TERM': 'dumb',
 
1537
            'LINES': '25',
 
1538
            'COLUMNS': '80',
 
1539
            'BZR_COLUMNS': '80',
 
1540
            # SSH Agent
 
1541
            'SSH_AUTH_SOCK': None,
 
1542
            # Proxies
 
1543
            'http_proxy': None,
 
1544
            'HTTP_PROXY': None,
 
1545
            'https_proxy': None,
 
1546
            'HTTPS_PROXY': None,
 
1547
            'no_proxy': None,
 
1548
            'NO_PROXY': None,
 
1549
            'all_proxy': None,
 
1550
            'ALL_PROXY': None,
 
1551
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
 
1552
            # least. If you do (care), please update this comment
 
1553
            # -- vila 20080401
 
1554
            'ftp_proxy': None,
 
1555
            'FTP_PROXY': None,
 
1556
            'BZR_REMOTE_PATH': None,
 
1557
        }
 
1558
        self.__old_env = {}
 
1559
        self.addCleanup(self._restoreEnvironment)
 
1560
        for name, value in new_env.iteritems():
 
1561
            self._captureVar(name, value)
 
1562
 
 
1563
    def _captureVar(self, name, newvalue):
 
1564
        """Set an environment variable, and reset it when finished."""
 
1565
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1566
 
 
1567
    def _restore_debug_flags(self):
 
1568
        debug.debug_flags.clear()
 
1569
        debug.debug_flags.update(self._preserved_debug_flags)
 
1570
 
 
1571
    def _restoreEnvironment(self):
 
1572
        for name, value in self.__old_env.iteritems():
 
1573
            osutils.set_or_unset_env(name, value)
 
1574
 
 
1575
    def _restoreHooks(self):
 
1576
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1577
            setattr(klass, name, hooks)
 
1578
 
 
1579
    def knownFailure(self, reason):
 
1580
        """This test has failed for some known reason."""
 
1581
        raise KnownFailure(reason)
 
1582
 
 
1583
    def _do_skip(self, result, reason):
 
1584
        addSkip = getattr(result, 'addSkip', None)
 
1585
        if not callable(addSkip):
 
1586
            result.addSuccess(result)
 
1587
        else:
 
1588
            addSkip(self, reason)
 
1589
 
 
1590
    def _do_known_failure(self, result):
 
1591
        err = sys.exc_info()
 
1592
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1593
        if addExpectedFailure is not None:
 
1594
            addExpectedFailure(self, err)
 
1595
        else:
 
1596
            result.addSuccess(self)
 
1597
 
 
1598
    def _do_not_applicable(self, result, e):
 
1599
        if not e.args:
 
1600
            reason = 'No reason given'
 
1601
        else:
 
1602
            reason = e.args[0]
 
1603
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1604
        if addNotApplicable is not None:
 
1605
            result.addNotApplicable(self, reason)
 
1606
        else:
 
1607
            self._do_skip(result, reason)
 
1608
 
 
1609
    def _do_unsupported_or_skip(self, result, reason):
 
1610
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1611
        if addNotSupported is not None:
 
1612
            result.addNotSupported(self, reason)
 
1613
        else:
 
1614
            self._do_skip(result, reason)
 
1615
 
 
1616
    def run(self, result=None):
 
1617
        if result is None: result = self.defaultTestResult()
 
1618
        result.startTest(self)
 
1619
        try:
 
1620
            self._run(result)
 
1621
            return result
 
1622
        finally:
 
1623
            result.stopTest(self)
 
1624
 
 
1625
    def _run(self, result):
 
1626
        for feature in getattr(self, '_test_needs_features', []):
 
1627
            if not feature.available():
 
1628
                return self._do_unsupported_or_skip(result, feature)
 
1629
        try:
 
1630
            absent_attr = object()
 
1631
            # Python 2.5
 
1632
            method_name = getattr(self, '_testMethodName', absent_attr)
 
1633
            if method_name is absent_attr:
 
1634
                # Python 2.4
 
1635
                method_name = getattr(self, '_TestCase__testMethodName')
 
1636
            testMethod = getattr(self, method_name)
 
1637
            try:
 
1638
                try:
 
1639
                    self.setUp()
 
1640
                    if not self._bzr_test_setUp_run:
 
1641
                        self.fail(
 
1642
                            "test setUp did not invoke "
 
1643
                            "bzrlib.tests.TestCase's setUp")
 
1644
                except KeyboardInterrupt:
 
1645
                    self._runCleanups()
 
1646
                    raise
 
1647
                except KnownFailure:
 
1648
                    self._do_known_failure(result)
 
1649
                    self.tearDown()
 
1650
                    return
 
1651
                except TestNotApplicable, e:
 
1652
                    self._do_not_applicable(result, e)
 
1653
                    self.tearDown()
 
1654
                    return
 
1655
                except TestSkipped, e:
 
1656
                    self._do_skip(result, e.args[0])
 
1657
                    self.tearDown()
 
1658
                    return result
 
1659
                except UnavailableFeature, e:
 
1660
                    self._do_unsupported_or_skip(result, e.args[0])
 
1661
                    self.tearDown()
 
1662
                    return
 
1663
                except:
 
1664
                    result.addError(self, sys.exc_info())
 
1665
                    self._runCleanups()
 
1666
                    return result
 
1667
 
 
1668
                ok = False
 
1669
                try:
 
1670
                    testMethod()
 
1671
                    ok = True
 
1672
                except KnownFailure:
 
1673
                    self._do_known_failure(result)
 
1674
                except self.failureException:
 
1675
                    result.addFailure(self, sys.exc_info())
 
1676
                except TestNotApplicable, e:
 
1677
                    self._do_not_applicable(result, e)
 
1678
                except TestSkipped, e:
 
1679
                    if not e.args:
 
1680
                        reason = "No reason given."
 
1681
                    else:
 
1682
                        reason = e.args[0]
 
1683
                    self._do_skip(result, reason)
 
1684
                except UnavailableFeature, e:
 
1685
                    self._do_unsupported_or_skip(result, e.args[0])
 
1686
                except KeyboardInterrupt:
 
1687
                    self._runCleanups()
 
1688
                    raise
 
1689
                except:
 
1690
                    result.addError(self, sys.exc_info())
 
1691
 
 
1692
                try:
 
1693
                    self.tearDown()
 
1694
                    if not self._bzr_test_tearDown_run:
 
1695
                        self.fail(
 
1696
                            "test tearDown did not invoke "
 
1697
                            "bzrlib.tests.TestCase's tearDown")
 
1698
                except KeyboardInterrupt:
 
1699
                    self._runCleanups()
 
1700
                    raise
 
1701
                except:
 
1702
                    result.addError(self, sys.exc_info())
 
1703
                    self._runCleanups()
 
1704
                    ok = False
 
1705
                if ok: result.addSuccess(self)
 
1706
                return result
 
1707
            except KeyboardInterrupt:
 
1708
                self._runCleanups()
 
1709
                raise
 
1710
        finally:
 
1711
            saved_attrs = {}
 
1712
            for attr_name in self.attrs_to_keep:
 
1713
                if attr_name in self.__dict__:
 
1714
                    saved_attrs[attr_name] = self.__dict__[attr_name]
 
1715
            self.__dict__ = saved_attrs
 
1716
 
 
1717
    def tearDown(self):
 
1718
        self._runCleanups()
 
1719
        self._log_contents = ''
 
1720
        self._bzr_test_tearDown_run = True
85
1721
        unittest.TestCase.tearDown(self)
86
1722
 
 
1723
    def time(self, callable, *args, **kwargs):
 
1724
        """Run callable and accrue the time it takes to the benchmark time.
 
1725
 
 
1726
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1727
        this will cause lsprofile statistics to be gathered and stored in
 
1728
        self._benchcalls.
 
1729
        """
 
1730
        if self._benchtime is None:
 
1731
            self._benchtime = 0
 
1732
        start = time.time()
 
1733
        try:
 
1734
            if not self._gather_lsprof_in_benchmarks:
 
1735
                return callable(*args, **kwargs)
 
1736
            else:
 
1737
                # record this benchmark
 
1738
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1739
                stats.sort()
 
1740
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1741
                return ret
 
1742
        finally:
 
1743
            self._benchtime += time.time() - start
 
1744
 
 
1745
    def _runCleanups(self):
 
1746
        """Run registered cleanup functions.
 
1747
 
 
1748
        This should only be called from TestCase.tearDown.
 
1749
        """
 
1750
        # TODO: Perhaps this should keep running cleanups even if
 
1751
        # one of them fails?
 
1752
 
 
1753
        # Actually pop the cleanups from the list so tearDown running
 
1754
        # twice is safe (this happens for skipped tests).
 
1755
        while self._cleanups:
 
1756
            cleanup, args, kwargs = self._cleanups.pop()
 
1757
            cleanup(*args, **kwargs)
87
1758
 
88
1759
    def log(self, *args):
89
 
        logging.debug(*args)
90
 
 
91
 
    def _get_log(self):
92
 
        """Return as a string the log for this test"""
93
 
        return open(self._log_file_name).read()
94
 
 
95
 
    def run_bzr(self, *args, **kwargs):
 
1760
        mutter(*args)
 
1761
 
 
1762
    def _get_log(self, keep_log_file=False):
 
1763
        """Get the log from bzrlib.trace calls from this test.
 
1764
 
 
1765
        :param keep_log_file: When True, if the log is still a file on disk
 
1766
            leave it as a file on disk. When False, if the log is still a file
 
1767
            on disk, the log file is deleted and the log preserved as
 
1768
            self._log_contents.
 
1769
        :return: A string containing the log.
 
1770
        """
 
1771
        # flush the log file, to get all content
 
1772
        import bzrlib.trace
 
1773
        if bzrlib.trace._trace_file:
 
1774
            bzrlib.trace._trace_file.flush()
 
1775
        if self._log_contents:
 
1776
            # XXX: this can hardly contain the content flushed above --vila
 
1777
            # 20080128
 
1778
            return self._log_contents
 
1779
        if self._log_file_name is not None:
 
1780
            logfile = open(self._log_file_name)
 
1781
            try:
 
1782
                log_contents = logfile.read()
 
1783
            finally:
 
1784
                logfile.close()
 
1785
            if not keep_log_file:
 
1786
                self._log_contents = log_contents
 
1787
                try:
 
1788
                    os.remove(self._log_file_name)
 
1789
                except OSError, e:
 
1790
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1791
                        sys.stderr.write(('Unable to delete log file '
 
1792
                                             ' %r\n' % self._log_file_name))
 
1793
                    else:
 
1794
                        raise
 
1795
            return log_contents
 
1796
        else:
 
1797
            return "DELETED log file to reduce memory footprint"
 
1798
 
 
1799
    def requireFeature(self, feature):
 
1800
        """This test requires a specific feature is available.
 
1801
 
 
1802
        :raises UnavailableFeature: When feature is not available.
 
1803
        """
 
1804
        if not feature.available():
 
1805
            raise UnavailableFeature(feature)
 
1806
 
 
1807
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1808
            working_dir):
 
1809
        """Run bazaar command line, splitting up a string command line."""
 
1810
        if isinstance(args, basestring):
 
1811
            # shlex don't understand unicode strings,
 
1812
            # so args should be plain string (bialix 20070906)
 
1813
            args = list(shlex.split(str(args)))
 
1814
        return self._run_bzr_core(args, retcode=retcode,
 
1815
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1816
                )
 
1817
 
 
1818
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1819
            working_dir):
 
1820
        # Clear chk_map page cache, because the contents are likely to mask
 
1821
        # locking errors.
 
1822
        chk_map.clear_cache()
 
1823
        if encoding is None:
 
1824
            encoding = osutils.get_user_encoding()
 
1825
        stdout = StringIOWrapper()
 
1826
        stderr = StringIOWrapper()
 
1827
        stdout.encoding = encoding
 
1828
        stderr.encoding = encoding
 
1829
 
 
1830
        self.log('run bzr: %r', args)
 
1831
        # FIXME: don't call into logging here
 
1832
        handler = logging.StreamHandler(stderr)
 
1833
        handler.setLevel(logging.INFO)
 
1834
        logger = logging.getLogger('')
 
1835
        logger.addHandler(handler)
 
1836
        old_ui_factory = ui.ui_factory
 
1837
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1838
 
 
1839
        cwd = None
 
1840
        if working_dir is not None:
 
1841
            cwd = osutils.getcwd()
 
1842
            os.chdir(working_dir)
 
1843
 
 
1844
        try:
 
1845
            try:
 
1846
                result = self.apply_redirected(ui.ui_factory.stdin,
 
1847
                    stdout, stderr,
 
1848
                    bzrlib.commands.run_bzr_catch_user_errors,
 
1849
                    args)
 
1850
            except KeyboardInterrupt:
 
1851
                # Reraise KeyboardInterrupt with contents of redirected stdout
 
1852
                # and stderr as arguments, for tests which are interested in
 
1853
                # stdout and stderr and are expecting the exception.
 
1854
                out = stdout.getvalue()
 
1855
                err = stderr.getvalue()
 
1856
                if out:
 
1857
                    self.log('output:\n%r', out)
 
1858
                if err:
 
1859
                    self.log('errors:\n%r', err)
 
1860
                raise KeyboardInterrupt(out, err)
 
1861
        finally:
 
1862
            logger.removeHandler(handler)
 
1863
            ui.ui_factory = old_ui_factory
 
1864
            if cwd is not None:
 
1865
                os.chdir(cwd)
 
1866
 
 
1867
        out = stdout.getvalue()
 
1868
        err = stderr.getvalue()
 
1869
        if out:
 
1870
            self.log('output:\n%r', out)
 
1871
        if err:
 
1872
            self.log('errors:\n%r', err)
 
1873
        if retcode is not None:
 
1874
            self.assertEquals(retcode, result,
 
1875
                              message='Unexpected return code')
 
1876
        return result, out, err
 
1877
 
 
1878
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1879
                working_dir=None, error_regexes=[], output_encoding=None):
96
1880
        """Invoke bzr, as if it were run from the command line.
97
1881
 
 
1882
        The argument list should not include the bzr program name - the
 
1883
        first argument is normally the bzr command.  Arguments may be
 
1884
        passed in three ways:
 
1885
 
 
1886
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1887
        when the command contains whitespace or metacharacters, or
 
1888
        is built up at run time.
 
1889
 
 
1890
        2- A single string, eg "add a".  This is the most convenient
 
1891
        for hardcoded commands.
 
1892
 
 
1893
        This runs bzr through the interface that catches and reports
 
1894
        errors, and with logging set to something approximating the
 
1895
        default, so that error reporting can be checked.
 
1896
 
98
1897
        This should be the main method for tests that want to exercise the
99
1898
        overall behavior of the bzr application (rather than a unit test
100
1899
        or a functional test of the library.)
101
1900
 
102
 
        Much of the old code runs bzr by forking a new copy of Python, but
103
 
        that is slower, harder to debug, and generally not necessary.
104
 
        """
105
 
        retcode = kwargs.get('retcode', 0)
106
 
        result = self.apply_redirected(None, None, None,
107
 
                                       bzrlib.commands.run_bzr, args)
108
 
        self.assertEquals(result, retcode)
109
 
        
110
 
        
 
1901
        This sends the stdout/stderr results into the test's log,
 
1902
        where it may be useful for debugging.  See also run_captured.
 
1903
 
 
1904
        :keyword stdin: A string to be used as stdin for the command.
 
1905
        :keyword retcode: The status code the command should return;
 
1906
            default 0.
 
1907
        :keyword working_dir: The directory to run the command in
 
1908
        :keyword error_regexes: A list of expected error messages.  If
 
1909
            specified they must be seen in the error output of the command.
 
1910
        """
 
1911
        retcode, out, err = self._run_bzr_autosplit(
 
1912
            args=args,
 
1913
            retcode=retcode,
 
1914
            encoding=encoding,
 
1915
            stdin=stdin,
 
1916
            working_dir=working_dir,
 
1917
            )
 
1918
        self.assertIsInstance(error_regexes, (list, tuple))
 
1919
        for regex in error_regexes:
 
1920
            self.assertContainsRe(err, regex)
 
1921
        return out, err
 
1922
 
 
1923
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1924
        """Run bzr, and check that stderr contains the supplied regexes
 
1925
 
 
1926
        :param error_regexes: Sequence of regular expressions which
 
1927
            must each be found in the error output. The relative ordering
 
1928
            is not enforced.
 
1929
        :param args: command-line arguments for bzr
 
1930
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1931
            This function changes the default value of retcode to be 3,
 
1932
            since in most cases this is run when you expect bzr to fail.
 
1933
 
 
1934
        :return: (out, err) The actual output of running the command (in case
 
1935
            you want to do more inspection)
 
1936
 
 
1937
        Examples of use::
 
1938
 
 
1939
            # Make sure that commit is failing because there is nothing to do
 
1940
            self.run_bzr_error(['no changes to commit'],
 
1941
                               ['commit', '-m', 'my commit comment'])
 
1942
            # Make sure --strict is handling an unknown file, rather than
 
1943
            # giving us the 'nothing to do' error
 
1944
            self.build_tree(['unknown'])
 
1945
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1946
                               ['commit', --strict', '-m', 'my commit comment'])
 
1947
        """
 
1948
        kwargs.setdefault('retcode', 3)
 
1949
        kwargs['error_regexes'] = error_regexes
 
1950
        out, err = self.run_bzr(*args, **kwargs)
 
1951
        return out, err
 
1952
 
 
1953
    def run_bzr_subprocess(self, *args, **kwargs):
 
1954
        """Run bzr in a subprocess for testing.
 
1955
 
 
1956
        This starts a new Python interpreter and runs bzr in there.
 
1957
        This should only be used for tests that have a justifiable need for
 
1958
        this isolation: e.g. they are testing startup time, or signal
 
1959
        handling, or early startup code, etc.  Subprocess code can't be
 
1960
        profiled or debugged so easily.
 
1961
 
 
1962
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1963
            None is supplied, the status code is not checked.
 
1964
        :keyword env_changes: A dictionary which lists changes to environment
 
1965
            variables. A value of None will unset the env variable.
 
1966
            The values must be strings. The change will only occur in the
 
1967
            child, so you don't need to fix the environment after running.
 
1968
        :keyword universal_newlines: Convert CRLF => LF
 
1969
        :keyword allow_plugins: By default the subprocess is run with
 
1970
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1971
            for system-wide plugins to create unexpected output on stderr,
 
1972
            which can cause unnecessary test failures.
 
1973
        """
 
1974
        env_changes = kwargs.get('env_changes', {})
 
1975
        working_dir = kwargs.get('working_dir', None)
 
1976
        allow_plugins = kwargs.get('allow_plugins', False)
 
1977
        if len(args) == 1:
 
1978
            if isinstance(args[0], list):
 
1979
                args = args[0]
 
1980
            elif isinstance(args[0], basestring):
 
1981
                args = list(shlex.split(args[0]))
 
1982
        else:
 
1983
            raise ValueError("passing varargs to run_bzr_subprocess")
 
1984
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1985
                                            working_dir=working_dir,
 
1986
                                            allow_plugins=allow_plugins)
 
1987
        # We distinguish between retcode=None and retcode not passed.
 
1988
        supplied_retcode = kwargs.get('retcode', 0)
 
1989
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1990
            universal_newlines=kwargs.get('universal_newlines', False),
 
1991
            process_args=args)
 
1992
 
 
1993
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1994
                             skip_if_plan_to_signal=False,
 
1995
                             working_dir=None,
 
1996
                             allow_plugins=False):
 
1997
        """Start bzr in a subprocess for testing.
 
1998
 
 
1999
        This starts a new Python interpreter and runs bzr in there.
 
2000
        This should only be used for tests that have a justifiable need for
 
2001
        this isolation: e.g. they are testing startup time, or signal
 
2002
        handling, or early startup code, etc.  Subprocess code can't be
 
2003
        profiled or debugged so easily.
 
2004
 
 
2005
        :param process_args: a list of arguments to pass to the bzr executable,
 
2006
            for example ``['--version']``.
 
2007
        :param env_changes: A dictionary which lists changes to environment
 
2008
            variables. A value of None will unset the env variable.
 
2009
            The values must be strings. The change will only occur in the
 
2010
            child, so you don't need to fix the environment after running.
 
2011
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
2012
            is not available.
 
2013
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
2014
 
 
2015
        :returns: Popen object for the started process.
 
2016
        """
 
2017
        if skip_if_plan_to_signal:
 
2018
            if not getattr(os, 'kill', None):
 
2019
                raise TestSkipped("os.kill not available.")
 
2020
 
 
2021
        if env_changes is None:
 
2022
            env_changes = {}
 
2023
        old_env = {}
 
2024
 
 
2025
        def cleanup_environment():
 
2026
            for env_var, value in env_changes.iteritems():
 
2027
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
2028
 
 
2029
        def restore_environment():
 
2030
            for env_var, value in old_env.iteritems():
 
2031
                osutils.set_or_unset_env(env_var, value)
 
2032
 
 
2033
        bzr_path = self.get_bzr_path()
 
2034
 
 
2035
        cwd = None
 
2036
        if working_dir is not None:
 
2037
            cwd = osutils.getcwd()
 
2038
            os.chdir(working_dir)
 
2039
 
 
2040
        try:
 
2041
            # win32 subprocess doesn't support preexec_fn
 
2042
            # so we will avoid using it on all platforms, just to
 
2043
            # make sure the code path is used, and we don't break on win32
 
2044
            cleanup_environment()
 
2045
            command = [sys.executable]
 
2046
            # frozen executables don't need the path to bzr
 
2047
            if getattr(sys, "frozen", None) is None:
 
2048
                command.append(bzr_path)
 
2049
            if not allow_plugins:
 
2050
                command.append('--no-plugins')
 
2051
            command.extend(process_args)
 
2052
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
2053
        finally:
 
2054
            restore_environment()
 
2055
            if cwd is not None:
 
2056
                os.chdir(cwd)
 
2057
 
 
2058
        return process
 
2059
 
 
2060
    def _popen(self, *args, **kwargs):
 
2061
        """Place a call to Popen.
 
2062
 
 
2063
        Allows tests to override this method to intercept the calls made to
 
2064
        Popen for introspection.
 
2065
        """
 
2066
        return Popen(*args, **kwargs)
 
2067
 
 
2068
    def get_source_path(self):
 
2069
        """Return the path of the directory containing bzrlib."""
 
2070
        return os.path.dirname(os.path.dirname(bzrlib.__file__))
 
2071
 
 
2072
    def get_bzr_path(self):
 
2073
        """Return the path of the 'bzr' executable for this test suite."""
 
2074
        bzr_path = self.get_source_path()+'/bzr'
 
2075
        if not os.path.isfile(bzr_path):
 
2076
            # We are probably installed. Assume sys.argv is the right file
 
2077
            bzr_path = sys.argv[0]
 
2078
        return bzr_path
 
2079
 
 
2080
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
2081
                              universal_newlines=False, process_args=None):
 
2082
        """Finish the execution of process.
 
2083
 
 
2084
        :param process: the Popen object returned from start_bzr_subprocess.
 
2085
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
2086
            None is supplied, the status code is not checked.
 
2087
        :param send_signal: an optional signal to send to the process.
 
2088
        :param universal_newlines: Convert CRLF => LF
 
2089
        :returns: (stdout, stderr)
 
2090
        """
 
2091
        if send_signal is not None:
 
2092
            os.kill(process.pid, send_signal)
 
2093
        out, err = process.communicate()
 
2094
 
 
2095
        if universal_newlines:
 
2096
            out = out.replace('\r\n', '\n')
 
2097
            err = err.replace('\r\n', '\n')
 
2098
 
 
2099
        if retcode is not None and retcode != process.returncode:
 
2100
            if process_args is None:
 
2101
                process_args = "(unknown args)"
 
2102
            mutter('Output of bzr %s:\n%s', process_args, out)
 
2103
            mutter('Error for bzr %s:\n%s', process_args, err)
 
2104
            self.fail('Command bzr %s failed with retcode %s != %s'
 
2105
                      % (process_args, retcode, process.returncode))
 
2106
        return [out, err]
 
2107
 
111
2108
    def check_inventory_shape(self, inv, shape):
112
 
        """
113
 
        Compare an inventory to a list of expected names.
 
2109
        """Compare an inventory to a list of expected names.
114
2110
 
115
2111
        Fail if they are not precisely equal.
116
2112
        """
118
2114
        shape = list(shape)             # copy
119
2115
        for path, ie in inv.entries():
120
2116
            name = path.replace('\\', '/')
121
 
            if ie.kind == 'dir':
 
2117
            if ie.kind == 'directory':
122
2118
                name = name + '/'
123
2119
            if name in shape:
124
2120
                shape.remove(name)
134
2130
        """Call callable with redirected std io pipes.
135
2131
 
136
2132
        Returns the return code."""
137
 
        from StringIO import StringIO
138
2133
        if not callable(a_callable):
139
2134
            raise ValueError("a_callable must be callable.")
140
2135
        if stdin is None:
141
2136
            stdin = StringIO("")
142
2137
        if stdout is None:
143
 
            stdout = self._log_file
 
2138
            if getattr(self, "_log_file", None) is not None:
 
2139
                stdout = self._log_file
 
2140
            else:
 
2141
                stdout = StringIO()
144
2142
        if stderr is None:
145
 
            stderr = self._log_file
 
2143
            if getattr(self, "_log_file", None is not None):
 
2144
                stderr = self._log_file
 
2145
            else:
 
2146
                stderr = StringIO()
146
2147
        real_stdin = sys.stdin
147
2148
        real_stdout = sys.stdout
148
2149
        real_stderr = sys.stderr
156
2157
            sys.stderr = real_stderr
157
2158
            sys.stdin = real_stdin
158
2159
 
159
 
 
160
 
BzrTestBase = TestCase
161
 
 
162
 
     
163
 
class TestCaseInTempDir(TestCase):
 
2160
    def reduceLockdirTimeout(self):
 
2161
        """Reduce the default lock timeout for the duration of the test, so that
 
2162
        if LockContention occurs during a test, it does so quickly.
 
2163
 
 
2164
        Tests that expect to provoke LockContention errors should call this.
 
2165
        """
 
2166
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
2167
        def resetTimeout():
 
2168
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
2169
        self.addCleanup(resetTimeout)
 
2170
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
2171
 
 
2172
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
2173
        """Return a StringIOWrapper instance, that will encode Unicode
 
2174
        input to UTF-8.
 
2175
        """
 
2176
        if encoding_type is None:
 
2177
            encoding_type = 'strict'
 
2178
        sio = StringIO()
 
2179
        output_encoding = 'utf-8'
 
2180
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
2181
        sio.encoding = output_encoding
 
2182
        return sio
 
2183
 
 
2184
    def disable_verb(self, verb):
 
2185
        """Disable a smart server verb for one test."""
 
2186
        from bzrlib.smart import request
 
2187
        request_handlers = request.request_handlers
 
2188
        orig_method = request_handlers.get(verb)
 
2189
        request_handlers.remove(verb)
 
2190
        def restoreVerb():
 
2191
            request_handlers.register(verb, orig_method)
 
2192
        self.addCleanup(restoreVerb)
 
2193
 
 
2194
 
 
2195
class CapturedCall(object):
 
2196
    """A helper for capturing smart server calls for easy debug analysis."""
 
2197
 
 
2198
    def __init__(self, params, prefix_length):
 
2199
        """Capture the call with params and skip prefix_length stack frames."""
 
2200
        self.call = params
 
2201
        import traceback
 
2202
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
2203
        # client frames. Beyond this we could get more clever, but this is good
 
2204
        # enough for now.
 
2205
        stack = traceback.extract_stack()[prefix_length:-5]
 
2206
        self.stack = ''.join(traceback.format_list(stack))
 
2207
 
 
2208
    def __str__(self):
 
2209
        return self.call.method
 
2210
 
 
2211
    def __repr__(self):
 
2212
        return self.call.method
 
2213
 
 
2214
    def stack(self):
 
2215
        return self.stack
 
2216
 
 
2217
 
 
2218
class TestCaseWithMemoryTransport(TestCase):
 
2219
    """Common test class for tests that do not need disk resources.
 
2220
 
 
2221
    Tests that need disk resources should derive from TestCaseWithTransport.
 
2222
 
 
2223
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
2224
 
 
2225
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
2226
    a directory which does not exist. This serves to help ensure test isolation
 
2227
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
2228
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
2229
    file defaults for the transport in tests, nor does it obey the command line
 
2230
    override, so tests that accidentally write to the common directory should
 
2231
    be rare.
 
2232
 
 
2233
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
2234
    a .bzr directory that stops us ascending higher into the filesystem.
 
2235
    """
 
2236
 
 
2237
    TEST_ROOT = None
 
2238
    _TEST_NAME = 'test'
 
2239
 
 
2240
    def __init__(self, methodName='runTest'):
 
2241
        # allow test parameterization after test construction and before test
 
2242
        # execution. Variables that the parameterizer sets need to be
 
2243
        # ones that are not set by setUp, or setUp will trash them.
 
2244
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
2245
        self.vfs_transport_factory = default_transport
 
2246
        self.transport_server = None
 
2247
        self.transport_readonly_server = None
 
2248
        self.__vfs_server = None
 
2249
 
 
2250
    def get_transport(self, relpath=None):
 
2251
        """Return a writeable transport.
 
2252
 
 
2253
        This transport is for the test scratch space relative to
 
2254
        "self._test_root"
 
2255
 
 
2256
        :param relpath: a path relative to the base url.
 
2257
        """
 
2258
        t = get_transport(self.get_url(relpath))
 
2259
        self.assertFalse(t.is_readonly())
 
2260
        return t
 
2261
 
 
2262
    def get_readonly_transport(self, relpath=None):
 
2263
        """Return a readonly transport for the test scratch space
 
2264
 
 
2265
        This can be used to test that operations which should only need
 
2266
        readonly access in fact do not try to write.
 
2267
 
 
2268
        :param relpath: a path relative to the base url.
 
2269
        """
 
2270
        t = get_transport(self.get_readonly_url(relpath))
 
2271
        self.assertTrue(t.is_readonly())
 
2272
        return t
 
2273
 
 
2274
    def create_transport_readonly_server(self):
 
2275
        """Create a transport server from class defined at init.
 
2276
 
 
2277
        This is mostly a hook for daughter classes.
 
2278
        """
 
2279
        return self.transport_readonly_server()
 
2280
 
 
2281
    def get_readonly_server(self):
 
2282
        """Get the server instance for the readonly transport
 
2283
 
 
2284
        This is useful for some tests with specific servers to do diagnostics.
 
2285
        """
 
2286
        if self.__readonly_server is None:
 
2287
            if self.transport_readonly_server is None:
 
2288
                # readonly decorator requested
 
2289
                self.__readonly_server = ReadonlyServer()
 
2290
            else:
 
2291
                # explicit readonly transport.
 
2292
                self.__readonly_server = self.create_transport_readonly_server()
 
2293
            self.start_server(self.__readonly_server,
 
2294
                self.get_vfs_only_server())
 
2295
        return self.__readonly_server
 
2296
 
 
2297
    def get_readonly_url(self, relpath=None):
 
2298
        """Get a URL for the readonly transport.
 
2299
 
 
2300
        This will either be backed by '.' or a decorator to the transport
 
2301
        used by self.get_url()
 
2302
        relpath provides for clients to get a path relative to the base url.
 
2303
        These should only be downwards relative, not upwards.
 
2304
        """
 
2305
        base = self.get_readonly_server().get_url()
 
2306
        return self._adjust_url(base, relpath)
 
2307
 
 
2308
    def get_vfs_only_server(self):
 
2309
        """Get the vfs only read/write server instance.
 
2310
 
 
2311
        This is useful for some tests with specific servers that need
 
2312
        diagnostics.
 
2313
 
 
2314
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
2315
        is no means to override it.
 
2316
        """
 
2317
        if self.__vfs_server is None:
 
2318
            self.__vfs_server = MemoryServer()
 
2319
            self.start_server(self.__vfs_server)
 
2320
        return self.__vfs_server
 
2321
 
 
2322
    def get_server(self):
 
2323
        """Get the read/write server instance.
 
2324
 
 
2325
        This is useful for some tests with specific servers that need
 
2326
        diagnostics.
 
2327
 
 
2328
        This is built from the self.transport_server factory. If that is None,
 
2329
        then the self.get_vfs_server is returned.
 
2330
        """
 
2331
        if self.__server is None:
 
2332
            if (self.transport_server is None or self.transport_server is
 
2333
                self.vfs_transport_factory):
 
2334
                self.__server = self.get_vfs_only_server()
 
2335
            else:
 
2336
                # bring up a decorated means of access to the vfs only server.
 
2337
                self.__server = self.transport_server()
 
2338
                self.start_server(self.__server, self.get_vfs_only_server())
 
2339
        return self.__server
 
2340
 
 
2341
    def _adjust_url(self, base, relpath):
 
2342
        """Get a URL (or maybe a path) for the readwrite transport.
 
2343
 
 
2344
        This will either be backed by '.' or to an equivalent non-file based
 
2345
        facility.
 
2346
        relpath provides for clients to get a path relative to the base url.
 
2347
        These should only be downwards relative, not upwards.
 
2348
        """
 
2349
        if relpath is not None and relpath != '.':
 
2350
            if not base.endswith('/'):
 
2351
                base = base + '/'
 
2352
            # XXX: Really base should be a url; we did after all call
 
2353
            # get_url()!  But sometimes it's just a path (from
 
2354
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
2355
            # to a non-escaped local path.
 
2356
            if base.startswith('./') or base.startswith('/'):
 
2357
                base += relpath
 
2358
            else:
 
2359
                base += urlutils.escape(relpath)
 
2360
        return base
 
2361
 
 
2362
    def get_url(self, relpath=None):
 
2363
        """Get a URL (or maybe a path) for the readwrite transport.
 
2364
 
 
2365
        This will either be backed by '.' or to an equivalent non-file based
 
2366
        facility.
 
2367
        relpath provides for clients to get a path relative to the base url.
 
2368
        These should only be downwards relative, not upwards.
 
2369
        """
 
2370
        base = self.get_server().get_url()
 
2371
        return self._adjust_url(base, relpath)
 
2372
 
 
2373
    def get_vfs_only_url(self, relpath=None):
 
2374
        """Get a URL (or maybe a path for the plain old vfs transport.
 
2375
 
 
2376
        This will never be a smart protocol.  It always has all the
 
2377
        capabilities of the local filesystem, but it might actually be a
 
2378
        MemoryTransport or some other similar virtual filesystem.
 
2379
 
 
2380
        This is the backing transport (if any) of the server returned by
 
2381
        get_url and get_readonly_url.
 
2382
 
 
2383
        :param relpath: provides for clients to get a path relative to the base
 
2384
            url.  These should only be downwards relative, not upwards.
 
2385
        :return: A URL
 
2386
        """
 
2387
        base = self.get_vfs_only_server().get_url()
 
2388
        return self._adjust_url(base, relpath)
 
2389
 
 
2390
    def _create_safety_net(self):
 
2391
        """Make a fake bzr directory.
 
2392
 
 
2393
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2394
        real branch.
 
2395
        """
 
2396
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2397
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2398
 
 
2399
    def _check_safety_net(self):
 
2400
        """Check that the safety .bzr directory have not been touched.
 
2401
 
 
2402
        _make_test_root have created a .bzr directory to prevent tests from
 
2403
        propagating. This method ensures than a test did not leaked.
 
2404
        """
 
2405
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2406
        self.permit_url(get_transport(root).base)
 
2407
        wt = workingtree.WorkingTree.open(root)
 
2408
        last_rev = wt.last_revision()
 
2409
        if last_rev != 'null:':
 
2410
            # The current test have modified the /bzr directory, we need to
 
2411
            # recreate a new one or all the followng tests will fail.
 
2412
            # If you need to inspect its content uncomment the following line
 
2413
            # import pdb; pdb.set_trace()
 
2414
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
 
2415
            self._create_safety_net()
 
2416
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2417
 
 
2418
    def _make_test_root(self):
 
2419
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2420
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2421
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2422
                                                    suffix='.tmp'))
 
2423
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2424
 
 
2425
            self._create_safety_net()
 
2426
 
 
2427
            # The same directory is used by all tests, and we're not
 
2428
            # specifically told when all tests are finished.  This will do.
 
2429
            atexit.register(_rmtree_temp_dir, root)
 
2430
 
 
2431
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2432
        self.addCleanup(self._check_safety_net)
 
2433
 
 
2434
    def makeAndChdirToTestDir(self):
 
2435
        """Create a temporary directories for this one test.
 
2436
 
 
2437
        This must set self.test_home_dir and self.test_dir and chdir to
 
2438
        self.test_dir.
 
2439
 
 
2440
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
2441
        """
 
2442
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2443
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
2444
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2445
        self.permit_dir(self.test_dir)
 
2446
 
 
2447
    def make_branch(self, relpath, format=None):
 
2448
        """Create a branch on the transport at relpath."""
 
2449
        repo = self.make_repository(relpath, format=format)
 
2450
        return repo.bzrdir.create_branch()
 
2451
 
 
2452
    def make_bzrdir(self, relpath, format=None):
 
2453
        try:
 
2454
            # might be a relative or absolute path
 
2455
            maybe_a_url = self.get_url(relpath)
 
2456
            segments = maybe_a_url.rsplit('/', 1)
 
2457
            t = get_transport(maybe_a_url)
 
2458
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
2459
                t.ensure_base()
 
2460
            if format is None:
 
2461
                format = 'default'
 
2462
            if isinstance(format, basestring):
 
2463
                format = bzrdir.format_registry.make_bzrdir(format)
 
2464
            return format.initialize_on_transport(t)
 
2465
        except errors.UninitializableFormat:
 
2466
            raise TestSkipped("Format %s is not initializable." % format)
 
2467
 
 
2468
    def make_repository(self, relpath, shared=False, format=None):
 
2469
        """Create a repository on our default transport at relpath.
 
2470
 
 
2471
        Note that relpath must be a relative path, not a full url.
 
2472
        """
 
2473
        # FIXME: If you create a remoterepository this returns the underlying
 
2474
        # real format, which is incorrect.  Actually we should make sure that
 
2475
        # RemoteBzrDir returns a RemoteRepository.
 
2476
        # maybe  mbp 20070410
 
2477
        made_control = self.make_bzrdir(relpath, format=format)
 
2478
        return made_control.create_repository(shared=shared)
 
2479
 
 
2480
    def make_smart_server(self, path):
 
2481
        smart_server = server.SmartTCPServer_for_testing()
 
2482
        self.start_server(smart_server, self.get_server())
 
2483
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2484
        return remote_transport
 
2485
 
 
2486
    def make_branch_and_memory_tree(self, relpath, format=None):
 
2487
        """Create a branch on the default transport and a MemoryTree for it."""
 
2488
        b = self.make_branch(relpath, format=format)
 
2489
        return memorytree.MemoryTree.create_on_branch(b)
 
2490
 
 
2491
    def make_branch_builder(self, relpath, format=None):
 
2492
        branch = self.make_branch(relpath, format=format)
 
2493
        return branchbuilder.BranchBuilder(branch=branch)
 
2494
 
 
2495
    def overrideEnvironmentForTesting(self):
 
2496
        test_home_dir = self.test_home_dir
 
2497
        if isinstance(test_home_dir, unicode):
 
2498
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
 
2499
        os.environ['HOME'] = test_home_dir
 
2500
        os.environ['BZR_HOME'] = test_home_dir
 
2501
 
 
2502
    def setUp(self):
 
2503
        super(TestCaseWithMemoryTransport, self).setUp()
 
2504
        self._make_test_root()
 
2505
        _currentdir = os.getcwdu()
 
2506
        def _leaveDirectory():
 
2507
            os.chdir(_currentdir)
 
2508
        self.addCleanup(_leaveDirectory)
 
2509
        self.makeAndChdirToTestDir()
 
2510
        self.overrideEnvironmentForTesting()
 
2511
        self.__readonly_server = None
 
2512
        self.__server = None
 
2513
        self.reduceLockdirTimeout()
 
2514
 
 
2515
    def setup_smart_server_with_call_log(self):
 
2516
        """Sets up a smart server as the transport server with a call log."""
 
2517
        self.transport_server = server.SmartTCPServer_for_testing
 
2518
        self.hpss_calls = []
 
2519
        import traceback
 
2520
        # Skip the current stack down to the caller of
 
2521
        # setup_smart_server_with_call_log
 
2522
        prefix_length = len(traceback.extract_stack()) - 2
 
2523
        def capture_hpss_call(params):
 
2524
            self.hpss_calls.append(
 
2525
                CapturedCall(params, prefix_length))
 
2526
        client._SmartClient.hooks.install_named_hook(
 
2527
            'call', capture_hpss_call, None)
 
2528
 
 
2529
    def reset_smart_call_log(self):
 
2530
        self.hpss_calls = []
 
2531
 
 
2532
 
 
2533
class TestCaseInTempDir(TestCaseWithMemoryTransport):
164
2534
    """Derived class that runs a test within a temporary directory.
165
2535
 
166
2536
    This is useful for tests that need to create a branch, etc.
170
2540
    All test cases create their own directory within that.  If the
171
2541
    tests complete successfully, the directory is removed.
172
2542
 
173
 
    InTempDir is an old alias for FunctionalTestCase.
 
2543
    :ivar test_base_dir: The path of the top-level directory for this
 
2544
    test, which contains a home directory and a work directory.
 
2545
 
 
2546
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
2547
    which is used as $HOME for this test.
 
2548
 
 
2549
    :ivar test_dir: A directory under test_base_dir used as the current
 
2550
    directory when the test proper is run.
174
2551
    """
175
2552
 
176
 
    TEST_ROOT = None
177
 
    _TEST_NAME = 'test'
178
2553
    OVERRIDE_PYTHON = 'python'
179
2554
 
180
2555
    def check_file_contents(self, filename, expect):
183
2558
        if contents != expect:
184
2559
            self.log("expected: %r" % expect)
185
2560
            self.log("actually: %r" % contents)
186
 
            self.fail("contents of %s not as expected")
187
 
 
188
 
    def _make_test_root(self):
189
 
        if TestCaseInTempDir.TEST_ROOT is not None:
190
 
            return
191
 
        i = 0
192
 
        while True:
193
 
            root = 'test%04d.tmp' % i
194
 
            try:
195
 
                os.mkdir(root)
196
 
            except OSError, e:
197
 
                if e.errno == errno.EEXIST:
198
 
                    i += 1
199
 
                    continue
200
 
                else:
201
 
                    raise
202
 
            # successfully created
203
 
            TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
204
 
            break
205
 
        # make a fake bzr directory there to prevent any tests propagating
206
 
        # up onto the source directory's real branch
207
 
        os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
208
 
 
209
 
    def setUp(self):
210
 
        super(TestCaseInTempDir, self).setUp()
211
 
        import os
212
 
        self._make_test_root()
213
 
        self._currentdir = os.getcwdu()
214
 
        short_id = self.id().replace('bzrlib.selftest.', '') \
215
 
                   .replace('__main__.', '')
216
 
        self.test_dir = os.path.join(self.TEST_ROOT, short_id)
 
2561
            self.fail("contents of %s not as expected" % filename)
 
2562
 
 
2563
    def _getTestDirPrefix(self):
 
2564
        # create a directory within the top level test directory
 
2565
        if sys.platform in ('win32', 'cygwin'):
 
2566
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2567
            # windows is likely to have path-length limits so use a short name
 
2568
            name_prefix = name_prefix[-30:]
 
2569
        else:
 
2570
            name_prefix = re.sub('[/]', '_', self.id())
 
2571
        return name_prefix
 
2572
 
 
2573
    def makeAndChdirToTestDir(self):
 
2574
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2575
 
 
2576
        For TestCaseInTempDir we create a temporary directory based on the test
 
2577
        name and then create two subdirs - test and home under it.
 
2578
        """
 
2579
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2580
            self._getTestDirPrefix())
 
2581
        name = name_prefix
 
2582
        for i in range(100):
 
2583
            if os.path.exists(name):
 
2584
                name = name_prefix + '_' + str(i)
 
2585
            else:
 
2586
                # now create test and home directories within this dir
 
2587
                self.test_base_dir = name
 
2588
                self.addCleanup(self.deleteTestDir)
 
2589
                os.mkdir(self.test_base_dir)
 
2590
                break
 
2591
        self.permit_dir(self.test_base_dir)
 
2592
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2593
        # stacking policy to honour; create a bzr dir with an unshared
 
2594
        # repository (but not a branch - our code would be trying to escape
 
2595
        # then!) to stop them, and permit it to be read.
 
2596
        # control = bzrdir.BzrDir.create(self.test_base_dir)
 
2597
        # control.create_repository()
 
2598
        self.test_home_dir = self.test_base_dir + '/home'
 
2599
        os.mkdir(self.test_home_dir)
 
2600
        self.test_dir = self.test_base_dir + '/work'
217
2601
        os.mkdir(self.test_dir)
218
2602
        os.chdir(self.test_dir)
219
 
        
220
 
    def tearDown(self):
221
 
        import os
222
 
        os.chdir(self._currentdir)
223
 
        super(TestCaseInTempDir, self).tearDown()
224
 
 
225
 
    def _formcmd(self, cmd):
226
 
        if isinstance(cmd, basestring):
227
 
            cmd = cmd.split()
228
 
        if cmd[0] == 'bzr':
229
 
            cmd[0] = self.BZRPATH
230
 
            if self.OVERRIDE_PYTHON:
231
 
                cmd.insert(0, self.OVERRIDE_PYTHON)
232
 
        self.log('$ %r' % cmd)
233
 
        return cmd
234
 
 
235
 
    def runcmd(self, cmd, retcode=0):
236
 
        """Run one command and check the return code.
237
 
 
238
 
        Returns a tuple of (stdout,stderr) strings.
239
 
 
240
 
        If a single string is based, it is split into words.
241
 
        For commands that are not simple space-separated words, please
242
 
        pass a list instead."""
243
 
        cmd = self._formcmd(cmd)
244
 
        self.log('$ ' + ' '.join(cmd))
245
 
        actual_retcode = subprocess.call(cmd, stdout=self._log_file,
246
 
                                         stderr=self._log_file)
247
 
        if retcode != actual_retcode:
248
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
249
 
                                % (cmd, actual_retcode, retcode))
250
 
 
251
 
    def backtick(self, cmd, retcode=0):
252
 
        """Run a command and return its output"""
253
 
        cmd = self._formcmd(cmd)
254
 
        child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=self._log_file)
255
 
        outd, errd = child.communicate()
256
 
        self.log(outd)
257
 
        actual_retcode = child.wait()
258
 
 
259
 
        outd = outd.replace('\r', '')
260
 
 
261
 
        if retcode != actual_retcode:
262
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
263
 
                                % (cmd, actual_retcode, retcode))
264
 
 
265
 
        return outd
266
 
 
267
 
 
268
 
 
269
 
    def build_tree(self, shape):
 
2603
        # put name of test inside
 
2604
        f = file(self.test_base_dir + '/name', 'w')
 
2605
        try:
 
2606
            f.write(self.id())
 
2607
        finally:
 
2608
            f.close()
 
2609
 
 
2610
    def deleteTestDir(self):
 
2611
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
2612
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
 
2613
 
 
2614
    def build_tree(self, shape, line_endings='binary', transport=None):
270
2615
        """Build a test tree according to a pattern.
271
2616
 
272
2617
        shape is a sequence of file specifications.  If the final
273
2618
        character is '/', a directory is created.
274
2619
 
 
2620
        This assumes that all the elements in the tree being built are new.
 
2621
 
275
2622
        This doesn't add anything to a branch.
 
2623
 
 
2624
        :type shape:    list or tuple.
 
2625
        :param line_endings: Either 'binary' or 'native'
 
2626
            in binary mode, exact contents are written in native mode, the
 
2627
            line endings match the default platform endings.
 
2628
        :param transport: A transport to write to, for building trees on VFS's.
 
2629
            If the transport is readonly or None, "." is opened automatically.
 
2630
        :return: None
276
2631
        """
277
 
        # XXX: It's OK to just create them using forward slashes on windows?
278
 
        import os
 
2632
        if type(shape) not in (list, tuple):
 
2633
            raise AssertionError("Parameter 'shape' should be "
 
2634
                "a list or a tuple. Got %r instead" % (shape,))
 
2635
        # It's OK to just create them using forward slashes on windows.
 
2636
        if transport is None or transport.is_readonly():
 
2637
            transport = get_transport(".")
279
2638
        for name in shape:
280
 
            assert isinstance(name, basestring)
 
2639
            self.assertIsInstance(name, basestring)
281
2640
            if name[-1] == '/':
282
 
                os.mkdir(name[:-1])
283
 
            else:
284
 
                f = file(name, 'wt')
285
 
                print >>f, "contents of", name
286
 
                f.close()
287
 
                
288
 
 
289
 
 
290
 
class MetaTestLog(TestCase):
291
 
    def test_logging(self):
292
 
        """Test logs are captured when a test fails."""
293
 
        logging.info('an info message')
294
 
        warning('something looks dodgy...')
295
 
        logging.debug('hello, test is running')
296
 
        ##assert 0
297
 
 
298
 
 
299
 
def selftest(verbose=False, pattern=".*"):
 
2641
                transport.mkdir(urlutils.escape(name[:-1]))
 
2642
            else:
 
2643
                if line_endings == 'binary':
 
2644
                    end = '\n'
 
2645
                elif line_endings == 'native':
 
2646
                    end = os.linesep
 
2647
                else:
 
2648
                    raise errors.BzrError(
 
2649
                        'Invalid line ending request %r' % line_endings)
 
2650
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2651
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2652
 
 
2653
    def build_tree_contents(self, shape):
 
2654
        build_tree_contents(shape)
 
2655
 
 
2656
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2657
        """Assert whether path or paths are in the WorkingTree"""
 
2658
        if tree is None:
 
2659
            tree = workingtree.WorkingTree.open(root_path)
 
2660
        if not isinstance(path, basestring):
 
2661
            for p in path:
 
2662
                self.assertInWorkingTree(p, tree=tree)
 
2663
        else:
 
2664
            self.assertIsNot(tree.path2id(path), None,
 
2665
                path+' not in working tree.')
 
2666
 
 
2667
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2668
        """Assert whether path or paths are not in the WorkingTree"""
 
2669
        if tree is None:
 
2670
            tree = workingtree.WorkingTree.open(root_path)
 
2671
        if not isinstance(path, basestring):
 
2672
            for p in path:
 
2673
                self.assertNotInWorkingTree(p,tree=tree)
 
2674
        else:
 
2675
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2676
 
 
2677
 
 
2678
class TestCaseWithTransport(TestCaseInTempDir):
 
2679
    """A test case that provides get_url and get_readonly_url facilities.
 
2680
 
 
2681
    These back onto two transport servers, one for readonly access and one for
 
2682
    read write access.
 
2683
 
 
2684
    If no explicit class is provided for readonly access, a
 
2685
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2686
    based read write transports.
 
2687
 
 
2688
    If an explicit class is provided for readonly access, that server and the
 
2689
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2690
    """
 
2691
 
 
2692
    def get_vfs_only_server(self):
 
2693
        """See TestCaseWithMemoryTransport.
 
2694
 
 
2695
        This is useful for some tests with specific servers that need
 
2696
        diagnostics.
 
2697
        """
 
2698
        if self.__vfs_server is None:
 
2699
            self.__vfs_server = self.vfs_transport_factory()
 
2700
            self.start_server(self.__vfs_server)
 
2701
        return self.__vfs_server
 
2702
 
 
2703
    def make_branch_and_tree(self, relpath, format=None):
 
2704
        """Create a branch on the transport and a tree locally.
 
2705
 
 
2706
        If the transport is not a LocalTransport, the Tree can't be created on
 
2707
        the transport.  In that case if the vfs_transport_factory is
 
2708
        LocalURLServer the working tree is created in the local
 
2709
        directory backing the transport, and the returned tree's branch and
 
2710
        repository will also be accessed locally. Otherwise a lightweight
 
2711
        checkout is created and returned.
 
2712
 
 
2713
        We do this because we can't physically create a tree in the local
 
2714
        path, with a branch reference to the transport_factory url, and
 
2715
        a branch + repository in the vfs_transport, unless the vfs_transport
 
2716
        namespace is distinct from the local disk - the two branch objects
 
2717
        would collide. While we could construct a tree with its branch object
 
2718
        pointing at the transport_factory transport in memory, reopening it
 
2719
        would behaving unexpectedly, and has in the past caused testing bugs
 
2720
        when we tried to do it that way.
 
2721
 
 
2722
        :param format: The BzrDirFormat.
 
2723
        :returns: the WorkingTree.
 
2724
        """
 
2725
        # TODO: always use the local disk path for the working tree,
 
2726
        # this obviously requires a format that supports branch references
 
2727
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2728
        # RBC 20060208
 
2729
        b = self.make_branch(relpath, format=format)
 
2730
        try:
 
2731
            return b.bzrdir.create_workingtree()
 
2732
        except errors.NotLocalUrl:
 
2733
            # We can only make working trees locally at the moment.  If the
 
2734
            # transport can't support them, then we keep the non-disk-backed
 
2735
            # branch and create a local checkout.
 
2736
            if self.vfs_transport_factory is LocalURLServer:
 
2737
                # the branch is colocated on disk, we cannot create a checkout.
 
2738
                # hopefully callers will expect this.
 
2739
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2740
                wt = local_controldir.create_workingtree()
 
2741
                if wt.branch._format != b._format:
 
2742
                    wt._branch = b
 
2743
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2744
                    # in case the implementation details of workingtree objects
 
2745
                    # change.
 
2746
                    self.assertIs(b, wt.branch)
 
2747
                return wt
 
2748
            else:
 
2749
                return b.create_checkout(relpath, lightweight=True)
 
2750
 
 
2751
    def assertIsDirectory(self, relpath, transport):
 
2752
        """Assert that relpath within transport is a directory.
 
2753
 
 
2754
        This may not be possible on all transports; in that case it propagates
 
2755
        a TransportNotPossible.
 
2756
        """
 
2757
        try:
 
2758
            mode = transport.stat(relpath).st_mode
 
2759
        except errors.NoSuchFile:
 
2760
            self.fail("path %s is not a directory; no such file"
 
2761
                      % (relpath))
 
2762
        if not stat.S_ISDIR(mode):
 
2763
            self.fail("path %s is not a directory; has mode %#o"
 
2764
                      % (relpath, mode))
 
2765
 
 
2766
    def assertTreesEqual(self, left, right):
 
2767
        """Check that left and right have the same content and properties."""
 
2768
        # we use a tree delta to check for equality of the content, and we
 
2769
        # manually check for equality of other things such as the parents list.
 
2770
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2771
        differences = left.changes_from(right)
 
2772
        self.assertFalse(differences.has_changed(),
 
2773
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2774
 
 
2775
    def setUp(self):
 
2776
        super(TestCaseWithTransport, self).setUp()
 
2777
        self.__vfs_server = None
 
2778
 
 
2779
    def disable_missing_extensions_warning(self):
 
2780
        """Some tests expect a precise stderr content.
 
2781
 
 
2782
        There is no point in forcing them to duplicate the extension related
 
2783
        warning.
 
2784
        """
 
2785
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
2786
 
 
2787
 
 
2788
class ChrootedTestCase(TestCaseWithTransport):
 
2789
    """A support class that provides readonly urls outside the local namespace.
 
2790
 
 
2791
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2792
    is then we are chrooted already, if it is not then an HttpServer is used
 
2793
    for readonly urls.
 
2794
 
 
2795
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2796
                       be used without needed to redo it when a different
 
2797
                       subclass is in use ?
 
2798
    """
 
2799
 
 
2800
    def setUp(self):
 
2801
        super(ChrootedTestCase, self).setUp()
 
2802
        if not self.vfs_transport_factory == MemoryServer:
 
2803
            self.transport_readonly_server = HttpServer
 
2804
 
 
2805
 
 
2806
def condition_id_re(pattern):
 
2807
    """Create a condition filter which performs a re check on a test's id.
 
2808
 
 
2809
    :param pattern: A regular expression string.
 
2810
    :return: A callable that returns True if the re matches.
 
2811
    """
 
2812
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2813
        'test filter')
 
2814
    def condition(test):
 
2815
        test_id = test.id()
 
2816
        return filter_re.search(test_id)
 
2817
    return condition
 
2818
 
 
2819
 
 
2820
def condition_isinstance(klass_or_klass_list):
 
2821
    """Create a condition filter which returns isinstance(param, klass).
 
2822
 
 
2823
    :return: A callable which when called with one parameter obj return the
 
2824
        result of isinstance(obj, klass_or_klass_list).
 
2825
    """
 
2826
    def condition(obj):
 
2827
        return isinstance(obj, klass_or_klass_list)
 
2828
    return condition
 
2829
 
 
2830
 
 
2831
def condition_id_in_list(id_list):
 
2832
    """Create a condition filter which verify that test's id in a list.
 
2833
 
 
2834
    :param id_list: A TestIdList object.
 
2835
    :return: A callable that returns True if the test's id appears in the list.
 
2836
    """
 
2837
    def condition(test):
 
2838
        return id_list.includes(test.id())
 
2839
    return condition
 
2840
 
 
2841
 
 
2842
def condition_id_startswith(starts):
 
2843
    """Create a condition filter verifying that test's id starts with a string.
 
2844
 
 
2845
    :param starts: A list of string.
 
2846
    :return: A callable that returns True if the test's id starts with one of
 
2847
        the given strings.
 
2848
    """
 
2849
    def condition(test):
 
2850
        for start in starts:
 
2851
            if test.id().startswith(start):
 
2852
                return True
 
2853
        return False
 
2854
    return condition
 
2855
 
 
2856
 
 
2857
def exclude_tests_by_condition(suite, condition):
 
2858
    """Create a test suite which excludes some tests from suite.
 
2859
 
 
2860
    :param suite: The suite to get tests from.
 
2861
    :param condition: A callable whose result evaluates True when called with a
 
2862
        test case which should be excluded from the result.
 
2863
    :return: A suite which contains the tests found in suite that fail
 
2864
        condition.
 
2865
    """
 
2866
    result = []
 
2867
    for test in iter_suite_tests(suite):
 
2868
        if not condition(test):
 
2869
            result.append(test)
 
2870
    return TestUtil.TestSuite(result)
 
2871
 
 
2872
 
 
2873
def filter_suite_by_condition(suite, condition):
 
2874
    """Create a test suite by filtering another one.
 
2875
 
 
2876
    :param suite: The source suite.
 
2877
    :param condition: A callable whose result evaluates True when called with a
 
2878
        test case which should be included in the result.
 
2879
    :return: A suite which contains the tests found in suite that pass
 
2880
        condition.
 
2881
    """
 
2882
    result = []
 
2883
    for test in iter_suite_tests(suite):
 
2884
        if condition(test):
 
2885
            result.append(test)
 
2886
    return TestUtil.TestSuite(result)
 
2887
 
 
2888
 
 
2889
def filter_suite_by_re(suite, pattern):
 
2890
    """Create a test suite by filtering another one.
 
2891
 
 
2892
    :param suite:           the source suite
 
2893
    :param pattern:         pattern that names must match
 
2894
    :returns: the newly created suite
 
2895
    """
 
2896
    condition = condition_id_re(pattern)
 
2897
    result_suite = filter_suite_by_condition(suite, condition)
 
2898
    return result_suite
 
2899
 
 
2900
 
 
2901
def filter_suite_by_id_list(suite, test_id_list):
 
2902
    """Create a test suite by filtering another one.
 
2903
 
 
2904
    :param suite: The source suite.
 
2905
    :param test_id_list: A list of the test ids to keep as strings.
 
2906
    :returns: the newly created suite
 
2907
    """
 
2908
    condition = condition_id_in_list(test_id_list)
 
2909
    result_suite = filter_suite_by_condition(suite, condition)
 
2910
    return result_suite
 
2911
 
 
2912
 
 
2913
def filter_suite_by_id_startswith(suite, start):
 
2914
    """Create a test suite by filtering another one.
 
2915
 
 
2916
    :param suite: The source suite.
 
2917
    :param start: A list of string the test id must start with one of.
 
2918
    :returns: the newly created suite
 
2919
    """
 
2920
    condition = condition_id_startswith(start)
 
2921
    result_suite = filter_suite_by_condition(suite, condition)
 
2922
    return result_suite
 
2923
 
 
2924
 
 
2925
def exclude_tests_by_re(suite, pattern):
 
2926
    """Create a test suite which excludes some tests from suite.
 
2927
 
 
2928
    :param suite: The suite to get tests from.
 
2929
    :param pattern: A regular expression string. Test ids that match this
 
2930
        pattern will be excluded from the result.
 
2931
    :return: A TestSuite that contains all the tests from suite without the
 
2932
        tests that matched pattern. The order of tests is the same as it was in
 
2933
        suite.
 
2934
    """
 
2935
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2936
 
 
2937
 
 
2938
def preserve_input(something):
 
2939
    """A helper for performing test suite transformation chains.
 
2940
 
 
2941
    :param something: Anything you want to preserve.
 
2942
    :return: Something.
 
2943
    """
 
2944
    return something
 
2945
 
 
2946
 
 
2947
def randomize_suite(suite):
 
2948
    """Return a new TestSuite with suite's tests in random order.
 
2949
 
 
2950
    The tests in the input suite are flattened into a single suite in order to
 
2951
    accomplish this. Any nested TestSuites are removed to provide global
 
2952
    randomness.
 
2953
    """
 
2954
    tests = list(iter_suite_tests(suite))
 
2955
    random.shuffle(tests)
 
2956
    return TestUtil.TestSuite(tests)
 
2957
 
 
2958
 
 
2959
def split_suite_by_condition(suite, condition):
 
2960
    """Split a test suite into two by a condition.
 
2961
 
 
2962
    :param suite: The suite to split.
 
2963
    :param condition: The condition to match on. Tests that match this
 
2964
        condition are returned in the first test suite, ones that do not match
 
2965
        are in the second suite.
 
2966
    :return: A tuple of two test suites, where the first contains tests from
 
2967
        suite matching the condition, and the second contains the remainder
 
2968
        from suite. The order within each output suite is the same as it was in
 
2969
        suite.
 
2970
    """
 
2971
    matched = []
 
2972
    did_not_match = []
 
2973
    for test in iter_suite_tests(suite):
 
2974
        if condition(test):
 
2975
            matched.append(test)
 
2976
        else:
 
2977
            did_not_match.append(test)
 
2978
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2979
 
 
2980
 
 
2981
def split_suite_by_re(suite, pattern):
 
2982
    """Split a test suite into two by a regular expression.
 
2983
 
 
2984
    :param suite: The suite to split.
 
2985
    :param pattern: A regular expression string. Test ids that match this
 
2986
        pattern will be in the first test suite returned, and the others in the
 
2987
        second test suite returned.
 
2988
    :return: A tuple of two test suites, where the first contains tests from
 
2989
        suite matching pattern, and the second contains the remainder from
 
2990
        suite. The order within each output suite is the same as it was in
 
2991
        suite.
 
2992
    """
 
2993
    return split_suite_by_condition(suite, condition_id_re(pattern))
 
2994
 
 
2995
 
 
2996
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2997
              stop_on_failure=False,
 
2998
              transport=None, lsprof_timed=None, bench_history=None,
 
2999
              matching_tests_first=None,
 
3000
              list_only=False,
 
3001
              random_seed=None,
 
3002
              exclude_pattern=None,
 
3003
              strict=False,
 
3004
              runner_class=None,
 
3005
              suite_decorators=None,
 
3006
              stream=None,
 
3007
              result_decorators=None,
 
3008
              ):
 
3009
    """Run a test suite for bzr selftest.
 
3010
 
 
3011
    :param runner_class: The class of runner to use. Must support the
 
3012
        constructor arguments passed by run_suite which are more than standard
 
3013
        python uses.
 
3014
    :return: A boolean indicating success.
 
3015
    """
 
3016
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
3017
    if verbose:
 
3018
        verbosity = 2
 
3019
    else:
 
3020
        verbosity = 1
 
3021
    if runner_class is None:
 
3022
        runner_class = TextTestRunner
 
3023
    if stream is None:
 
3024
        stream = sys.stdout
 
3025
    runner = runner_class(stream=stream,
 
3026
                            descriptions=0,
 
3027
                            verbosity=verbosity,
 
3028
                            bench_history=bench_history,
 
3029
                            strict=strict,
 
3030
                            result_decorators=result_decorators,
 
3031
                            )
 
3032
    runner.stop_on_failure=stop_on_failure
 
3033
    # built in decorator factories:
 
3034
    decorators = [
 
3035
        random_order(random_seed, runner),
 
3036
        exclude_tests(exclude_pattern),
 
3037
        ]
 
3038
    if matching_tests_first:
 
3039
        decorators.append(tests_first(pattern))
 
3040
    else:
 
3041
        decorators.append(filter_tests(pattern))
 
3042
    if suite_decorators:
 
3043
        decorators.extend(suite_decorators)
 
3044
    # tell the result object how many tests will be running: (except if
 
3045
    # --parallel=fork is being used. Robert said he will provide a better
 
3046
    # progress design later -- vila 20090817)
 
3047
    if fork_decorator not in decorators:
 
3048
        decorators.append(CountingDecorator)
 
3049
    for decorator in decorators:
 
3050
        suite = decorator(suite)
 
3051
    if list_only:
 
3052
        # Done after test suite decoration to allow randomisation etc
 
3053
        # to take effect, though that is of marginal benefit.
 
3054
        if verbosity >= 2:
 
3055
            stream.write("Listing tests only ...\n")
 
3056
        for t in iter_suite_tests(suite):
 
3057
            stream.write("%s\n" % (t.id()))
 
3058
        return True
 
3059
    result = runner.run(suite)
 
3060
    if strict:
 
3061
        return result.wasStrictlySuccessful()
 
3062
    else:
 
3063
        return result.wasSuccessful()
 
3064
 
 
3065
 
 
3066
# A registry where get() returns a suite decorator.
 
3067
parallel_registry = registry.Registry()
 
3068
 
 
3069
 
 
3070
def fork_decorator(suite):
 
3071
    concurrency = osutils.local_concurrency()
 
3072
    if concurrency == 1:
 
3073
        return suite
 
3074
    from testtools import ConcurrentTestSuite
 
3075
    return ConcurrentTestSuite(suite, fork_for_tests)
 
3076
parallel_registry.register('fork', fork_decorator)
 
3077
 
 
3078
 
 
3079
def subprocess_decorator(suite):
 
3080
    concurrency = osutils.local_concurrency()
 
3081
    if concurrency == 1:
 
3082
        return suite
 
3083
    from testtools import ConcurrentTestSuite
 
3084
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
3085
parallel_registry.register('subprocess', subprocess_decorator)
 
3086
 
 
3087
 
 
3088
def exclude_tests(exclude_pattern):
 
3089
    """Return a test suite decorator that excludes tests."""
 
3090
    if exclude_pattern is None:
 
3091
        return identity_decorator
 
3092
    def decorator(suite):
 
3093
        return ExcludeDecorator(suite, exclude_pattern)
 
3094
    return decorator
 
3095
 
 
3096
 
 
3097
def filter_tests(pattern):
 
3098
    if pattern == '.*':
 
3099
        return identity_decorator
 
3100
    def decorator(suite):
 
3101
        return FilterTestsDecorator(suite, pattern)
 
3102
    return decorator
 
3103
 
 
3104
 
 
3105
def random_order(random_seed, runner):
 
3106
    """Return a test suite decorator factory for randomising tests order.
 
3107
    
 
3108
    :param random_seed: now, a string which casts to a long, or a long.
 
3109
    :param runner: A test runner with a stream attribute to report on.
 
3110
    """
 
3111
    if random_seed is None:
 
3112
        return identity_decorator
 
3113
    def decorator(suite):
 
3114
        return RandomDecorator(suite, random_seed, runner.stream)
 
3115
    return decorator
 
3116
 
 
3117
 
 
3118
def tests_first(pattern):
 
3119
    if pattern == '.*':
 
3120
        return identity_decorator
 
3121
    def decorator(suite):
 
3122
        return TestFirstDecorator(suite, pattern)
 
3123
    return decorator
 
3124
 
 
3125
 
 
3126
def identity_decorator(suite):
 
3127
    """Return suite."""
 
3128
    return suite
 
3129
 
 
3130
 
 
3131
class TestDecorator(TestSuite):
 
3132
    """A decorator for TestCase/TestSuite objects.
 
3133
    
 
3134
    Usually, subclasses should override __iter__(used when flattening test
 
3135
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
3136
    debug().
 
3137
    """
 
3138
 
 
3139
    def __init__(self, suite):
 
3140
        TestSuite.__init__(self)
 
3141
        self.addTest(suite)
 
3142
 
 
3143
    def countTestCases(self):
 
3144
        cases = 0
 
3145
        for test in self:
 
3146
            cases += test.countTestCases()
 
3147
        return cases
 
3148
 
 
3149
    def debug(self):
 
3150
        for test in self:
 
3151
            test.debug()
 
3152
 
 
3153
    def run(self, result):
 
3154
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
3155
        # into __iter__.
 
3156
        for test in self:
 
3157
            if result.shouldStop:
 
3158
                break
 
3159
            test.run(result)
 
3160
        return result
 
3161
 
 
3162
 
 
3163
class CountingDecorator(TestDecorator):
 
3164
    """A decorator which calls result.progress(self.countTestCases)."""
 
3165
 
 
3166
    def run(self, result):
 
3167
        progress_method = getattr(result, 'progress', None)
 
3168
        if callable(progress_method):
 
3169
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
3170
        return super(CountingDecorator, self).run(result)
 
3171
 
 
3172
 
 
3173
class ExcludeDecorator(TestDecorator):
 
3174
    """A decorator which excludes test matching an exclude pattern."""
 
3175
 
 
3176
    def __init__(self, suite, exclude_pattern):
 
3177
        TestDecorator.__init__(self, suite)
 
3178
        self.exclude_pattern = exclude_pattern
 
3179
        self.excluded = False
 
3180
 
 
3181
    def __iter__(self):
 
3182
        if self.excluded:
 
3183
            return iter(self._tests)
 
3184
        self.excluded = True
 
3185
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
3186
        del self._tests[:]
 
3187
        self.addTests(suite)
 
3188
        return iter(self._tests)
 
3189
 
 
3190
 
 
3191
class FilterTestsDecorator(TestDecorator):
 
3192
    """A decorator which filters tests to those matching a pattern."""
 
3193
 
 
3194
    def __init__(self, suite, pattern):
 
3195
        TestDecorator.__init__(self, suite)
 
3196
        self.pattern = pattern
 
3197
        self.filtered = False
 
3198
 
 
3199
    def __iter__(self):
 
3200
        if self.filtered:
 
3201
            return iter(self._tests)
 
3202
        self.filtered = True
 
3203
        suite = filter_suite_by_re(self, self.pattern)
 
3204
        del self._tests[:]
 
3205
        self.addTests(suite)
 
3206
        return iter(self._tests)
 
3207
 
 
3208
 
 
3209
class RandomDecorator(TestDecorator):
 
3210
    """A decorator which randomises the order of its tests."""
 
3211
 
 
3212
    def __init__(self, suite, random_seed, stream):
 
3213
        TestDecorator.__init__(self, suite)
 
3214
        self.random_seed = random_seed
 
3215
        self.randomised = False
 
3216
        self.stream = stream
 
3217
 
 
3218
    def __iter__(self):
 
3219
        if self.randomised:
 
3220
            return iter(self._tests)
 
3221
        self.randomised = True
 
3222
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
3223
            (self.actual_seed()))
 
3224
        # Initialise the random number generator.
 
3225
        random.seed(self.actual_seed())
 
3226
        suite = randomize_suite(self)
 
3227
        del self._tests[:]
 
3228
        self.addTests(suite)
 
3229
        return iter(self._tests)
 
3230
 
 
3231
    def actual_seed(self):
 
3232
        if self.random_seed == "now":
 
3233
            # We convert the seed to a long to make it reuseable across
 
3234
            # invocations (because the user can reenter it).
 
3235
            self.random_seed = long(time.time())
 
3236
        else:
 
3237
            # Convert the seed to a long if we can
 
3238
            try:
 
3239
                self.random_seed = long(self.random_seed)
 
3240
            except:
 
3241
                pass
 
3242
        return self.random_seed
 
3243
 
 
3244
 
 
3245
class TestFirstDecorator(TestDecorator):
 
3246
    """A decorator which moves named tests to the front."""
 
3247
 
 
3248
    def __init__(self, suite, pattern):
 
3249
        TestDecorator.__init__(self, suite)
 
3250
        self.pattern = pattern
 
3251
        self.filtered = False
 
3252
 
 
3253
    def __iter__(self):
 
3254
        if self.filtered:
 
3255
            return iter(self._tests)
 
3256
        self.filtered = True
 
3257
        suites = split_suite_by_re(self, self.pattern)
 
3258
        del self._tests[:]
 
3259
        self.addTests(suites)
 
3260
        return iter(self._tests)
 
3261
 
 
3262
 
 
3263
def partition_tests(suite, count):
 
3264
    """Partition suite into count lists of tests."""
 
3265
    result = []
 
3266
    tests = list(iter_suite_tests(suite))
 
3267
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3268
    for block in range(count):
 
3269
        low_test = block * tests_per_process
 
3270
        high_test = low_test + tests_per_process
 
3271
        process_tests = tests[low_test:high_test]
 
3272
        result.append(process_tests)
 
3273
    return result
 
3274
 
 
3275
 
 
3276
def fork_for_tests(suite):
 
3277
    """Take suite and start up one runner per CPU by forking()
 
3278
 
 
3279
    :return: An iterable of TestCase-like objects which can each have
 
3280
        run(result) called on them to feed tests to result.
 
3281
    """
 
3282
    concurrency = osutils.local_concurrency()
 
3283
    result = []
 
3284
    from subunit import TestProtocolClient, ProtocolTestCase
 
3285
    class TestInOtherProcess(ProtocolTestCase):
 
3286
        # Should be in subunit, I think. RBC.
 
3287
        def __init__(self, stream, pid):
 
3288
            ProtocolTestCase.__init__(self, stream)
 
3289
            self.pid = pid
 
3290
 
 
3291
        def run(self, result):
 
3292
            try:
 
3293
                ProtocolTestCase.run(self, result)
 
3294
            finally:
 
3295
                os.waitpid(self.pid, os.WNOHANG)
 
3296
 
 
3297
    test_blocks = partition_tests(suite, concurrency)
 
3298
    for process_tests in test_blocks:
 
3299
        process_suite = TestSuite()
 
3300
        process_suite.addTests(process_tests)
 
3301
        c2pread, c2pwrite = os.pipe()
 
3302
        pid = os.fork()
 
3303
        if pid == 0:
 
3304
            try:
 
3305
                os.close(c2pread)
 
3306
                # Leave stderr and stdout open so we can see test noise
 
3307
                # Close stdin so that the child goes away if it decides to
 
3308
                # read from stdin (otherwise its a roulette to see what
 
3309
                # child actually gets keystrokes for pdb etc).
 
3310
                sys.stdin.close()
 
3311
                sys.stdin = None
 
3312
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3313
                subunit_result = BzrAutoTimingTestResultDecorator(
 
3314
                    TestProtocolClient(stream))
 
3315
                process_suite.run(subunit_result)
 
3316
            finally:
 
3317
                os._exit(0)
 
3318
        else:
 
3319
            os.close(c2pwrite)
 
3320
            stream = os.fdopen(c2pread, 'rb', 1)
 
3321
            test = TestInOtherProcess(stream, pid)
 
3322
            result.append(test)
 
3323
    return result
 
3324
 
 
3325
 
 
3326
def reinvoke_for_tests(suite):
 
3327
    """Take suite and start up one runner per CPU using subprocess().
 
3328
 
 
3329
    :return: An iterable of TestCase-like objects which can each have
 
3330
        run(result) called on them to feed tests to result.
 
3331
    """
 
3332
    concurrency = osutils.local_concurrency()
 
3333
    result = []
 
3334
    from subunit import ProtocolTestCase
 
3335
    class TestInSubprocess(ProtocolTestCase):
 
3336
        def __init__(self, process, name):
 
3337
            ProtocolTestCase.__init__(self, process.stdout)
 
3338
            self.process = process
 
3339
            self.process.stdin.close()
 
3340
            self.name = name
 
3341
 
 
3342
        def run(self, result):
 
3343
            try:
 
3344
                ProtocolTestCase.run(self, result)
 
3345
            finally:
 
3346
                self.process.wait()
 
3347
                os.unlink(self.name)
 
3348
            # print "pid %d finished" % finished_process
 
3349
    test_blocks = partition_tests(suite, concurrency)
 
3350
    for process_tests in test_blocks:
 
3351
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3352
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3353
        if not os.path.isfile(bzr_path):
 
3354
            # We are probably installed. Assume sys.argv is the right file
 
3355
            bzr_path = sys.argv[0]
 
3356
        bzr_path = [bzr_path]
 
3357
        if sys.platform == "win32":
 
3358
            # if we're on windows, we can't execute the bzr script directly
 
3359
            bzr_path = [sys.executable] + bzr_path
 
3360
        fd, test_list_file_name = tempfile.mkstemp()
 
3361
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3362
        for test in process_tests:
 
3363
            test_list_file.write(test.id() + '\n')
 
3364
        test_list_file.close()
 
3365
        try:
 
3366
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
 
3367
                '--subunit']
 
3368
            if '--no-plugins' in sys.argv:
 
3369
                argv.append('--no-plugins')
 
3370
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3371
            # stderr it can interrupt the subunit protocol.
 
3372
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3373
                bufsize=1)
 
3374
            test = TestInSubprocess(process, test_list_file_name)
 
3375
            result.append(test)
 
3376
        except:
 
3377
            os.unlink(test_list_file_name)
 
3378
            raise
 
3379
    return result
 
3380
 
 
3381
 
 
3382
class ForwardingResult(unittest.TestResult):
 
3383
 
 
3384
    def __init__(self, target):
 
3385
        unittest.TestResult.__init__(self)
 
3386
        self.result = target
 
3387
 
 
3388
    def startTest(self, test):
 
3389
        self.result.startTest(test)
 
3390
 
 
3391
    def stopTest(self, test):
 
3392
        self.result.stopTest(test)
 
3393
 
 
3394
    def startTestRun(self):
 
3395
        self.result.startTestRun()
 
3396
 
 
3397
    def stopTestRun(self):
 
3398
        self.result.stopTestRun()
 
3399
 
 
3400
    def addSkip(self, test, reason):
 
3401
        self.result.addSkip(test, reason)
 
3402
 
 
3403
    def addSuccess(self, test):
 
3404
        self.result.addSuccess(test)
 
3405
 
 
3406
    def addError(self, test, err):
 
3407
        self.result.addError(test, err)
 
3408
 
 
3409
    def addFailure(self, test, err):
 
3410
        self.result.addFailure(test, err)
 
3411
 
 
3412
 
 
3413
class BZRTransformingResult(ForwardingResult):
 
3414
 
 
3415
    def addError(self, test, err):
 
3416
        feature = self._error_looks_like('UnavailableFeature: ', err)
 
3417
        if feature is not None:
 
3418
            self.result.addNotSupported(test, feature)
 
3419
        else:
 
3420
            self.result.addError(test, err)
 
3421
 
 
3422
    def addFailure(self, test, err):
 
3423
        known = self._error_looks_like('KnownFailure: ', err)
 
3424
        if known is not None:
 
3425
            self.result.addExpectedFailure(test,
 
3426
                [KnownFailure, KnownFailure(known), None])
 
3427
        else:
 
3428
            self.result.addFailure(test, err)
 
3429
 
 
3430
    def _error_looks_like(self, prefix, err):
 
3431
        """Deserialize exception and returns the stringify value."""
 
3432
        import subunit
 
3433
        value = None
 
3434
        typ, exc, _ = err
 
3435
        if isinstance(exc, subunit.RemoteException):
 
3436
            # stringify the exception gives access to the remote traceback
 
3437
            # We search the last line for 'prefix'
 
3438
            lines = str(exc).split('\n')
 
3439
            while lines and not lines[-1]:
 
3440
                lines.pop(-1)
 
3441
            if lines:
 
3442
                if lines[-1].startswith(prefix):
 
3443
                    value = lines[-1][len(prefix):]
 
3444
        return value
 
3445
 
 
3446
 
 
3447
try:
 
3448
    from subunit.test_results import AutoTimingTestResultDecorator
 
3449
    # Expected failure should be seen as a success not a failure Once subunit
 
3450
    # provide native support for that, BZRTransformingResult and this class
 
3451
    # will become useless.
 
3452
    class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
 
3453
 
 
3454
        def addExpectedFailure(self, test, err):
 
3455
            self._before_event()
 
3456
            return self._call_maybe("addExpectedFailure", self._degrade_skip,
 
3457
                                    test, err)
 
3458
except ImportError:
 
3459
    # Let's just define a no-op decorator
 
3460
    BzrAutoTimingTestResultDecorator = lambda x:x
 
3461
 
 
3462
 
 
3463
class ProfileResult(ForwardingResult):
 
3464
    """Generate profiling data for all activity between start and success.
 
3465
    
 
3466
    The profile data is appended to the test's _benchcalls attribute and can
 
3467
    be accessed by the forwarded-to TestResult.
 
3468
 
 
3469
    While it might be cleaner do accumulate this in stopTest, addSuccess is
 
3470
    where our existing output support for lsprof is, and this class aims to
 
3471
    fit in with that: while it could be moved it's not necessary to accomplish
 
3472
    test profiling, nor would it be dramatically cleaner.
 
3473
    """
 
3474
 
 
3475
    def startTest(self, test):
 
3476
        self.profiler = bzrlib.lsprof.BzrProfiler()
 
3477
        self.profiler.start()
 
3478
        ForwardingResult.startTest(self, test)
 
3479
 
 
3480
    def addSuccess(self, test):
 
3481
        stats = self.profiler.stop()
 
3482
        try:
 
3483
            calls = test._benchcalls
 
3484
        except AttributeError:
 
3485
            test._benchcalls = []
 
3486
            calls = test._benchcalls
 
3487
        calls.append(((test.id(), "", ""), stats))
 
3488
        ForwardingResult.addSuccess(self, test)
 
3489
 
 
3490
    def stopTest(self, test):
 
3491
        ForwardingResult.stopTest(self, test)
 
3492
        self.profiler = None
 
3493
 
 
3494
 
 
3495
# Controlled by "bzr selftest -E=..." option
 
3496
# Currently supported:
 
3497
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3498
#                           preserves any flags supplied at the command line.
 
3499
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3500
#                           rather than failing tests. And no longer raise
 
3501
#                           LockContention when fctnl locks are not being used
 
3502
#                           with proper exclusion rules.
 
3503
selftest_debug_flags = set()
 
3504
 
 
3505
 
 
3506
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
3507
             transport=None,
 
3508
             test_suite_factory=None,
 
3509
             lsprof_timed=None,
 
3510
             bench_history=None,
 
3511
             matching_tests_first=None,
 
3512
             list_only=False,
 
3513
             random_seed=None,
 
3514
             exclude_pattern=None,
 
3515
             strict=False,
 
3516
             load_list=None,
 
3517
             debug_flags=None,
 
3518
             starting_with=None,
 
3519
             runner_class=None,
 
3520
             suite_decorators=None,
 
3521
             stream=None,
 
3522
             lsprof_tests=False,
 
3523
             ):
300
3524
    """Run the whole test suite under the enhanced runner"""
301
 
    return testsweet.run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern)
302
 
 
303
 
 
304
 
def test_suite():
305
 
    """Build and return TestSuite for the whole program."""
306
 
    from bzrlib.selftest.TestUtil import TestLoader, TestSuite
307
 
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
308
 
    import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
309
 
    from doctest import DocTestSuite
310
 
    import os
311
 
    import shutil
312
 
    import time
313
 
    import sys
314
 
 
315
 
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
316
 
 
317
 
    testmod_names = \
318
 
                  ['bzrlib.selftest.MetaTestLog',
319
 
                   'bzrlib.selftest.testinv',
320
 
                   'bzrlib.selftest.versioning',
321
 
                   'bzrlib.selftest.testmerge3',
322
 
                   'bzrlib.selftest.testhashcache',
323
 
                   'bzrlib.selftest.teststatus',
324
 
                   'bzrlib.selftest.testlog',
325
 
                   'bzrlib.selftest.testrevisionnamespaces',
326
 
                   'bzrlib.selftest.testbranch',
327
 
#                   'bzrlib.selftest.testrevision',
328
 
#                   'bzrlib.selftest.test_merge_core',
329
 
                   'bzrlib.selftest.test_smart_add',
330
 
                   'bzrlib.selftest.testdiff',
331
 
#                   'bzrlib.selftest.test_parent',
332
 
                   'bzrlib.selftest.test_xml',
333
 
#                   'bzrlib.selftest.testfetch',
334
 
#                   'bzrlib.selftest.whitebox',
335
 
                   'bzrlib.selftest.teststore',
336
 
#                   'bzrlib.selftest.blackbox',
337
 
                   ]
338
 
 
339
 
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
340
 
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
341
 
        if m not in MODULES_TO_DOCTEST:
342
 
            MODULES_TO_DOCTEST.append(m)
343
 
 
344
 
    TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
345
 
    print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
346
 
    print
347
 
    suite = TestSuite()
348
 
    suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
349
 
    for m in MODULES_TO_TEST:
350
 
         suite.addTest(TestLoader().loadTestsFromModule(m))
351
 
    for m in (MODULES_TO_DOCTEST):
352
 
        suite.addTest(DocTestSuite(m))
353
 
    for p in bzrlib.plugin.all_plugins:
354
 
        if hasattr(p, 'test_suite'):
355
 
            suite.addTest(p.test_suite())
 
3525
    # XXX: Very ugly way to do this...
 
3526
    # Disable warning about old formats because we don't want it to disturb
 
3527
    # any blackbox tests.
 
3528
    from bzrlib import repository
 
3529
    repository._deprecation_warning_done = True
 
3530
 
 
3531
    global default_transport
 
3532
    if transport is None:
 
3533
        transport = default_transport
 
3534
    old_transport = default_transport
 
3535
    default_transport = transport
 
3536
    global selftest_debug_flags
 
3537
    old_debug_flags = selftest_debug_flags
 
3538
    if debug_flags is not None:
 
3539
        selftest_debug_flags = set(debug_flags)
 
3540
    try:
 
3541
        if load_list is None:
 
3542
            keep_only = None
 
3543
        else:
 
3544
            keep_only = load_test_id_list(load_list)
 
3545
        if starting_with:
 
3546
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3547
                             for start in starting_with]
 
3548
        if test_suite_factory is None:
 
3549
            # Reduce loading time by loading modules based on the starting_with
 
3550
            # patterns.
 
3551
            suite = test_suite(keep_only, starting_with)
 
3552
        else:
 
3553
            suite = test_suite_factory()
 
3554
        if starting_with:
 
3555
            # But always filter as requested.
 
3556
            suite = filter_suite_by_id_startswith(suite, starting_with)
 
3557
        result_decorators = []
 
3558
        if lsprof_tests:
 
3559
            result_decorators.append(ProfileResult)
 
3560
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
3561
                     stop_on_failure=stop_on_failure,
 
3562
                     transport=transport,
 
3563
                     lsprof_timed=lsprof_timed,
 
3564
                     bench_history=bench_history,
 
3565
                     matching_tests_first=matching_tests_first,
 
3566
                     list_only=list_only,
 
3567
                     random_seed=random_seed,
 
3568
                     exclude_pattern=exclude_pattern,
 
3569
                     strict=strict,
 
3570
                     runner_class=runner_class,
 
3571
                     suite_decorators=suite_decorators,
 
3572
                     stream=stream,
 
3573
                     result_decorators=result_decorators,
 
3574
                     )
 
3575
    finally:
 
3576
        default_transport = old_transport
 
3577
        selftest_debug_flags = old_debug_flags
 
3578
 
 
3579
 
 
3580
def load_test_id_list(file_name):
 
3581
    """Load a test id list from a text file.
 
3582
 
 
3583
    The format is one test id by line.  No special care is taken to impose
 
3584
    strict rules, these test ids are used to filter the test suite so a test id
 
3585
    that do not match an existing test will do no harm. This allows user to add
 
3586
    comments, leave blank lines, etc.
 
3587
    """
 
3588
    test_list = []
 
3589
    try:
 
3590
        ftest = open(file_name, 'rt')
 
3591
    except IOError, e:
 
3592
        if e.errno != errno.ENOENT:
 
3593
            raise
 
3594
        else:
 
3595
            raise errors.NoSuchFile(file_name)
 
3596
 
 
3597
    for test_name in ftest.readlines():
 
3598
        test_list.append(test_name.strip())
 
3599
    ftest.close()
 
3600
    return test_list
 
3601
 
 
3602
 
 
3603
def suite_matches_id_list(test_suite, id_list):
 
3604
    """Warns about tests not appearing or appearing more than once.
 
3605
 
 
3606
    :param test_suite: A TestSuite object.
 
3607
    :param test_id_list: The list of test ids that should be found in
 
3608
         test_suite.
 
3609
 
 
3610
    :return: (absents, duplicates) absents is a list containing the test found
 
3611
        in id_list but not in test_suite, duplicates is a list containing the
 
3612
        test found multiple times in test_suite.
 
3613
 
 
3614
    When using a prefined test id list, it may occurs that some tests do not
 
3615
    exist anymore or that some tests use the same id. This function warns the
 
3616
    tester about potential problems in his workflow (test lists are volatile)
 
3617
    or in the test suite itself (using the same id for several tests does not
 
3618
    help to localize defects).
 
3619
    """
 
3620
    # Build a dict counting id occurrences
 
3621
    tests = dict()
 
3622
    for test in iter_suite_tests(test_suite):
 
3623
        id = test.id()
 
3624
        tests[id] = tests.get(id, 0) + 1
 
3625
 
 
3626
    not_found = []
 
3627
    duplicates = []
 
3628
    for id in id_list:
 
3629
        occurs = tests.get(id, 0)
 
3630
        if not occurs:
 
3631
            not_found.append(id)
 
3632
        elif occurs > 1:
 
3633
            duplicates.append(id)
 
3634
 
 
3635
    return not_found, duplicates
 
3636
 
 
3637
 
 
3638
class TestIdList(object):
 
3639
    """Test id list to filter a test suite.
 
3640
 
 
3641
    Relying on the assumption that test ids are built as:
 
3642
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3643
    notation, this class offers methods to :
 
3644
    - avoid building a test suite for modules not refered to in the test list,
 
3645
    - keep only the tests listed from the module test suite.
 
3646
    """
 
3647
 
 
3648
    def __init__(self, test_id_list):
 
3649
        # When a test suite needs to be filtered against us we compare test ids
 
3650
        # for equality, so a simple dict offers a quick and simple solution.
 
3651
        self.tests = dict().fromkeys(test_id_list, True)
 
3652
 
 
3653
        # While unittest.TestCase have ids like:
 
3654
        # <module>.<class>.<method>[(<param+)],
 
3655
        # doctest.DocTestCase can have ids like:
 
3656
        # <module>
 
3657
        # <module>.<class>
 
3658
        # <module>.<function>
 
3659
        # <module>.<class>.<method>
 
3660
 
 
3661
        # Since we can't predict a test class from its name only, we settle on
 
3662
        # a simple constraint: a test id always begins with its module name.
 
3663
 
 
3664
        modules = {}
 
3665
        for test_id in test_id_list:
 
3666
            parts = test_id.split('.')
 
3667
            mod_name = parts.pop(0)
 
3668
            modules[mod_name] = True
 
3669
            for part in parts:
 
3670
                mod_name += '.' + part
 
3671
                modules[mod_name] = True
 
3672
        self.modules = modules
 
3673
 
 
3674
    def refers_to(self, module_name):
 
3675
        """Is there tests for the module or one of its sub modules."""
 
3676
        return self.modules.has_key(module_name)
 
3677
 
 
3678
    def includes(self, test_id):
 
3679
        return self.tests.has_key(test_id)
 
3680
 
 
3681
 
 
3682
class TestPrefixAliasRegistry(registry.Registry):
 
3683
    """A registry for test prefix aliases.
 
3684
 
 
3685
    This helps implement shorcuts for the --starting-with selftest
 
3686
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3687
    warning will be emitted).
 
3688
    """
 
3689
 
 
3690
    def register(self, key, obj, help=None, info=None,
 
3691
                 override_existing=False):
 
3692
        """See Registry.register.
 
3693
 
 
3694
        Trying to override an existing alias causes a warning to be emitted,
 
3695
        not a fatal execption.
 
3696
        """
 
3697
        try:
 
3698
            super(TestPrefixAliasRegistry, self).register(
 
3699
                key, obj, help=help, info=info, override_existing=False)
 
3700
        except KeyError:
 
3701
            actual = self.get(key)
 
3702
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
3703
                 % (key, actual, obj))
 
3704
 
 
3705
    def resolve_alias(self, id_start):
 
3706
        """Replace the alias by the prefix in the given string.
 
3707
 
 
3708
        Using an unknown prefix is an error to help catching typos.
 
3709
        """
 
3710
        parts = id_start.split('.')
 
3711
        try:
 
3712
            parts[0] = self.get(parts[0])
 
3713
        except KeyError:
 
3714
            raise errors.BzrCommandError(
 
3715
                '%s is not a known test prefix alias' % parts[0])
 
3716
        return '.'.join(parts)
 
3717
 
 
3718
 
 
3719
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3720
"""Registry of test prefix aliases."""
 
3721
 
 
3722
 
 
3723
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3724
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3725
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3726
 
 
3727
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3728
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3729
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3730
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3731
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3732
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3733
 
 
3734
 
 
3735
def _test_suite_testmod_names():
 
3736
    """Return the standard list of test module names to test."""
 
3737
    return [
 
3738
        'bzrlib.doc',
 
3739
        'bzrlib.tests.blackbox',
 
3740
        'bzrlib.tests.commands',
 
3741
        'bzrlib.tests.per_branch',
 
3742
        'bzrlib.tests.per_bzrdir',
 
3743
        'bzrlib.tests.per_foreign_vcs',
 
3744
        'bzrlib.tests.per_interrepository',
 
3745
        'bzrlib.tests.per_intertree',
 
3746
        'bzrlib.tests.per_inventory',
 
3747
        'bzrlib.tests.per_interbranch',
 
3748
        'bzrlib.tests.per_lock',
 
3749
        'bzrlib.tests.per_transport',
 
3750
        'bzrlib.tests.per_tree',
 
3751
        'bzrlib.tests.per_pack_repository',
 
3752
        'bzrlib.tests.per_repository',
 
3753
        'bzrlib.tests.per_repository_chk',
 
3754
        'bzrlib.tests.per_repository_reference',
 
3755
        'bzrlib.tests.per_uifactory',
 
3756
        'bzrlib.tests.per_versionedfile',
 
3757
        'bzrlib.tests.per_workingtree',
 
3758
        'bzrlib.tests.test__annotator',
 
3759
        'bzrlib.tests.test__chk_map',
 
3760
        'bzrlib.tests.test__dirstate_helpers',
 
3761
        'bzrlib.tests.test__groupcompress',
 
3762
        'bzrlib.tests.test__known_graph',
 
3763
        'bzrlib.tests.test__rio',
 
3764
        'bzrlib.tests.test__simple_set',
 
3765
        'bzrlib.tests.test__static_tuple',
 
3766
        'bzrlib.tests.test__walkdirs_win32',
 
3767
        'bzrlib.tests.test_ancestry',
 
3768
        'bzrlib.tests.test_annotate',
 
3769
        'bzrlib.tests.test_api',
 
3770
        'bzrlib.tests.test_atomicfile',
 
3771
        'bzrlib.tests.test_bad_files',
 
3772
        'bzrlib.tests.test_bencode',
 
3773
        'bzrlib.tests.test_bisect_multi',
 
3774
        'bzrlib.tests.test_branch',
 
3775
        'bzrlib.tests.test_branchbuilder',
 
3776
        'bzrlib.tests.test_btree_index',
 
3777
        'bzrlib.tests.test_bugtracker',
 
3778
        'bzrlib.tests.test_bundle',
 
3779
        'bzrlib.tests.test_bzrdir',
 
3780
        'bzrlib.tests.test__chunks_to_lines',
 
3781
        'bzrlib.tests.test_cache_utf8',
 
3782
        'bzrlib.tests.test_chk_map',
 
3783
        'bzrlib.tests.test_chk_serializer',
 
3784
        'bzrlib.tests.test_chunk_writer',
 
3785
        'bzrlib.tests.test_clean_tree',
 
3786
        'bzrlib.tests.test_cleanup',
 
3787
        'bzrlib.tests.test_commands',
 
3788
        'bzrlib.tests.test_commit',
 
3789
        'bzrlib.tests.test_commit_merge',
 
3790
        'bzrlib.tests.test_config',
 
3791
        'bzrlib.tests.test_conflicts',
 
3792
        'bzrlib.tests.test_counted_lock',
 
3793
        'bzrlib.tests.test_crash',
 
3794
        'bzrlib.tests.test_decorators',
 
3795
        'bzrlib.tests.test_delta',
 
3796
        'bzrlib.tests.test_debug',
 
3797
        'bzrlib.tests.test_deprecated_graph',
 
3798
        'bzrlib.tests.test_diff',
 
3799
        'bzrlib.tests.test_directory_service',
 
3800
        'bzrlib.tests.test_dirstate',
 
3801
        'bzrlib.tests.test_email_message',
 
3802
        'bzrlib.tests.test_eol_filters',
 
3803
        'bzrlib.tests.test_errors',
 
3804
        'bzrlib.tests.test_export',
 
3805
        'bzrlib.tests.test_extract',
 
3806
        'bzrlib.tests.test_fetch',
 
3807
        'bzrlib.tests.test_fifo_cache',
 
3808
        'bzrlib.tests.test_filters',
 
3809
        'bzrlib.tests.test_ftp_transport',
 
3810
        'bzrlib.tests.test_foreign',
 
3811
        'bzrlib.tests.test_generate_docs',
 
3812
        'bzrlib.tests.test_generate_ids',
 
3813
        'bzrlib.tests.test_globbing',
 
3814
        'bzrlib.tests.test_gpg',
 
3815
        'bzrlib.tests.test_graph',
 
3816
        'bzrlib.tests.test_groupcompress',
 
3817
        'bzrlib.tests.test_hashcache',
 
3818
        'bzrlib.tests.test_help',
 
3819
        'bzrlib.tests.test_hooks',
 
3820
        'bzrlib.tests.test_http',
 
3821
        'bzrlib.tests.test_http_response',
 
3822
        'bzrlib.tests.test_https_ca_bundle',
 
3823
        'bzrlib.tests.test_identitymap',
 
3824
        'bzrlib.tests.test_ignores',
 
3825
        'bzrlib.tests.test_index',
 
3826
        'bzrlib.tests.test_info',
 
3827
        'bzrlib.tests.test_inv',
 
3828
        'bzrlib.tests.test_inventory_delta',
 
3829
        'bzrlib.tests.test_knit',
 
3830
        'bzrlib.tests.test_lazy_import',
 
3831
        'bzrlib.tests.test_lazy_regex',
 
3832
        'bzrlib.tests.test_lock',
 
3833
        'bzrlib.tests.test_lockable_files',
 
3834
        'bzrlib.tests.test_lockdir',
 
3835
        'bzrlib.tests.test_log',
 
3836
        'bzrlib.tests.test_lru_cache',
 
3837
        'bzrlib.tests.test_lsprof',
 
3838
        'bzrlib.tests.test_mail_client',
 
3839
        'bzrlib.tests.test_memorytree',
 
3840
        'bzrlib.tests.test_merge',
 
3841
        'bzrlib.tests.test_merge3',
 
3842
        'bzrlib.tests.test_merge_core',
 
3843
        'bzrlib.tests.test_merge_directive',
 
3844
        'bzrlib.tests.test_missing',
 
3845
        'bzrlib.tests.test_msgeditor',
 
3846
        'bzrlib.tests.test_multiparent',
 
3847
        'bzrlib.tests.test_mutabletree',
 
3848
        'bzrlib.tests.test_nonascii',
 
3849
        'bzrlib.tests.test_options',
 
3850
        'bzrlib.tests.test_osutils',
 
3851
        'bzrlib.tests.test_osutils_encodings',
 
3852
        'bzrlib.tests.test_pack',
 
3853
        'bzrlib.tests.test_patch',
 
3854
        'bzrlib.tests.test_patches',
 
3855
        'bzrlib.tests.test_permissions',
 
3856
        'bzrlib.tests.test_plugins',
 
3857
        'bzrlib.tests.test_progress',
 
3858
        'bzrlib.tests.test_read_bundle',
 
3859
        'bzrlib.tests.test_reconcile',
 
3860
        'bzrlib.tests.test_reconfigure',
 
3861
        'bzrlib.tests.test_registry',
 
3862
        'bzrlib.tests.test_remote',
 
3863
        'bzrlib.tests.test_rename_map',
 
3864
        'bzrlib.tests.test_repository',
 
3865
        'bzrlib.tests.test_revert',
 
3866
        'bzrlib.tests.test_revision',
 
3867
        'bzrlib.tests.test_revisionspec',
 
3868
        'bzrlib.tests.test_revisiontree',
 
3869
        'bzrlib.tests.test_rio',
 
3870
        'bzrlib.tests.test_rules',
 
3871
        'bzrlib.tests.test_sampler',
 
3872
        'bzrlib.tests.test_script',
 
3873
        'bzrlib.tests.test_selftest',
 
3874
        'bzrlib.tests.test_serializer',
 
3875
        'bzrlib.tests.test_setup',
 
3876
        'bzrlib.tests.test_sftp_transport',
 
3877
        'bzrlib.tests.test_shelf',
 
3878
        'bzrlib.tests.test_shelf_ui',
 
3879
        'bzrlib.tests.test_smart',
 
3880
        'bzrlib.tests.test_smart_add',
 
3881
        'bzrlib.tests.test_smart_request',
 
3882
        'bzrlib.tests.test_smart_transport',
 
3883
        'bzrlib.tests.test_smtp_connection',
 
3884
        'bzrlib.tests.test_source',
 
3885
        'bzrlib.tests.test_ssh_transport',
 
3886
        'bzrlib.tests.test_status',
 
3887
        'bzrlib.tests.test_store',
 
3888
        'bzrlib.tests.test_strace',
 
3889
        'bzrlib.tests.test_subsume',
 
3890
        'bzrlib.tests.test_switch',
 
3891
        'bzrlib.tests.test_symbol_versioning',
 
3892
        'bzrlib.tests.test_tag',
 
3893
        'bzrlib.tests.test_testament',
 
3894
        'bzrlib.tests.test_textfile',
 
3895
        'bzrlib.tests.test_textmerge',
 
3896
        'bzrlib.tests.test_timestamp',
 
3897
        'bzrlib.tests.test_trace',
 
3898
        'bzrlib.tests.test_transactions',
 
3899
        'bzrlib.tests.test_transform',
 
3900
        'bzrlib.tests.test_transport',
 
3901
        'bzrlib.tests.test_transport_log',
 
3902
        'bzrlib.tests.test_tree',
 
3903
        'bzrlib.tests.test_treebuilder',
 
3904
        'bzrlib.tests.test_tsort',
 
3905
        'bzrlib.tests.test_tuned_gzip',
 
3906
        'bzrlib.tests.test_ui',
 
3907
        'bzrlib.tests.test_uncommit',
 
3908
        'bzrlib.tests.test_upgrade',
 
3909
        'bzrlib.tests.test_upgrade_stacked',
 
3910
        'bzrlib.tests.test_urlutils',
 
3911
        'bzrlib.tests.test_version',
 
3912
        'bzrlib.tests.test_version_info',
 
3913
        'bzrlib.tests.test_weave',
 
3914
        'bzrlib.tests.test_whitebox',
 
3915
        'bzrlib.tests.test_win32utils',
 
3916
        'bzrlib.tests.test_workingtree',
 
3917
        'bzrlib.tests.test_workingtree_4',
 
3918
        'bzrlib.tests.test_wsgi',
 
3919
        'bzrlib.tests.test_xml',
 
3920
        ]
 
3921
 
 
3922
 
 
3923
def _test_suite_modules_to_doctest():
 
3924
    """Return the list of modules to doctest."""   
 
3925
    return [
 
3926
        'bzrlib',
 
3927
        'bzrlib.branchbuilder',
 
3928
        'bzrlib.export',
 
3929
        'bzrlib.inventory',
 
3930
        'bzrlib.iterablefile',
 
3931
        'bzrlib.lockdir',
 
3932
        'bzrlib.merge3',
 
3933
        'bzrlib.option',
 
3934
        'bzrlib.symbol_versioning',
 
3935
        'bzrlib.tests',
 
3936
        'bzrlib.timestamp',
 
3937
        'bzrlib.version_info_formats.format_custom',
 
3938
        ]
 
3939
 
 
3940
 
 
3941
def test_suite(keep_only=None, starting_with=None):
 
3942
    """Build and return TestSuite for the whole of bzrlib.
 
3943
 
 
3944
    :param keep_only: A list of test ids limiting the suite returned.
 
3945
 
 
3946
    :param starting_with: An id limiting the suite returned to the tests
 
3947
         starting with it.
 
3948
 
 
3949
    This function can be replaced if you need to change the default test
 
3950
    suite on a global basis, but it is not encouraged.
 
3951
    """
 
3952
 
 
3953
    loader = TestUtil.TestLoader()
 
3954
 
 
3955
    if keep_only is not None:
 
3956
        id_filter = TestIdList(keep_only)
 
3957
    if starting_with:
 
3958
        # We take precedence over keep_only because *at loading time* using
 
3959
        # both options means we will load less tests for the same final result.
 
3960
        def interesting_module(name):
 
3961
            for start in starting_with:
 
3962
                if (
 
3963
                    # Either the module name starts with the specified string
 
3964
                    name.startswith(start)
 
3965
                    # or it may contain tests starting with the specified string
 
3966
                    or start.startswith(name)
 
3967
                    ):
 
3968
                    return True
 
3969
            return False
 
3970
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
3971
 
 
3972
    elif keep_only is not None:
 
3973
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3974
        def interesting_module(name):
 
3975
            return id_filter.refers_to(name)
 
3976
 
 
3977
    else:
 
3978
        loader = TestUtil.TestLoader()
 
3979
        def interesting_module(name):
 
3980
            # No filtering, all modules are interesting
 
3981
            return True
 
3982
 
 
3983
    suite = loader.suiteClass()
 
3984
 
 
3985
    # modules building their suite with loadTestsFromModuleNames
 
3986
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))
 
3987
 
 
3988
    for mod in _test_suite_modules_to_doctest():
 
3989
        if not interesting_module(mod):
 
3990
            # No tests to keep here, move along
 
3991
            continue
 
3992
        try:
 
3993
            # note that this really does mean "report only" -- doctest
 
3994
            # still runs the rest of the examples
 
3995
            doc_suite = doctest.DocTestSuite(mod,
 
3996
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
 
3997
        except ValueError, e:
 
3998
            print '**failed to get doctest for: %s\n%s' % (mod, e)
 
3999
            raise
 
4000
        if len(doc_suite._tests) == 0:
 
4001
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
4002
        suite.addTest(doc_suite)
 
4003
 
 
4004
    default_encoding = sys.getdefaultencoding()
 
4005
    for name, plugin in bzrlib.plugin.plugins().items():
 
4006
        if not interesting_module(plugin.module.__name__):
 
4007
            continue
 
4008
        plugin_suite = plugin.test_suite()
 
4009
        # We used to catch ImportError here and turn it into just a warning,
 
4010
        # but really if you don't have --no-plugins this should be a failure.
 
4011
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
4012
        if plugin_suite is None:
 
4013
            plugin_suite = plugin.load_plugin_tests(loader)
 
4014
        if plugin_suite is not None:
 
4015
            suite.addTest(plugin_suite)
 
4016
        if default_encoding != sys.getdefaultencoding():
 
4017
            bzrlib.trace.warning(
 
4018
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
4019
                sys.getdefaultencoding())
 
4020
            reload(sys)
 
4021
            sys.setdefaultencoding(default_encoding)
 
4022
 
 
4023
    if keep_only is not None:
 
4024
        # Now that the referred modules have loaded their tests, keep only the
 
4025
        # requested ones.
 
4026
        suite = filter_suite_by_id_list(suite, id_filter)
 
4027
        # Do some sanity checks on the id_list filtering
 
4028
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
4029
        if starting_with:
 
4030
            # The tester has used both keep_only and starting_with, so he is
 
4031
            # already aware that some tests are excluded from the list, there
 
4032
            # is no need to tell him which.
 
4033
            pass
 
4034
        else:
 
4035
            # Some tests mentioned in the list are not in the test suite. The
 
4036
            # list may be out of date, report to the tester.
 
4037
            for id in not_found:
 
4038
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
4039
        for id in duplicates:
 
4040
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
4041
 
356
4042
    return suite
357
4043
 
 
4044
 
 
4045
def multiply_scenarios(scenarios_left, scenarios_right):
 
4046
    """Multiply two sets of scenarios.
 
4047
 
 
4048
    :returns: the cartesian product of the two sets of scenarios, that is
 
4049
        a scenario for every possible combination of a left scenario and a
 
4050
        right scenario.
 
4051
    """
 
4052
    return [
 
4053
        ('%s,%s' % (left_name, right_name),
 
4054
         dict(left_dict.items() + right_dict.items()))
 
4055
        for left_name, left_dict in scenarios_left
 
4056
        for right_name, right_dict in scenarios_right]
 
4057
 
 
4058
 
 
4059
def multiply_tests(tests, scenarios, result):
 
4060
    """Multiply tests_list by scenarios into result.
 
4061
 
 
4062
    This is the core workhorse for test parameterisation.
 
4063
 
 
4064
    Typically the load_tests() method for a per-implementation test suite will
 
4065
    call multiply_tests and return the result.
 
4066
 
 
4067
    :param tests: The tests to parameterise.
 
4068
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
 
4069
        scenario_param_dict).
 
4070
    :param result: A TestSuite to add created tests to.
 
4071
 
 
4072
    This returns the passed in result TestSuite with the cross product of all
 
4073
    the tests repeated once for each scenario.  Each test is adapted by adding
 
4074
    the scenario name at the end of its id(), and updating the test object's
 
4075
    __dict__ with the scenario_param_dict.
 
4076
 
 
4077
    >>> import bzrlib.tests.test_sampler
 
4078
    >>> r = multiply_tests(
 
4079
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
4080
    ...     [('one', dict(param=1)),
 
4081
    ...      ('two', dict(param=2))],
 
4082
    ...     TestSuite())
 
4083
    >>> tests = list(iter_suite_tests(r))
 
4084
    >>> len(tests)
 
4085
    2
 
4086
    >>> tests[0].id()
 
4087
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
4088
    >>> tests[0].param
 
4089
    1
 
4090
    >>> tests[1].param
 
4091
    2
 
4092
    """
 
4093
    for test in iter_suite_tests(tests):
 
4094
        apply_scenarios(test, scenarios, result)
 
4095
    return result
 
4096
 
 
4097
 
 
4098
def apply_scenarios(test, scenarios, result):
 
4099
    """Apply the scenarios in scenarios to test and add to result.
 
4100
 
 
4101
    :param test: The test to apply scenarios to.
 
4102
    :param scenarios: An iterable of scenarios to apply to test.
 
4103
    :return: result
 
4104
    :seealso: apply_scenario
 
4105
    """
 
4106
    for scenario in scenarios:
 
4107
        result.addTest(apply_scenario(test, scenario))
 
4108
    return result
 
4109
 
 
4110
 
 
4111
def apply_scenario(test, scenario):
 
4112
    """Copy test and apply scenario to it.
 
4113
 
 
4114
    :param test: A test to adapt.
 
4115
    :param scenario: A tuple describing the scenarion.
 
4116
        The first element of the tuple is the new test id.
 
4117
        The second element is a dict containing attributes to set on the
 
4118
        test.
 
4119
    :return: The adapted test.
 
4120
    """
 
4121
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
4122
    new_test = clone_test(test, new_id)
 
4123
    for name, value in scenario[1].items():
 
4124
        setattr(new_test, name, value)
 
4125
    return new_test
 
4126
 
 
4127
 
 
4128
def clone_test(test, new_id):
 
4129
    """Clone a test giving it a new id.
 
4130
 
 
4131
    :param test: The test to clone.
 
4132
    :param new_id: The id to assign to it.
 
4133
    :return: The new test.
 
4134
    """
 
4135
    new_test = copy(test)
 
4136
    new_test.id = lambda: new_id
 
4137
    return new_test
 
4138
 
 
4139
 
 
4140
def _rmtree_temp_dir(dirname, test_id=None):
 
4141
    # If LANG=C we probably have created some bogus paths
 
4142
    # which rmtree(unicode) will fail to delete
 
4143
    # so make sure we are using rmtree(str) to delete everything
 
4144
    # except on win32, where rmtree(str) will fail
 
4145
    # since it doesn't have the property of byte-stream paths
 
4146
    # (they are either ascii or mbcs)
 
4147
    if sys.platform == 'win32':
 
4148
        # make sure we are using the unicode win32 api
 
4149
        dirname = unicode(dirname)
 
4150
    else:
 
4151
        dirname = dirname.encode(sys.getfilesystemencoding())
 
4152
    try:
 
4153
        osutils.rmtree(dirname)
 
4154
    except OSError, e:
 
4155
        # We don't want to fail here because some useful display will be lost
 
4156
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
4157
        # possible info to the test runner is even worse.
 
4158
        if test_id != None:
 
4159
            ui.ui_factory.clear_term()
 
4160
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
 
4161
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
4162
                         % (os.path.basename(dirname), e))
 
4163
 
 
4164
 
 
4165
class Feature(object):
 
4166
    """An operating system Feature."""
 
4167
 
 
4168
    def __init__(self):
 
4169
        self._available = None
 
4170
 
 
4171
    def available(self):
 
4172
        """Is the feature available?
 
4173
 
 
4174
        :return: True if the feature is available.
 
4175
        """
 
4176
        if self._available is None:
 
4177
            self._available = self._probe()
 
4178
        return self._available
 
4179
 
 
4180
    def _probe(self):
 
4181
        """Implement this method in concrete features.
 
4182
 
 
4183
        :return: True if the feature is available.
 
4184
        """
 
4185
        raise NotImplementedError
 
4186
 
 
4187
    def __str__(self):
 
4188
        if getattr(self, 'feature_name', None):
 
4189
            return self.feature_name()
 
4190
        return self.__class__.__name__
 
4191
 
 
4192
 
 
4193
class _SymlinkFeature(Feature):
 
4194
 
 
4195
    def _probe(self):
 
4196
        return osutils.has_symlinks()
 
4197
 
 
4198
    def feature_name(self):
 
4199
        return 'symlinks'
 
4200
 
 
4201
SymlinkFeature = _SymlinkFeature()
 
4202
 
 
4203
 
 
4204
class _HardlinkFeature(Feature):
 
4205
 
 
4206
    def _probe(self):
 
4207
        return osutils.has_hardlinks()
 
4208
 
 
4209
    def feature_name(self):
 
4210
        return 'hardlinks'
 
4211
 
 
4212
HardlinkFeature = _HardlinkFeature()
 
4213
 
 
4214
 
 
4215
class _OsFifoFeature(Feature):
 
4216
 
 
4217
    def _probe(self):
 
4218
        return getattr(os, 'mkfifo', None)
 
4219
 
 
4220
    def feature_name(self):
 
4221
        return 'filesystem fifos'
 
4222
 
 
4223
OsFifoFeature = _OsFifoFeature()
 
4224
 
 
4225
 
 
4226
class _UnicodeFilenameFeature(Feature):
 
4227
    """Does the filesystem support Unicode filenames?"""
 
4228
 
 
4229
    def _probe(self):
 
4230
        try:
 
4231
            # Check for character combinations unlikely to be covered by any
 
4232
            # single non-unicode encoding. We use the characters
 
4233
            # - greek small letter alpha (U+03B1) and
 
4234
            # - braille pattern dots-123456 (U+283F).
 
4235
            os.stat(u'\u03b1\u283f')
 
4236
        except UnicodeEncodeError:
 
4237
            return False
 
4238
        except (IOError, OSError):
 
4239
            # The filesystem allows the Unicode filename but the file doesn't
 
4240
            # exist.
 
4241
            return True
 
4242
        else:
 
4243
            # The filesystem allows the Unicode filename and the file exists,
 
4244
            # for some reason.
 
4245
            return True
 
4246
 
 
4247
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
4248
 
 
4249
 
 
4250
class ModuleAvailableFeature(Feature):
 
4251
    """This is a feature than describes a module we want to be available.
 
4252
 
 
4253
    Declare the name of the module in __init__(), and then after probing, the
 
4254
    module will be available as 'self.module'.
 
4255
 
 
4256
    :ivar module: The module if it is available, else None.
 
4257
    """
 
4258
 
 
4259
    def __init__(self, module_name):
 
4260
        super(ModuleAvailableFeature, self).__init__()
 
4261
        self.module_name = module_name
 
4262
 
 
4263
    def _probe(self):
 
4264
        try:
 
4265
            self._module = __import__(self.module_name, {}, {}, [''])
 
4266
            return True
 
4267
        except ImportError:
 
4268
            return False
 
4269
 
 
4270
    @property
 
4271
    def module(self):
 
4272
        if self.available(): # Make sure the probe has been done
 
4273
            return self._module
 
4274
        return None
 
4275
    
 
4276
    def feature_name(self):
 
4277
        return self.module_name
 
4278
 
 
4279
 
 
4280
 
 
4281
def probe_unicode_in_user_encoding():
 
4282
    """Try to encode several unicode strings to use in unicode-aware tests.
 
4283
    Return first successfull match.
 
4284
 
 
4285
    :return:  (unicode value, encoded plain string value) or (None, None)
 
4286
    """
 
4287
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
4288
    for uni_val in possible_vals:
 
4289
        try:
 
4290
            str_val = uni_val.encode(osutils.get_user_encoding())
 
4291
        except UnicodeEncodeError:
 
4292
            # Try a different character
 
4293
            pass
 
4294
        else:
 
4295
            return uni_val, str_val
 
4296
    return None, None
 
4297
 
 
4298
 
 
4299
def probe_bad_non_ascii(encoding):
 
4300
    """Try to find [bad] character with code [128..255]
 
4301
    that cannot be decoded to unicode in some encoding.
 
4302
    Return None if all non-ascii characters is valid
 
4303
    for given encoding.
 
4304
    """
 
4305
    for i in xrange(128, 256):
 
4306
        char = chr(i)
 
4307
        try:
 
4308
            char.decode(encoding)
 
4309
        except UnicodeDecodeError:
 
4310
            return char
 
4311
    return None
 
4312
 
 
4313
 
 
4314
class _HTTPSServerFeature(Feature):
 
4315
    """Some tests want an https Server, check if one is available.
 
4316
 
 
4317
    Right now, the only way this is available is under python2.6 which provides
 
4318
    an ssl module.
 
4319
    """
 
4320
 
 
4321
    def _probe(self):
 
4322
        try:
 
4323
            import ssl
 
4324
            return True
 
4325
        except ImportError:
 
4326
            return False
 
4327
 
 
4328
    def feature_name(self):
 
4329
        return 'HTTPSServer'
 
4330
 
 
4331
 
 
4332
HTTPSServerFeature = _HTTPSServerFeature()
 
4333
 
 
4334
 
 
4335
class _ParamikoFeature(Feature):
 
4336
    """Is paramiko available?"""
 
4337
 
 
4338
    def _probe(self):
 
4339
        try:
 
4340
            from bzrlib.transport.sftp import SFTPAbsoluteServer
 
4341
            return True
 
4342
        except errors.ParamikoNotPresent:
 
4343
            return False
 
4344
 
 
4345
    def feature_name(self):
 
4346
        return "Paramiko"
 
4347
 
 
4348
 
 
4349
ParamikoFeature = _ParamikoFeature()
 
4350
 
 
4351
 
 
4352
class _UnicodeFilename(Feature):
 
4353
    """Does the filesystem support Unicode filenames?"""
 
4354
 
 
4355
    def _probe(self):
 
4356
        try:
 
4357
            os.stat(u'\u03b1')
 
4358
        except UnicodeEncodeError:
 
4359
            return False
 
4360
        except (IOError, OSError):
 
4361
            # The filesystem allows the Unicode filename but the file doesn't
 
4362
            # exist.
 
4363
            return True
 
4364
        else:
 
4365
            # The filesystem allows the Unicode filename and the file exists,
 
4366
            # for some reason.
 
4367
            return True
 
4368
 
 
4369
UnicodeFilename = _UnicodeFilename()
 
4370
 
 
4371
 
 
4372
class _UTF8Filesystem(Feature):
 
4373
    """Is the filesystem UTF-8?"""
 
4374
 
 
4375
    def _probe(self):
 
4376
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
4377
            return True
 
4378
        return False
 
4379
 
 
4380
UTF8Filesystem = _UTF8Filesystem()
 
4381
 
 
4382
 
 
4383
class _BreakinFeature(Feature):
 
4384
    """Does this platform support the breakin feature?"""
 
4385
 
 
4386
    def _probe(self):
 
4387
        from bzrlib import breakin
 
4388
        if breakin.determine_signal() is None:
 
4389
            return False
 
4390
        if sys.platform == 'win32':
 
4391
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4392
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4393
            # access the function
 
4394
            try:
 
4395
                import ctypes
 
4396
            except OSError:
 
4397
                return False
 
4398
        return True
 
4399
 
 
4400
    def feature_name(self):
 
4401
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4402
 
 
4403
 
 
4404
BreakinFeature = _BreakinFeature()
 
4405
 
 
4406
 
 
4407
class _CaseInsCasePresFilenameFeature(Feature):
 
4408
    """Is the file-system case insensitive, but case-preserving?"""
 
4409
 
 
4410
    def _probe(self):
 
4411
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4412
        try:
 
4413
            # first check truly case-preserving for created files, then check
 
4414
            # case insensitive when opening existing files.
 
4415
            name = osutils.normpath(name)
 
4416
            base, rel = osutils.split(name)
 
4417
            found_rel = osutils.canonical_relpath(base, name)
 
4418
            return (found_rel == rel
 
4419
                    and os.path.isfile(name.upper())
 
4420
                    and os.path.isfile(name.lower()))
 
4421
        finally:
 
4422
            os.close(fileno)
 
4423
            os.remove(name)
 
4424
 
 
4425
    def feature_name(self):
 
4426
        return "case-insensitive case-preserving filesystem"
 
4427
 
 
4428
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4429
 
 
4430
 
 
4431
class _CaseInsensitiveFilesystemFeature(Feature):
 
4432
    """Check if underlying filesystem is case-insensitive but *not* case
 
4433
    preserving.
 
4434
    """
 
4435
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4436
    # more likely to be case preserving, so this case is rare.
 
4437
 
 
4438
    def _probe(self):
 
4439
        if CaseInsCasePresFilenameFeature.available():
 
4440
            return False
 
4441
 
 
4442
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4443
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4444
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4445
        else:
 
4446
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4447
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4448
            dir=root)
 
4449
        name_a = osutils.pathjoin(tdir, 'a')
 
4450
        name_A = osutils.pathjoin(tdir, 'A')
 
4451
        os.mkdir(name_a)
 
4452
        result = osutils.isdir(name_A)
 
4453
        _rmtree_temp_dir(tdir)
 
4454
        return result
 
4455
 
 
4456
    def feature_name(self):
 
4457
        return 'case-insensitive filesystem'
 
4458
 
 
4459
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4460
 
 
4461
 
 
4462
class _SubUnitFeature(Feature):
 
4463
    """Check if subunit is available."""
 
4464
 
 
4465
    def _probe(self):
 
4466
        try:
 
4467
            import subunit
 
4468
            return True
 
4469
        except ImportError:
 
4470
            return False
 
4471
 
 
4472
    def feature_name(self):
 
4473
        return 'subunit'
 
4474
 
 
4475
SubUnitFeature = _SubUnitFeature()
 
4476
# Only define SubUnitBzrRunner if subunit is available.
 
4477
try:
 
4478
    from subunit import TestProtocolClient
 
4479
    class SubUnitBzrRunner(TextTestRunner):
 
4480
        def run(self, test):
 
4481
            result = BzrAutoTimingTestResultDecorator(
 
4482
                TestProtocolClient(self.stream))
 
4483
            test.run(result)
 
4484
            return result
 
4485
except ImportError:
 
4486
    pass