~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Ian Clatworthy
  • Date: 2009-09-09 11:43:10 UTC
  • mto: (4634.37.2 prepare-2.0)
  • mto: This revision was merged to the branch mainline in revision 4689.
  • Revision ID: ian.clatworthy@canonical.com-20090909114310-glw7tv76i5gnx9pt
put rules back in Makefile supporting plain-style docs

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
18
# TODO: Perhaps there should be an API to find out if bzr running under the
33
33
import doctest
34
34
import errno
35
35
import logging
 
36
import math
36
37
import os
37
38
from pprint import pformat
38
39
import random
39
40
import re
40
41
import shlex
41
42
import stat
42
 
from subprocess import Popen, PIPE
 
43
from subprocess import Popen, PIPE, STDOUT
43
44
import sys
44
45
import tempfile
 
46
import threading
 
47
import time
45
48
import unittest
46
 
import time
47
49
import warnings
48
50
 
49
51
 
50
52
from bzrlib import (
 
53
    branchbuilder,
51
54
    bzrdir,
52
55
    debug,
53
56
    errors,
 
57
    hooks,
 
58
    lock as _mod_lock,
54
59
    memorytree,
55
60
    osutils,
56
61
    progress,
57
62
    ui,
58
63
    urlutils,
 
64
    registry,
59
65
    workingtree,
60
66
    )
61
67
import bzrlib.branch
72
78
    pass
73
79
from bzrlib.merge import merge_inner
74
80
import bzrlib.merge3
75
 
import bzrlib.osutils
76
81
import bzrlib.plugin
77
 
from bzrlib.revision import common_ancestor
 
82
from bzrlib.smart import client, request, server
78
83
import bzrlib.store
79
84
from bzrlib import symbol_versioning
80
85
from bzrlib.symbol_versioning import (
 
86
    DEPRECATED_PARAMETER,
 
87
    deprecated_function,
81
88
    deprecated_method,
82
 
    zero_eighteen,
83
 
    zero_ninetyone,
 
89
    deprecated_passed,
84
90
    )
85
91
import bzrlib.trace
86
92
from bzrlib.transport import get_transport
90
96
from bzrlib.transport.readonly import ReadonlyServer
91
97
from bzrlib.trace import mutter, note
92
98
from bzrlib.tests import TestUtil
93
 
from bzrlib.tests.HttpServer import HttpServer
 
99
from bzrlib.tests.http_server import HttpServer
94
100
from bzrlib.tests.TestUtil import (
95
101
                          TestSuite,
96
102
                          TestLoader,
97
103
                          )
98
104
from bzrlib.tests.treeshape import build_tree_contents
 
105
from bzrlib.ui import NullProgressView
 
106
from bzrlib.ui.text import TextUIFactory
 
107
import bzrlib.version_info_formats.format_custom
99
108
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
100
109
 
101
110
# Mark this python module as being part of the implementation
105
114
 
106
115
default_transport = LocalURLServer
107
116
 
108
 
MODULES_TO_TEST = []
109
 
MODULES_TO_DOCTEST = [
110
 
        bzrlib.timestamp,
111
 
        bzrlib.errors,
112
 
        bzrlib.export,
113
 
        bzrlib.inventory,
114
 
        bzrlib.iterablefile,
115
 
        bzrlib.lockdir,
116
 
        bzrlib.merge3,
117
 
        bzrlib.option,
118
 
        bzrlib.store,
119
 
        # quoted to avoid module-loading circularity
120
 
        'bzrlib.tests',
121
 
        ]
122
 
 
123
 
 
124
 
def packages_to_test():
125
 
    """Return a list of packages to test.
126
 
 
127
 
    The packages are not globally imported so that import failures are
128
 
    triggered when running selftest, not when importing the command.
129
 
    """
130
 
    import bzrlib.doc
131
 
    import bzrlib.tests.blackbox
132
 
    import bzrlib.tests.branch_implementations
133
 
    import bzrlib.tests.bzrdir_implementations
134
 
    import bzrlib.tests.commands
135
 
    import bzrlib.tests.interrepository_implementations
136
 
    import bzrlib.tests.interversionedfile_implementations
137
 
    import bzrlib.tests.intertree_implementations
138
 
    import bzrlib.tests.per_lock
139
 
    import bzrlib.tests.repository_implementations
140
 
    import bzrlib.tests.revisionstore_implementations
141
 
    import bzrlib.tests.tree_implementations
142
 
    import bzrlib.tests.workingtree_implementations
143
 
    return [
144
 
            bzrlib.doc,
145
 
            bzrlib.tests.blackbox,
146
 
            bzrlib.tests.branch_implementations,
147
 
            bzrlib.tests.bzrdir_implementations,
148
 
            bzrlib.tests.commands,
149
 
            bzrlib.tests.interrepository_implementations,
150
 
            bzrlib.tests.interversionedfile_implementations,
151
 
            bzrlib.tests.intertree_implementations,
152
 
            bzrlib.tests.per_lock,
153
 
            bzrlib.tests.repository_implementations,
154
 
            bzrlib.tests.revisionstore_implementations,
155
 
            bzrlib.tests.tree_implementations,
156
 
            bzrlib.tests.workingtree_implementations,
157
 
            ]
 
117
# Subunit result codes, defined here to prevent a hard dependency on subunit.
 
118
SUBUNIT_SEEK_SET = 0
 
119
SUBUNIT_SEEK_CUR = 1
158
120
 
159
121
 
160
122
class ExtendedTestResult(unittest._TextTestResult):
161
123
    """Accepts, reports and accumulates the results of running tests.
162
124
 
163
 
    Compared to this unittest version this class adds support for
 
125
    Compared to the unittest version this class adds support for
164
126
    profiling, benchmarking, stopping as soon as a test fails,  and
165
127
    skipping tests.  There are further-specialized subclasses for
166
128
    different types of display.
174
136
    """
175
137
 
176
138
    stop_early = False
177
 
    
 
139
 
178
140
    def __init__(self, stream, descriptions, verbosity,
179
141
                 bench_history=None,
180
 
                 num_tests=None,
 
142
                 strict=False,
181
143
                 ):
182
144
        """Construct new TestResult.
183
145
 
201
163
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
202
164
        self._bench_history = bench_history
203
165
        self.ui = ui.ui_factory
204
 
        self.num_tests = num_tests
 
166
        self.num_tests = 0
205
167
        self.error_count = 0
206
168
        self.failure_count = 0
207
169
        self.known_failure_count = 0
210
172
        self.unsupported = {}
211
173
        self.count = 0
212
174
        self._overall_start_time = time.time()
213
 
    
 
175
        self._strict = strict
 
176
 
 
177
    def done(self):
 
178
        # nb: called stopTestRun in the version of this that Python merged
 
179
        # upstream, according to lifeless 20090803
 
180
        if self._strict:
 
181
            ok = self.wasStrictlySuccessful()
 
182
        else:
 
183
            ok = self.wasSuccessful()
 
184
        if ok:
 
185
            self.stream.write('tests passed\n')
 
186
        else:
 
187
            self.stream.write('tests failed\n')
 
188
        if TestCase._first_thread_leaker_id:
 
189
            self.stream.write(
 
190
                '%s is leaking threads among %d leaking tests.\n' % (
 
191
                TestCase._first_thread_leaker_id,
 
192
                TestCase._leaking_threads_tests))
 
193
 
214
194
    def _extractBenchmarkTime(self, testCase):
215
195
        """Add a benchmark time for the current test case."""
216
196
        return getattr(testCase, "_benchtime", None)
217
 
    
 
197
 
218
198
    def _elapsedTestTimeString(self):
219
199
        """Return a time string for the overall time the current test has taken."""
220
200
        return self._formatTime(time.time() - self._start_time)
222
202
    def _testTimeString(self, testCase):
223
203
        benchmark_time = self._extractBenchmarkTime(testCase)
224
204
        if benchmark_time is not None:
225
 
            return "%s/%s" % (
226
 
                self._formatTime(benchmark_time),
227
 
                self._elapsedTestTimeString())
 
205
            return self._formatTime(benchmark_time) + "*"
228
206
        else:
229
 
            return "           %s" % self._elapsedTestTimeString()
 
207
            return self._elapsedTestTimeString()
230
208
 
231
209
    def _formatTime(self, seconds):
232
210
        """Format seconds as milliseconds with leading spaces."""
241
219
 
242
220
    def startTest(self, test):
243
221
        unittest.TestResult.startTest(self, test)
 
222
        if self.count == 0:
 
223
            self.startTests()
244
224
        self.report_test_start(test)
245
225
        test.number = self.count
246
226
        self._recordTestStartTime()
247
227
 
 
228
    def startTests(self):
 
229
        import platform
 
230
        if getattr(sys, 'frozen', None) is None:
 
231
            bzr_path = osutils.realpath(sys.argv[0])
 
232
        else:
 
233
            bzr_path = sys.executable
 
234
        self.stream.write(
 
235
            'testing: %s\n' % (bzr_path,))
 
236
        self.stream.write(
 
237
            '   %s\n' % (
 
238
                    bzrlib.__path__[0],))
 
239
        self.stream.write(
 
240
            '   bzr-%s python-%s %s\n' % (
 
241
                    bzrlib.version_string,
 
242
                    bzrlib._format_version_tuple(sys.version_info),
 
243
                    platform.platform(aliased=1),
 
244
                    ))
 
245
        self.stream.write('\n')
 
246
 
248
247
    def _recordTestStartTime(self):
249
248
        """Record that a test has started."""
250
249
        self._start_time = time.time()
263
262
        fails with an unexpected error.
264
263
        """
265
264
        self._testConcluded(test)
266
 
        if isinstance(err[1], TestSkipped):
267
 
            return self._addSkipped(test, err)
 
265
        if isinstance(err[1], TestNotApplicable):
 
266
            return self._addNotApplicable(test, err)
268
267
        elif isinstance(err[1], UnavailableFeature):
269
268
            return self.addNotSupported(test, err[1].args[0])
270
269
        else:
273
272
            self.report_error(test, err)
274
273
            if self.stop_early:
275
274
                self.stop()
 
275
            self._cleanupLogFile(test)
276
276
 
277
277
    def addFailure(self, test, err):
278
278
        """Tell result that test failed.
289
289
            self.report_failure(test, err)
290
290
            if self.stop_early:
291
291
                self.stop()
 
292
            self._cleanupLogFile(test)
292
293
 
293
294
    def addSuccess(self, test):
294
295
        """Tell result that test completed successfully.
303
304
                    self._formatTime(benchmark_time),
304
305
                    test.id()))
305
306
        self.report_success(test)
 
307
        self._cleanupLogFile(test)
306
308
        unittest.TestResult.addSuccess(self, test)
 
309
        test._log_contents = ''
307
310
 
308
311
    def _testConcluded(self, test):
309
312
        """Common code when a test has finished.
310
313
 
311
314
        Called regardless of whether it succeded, failed, etc.
312
315
        """
313
 
        self._cleanupLogFile(test)
 
316
        pass
314
317
 
315
318
    def _addKnownFailure(self, test, err):
316
319
        self.known_failure_count += 1
320
323
        """The test will not be run because of a missing feature.
321
324
        """
322
325
        # this can be called in two different ways: it may be that the
323
 
        # test started running, and then raised (through addError) 
 
326
        # test started running, and then raised (through addError)
324
327
        # UnavailableFeature.  Alternatively this method can be called
325
328
        # while probing for features before running the tests; in that
326
329
        # case we will see startTest and stopTest, but the test will never
329
332
        self.unsupported[str(feature)] += 1
330
333
        self.report_unsupported(test, feature)
331
334
 
332
 
    def _addSkipped(self, test, skip_excinfo):
 
335
    def addSkip(self, test, reason):
 
336
        """A test has not run for 'reason'."""
 
337
        self.skip_count += 1
 
338
        self.report_skip(test, reason)
 
339
 
 
340
    def _addNotApplicable(self, test, skip_excinfo):
333
341
        if isinstance(skip_excinfo[1], TestNotApplicable):
334
342
            self.not_applicable_count += 1
335
343
            self.report_not_applicable(test, skip_excinfo)
336
 
        else:
337
 
            self.skip_count += 1
338
 
            self.report_skip(test, skip_excinfo)
339
344
        try:
340
345
            test.tearDown()
341
346
        except KeyboardInterrupt:
342
347
            raise
343
348
        except:
344
 
            self.addError(test, test.__exc_info())
 
349
            self.addError(test, test.exc_info())
345
350
        else:
346
351
            # seems best to treat this as success from point-of-view of unittest
347
352
            # -- it actually does nothing so it barely matters :)
348
353
            unittest.TestResult.addSuccess(self, test)
 
354
            test._log_contents = ''
349
355
 
350
356
    def printErrorList(self, flavour, errors):
351
357
        for test, err in errors:
353
359
            self.stream.write("%s: " % flavour)
354
360
            self.stream.writeln(self.getDescription(test))
355
361
            if getattr(test, '_get_log', None) is not None:
356
 
                print >>self.stream
357
 
                print >>self.stream, \
358
 
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
359
 
                print >>self.stream, test._get_log()
360
 
                print >>self.stream, \
361
 
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
362
                log_contents = test._get_log()
 
363
                if log_contents:
 
364
                    self.stream.write('\n')
 
365
                    self.stream.write(
 
366
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
367
                    self.stream.write('\n')
 
368
                    self.stream.write(log_contents)
 
369
                    self.stream.write('\n')
 
370
                    self.stream.write(
 
371
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
372
                    self.stream.write('\n')
362
373
            self.stream.writeln(self.separator2)
363
374
            self.stream.writeln("%s" % err)
364
375
 
 
376
    def progress(self, offset, whence):
 
377
        """The test is adjusting the count of tests to run."""
 
378
        if whence == SUBUNIT_SEEK_SET:
 
379
            self.num_tests = offset
 
380
        elif whence == SUBUNIT_SEEK_CUR:
 
381
            self.num_tests += offset
 
382
        else:
 
383
            raise errors.BzrError("Unknown whence %r" % whence)
 
384
 
365
385
    def finished(self):
366
386
        pass
367
387
 
374
394
    def wasStrictlySuccessful(self):
375
395
        if self.unsupported or self.known_failure_count:
376
396
            return False
377
 
 
378
397
        return self.wasSuccessful()
379
398
 
380
399
 
383
402
 
384
403
    def __init__(self, stream, descriptions, verbosity,
385
404
                 bench_history=None,
386
 
                 num_tests=None,
387
405
                 pb=None,
 
406
                 strict=None,
388
407
                 ):
389
408
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
390
 
            bench_history, num_tests)
391
 
        if pb is None:
392
 
            self.pb = self.ui.nested_progress_bar()
393
 
            self._supplied_pb = False
394
 
        else:
395
 
            self.pb = pb
396
 
            self._supplied_pb = True
 
409
            bench_history, strict)
 
410
        # We no longer pass them around, but just rely on the UIFactory stack
 
411
        # for state
 
412
        if pb is not None:
 
413
            warnings.warn("Passing pb to TextTestResult is deprecated")
 
414
        self.pb = self.ui.nested_progress_bar()
397
415
        self.pb.show_pct = False
398
416
        self.pb.show_spinner = False
399
417
        self.pb.show_eta = False,
400
418
        self.pb.show_count = False
401
419
        self.pb.show_bar = False
 
420
        self.pb.update_latency = 0
 
421
        self.pb.show_transport_activity = False
 
422
 
 
423
    def done(self):
 
424
        # called when the tests that are going to run have run
 
425
        self.pb.clear()
 
426
        super(TextTestResult, self).done()
 
427
 
 
428
    def finished(self):
 
429
        self.pb.finished()
402
430
 
403
431
    def report_starting(self):
404
 
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
432
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
 
433
 
 
434
    def printErrors(self):
 
435
        # clear the pb to make room for the error listing
 
436
        self.pb.clear()
 
437
        super(TextTestResult, self).printErrors()
405
438
 
406
439
    def _progress_prefix_text(self):
407
 
        a = '[%d' % self.count
408
 
        if self.num_tests is not None:
 
440
        # the longer this text, the less space we have to show the test
 
441
        # name...
 
442
        a = '[%d' % self.count              # total that have been run
 
443
        # tests skipped as known not to be relevant are not important enough
 
444
        # to show here
 
445
        ## if self.skip_count:
 
446
        ##     a += ', %d skip' % self.skip_count
 
447
        ## if self.known_failure_count:
 
448
        ##     a += '+%dX' % self.known_failure_count
 
449
        if self.num_tests:
409
450
            a +='/%d' % self.num_tests
410
 
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
451
        a += ' in '
 
452
        runtime = time.time() - self._overall_start_time
 
453
        if runtime >= 60:
 
454
            a += '%dm%ds' % (runtime / 60, runtime % 60)
 
455
        else:
 
456
            a += '%ds' % runtime
411
457
        if self.error_count:
412
 
            a += ', %d errors' % self.error_count
 
458
            a += ', %d err' % self.error_count
413
459
        if self.failure_count:
414
 
            a += ', %d failed' % self.failure_count
415
 
        if self.known_failure_count:
416
 
            a += ', %d known failures' % self.known_failure_count
417
 
        if self.skip_count:
418
 
            a += ', %d skipped' % self.skip_count
 
460
            a += ', %d fail' % self.failure_count
419
461
        if self.unsupported:
420
 
            a += ', %d missing features' % len(self.unsupported)
 
462
            a += ', %d missing' % len(self.unsupported)
421
463
        a += ']'
422
464
        return a
423
465
 
425
467
        self.count += 1
426
468
        self.pb.update(
427
469
                self._progress_prefix_text()
428
 
                + ' ' 
 
470
                + ' '
429
471
                + self._shortened_test_description(test))
430
472
 
431
473
    def _test_description(self, test):
432
474
        return self._shortened_test_description(test)
433
475
 
434
476
    def report_error(self, test, err):
435
 
        self.pb.note('ERROR: %s\n    %s\n', 
 
477
        self.pb.note('ERROR: %s\n    %s\n',
436
478
            self._test_description(test),
437
479
            err[1],
438
480
            )
439
481
 
440
482
    def report_failure(self, test, err):
441
 
        self.pb.note('FAIL: %s\n    %s\n', 
 
483
        self.pb.note('FAIL: %s\n    %s\n',
442
484
            self._test_description(test),
443
485
            err[1],
444
486
            )
447
489
        self.pb.note('XFAIL: %s\n%s\n',
448
490
            self._test_description(test), err[1])
449
491
 
450
 
    def report_skip(self, test, skip_excinfo):
 
492
    def report_skip(self, test, reason):
451
493
        pass
452
494
 
453
495
    def report_not_applicable(self, test, skip_excinfo):
455
497
 
456
498
    def report_unsupported(self, test, feature):
457
499
        """test cannot be run because feature is missing."""
458
 
                  
 
500
 
459
501
    def report_cleaning_up(self):
460
 
        self.pb.update('cleaning up...')
461
 
 
462
 
    def finished(self):
463
 
        if not self._supplied_pb:
464
 
            self.pb.finished()
 
502
        self.pb.update('Cleaning up')
465
503
 
466
504
 
467
505
class VerboseTestResult(ExtendedTestResult):
481
519
    def report_test_start(self, test):
482
520
        self.count += 1
483
521
        name = self._shortened_test_description(test)
484
 
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
485
 
        # numbers, plus a trailing blank
 
522
        # width needs space for 6 char status, plus 1 for slash, plus an
 
523
        # 11-char time string, plus a trailing blank
486
524
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
487
525
        self.stream.write(self._ellipsize_to_right(name,
488
 
                          osutils.terminal_width()-30))
 
526
                          osutils.terminal_width()-18))
489
527
        self.stream.flush()
490
528
 
491
529
    def _error_summary(self, err):
516
554
        # used to show the output in PQM.
517
555
        self.stream.flush()
518
556
 
519
 
    def report_skip(self, test, skip_excinfo):
 
557
    def report_skip(self, test, reason):
520
558
        self.stream.writeln(' SKIP %s\n%s'
521
 
                % (self._testTimeString(test),
522
 
                   self._error_summary(skip_excinfo)))
 
559
                % (self._testTimeString(test), reason))
523
560
 
524
561
    def report_not_applicable(self, test, skip_excinfo):
525
562
        self.stream.writeln('  N/A %s\n%s'
540
577
                 descriptions=0,
541
578
                 verbosity=1,
542
579
                 bench_history=None,
543
 
                 list_only=False
 
580
                 list_only=False,
 
581
                 strict=False,
544
582
                 ):
545
583
        self.stream = unittest._WritelnDecorator(stream)
546
584
        self.descriptions = descriptions
547
585
        self.verbosity = verbosity
548
586
        self._bench_history = bench_history
549
587
        self.list_only = list_only
 
588
        self._strict = strict
550
589
 
551
590
    def run(self, test):
552
591
        "Run the given test case or test suite."
559
598
                              self.descriptions,
560
599
                              self.verbosity,
561
600
                              bench_history=self._bench_history,
562
 
                              num_tests=test.countTestCases(),
 
601
                              strict=self._strict,
563
602
                              )
564
603
        result.stop_early = self.stop_on_failure
565
604
        result.report_starting()
570
609
            for t in iter_suite_tests(test):
571
610
                self.stream.writeln("%s" % (t.id()))
572
611
                run += 1
573
 
            actionTaken = "Listed"
574
 
        else: 
575
 
            test.run(result)
 
612
            return None
 
613
        else:
 
614
            try:
 
615
                import testtools
 
616
            except ImportError:
 
617
                test.run(result)
 
618
            else:
 
619
                if isinstance(test, testtools.ConcurrentTestSuite):
 
620
                    # We need to catch bzr specific behaviors
 
621
                    test.run(BZRTransformingResult(result))
 
622
                else:
 
623
                    test.run(result)
576
624
            run = result.testsRun
577
625
            actionTaken = "Ran"
578
626
        stopTime = time.time()
615
663
 
616
664
def iter_suite_tests(suite):
617
665
    """Return all tests in a suite, recursing through nested suites"""
618
 
    for item in suite._tests:
619
 
        if isinstance(item, unittest.TestCase):
620
 
            yield item
621
 
        elif isinstance(item, unittest.TestSuite):
 
666
    if isinstance(suite, unittest.TestCase):
 
667
        yield suite
 
668
    elif isinstance(suite, unittest.TestSuite):
 
669
        for item in suite:
622
670
            for r in iter_suite_tests(item):
623
671
                yield r
624
 
        else:
625
 
            raise Exception('unknown object %r inside test suite %r'
626
 
                            % (item, suite))
 
672
    else:
 
673
        raise Exception('unknown type %r for object %r'
 
674
                        % (type(suite), suite))
627
675
 
628
676
 
629
677
class TestSkipped(Exception):
633
681
class TestNotApplicable(TestSkipped):
634
682
    """A test is not applicable to the situation where it was run.
635
683
 
636
 
    This is only normally raised by parameterized tests, if they find that 
637
 
    the instance they're constructed upon does not support one aspect 
 
684
    This is only normally raised by parameterized tests, if they find that
 
685
    the instance they're constructed upon does not support one aspect
638
686
    of its interface.
639
687
    """
640
688
 
662
710
 
663
711
class StringIOWrapper(object):
664
712
    """A wrapper around cStringIO which just adds an encoding attribute.
665
 
    
 
713
 
666
714
    Internally we can check sys.stdout to see what the output encoding
667
715
    should be. However, cStringIO has no encoding attribute that we can
668
716
    set. So we wrap it instead.
686
734
            return setattr(self._cstring, name, val)
687
735
 
688
736
 
689
 
class TestUIFactory(ui.CLIUIFactory):
 
737
class TestUIFactory(TextUIFactory):
690
738
    """A UI Factory for testing.
691
739
 
692
740
    Hide the progress bar but emit note()s.
693
741
    Redirect stdin.
694
742
    Allows get_password to be tested without real tty attached.
 
743
 
 
744
    See also CannedInputUIFactory which lets you provide programmatic input in
 
745
    a structured way.
695
746
    """
 
747
    # TODO: Capture progress events at the model level and allow them to be
 
748
    # observed by tests that care.
 
749
    #
 
750
    # XXX: Should probably unify more with CannedInputUIFactory or a
 
751
    # particular configuration of TextUIFactory, or otherwise have a clearer
 
752
    # idea of how they're supposed to be different.
 
753
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
696
754
 
697
 
    def __init__(self,
698
 
                 stdout=None,
699
 
                 stderr=None,
700
 
                 stdin=None):
701
 
        super(TestUIFactory, self).__init__()
 
755
    def __init__(self, stdout=None, stderr=None, stdin=None):
702
756
        if stdin is not None:
703
757
            # We use a StringIOWrapper to be able to test various
704
758
            # encodings, but the user is still responsible to
705
759
            # encode the string and to set the encoding attribute
706
760
            # of StringIOWrapper.
707
 
            self.stdin = StringIOWrapper(stdin)
708
 
        if stdout is None:
709
 
            self.stdout = sys.stdout
710
 
        else:
711
 
            self.stdout = stdout
712
 
        if stderr is None:
713
 
            self.stderr = sys.stderr
714
 
        else:
715
 
            self.stderr = stderr
716
 
 
717
 
    def clear(self):
718
 
        """See progress.ProgressBar.clear()."""
719
 
 
720
 
    def clear_term(self):
721
 
        """See progress.ProgressBar.clear_term()."""
722
 
 
723
 
    def clear_term(self):
724
 
        """See progress.ProgressBar.clear_term()."""
725
 
 
726
 
    def finished(self):
727
 
        """See progress.ProgressBar.finished()."""
728
 
 
729
 
    def note(self, fmt_string, *args, **kwargs):
730
 
        """See progress.ProgressBar.note()."""
731
 
        self.stdout.write((fmt_string + "\n") % args)
732
 
 
733
 
    def progress_bar(self):
734
 
        return self
735
 
 
736
 
    def nested_progress_bar(self):
737
 
        return self
738
 
 
739
 
    def update(self, message, count=None, total=None):
740
 
        """See progress.ProgressBar.update()."""
741
 
 
742
 
    def get_non_echoed_password(self, prompt):
 
761
            stdin = StringIOWrapper(stdin)
 
762
        super(TestUIFactory, self).__init__(stdin, stdout, stderr)
 
763
 
 
764
    def get_non_echoed_password(self):
743
765
        """Get password from stdin without trying to handle the echo mode"""
744
 
        if prompt:
745
 
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
746
766
        password = self.stdin.readline()
747
767
        if not password:
748
768
            raise EOFError
750
770
            password = password[:-1]
751
771
        return password
752
772
 
 
773
    def make_progress_view(self):
 
774
        return NullProgressView()
 
775
 
753
776
 
754
777
class TestCase(unittest.TestCase):
755
778
    """Base class for bzr unit tests.
756
 
    
757
 
    Tests that need access to disk resources should subclass 
 
779
 
 
780
    Tests that need access to disk resources should subclass
758
781
    TestCaseInTempDir not TestCase.
759
782
 
760
783
    Error and debug log messages are redirected from their usual
762
785
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
763
786
    so that it can also capture file IO.  When the test completes this file
764
787
    is read into memory and removed from disk.
765
 
       
 
788
 
766
789
    There are also convenience functions to invoke bzr's command-line
767
790
    routine, and to build and check bzr trees.
768
 
   
 
791
 
769
792
    In addition to the usual method of overriding tearDown(), this class also
770
793
    allows subclasses to register functions into the _cleanups list, which is
771
794
    run in order as the object is torn down.  It's less likely this will be
772
795
    accidentally overlooked.
773
796
    """
774
797
 
 
798
    _active_threads = None
 
799
    _leaking_threads_tests = 0
 
800
    _first_thread_leaker_id = None
775
801
    _log_file_name = None
776
802
    _log_contents = ''
777
803
    _keep_log_file = False
778
804
    # record lsprof data when performing benchmark calls.
779
805
    _gather_lsprof_in_benchmarks = False
 
806
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
807
                     '_log_contents', '_log_file_name', '_benchtime',
 
808
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
780
809
 
781
810
    def __init__(self, methodName='testMethod'):
782
811
        super(TestCase, self).__init__(methodName)
783
812
        self._cleanups = []
 
813
        self._bzr_test_setUp_run = False
 
814
        self._bzr_test_tearDown_run = False
784
815
 
785
816
    def setUp(self):
786
817
        unittest.TestCase.setUp(self)
 
818
        self._bzr_test_setUp_run = True
787
819
        self._cleanEnvironment()
788
 
        bzrlib.trace.disable_default_logging()
789
820
        self._silenceUI()
790
821
        self._startLogFile()
791
822
        self._benchcalls = []
792
823
        self._benchtime = None
793
824
        self._clear_hooks()
 
825
        self._track_locks()
794
826
        self._clear_debug_flags()
 
827
        TestCase._active_threads = threading.activeCount()
 
828
        self.addCleanup(self._check_leaked_threads)
 
829
 
 
830
    def debug(self):
 
831
        # debug a frame up.
 
832
        import pdb
 
833
        pdb.Pdb().set_trace(sys._getframe().f_back)
 
834
 
 
835
    def _check_leaked_threads(self):
 
836
        active = threading.activeCount()
 
837
        leaked_threads = active - TestCase._active_threads
 
838
        TestCase._active_threads = active
 
839
        if leaked_threads:
 
840
            TestCase._leaking_threads_tests += 1
 
841
            if TestCase._first_thread_leaker_id is None:
 
842
                TestCase._first_thread_leaker_id = self.id()
795
843
 
796
844
    def _clear_debug_flags(self):
797
845
        """Prevent externally set debug flags affecting tests.
798
 
        
 
846
 
799
847
        Tests that want to use debug flags can just set them in the
800
848
        debug_flags set during setup/teardown.
801
849
        """
802
850
        self._preserved_debug_flags = set(debug.debug_flags)
803
 
        debug.debug_flags.clear()
 
851
        if 'allow_debug' not in selftest_debug_flags:
 
852
            debug.debug_flags.clear()
 
853
        if 'disable_lock_checks' not in selftest_debug_flags:
 
854
            debug.debug_flags.add('strict_locks')
804
855
        self.addCleanup(self._restore_debug_flags)
805
856
 
806
857
    def _clear_hooks(self):
807
858
        # prevent hooks affecting tests
808
 
        import bzrlib.branch
809
 
        import bzrlib.smart.server
810
 
        self._preserved_hooks = {
811
 
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
812
 
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
813
 
            }
 
859
        self._preserved_hooks = {}
 
860
        for key, factory in hooks.known_hooks.items():
 
861
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
862
            current_hooks = hooks.known_hooks_key_to_object(key)
 
863
            self._preserved_hooks[parent] = (name, current_hooks)
814
864
        self.addCleanup(self._restoreHooks)
815
 
        # reset all hooks to an empty instance of the appropriate type
816
 
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
817
 
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
865
        for key, factory in hooks.known_hooks.items():
 
866
            parent, name = hooks.known_hooks_key_to_parent_and_attribute(key)
 
867
            setattr(parent, name, factory())
 
868
        # this hook should always be installed
 
869
        request._install_hook()
818
870
 
819
871
    def _silenceUI(self):
820
872
        """Turn off UI for duration of test"""
825
877
        ui.ui_factory = ui.SilentUIFactory()
826
878
        self.addCleanup(_restore)
827
879
 
 
880
    def _check_locks(self):
 
881
        """Check that all lock take/release actions have been paired."""
 
882
        # We always check for mismatched locks. If a mismatch is found, we
 
883
        # fail unless -Edisable_lock_checks is supplied to selftest, in which
 
884
        # case we just print a warning.
 
885
        # unhook:
 
886
        acquired_locks = [lock for action, lock in self._lock_actions
 
887
                          if action == 'acquired']
 
888
        released_locks = [lock for action, lock in self._lock_actions
 
889
                          if action == 'released']
 
890
        broken_locks = [lock for action, lock in self._lock_actions
 
891
                        if action == 'broken']
 
892
        # trivially, given the tests for lock acquistion and release, if we
 
893
        # have as many in each list, it should be ok. Some lock tests also
 
894
        # break some locks on purpose and should be taken into account by
 
895
        # considering that breaking a lock is just a dirty way of releasing it.
 
896
        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
 
897
            message = ('Different number of acquired and '
 
898
                       'released or broken locks. (%s, %s + %s)' %
 
899
                       (acquired_locks, released_locks, broken_locks))
 
900
            if not self._lock_check_thorough:
 
901
                # Rather than fail, just warn
 
902
                print "Broken test %s: %s" % (self, message)
 
903
                return
 
904
            self.fail(message)
 
905
 
 
906
    def _track_locks(self):
 
907
        """Track lock activity during tests."""
 
908
        self._lock_actions = []
 
909
        if 'disable_lock_checks' in selftest_debug_flags:
 
910
            self._lock_check_thorough = False
 
911
        else:
 
912
            self._lock_check_thorough = True
 
913
            
 
914
        self.addCleanup(self._check_locks)
 
915
        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
 
916
                                                self._lock_acquired, None)
 
917
        _mod_lock.Lock.hooks.install_named_hook('lock_released',
 
918
                                                self._lock_released, None)
 
919
        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
 
920
                                                self._lock_broken, None)
 
921
 
 
922
    def _lock_acquired(self, result):
 
923
        self._lock_actions.append(('acquired', result))
 
924
 
 
925
    def _lock_released(self, result):
 
926
        self._lock_actions.append(('released', result))
 
927
 
 
928
    def _lock_broken(self, result):
 
929
        self._lock_actions.append(('broken', result))
 
930
 
828
931
    def _ndiff_strings(self, a, b):
829
932
        """Return ndiff between two strings containing lines.
830
 
        
 
933
 
831
934
        A trailing newline is added if missing to make the strings
832
935
        print properly."""
833
936
        if b and b[-1] != '\n':
858
961
 
859
962
    def assertEqualDiff(self, a, b, message=None):
860
963
        """Assert two texts are equal, if not raise an exception.
861
 
        
862
 
        This is intended for use with multi-line strings where it can 
 
964
 
 
965
        This is intended for use with multi-line strings where it can
863
966
        be hard to find the differences by eye.
864
967
        """
865
968
        # TODO: perhaps override assertEquals to call this for strings?
867
970
            return
868
971
        if message is None:
869
972
            message = "texts not equal:\n"
 
973
        if a == b + '\n':
 
974
            message = 'first string is missing a final newline.\n'
 
975
        if a + '\n' == b:
 
976
            message = 'second string is missing a final newline.\n'
870
977
        raise AssertionError(message +
871
978
                             self._ndiff_strings(a, b))
872
 
        
 
979
 
873
980
    def assertEqualMode(self, mode, mode_test):
874
981
        self.assertEqual(mode, mode_test,
875
982
                         'mode mismatch %o != %o' % (mode, mode_test))
876
983
 
 
984
    def assertEqualStat(self, expected, actual):
 
985
        """assert that expected and actual are the same stat result.
 
986
 
 
987
        :param expected: A stat result.
 
988
        :param actual: A stat result.
 
989
        :raises AssertionError: If the expected and actual stat values differ
 
990
            other than by atime.
 
991
        """
 
992
        self.assertEqual(expected.st_size, actual.st_size)
 
993
        self.assertEqual(expected.st_mtime, actual.st_mtime)
 
994
        self.assertEqual(expected.st_ctime, actual.st_ctime)
 
995
        self.assertEqual(expected.st_dev, actual.st_dev)
 
996
        self.assertEqual(expected.st_ino, actual.st_ino)
 
997
        self.assertEqual(expected.st_mode, actual.st_mode)
 
998
 
 
999
    def assertLength(self, length, obj_with_len):
 
1000
        """Assert that obj_with_len is of length length."""
 
1001
        if len(obj_with_len) != length:
 
1002
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
 
1003
                length, len(obj_with_len), obj_with_len))
 
1004
 
877
1005
    def assertPositive(self, val):
878
1006
        """Assert that val is greater than 0."""
879
1007
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
891
1019
        if not s.endswith(suffix):
892
1020
            raise AssertionError('string %r does not end with %r' % (s, suffix))
893
1021
 
894
 
    def assertContainsRe(self, haystack, needle_re):
 
1022
    def assertContainsRe(self, haystack, needle_re, flags=0):
895
1023
        """Assert that a contains something matching a regular expression."""
896
 
        if not re.search(needle_re, haystack):
 
1024
        if not re.search(needle_re, haystack, flags):
897
1025
            if '\n' in haystack or len(haystack) > 60:
898
1026
                # a long string, format it in a more readable way
899
1027
                raise AssertionError(
903
1031
                raise AssertionError('pattern "%s" not found in "%s"'
904
1032
                        % (needle_re, haystack))
905
1033
 
906
 
    def assertNotContainsRe(self, haystack, needle_re):
 
1034
    def assertNotContainsRe(self, haystack, needle_re, flags=0):
907
1035
        """Assert that a does not match a regular expression"""
908
 
        if re.search(needle_re, haystack):
 
1036
        if re.search(needle_re, haystack, flags):
909
1037
            raise AssertionError('pattern "%s" found in "%s"'
910
1038
                    % (needle_re, haystack))
911
1039
 
918
1046
 
919
1047
    def assertListRaises(self, excClass, func, *args, **kwargs):
920
1048
        """Fail unless excClass is raised when the iterator from func is used.
921
 
        
 
1049
 
922
1050
        Many functions can return generators this makes sure
923
1051
        to wrap them in a list() call to make sure the whole generator
924
1052
        is run, and that the proper exception is raised.
925
1053
        """
926
1054
        try:
927
1055
            list(func(*args, **kwargs))
928
 
        except excClass:
929
 
            return
 
1056
        except excClass, e:
 
1057
            return e
930
1058
        else:
931
1059
            if getattr(excClass,'__name__', None) is not None:
932
1060
                excName = excClass.__name__
971
1099
                raise AssertionError("%r is %r." % (left, right))
972
1100
 
973
1101
    def assertTransportMode(self, transport, path, mode):
974
 
        """Fail if a path does not have mode mode.
975
 
        
 
1102
        """Fail if a path does not have mode "mode".
 
1103
 
976
1104
        If modes are not supported on this transport, the assertion is ignored.
977
1105
        """
978
1106
        if not transport._can_roundtrip_unix_modebits():
980
1108
        path_stat = transport.stat(path)
981
1109
        actual_mode = stat.S_IMODE(path_stat.st_mode)
982
1110
        self.assertEqual(mode, actual_mode,
983
 
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
984
 
 
985
 
    def assertIsInstance(self, obj, kls):
986
 
        """Fail if obj is not an instance of kls"""
 
1111
                         'mode of %r incorrect (%s != %s)'
 
1112
                         % (path, oct(mode), oct(actual_mode)))
 
1113
 
 
1114
    def assertIsSameRealPath(self, path1, path2):
 
1115
        """Fail if path1 and path2 points to different files"""
 
1116
        self.assertEqual(osutils.realpath(path1),
 
1117
                         osutils.realpath(path2),
 
1118
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
1119
 
 
1120
    def assertIsInstance(self, obj, kls, msg=None):
 
1121
        """Fail if obj is not an instance of kls
 
1122
        
 
1123
        :param msg: Supplementary message to show if the assertion fails.
 
1124
        """
987
1125
        if not isinstance(obj, kls):
988
 
            self.fail("%r is an instance of %s rather than %s" % (
989
 
                obj, obj.__class__, kls))
 
1126
            m = "%r is an instance of %s rather than %s" % (
 
1127
                obj, obj.__class__, kls)
 
1128
            if msg:
 
1129
                m += ": " + msg
 
1130
            self.fail(m)
990
1131
 
991
1132
    def expectFailure(self, reason, assertion, *args, **kwargs):
992
1133
        """Invoke a test, expecting it to fail for the given reason.
1023
1164
        else:
1024
1165
            self.fail('Unexpected success.  Should have failed: %s' % reason)
1025
1166
 
1026
 
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
1167
    def assertFileEqual(self, content, path):
 
1168
        """Fail if path does not contain 'content'."""
 
1169
        self.failUnlessExists(path)
 
1170
        f = file(path, 'rb')
 
1171
        try:
 
1172
            s = f.read()
 
1173
        finally:
 
1174
            f.close()
 
1175
        self.assertEqualDiff(content, s)
 
1176
 
 
1177
    def failUnlessExists(self, path):
 
1178
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1179
        if not isinstance(path, basestring):
 
1180
            for p in path:
 
1181
                self.failUnlessExists(p)
 
1182
        else:
 
1183
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1184
 
 
1185
    def failIfExists(self, path):
 
1186
        """Fail if path or paths, which may be abs or relative, exist."""
 
1187
        if not isinstance(path, basestring):
 
1188
            for p in path:
 
1189
                self.failIfExists(p)
 
1190
        else:
 
1191
            self.failIf(osutils.lexists(path),path+" exists")
 
1192
 
 
1193
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1027
1194
        """A helper for callDeprecated and applyDeprecated.
1028
1195
 
1029
1196
        :param a_callable: A callable to call.
1055
1222
        To test that a deprecated method raises an error, do something like
1056
1223
        this::
1057
1224
 
1058
 
        self.assertRaises(errors.ReservedId,
1059
 
            self.applyDeprecated, zero_ninetyone,
1060
 
                br.append_revision, 'current:')
 
1225
            self.assertRaises(errors.ReservedId,
 
1226
                self.applyDeprecated,
 
1227
                deprecated_in((1, 5, 0)),
 
1228
                br.append_revision,
 
1229
                'current:')
1061
1230
 
1062
1231
        :param deprecation_format: The deprecation format that the callable
1063
1232
            should have been deprecated with. This is the same type as the
1071
1240
        :param kwargs: The keyword arguments for the callable
1072
1241
        :return: The result of a_callable(``*args``, ``**kwargs``)
1073
1242
        """
1074
 
        call_warnings, result = self._capture_warnings(a_callable,
 
1243
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
1075
1244
            *args, **kwargs)
1076
1245
        expected_first_warning = symbol_versioning.deprecation_string(
1077
1246
            a_callable, deprecation_format)
1081
1250
        self.assertEqual(expected_first_warning, call_warnings[0])
1082
1251
        return result
1083
1252
 
 
1253
    def callCatchWarnings(self, fn, *args, **kw):
 
1254
        """Call a callable that raises python warnings.
 
1255
 
 
1256
        The caller's responsible for examining the returned warnings.
 
1257
 
 
1258
        If the callable raises an exception, the exception is not
 
1259
        caught and propagates up to the caller.  In that case, the list
 
1260
        of warnings is not available.
 
1261
 
 
1262
        :returns: ([warning_object, ...], fn_result)
 
1263
        """
 
1264
        # XXX: This is not perfect, because it completely overrides the
 
1265
        # warnings filters, and some code may depend on suppressing particular
 
1266
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1267
        # though.  -- Andrew, 20071062
 
1268
        wlist = []
 
1269
        def _catcher(message, category, filename, lineno, file=None, line=None):
 
1270
            # despite the name, 'message' is normally(?) a Warning subclass
 
1271
            # instance
 
1272
            wlist.append(message)
 
1273
        saved_showwarning = warnings.showwarning
 
1274
        saved_filters = warnings.filters
 
1275
        try:
 
1276
            warnings.showwarning = _catcher
 
1277
            warnings.filters = []
 
1278
            result = fn(*args, **kw)
 
1279
        finally:
 
1280
            warnings.showwarning = saved_showwarning
 
1281
            warnings.filters = saved_filters
 
1282
        return wlist, result
 
1283
 
1084
1284
    def callDeprecated(self, expected, callable, *args, **kwargs):
1085
1285
        """Assert that a callable is deprecated in a particular way.
1086
1286
 
1087
 
        This is a very precise test for unusual requirements. The 
 
1287
        This is a very precise test for unusual requirements. The
1088
1288
        applyDeprecated helper function is probably more suited for most tests
1089
1289
        as it allows you to simply specify the deprecation format being used
1090
1290
        and will ensure that that is issued for the function being called.
1091
1291
 
1092
1292
        Note that this only captures warnings raised by symbol_versioning.warn,
1093
 
        not other callers that go direct to the warning module.
 
1293
        not other callers that go direct to the warning module.  To catch
 
1294
        general warnings, use callCatchWarnings.
1094
1295
 
1095
1296
        :param expected: a list of the deprecation warnings expected, in order
1096
1297
        :param callable: The callable to call
1097
1298
        :param args: The positional arguments for the callable
1098
1299
        :param kwargs: The keyword arguments for the callable
1099
1300
        """
1100
 
        call_warnings, result = self._capture_warnings(callable,
 
1301
        call_warnings, result = self._capture_deprecation_warnings(callable,
1101
1302
            *args, **kwargs)
1102
1303
        self.assertEqual(expected, call_warnings)
1103
1304
        return result
1109
1310
        """
1110
1311
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1111
1312
        self._log_file = os.fdopen(fileno, 'w+')
1112
 
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
1313
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
1113
1314
        self._log_file_name = name
1114
1315
        self.addCleanup(self._finishLogFile)
1115
1316
 
1120
1321
        """
1121
1322
        if self._log_file is None:
1122
1323
            return
1123
 
        bzrlib.trace.disable_test_log(self._log_nonce)
 
1324
        bzrlib.trace.pop_log_file(self._log_memento)
1124
1325
        self._log_file.close()
1125
1326
        self._log_file = None
1126
1327
        if not self._keep_log_file:
1131
1332
        """Make the logfile not be deleted when _finishLogFile is called."""
1132
1333
        self._keep_log_file = True
1133
1334
 
1134
 
    def addCleanup(self, callable):
 
1335
    def thisFailsStrictLockCheck(self):
 
1336
        """It is known that this test would fail with -Dstrict_locks.
 
1337
 
 
1338
        By default, all tests are run with strict lock checking unless
 
1339
        -Edisable_lock_checks is supplied. However there are some tests which
 
1340
        we know fail strict locks at this point that have not been fixed.
 
1341
        They should call this function to disable the strict checking.
 
1342
 
 
1343
        This should be used sparingly, it is much better to fix the locking
 
1344
        issues rather than papering over the problem by calling this function.
 
1345
        """
 
1346
        debug.debug_flags.discard('strict_locks')
 
1347
 
 
1348
    def addCleanup(self, callable, *args, **kwargs):
1135
1349
        """Arrange to run a callable when this case is torn down.
1136
1350
 
1137
 
        Callables are run in the reverse of the order they are registered, 
 
1351
        Callables are run in the reverse of the order they are registered,
1138
1352
        ie last-in first-out.
1139
1353
        """
1140
 
        if callable in self._cleanups:
1141
 
            raise ValueError("cleanup function %r already registered on %s" 
1142
 
                    % (callable, self))
1143
 
        self._cleanups.append(callable)
 
1354
        self._cleanups.append((callable, args, kwargs))
1144
1355
 
1145
1356
    def _cleanEnvironment(self):
1146
1357
        new_env = {
1147
1358
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1148
1359
            'HOME': os.getcwd(),
1149
 
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1360
            # bzr now uses the Win32 API and doesn't rely on APPDATA, but the
 
1361
            # tests do check our impls match APPDATA
 
1362
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1363
            'VISUAL': None,
 
1364
            'EDITOR': None,
1150
1365
            'BZR_EMAIL': None,
1151
1366
            'BZREMAIL': None, # may still be present in the environment
1152
1367
            'EMAIL': None,
1153
1368
            'BZR_PROGRESS_BAR': None,
 
1369
            'BZR_LOG': None,
 
1370
            'BZR_PLUGIN_PATH': None,
 
1371
            # Make sure that any text ui tests are consistent regardless of
 
1372
            # the environment the test case is run in; you may want tests that
 
1373
            # test other combinations.  'dumb' is a reasonable guess for tests
 
1374
            # going to a pipe or a StringIO.
 
1375
            'TERM': 'dumb',
 
1376
            'LINES': '25',
 
1377
            'COLUMNS': '80',
1154
1378
            # SSH Agent
1155
1379
            'SSH_AUTH_SOCK': None,
1156
1380
            # Proxies
1162
1386
            'NO_PROXY': None,
1163
1387
            'all_proxy': None,
1164
1388
            'ALL_PROXY': None,
1165
 
            # Nobody cares about these ones AFAIK. So far at
 
1389
            # Nobody cares about ftp_proxy, FTP_PROXY AFAIK. So far at
1166
1390
            # least. If you do (care), please update this comment
1167
 
            # -- vila 20061212
 
1391
            # -- vila 20080401
1168
1392
            'ftp_proxy': None,
1169
1393
            'FTP_PROXY': None,
1170
1394
            'BZR_REMOTE_PATH': None,
1187
1411
            osutils.set_or_unset_env(name, value)
1188
1412
 
1189
1413
    def _restoreHooks(self):
1190
 
        for klass, hooks in self._preserved_hooks.items():
1191
 
            setattr(klass, 'hooks', hooks)
 
1414
        for klass, (name, hooks) in self._preserved_hooks.items():
 
1415
            setattr(klass, name, hooks)
1192
1416
 
1193
1417
    def knownFailure(self, reason):
1194
1418
        """This test has failed for some known reason."""
1195
1419
        raise KnownFailure(reason)
1196
1420
 
 
1421
    def _do_skip(self, result, reason):
 
1422
        addSkip = getattr(result, 'addSkip', None)
 
1423
        if not callable(addSkip):
 
1424
            result.addError(self, sys.exc_info())
 
1425
        else:
 
1426
            addSkip(self, reason)
 
1427
 
1197
1428
    def run(self, result=None):
1198
1429
        if result is None: result = self.defaultTestResult()
1199
1430
        for feature in getattr(self, '_test_needs_features', []):
1204
1435
                else:
1205
1436
                    result.addSuccess(self)
1206
1437
                result.stopTest(self)
1207
 
                return
1208
 
        return unittest.TestCase.run(self, result)
 
1438
                return result
 
1439
        try:
 
1440
            try:
 
1441
                result.startTest(self)
 
1442
                absent_attr = object()
 
1443
                # Python 2.5
 
1444
                method_name = getattr(self, '_testMethodName', absent_attr)
 
1445
                if method_name is absent_attr:
 
1446
                    # Python 2.4
 
1447
                    method_name = getattr(self, '_TestCase__testMethodName')
 
1448
                testMethod = getattr(self, method_name)
 
1449
                try:
 
1450
                    try:
 
1451
                        self.setUp()
 
1452
                        if not self._bzr_test_setUp_run:
 
1453
                            self.fail(
 
1454
                                "test setUp did not invoke "
 
1455
                                "bzrlib.tests.TestCase's setUp")
 
1456
                    except KeyboardInterrupt:
 
1457
                        self._runCleanups()
 
1458
                        raise
 
1459
                    except TestSkipped, e:
 
1460
                        self._do_skip(result, e.args[0])
 
1461
                        self.tearDown()
 
1462
                        return result
 
1463
                    except:
 
1464
                        result.addError(self, sys.exc_info())
 
1465
                        self._runCleanups()
 
1466
                        return result
 
1467
 
 
1468
                    ok = False
 
1469
                    try:
 
1470
                        testMethod()
 
1471
                        ok = True
 
1472
                    except self.failureException:
 
1473
                        result.addFailure(self, sys.exc_info())
 
1474
                    except TestSkipped, e:
 
1475
                        if not e.args:
 
1476
                            reason = "No reason given."
 
1477
                        else:
 
1478
                            reason = e.args[0]
 
1479
                        self._do_skip(result, reason)
 
1480
                    except KeyboardInterrupt:
 
1481
                        self._runCleanups()
 
1482
                        raise
 
1483
                    except:
 
1484
                        result.addError(self, sys.exc_info())
 
1485
 
 
1486
                    try:
 
1487
                        self.tearDown()
 
1488
                        if not self._bzr_test_tearDown_run:
 
1489
                            self.fail(
 
1490
                                "test tearDown did not invoke "
 
1491
                                "bzrlib.tests.TestCase's tearDown")
 
1492
                    except KeyboardInterrupt:
 
1493
                        self._runCleanups()
 
1494
                        raise
 
1495
                    except:
 
1496
                        result.addError(self, sys.exc_info())
 
1497
                        self._runCleanups()
 
1498
                        ok = False
 
1499
                    if ok: result.addSuccess(self)
 
1500
                finally:
 
1501
                    result.stopTest(self)
 
1502
                return result
 
1503
            except TestNotApplicable:
 
1504
                # Not moved from the result [yet].
 
1505
                self._runCleanups()
 
1506
                raise
 
1507
            except KeyboardInterrupt:
 
1508
                self._runCleanups()
 
1509
                raise
 
1510
        finally:
 
1511
            saved_attrs = {}
 
1512
            for attr_name in self.attrs_to_keep:
 
1513
                if attr_name in self.__dict__:
 
1514
                    saved_attrs[attr_name] = self.__dict__[attr_name]
 
1515
            self.__dict__ = saved_attrs
1209
1516
 
1210
1517
    def tearDown(self):
1211
1518
        self._runCleanups()
 
1519
        self._log_contents = ''
 
1520
        self._bzr_test_tearDown_run = True
1212
1521
        unittest.TestCase.tearDown(self)
1213
1522
 
1214
1523
    def time(self, callable, *args, **kwargs):
1215
1524
        """Run callable and accrue the time it takes to the benchmark time.
1216
 
        
 
1525
 
1217
1526
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1218
1527
        this will cause lsprofile statistics to be gathered and stored in
1219
1528
        self._benchcalls.
1234
1543
            self._benchtime += time.time() - start
1235
1544
 
1236
1545
    def _runCleanups(self):
1237
 
        """Run registered cleanup functions. 
 
1546
        """Run registered cleanup functions.
1238
1547
 
1239
1548
        This should only be called from TestCase.tearDown.
1240
1549
        """
1241
 
        # TODO: Perhaps this should keep running cleanups even if 
 
1550
        # TODO: Perhaps this should keep running cleanups even if
1242
1551
        # one of them fails?
1243
1552
 
1244
1553
        # Actually pop the cleanups from the list so tearDown running
1245
1554
        # twice is safe (this happens for skipped tests).
1246
1555
        while self._cleanups:
1247
 
            self._cleanups.pop()()
 
1556
            cleanup, args, kwargs = self._cleanups.pop()
 
1557
            cleanup(*args, **kwargs)
1248
1558
 
1249
1559
    def log(self, *args):
1250
1560
        mutter(*args)
1260
1570
        """
1261
1571
        # flush the log file, to get all content
1262
1572
        import bzrlib.trace
1263
 
        bzrlib.trace._trace_file.flush()
 
1573
        if bzrlib.trace._trace_file:
 
1574
            bzrlib.trace._trace_file.flush()
1264
1575
        if self._log_contents:
 
1576
            # XXX: this can hardly contain the content flushed above --vila
 
1577
            # 20080128
1265
1578
            return self._log_contents
1266
1579
        if self._log_file_name is not None:
1267
1580
            logfile = open(self._log_file_name)
1275
1588
                    os.remove(self._log_file_name)
1276
1589
                except OSError, e:
1277
1590
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
1278
 
                        print >>sys.stderr, ('Unable to delete log file '
1279
 
                                             ' %r' % self._log_file_name)
 
1591
                        sys.stderr.write(('Unable to delete log file '
 
1592
                                             ' %r\n' % self._log_file_name))
1280
1593
                    else:
1281
1594
                        raise
1282
1595
            return log_contents
1283
1596
        else:
1284
1597
            return "DELETED log file to reduce memory footprint"
1285
1598
 
1286
 
    @deprecated_method(zero_eighteen)
1287
 
    def capture(self, cmd, retcode=0):
1288
 
        """Shortcut that splits cmd into words, runs, and returns stdout"""
1289
 
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
1290
 
 
1291
1599
    def requireFeature(self, feature):
1292
1600
        """This test requires a specific feature is available.
1293
1601
 
1296
1604
        if not feature.available():
1297
1605
            raise UnavailableFeature(feature)
1298
1606
 
1299
 
    @deprecated_method(zero_eighteen)
1300
 
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
1301
 
                         working_dir=None):
1302
 
        """Invoke bzr and return (stdout, stderr).
1303
 
 
1304
 
        Don't call this method, just use run_bzr() which is equivalent.
1305
 
 
1306
 
        :param argv: Arguments to invoke bzr.  This may be either a 
1307
 
            single string, in which case it is split by shlex into words, 
1308
 
            or a list of arguments.
1309
 
        :param retcode: Expected return code, or None for don't-care.
1310
 
        :param encoding: Encoding for sys.stdout and sys.stderr
1311
 
        :param stdin: A string to be used as stdin for the command.
1312
 
        :param working_dir: Change to this directory before running
1313
 
        """
1314
 
        return self._run_bzr_autosplit(argv, retcode=retcode,
1315
 
                encoding=encoding, stdin=stdin, working_dir=working_dir,
1316
 
                )
1317
 
 
1318
1607
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1319
1608
            working_dir):
1320
1609
        """Run bazaar command line, splitting up a string command line."""
1321
1610
        if isinstance(args, basestring):
1322
 
            args = list(shlex.split(args))
 
1611
            # shlex don't understand unicode strings,
 
1612
            # so args should be plain string (bialix 20070906)
 
1613
            args = list(shlex.split(str(args)))
1323
1614
        return self._run_bzr_core(args, retcode=retcode,
1324
1615
                encoding=encoding, stdin=stdin, working_dir=working_dir,
1325
1616
                )
1327
1618
    def _run_bzr_core(self, args, retcode, encoding, stdin,
1328
1619
            working_dir):
1329
1620
        if encoding is None:
1330
 
            encoding = bzrlib.user_encoding
 
1621
            encoding = osutils.get_user_encoding()
1331
1622
        stdout = StringIOWrapper()
1332
1623
        stderr = StringIOWrapper()
1333
1624
        stdout.encoding = encoding
1350
1641
        try:
1351
1642
            result = self.apply_redirected(ui.ui_factory.stdin,
1352
1643
                stdout, stderr,
1353
 
                bzrlib.commands.run_bzr_catch_errors,
 
1644
                bzrlib.commands.run_bzr_catch_user_errors,
1354
1645
                args)
1355
1646
        finally:
1356
1647
            logger.removeHandler(handler)
1369
1660
                              message='Unexpected return code')
1370
1661
        return out, err
1371
1662
 
1372
 
    def run_bzr(self, *args, **kwargs):
 
1663
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1664
                working_dir=None, error_regexes=[], output_encoding=None):
1373
1665
        """Invoke bzr, as if it were run from the command line.
1374
1666
 
1375
1667
        The argument list should not include the bzr program name - the
1377
1669
        passed in three ways:
1378
1670
 
1379
1671
        1- A list of strings, eg ["commit", "a"].  This is recommended
1380
 
        when the command contains whitespace or metacharacters, or 
 
1672
        when the command contains whitespace or metacharacters, or
1381
1673
        is built up at run time.
1382
1674
 
1383
 
        2- A single string, eg "add a".  This is the most convenient 
 
1675
        2- A single string, eg "add a".  This is the most convenient
1384
1676
        for hardcoded commands.
1385
1677
 
1386
 
        3- Several varargs parameters, eg run_bzr("add", "a").  
1387
 
        This is not recommended for new code.
1388
 
 
1389
1678
        This runs bzr through the interface that catches and reports
1390
1679
        errors, and with logging set to something approximating the
1391
1680
        default, so that error reporting can be checked.
1404
1693
        :keyword error_regexes: A list of expected error messages.  If
1405
1694
            specified they must be seen in the error output of the command.
1406
1695
        """
1407
 
        retcode = kwargs.pop('retcode', 0)
1408
 
        encoding = kwargs.pop('encoding', None)
1409
 
        stdin = kwargs.pop('stdin', None)
1410
 
        working_dir = kwargs.pop('working_dir', None)
1411
 
        error_regexes = kwargs.pop('error_regexes', [])
1412
 
 
1413
 
        if kwargs:
1414
 
            raise TypeError("run_bzr() got unexpected keyword arguments '%s'"
1415
 
                            % kwargs.keys())
1416
 
 
1417
 
        if len(args) == 1:
1418
 
            if isinstance(args[0], (list, basestring)):
1419
 
                args = args[0]
1420
 
        else:
1421
 
            symbol_versioning.warn(zero_eighteen % "passing varargs to run_bzr",
1422
 
                                   DeprecationWarning, stacklevel=3)
1423
 
 
1424
 
        out, err = self._run_bzr_autosplit(args=args,
 
1696
        out, err = self._run_bzr_autosplit(
 
1697
            args=args,
1425
1698
            retcode=retcode,
1426
 
            encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1699
            encoding=encoding,
 
1700
            stdin=stdin,
 
1701
            working_dir=working_dir,
1427
1702
            )
1428
 
 
 
1703
        self.assertIsInstance(error_regexes, (list, tuple))
1429
1704
        for regex in error_regexes:
1430
1705
            self.assertContainsRe(err, regex)
1431
1706
        return out, err
1432
1707
 
1433
 
    def run_bzr_decode(self, *args, **kwargs):
1434
 
        if 'encoding' in kwargs:
1435
 
            encoding = kwargs['encoding']
1436
 
        else:
1437
 
            encoding = bzrlib.user_encoding
1438
 
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
1439
 
 
1440
1708
    def run_bzr_error(self, error_regexes, *args, **kwargs):
1441
1709
        """Run bzr, and check that stderr contains the supplied regexes
1442
1710
 
1470
1738
    def run_bzr_subprocess(self, *args, **kwargs):
1471
1739
        """Run bzr in a subprocess for testing.
1472
1740
 
1473
 
        This starts a new Python interpreter and runs bzr in there. 
 
1741
        This starts a new Python interpreter and runs bzr in there.
1474
1742
        This should only be used for tests that have a justifiable need for
1475
1743
        this isolation: e.g. they are testing startup time, or signal
1476
 
        handling, or early startup code, etc.  Subprocess code can't be 
 
1744
        handling, or early startup code, etc.  Subprocess code can't be
1477
1745
        profiled or debugged so easily.
1478
1746
 
1479
1747
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
1497
1765
            elif isinstance(args[0], basestring):
1498
1766
                args = list(shlex.split(args[0]))
1499
1767
        else:
1500
 
            symbol_versioning.warn(zero_ninetyone %
1501
 
                                   "passing varargs to run_bzr_subprocess",
1502
 
                                   DeprecationWarning, stacklevel=3)
 
1768
            raise ValueError("passing varargs to run_bzr_subprocess")
1503
1769
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
1504
1770
                                            working_dir=working_dir,
1505
1771
                                            allow_plugins=allow_plugins)
1561
1827
            # so we will avoid using it on all platforms, just to
1562
1828
            # make sure the code path is used, and we don't break on win32
1563
1829
            cleanup_environment()
1564
 
            command = [sys.executable, bzr_path]
 
1830
            command = [sys.executable]
 
1831
            # frozen executables don't need the path to bzr
 
1832
            if getattr(sys, "frozen", None) is None:
 
1833
                command.append(bzr_path)
1565
1834
            if not allow_plugins:
1566
1835
                command.append('--no-plugins')
1567
1836
            command.extend(process_args)
1693
1962
        sio.encoding = output_encoding
1694
1963
        return sio
1695
1964
 
 
1965
    def disable_verb(self, verb):
 
1966
        """Disable a smart server verb for one test."""
 
1967
        from bzrlib.smart import request
 
1968
        request_handlers = request.request_handlers
 
1969
        orig_method = request_handlers.get(verb)
 
1970
        request_handlers.remove(verb)
 
1971
        def restoreVerb():
 
1972
            request_handlers.register(verb, orig_method)
 
1973
        self.addCleanup(restoreVerb)
 
1974
 
 
1975
 
 
1976
class CapturedCall(object):
 
1977
    """A helper for capturing smart server calls for easy debug analysis."""
 
1978
 
 
1979
    def __init__(self, params, prefix_length):
 
1980
        """Capture the call with params and skip prefix_length stack frames."""
 
1981
        self.call = params
 
1982
        import traceback
 
1983
        # The last 5 frames are the __init__, the hook frame, and 3 smart
 
1984
        # client frames. Beyond this we could get more clever, but this is good
 
1985
        # enough for now.
 
1986
        stack = traceback.extract_stack()[prefix_length:-5]
 
1987
        self.stack = ''.join(traceback.format_list(stack))
 
1988
 
 
1989
    def __str__(self):
 
1990
        return self.call.method
 
1991
 
 
1992
    def __repr__(self):
 
1993
        return self.call.method
 
1994
 
 
1995
    def stack(self):
 
1996
        return self.stack
 
1997
 
1696
1998
 
1697
1999
class TestCaseWithMemoryTransport(TestCase):
1698
2000
    """Common test class for tests that do not need disk resources.
1717
2019
    _TEST_NAME = 'test'
1718
2020
 
1719
2021
    def __init__(self, methodName='runTest'):
1720
 
        # allow test parameterisation after test construction and before test
1721
 
        # execution. Variables that the parameteriser sets need to be 
 
2022
        # allow test parameterization after test construction and before test
 
2023
        # execution. Variables that the parameterizer sets need to be
1722
2024
        # ones that are not set by setUp, or setUp will trash them.
1723
2025
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
1724
2026
        self.vfs_transport_factory = default_transport
1731
2033
 
1732
2034
        This transport is for the test scratch space relative to
1733
2035
        "self._test_root"
1734
 
        
 
2036
 
1735
2037
        :param relpath: a path relative to the base url.
1736
2038
        """
1737
2039
        t = get_transport(self.get_url(relpath))
1740
2042
 
1741
2043
    def get_readonly_transport(self, relpath=None):
1742
2044
        """Return a readonly transport for the test scratch space
1743
 
        
 
2045
 
1744
2046
        This can be used to test that operations which should only need
1745
2047
        readonly access in fact do not try to write.
1746
2048
 
1777
2079
    def get_readonly_url(self, relpath=None):
1778
2080
        """Get a URL for the readonly transport.
1779
2081
 
1780
 
        This will either be backed by '.' or a decorator to the transport 
 
2082
        This will either be backed by '.' or a decorator to the transport
1781
2083
        used by self.get_url()
1782
2084
        relpath provides for clients to get a path relative to the base url.
1783
2085
        These should only be downwards relative, not upwards.
1874
2176
        base = self.get_vfs_only_server().get_url()
1875
2177
        return self._adjust_url(base, relpath)
1876
2178
 
 
2179
    def _create_safety_net(self):
 
2180
        """Make a fake bzr directory.
 
2181
 
 
2182
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
2183
        real branch.
 
2184
        """
 
2185
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2186
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
2187
 
 
2188
    def _check_safety_net(self):
 
2189
        """Check that the safety .bzr directory have not been touched.
 
2190
 
 
2191
        _make_test_root have created a .bzr directory to prevent tests from
 
2192
        propagating. This method ensures than a test did not leaked.
 
2193
        """
 
2194
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2195
        wt = workingtree.WorkingTree.open(root)
 
2196
        last_rev = wt.last_revision()
 
2197
        if last_rev != 'null:':
 
2198
            # The current test have modified the /bzr directory, we need to
 
2199
            # recreate a new one or all the followng tests will fail.
 
2200
            # If you need to inspect its content uncomment the following line
 
2201
            # import pdb; pdb.set_trace()
 
2202
            _rmtree_temp_dir(root + '/.bzr')
 
2203
            self._create_safety_net()
 
2204
            raise AssertionError('%s/.bzr should not be modified' % root)
 
2205
 
1877
2206
    def _make_test_root(self):
1878
 
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1879
 
            return
1880
 
        root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
1881
 
        TestCaseWithMemoryTransport.TEST_ROOT = root
1882
 
        
1883
 
        # make a fake bzr directory there to prevent any tests propagating
1884
 
        # up onto the source directory's real branch
1885
 
        bzrdir.BzrDir.create_standalone_workingtree(root)
1886
 
 
1887
 
        # The same directory is used by all tests, and we're not specifically
1888
 
        # told when all tests are finished.  This will do.
1889
 
        atexit.register(_rmtree_temp_dir, root)
 
2207
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2208
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
2209
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2210
 
 
2211
            self._create_safety_net()
 
2212
 
 
2213
            # The same directory is used by all tests, and we're not
 
2214
            # specifically told when all tests are finished.  This will do.
 
2215
            atexit.register(_rmtree_temp_dir, root)
 
2216
 
 
2217
        self.addCleanup(self._check_safety_net)
1890
2218
 
1891
2219
    def makeAndChdirToTestDir(self):
1892
2220
        """Create a temporary directories for this one test.
1893
 
        
 
2221
 
1894
2222
        This must set self.test_home_dir and self.test_dir and chdir to
1895
2223
        self.test_dir.
1896
 
        
 
2224
 
1897
2225
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1898
2226
        """
1899
2227
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1900
2228
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1901
2229
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1902
 
        
 
2230
 
1903
2231
    def make_branch(self, relpath, format=None):
1904
2232
        """Create a branch on the transport at relpath."""
1905
2233
        repo = self.make_repository(relpath, format=format)
1923
2251
 
1924
2252
    def make_repository(self, relpath, shared=False, format=None):
1925
2253
        """Create a repository on our default transport at relpath.
1926
 
        
 
2254
 
1927
2255
        Note that relpath must be a relative path, not a full url.
1928
2256
        """
1929
2257
        # FIXME: If you create a remoterepository this returns the underlying
1930
 
        # real format, which is incorrect.  Actually we should make sure that 
 
2258
        # real format, which is incorrect.  Actually we should make sure that
1931
2259
        # RemoteBzrDir returns a RemoteRepository.
1932
2260
        # maybe  mbp 20070410
1933
2261
        made_control = self.make_bzrdir(relpath, format=format)
1934
2262
        return made_control.create_repository(shared=shared)
1935
2263
 
 
2264
    def make_smart_server(self, path):
 
2265
        smart_server = server.SmartTCPServer_for_testing()
 
2266
        smart_server.setUp(self.get_server())
 
2267
        remote_transport = get_transport(smart_server.get_url()).clone(path)
 
2268
        self.addCleanup(smart_server.tearDown)
 
2269
        return remote_transport
 
2270
 
1936
2271
    def make_branch_and_memory_tree(self, relpath, format=None):
1937
2272
        """Create a branch on the default transport and a MemoryTree for it."""
1938
2273
        b = self.make_branch(relpath, format=format)
1939
2274
        return memorytree.MemoryTree.create_on_branch(b)
1940
2275
 
 
2276
    def make_branch_builder(self, relpath, format=None):
 
2277
        branch = self.make_branch(relpath, format=format)
 
2278
        return branchbuilder.BranchBuilder(branch=branch)
 
2279
 
1941
2280
    def overrideEnvironmentForTesting(self):
1942
2281
        os.environ['HOME'] = self.test_home_dir
1943
2282
        os.environ['BZR_HOME'] = self.test_home_dir
1944
 
        
 
2283
 
1945
2284
    def setUp(self):
1946
2285
        super(TestCaseWithMemoryTransport, self).setUp()
1947
2286
        self._make_test_root()
1955
2294
        self.__server = None
1956
2295
        self.reduceLockdirTimeout()
1957
2296
 
1958
 
     
 
2297
    def setup_smart_server_with_call_log(self):
 
2298
        """Sets up a smart server as the transport server with a call log."""
 
2299
        self.transport_server = server.SmartTCPServer_for_testing
 
2300
        self.hpss_calls = []
 
2301
        import traceback
 
2302
        # Skip the current stack down to the caller of
 
2303
        # setup_smart_server_with_call_log
 
2304
        prefix_length = len(traceback.extract_stack()) - 2
 
2305
        def capture_hpss_call(params):
 
2306
            self.hpss_calls.append(
 
2307
                CapturedCall(params, prefix_length))
 
2308
        client._SmartClient.hooks.install_named_hook(
 
2309
            'call', capture_hpss_call, None)
 
2310
 
 
2311
    def reset_smart_call_log(self):
 
2312
        self.hpss_calls = []
 
2313
 
 
2314
 
1959
2315
class TestCaseInTempDir(TestCaseWithMemoryTransport):
1960
2316
    """Derived class that runs a test within a temporary directory.
1961
2317
 
1966
2322
    All test cases create their own directory within that.  If the
1967
2323
    tests complete successfully, the directory is removed.
1968
2324
 
1969
 
    :ivar test_base_dir: The path of the top-level directory for this 
 
2325
    :ivar test_base_dir: The path of the top-level directory for this
1970
2326
    test, which contains a home directory and a work directory.
1971
2327
 
1972
2328
    :ivar test_home_dir: An initially empty directory under test_base_dir
1986
2342
            self.log("actually: %r" % contents)
1987
2343
            self.fail("contents of %s not as expected" % filename)
1988
2344
 
 
2345
    def _getTestDirPrefix(self):
 
2346
        # create a directory within the top level test directory
 
2347
        if sys.platform in ('win32', 'cygwin'):
 
2348
            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
 
2349
            # windows is likely to have path-length limits so use a short name
 
2350
            name_prefix = name_prefix[-30:]
 
2351
        else:
 
2352
            name_prefix = re.sub('[/]', '_', self.id())
 
2353
        return name_prefix
 
2354
 
1989
2355
    def makeAndChdirToTestDir(self):
1990
2356
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
1991
 
        
 
2357
 
1992
2358
        For TestCaseInTempDir we create a temporary directory based on the test
1993
2359
        name and then create two subdirs - test and home under it.
1994
2360
        """
1995
 
        # create a directory within the top level test directory
1996
 
        candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
 
2361
        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
 
2362
            self._getTestDirPrefix())
 
2363
        name = name_prefix
 
2364
        for i in range(100):
 
2365
            if os.path.exists(name):
 
2366
                name = name_prefix + '_' + str(i)
 
2367
            else:
 
2368
                os.mkdir(name)
 
2369
                break
1997
2370
        # now create test and home directories within this dir
1998
 
        self.test_base_dir = candidate_dir
 
2371
        self.test_base_dir = name
1999
2372
        self.test_home_dir = self.test_base_dir + '/home'
2000
2373
        os.mkdir(self.test_home_dir)
2001
2374
        self.test_dir = self.test_base_dir + '/work'
2010
2383
        self.addCleanup(self.deleteTestDir)
2011
2384
 
2012
2385
    def deleteTestDir(self):
2013
 
        os.chdir(self.TEST_ROOT)
 
2386
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2014
2387
        _rmtree_temp_dir(self.test_base_dir)
2015
2388
 
2016
2389
    def build_tree(self, shape, line_endings='binary', transport=None):
2023
2396
 
2024
2397
        This doesn't add anything to a branch.
2025
2398
 
 
2399
        :type shape:    list or tuple.
2026
2400
        :param line_endings: Either 'binary' or 'native'
2027
2401
            in binary mode, exact contents are written in native mode, the
2028
2402
            line endings match the default platform endings.
2030
2404
            If the transport is readonly or None, "." is opened automatically.
2031
2405
        :return: None
2032
2406
        """
 
2407
        if type(shape) not in (list, tuple):
 
2408
            raise AssertionError("Parameter 'shape' should be "
 
2409
                "a list or a tuple. Got %r instead" % (shape,))
2033
2410
        # It's OK to just create them using forward slashes on windows.
2034
2411
        if transport is None or transport.is_readonly():
2035
2412
            transport = get_transport(".")
2036
2413
        for name in shape:
2037
 
            self.assert_(isinstance(name, basestring))
 
2414
            self.assertIsInstance(name, basestring)
2038
2415
            if name[-1] == '/':
2039
2416
                transport.mkdir(urlutils.escape(name[:-1]))
2040
2417
            else:
2051
2428
    def build_tree_contents(self, shape):
2052
2429
        build_tree_contents(shape)
2053
2430
 
2054
 
    def assertFileEqual(self, content, path):
2055
 
        """Fail if path does not contain 'content'."""
2056
 
        self.failUnlessExists(path)
2057
 
        f = file(path, 'rb')
2058
 
        try:
2059
 
            s = f.read()
2060
 
        finally:
2061
 
            f.close()
2062
 
        self.assertEqualDiff(content, s)
2063
 
 
2064
 
    def failUnlessExists(self, path):
2065
 
        """Fail unless path or paths, which may be abs or relative, exist."""
2066
 
        if not isinstance(path, basestring):
2067
 
            for p in path:
2068
 
                self.failUnlessExists(p)
2069
 
        else:
2070
 
            self.failUnless(osutils.lexists(path),path+" does not exist")
2071
 
 
2072
 
    def failIfExists(self, path):
2073
 
        """Fail if path or paths, which may be abs or relative, exist."""
2074
 
        if not isinstance(path, basestring):
2075
 
            for p in path:
2076
 
                self.failIfExists(p)
2077
 
        else:
2078
 
            self.failIf(osutils.lexists(path),path+" exists")
2079
 
 
2080
 
    def assertInWorkingTree(self,path,root_path='.',tree=None):
 
2431
    def assertInWorkingTree(self, path, root_path='.', tree=None):
2081
2432
        """Assert whether path or paths are in the WorkingTree"""
2082
2433
        if tree is None:
2083
2434
            tree = workingtree.WorkingTree.open(root_path)
2084
2435
        if not isinstance(path, basestring):
2085
2436
            for p in path:
2086
 
                self.assertInWorkingTree(p,tree=tree)
 
2437
                self.assertInWorkingTree(p, tree=tree)
2087
2438
        else:
2088
2439
            self.assertIsNot(tree.path2id(path), None,
2089
2440
                path+' not in working tree.')
2090
2441
 
2091
 
    def assertNotInWorkingTree(self,path,root_path='.',tree=None):
 
2442
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2092
2443
        """Assert whether path or paths are not in the WorkingTree"""
2093
2444
        if tree is None:
2094
2445
            tree = workingtree.WorkingTree.open(root_path)
2109
2460
    ReadonlyTransportDecorator is used instead which allows the use of non disk
2110
2461
    based read write transports.
2111
2462
 
2112
 
    If an explicit class is provided for readonly access, that server and the 
 
2463
    If an explicit class is provided for readonly access, that server and the
2113
2464
    readwrite one must both define get_url() as resolving to os.getcwd().
2114
2465
    """
2115
2466
 
2153
2504
                # the branch is colocated on disk, we cannot create a checkout.
2154
2505
                # hopefully callers will expect this.
2155
2506
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2156
 
                return local_controldir.create_workingtree()
 
2507
                wt = local_controldir.create_workingtree()
 
2508
                if wt.branch._format != b._format:
 
2509
                    wt._branch = b
 
2510
                    # Make sure that assigning to wt._branch fixes wt.branch,
 
2511
                    # in case the implementation details of workingtree objects
 
2512
                    # change.
 
2513
                    self.assertIs(b, wt.branch)
 
2514
                return wt
2157
2515
            else:
2158
2516
                return b.create_checkout(relpath, lightweight=True)
2159
2517
 
2194
2552
    for readonly urls.
2195
2553
 
2196
2554
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2197
 
                       be used without needed to redo it when a different 
 
2555
                       be used without needed to redo it when a different
2198
2556
                       subclass is in use ?
2199
2557
    """
2200
2558
 
2204
2562
            self.transport_readonly_server = HttpServer
2205
2563
 
2206
2564
 
2207
 
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2208
 
                       random_order=False):
2209
 
    """Create a test suite by filtering another one.
2210
 
    
 
2565
def condition_id_re(pattern):
 
2566
    """Create a condition filter which performs a re check on a test's id.
 
2567
 
 
2568
    :param pattern: A regular expression string.
 
2569
    :return: A callable that returns True if the re matches.
 
2570
    """
 
2571
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2572
        'test filter')
 
2573
    def condition(test):
 
2574
        test_id = test.id()
 
2575
        return filter_re.search(test_id)
 
2576
    return condition
 
2577
 
 
2578
 
 
2579
def condition_isinstance(klass_or_klass_list):
 
2580
    """Create a condition filter which returns isinstance(param, klass).
 
2581
 
 
2582
    :return: A callable which when called with one parameter obj return the
 
2583
        result of isinstance(obj, klass_or_klass_list).
 
2584
    """
 
2585
    def condition(obj):
 
2586
        return isinstance(obj, klass_or_klass_list)
 
2587
    return condition
 
2588
 
 
2589
 
 
2590
def condition_id_in_list(id_list):
 
2591
    """Create a condition filter which verify that test's id in a list.
 
2592
 
 
2593
    :param id_list: A TestIdList object.
 
2594
    :return: A callable that returns True if the test's id appears in the list.
 
2595
    """
 
2596
    def condition(test):
 
2597
        return id_list.includes(test.id())
 
2598
    return condition
 
2599
 
 
2600
 
 
2601
def condition_id_startswith(starts):
 
2602
    """Create a condition filter verifying that test's id starts with a string.
 
2603
 
 
2604
    :param starts: A list of string.
 
2605
    :return: A callable that returns True if the test's id starts with one of
 
2606
        the given strings.
 
2607
    """
 
2608
    def condition(test):
 
2609
        for start in starts:
 
2610
            if test.id().startswith(start):
 
2611
                return True
 
2612
        return False
 
2613
    return condition
 
2614
 
 
2615
 
 
2616
def exclude_tests_by_condition(suite, condition):
 
2617
    """Create a test suite which excludes some tests from suite.
 
2618
 
 
2619
    :param suite: The suite to get tests from.
 
2620
    :param condition: A callable whose result evaluates True when called with a
 
2621
        test case which should be excluded from the result.
 
2622
    :return: A suite which contains the tests found in suite that fail
 
2623
        condition.
 
2624
    """
 
2625
    result = []
 
2626
    for test in iter_suite_tests(suite):
 
2627
        if not condition(test):
 
2628
            result.append(test)
 
2629
    return TestUtil.TestSuite(result)
 
2630
 
 
2631
 
 
2632
def filter_suite_by_condition(suite, condition):
 
2633
    """Create a test suite by filtering another one.
 
2634
 
 
2635
    :param suite: The source suite.
 
2636
    :param condition: A callable whose result evaluates True when called with a
 
2637
        test case which should be included in the result.
 
2638
    :return: A suite which contains the tests found in suite that pass
 
2639
        condition.
 
2640
    """
 
2641
    result = []
 
2642
    for test in iter_suite_tests(suite):
 
2643
        if condition(test):
 
2644
            result.append(test)
 
2645
    return TestUtil.TestSuite(result)
 
2646
 
 
2647
 
 
2648
def filter_suite_by_re(suite, pattern):
 
2649
    """Create a test suite by filtering another one.
 
2650
 
2211
2651
    :param suite:           the source suite
2212
2652
    :param pattern:         pattern that names must match
2213
 
    :param exclude_pattern: pattern that names must not match, if any
2214
 
    :param random_order:    if True, tests in the new suite will be put in
2215
 
                            random order
2216
 
    :returns: the newly created suite
2217
 
    """ 
2218
 
    return sort_suite_by_re(suite, pattern, exclude_pattern,
2219
 
        random_order, False)
2220
 
 
2221
 
 
2222
 
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2223
 
                     random_order=False, append_rest=True):
2224
 
    """Create a test suite by sorting another one.
2225
 
    
2226
 
    :param suite:           the source suite
2227
 
    :param pattern:         pattern that names must match in order to go
2228
 
                            first in the new suite
2229
 
    :param exclude_pattern: pattern that names must not match, if any
2230
 
    :param random_order:    if True, tests in the new suite will be put in
2231
 
                            random order
2232
 
    :param append_rest:     if False, pattern is a strict filter and not
2233
 
                            just an ordering directive
2234
 
    :returns: the newly created suite
2235
 
    """ 
2236
 
    first = []
2237
 
    second = []
2238
 
    filter_re = re.compile(pattern)
2239
 
    if exclude_pattern is not None:
2240
 
        exclude_re = re.compile(exclude_pattern)
 
2653
    :returns: the newly created suite
 
2654
    """
 
2655
    condition = condition_id_re(pattern)
 
2656
    result_suite = filter_suite_by_condition(suite, condition)
 
2657
    return result_suite
 
2658
 
 
2659
 
 
2660
def filter_suite_by_id_list(suite, test_id_list):
 
2661
    """Create a test suite by filtering another one.
 
2662
 
 
2663
    :param suite: The source suite.
 
2664
    :param test_id_list: A list of the test ids to keep as strings.
 
2665
    :returns: the newly created suite
 
2666
    """
 
2667
    condition = condition_id_in_list(test_id_list)
 
2668
    result_suite = filter_suite_by_condition(suite, condition)
 
2669
    return result_suite
 
2670
 
 
2671
 
 
2672
def filter_suite_by_id_startswith(suite, start):
 
2673
    """Create a test suite by filtering another one.
 
2674
 
 
2675
    :param suite: The source suite.
 
2676
    :param start: A list of string the test id must start with one of.
 
2677
    :returns: the newly created suite
 
2678
    """
 
2679
    condition = condition_id_startswith(start)
 
2680
    result_suite = filter_suite_by_condition(suite, condition)
 
2681
    return result_suite
 
2682
 
 
2683
 
 
2684
def exclude_tests_by_re(suite, pattern):
 
2685
    """Create a test suite which excludes some tests from suite.
 
2686
 
 
2687
    :param suite: The suite to get tests from.
 
2688
    :param pattern: A regular expression string. Test ids that match this
 
2689
        pattern will be excluded from the result.
 
2690
    :return: A TestSuite that contains all the tests from suite without the
 
2691
        tests that matched pattern. The order of tests is the same as it was in
 
2692
        suite.
 
2693
    """
 
2694
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
 
2695
 
 
2696
 
 
2697
def preserve_input(something):
 
2698
    """A helper for performing test suite transformation chains.
 
2699
 
 
2700
    :param something: Anything you want to preserve.
 
2701
    :return: Something.
 
2702
    """
 
2703
    return something
 
2704
 
 
2705
 
 
2706
def randomize_suite(suite):
 
2707
    """Return a new TestSuite with suite's tests in random order.
 
2708
 
 
2709
    The tests in the input suite are flattened into a single suite in order to
 
2710
    accomplish this. Any nested TestSuites are removed to provide global
 
2711
    randomness.
 
2712
    """
 
2713
    tests = list(iter_suite_tests(suite))
 
2714
    random.shuffle(tests)
 
2715
    return TestUtil.TestSuite(tests)
 
2716
 
 
2717
 
 
2718
def split_suite_by_condition(suite, condition):
 
2719
    """Split a test suite into two by a condition.
 
2720
 
 
2721
    :param suite: The suite to split.
 
2722
    :param condition: The condition to match on. Tests that match this
 
2723
        condition are returned in the first test suite, ones that do not match
 
2724
        are in the second suite.
 
2725
    :return: A tuple of two test suites, where the first contains tests from
 
2726
        suite matching the condition, and the second contains the remainder
 
2727
        from suite. The order within each output suite is the same as it was in
 
2728
        suite.
 
2729
    """
 
2730
    matched = []
 
2731
    did_not_match = []
2241
2732
    for test in iter_suite_tests(suite):
2242
 
        test_id = test.id()
2243
 
        if exclude_pattern is None or not exclude_re.search(test_id):
2244
 
            if filter_re.search(test_id):
2245
 
                first.append(test)
2246
 
            elif append_rest:
2247
 
                second.append(test)
2248
 
    if random_order:
2249
 
        random.shuffle(first)
2250
 
        random.shuffle(second)
2251
 
    return TestUtil.TestSuite(first + second)
 
2733
        if condition(test):
 
2734
            matched.append(test)
 
2735
        else:
 
2736
            did_not_match.append(test)
 
2737
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
2738
 
 
2739
 
 
2740
def split_suite_by_re(suite, pattern):
 
2741
    """Split a test suite into two by a regular expression.
 
2742
 
 
2743
    :param suite: The suite to split.
 
2744
    :param pattern: A regular expression string. Test ids that match this
 
2745
        pattern will be in the first test suite returned, and the others in the
 
2746
        second test suite returned.
 
2747
    :return: A tuple of two test suites, where the first contains tests from
 
2748
        suite matching pattern, and the second contains the remainder from
 
2749
        suite. The order within each output suite is the same as it was in
 
2750
        suite.
 
2751
    """
 
2752
    return split_suite_by_condition(suite, condition_id_re(pattern))
2252
2753
 
2253
2754
 
2254
2755
def run_suite(suite, name='test', verbose=False, pattern=".*",
2259
2760
              random_seed=None,
2260
2761
              exclude_pattern=None,
2261
2762
              strict=False,
2262
 
              ):
 
2763
              runner_class=None,
 
2764
              suite_decorators=None,
 
2765
              stream=None):
 
2766
    """Run a test suite for bzr selftest.
 
2767
 
 
2768
    :param runner_class: The class of runner to use. Must support the
 
2769
        constructor arguments passed by run_suite which are more than standard
 
2770
        python uses.
 
2771
    :return: A boolean indicating success.
 
2772
    """
2263
2773
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2264
2774
    if verbose:
2265
2775
        verbosity = 2
2266
2776
    else:
2267
2777
        verbosity = 1
2268
 
    runner = TextTestRunner(stream=sys.stdout,
 
2778
    if runner_class is None:
 
2779
        runner_class = TextTestRunner
 
2780
    if stream is None:
 
2781
        stream = sys.stdout
 
2782
    runner = runner_class(stream=stream,
2269
2783
                            descriptions=0,
2270
2784
                            verbosity=verbosity,
2271
2785
                            bench_history=bench_history,
2272
2786
                            list_only=list_only,
 
2787
                            strict=strict,
2273
2788
                            )
2274
2789
    runner.stop_on_failure=stop_on_failure
2275
 
    # Initialise the random number generator and display the seed used.
2276
 
    # We convert the seed to a long to make it reuseable across invocations.
2277
 
    random_order = False
2278
 
    if random_seed is not None:
2279
 
        random_order = True
2280
 
        if random_seed == "now":
2281
 
            random_seed = long(time.time())
 
2790
    # built in decorator factories:
 
2791
    decorators = [
 
2792
        random_order(random_seed, runner),
 
2793
        exclude_tests(exclude_pattern),
 
2794
        ]
 
2795
    if matching_tests_first:
 
2796
        decorators.append(tests_first(pattern))
 
2797
    else:
 
2798
        decorators.append(filter_tests(pattern))
 
2799
    if suite_decorators:
 
2800
        decorators.extend(suite_decorators)
 
2801
    # tell the result object how many tests will be running: (except if
 
2802
    # --parallel=fork is being used. Robert said he will provide a better
 
2803
    # progress design later -- vila 20090817)
 
2804
    if fork_decorator not in decorators:
 
2805
        decorators.append(CountingDecorator)
 
2806
    for decorator in decorators:
 
2807
        suite = decorator(suite)
 
2808
    result = runner.run(suite)
 
2809
    if list_only:
 
2810
        return True
 
2811
    result.done()
 
2812
    if strict:
 
2813
        return result.wasStrictlySuccessful()
 
2814
    else:
 
2815
        return result.wasSuccessful()
 
2816
 
 
2817
 
 
2818
# A registry where get() returns a suite decorator.
 
2819
parallel_registry = registry.Registry()
 
2820
 
 
2821
 
 
2822
def fork_decorator(suite):
 
2823
    concurrency = osutils.local_concurrency()
 
2824
    if concurrency == 1:
 
2825
        return suite
 
2826
    from testtools import ConcurrentTestSuite
 
2827
    return ConcurrentTestSuite(suite, fork_for_tests)
 
2828
parallel_registry.register('fork', fork_decorator)
 
2829
 
 
2830
 
 
2831
def subprocess_decorator(suite):
 
2832
    concurrency = osutils.local_concurrency()
 
2833
    if concurrency == 1:
 
2834
        return suite
 
2835
    from testtools import ConcurrentTestSuite
 
2836
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
 
2837
parallel_registry.register('subprocess', subprocess_decorator)
 
2838
 
 
2839
 
 
2840
def exclude_tests(exclude_pattern):
 
2841
    """Return a test suite decorator that excludes tests."""
 
2842
    if exclude_pattern is None:
 
2843
        return identity_decorator
 
2844
    def decorator(suite):
 
2845
        return ExcludeDecorator(suite, exclude_pattern)
 
2846
    return decorator
 
2847
 
 
2848
 
 
2849
def filter_tests(pattern):
 
2850
    if pattern == '.*':
 
2851
        return identity_decorator
 
2852
    def decorator(suite):
 
2853
        return FilterTestsDecorator(suite, pattern)
 
2854
    return decorator
 
2855
 
 
2856
 
 
2857
def random_order(random_seed, runner):
 
2858
    """Return a test suite decorator factory for randomising tests order.
 
2859
    
 
2860
    :param random_seed: now, a string which casts to a long, or a long.
 
2861
    :param runner: A test runner with a stream attribute to report on.
 
2862
    """
 
2863
    if random_seed is None:
 
2864
        return identity_decorator
 
2865
    def decorator(suite):
 
2866
        return RandomDecorator(suite, random_seed, runner.stream)
 
2867
    return decorator
 
2868
 
 
2869
 
 
2870
def tests_first(pattern):
 
2871
    if pattern == '.*':
 
2872
        return identity_decorator
 
2873
    def decorator(suite):
 
2874
        return TestFirstDecorator(suite, pattern)
 
2875
    return decorator
 
2876
 
 
2877
 
 
2878
def identity_decorator(suite):
 
2879
    """Return suite."""
 
2880
    return suite
 
2881
 
 
2882
 
 
2883
class TestDecorator(TestSuite):
 
2884
    """A decorator for TestCase/TestSuite objects.
 
2885
    
 
2886
    Usually, subclasses should override __iter__(used when flattening test
 
2887
    suites), which we do to filter, reorder, parallelise and so on, run() and
 
2888
    debug().
 
2889
    """
 
2890
 
 
2891
    def __init__(self, suite):
 
2892
        TestSuite.__init__(self)
 
2893
        self.addTest(suite)
 
2894
 
 
2895
    def countTestCases(self):
 
2896
        cases = 0
 
2897
        for test in self:
 
2898
            cases += test.countTestCases()
 
2899
        return cases
 
2900
 
 
2901
    def debug(self):
 
2902
        for test in self:
 
2903
            test.debug()
 
2904
 
 
2905
    def run(self, result):
 
2906
        # Use iteration on self, not self._tests, to allow subclasses to hook
 
2907
        # into __iter__.
 
2908
        for test in self:
 
2909
            if result.shouldStop:
 
2910
                break
 
2911
            test.run(result)
 
2912
        return result
 
2913
 
 
2914
 
 
2915
class CountingDecorator(TestDecorator):
 
2916
    """A decorator which calls result.progress(self.countTestCases)."""
 
2917
 
 
2918
    def run(self, result):
 
2919
        progress_method = getattr(result, 'progress', None)
 
2920
        if callable(progress_method):
 
2921
            progress_method(self.countTestCases(), SUBUNIT_SEEK_SET)
 
2922
        return super(CountingDecorator, self).run(result)
 
2923
 
 
2924
 
 
2925
class ExcludeDecorator(TestDecorator):
 
2926
    """A decorator which excludes test matching an exclude pattern."""
 
2927
 
 
2928
    def __init__(self, suite, exclude_pattern):
 
2929
        TestDecorator.__init__(self, suite)
 
2930
        self.exclude_pattern = exclude_pattern
 
2931
        self.excluded = False
 
2932
 
 
2933
    def __iter__(self):
 
2934
        if self.excluded:
 
2935
            return iter(self._tests)
 
2936
        self.excluded = True
 
2937
        suite = exclude_tests_by_re(self, self.exclude_pattern)
 
2938
        del self._tests[:]
 
2939
        self.addTests(suite)
 
2940
        return iter(self._tests)
 
2941
 
 
2942
 
 
2943
class FilterTestsDecorator(TestDecorator):
 
2944
    """A decorator which filters tests to those matching a pattern."""
 
2945
 
 
2946
    def __init__(self, suite, pattern):
 
2947
        TestDecorator.__init__(self, suite)
 
2948
        self.pattern = pattern
 
2949
        self.filtered = False
 
2950
 
 
2951
    def __iter__(self):
 
2952
        if self.filtered:
 
2953
            return iter(self._tests)
 
2954
        self.filtered = True
 
2955
        suite = filter_suite_by_re(self, self.pattern)
 
2956
        del self._tests[:]
 
2957
        self.addTests(suite)
 
2958
        return iter(self._tests)
 
2959
 
 
2960
 
 
2961
class RandomDecorator(TestDecorator):
 
2962
    """A decorator which randomises the order of its tests."""
 
2963
 
 
2964
    def __init__(self, suite, random_seed, stream):
 
2965
        TestDecorator.__init__(self, suite)
 
2966
        self.random_seed = random_seed
 
2967
        self.randomised = False
 
2968
        self.stream = stream
 
2969
 
 
2970
    def __iter__(self):
 
2971
        if self.randomised:
 
2972
            return iter(self._tests)
 
2973
        self.randomised = True
 
2974
        self.stream.writeln("Randomizing test order using seed %s\n" %
 
2975
            (self.actual_seed()))
 
2976
        # Initialise the random number generator.
 
2977
        random.seed(self.actual_seed())
 
2978
        suite = randomize_suite(self)
 
2979
        del self._tests[:]
 
2980
        self.addTests(suite)
 
2981
        return iter(self._tests)
 
2982
 
 
2983
    def actual_seed(self):
 
2984
        if self.random_seed == "now":
 
2985
            # We convert the seed to a long to make it reuseable across
 
2986
            # invocations (because the user can reenter it).
 
2987
            self.random_seed = long(time.time())
2282
2988
        else:
2283
2989
            # Convert the seed to a long if we can
2284
2990
            try:
2285
 
                random_seed = long(random_seed)
 
2991
                self.random_seed = long(self.random_seed)
2286
2992
            except:
2287
2993
                pass
2288
 
        runner.stream.writeln("Randomizing test order using seed %s\n" %
2289
 
            (random_seed))
2290
 
        random.seed(random_seed)
2291
 
    # Customise the list of tests if requested
2292
 
    if pattern != '.*' or exclude_pattern is not None or random_order:
2293
 
        if matching_tests_first:
2294
 
            suite = sort_suite_by_re(suite, pattern, exclude_pattern,
2295
 
                random_order)
2296
 
        else:
2297
 
            suite = filter_suite_by_re(suite, pattern, exclude_pattern,
2298
 
                random_order)
2299
 
    result = runner.run(suite)
2300
 
 
2301
 
    if strict:
2302
 
        return result.wasStrictlySuccessful()
2303
 
 
2304
 
    return result.wasSuccessful()
 
2994
        return self.random_seed
 
2995
 
 
2996
 
 
2997
class TestFirstDecorator(TestDecorator):
 
2998
    """A decorator which moves named tests to the front."""
 
2999
 
 
3000
    def __init__(self, suite, pattern):
 
3001
        TestDecorator.__init__(self, suite)
 
3002
        self.pattern = pattern
 
3003
        self.filtered = False
 
3004
 
 
3005
    def __iter__(self):
 
3006
        if self.filtered:
 
3007
            return iter(self._tests)
 
3008
        self.filtered = True
 
3009
        suites = split_suite_by_re(self, self.pattern)
 
3010
        del self._tests[:]
 
3011
        self.addTests(suites)
 
3012
        return iter(self._tests)
 
3013
 
 
3014
 
 
3015
def partition_tests(suite, count):
 
3016
    """Partition suite into count lists of tests."""
 
3017
    result = []
 
3018
    tests = list(iter_suite_tests(suite))
 
3019
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3020
    for block in range(count):
 
3021
        low_test = block * tests_per_process
 
3022
        high_test = low_test + tests_per_process
 
3023
        process_tests = tests[low_test:high_test]
 
3024
        result.append(process_tests)
 
3025
    return result
 
3026
 
 
3027
 
 
3028
def fork_for_tests(suite):
 
3029
    """Take suite and start up one runner per CPU by forking()
 
3030
 
 
3031
    :return: An iterable of TestCase-like objects which can each have
 
3032
        run(result) called on them to feed tests to result.
 
3033
    """
 
3034
    concurrency = osutils.local_concurrency()
 
3035
    result = []
 
3036
    from subunit import TestProtocolClient, ProtocolTestCase
 
3037
    try:
 
3038
        from subunit.test_results import AutoTimingTestResultDecorator
 
3039
    except ImportError:
 
3040
        AutoTimingTestResultDecorator = lambda x:x
 
3041
    class TestInOtherProcess(ProtocolTestCase):
 
3042
        # Should be in subunit, I think. RBC.
 
3043
        def __init__(self, stream, pid):
 
3044
            ProtocolTestCase.__init__(self, stream)
 
3045
            self.pid = pid
 
3046
 
 
3047
        def run(self, result):
 
3048
            try:
 
3049
                ProtocolTestCase.run(self, result)
 
3050
            finally:
 
3051
                os.waitpid(self.pid, os.WNOHANG)
 
3052
 
 
3053
    test_blocks = partition_tests(suite, concurrency)
 
3054
    for process_tests in test_blocks:
 
3055
        process_suite = TestSuite()
 
3056
        process_suite.addTests(process_tests)
 
3057
        c2pread, c2pwrite = os.pipe()
 
3058
        pid = os.fork()
 
3059
        if pid == 0:
 
3060
            try:
 
3061
                os.close(c2pread)
 
3062
                # Leave stderr and stdout open so we can see test noise
 
3063
                # Close stdin so that the child goes away if it decides to
 
3064
                # read from stdin (otherwise its a roulette to see what
 
3065
                # child actually gets keystrokes for pdb etc).
 
3066
                sys.stdin.close()
 
3067
                sys.stdin = None
 
3068
                stream = os.fdopen(c2pwrite, 'wb', 1)
 
3069
                subunit_result = AutoTimingTestResultDecorator(
 
3070
                    TestProtocolClient(stream))
 
3071
                process_suite.run(subunit_result)
 
3072
            finally:
 
3073
                os._exit(0)
 
3074
        else:
 
3075
            os.close(c2pwrite)
 
3076
            stream = os.fdopen(c2pread, 'rb', 1)
 
3077
            test = TestInOtherProcess(stream, pid)
 
3078
            result.append(test)
 
3079
    return result
 
3080
 
 
3081
 
 
3082
def reinvoke_for_tests(suite):
 
3083
    """Take suite and start up one runner per CPU using subprocess().
 
3084
 
 
3085
    :return: An iterable of TestCase-like objects which can each have
 
3086
        run(result) called on them to feed tests to result.
 
3087
    """
 
3088
    concurrency = osutils.local_concurrency()
 
3089
    result = []
 
3090
    from subunit import ProtocolTestCase
 
3091
    class TestInSubprocess(ProtocolTestCase):
 
3092
        def __init__(self, process, name):
 
3093
            ProtocolTestCase.__init__(self, process.stdout)
 
3094
            self.process = process
 
3095
            self.process.stdin.close()
 
3096
            self.name = name
 
3097
 
 
3098
        def run(self, result):
 
3099
            try:
 
3100
                ProtocolTestCase.run(self, result)
 
3101
            finally:
 
3102
                self.process.wait()
 
3103
                os.unlink(self.name)
 
3104
            # print "pid %d finished" % finished_process
 
3105
    test_blocks = partition_tests(suite, concurrency)
 
3106
    for process_tests in test_blocks:
 
3107
        # ugly; currently reimplement rather than reuses TestCase methods.
 
3108
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
3109
        if not os.path.isfile(bzr_path):
 
3110
            # We are probably installed. Assume sys.argv is the right file
 
3111
            bzr_path = sys.argv[0]
 
3112
        fd, test_list_file_name = tempfile.mkstemp()
 
3113
        test_list_file = os.fdopen(fd, 'wb', 1)
 
3114
        for test in process_tests:
 
3115
            test_list_file.write(test.id() + '\n')
 
3116
        test_list_file.close()
 
3117
        try:
 
3118
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
 
3119
                '--subunit']
 
3120
            if '--no-plugins' in sys.argv:
 
3121
                argv.append('--no-plugins')
 
3122
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3123
            # stderr it can interrupt the subunit protocol.
 
3124
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3125
                bufsize=1)
 
3126
            test = TestInSubprocess(process, test_list_file_name)
 
3127
            result.append(test)
 
3128
        except:
 
3129
            os.unlink(test_list_file_name)
 
3130
            raise
 
3131
    return result
 
3132
 
 
3133
 
 
3134
class BZRTransformingResult(unittest.TestResult):
 
3135
 
 
3136
    def __init__(self, target):
 
3137
        unittest.TestResult.__init__(self)
 
3138
        self.result = target
 
3139
 
 
3140
    def startTest(self, test):
 
3141
        self.result.startTest(test)
 
3142
 
 
3143
    def stopTest(self, test):
 
3144
        self.result.stopTest(test)
 
3145
 
 
3146
    def addError(self, test, err):
 
3147
        feature = self._error_looks_like('UnavailableFeature: ', err)
 
3148
        if feature is not None:
 
3149
            self.result.addNotSupported(test, feature)
 
3150
        else:
 
3151
            self.result.addError(test, err)
 
3152
 
 
3153
    def addFailure(self, test, err):
 
3154
        known = self._error_looks_like('KnownFailure: ', err)
 
3155
        if known is not None:
 
3156
            self.result._addKnownFailure(test, [KnownFailure,
 
3157
                                                KnownFailure(known), None])
 
3158
        else:
 
3159
            self.result.addFailure(test, err)
 
3160
 
 
3161
    def addSkip(self, test, reason):
 
3162
        self.result.addSkip(test, reason)
 
3163
 
 
3164
    def addSuccess(self, test):
 
3165
        self.result.addSuccess(test)
 
3166
 
 
3167
    def _error_looks_like(self, prefix, err):
 
3168
        """Deserialize exception and returns the stringify value."""
 
3169
        import subunit
 
3170
        value = None
 
3171
        typ, exc, _ = err
 
3172
        if isinstance(exc, subunit.RemoteException):
 
3173
            # stringify the exception gives access to the remote traceback
 
3174
            # We search the last line for 'prefix'
 
3175
            lines = str(exc).split('\n')
 
3176
            while lines and not lines[-1]:
 
3177
                lines.pop(-1)
 
3178
            if lines:
 
3179
                if lines[-1].startswith(prefix):
 
3180
                    value = lines[-1][len(prefix):]
 
3181
        return value
 
3182
 
 
3183
 
 
3184
# Controlled by "bzr selftest -E=..." option
 
3185
# Currently supported:
 
3186
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
 
3187
#                           preserves any flags supplied at the command line.
 
3188
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
 
3189
#                           rather than failing tests. And no longer raise
 
3190
#                           LockContention when fctnl locks are not being used
 
3191
#                           with proper exclusion rules.
 
3192
selftest_debug_flags = set()
2305
3193
 
2306
3194
 
2307
3195
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2314
3202
             random_seed=None,
2315
3203
             exclude_pattern=None,
2316
3204
             strict=False,
 
3205
             load_list=None,
 
3206
             debug_flags=None,
 
3207
             starting_with=None,
 
3208
             runner_class=None,
 
3209
             suite_decorators=None,
 
3210
             stream=None,
2317
3211
             ):
2318
3212
    """Run the whole test suite under the enhanced runner"""
2319
3213
    # XXX: Very ugly way to do this...
2327
3221
        transport = default_transport
2328
3222
    old_transport = default_transport
2329
3223
    default_transport = transport
 
3224
    global selftest_debug_flags
 
3225
    old_debug_flags = selftest_debug_flags
 
3226
    if debug_flags is not None:
 
3227
        selftest_debug_flags = set(debug_flags)
2330
3228
    try:
 
3229
        if load_list is None:
 
3230
            keep_only = None
 
3231
        else:
 
3232
            keep_only = load_test_id_list(load_list)
 
3233
        if starting_with:
 
3234
            starting_with = [test_prefix_alias_registry.resolve_alias(start)
 
3235
                             for start in starting_with]
2331
3236
        if test_suite_factory is None:
2332
 
            suite = test_suite()
 
3237
            # Reduce loading time by loading modules based on the starting_with
 
3238
            # patterns.
 
3239
            suite = test_suite(keep_only, starting_with)
2333
3240
        else:
2334
3241
            suite = test_suite_factory()
 
3242
        if starting_with:
 
3243
            # But always filter as requested.
 
3244
            suite = filter_suite_by_id_startswith(suite, starting_with)
2335
3245
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2336
3246
                     stop_on_failure=stop_on_failure,
2337
3247
                     transport=transport,
2341
3251
                     list_only=list_only,
2342
3252
                     random_seed=random_seed,
2343
3253
                     exclude_pattern=exclude_pattern,
2344
 
                     strict=strict)
 
3254
                     strict=strict,
 
3255
                     runner_class=runner_class,
 
3256
                     suite_decorators=suite_decorators,
 
3257
                     stream=stream,
 
3258
                     )
2345
3259
    finally:
2346
3260
        default_transport = old_transport
2347
 
 
2348
 
 
2349
 
def test_suite():
 
3261
        selftest_debug_flags = old_debug_flags
 
3262
 
 
3263
 
 
3264
def load_test_id_list(file_name):
 
3265
    """Load a test id list from a text file.
 
3266
 
 
3267
    The format is one test id by line.  No special care is taken to impose
 
3268
    strict rules, these test ids are used to filter the test suite so a test id
 
3269
    that do not match an existing test will do no harm. This allows user to add
 
3270
    comments, leave blank lines, etc.
 
3271
    """
 
3272
    test_list = []
 
3273
    try:
 
3274
        ftest = open(file_name, 'rt')
 
3275
    except IOError, e:
 
3276
        if e.errno != errno.ENOENT:
 
3277
            raise
 
3278
        else:
 
3279
            raise errors.NoSuchFile(file_name)
 
3280
 
 
3281
    for test_name in ftest.readlines():
 
3282
        test_list.append(test_name.strip())
 
3283
    ftest.close()
 
3284
    return test_list
 
3285
 
 
3286
 
 
3287
def suite_matches_id_list(test_suite, id_list):
 
3288
    """Warns about tests not appearing or appearing more than once.
 
3289
 
 
3290
    :param test_suite: A TestSuite object.
 
3291
    :param test_id_list: The list of test ids that should be found in
 
3292
         test_suite.
 
3293
 
 
3294
    :return: (absents, duplicates) absents is a list containing the test found
 
3295
        in id_list but not in test_suite, duplicates is a list containing the
 
3296
        test found multiple times in test_suite.
 
3297
 
 
3298
    When using a prefined test id list, it may occurs that some tests do not
 
3299
    exist anymore or that some tests use the same id. This function warns the
 
3300
    tester about potential problems in his workflow (test lists are volatile)
 
3301
    or in the test suite itself (using the same id for several tests does not
 
3302
    help to localize defects).
 
3303
    """
 
3304
    # Build a dict counting id occurrences
 
3305
    tests = dict()
 
3306
    for test in iter_suite_tests(test_suite):
 
3307
        id = test.id()
 
3308
        tests[id] = tests.get(id, 0) + 1
 
3309
 
 
3310
    not_found = []
 
3311
    duplicates = []
 
3312
    for id in id_list:
 
3313
        occurs = tests.get(id, 0)
 
3314
        if not occurs:
 
3315
            not_found.append(id)
 
3316
        elif occurs > 1:
 
3317
            duplicates.append(id)
 
3318
 
 
3319
    return not_found, duplicates
 
3320
 
 
3321
 
 
3322
class TestIdList(object):
 
3323
    """Test id list to filter a test suite.
 
3324
 
 
3325
    Relying on the assumption that test ids are built as:
 
3326
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
 
3327
    notation, this class offers methods to :
 
3328
    - avoid building a test suite for modules not refered to in the test list,
 
3329
    - keep only the tests listed from the module test suite.
 
3330
    """
 
3331
 
 
3332
    def __init__(self, test_id_list):
 
3333
        # When a test suite needs to be filtered against us we compare test ids
 
3334
        # for equality, so a simple dict offers a quick and simple solution.
 
3335
        self.tests = dict().fromkeys(test_id_list, True)
 
3336
 
 
3337
        # While unittest.TestCase have ids like:
 
3338
        # <module>.<class>.<method>[(<param+)],
 
3339
        # doctest.DocTestCase can have ids like:
 
3340
        # <module>
 
3341
        # <module>.<class>
 
3342
        # <module>.<function>
 
3343
        # <module>.<class>.<method>
 
3344
 
 
3345
        # Since we can't predict a test class from its name only, we settle on
 
3346
        # a simple constraint: a test id always begins with its module name.
 
3347
 
 
3348
        modules = {}
 
3349
        for test_id in test_id_list:
 
3350
            parts = test_id.split('.')
 
3351
            mod_name = parts.pop(0)
 
3352
            modules[mod_name] = True
 
3353
            for part in parts:
 
3354
                mod_name += '.' + part
 
3355
                modules[mod_name] = True
 
3356
        self.modules = modules
 
3357
 
 
3358
    def refers_to(self, module_name):
 
3359
        """Is there tests for the module or one of its sub modules."""
 
3360
        return self.modules.has_key(module_name)
 
3361
 
 
3362
    def includes(self, test_id):
 
3363
        return self.tests.has_key(test_id)
 
3364
 
 
3365
 
 
3366
class TestPrefixAliasRegistry(registry.Registry):
 
3367
    """A registry for test prefix aliases.
 
3368
 
 
3369
    This helps implement shorcuts for the --starting-with selftest
 
3370
    option. Overriding existing prefixes is not allowed but not fatal (a
 
3371
    warning will be emitted).
 
3372
    """
 
3373
 
 
3374
    def register(self, key, obj, help=None, info=None,
 
3375
                 override_existing=False):
 
3376
        """See Registry.register.
 
3377
 
 
3378
        Trying to override an existing alias causes a warning to be emitted,
 
3379
        not a fatal execption.
 
3380
        """
 
3381
        try:
 
3382
            super(TestPrefixAliasRegistry, self).register(
 
3383
                key, obj, help=help, info=info, override_existing=False)
 
3384
        except KeyError:
 
3385
            actual = self.get(key)
 
3386
            note('Test prefix alias %s is already used for %s, ignoring %s'
 
3387
                 % (key, actual, obj))
 
3388
 
 
3389
    def resolve_alias(self, id_start):
 
3390
        """Replace the alias by the prefix in the given string.
 
3391
 
 
3392
        Using an unknown prefix is an error to help catching typos.
 
3393
        """
 
3394
        parts = id_start.split('.')
 
3395
        try:
 
3396
            parts[0] = self.get(parts[0])
 
3397
        except KeyError:
 
3398
            raise errors.BzrCommandError(
 
3399
                '%s is not a known test prefix alias' % parts[0])
 
3400
        return '.'.join(parts)
 
3401
 
 
3402
 
 
3403
test_prefix_alias_registry = TestPrefixAliasRegistry()
 
3404
"""Registry of test prefix aliases."""
 
3405
 
 
3406
 
 
3407
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
 
3408
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
 
3409
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
 
3410
 
 
3411
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3412
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
 
3413
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
 
3414
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
 
3415
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
 
3416
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
 
3417
 
 
3418
 
 
3419
def test_suite(keep_only=None, starting_with=None):
2350
3420
    """Build and return TestSuite for the whole of bzrlib.
2351
 
    
 
3421
 
 
3422
    :param keep_only: A list of test ids limiting the suite returned.
 
3423
 
 
3424
    :param starting_with: An id limiting the suite returned to the tests
 
3425
         starting with it.
 
3426
 
2352
3427
    This function can be replaced if you need to change the default test
2353
3428
    suite on a global basis, but it is not encouraged.
2354
3429
    """
2355
3430
    testmod_names = [
2356
 
                   'bzrlib.util.tests.test_bencode',
 
3431
                   'bzrlib.doc',
 
3432
                   'bzrlib.tests.blackbox',
 
3433
                   'bzrlib.tests.commands',
 
3434
                   'bzrlib.tests.per_branch',
 
3435
                   'bzrlib.tests.per_bzrdir',
 
3436
                   'bzrlib.tests.per_interrepository',
 
3437
                   'bzrlib.tests.per_intertree',
 
3438
                   'bzrlib.tests.per_inventory',
 
3439
                   'bzrlib.tests.per_interbranch',
 
3440
                   'bzrlib.tests.per_lock',
 
3441
                   'bzrlib.tests.per_transport',
 
3442
                   'bzrlib.tests.per_tree',
 
3443
                   'bzrlib.tests.per_pack_repository',
 
3444
                   'bzrlib.tests.per_repository',
 
3445
                   'bzrlib.tests.per_repository_chk',
 
3446
                   'bzrlib.tests.per_repository_reference',
 
3447
                   'bzrlib.tests.per_versionedfile',
 
3448
                   'bzrlib.tests.per_workingtree',
 
3449
                   'bzrlib.tests.test__annotator',
 
3450
                   'bzrlib.tests.test__chk_map',
2357
3451
                   'bzrlib.tests.test__dirstate_helpers',
 
3452
                   'bzrlib.tests.test__groupcompress',
 
3453
                   'bzrlib.tests.test__known_graph',
 
3454
                   'bzrlib.tests.test__rio',
 
3455
                   'bzrlib.tests.test__walkdirs_win32',
2358
3456
                   'bzrlib.tests.test_ancestry',
2359
3457
                   'bzrlib.tests.test_annotate',
2360
3458
                   'bzrlib.tests.test_api',
2361
3459
                   'bzrlib.tests.test_atomicfile',
2362
3460
                   'bzrlib.tests.test_bad_files',
 
3461
                   'bzrlib.tests.test_bencode',
 
3462
                   'bzrlib.tests.test_bisect_multi',
2363
3463
                   'bzrlib.tests.test_branch',
2364
3464
                   'bzrlib.tests.test_branchbuilder',
 
3465
                   'bzrlib.tests.test_btree_index',
2365
3466
                   'bzrlib.tests.test_bugtracker',
2366
3467
                   'bzrlib.tests.test_bundle',
2367
3468
                   'bzrlib.tests.test_bzrdir',
 
3469
                   'bzrlib.tests.test__chunks_to_lines',
2368
3470
                   'bzrlib.tests.test_cache_utf8',
 
3471
                   'bzrlib.tests.test_chk_map',
 
3472
                   'bzrlib.tests.test_chk_serializer',
 
3473
                   'bzrlib.tests.test_chunk_writer',
 
3474
                   'bzrlib.tests.test_clean_tree',
2369
3475
                   'bzrlib.tests.test_commands',
2370
3476
                   'bzrlib.tests.test_commit',
2371
3477
                   'bzrlib.tests.test_commit_merge',
2372
3478
                   'bzrlib.tests.test_config',
2373
3479
                   'bzrlib.tests.test_conflicts',
2374
3480
                   'bzrlib.tests.test_counted_lock',
 
3481
                   'bzrlib.tests.test_crash',
2375
3482
                   'bzrlib.tests.test_decorators',
2376
3483
                   'bzrlib.tests.test_delta',
 
3484
                   'bzrlib.tests.test_debug',
2377
3485
                   'bzrlib.tests.test_deprecated_graph',
2378
3486
                   'bzrlib.tests.test_diff',
 
3487
                   'bzrlib.tests.test_directory_service',
2379
3488
                   'bzrlib.tests.test_dirstate',
2380
3489
                   'bzrlib.tests.test_email_message',
 
3490
                   'bzrlib.tests.test_eol_filters',
2381
3491
                   'bzrlib.tests.test_errors',
2382
 
                   'bzrlib.tests.test_escaped_store',
 
3492
                   'bzrlib.tests.test_export',
2383
3493
                   'bzrlib.tests.test_extract',
2384
3494
                   'bzrlib.tests.test_fetch',
 
3495
                   'bzrlib.tests.test_fifo_cache',
 
3496
                   'bzrlib.tests.test_filters',
2385
3497
                   'bzrlib.tests.test_ftp_transport',
 
3498
                   'bzrlib.tests.test_foreign',
2386
3499
                   'bzrlib.tests.test_generate_docs',
2387
3500
                   'bzrlib.tests.test_generate_ids',
2388
3501
                   'bzrlib.tests.test_globbing',
2389
3502
                   'bzrlib.tests.test_gpg',
2390
3503
                   'bzrlib.tests.test_graph',
 
3504
                   'bzrlib.tests.test_groupcompress',
2391
3505
                   'bzrlib.tests.test_hashcache',
2392
3506
                   'bzrlib.tests.test_help',
2393
3507
                   'bzrlib.tests.test_hooks',
2399
3513
                   'bzrlib.tests.test_index',
2400
3514
                   'bzrlib.tests.test_info',
2401
3515
                   'bzrlib.tests.test_inv',
 
3516
                   'bzrlib.tests.test_inventory_delta',
2402
3517
                   'bzrlib.tests.test_knit',
2403
3518
                   'bzrlib.tests.test_lazy_import',
2404
3519
                   'bzrlib.tests.test_lazy_regex',
 
3520
                   'bzrlib.tests.test_lock',
 
3521
                   'bzrlib.tests.test_lockable_files',
2405
3522
                   'bzrlib.tests.test_lockdir',
2406
 
                   'bzrlib.tests.test_lockable_files',
2407
3523
                   'bzrlib.tests.test_log',
 
3524
                   'bzrlib.tests.test_lru_cache',
2408
3525
                   'bzrlib.tests.test_lsprof',
2409
3526
                   'bzrlib.tests.test_mail_client',
2410
3527
                   'bzrlib.tests.test_memorytree',
2415
3532
                   'bzrlib.tests.test_missing',
2416
3533
                   'bzrlib.tests.test_msgeditor',
2417
3534
                   'bzrlib.tests.test_multiparent',
 
3535
                   'bzrlib.tests.test_mutabletree',
2418
3536
                   'bzrlib.tests.test_nonascii',
2419
3537
                   'bzrlib.tests.test_options',
2420
3538
                   'bzrlib.tests.test_osutils',
2425
3543
                   'bzrlib.tests.test_permissions',
2426
3544
                   'bzrlib.tests.test_plugins',
2427
3545
                   'bzrlib.tests.test_progress',
 
3546
                   'bzrlib.tests.test_read_bundle',
2428
3547
                   'bzrlib.tests.test_reconcile',
 
3548
                   'bzrlib.tests.test_reconfigure',
2429
3549
                   'bzrlib.tests.test_registry',
2430
3550
                   'bzrlib.tests.test_remote',
 
3551
                   'bzrlib.tests.test_rename_map',
2431
3552
                   'bzrlib.tests.test_repository',
2432
3553
                   'bzrlib.tests.test_revert',
2433
3554
                   'bzrlib.tests.test_revision',
2434
 
                   'bzrlib.tests.test_revisionnamespaces',
 
3555
                   'bzrlib.tests.test_revisionspec',
2435
3556
                   'bzrlib.tests.test_revisiontree',
2436
3557
                   'bzrlib.tests.test_rio',
 
3558
                   'bzrlib.tests.test_rules',
2437
3559
                   'bzrlib.tests.test_sampler',
2438
3560
                   'bzrlib.tests.test_selftest',
 
3561
                   'bzrlib.tests.test_serializer',
2439
3562
                   'bzrlib.tests.test_setup',
2440
3563
                   'bzrlib.tests.test_sftp_transport',
 
3564
                   'bzrlib.tests.test_shelf',
 
3565
                   'bzrlib.tests.test_shelf_ui',
2441
3566
                   'bzrlib.tests.test_smart',
2442
3567
                   'bzrlib.tests.test_smart_add',
 
3568
                   'bzrlib.tests.test_smart_request',
2443
3569
                   'bzrlib.tests.test_smart_transport',
2444
3570
                   'bzrlib.tests.test_smtp_connection',
2445
3571
                   'bzrlib.tests.test_source',
2448
3574
                   'bzrlib.tests.test_store',
2449
3575
                   'bzrlib.tests.test_strace',
2450
3576
                   'bzrlib.tests.test_subsume',
 
3577
                   'bzrlib.tests.test_switch',
2451
3578
                   'bzrlib.tests.test_symbol_versioning',
2452
3579
                   'bzrlib.tests.test_tag',
2453
3580
                   'bzrlib.tests.test_testament',
2458
3585
                   'bzrlib.tests.test_transactions',
2459
3586
                   'bzrlib.tests.test_transform',
2460
3587
                   'bzrlib.tests.test_transport',
 
3588
                   'bzrlib.tests.test_transport_log',
2461
3589
                   'bzrlib.tests.test_tree',
2462
3590
                   'bzrlib.tests.test_treebuilder',
2463
3591
                   'bzrlib.tests.test_tsort',
2464
3592
                   'bzrlib.tests.test_tuned_gzip',
2465
3593
                   'bzrlib.tests.test_ui',
 
3594
                   'bzrlib.tests.test_uncommit',
2466
3595
                   'bzrlib.tests.test_upgrade',
 
3596
                   'bzrlib.tests.test_upgrade_stacked',
2467
3597
                   'bzrlib.tests.test_urlutils',
2468
 
                   'bzrlib.tests.test_versionedfile',
2469
3598
                   'bzrlib.tests.test_version',
2470
3599
                   'bzrlib.tests.test_version_info',
2471
3600
                   'bzrlib.tests.test_weave',
2476
3605
                   'bzrlib.tests.test_wsgi',
2477
3606
                   'bzrlib.tests.test_xml',
2478
3607
                   ]
2479
 
    test_transport_implementations = [
2480
 
        'bzrlib.tests.test_transport_implementations',
2481
 
        'bzrlib.tests.test_read_bundle',
2482
 
        ]
2483
 
    suite = TestUtil.TestSuite()
 
3608
 
2484
3609
    loader = TestUtil.TestLoader()
 
3610
 
 
3611
    if keep_only is not None:
 
3612
        id_filter = TestIdList(keep_only)
 
3613
    if starting_with:
 
3614
        # We take precedence over keep_only because *at loading time* using
 
3615
        # both options means we will load less tests for the same final result.
 
3616
        def interesting_module(name):
 
3617
            for start in starting_with:
 
3618
                if (
 
3619
                    # Either the module name starts with the specified string
 
3620
                    name.startswith(start)
 
3621
                    # or it may contain tests starting with the specified string
 
3622
                    or start.startswith(name)
 
3623
                    ):
 
3624
                    return True
 
3625
            return False
 
3626
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
 
3627
 
 
3628
    elif keep_only is not None:
 
3629
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
 
3630
        def interesting_module(name):
 
3631
            return id_filter.refers_to(name)
 
3632
 
 
3633
    else:
 
3634
        loader = TestUtil.TestLoader()
 
3635
        def interesting_module(name):
 
3636
            # No filtering, all modules are interesting
 
3637
            return True
 
3638
 
 
3639
    suite = loader.suiteClass()
 
3640
 
 
3641
    # modules building their suite with loadTestsFromModuleNames
2485
3642
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2486
 
    from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
2487
 
    adapter = TransportTestProviderAdapter()
2488
 
    adapt_modules(test_transport_implementations, adapter, loader, suite)
2489
 
    for package in packages_to_test():
2490
 
        suite.addTest(package.test_suite())
2491
 
    for m in MODULES_TO_TEST:
2492
 
        suite.addTest(loader.loadTestsFromModule(m))
2493
 
    for m in MODULES_TO_DOCTEST:
 
3643
 
 
3644
    modules_to_doctest = [
 
3645
        'bzrlib',
 
3646
        'bzrlib.branchbuilder',
 
3647
        'bzrlib.export',
 
3648
        'bzrlib.inventory',
 
3649
        'bzrlib.iterablefile',
 
3650
        'bzrlib.lockdir',
 
3651
        'bzrlib.merge3',
 
3652
        'bzrlib.option',
 
3653
        'bzrlib.symbol_versioning',
 
3654
        'bzrlib.tests',
 
3655
        'bzrlib.timestamp',
 
3656
        'bzrlib.version_info_formats.format_custom',
 
3657
        ]
 
3658
 
 
3659
    for mod in modules_to_doctest:
 
3660
        if not interesting_module(mod):
 
3661
            # No tests to keep here, move along
 
3662
            continue
2494
3663
        try:
2495
 
            suite.addTest(doctest.DocTestSuite(m))
 
3664
            # note that this really does mean "report only" -- doctest
 
3665
            # still runs the rest of the examples
 
3666
            doc_suite = doctest.DocTestSuite(mod,
 
3667
                optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
2496
3668
        except ValueError, e:
2497
 
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
3669
            print '**failed to get doctest for: %s\n%s' % (mod, e)
2498
3670
            raise
 
3671
        if len(doc_suite._tests) == 0:
 
3672
            raise errors.BzrError("no doctests found in %s" % (mod,))
 
3673
        suite.addTest(doc_suite)
 
3674
 
2499
3675
    default_encoding = sys.getdefaultencoding()
2500
3676
    for name, plugin in bzrlib.plugin.plugins().items():
2501
 
        try:
2502
 
            plugin_suite = plugin.test_suite()
2503
 
        except ImportError, e:
2504
 
            bzrlib.trace.warning(
2505
 
                'Unable to test plugin "%s": %s', name, e)
2506
 
        else:
2507
 
            if plugin_suite is not None:
2508
 
                suite.addTest(plugin_suite)
 
3677
        if not interesting_module(plugin.module.__name__):
 
3678
            continue
 
3679
        plugin_suite = plugin.test_suite()
 
3680
        # We used to catch ImportError here and turn it into just a warning,
 
3681
        # but really if you don't have --no-plugins this should be a failure.
 
3682
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
 
3683
        if plugin_suite is None:
 
3684
            plugin_suite = plugin.load_plugin_tests(loader)
 
3685
        if plugin_suite is not None:
 
3686
            suite.addTest(plugin_suite)
2509
3687
        if default_encoding != sys.getdefaultencoding():
2510
3688
            bzrlib.trace.warning(
2511
3689
                'Plugin "%s" tried to reset default encoding to: %s', name,
2512
3690
                sys.getdefaultencoding())
2513
3691
            reload(sys)
2514
3692
            sys.setdefaultencoding(default_encoding)
 
3693
 
 
3694
    if keep_only is not None:
 
3695
        # Now that the referred modules have loaded their tests, keep only the
 
3696
        # requested ones.
 
3697
        suite = filter_suite_by_id_list(suite, id_filter)
 
3698
        # Do some sanity checks on the id_list filtering
 
3699
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
 
3700
        if starting_with:
 
3701
            # The tester has used both keep_only and starting_with, so he is
 
3702
            # already aware that some tests are excluded from the list, there
 
3703
            # is no need to tell him which.
 
3704
            pass
 
3705
        else:
 
3706
            # Some tests mentioned in the list are not in the test suite. The
 
3707
            # list may be out of date, report to the tester.
 
3708
            for id in not_found:
 
3709
                bzrlib.trace.warning('"%s" not found in the test suite', id)
 
3710
        for id in duplicates:
 
3711
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
 
3712
 
2515
3713
    return suite
2516
3714
 
2517
3715
 
2518
 
def multiply_tests_from_modules(module_name_list, scenario_iter):
2519
 
    """Adapt all tests in some given modules to given scenarios.
2520
 
 
2521
 
    This is the recommended public interface for test parameterization.
2522
 
    Typically the test_suite() method for a per-implementation test
2523
 
    suite will call multiply_tests_from_modules and return the 
2524
 
    result.
2525
 
 
2526
 
    :param module_name_list: List of fully-qualified names of test
2527
 
        modules.
2528
 
    :param scenario_iter: Iterable of pairs of (scenario_name, 
 
3716
def multiply_scenarios(scenarios_left, scenarios_right):
 
3717
    """Multiply two sets of scenarios.
 
3718
 
 
3719
    :returns: the cartesian product of the two sets of scenarios, that is
 
3720
        a scenario for every possible combination of a left scenario and a
 
3721
        right scenario.
 
3722
    """
 
3723
    return [
 
3724
        ('%s,%s' % (left_name, right_name),
 
3725
         dict(left_dict.items() + right_dict.items()))
 
3726
        for left_name, left_dict in scenarios_left
 
3727
        for right_name, right_dict in scenarios_right]
 
3728
 
 
3729
 
 
3730
def multiply_tests(tests, scenarios, result):
 
3731
    """Multiply tests_list by scenarios into result.
 
3732
 
 
3733
    This is the core workhorse for test parameterisation.
 
3734
 
 
3735
    Typically the load_tests() method for a per-implementation test suite will
 
3736
    call multiply_tests and return the result.
 
3737
 
 
3738
    :param tests: The tests to parameterise.
 
3739
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
2529
3740
        scenario_param_dict).
2530
 
 
2531
 
    This returns a new TestSuite containing the cross product of
2532
 
    all the tests in all the modules, each repeated for each scenario.
2533
 
    Each test is adapted by adding the scenario name at the end 
2534
 
    of its name, and updating the test object's __dict__ with the
2535
 
    scenario_param_dict.
2536
 
 
2537
 
    >>> r = multiply_tests_from_modules(
2538
 
    ...     ['bzrlib.tests.test_sampler'],
2539
 
    ...     [('one', dict(param=1)), 
2540
 
    ...      ('two', dict(param=2))])
 
3741
    :param result: A TestSuite to add created tests to.
 
3742
 
 
3743
    This returns the passed in result TestSuite with the cross product of all
 
3744
    the tests repeated once for each scenario.  Each test is adapted by adding
 
3745
    the scenario name at the end of its id(), and updating the test object's
 
3746
    __dict__ with the scenario_param_dict.
 
3747
 
 
3748
    >>> import bzrlib.tests.test_sampler
 
3749
    >>> r = multiply_tests(
 
3750
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
 
3751
    ...     [('one', dict(param=1)),
 
3752
    ...      ('two', dict(param=2))],
 
3753
    ...     TestSuite())
2541
3754
    >>> tests = list(iter_suite_tests(r))
2542
3755
    >>> len(tests)
2543
3756
    2
2548
3761
    >>> tests[1].param
2549
3762
    2
2550
3763
    """
2551
 
    loader = TestLoader()
2552
 
    suite = TestSuite()
2553
 
    adapter = TestScenarioApplier()
2554
 
    adapter.scenarios = list(scenario_iter)
2555
 
    adapt_modules(module_name_list, adapter, loader, suite)
2556
 
    return suite
2557
 
 
2558
 
 
2559
 
def adapt_modules(mods_list, adapter, loader, suite):
2560
 
    """Adapt the modules in mods_list using adapter and add to suite."""
2561
 
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
2562
 
        suite.addTests(adapter.adapt(test))
 
3764
    for test in iter_suite_tests(tests):
 
3765
        apply_scenarios(test, scenarios, result)
 
3766
    return result
 
3767
 
 
3768
 
 
3769
def apply_scenarios(test, scenarios, result):
 
3770
    """Apply the scenarios in scenarios to test and add to result.
 
3771
 
 
3772
    :param test: The test to apply scenarios to.
 
3773
    :param scenarios: An iterable of scenarios to apply to test.
 
3774
    :return: result
 
3775
    :seealso: apply_scenario
 
3776
    """
 
3777
    for scenario in scenarios:
 
3778
        result.addTest(apply_scenario(test, scenario))
 
3779
    return result
 
3780
 
 
3781
 
 
3782
def apply_scenario(test, scenario):
 
3783
    """Copy test and apply scenario to it.
 
3784
 
 
3785
    :param test: A test to adapt.
 
3786
    :param scenario: A tuple describing the scenarion.
 
3787
        The first element of the tuple is the new test id.
 
3788
        The second element is a dict containing attributes to set on the
 
3789
        test.
 
3790
    :return: The adapted test.
 
3791
    """
 
3792
    new_id = "%s(%s)" % (test.id(), scenario[0])
 
3793
    new_test = clone_test(test, new_id)
 
3794
    for name, value in scenario[1].items():
 
3795
        setattr(new_test, name, value)
 
3796
    return new_test
 
3797
 
 
3798
 
 
3799
def clone_test(test, new_id):
 
3800
    """Clone a test giving it a new id.
 
3801
 
 
3802
    :param test: The test to clone.
 
3803
    :param new_id: The id to assign to it.
 
3804
    :return: The new test.
 
3805
    """
 
3806
    from copy import deepcopy
 
3807
    new_test = deepcopy(test)
 
3808
    new_test.id = lambda: new_id
 
3809
    return new_test
2563
3810
 
2564
3811
 
2565
3812
def _rmtree_temp_dir(dirname):
2577
3824
    try:
2578
3825
        osutils.rmtree(dirname)
2579
3826
    except OSError, e:
2580
 
        if sys.platform == 'win32' and e.errno == errno.EACCES:
2581
 
            print >>sys.stderr, ('Permission denied: '
2582
 
                                 'unable to remove testing dir '
2583
 
                                 '%s' % os.path.basename(dirname))
2584
 
        else:
2585
 
            raise
 
3827
        # We don't want to fail here because some useful display will be lost
 
3828
        # otherwise. Polluting the tmp dir is bad, but not giving all the
 
3829
        # possible info to the test runner is even worse.
 
3830
        sys.stderr.write('Unable to remove testing dir %s\n%s'
 
3831
                         % (os.path.basename(dirname), e))
2586
3832
 
2587
3833
 
2588
3834
class Feature(object):
2613
3859
        return self.__class__.__name__
2614
3860
 
2615
3861
 
2616
 
class TestScenarioApplier(object):
2617
 
    """A tool to apply scenarios to tests."""
2618
 
 
2619
 
    def adapt(self, test):
2620
 
        """Return a TestSuite containing a copy of test for each scenario."""
2621
 
        result = unittest.TestSuite()
2622
 
        for scenario in self.scenarios:
2623
 
            result.addTest(self.adapt_test_to_scenario(test, scenario))
 
3862
class _SymlinkFeature(Feature):
 
3863
 
 
3864
    def _probe(self):
 
3865
        return osutils.has_symlinks()
 
3866
 
 
3867
    def feature_name(self):
 
3868
        return 'symlinks'
 
3869
 
 
3870
SymlinkFeature = _SymlinkFeature()
 
3871
 
 
3872
 
 
3873
class _HardlinkFeature(Feature):
 
3874
 
 
3875
    def _probe(self):
 
3876
        return osutils.has_hardlinks()
 
3877
 
 
3878
    def feature_name(self):
 
3879
        return 'hardlinks'
 
3880
 
 
3881
HardlinkFeature = _HardlinkFeature()
 
3882
 
 
3883
 
 
3884
class _OsFifoFeature(Feature):
 
3885
 
 
3886
    def _probe(self):
 
3887
        return getattr(os, 'mkfifo', None)
 
3888
 
 
3889
    def feature_name(self):
 
3890
        return 'filesystem fifos'
 
3891
 
 
3892
OsFifoFeature = _OsFifoFeature()
 
3893
 
 
3894
 
 
3895
class _UnicodeFilenameFeature(Feature):
 
3896
    """Does the filesystem support Unicode filenames?"""
 
3897
 
 
3898
    def _probe(self):
 
3899
        try:
 
3900
            # Check for character combinations unlikely to be covered by any
 
3901
            # single non-unicode encoding. We use the characters
 
3902
            # - greek small letter alpha (U+03B1) and
 
3903
            # - braille pattern dots-123456 (U+283F).
 
3904
            os.stat(u'\u03b1\u283f')
 
3905
        except UnicodeEncodeError:
 
3906
            return False
 
3907
        except (IOError, OSError):
 
3908
            # The filesystem allows the Unicode filename but the file doesn't
 
3909
            # exist.
 
3910
            return True
 
3911
        else:
 
3912
            # The filesystem allows the Unicode filename and the file exists,
 
3913
            # for some reason.
 
3914
            return True
 
3915
 
 
3916
UnicodeFilenameFeature = _UnicodeFilenameFeature()
 
3917
 
 
3918
 
 
3919
def probe_unicode_in_user_encoding():
 
3920
    """Try to encode several unicode strings to use in unicode-aware tests.
 
3921
    Return first successfull match.
 
3922
 
 
3923
    :return:  (unicode value, encoded plain string value) or (None, None)
 
3924
    """
 
3925
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
3926
    for uni_val in possible_vals:
 
3927
        try:
 
3928
            str_val = uni_val.encode(osutils.get_user_encoding())
 
3929
        except UnicodeEncodeError:
 
3930
            # Try a different character
 
3931
            pass
 
3932
        else:
 
3933
            return uni_val, str_val
 
3934
    return None, None
 
3935
 
 
3936
 
 
3937
def probe_bad_non_ascii(encoding):
 
3938
    """Try to find [bad] character with code [128..255]
 
3939
    that cannot be decoded to unicode in some encoding.
 
3940
    Return None if all non-ascii characters is valid
 
3941
    for given encoding.
 
3942
    """
 
3943
    for i in xrange(128, 256):
 
3944
        char = chr(i)
 
3945
        try:
 
3946
            char.decode(encoding)
 
3947
        except UnicodeDecodeError:
 
3948
            return char
 
3949
    return None
 
3950
 
 
3951
 
 
3952
class _HTTPSServerFeature(Feature):
 
3953
    """Some tests want an https Server, check if one is available.
 
3954
 
 
3955
    Right now, the only way this is available is under python2.6 which provides
 
3956
    an ssl module.
 
3957
    """
 
3958
 
 
3959
    def _probe(self):
 
3960
        try:
 
3961
            import ssl
 
3962
            return True
 
3963
        except ImportError:
 
3964
            return False
 
3965
 
 
3966
    def feature_name(self):
 
3967
        return 'HTTPSServer'
 
3968
 
 
3969
 
 
3970
HTTPSServerFeature = _HTTPSServerFeature()
 
3971
 
 
3972
 
 
3973
class _UnicodeFilename(Feature):
 
3974
    """Does the filesystem support Unicode filenames?"""
 
3975
 
 
3976
    def _probe(self):
 
3977
        try:
 
3978
            os.stat(u'\u03b1')
 
3979
        except UnicodeEncodeError:
 
3980
            return False
 
3981
        except (IOError, OSError):
 
3982
            # The filesystem allows the Unicode filename but the file doesn't
 
3983
            # exist.
 
3984
            return True
 
3985
        else:
 
3986
            # The filesystem allows the Unicode filename and the file exists,
 
3987
            # for some reason.
 
3988
            return True
 
3989
 
 
3990
UnicodeFilename = _UnicodeFilename()
 
3991
 
 
3992
 
 
3993
class _UTF8Filesystem(Feature):
 
3994
    """Is the filesystem UTF-8?"""
 
3995
 
 
3996
    def _probe(self):
 
3997
        if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
 
3998
            return True
 
3999
        return False
 
4000
 
 
4001
UTF8Filesystem = _UTF8Filesystem()
 
4002
 
 
4003
 
 
4004
class _CaseInsCasePresFilenameFeature(Feature):
 
4005
    """Is the file-system case insensitive, but case-preserving?"""
 
4006
 
 
4007
    def _probe(self):
 
4008
        fileno, name = tempfile.mkstemp(prefix='MixedCase')
 
4009
        try:
 
4010
            # first check truly case-preserving for created files, then check
 
4011
            # case insensitive when opening existing files.
 
4012
            name = osutils.normpath(name)
 
4013
            base, rel = osutils.split(name)
 
4014
            found_rel = osutils.canonical_relpath(base, name)
 
4015
            return (found_rel == rel
 
4016
                    and os.path.isfile(name.upper())
 
4017
                    and os.path.isfile(name.lower()))
 
4018
        finally:
 
4019
            os.close(fileno)
 
4020
            os.remove(name)
 
4021
 
 
4022
    def feature_name(self):
 
4023
        return "case-insensitive case-preserving filesystem"
 
4024
 
 
4025
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
 
4026
 
 
4027
 
 
4028
class _CaseInsensitiveFilesystemFeature(Feature):
 
4029
    """Check if underlying filesystem is case-insensitive but *not* case
 
4030
    preserving.
 
4031
    """
 
4032
    # Note that on Windows, Cygwin, MacOS etc, the file-systems are far
 
4033
    # more likely to be case preserving, so this case is rare.
 
4034
 
 
4035
    def _probe(self):
 
4036
        if CaseInsCasePresFilenameFeature.available():
 
4037
            return False
 
4038
 
 
4039
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
4040
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
4041
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
4042
        else:
 
4043
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
4044
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
4045
            dir=root)
 
4046
        name_a = osutils.pathjoin(tdir, 'a')
 
4047
        name_A = osutils.pathjoin(tdir, 'A')
 
4048
        os.mkdir(name_a)
 
4049
        result = osutils.isdir(name_A)
 
4050
        _rmtree_temp_dir(tdir)
2624
4051
        return result
2625
4052
 
2626
 
    def adapt_test_to_scenario(self, test, scenario):
2627
 
        """Copy test and apply scenario to it.
2628
 
 
2629
 
        :param test: A test to adapt.
2630
 
        :param scenario: A tuple describing the scenarion.
2631
 
            The first element of the tuple is the new test id.
2632
 
            The second element is a dict containing attributes to set on the
2633
 
            test.
2634
 
        :return: The adapted test.
2635
 
        """
2636
 
        from copy import deepcopy
2637
 
        new_test = deepcopy(test)
2638
 
        for name, value in scenario[1].items():
2639
 
            setattr(new_test, name, value)
2640
 
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
2641
 
        new_test.id = lambda: new_id
2642
 
        return new_test
 
4053
    def feature_name(self):
 
4054
        return 'case-insensitive filesystem'
 
4055
 
 
4056
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
4057
 
 
4058
 
 
4059
class _SubUnitFeature(Feature):
 
4060
    """Check if subunit is available."""
 
4061
 
 
4062
    def _probe(self):
 
4063
        try:
 
4064
            import subunit
 
4065
            return True
 
4066
        except ImportError:
 
4067
            return False
 
4068
 
 
4069
    def feature_name(self):
 
4070
        return 'subunit'
 
4071
 
 
4072
SubUnitFeature = _SubUnitFeature()
 
4073
# Only define SubUnitBzrRunner if subunit is available.
 
4074
try:
 
4075
    from subunit import TestProtocolClient
 
4076
    try:
 
4077
        from subunit.test_results import AutoTimingTestResultDecorator
 
4078
    except ImportError:
 
4079
        AutoTimingTestResultDecorator = lambda x:x
 
4080
    class SubUnitBzrRunner(TextTestRunner):
 
4081
        def run(self, test):
 
4082
            result = AutoTimingTestResultDecorator(
 
4083
                TestProtocolClient(self.stream))
 
4084
            test.run(result)
 
4085
            return result
 
4086
except ImportError:
 
4087
    pass