~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

move reference material out of User Guide into User Reference

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import os
 
37
from pprint import pformat
 
38
import random
 
39
import re
 
40
import shlex
 
41
import stat
 
42
from subprocess import Popen, PIPE
 
43
import sys
 
44
import tempfile
 
45
import time
 
46
import trace
 
47
import unittest
 
48
import warnings
 
49
 
 
50
 
 
51
from bzrlib import (
 
52
    bzrdir,
 
53
    debug,
 
54
    errors,
 
55
    memorytree,
 
56
    osutils,
 
57
    progress,
 
58
    ui,
 
59
    urlutils,
 
60
    workingtree,
 
61
    )
 
62
import bzrlib.branch
 
63
import bzrlib.commands
 
64
import bzrlib.timestamp
 
65
import bzrlib.export
 
66
import bzrlib.inventory
 
67
import bzrlib.iterablefile
 
68
import bzrlib.lockdir
 
69
try:
 
70
    import bzrlib.lsprof
 
71
except ImportError:
 
72
    # lsprof not available
 
73
    pass
 
74
from bzrlib.merge import merge_inner
 
75
import bzrlib.merge3
 
76
import bzrlib.plugin
 
77
from bzrlib.revision import common_ancestor
 
78
import bzrlib.store
 
79
from bzrlib import symbol_versioning
 
80
from bzrlib.symbol_versioning import (
 
81
    deprecated_method,
 
82
    zero_ninetyone,
 
83
    zero_ninetytwo,
 
84
    )
 
85
import bzrlib.trace
 
86
from bzrlib.transport import get_transport
 
87
import bzrlib.transport
 
88
from bzrlib.transport.local import LocalURLServer
 
89
from bzrlib.transport.memory import MemoryServer
 
90
from bzrlib.transport.readonly import ReadonlyServer
 
91
from bzrlib.trace import mutter, note
 
92
from bzrlib.tests import TestUtil
 
93
from bzrlib.tests.HttpServer import HttpServer
 
94
from bzrlib.tests.TestUtil import (
 
95
                          TestSuite,
 
96
                          TestLoader,
 
97
                          )
 
98
from bzrlib.tests.EncodingAdapter import EncodingTestAdapter
 
99
from bzrlib.tests.treeshape import build_tree_contents
 
100
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
101
 
 
102
# Mark this python module as being part of the implementation
 
103
# of unittest: this gives us better tracebacks where the last
 
104
# shown frame is the test code, not our assertXYZ.
 
105
__unittest = 1
 
106
 
 
107
default_transport = LocalURLServer
 
108
 
 
109
MODULES_TO_TEST = []
 
110
MODULES_TO_DOCTEST = [
 
111
        bzrlib.timestamp,
 
112
        bzrlib.errors,
 
113
        bzrlib.export,
 
114
        bzrlib.inventory,
 
115
        bzrlib.iterablefile,
 
116
        bzrlib.lockdir,
 
117
        bzrlib.merge3,
 
118
        bzrlib.option,
 
119
        bzrlib.store,
 
120
        # quoted to avoid module-loading circularity
 
121
        'bzrlib.tests',
 
122
        ]
 
123
 
 
124
 
 
125
def packages_to_test():
 
126
    """Return a list of packages to test.
 
127
 
 
128
    The packages are not globally imported so that import failures are
 
129
    triggered when running selftest, not when importing the command.
 
130
    """
 
131
    import bzrlib.doc
 
132
    import bzrlib.tests.blackbox
 
133
    import bzrlib.tests.branch_implementations
 
134
    import bzrlib.tests.bzrdir_implementations
 
135
    import bzrlib.tests.commands
 
136
    import bzrlib.tests.interrepository_implementations
 
137
    import bzrlib.tests.interversionedfile_implementations
 
138
    import bzrlib.tests.intertree_implementations
 
139
    import bzrlib.tests.inventory_implementations
 
140
    import bzrlib.tests.per_lock
 
141
    import bzrlib.tests.repository_implementations
 
142
    import bzrlib.tests.revisionstore_implementations
 
143
    import bzrlib.tests.tree_implementations
 
144
    import bzrlib.tests.workingtree_implementations
 
145
    return [
 
146
            bzrlib.doc,
 
147
            bzrlib.tests.blackbox,
 
148
            bzrlib.tests.branch_implementations,
 
149
            bzrlib.tests.bzrdir_implementations,
 
150
            bzrlib.tests.commands,
 
151
            bzrlib.tests.interrepository_implementations,
 
152
            bzrlib.tests.interversionedfile_implementations,
 
153
            bzrlib.tests.intertree_implementations,
 
154
            bzrlib.tests.inventory_implementations,
 
155
            bzrlib.tests.per_lock,
 
156
            bzrlib.tests.repository_implementations,
 
157
            bzrlib.tests.revisionstore_implementations,
 
158
            bzrlib.tests.tree_implementations,
 
159
            bzrlib.tests.workingtree_implementations,
 
160
            ]
 
161
 
 
162
 
 
163
class ExtendedTestResult(unittest._TextTestResult):
 
164
    """Accepts, reports and accumulates the results of running tests.
 
165
 
 
166
    Compared to the unittest version this class adds support for
 
167
    profiling, benchmarking, stopping as soon as a test fails,  and
 
168
    skipping tests.  There are further-specialized subclasses for
 
169
    different types of display.
 
170
 
 
171
    When a test finishes, in whatever way, it calls one of the addSuccess,
 
172
    addFailure or addError classes.  These in turn may redirect to a more
 
173
    specific case for the special test results supported by our extended
 
174
    tests.
 
175
 
 
176
    Note that just one of these objects is fed the results from many tests.
 
177
    """
 
178
 
 
179
    stop_early = False
 
180
    
 
181
    def __init__(self, stream, descriptions, verbosity,
 
182
                 bench_history=None,
 
183
                 num_tests=None,
 
184
                 ):
 
185
        """Construct new TestResult.
 
186
 
 
187
        :param bench_history: Optionally, a writable file object to accumulate
 
188
            benchmark results.
 
189
        """
 
190
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
191
        if bench_history is not None:
 
192
            from bzrlib.version import _get_bzr_source_tree
 
193
            src_tree = _get_bzr_source_tree()
 
194
            if src_tree:
 
195
                try:
 
196
                    revision_id = src_tree.get_parent_ids()[0]
 
197
                except IndexError:
 
198
                    # XXX: if this is a brand new tree, do the same as if there
 
199
                    # is no branch.
 
200
                    revision_id = ''
 
201
            else:
 
202
                # XXX: If there's no branch, what should we do?
 
203
                revision_id = ''
 
204
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
205
        self._bench_history = bench_history
 
206
        self.ui = ui.ui_factory
 
207
        self.num_tests = num_tests
 
208
        self.error_count = 0
 
209
        self.failure_count = 0
 
210
        self.known_failure_count = 0
 
211
        self.skip_count = 0
 
212
        self.not_applicable_count = 0
 
213
        self.unsupported = {}
 
214
        self.count = 0
 
215
        self._overall_start_time = time.time()
 
216
    
 
217
    def _extractBenchmarkTime(self, testCase):
 
218
        """Add a benchmark time for the current test case."""
 
219
        return getattr(testCase, "_benchtime", None)
 
220
    
 
221
    def _elapsedTestTimeString(self):
 
222
        """Return a time string for the overall time the current test has taken."""
 
223
        return self._formatTime(time.time() - self._start_time)
 
224
 
 
225
    def _testTimeString(self, testCase):
 
226
        benchmark_time = self._extractBenchmarkTime(testCase)
 
227
        if benchmark_time is not None:
 
228
            return "%s/%s" % (
 
229
                self._formatTime(benchmark_time),
 
230
                self._elapsedTestTimeString())
 
231
        else:
 
232
            return "           %s" % self._elapsedTestTimeString()
 
233
 
 
234
    def _formatTime(self, seconds):
 
235
        """Format seconds as milliseconds with leading spaces."""
 
236
        # some benchmarks can take thousands of seconds to run, so we need 8
 
237
        # places
 
238
        return "%8dms" % (1000 * seconds)
 
239
 
 
240
    def _shortened_test_description(self, test):
 
241
        what = test.id()
 
242
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
243
        return what
 
244
 
 
245
    def startTest(self, test):
 
246
        unittest.TestResult.startTest(self, test)
 
247
        self.report_test_start(test)
 
248
        test.number = self.count
 
249
        self._recordTestStartTime()
 
250
 
 
251
    def _recordTestStartTime(self):
 
252
        """Record that a test has started."""
 
253
        self._start_time = time.time()
 
254
 
 
255
    def _cleanupLogFile(self, test):
 
256
        # We can only do this if we have one of our TestCases, not if
 
257
        # we have a doctest.
 
258
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
259
        if setKeepLogfile is not None:
 
260
            setKeepLogfile()
 
261
 
 
262
    def addError(self, test, err):
 
263
        """Tell result that test finished with an error.
 
264
 
 
265
        Called from the TestCase run() method when the test
 
266
        fails with an unexpected error.
 
267
        """
 
268
        self._testConcluded(test)
 
269
        if isinstance(err[1], TestSkipped):
 
270
            return self._addSkipped(test, err)
 
271
        elif isinstance(err[1], UnavailableFeature):
 
272
            return self.addNotSupported(test, err[1].args[0])
 
273
        else:
 
274
            unittest.TestResult.addError(self, test, err)
 
275
            self.error_count += 1
 
276
            self.report_error(test, err)
 
277
            if self.stop_early:
 
278
                self.stop()
 
279
 
 
280
    def addFailure(self, test, err):
 
281
        """Tell result that test failed.
 
282
 
 
283
        Called from the TestCase run() method when the test
 
284
        fails because e.g. an assert() method failed.
 
285
        """
 
286
        self._testConcluded(test)
 
287
        if isinstance(err[1], KnownFailure):
 
288
            return self._addKnownFailure(test, err)
 
289
        else:
 
290
            unittest.TestResult.addFailure(self, test, err)
 
291
            self.failure_count += 1
 
292
            self.report_failure(test, err)
 
293
            if self.stop_early:
 
294
                self.stop()
 
295
 
 
296
    def addSuccess(self, test):
 
297
        """Tell result that test completed successfully.
 
298
 
 
299
        Called from the TestCase run()
 
300
        """
 
301
        self._testConcluded(test)
 
302
        if self._bench_history is not None:
 
303
            benchmark_time = self._extractBenchmarkTime(test)
 
304
            if benchmark_time is not None:
 
305
                self._bench_history.write("%s %s\n" % (
 
306
                    self._formatTime(benchmark_time),
 
307
                    test.id()))
 
308
        self.report_success(test)
 
309
        unittest.TestResult.addSuccess(self, test)
 
310
 
 
311
    def _testConcluded(self, test):
 
312
        """Common code when a test has finished.
 
313
 
 
314
        Called regardless of whether it succeded, failed, etc.
 
315
        """
 
316
        self._cleanupLogFile(test)
 
317
 
 
318
    def _addKnownFailure(self, test, err):
 
319
        self.known_failure_count += 1
 
320
        self.report_known_failure(test, err)
 
321
 
 
322
    def addNotSupported(self, test, feature):
 
323
        """The test will not be run because of a missing feature.
 
324
        """
 
325
        # this can be called in two different ways: it may be that the
 
326
        # test started running, and then raised (through addError) 
 
327
        # UnavailableFeature.  Alternatively this method can be called
 
328
        # while probing for features before running the tests; in that
 
329
        # case we will see startTest and stopTest, but the test will never
 
330
        # actually run.
 
331
        self.unsupported.setdefault(str(feature), 0)
 
332
        self.unsupported[str(feature)] += 1
 
333
        self.report_unsupported(test, feature)
 
334
 
 
335
    def _addSkipped(self, test, skip_excinfo):
 
336
        if isinstance(skip_excinfo[1], TestNotApplicable):
 
337
            self.not_applicable_count += 1
 
338
            self.report_not_applicable(test, skip_excinfo)
 
339
        else:
 
340
            self.skip_count += 1
 
341
            self.report_skip(test, skip_excinfo)
 
342
        try:
 
343
            test.tearDown()
 
344
        except KeyboardInterrupt:
 
345
            raise
 
346
        except:
 
347
            self.addError(test, test._exc_info())
 
348
        else:
 
349
            # seems best to treat this as success from point-of-view of unittest
 
350
            # -- it actually does nothing so it barely matters :)
 
351
            unittest.TestResult.addSuccess(self, test)
 
352
 
 
353
    def printErrorList(self, flavour, errors):
 
354
        for test, err in errors:
 
355
            self.stream.writeln(self.separator1)
 
356
            self.stream.write("%s: " % flavour)
 
357
            self.stream.writeln(self.getDescription(test))
 
358
            if getattr(test, '_get_log', None) is not None:
 
359
                self.stream.write('\n')
 
360
                self.stream.write(
 
361
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
362
                self.stream.write('\n')
 
363
                self.stream.write(test._get_log())
 
364
                self.stream.write('\n')
 
365
                self.stream.write(
 
366
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
367
                self.stream.write('\n')
 
368
            self.stream.writeln(self.separator2)
 
369
            self.stream.writeln("%s" % err)
 
370
 
 
371
    def finished(self):
 
372
        pass
 
373
 
 
374
    def report_cleaning_up(self):
 
375
        pass
 
376
 
 
377
    def report_success(self, test):
 
378
        pass
 
379
 
 
380
    def wasStrictlySuccessful(self):
 
381
        if self.unsupported or self.known_failure_count:
 
382
            return False
 
383
        return self.wasSuccessful()
 
384
 
 
385
 
 
386
class TextTestResult(ExtendedTestResult):
 
387
    """Displays progress and results of tests in text form"""
 
388
 
 
389
    def __init__(self, stream, descriptions, verbosity,
 
390
                 bench_history=None,
 
391
                 num_tests=None,
 
392
                 pb=None,
 
393
                 ):
 
394
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
395
            bench_history, num_tests)
 
396
        if pb is None:
 
397
            self.pb = self.ui.nested_progress_bar()
 
398
            self._supplied_pb = False
 
399
        else:
 
400
            self.pb = pb
 
401
            self._supplied_pb = True
 
402
        self.pb.show_pct = False
 
403
        self.pb.show_spinner = False
 
404
        self.pb.show_eta = False,
 
405
        self.pb.show_count = False
 
406
        self.pb.show_bar = False
 
407
 
 
408
    def report_starting(self):
 
409
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
410
 
 
411
    def _progress_prefix_text(self):
 
412
        a = '[%d' % self.count
 
413
        if self.num_tests is not None:
 
414
            a +='/%d' % self.num_tests
 
415
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
416
        if self.error_count:
 
417
            a += ', %d errors' % self.error_count
 
418
        if self.failure_count:
 
419
            a += ', %d failed' % self.failure_count
 
420
        if self.known_failure_count:
 
421
            a += ', %d known failures' % self.known_failure_count
 
422
        if self.skip_count:
 
423
            a += ', %d skipped' % self.skip_count
 
424
        if self.unsupported:
 
425
            a += ', %d missing features' % len(self.unsupported)
 
426
        a += ']'
 
427
        return a
 
428
 
 
429
    def report_test_start(self, test):
 
430
        self.count += 1
 
431
        self.pb.update(
 
432
                self._progress_prefix_text()
 
433
                + ' ' 
 
434
                + self._shortened_test_description(test))
 
435
 
 
436
    def _test_description(self, test):
 
437
        return self._shortened_test_description(test)
 
438
 
 
439
    def report_error(self, test, err):
 
440
        self.pb.note('ERROR: %s\n    %s\n', 
 
441
            self._test_description(test),
 
442
            err[1],
 
443
            )
 
444
 
 
445
    def report_failure(self, test, err):
 
446
        self.pb.note('FAIL: %s\n    %s\n', 
 
447
            self._test_description(test),
 
448
            err[1],
 
449
            )
 
450
 
 
451
    def report_known_failure(self, test, err):
 
452
        self.pb.note('XFAIL: %s\n%s\n',
 
453
            self._test_description(test), err[1])
 
454
 
 
455
    def report_skip(self, test, skip_excinfo):
 
456
        pass
 
457
 
 
458
    def report_not_applicable(self, test, skip_excinfo):
 
459
        pass
 
460
 
 
461
    def report_unsupported(self, test, feature):
 
462
        """test cannot be run because feature is missing."""
 
463
                  
 
464
    def report_cleaning_up(self):
 
465
        self.pb.update('cleaning up...')
 
466
 
 
467
    def finished(self):
 
468
        if not self._supplied_pb:
 
469
            self.pb.finished()
 
470
 
 
471
 
 
472
class VerboseTestResult(ExtendedTestResult):
 
473
    """Produce long output, with one line per test run plus times"""
 
474
 
 
475
    def _ellipsize_to_right(self, a_string, final_width):
 
476
        """Truncate and pad a string, keeping the right hand side"""
 
477
        if len(a_string) > final_width:
 
478
            result = '...' + a_string[3-final_width:]
 
479
        else:
 
480
            result = a_string
 
481
        return result.ljust(final_width)
 
482
 
 
483
    def report_starting(self):
 
484
        self.stream.write('running %d tests...\n' % self.num_tests)
 
485
 
 
486
    def report_test_start(self, test):
 
487
        self.count += 1
 
488
        name = self._shortened_test_description(test)
 
489
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
490
        # numbers, plus a trailing blank
 
491
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
492
        self.stream.write(self._ellipsize_to_right(name,
 
493
                          osutils.terminal_width()-30))
 
494
        self.stream.flush()
 
495
 
 
496
    def _error_summary(self, err):
 
497
        indent = ' ' * 4
 
498
        return '%s%s' % (indent, err[1])
 
499
 
 
500
    def report_error(self, test, err):
 
501
        self.stream.writeln('ERROR %s\n%s'
 
502
                % (self._testTimeString(test),
 
503
                   self._error_summary(err)))
 
504
 
 
505
    def report_failure(self, test, err):
 
506
        self.stream.writeln(' FAIL %s\n%s'
 
507
                % (self._testTimeString(test),
 
508
                   self._error_summary(err)))
 
509
 
 
510
    def report_known_failure(self, test, err):
 
511
        self.stream.writeln('XFAIL %s\n%s'
 
512
                % (self._testTimeString(test),
 
513
                   self._error_summary(err)))
 
514
 
 
515
    def report_success(self, test):
 
516
        self.stream.writeln('   OK %s' % self._testTimeString(test))
 
517
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
518
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
519
            stats.pprint(file=self.stream)
 
520
        # flush the stream so that we get smooth output. This verbose mode is
 
521
        # used to show the output in PQM.
 
522
        self.stream.flush()
 
523
 
 
524
    def report_skip(self, test, skip_excinfo):
 
525
        self.stream.writeln(' SKIP %s\n%s'
 
526
                % (self._testTimeString(test),
 
527
                   self._error_summary(skip_excinfo)))
 
528
 
 
529
    def report_not_applicable(self, test, skip_excinfo):
 
530
        self.stream.writeln('  N/A %s\n%s'
 
531
                % (self._testTimeString(test),
 
532
                   self._error_summary(skip_excinfo)))
 
533
 
 
534
    def report_unsupported(self, test, feature):
 
535
        """test cannot be run because feature is missing."""
 
536
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
537
                %(self._testTimeString(test), feature))
 
538
 
 
539
 
 
540
class TextTestRunner(object):
 
541
    stop_on_failure = False
 
542
 
 
543
    def __init__(self,
 
544
                 stream=sys.stderr,
 
545
                 descriptions=0,
 
546
                 verbosity=1,
 
547
                 bench_history=None,
 
548
                 list_only=False
 
549
                 ):
 
550
        self.stream = unittest._WritelnDecorator(stream)
 
551
        self.descriptions = descriptions
 
552
        self.verbosity = verbosity
 
553
        self._bench_history = bench_history
 
554
        self.list_only = list_only
 
555
 
 
556
    def run(self, test):
 
557
        "Run the given test case or test suite."
 
558
        startTime = time.time()
 
559
        if self.verbosity == 1:
 
560
            result_class = TextTestResult
 
561
        elif self.verbosity >= 2:
 
562
            result_class = VerboseTestResult
 
563
        result = result_class(self.stream,
 
564
                              self.descriptions,
 
565
                              self.verbosity,
 
566
                              bench_history=self._bench_history,
 
567
                              num_tests=test.countTestCases(),
 
568
                              )
 
569
        result.stop_early = self.stop_on_failure
 
570
        result.report_starting()
 
571
        if self.list_only:
 
572
            if self.verbosity >= 2:
 
573
                self.stream.writeln("Listing tests only ...\n")
 
574
            run = 0
 
575
            for t in iter_suite_tests(test):
 
576
                self.stream.writeln("%s" % (t.id()))
 
577
                run += 1
 
578
            actionTaken = "Listed"
 
579
        else: 
 
580
            test.run(result)
 
581
            run = result.testsRun
 
582
            actionTaken = "Ran"
 
583
        stopTime = time.time()
 
584
        timeTaken = stopTime - startTime
 
585
        result.printErrors()
 
586
        self.stream.writeln(result.separator2)
 
587
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
588
                            run, run != 1 and "s" or "", timeTaken))
 
589
        self.stream.writeln()
 
590
        if not result.wasSuccessful():
 
591
            self.stream.write("FAILED (")
 
592
            failed, errored = map(len, (result.failures, result.errors))
 
593
            if failed:
 
594
                self.stream.write("failures=%d" % failed)
 
595
            if errored:
 
596
                if failed: self.stream.write(", ")
 
597
                self.stream.write("errors=%d" % errored)
 
598
            if result.known_failure_count:
 
599
                if failed or errored: self.stream.write(", ")
 
600
                self.stream.write("known_failure_count=%d" %
 
601
                    result.known_failure_count)
 
602
            self.stream.writeln(")")
 
603
        else:
 
604
            if result.known_failure_count:
 
605
                self.stream.writeln("OK (known_failures=%d)" %
 
606
                    result.known_failure_count)
 
607
            else:
 
608
                self.stream.writeln("OK")
 
609
        if result.skip_count > 0:
 
610
            skipped = result.skip_count
 
611
            self.stream.writeln('%d test%s skipped' %
 
612
                                (skipped, skipped != 1 and "s" or ""))
 
613
        if result.unsupported:
 
614
            for feature, count in sorted(result.unsupported.items()):
 
615
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
616
                    (feature, count))
 
617
        result.finished()
 
618
        return result
 
619
 
 
620
 
 
621
def iter_suite_tests(suite):
 
622
    """Return all tests in a suite, recursing through nested suites"""
 
623
    for item in suite._tests:
 
624
        if isinstance(item, unittest.TestCase):
 
625
            yield item
 
626
        elif isinstance(item, unittest.TestSuite):
 
627
            for r in iter_suite_tests(item):
 
628
                yield r
 
629
        else:
 
630
            raise Exception('unknown object %r inside test suite %r'
 
631
                            % (item, suite))
 
632
 
 
633
 
 
634
class TestSkipped(Exception):
 
635
    """Indicates that a test was intentionally skipped, rather than failing."""
 
636
 
 
637
 
 
638
class TestNotApplicable(TestSkipped):
 
639
    """A test is not applicable to the situation where it was run.
 
640
 
 
641
    This is only normally raised by parameterized tests, if they find that 
 
642
    the instance they're constructed upon does not support one aspect 
 
643
    of its interface.
 
644
    """
 
645
 
 
646
 
 
647
class KnownFailure(AssertionError):
 
648
    """Indicates that a test failed in a precisely expected manner.
 
649
 
 
650
    Such failures dont block the whole test suite from passing because they are
 
651
    indicators of partially completed code or of future work. We have an
 
652
    explicit error for them so that we can ensure that they are always visible:
 
653
    KnownFailures are always shown in the output of bzr selftest.
 
654
    """
 
655
 
 
656
 
 
657
class UnavailableFeature(Exception):
 
658
    """A feature required for this test was not available.
 
659
 
 
660
    The feature should be used to construct the exception.
 
661
    """
 
662
 
 
663
 
 
664
class CommandFailed(Exception):
 
665
    pass
 
666
 
 
667
 
 
668
class StringIOWrapper(object):
 
669
    """A wrapper around cStringIO which just adds an encoding attribute.
 
670
    
 
671
    Internally we can check sys.stdout to see what the output encoding
 
672
    should be. However, cStringIO has no encoding attribute that we can
 
673
    set. So we wrap it instead.
 
674
    """
 
675
    encoding='ascii'
 
676
    _cstring = None
 
677
 
 
678
    def __init__(self, s=None):
 
679
        if s is not None:
 
680
            self.__dict__['_cstring'] = StringIO(s)
 
681
        else:
 
682
            self.__dict__['_cstring'] = StringIO()
 
683
 
 
684
    def __getattr__(self, name, getattr=getattr):
 
685
        return getattr(self.__dict__['_cstring'], name)
 
686
 
 
687
    def __setattr__(self, name, val):
 
688
        if name == 'encoding':
 
689
            self.__dict__['encoding'] = val
 
690
        else:
 
691
            return setattr(self._cstring, name, val)
 
692
 
 
693
 
 
694
class TestUIFactory(ui.CLIUIFactory):
 
695
    """A UI Factory for testing.
 
696
 
 
697
    Hide the progress bar but emit note()s.
 
698
    Redirect stdin.
 
699
    Allows get_password to be tested without real tty attached.
 
700
    """
 
701
 
 
702
    def __init__(self,
 
703
                 stdout=None,
 
704
                 stderr=None,
 
705
                 stdin=None):
 
706
        super(TestUIFactory, self).__init__()
 
707
        if stdin is not None:
 
708
            # We use a StringIOWrapper to be able to test various
 
709
            # encodings, but the user is still responsible to
 
710
            # encode the string and to set the encoding attribute
 
711
            # of StringIOWrapper.
 
712
            self.stdin = StringIOWrapper(stdin)
 
713
        if stdout is None:
 
714
            self.stdout = sys.stdout
 
715
        else:
 
716
            self.stdout = stdout
 
717
        if stderr is None:
 
718
            self.stderr = sys.stderr
 
719
        else:
 
720
            self.stderr = stderr
 
721
 
 
722
    def clear(self):
 
723
        """See progress.ProgressBar.clear()."""
 
724
 
 
725
    def clear_term(self):
 
726
        """See progress.ProgressBar.clear_term()."""
 
727
 
 
728
    def clear_term(self):
 
729
        """See progress.ProgressBar.clear_term()."""
 
730
 
 
731
    def finished(self):
 
732
        """See progress.ProgressBar.finished()."""
 
733
 
 
734
    def note(self, fmt_string, *args, **kwargs):
 
735
        """See progress.ProgressBar.note()."""
 
736
        self.stdout.write((fmt_string + "\n") % args)
 
737
 
 
738
    def progress_bar(self):
 
739
        return self
 
740
 
 
741
    def nested_progress_bar(self):
 
742
        return self
 
743
 
 
744
    def update(self, message, count=None, total=None):
 
745
        """See progress.ProgressBar.update()."""
 
746
 
 
747
    def get_non_echoed_password(self, prompt):
 
748
        """Get password from stdin without trying to handle the echo mode"""
 
749
        if prompt:
 
750
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
 
751
        password = self.stdin.readline()
 
752
        if not password:
 
753
            raise EOFError
 
754
        if password[-1] == '\n':
 
755
            password = password[:-1]
 
756
        return password
 
757
 
 
758
 
 
759
class TestCase(unittest.TestCase):
 
760
    """Base class for bzr unit tests.
 
761
    
 
762
    Tests that need access to disk resources should subclass 
 
763
    TestCaseInTempDir not TestCase.
 
764
 
 
765
    Error and debug log messages are redirected from their usual
 
766
    location into a temporary file, the contents of which can be
 
767
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
768
    so that it can also capture file IO.  When the test completes this file
 
769
    is read into memory and removed from disk.
 
770
       
 
771
    There are also convenience functions to invoke bzr's command-line
 
772
    routine, and to build and check bzr trees.
 
773
   
 
774
    In addition to the usual method of overriding tearDown(), this class also
 
775
    allows subclasses to register functions into the _cleanups list, which is
 
776
    run in order as the object is torn down.  It's less likely this will be
 
777
    accidentally overlooked.
 
778
    """
 
779
 
 
780
    _log_file_name = None
 
781
    _log_contents = ''
 
782
    _keep_log_file = False
 
783
    # record lsprof data when performing benchmark calls.
 
784
    _gather_lsprof_in_benchmarks = False
 
785
 
 
786
    def __init__(self, methodName='testMethod'):
 
787
        super(TestCase, self).__init__(methodName)
 
788
        self._cleanups = []
 
789
 
 
790
    def setUp(self):
 
791
        unittest.TestCase.setUp(self)
 
792
        self._cleanEnvironment()
 
793
        bzrlib.trace.disable_default_logging()
 
794
        self._silenceUI()
 
795
        self._startLogFile()
 
796
        self._benchcalls = []
 
797
        self._benchtime = None
 
798
        self._clear_hooks()
 
799
        self._clear_debug_flags()
 
800
 
 
801
    def _clear_debug_flags(self):
 
802
        """Prevent externally set debug flags affecting tests.
 
803
        
 
804
        Tests that want to use debug flags can just set them in the
 
805
        debug_flags set during setup/teardown.
 
806
        """
 
807
        self._preserved_debug_flags = set(debug.debug_flags)
 
808
        debug.debug_flags.clear()
 
809
        self.addCleanup(self._restore_debug_flags)
 
810
 
 
811
    def _clear_hooks(self):
 
812
        # prevent hooks affecting tests
 
813
        import bzrlib.branch
 
814
        import bzrlib.smart.server
 
815
        self._preserved_hooks = {
 
816
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
817
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
818
            }
 
819
        self.addCleanup(self._restoreHooks)
 
820
        # reset all hooks to an empty instance of the appropriate type
 
821
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
822
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
823
 
 
824
    def _silenceUI(self):
 
825
        """Turn off UI for duration of test"""
 
826
        # by default the UI is off; tests can turn it on if they want it.
 
827
        saved = ui.ui_factory
 
828
        def _restore():
 
829
            ui.ui_factory = saved
 
830
        ui.ui_factory = ui.SilentUIFactory()
 
831
        self.addCleanup(_restore)
 
832
 
 
833
    def _ndiff_strings(self, a, b):
 
834
        """Return ndiff between two strings containing lines.
 
835
        
 
836
        A trailing newline is added if missing to make the strings
 
837
        print properly."""
 
838
        if b and b[-1] != '\n':
 
839
            b += '\n'
 
840
        if a and a[-1] != '\n':
 
841
            a += '\n'
 
842
        difflines = difflib.ndiff(a.splitlines(True),
 
843
                                  b.splitlines(True),
 
844
                                  linejunk=lambda x: False,
 
845
                                  charjunk=lambda x: False)
 
846
        return ''.join(difflines)
 
847
 
 
848
    def assertEqual(self, a, b, message=''):
 
849
        try:
 
850
            if a == b:
 
851
                return
 
852
        except UnicodeError, e:
 
853
            # If we can't compare without getting a UnicodeError, then
 
854
            # obviously they are different
 
855
            mutter('UnicodeError: %s', e)
 
856
        if message:
 
857
            message += '\n'
 
858
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
859
            % (message,
 
860
               pformat(a), pformat(b)))
 
861
 
 
862
    assertEquals = assertEqual
 
863
 
 
864
    def assertEqualDiff(self, a, b, message=None):
 
865
        """Assert two texts are equal, if not raise an exception.
 
866
        
 
867
        This is intended for use with multi-line strings where it can 
 
868
        be hard to find the differences by eye.
 
869
        """
 
870
        # TODO: perhaps override assertEquals to call this for strings?
 
871
        if a == b:
 
872
            return
 
873
        if message is None:
 
874
            message = "texts not equal:\n"
 
875
        raise AssertionError(message +
 
876
                             self._ndiff_strings(a, b))
 
877
        
 
878
    def assertEqualMode(self, mode, mode_test):
 
879
        self.assertEqual(mode, mode_test,
 
880
                         'mode mismatch %o != %o' % (mode, mode_test))
 
881
 
 
882
    def assertPositive(self, val):
 
883
        """Assert that val is greater than 0."""
 
884
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
885
 
 
886
    def assertNegative(self, val):
 
887
        """Assert that val is less than 0."""
 
888
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
889
 
 
890
    def assertStartsWith(self, s, prefix):
 
891
        if not s.startswith(prefix):
 
892
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
893
 
 
894
    def assertEndsWith(self, s, suffix):
 
895
        """Asserts that s ends with suffix."""
 
896
        if not s.endswith(suffix):
 
897
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
898
 
 
899
    def assertContainsRe(self, haystack, needle_re):
 
900
        """Assert that a contains something matching a regular expression."""
 
901
        if not re.search(needle_re, haystack):
 
902
            if '\n' in haystack or len(haystack) > 60:
 
903
                # a long string, format it in a more readable way
 
904
                raise AssertionError(
 
905
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
906
                        % (needle_re, haystack))
 
907
            else:
 
908
                raise AssertionError('pattern "%s" not found in "%s"'
 
909
                        % (needle_re, haystack))
 
910
 
 
911
    def assertNotContainsRe(self, haystack, needle_re):
 
912
        """Assert that a does not match a regular expression"""
 
913
        if re.search(needle_re, haystack):
 
914
            raise AssertionError('pattern "%s" found in "%s"'
 
915
                    % (needle_re, haystack))
 
916
 
 
917
    def assertSubset(self, sublist, superlist):
 
918
        """Assert that every entry in sublist is present in superlist."""
 
919
        missing = set(sublist) - set(superlist)
 
920
        if len(missing) > 0:
 
921
            raise AssertionError("value(s) %r not present in container %r" %
 
922
                                 (missing, superlist))
 
923
 
 
924
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
925
        """Fail unless excClass is raised when the iterator from func is used.
 
926
        
 
927
        Many functions can return generators this makes sure
 
928
        to wrap them in a list() call to make sure the whole generator
 
929
        is run, and that the proper exception is raised.
 
930
        """
 
931
        try:
 
932
            list(func(*args, **kwargs))
 
933
        except excClass:
 
934
            return
 
935
        else:
 
936
            if getattr(excClass,'__name__', None) is not None:
 
937
                excName = excClass.__name__
 
938
            else:
 
939
                excName = str(excClass)
 
940
            raise self.failureException, "%s not raised" % excName
 
941
 
 
942
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
943
        """Assert that a callable raises a particular exception.
 
944
 
 
945
        :param excClass: As for the except statement, this may be either an
 
946
            exception class, or a tuple of classes.
 
947
        :param callableObj: A callable, will be passed ``*args`` and
 
948
            ``**kwargs``.
 
949
 
 
950
        Returns the exception so that you can examine it.
 
951
        """
 
952
        try:
 
953
            callableObj(*args, **kwargs)
 
954
        except excClass, e:
 
955
            return e
 
956
        else:
 
957
            if getattr(excClass,'__name__', None) is not None:
 
958
                excName = excClass.__name__
 
959
            else:
 
960
                # probably a tuple
 
961
                excName = str(excClass)
 
962
            raise self.failureException, "%s not raised" % excName
 
963
 
 
964
    def assertIs(self, left, right, message=None):
 
965
        if not (left is right):
 
966
            if message is not None:
 
967
                raise AssertionError(message)
 
968
            else:
 
969
                raise AssertionError("%r is not %r." % (left, right))
 
970
 
 
971
    def assertIsNot(self, left, right, message=None):
 
972
        if (left is right):
 
973
            if message is not None:
 
974
                raise AssertionError(message)
 
975
            else:
 
976
                raise AssertionError("%r is %r." % (left, right))
 
977
 
 
978
    def assertTransportMode(self, transport, path, mode):
 
979
        """Fail if a path does not have mode mode.
 
980
        
 
981
        If modes are not supported on this transport, the assertion is ignored.
 
982
        """
 
983
        if not transport._can_roundtrip_unix_modebits():
 
984
            return
 
985
        path_stat = transport.stat(path)
 
986
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
987
        self.assertEqual(mode, actual_mode,
 
988
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
989
 
 
990
    def assertIsSameRealPath(self, path1, path2):
 
991
        """Fail if path1 and path2 points to different files"""
 
992
        self.assertEqual(osutils.realpath(path1),
 
993
                         osutils.realpath(path2),
 
994
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
 
995
 
 
996
    def assertIsInstance(self, obj, kls):
 
997
        """Fail if obj is not an instance of kls"""
 
998
        if not isinstance(obj, kls):
 
999
            self.fail("%r is an instance of %s rather than %s" % (
 
1000
                obj, obj.__class__, kls))
 
1001
 
 
1002
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1003
        """Invoke a test, expecting it to fail for the given reason.
 
1004
 
 
1005
        This is for assertions that ought to succeed, but currently fail.
 
1006
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1007
        about the failure you're expecting.  If a new bug is introduced,
 
1008
        AssertionError should be raised, not KnownFailure.
 
1009
 
 
1010
        Frequently, expectFailure should be followed by an opposite assertion.
 
1011
        See example below.
 
1012
 
 
1013
        Intended to be used with a callable that raises AssertionError as the
 
1014
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1015
 
 
1016
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1017
        test succeeds.
 
1018
 
 
1019
        example usage::
 
1020
 
 
1021
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1022
                             dynamic_val)
 
1023
          self.assertEqual(42, dynamic_val)
 
1024
 
 
1025
          This means that a dynamic_val of 54 will cause the test to raise
 
1026
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1027
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1028
          than 54 or 42 will cause an AssertionError.
 
1029
        """
 
1030
        try:
 
1031
            assertion(*args, **kwargs)
 
1032
        except AssertionError:
 
1033
            raise KnownFailure(reason)
 
1034
        else:
 
1035
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1036
 
 
1037
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
 
1038
        """A helper for callDeprecated and applyDeprecated.
 
1039
 
 
1040
        :param a_callable: A callable to call.
 
1041
        :param args: The positional arguments for the callable
 
1042
        :param kwargs: The keyword arguments for the callable
 
1043
        :return: A tuple (warnings, result). result is the result of calling
 
1044
            a_callable(``*args``, ``**kwargs``).
 
1045
        """
 
1046
        local_warnings = []
 
1047
        def capture_warnings(msg, cls=None, stacklevel=None):
 
1048
            # we've hooked into a deprecation specific callpath,
 
1049
            # only deprecations should getting sent via it.
 
1050
            self.assertEqual(cls, DeprecationWarning)
 
1051
            local_warnings.append(msg)
 
1052
        original_warning_method = symbol_versioning.warn
 
1053
        symbol_versioning.set_warning_method(capture_warnings)
 
1054
        try:
 
1055
            result = a_callable(*args, **kwargs)
 
1056
        finally:
 
1057
            symbol_versioning.set_warning_method(original_warning_method)
 
1058
        return (local_warnings, result)
 
1059
 
 
1060
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
1061
        """Call a deprecated callable without warning the user.
 
1062
 
 
1063
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1064
        not other callers that go direct to the warning module.
 
1065
 
 
1066
        To test that a deprecated method raises an error, do something like
 
1067
        this::
 
1068
 
 
1069
        self.assertRaises(errors.ReservedId,
 
1070
            self.applyDeprecated, zero_ninetyone,
 
1071
                br.append_revision, 'current:')
 
1072
 
 
1073
        :param deprecation_format: The deprecation format that the callable
 
1074
            should have been deprecated with. This is the same type as the
 
1075
            parameter to deprecated_method/deprecated_function. If the
 
1076
            callable is not deprecated with this format, an assertion error
 
1077
            will be raised.
 
1078
        :param a_callable: A callable to call. This may be a bound method or
 
1079
            a regular function. It will be called with ``*args`` and
 
1080
            ``**kwargs``.
 
1081
        :param args: The positional arguments for the callable
 
1082
        :param kwargs: The keyword arguments for the callable
 
1083
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1084
        """
 
1085
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
 
1086
            *args, **kwargs)
 
1087
        expected_first_warning = symbol_versioning.deprecation_string(
 
1088
            a_callable, deprecation_format)
 
1089
        if len(call_warnings) == 0:
 
1090
            self.fail("No deprecation warning generated by call to %s" %
 
1091
                a_callable)
 
1092
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1093
        return result
 
1094
 
 
1095
    def callCatchWarnings(self, fn, *args, **kw):
 
1096
        """Call a callable that raises python warnings.
 
1097
 
 
1098
        The caller's responsible for examining the returned warnings.
 
1099
 
 
1100
        If the callable raises an exception, the exception is not
 
1101
        caught and propagates up to the caller.  In that case, the list
 
1102
        of warnings is not available.
 
1103
 
 
1104
        :returns: ([warning_object, ...], fn_result)
 
1105
        """
 
1106
        # XXX: This is not perfect, because it completely overrides the
 
1107
        # warnings filters, and some code may depend on suppressing particular
 
1108
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
 
1109
        # though.  -- Andrew, 20071062
 
1110
        wlist = []
 
1111
        def _catcher(message, category, filename, lineno, file=None):
 
1112
            # despite the name, 'message' is normally(?) a Warning subclass
 
1113
            # instance
 
1114
            wlist.append(message)
 
1115
        saved_showwarning = warnings.showwarning
 
1116
        saved_filters = warnings.filters
 
1117
        try:
 
1118
            warnings.showwarning = _catcher
 
1119
            warnings.filters = []
 
1120
            result = fn(*args, **kw)
 
1121
        finally:
 
1122
            warnings.showwarning = saved_showwarning
 
1123
            warnings.filters = saved_filters
 
1124
        return wlist, result
 
1125
 
 
1126
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1127
        """Assert that a callable is deprecated in a particular way.
 
1128
 
 
1129
        This is a very precise test for unusual requirements. The 
 
1130
        applyDeprecated helper function is probably more suited for most tests
 
1131
        as it allows you to simply specify the deprecation format being used
 
1132
        and will ensure that that is issued for the function being called.
 
1133
 
 
1134
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1135
        not other callers that go direct to the warning module.  To catch
 
1136
        general warnings, use callCatchWarnings.
 
1137
 
 
1138
        :param expected: a list of the deprecation warnings expected, in order
 
1139
        :param callable: The callable to call
 
1140
        :param args: The positional arguments for the callable
 
1141
        :param kwargs: The keyword arguments for the callable
 
1142
        """
 
1143
        call_warnings, result = self._capture_deprecation_warnings(callable,
 
1144
            *args, **kwargs)
 
1145
        self.assertEqual(expected, call_warnings)
 
1146
        return result
 
1147
 
 
1148
    def _startLogFile(self):
 
1149
        """Send bzr and test log messages to a temporary file.
 
1150
 
 
1151
        The file is removed as the test is torn down.
 
1152
        """
 
1153
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1154
        self._log_file = os.fdopen(fileno, 'w+')
 
1155
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
1156
        self._log_file_name = name
 
1157
        self.addCleanup(self._finishLogFile)
 
1158
 
 
1159
    def _finishLogFile(self):
 
1160
        """Finished with the log file.
 
1161
 
 
1162
        Close the file and delete it, unless setKeepLogfile was called.
 
1163
        """
 
1164
        if self._log_file is None:
 
1165
            return
 
1166
        bzrlib.trace.disable_test_log(self._log_nonce)
 
1167
        self._log_file.close()
 
1168
        self._log_file = None
 
1169
        if not self._keep_log_file:
 
1170
            os.remove(self._log_file_name)
 
1171
            self._log_file_name = None
 
1172
 
 
1173
    def setKeepLogfile(self):
 
1174
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1175
        self._keep_log_file = True
 
1176
 
 
1177
    def addCleanup(self, callable):
 
1178
        """Arrange to run a callable when this case is torn down.
 
1179
 
 
1180
        Callables are run in the reverse of the order they are registered, 
 
1181
        ie last-in first-out.
 
1182
        """
 
1183
        if callable in self._cleanups:
 
1184
            raise ValueError("cleanup function %r already registered on %s" 
 
1185
                    % (callable, self))
 
1186
        self._cleanups.append(callable)
 
1187
 
 
1188
    def _cleanEnvironment(self):
 
1189
        new_env = {
 
1190
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1191
            'HOME': os.getcwd(),
 
1192
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1193
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
 
1194
            'BZR_EMAIL': None,
 
1195
            'BZREMAIL': None, # may still be present in the environment
 
1196
            'EMAIL': None,
 
1197
            'BZR_PROGRESS_BAR': None,
 
1198
            # SSH Agent
 
1199
            'SSH_AUTH_SOCK': None,
 
1200
            # Proxies
 
1201
            'http_proxy': None,
 
1202
            'HTTP_PROXY': None,
 
1203
            'https_proxy': None,
 
1204
            'HTTPS_PROXY': None,
 
1205
            'no_proxy': None,
 
1206
            'NO_PROXY': None,
 
1207
            'all_proxy': None,
 
1208
            'ALL_PROXY': None,
 
1209
            # Nobody cares about these ones AFAIK. So far at
 
1210
            # least. If you do (care), please update this comment
 
1211
            # -- vila 20061212
 
1212
            'ftp_proxy': None,
 
1213
            'FTP_PROXY': None,
 
1214
            'BZR_REMOTE_PATH': None,
 
1215
        }
 
1216
        self.__old_env = {}
 
1217
        self.addCleanup(self._restoreEnvironment)
 
1218
        for name, value in new_env.iteritems():
 
1219
            self._captureVar(name, value)
 
1220
 
 
1221
    def _captureVar(self, name, newvalue):
 
1222
        """Set an environment variable, and reset it when finished."""
 
1223
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1224
 
 
1225
    def _restore_debug_flags(self):
 
1226
        debug.debug_flags.clear()
 
1227
        debug.debug_flags.update(self._preserved_debug_flags)
 
1228
 
 
1229
    def _restoreEnvironment(self):
 
1230
        for name, value in self.__old_env.iteritems():
 
1231
            osutils.set_or_unset_env(name, value)
 
1232
 
 
1233
    def _restoreHooks(self):
 
1234
        for klass, hooks in self._preserved_hooks.items():
 
1235
            setattr(klass, 'hooks', hooks)
 
1236
 
 
1237
    def knownFailure(self, reason):
 
1238
        """This test has failed for some known reason."""
 
1239
        raise KnownFailure(reason)
 
1240
 
 
1241
    def run(self, result=None):
 
1242
        if result is None: result = self.defaultTestResult()
 
1243
        for feature in getattr(self, '_test_needs_features', []):
 
1244
            if not feature.available():
 
1245
                result.startTest(self)
 
1246
                if getattr(result, 'addNotSupported', None):
 
1247
                    result.addNotSupported(self, feature)
 
1248
                else:
 
1249
                    result.addSuccess(self)
 
1250
                result.stopTest(self)
 
1251
                return
 
1252
        return unittest.TestCase.run(self, result)
 
1253
 
 
1254
    def tearDown(self):
 
1255
        self._runCleanups()
 
1256
        unittest.TestCase.tearDown(self)
 
1257
 
 
1258
    def time(self, callable, *args, **kwargs):
 
1259
        """Run callable and accrue the time it takes to the benchmark time.
 
1260
        
 
1261
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1262
        this will cause lsprofile statistics to be gathered and stored in
 
1263
        self._benchcalls.
 
1264
        """
 
1265
        if self._benchtime is None:
 
1266
            self._benchtime = 0
 
1267
        start = time.time()
 
1268
        try:
 
1269
            if not self._gather_lsprof_in_benchmarks:
 
1270
                return callable(*args, **kwargs)
 
1271
            else:
 
1272
                # record this benchmark
 
1273
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1274
                stats.sort()
 
1275
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1276
                return ret
 
1277
        finally:
 
1278
            self._benchtime += time.time() - start
 
1279
 
 
1280
    def _runCleanups(self):
 
1281
        """Run registered cleanup functions. 
 
1282
 
 
1283
        This should only be called from TestCase.tearDown.
 
1284
        """
 
1285
        # TODO: Perhaps this should keep running cleanups even if 
 
1286
        # one of them fails?
 
1287
 
 
1288
        # Actually pop the cleanups from the list so tearDown running
 
1289
        # twice is safe (this happens for skipped tests).
 
1290
        while self._cleanups:
 
1291
            self._cleanups.pop()()
 
1292
 
 
1293
    def log(self, *args):
 
1294
        mutter(*args)
 
1295
 
 
1296
    def _get_log(self, keep_log_file=False):
 
1297
        """Get the log from bzrlib.trace calls from this test.
 
1298
 
 
1299
        :param keep_log_file: When True, if the log is still a file on disk
 
1300
            leave it as a file on disk. When False, if the log is still a file
 
1301
            on disk, the log file is deleted and the log preserved as
 
1302
            self._log_contents.
 
1303
        :return: A string containing the log.
 
1304
        """
 
1305
        # flush the log file, to get all content
 
1306
        import bzrlib.trace
 
1307
        bzrlib.trace._trace_file.flush()
 
1308
        if self._log_contents:
 
1309
            return self._log_contents
 
1310
        if self._log_file_name is not None:
 
1311
            logfile = open(self._log_file_name)
 
1312
            try:
 
1313
                log_contents = logfile.read()
 
1314
            finally:
 
1315
                logfile.close()
 
1316
            if not keep_log_file:
 
1317
                self._log_contents = log_contents
 
1318
                try:
 
1319
                    os.remove(self._log_file_name)
 
1320
                except OSError, e:
 
1321
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1322
                        sys.stderr.write(('Unable to delete log file '
 
1323
                                             ' %r\n' % self._log_file_name))
 
1324
                    else:
 
1325
                        raise
 
1326
            return log_contents
 
1327
        else:
 
1328
            return "DELETED log file to reduce memory footprint"
 
1329
 
 
1330
    def requireFeature(self, feature):
 
1331
        """This test requires a specific feature is available.
 
1332
 
 
1333
        :raises UnavailableFeature: When feature is not available.
 
1334
        """
 
1335
        if not feature.available():
 
1336
            raise UnavailableFeature(feature)
 
1337
 
 
1338
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1339
            working_dir):
 
1340
        """Run bazaar command line, splitting up a string command line."""
 
1341
        if isinstance(args, basestring):
 
1342
            # shlex don't understand unicode strings,
 
1343
            # so args should be plain string (bialix 20070906)
 
1344
            args = list(shlex.split(str(args)))
 
1345
        return self._run_bzr_core(args, retcode=retcode,
 
1346
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1347
                )
 
1348
 
 
1349
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1350
            working_dir):
 
1351
        if encoding is None:
 
1352
            encoding = bzrlib.user_encoding
 
1353
        stdout = StringIOWrapper()
 
1354
        stderr = StringIOWrapper()
 
1355
        stdout.encoding = encoding
 
1356
        stderr.encoding = encoding
 
1357
 
 
1358
        self.log('run bzr: %r', args)
 
1359
        # FIXME: don't call into logging here
 
1360
        handler = logging.StreamHandler(stderr)
 
1361
        handler.setLevel(logging.INFO)
 
1362
        logger = logging.getLogger('')
 
1363
        logger.addHandler(handler)
 
1364
        old_ui_factory = ui.ui_factory
 
1365
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1366
 
 
1367
        cwd = None
 
1368
        if working_dir is not None:
 
1369
            cwd = osutils.getcwd()
 
1370
            os.chdir(working_dir)
 
1371
 
 
1372
        try:
 
1373
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1374
                stdout, stderr,
 
1375
                bzrlib.commands.run_bzr_catch_user_errors,
 
1376
                args)
 
1377
        finally:
 
1378
            logger.removeHandler(handler)
 
1379
            ui.ui_factory = old_ui_factory
 
1380
            if cwd is not None:
 
1381
                os.chdir(cwd)
 
1382
 
 
1383
        out = stdout.getvalue()
 
1384
        err = stderr.getvalue()
 
1385
        if out:
 
1386
            self.log('output:\n%r', out)
 
1387
        if err:
 
1388
            self.log('errors:\n%r', err)
 
1389
        if retcode is not None:
 
1390
            self.assertEquals(retcode, result,
 
1391
                              message='Unexpected return code')
 
1392
        return out, err
 
1393
 
 
1394
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
 
1395
                working_dir=None, error_regexes=[], output_encoding=None):
 
1396
        """Invoke bzr, as if it were run from the command line.
 
1397
 
 
1398
        The argument list should not include the bzr program name - the
 
1399
        first argument is normally the bzr command.  Arguments may be
 
1400
        passed in three ways:
 
1401
 
 
1402
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1403
        when the command contains whitespace or metacharacters, or 
 
1404
        is built up at run time.
 
1405
 
 
1406
        2- A single string, eg "add a".  This is the most convenient 
 
1407
        for hardcoded commands.
 
1408
 
 
1409
        This runs bzr through the interface that catches and reports
 
1410
        errors, and with logging set to something approximating the
 
1411
        default, so that error reporting can be checked.
 
1412
 
 
1413
        This should be the main method for tests that want to exercise the
 
1414
        overall behavior of the bzr application (rather than a unit test
 
1415
        or a functional test of the library.)
 
1416
 
 
1417
        This sends the stdout/stderr results into the test's log,
 
1418
        where it may be useful for debugging.  See also run_captured.
 
1419
 
 
1420
        :keyword stdin: A string to be used as stdin for the command.
 
1421
        :keyword retcode: The status code the command should return;
 
1422
            default 0.
 
1423
        :keyword working_dir: The directory to run the command in
 
1424
        :keyword error_regexes: A list of expected error messages.  If
 
1425
            specified they must be seen in the error output of the command.
 
1426
        """
 
1427
        out, err = self._run_bzr_autosplit(
 
1428
            args=args,
 
1429
            retcode=retcode,
 
1430
            encoding=encoding,
 
1431
            stdin=stdin,
 
1432
            working_dir=working_dir,
 
1433
            )
 
1434
        for regex in error_regexes:
 
1435
            self.assertContainsRe(err, regex)
 
1436
        return out, err
 
1437
 
 
1438
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1439
        """Run bzr, and check that stderr contains the supplied regexes
 
1440
 
 
1441
        :param error_regexes: Sequence of regular expressions which
 
1442
            must each be found in the error output. The relative ordering
 
1443
            is not enforced.
 
1444
        :param args: command-line arguments for bzr
 
1445
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1446
            This function changes the default value of retcode to be 3,
 
1447
            since in most cases this is run when you expect bzr to fail.
 
1448
 
 
1449
        :return: (out, err) The actual output of running the command (in case
 
1450
            you want to do more inspection)
 
1451
 
 
1452
        Examples of use::
 
1453
 
 
1454
            # Make sure that commit is failing because there is nothing to do
 
1455
            self.run_bzr_error(['no changes to commit'],
 
1456
                               ['commit', '-m', 'my commit comment'])
 
1457
            # Make sure --strict is handling an unknown file, rather than
 
1458
            # giving us the 'nothing to do' error
 
1459
            self.build_tree(['unknown'])
 
1460
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1461
                               ['commit', --strict', '-m', 'my commit comment'])
 
1462
        """
 
1463
        kwargs.setdefault('retcode', 3)
 
1464
        kwargs['error_regexes'] = error_regexes
 
1465
        out, err = self.run_bzr(*args, **kwargs)
 
1466
        return out, err
 
1467
 
 
1468
    def run_bzr_subprocess(self, *args, **kwargs):
 
1469
        """Run bzr in a subprocess for testing.
 
1470
 
 
1471
        This starts a new Python interpreter and runs bzr in there. 
 
1472
        This should only be used for tests that have a justifiable need for
 
1473
        this isolation: e.g. they are testing startup time, or signal
 
1474
        handling, or early startup code, etc.  Subprocess code can't be 
 
1475
        profiled or debugged so easily.
 
1476
 
 
1477
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1478
            None is supplied, the status code is not checked.
 
1479
        :keyword env_changes: A dictionary which lists changes to environment
 
1480
            variables. A value of None will unset the env variable.
 
1481
            The values must be strings. The change will only occur in the
 
1482
            child, so you don't need to fix the environment after running.
 
1483
        :keyword universal_newlines: Convert CRLF => LF
 
1484
        :keyword allow_plugins: By default the subprocess is run with
 
1485
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1486
            for system-wide plugins to create unexpected output on stderr,
 
1487
            which can cause unnecessary test failures.
 
1488
        """
 
1489
        env_changes = kwargs.get('env_changes', {})
 
1490
        working_dir = kwargs.get('working_dir', None)
 
1491
        allow_plugins = kwargs.get('allow_plugins', False)
 
1492
        if len(args) == 1:
 
1493
            if isinstance(args[0], list):
 
1494
                args = args[0]
 
1495
            elif isinstance(args[0], basestring):
 
1496
                args = list(shlex.split(args[0]))
 
1497
        else:
 
1498
            symbol_versioning.warn(zero_ninetyone %
 
1499
                                   "passing varargs to run_bzr_subprocess",
 
1500
                                   DeprecationWarning, stacklevel=3)
 
1501
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1502
                                            working_dir=working_dir,
 
1503
                                            allow_plugins=allow_plugins)
 
1504
        # We distinguish between retcode=None and retcode not passed.
 
1505
        supplied_retcode = kwargs.get('retcode', 0)
 
1506
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1507
            universal_newlines=kwargs.get('universal_newlines', False),
 
1508
            process_args=args)
 
1509
 
 
1510
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1511
                             skip_if_plan_to_signal=False,
 
1512
                             working_dir=None,
 
1513
                             allow_plugins=False):
 
1514
        """Start bzr in a subprocess for testing.
 
1515
 
 
1516
        This starts a new Python interpreter and runs bzr in there.
 
1517
        This should only be used for tests that have a justifiable need for
 
1518
        this isolation: e.g. they are testing startup time, or signal
 
1519
        handling, or early startup code, etc.  Subprocess code can't be
 
1520
        profiled or debugged so easily.
 
1521
 
 
1522
        :param process_args: a list of arguments to pass to the bzr executable,
 
1523
            for example ``['--version']``.
 
1524
        :param env_changes: A dictionary which lists changes to environment
 
1525
            variables. A value of None will unset the env variable.
 
1526
            The values must be strings. The change will only occur in the
 
1527
            child, so you don't need to fix the environment after running.
 
1528
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1529
            is not available.
 
1530
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1531
 
 
1532
        :returns: Popen object for the started process.
 
1533
        """
 
1534
        if skip_if_plan_to_signal:
 
1535
            if not getattr(os, 'kill', None):
 
1536
                raise TestSkipped("os.kill not available.")
 
1537
 
 
1538
        if env_changes is None:
 
1539
            env_changes = {}
 
1540
        old_env = {}
 
1541
 
 
1542
        def cleanup_environment():
 
1543
            for env_var, value in env_changes.iteritems():
 
1544
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1545
 
 
1546
        def restore_environment():
 
1547
            for env_var, value in old_env.iteritems():
 
1548
                osutils.set_or_unset_env(env_var, value)
 
1549
 
 
1550
        bzr_path = self.get_bzr_path()
 
1551
 
 
1552
        cwd = None
 
1553
        if working_dir is not None:
 
1554
            cwd = osutils.getcwd()
 
1555
            os.chdir(working_dir)
 
1556
 
 
1557
        try:
 
1558
            # win32 subprocess doesn't support preexec_fn
 
1559
            # so we will avoid using it on all platforms, just to
 
1560
            # make sure the code path is used, and we don't break on win32
 
1561
            cleanup_environment()
 
1562
            command = [sys.executable, bzr_path]
 
1563
            if not allow_plugins:
 
1564
                command.append('--no-plugins')
 
1565
            command.extend(process_args)
 
1566
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1567
        finally:
 
1568
            restore_environment()
 
1569
            if cwd is not None:
 
1570
                os.chdir(cwd)
 
1571
 
 
1572
        return process
 
1573
 
 
1574
    def _popen(self, *args, **kwargs):
 
1575
        """Place a call to Popen.
 
1576
 
 
1577
        Allows tests to override this method to intercept the calls made to
 
1578
        Popen for introspection.
 
1579
        """
 
1580
        return Popen(*args, **kwargs)
 
1581
 
 
1582
    def get_bzr_path(self):
 
1583
        """Return the path of the 'bzr' executable for this test suite."""
 
1584
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1585
        if not os.path.isfile(bzr_path):
 
1586
            # We are probably installed. Assume sys.argv is the right file
 
1587
            bzr_path = sys.argv[0]
 
1588
        return bzr_path
 
1589
 
 
1590
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1591
                              universal_newlines=False, process_args=None):
 
1592
        """Finish the execution of process.
 
1593
 
 
1594
        :param process: the Popen object returned from start_bzr_subprocess.
 
1595
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1596
            None is supplied, the status code is not checked.
 
1597
        :param send_signal: an optional signal to send to the process.
 
1598
        :param universal_newlines: Convert CRLF => LF
 
1599
        :returns: (stdout, stderr)
 
1600
        """
 
1601
        if send_signal is not None:
 
1602
            os.kill(process.pid, send_signal)
 
1603
        out, err = process.communicate()
 
1604
 
 
1605
        if universal_newlines:
 
1606
            out = out.replace('\r\n', '\n')
 
1607
            err = err.replace('\r\n', '\n')
 
1608
 
 
1609
        if retcode is not None and retcode != process.returncode:
 
1610
            if process_args is None:
 
1611
                process_args = "(unknown args)"
 
1612
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1613
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1614
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1615
                      % (process_args, retcode, process.returncode))
 
1616
        return [out, err]
 
1617
 
 
1618
    def check_inventory_shape(self, inv, shape):
 
1619
        """Compare an inventory to a list of expected names.
 
1620
 
 
1621
        Fail if they are not precisely equal.
 
1622
        """
 
1623
        extras = []
 
1624
        shape = list(shape)             # copy
 
1625
        for path, ie in inv.entries():
 
1626
            name = path.replace('\\', '/')
 
1627
            if ie.kind == 'directory':
 
1628
                name = name + '/'
 
1629
            if name in shape:
 
1630
                shape.remove(name)
 
1631
            else:
 
1632
                extras.append(name)
 
1633
        if shape:
 
1634
            self.fail("expected paths not found in inventory: %r" % shape)
 
1635
        if extras:
 
1636
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1637
 
 
1638
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1639
                         a_callable=None, *args, **kwargs):
 
1640
        """Call callable with redirected std io pipes.
 
1641
 
 
1642
        Returns the return code."""
 
1643
        if not callable(a_callable):
 
1644
            raise ValueError("a_callable must be callable.")
 
1645
        if stdin is None:
 
1646
            stdin = StringIO("")
 
1647
        if stdout is None:
 
1648
            if getattr(self, "_log_file", None) is not None:
 
1649
                stdout = self._log_file
 
1650
            else:
 
1651
                stdout = StringIO()
 
1652
        if stderr is None:
 
1653
            if getattr(self, "_log_file", None is not None):
 
1654
                stderr = self._log_file
 
1655
            else:
 
1656
                stderr = StringIO()
 
1657
        real_stdin = sys.stdin
 
1658
        real_stdout = sys.stdout
 
1659
        real_stderr = sys.stderr
 
1660
        try:
 
1661
            sys.stdout = stdout
 
1662
            sys.stderr = stderr
 
1663
            sys.stdin = stdin
 
1664
            return a_callable(*args, **kwargs)
 
1665
        finally:
 
1666
            sys.stdout = real_stdout
 
1667
            sys.stderr = real_stderr
 
1668
            sys.stdin = real_stdin
 
1669
 
 
1670
    def reduceLockdirTimeout(self):
 
1671
        """Reduce the default lock timeout for the duration of the test, so that
 
1672
        if LockContention occurs during a test, it does so quickly.
 
1673
 
 
1674
        Tests that expect to provoke LockContention errors should call this.
 
1675
        """
 
1676
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1677
        def resetTimeout():
 
1678
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1679
        self.addCleanup(resetTimeout)
 
1680
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1681
 
 
1682
    def make_utf8_encoded_stringio(self, encoding_type=None):
 
1683
        """Return a StringIOWrapper instance, that will encode Unicode
 
1684
        input to UTF-8.
 
1685
        """
 
1686
        if encoding_type is None:
 
1687
            encoding_type = 'strict'
 
1688
        sio = StringIO()
 
1689
        output_encoding = 'utf-8'
 
1690
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
 
1691
        sio.encoding = output_encoding
 
1692
        return sio
 
1693
 
 
1694
 
 
1695
class TestCaseWithMemoryTransport(TestCase):
 
1696
    """Common test class for tests that do not need disk resources.
 
1697
 
 
1698
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1699
 
 
1700
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1701
 
 
1702
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1703
    a directory which does not exist. This serves to help ensure test isolation
 
1704
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1705
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1706
    file defaults for the transport in tests, nor does it obey the command line
 
1707
    override, so tests that accidentally write to the common directory should
 
1708
    be rare.
 
1709
 
 
1710
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1711
    a .bzr directory that stops us ascending higher into the filesystem.
 
1712
    """
 
1713
 
 
1714
    TEST_ROOT = None
 
1715
    _TEST_NAME = 'test'
 
1716
 
 
1717
    def __init__(self, methodName='runTest'):
 
1718
        # allow test parameterisation after test construction and before test
 
1719
        # execution. Variables that the parameteriser sets need to be 
 
1720
        # ones that are not set by setUp, or setUp will trash them.
 
1721
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1722
        self.vfs_transport_factory = default_transport
 
1723
        self.transport_server = None
 
1724
        self.transport_readonly_server = None
 
1725
        self.__vfs_server = None
 
1726
 
 
1727
    def get_transport(self, relpath=None):
 
1728
        """Return a writeable transport.
 
1729
 
 
1730
        This transport is for the test scratch space relative to
 
1731
        "self._test_root"
 
1732
        
 
1733
        :param relpath: a path relative to the base url.
 
1734
        """
 
1735
        t = get_transport(self.get_url(relpath))
 
1736
        self.assertFalse(t.is_readonly())
 
1737
        return t
 
1738
 
 
1739
    def get_readonly_transport(self, relpath=None):
 
1740
        """Return a readonly transport for the test scratch space
 
1741
        
 
1742
        This can be used to test that operations which should only need
 
1743
        readonly access in fact do not try to write.
 
1744
 
 
1745
        :param relpath: a path relative to the base url.
 
1746
        """
 
1747
        t = get_transport(self.get_readonly_url(relpath))
 
1748
        self.assertTrue(t.is_readonly())
 
1749
        return t
 
1750
 
 
1751
    def create_transport_readonly_server(self):
 
1752
        """Create a transport server from class defined at init.
 
1753
 
 
1754
        This is mostly a hook for daughter classes.
 
1755
        """
 
1756
        return self.transport_readonly_server()
 
1757
 
 
1758
    def get_readonly_server(self):
 
1759
        """Get the server instance for the readonly transport
 
1760
 
 
1761
        This is useful for some tests with specific servers to do diagnostics.
 
1762
        """
 
1763
        if self.__readonly_server is None:
 
1764
            if self.transport_readonly_server is None:
 
1765
                # readonly decorator requested
 
1766
                # bring up the server
 
1767
                self.__readonly_server = ReadonlyServer()
 
1768
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1769
            else:
 
1770
                self.__readonly_server = self.create_transport_readonly_server()
 
1771
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1772
            self.addCleanup(self.__readonly_server.tearDown)
 
1773
        return self.__readonly_server
 
1774
 
 
1775
    def get_readonly_url(self, relpath=None):
 
1776
        """Get a URL for the readonly transport.
 
1777
 
 
1778
        This will either be backed by '.' or a decorator to the transport 
 
1779
        used by self.get_url()
 
1780
        relpath provides for clients to get a path relative to the base url.
 
1781
        These should only be downwards relative, not upwards.
 
1782
        """
 
1783
        base = self.get_readonly_server().get_url()
 
1784
        return self._adjust_url(base, relpath)
 
1785
 
 
1786
    def get_vfs_only_server(self):
 
1787
        """Get the vfs only read/write server instance.
 
1788
 
 
1789
        This is useful for some tests with specific servers that need
 
1790
        diagnostics.
 
1791
 
 
1792
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1793
        is no means to override it.
 
1794
        """
 
1795
        if self.__vfs_server is None:
 
1796
            self.__vfs_server = MemoryServer()
 
1797
            self.__vfs_server.setUp()
 
1798
            self.addCleanup(self.__vfs_server.tearDown)
 
1799
        return self.__vfs_server
 
1800
 
 
1801
    def get_server(self):
 
1802
        """Get the read/write server instance.
 
1803
 
 
1804
        This is useful for some tests with specific servers that need
 
1805
        diagnostics.
 
1806
 
 
1807
        This is built from the self.transport_server factory. If that is None,
 
1808
        then the self.get_vfs_server is returned.
 
1809
        """
 
1810
        if self.__server is None:
 
1811
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1812
                return self.get_vfs_only_server()
 
1813
            else:
 
1814
                # bring up a decorated means of access to the vfs only server.
 
1815
                self.__server = self.transport_server()
 
1816
                try:
 
1817
                    self.__server.setUp(self.get_vfs_only_server())
 
1818
                except TypeError, e:
 
1819
                    # This should never happen; the try:Except here is to assist
 
1820
                    # developers having to update code rather than seeing an
 
1821
                    # uninformative TypeError.
 
1822
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1823
            self.addCleanup(self.__server.tearDown)
 
1824
        return self.__server
 
1825
 
 
1826
    def _adjust_url(self, base, relpath):
 
1827
        """Get a URL (or maybe a path) for the readwrite transport.
 
1828
 
 
1829
        This will either be backed by '.' or to an equivalent non-file based
 
1830
        facility.
 
1831
        relpath provides for clients to get a path relative to the base url.
 
1832
        These should only be downwards relative, not upwards.
 
1833
        """
 
1834
        if relpath is not None and relpath != '.':
 
1835
            if not base.endswith('/'):
 
1836
                base = base + '/'
 
1837
            # XXX: Really base should be a url; we did after all call
 
1838
            # get_url()!  But sometimes it's just a path (from
 
1839
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1840
            # to a non-escaped local path.
 
1841
            if base.startswith('./') or base.startswith('/'):
 
1842
                base += relpath
 
1843
            else:
 
1844
                base += urlutils.escape(relpath)
 
1845
        return base
 
1846
 
 
1847
    def get_url(self, relpath=None):
 
1848
        """Get a URL (or maybe a path) for the readwrite transport.
 
1849
 
 
1850
        This will either be backed by '.' or to an equivalent non-file based
 
1851
        facility.
 
1852
        relpath provides for clients to get a path relative to the base url.
 
1853
        These should only be downwards relative, not upwards.
 
1854
        """
 
1855
        base = self.get_server().get_url()
 
1856
        return self._adjust_url(base, relpath)
 
1857
 
 
1858
    def get_vfs_only_url(self, relpath=None):
 
1859
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1860
 
 
1861
        This will never be a smart protocol.  It always has all the
 
1862
        capabilities of the local filesystem, but it might actually be a
 
1863
        MemoryTransport or some other similar virtual filesystem.
 
1864
 
 
1865
        This is the backing transport (if any) of the server returned by
 
1866
        get_url and get_readonly_url.
 
1867
 
 
1868
        :param relpath: provides for clients to get a path relative to the base
 
1869
            url.  These should only be downwards relative, not upwards.
 
1870
        :return: A URL
 
1871
        """
 
1872
        base = self.get_vfs_only_server().get_url()
 
1873
        return self._adjust_url(base, relpath)
 
1874
 
 
1875
    def _create_safety_net(self):
 
1876
        """Make a fake bzr directory.
 
1877
 
 
1878
        This prevents any tests propagating up onto the TEST_ROOT directory's
 
1879
        real branch.
 
1880
        """
 
1881
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
1882
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
1883
 
 
1884
    def _check_safety_net(self):
 
1885
        """Check that the safety .bzr directory have not been touched.
 
1886
 
 
1887
        _make_test_root have created a .bzr directory to prevent tests from
 
1888
        propagating. This method ensures than a test did not leaked.
 
1889
        """
 
1890
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
1891
        wt = workingtree.WorkingTree.open(root)
 
1892
        last_rev = wt.last_revision()
 
1893
        if last_rev != 'null:':
 
1894
            # The current test have modified the /bzr directory, we need to
 
1895
            # recreate a new one or all the followng tests will fail.
 
1896
            # If you need to inspect its content uncomment the following line
 
1897
            # import pdb; pdb.set_trace()
 
1898
            _rmtree_temp_dir(root + '/.bzr')
 
1899
            self._create_safety_net()
 
1900
            raise AssertionError('%s/.bzr should not be modified' % root)
 
1901
 
 
1902
    def _make_test_root(self):
 
1903
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
1904
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
1905
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
1906
 
 
1907
            self._create_safety_net()
 
1908
 
 
1909
            # The same directory is used by all tests, and we're not
 
1910
            # specifically told when all tests are finished.  This will do.
 
1911
            atexit.register(_rmtree_temp_dir, root)
 
1912
 
 
1913
        self.addCleanup(self._check_safety_net)
 
1914
 
 
1915
    def makeAndChdirToTestDir(self):
 
1916
        """Create a temporary directories for this one test.
 
1917
        
 
1918
        This must set self.test_home_dir and self.test_dir and chdir to
 
1919
        self.test_dir.
 
1920
        
 
1921
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1922
        """
 
1923
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1924
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1925
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1926
        
 
1927
    def make_branch(self, relpath, format=None):
 
1928
        """Create a branch on the transport at relpath."""
 
1929
        repo = self.make_repository(relpath, format=format)
 
1930
        return repo.bzrdir.create_branch()
 
1931
 
 
1932
    def make_bzrdir(self, relpath, format=None):
 
1933
        try:
 
1934
            # might be a relative or absolute path
 
1935
            maybe_a_url = self.get_url(relpath)
 
1936
            segments = maybe_a_url.rsplit('/', 1)
 
1937
            t = get_transport(maybe_a_url)
 
1938
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1939
                t.ensure_base()
 
1940
            if format is None:
 
1941
                format = 'default'
 
1942
            if isinstance(format, basestring):
 
1943
                format = bzrdir.format_registry.make_bzrdir(format)
 
1944
            return format.initialize_on_transport(t)
 
1945
        except errors.UninitializableFormat:
 
1946
            raise TestSkipped("Format %s is not initializable." % format)
 
1947
 
 
1948
    def make_repository(self, relpath, shared=False, format=None):
 
1949
        """Create a repository on our default transport at relpath.
 
1950
        
 
1951
        Note that relpath must be a relative path, not a full url.
 
1952
        """
 
1953
        # FIXME: If you create a remoterepository this returns the underlying
 
1954
        # real format, which is incorrect.  Actually we should make sure that 
 
1955
        # RemoteBzrDir returns a RemoteRepository.
 
1956
        # maybe  mbp 20070410
 
1957
        made_control = self.make_bzrdir(relpath, format=format)
 
1958
        return made_control.create_repository(shared=shared)
 
1959
 
 
1960
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1961
        """Create a branch on the default transport and a MemoryTree for it."""
 
1962
        b = self.make_branch(relpath, format=format)
 
1963
        return memorytree.MemoryTree.create_on_branch(b)
 
1964
 
 
1965
    def overrideEnvironmentForTesting(self):
 
1966
        os.environ['HOME'] = self.test_home_dir
 
1967
        os.environ['BZR_HOME'] = self.test_home_dir
 
1968
        
 
1969
    def setUp(self):
 
1970
        super(TestCaseWithMemoryTransport, self).setUp()
 
1971
        self._make_test_root()
 
1972
        _currentdir = os.getcwdu()
 
1973
        def _leaveDirectory():
 
1974
            os.chdir(_currentdir)
 
1975
        self.addCleanup(_leaveDirectory)
 
1976
        self.makeAndChdirToTestDir()
 
1977
        self.overrideEnvironmentForTesting()
 
1978
        self.__readonly_server = None
 
1979
        self.__server = None
 
1980
        self.reduceLockdirTimeout()
 
1981
 
 
1982
     
 
1983
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
1984
    """Derived class that runs a test within a temporary directory.
 
1985
 
 
1986
    This is useful for tests that need to create a branch, etc.
 
1987
 
 
1988
    The directory is created in a slightly complex way: for each
 
1989
    Python invocation, a new temporary top-level directory is created.
 
1990
    All test cases create their own directory within that.  If the
 
1991
    tests complete successfully, the directory is removed.
 
1992
 
 
1993
    :ivar test_base_dir: The path of the top-level directory for this 
 
1994
    test, which contains a home directory and a work directory.
 
1995
 
 
1996
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
1997
    which is used as $HOME for this test.
 
1998
 
 
1999
    :ivar test_dir: A directory under test_base_dir used as the current
 
2000
    directory when the test proper is run.
 
2001
    """
 
2002
 
 
2003
    OVERRIDE_PYTHON = 'python'
 
2004
 
 
2005
    def check_file_contents(self, filename, expect):
 
2006
        self.log("check contents of file %s" % filename)
 
2007
        contents = file(filename, 'r').read()
 
2008
        if contents != expect:
 
2009
            self.log("expected: %r" % expect)
 
2010
            self.log("actually: %r" % contents)
 
2011
            self.fail("contents of %s not as expected" % filename)
 
2012
 
 
2013
    def makeAndChdirToTestDir(self):
 
2014
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
2015
        
 
2016
        For TestCaseInTempDir we create a temporary directory based on the test
 
2017
        name and then create two subdirs - test and home under it.
 
2018
        """
 
2019
        # create a directory within the top level test directory
 
2020
        candidate_dir = osutils.mkdtemp(dir=self.TEST_ROOT)
 
2021
        # now create test and home directories within this dir
 
2022
        self.test_base_dir = candidate_dir
 
2023
        self.test_home_dir = self.test_base_dir + '/home'
 
2024
        os.mkdir(self.test_home_dir)
 
2025
        self.test_dir = self.test_base_dir + '/work'
 
2026
        os.mkdir(self.test_dir)
 
2027
        os.chdir(self.test_dir)
 
2028
        # put name of test inside
 
2029
        f = file(self.test_base_dir + '/name', 'w')
 
2030
        try:
 
2031
            f.write(self.id())
 
2032
        finally:
 
2033
            f.close()
 
2034
        self.addCleanup(self.deleteTestDir)
 
2035
 
 
2036
    def deleteTestDir(self):
 
2037
        os.chdir(self.TEST_ROOT)
 
2038
        _rmtree_temp_dir(self.test_base_dir)
 
2039
 
 
2040
    def build_tree(self, shape, line_endings='binary', transport=None):
 
2041
        """Build a test tree according to a pattern.
 
2042
 
 
2043
        shape is a sequence of file specifications.  If the final
 
2044
        character is '/', a directory is created.
 
2045
 
 
2046
        This assumes that all the elements in the tree being built are new.
 
2047
 
 
2048
        This doesn't add anything to a branch.
 
2049
 
 
2050
        :type shape:    list or tuple.
 
2051
        :param line_endings: Either 'binary' or 'native'
 
2052
            in binary mode, exact contents are written in native mode, the
 
2053
            line endings match the default platform endings.
 
2054
        :param transport: A transport to write to, for building trees on VFS's.
 
2055
            If the transport is readonly or None, "." is opened automatically.
 
2056
        :return: None
 
2057
        """
 
2058
        if type(shape) not in (list, tuple):
 
2059
            raise AssertionError("Parameter 'shape' should be "
 
2060
                "a list or a tuple. Got %r instead" % (shape,))
 
2061
        # It's OK to just create them using forward slashes on windows.
 
2062
        if transport is None or transport.is_readonly():
 
2063
            transport = get_transport(".")
 
2064
        for name in shape:
 
2065
            self.assert_(isinstance(name, basestring))
 
2066
            if name[-1] == '/':
 
2067
                transport.mkdir(urlutils.escape(name[:-1]))
 
2068
            else:
 
2069
                if line_endings == 'binary':
 
2070
                    end = '\n'
 
2071
                elif line_endings == 'native':
 
2072
                    end = os.linesep
 
2073
                else:
 
2074
                    raise errors.BzrError(
 
2075
                        'Invalid line ending request %r' % line_endings)
 
2076
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
2077
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
2078
 
 
2079
    def build_tree_contents(self, shape):
 
2080
        build_tree_contents(shape)
 
2081
 
 
2082
    def assertFileEqual(self, content, path):
 
2083
        """Fail if path does not contain 'content'."""
 
2084
        self.failUnlessExists(path)
 
2085
        f = file(path, 'rb')
 
2086
        try:
 
2087
            s = f.read()
 
2088
        finally:
 
2089
            f.close()
 
2090
        self.assertEqualDiff(content, s)
 
2091
 
 
2092
    def failUnlessExists(self, path):
 
2093
        """Fail unless path or paths, which may be abs or relative, exist."""
 
2094
        if not isinstance(path, basestring):
 
2095
            for p in path:
 
2096
                self.failUnlessExists(p)
 
2097
        else:
 
2098
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
2099
 
 
2100
    def failIfExists(self, path):
 
2101
        """Fail if path or paths, which may be abs or relative, exist."""
 
2102
        if not isinstance(path, basestring):
 
2103
            for p in path:
 
2104
                self.failIfExists(p)
 
2105
        else:
 
2106
            self.failIf(osutils.lexists(path),path+" exists")
 
2107
 
 
2108
    def assertInWorkingTree(self, path, root_path='.', tree=None):
 
2109
        """Assert whether path or paths are in the WorkingTree"""
 
2110
        if tree is None:
 
2111
            tree = workingtree.WorkingTree.open(root_path)
 
2112
        if not isinstance(path, basestring):
 
2113
            for p in path:
 
2114
                self.assertInWorkingTree(p,tree=tree)
 
2115
        else:
 
2116
            self.assertIsNot(tree.path2id(path), None,
 
2117
                path+' not in working tree.')
 
2118
 
 
2119
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
 
2120
        """Assert whether path or paths are not in the WorkingTree"""
 
2121
        if tree is None:
 
2122
            tree = workingtree.WorkingTree.open(root_path)
 
2123
        if not isinstance(path, basestring):
 
2124
            for p in path:
 
2125
                self.assertNotInWorkingTree(p,tree=tree)
 
2126
        else:
 
2127
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2128
 
 
2129
 
 
2130
class TestCaseWithTransport(TestCaseInTempDir):
 
2131
    """A test case that provides get_url and get_readonly_url facilities.
 
2132
 
 
2133
    These back onto two transport servers, one for readonly access and one for
 
2134
    read write access.
 
2135
 
 
2136
    If no explicit class is provided for readonly access, a
 
2137
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2138
    based read write transports.
 
2139
 
 
2140
    If an explicit class is provided for readonly access, that server and the 
 
2141
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2142
    """
 
2143
 
 
2144
    def get_vfs_only_server(self):
 
2145
        """See TestCaseWithMemoryTransport.
 
2146
 
 
2147
        This is useful for some tests with specific servers that need
 
2148
        diagnostics.
 
2149
        """
 
2150
        if self.__vfs_server is None:
 
2151
            self.__vfs_server = self.vfs_transport_factory()
 
2152
            self.__vfs_server.setUp()
 
2153
            self.addCleanup(self.__vfs_server.tearDown)
 
2154
        return self.__vfs_server
 
2155
 
 
2156
    def make_branch_and_tree(self, relpath, format=None):
 
2157
        """Create a branch on the transport and a tree locally.
 
2158
 
 
2159
        If the transport is not a LocalTransport, the Tree can't be created on
 
2160
        the transport.  In that case if the vfs_transport_factory is
 
2161
        LocalURLServer the working tree is created in the local
 
2162
        directory backing the transport, and the returned tree's branch and
 
2163
        repository will also be accessed locally. Otherwise a lightweight
 
2164
        checkout is created and returned.
 
2165
 
 
2166
        :param format: The BzrDirFormat.
 
2167
        :returns: the WorkingTree.
 
2168
        """
 
2169
        # TODO: always use the local disk path for the working tree,
 
2170
        # this obviously requires a format that supports branch references
 
2171
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2172
        # RBC 20060208
 
2173
        b = self.make_branch(relpath, format=format)
 
2174
        try:
 
2175
            return b.bzrdir.create_workingtree()
 
2176
        except errors.NotLocalUrl:
 
2177
            # We can only make working trees locally at the moment.  If the
 
2178
            # transport can't support them, then we keep the non-disk-backed
 
2179
            # branch and create a local checkout.
 
2180
            if self.vfs_transport_factory is LocalURLServer:
 
2181
                # the branch is colocated on disk, we cannot create a checkout.
 
2182
                # hopefully callers will expect this.
 
2183
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2184
                return local_controldir.create_workingtree()
 
2185
            else:
 
2186
                return b.create_checkout(relpath, lightweight=True)
 
2187
 
 
2188
    def assertIsDirectory(self, relpath, transport):
 
2189
        """Assert that relpath within transport is a directory.
 
2190
 
 
2191
        This may not be possible on all transports; in that case it propagates
 
2192
        a TransportNotPossible.
 
2193
        """
 
2194
        try:
 
2195
            mode = transport.stat(relpath).st_mode
 
2196
        except errors.NoSuchFile:
 
2197
            self.fail("path %s is not a directory; no such file"
 
2198
                      % (relpath))
 
2199
        if not stat.S_ISDIR(mode):
 
2200
            self.fail("path %s is not a directory; has mode %#o"
 
2201
                      % (relpath, mode))
 
2202
 
 
2203
    def assertTreesEqual(self, left, right):
 
2204
        """Check that left and right have the same content and properties."""
 
2205
        # we use a tree delta to check for equality of the content, and we
 
2206
        # manually check for equality of other things such as the parents list.
 
2207
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2208
        differences = left.changes_from(right)
 
2209
        self.assertFalse(differences.has_changed(),
 
2210
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2211
 
 
2212
    def setUp(self):
 
2213
        super(TestCaseWithTransport, self).setUp()
 
2214
        self.__vfs_server = None
 
2215
 
 
2216
 
 
2217
class ChrootedTestCase(TestCaseWithTransport):
 
2218
    """A support class that provides readonly urls outside the local namespace.
 
2219
 
 
2220
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2221
    is then we are chrooted already, if it is not then an HttpServer is used
 
2222
    for readonly urls.
 
2223
 
 
2224
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2225
                       be used without needed to redo it when a different 
 
2226
                       subclass is in use ?
 
2227
    """
 
2228
 
 
2229
    def setUp(self):
 
2230
        super(ChrootedTestCase, self).setUp()
 
2231
        if not self.vfs_transport_factory == MemoryServer:
 
2232
            self.transport_readonly_server = HttpServer
 
2233
 
 
2234
 
 
2235
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
 
2236
                       random_order=False):
 
2237
    """Create a test suite by filtering another one.
 
2238
    
 
2239
    :param suite:           the source suite
 
2240
    :param pattern:         pattern that names must match
 
2241
    :param exclude_pattern: pattern that names must not match, if any
 
2242
    :param random_order:    if True, tests in the new suite will be put in
 
2243
                            random order
 
2244
    :returns: the newly created suite
 
2245
    """ 
 
2246
    return sort_suite_by_re(suite, pattern, exclude_pattern,
 
2247
        random_order, False)
 
2248
 
 
2249
 
 
2250
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
 
2251
                     random_order=False, append_rest=True):
 
2252
    """Create a test suite by sorting another one.
 
2253
    
 
2254
    :param suite:           the source suite
 
2255
    :param pattern:         pattern that names must match in order to go
 
2256
                            first in the new suite
 
2257
    :param exclude_pattern: pattern that names must not match, if any
 
2258
    :param random_order:    if True, tests in the new suite will be put in
 
2259
                            random order
 
2260
    :param append_rest:     if False, pattern is a strict filter and not
 
2261
                            just an ordering directive
 
2262
    :returns: the newly created suite
 
2263
    """ 
 
2264
    first = []
 
2265
    second = []
 
2266
    filter_re = re.compile(pattern)
 
2267
    if exclude_pattern is not None:
 
2268
        exclude_re = re.compile(exclude_pattern)
 
2269
    for test in iter_suite_tests(suite):
 
2270
        test_id = test.id()
 
2271
        if exclude_pattern is None or not exclude_re.search(test_id):
 
2272
            if filter_re.search(test_id):
 
2273
                first.append(test)
 
2274
            elif append_rest:
 
2275
                second.append(test)
 
2276
    if random_order:
 
2277
        random.shuffle(first)
 
2278
        random.shuffle(second)
 
2279
    return TestUtil.TestSuite(first + second)
 
2280
 
 
2281
 
 
2282
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2283
              stop_on_failure=False,
 
2284
              transport=None, lsprof_timed=None, bench_history=None,
 
2285
              matching_tests_first=None,
 
2286
              list_only=False,
 
2287
              random_seed=None,
 
2288
              exclude_pattern=None,
 
2289
              strict=False,
 
2290
              coverage_dir=None,
 
2291
              ):
 
2292
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2293
    if verbose:
 
2294
        verbosity = 2
 
2295
    else:
 
2296
        verbosity = 1
 
2297
    runner = TextTestRunner(stream=sys.stdout,
 
2298
                            descriptions=0,
 
2299
                            verbosity=verbosity,
 
2300
                            bench_history=bench_history,
 
2301
                            list_only=list_only,
 
2302
                            )
 
2303
    runner.stop_on_failure=stop_on_failure
 
2304
    # Initialise the random number generator and display the seed used.
 
2305
    # We convert the seed to a long to make it reuseable across invocations.
 
2306
    random_order = False
 
2307
    if random_seed is not None:
 
2308
        random_order = True
 
2309
        if random_seed == "now":
 
2310
            random_seed = long(time.time())
 
2311
        else:
 
2312
            # Convert the seed to a long if we can
 
2313
            try:
 
2314
                random_seed = long(random_seed)
 
2315
            except:
 
2316
                pass
 
2317
        runner.stream.writeln("Randomizing test order using seed %s\n" %
 
2318
            (random_seed))
 
2319
        random.seed(random_seed)
 
2320
    # Customise the list of tests if requested
 
2321
    if pattern != '.*' or exclude_pattern is not None or random_order:
 
2322
        if matching_tests_first:
 
2323
            suite = sort_suite_by_re(suite, pattern, exclude_pattern,
 
2324
                random_order)
 
2325
        else:
 
2326
            suite = filter_suite_by_re(suite, pattern, exclude_pattern,
 
2327
                random_order)
 
2328
 
 
2329
    if coverage_dir is not None:
 
2330
        tracer = trace.Trace(count=1, trace=0)
 
2331
        sys.settrace(tracer.globaltrace)
 
2332
 
 
2333
    result = runner.run(suite)
 
2334
 
 
2335
    if coverage_dir is not None:
 
2336
        sys.settrace(None)
 
2337
        results = tracer.results()
 
2338
        results.write_results(show_missing=1, summary=False,
 
2339
                              coverdir=coverage_dir)
 
2340
 
 
2341
    if strict:
 
2342
        return result.wasStrictlySuccessful()
 
2343
 
 
2344
    return result.wasSuccessful()
 
2345
 
 
2346
 
 
2347
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2348
             transport=None,
 
2349
             test_suite_factory=None,
 
2350
             lsprof_timed=None,
 
2351
             bench_history=None,
 
2352
             matching_tests_first=None,
 
2353
             list_only=False,
 
2354
             random_seed=None,
 
2355
             exclude_pattern=None,
 
2356
             strict=False,
 
2357
             coverage_dir=None,
 
2358
             ):
 
2359
    """Run the whole test suite under the enhanced runner"""
 
2360
    # XXX: Very ugly way to do this...
 
2361
    # Disable warning about old formats because we don't want it to disturb
 
2362
    # any blackbox tests.
 
2363
    from bzrlib import repository
 
2364
    repository._deprecation_warning_done = True
 
2365
 
 
2366
    global default_transport
 
2367
    if transport is None:
 
2368
        transport = default_transport
 
2369
    old_transport = default_transport
 
2370
    default_transport = transport
 
2371
    try:
 
2372
        if test_suite_factory is None:
 
2373
            suite = test_suite()
 
2374
        else:
 
2375
            suite = test_suite_factory()
 
2376
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2377
                     stop_on_failure=stop_on_failure,
 
2378
                     transport=transport,
 
2379
                     lsprof_timed=lsprof_timed,
 
2380
                     bench_history=bench_history,
 
2381
                     matching_tests_first=matching_tests_first,
 
2382
                     list_only=list_only,
 
2383
                     random_seed=random_seed,
 
2384
                     exclude_pattern=exclude_pattern,
 
2385
                     strict=strict,
 
2386
                     coverage_dir=coverage_dir)
 
2387
    finally:
 
2388
        default_transport = old_transport
 
2389
 
 
2390
 
 
2391
def test_suite():
 
2392
    """Build and return TestSuite for the whole of bzrlib.
 
2393
    
 
2394
    This function can be replaced if you need to change the default test
 
2395
    suite on a global basis, but it is not encouraged.
 
2396
    """
 
2397
    testmod_names = [
 
2398
                   'bzrlib.util.tests.test_bencode',
 
2399
                   'bzrlib.tests.test__dirstate_helpers',
 
2400
                   'bzrlib.tests.test_ancestry',
 
2401
                   'bzrlib.tests.test_annotate',
 
2402
                   'bzrlib.tests.test_api',
 
2403
                   'bzrlib.tests.test_atomicfile',
 
2404
                   'bzrlib.tests.test_bad_files',
 
2405
                   'bzrlib.tests.test_bisect_multi',
 
2406
                   'bzrlib.tests.test_branch',
 
2407
                   'bzrlib.tests.test_branchbuilder',
 
2408
                   'bzrlib.tests.test_bugtracker',
 
2409
                   'bzrlib.tests.test_bundle',
 
2410
                   'bzrlib.tests.test_bzrdir',
 
2411
                   'bzrlib.tests.test_cache_utf8',
 
2412
                   'bzrlib.tests.test_commands',
 
2413
                   'bzrlib.tests.test_commit',
 
2414
                   'bzrlib.tests.test_commit_merge',
 
2415
                   'bzrlib.tests.test_config',
 
2416
                   'bzrlib.tests.test_conflicts',
 
2417
                   'bzrlib.tests.test_counted_lock',
 
2418
                   'bzrlib.tests.test_decorators',
 
2419
                   'bzrlib.tests.test_delta',
 
2420
                   'bzrlib.tests.test_deprecated_graph',
 
2421
                   'bzrlib.tests.test_diff',
 
2422
                   'bzrlib.tests.test_dirstate',
 
2423
                   'bzrlib.tests.test_email_message',
 
2424
                   'bzrlib.tests.test_errors',
 
2425
                   'bzrlib.tests.test_escaped_store',
 
2426
                   'bzrlib.tests.test_extract',
 
2427
                   'bzrlib.tests.test_fetch',
 
2428
                   'bzrlib.tests.test_ftp_transport',
 
2429
                   'bzrlib.tests.test_generate_docs',
 
2430
                   'bzrlib.tests.test_generate_ids',
 
2431
                   'bzrlib.tests.test_globbing',
 
2432
                   'bzrlib.tests.test_gpg',
 
2433
                   'bzrlib.tests.test_graph',
 
2434
                   'bzrlib.tests.test_hashcache',
 
2435
                   'bzrlib.tests.test_help',
 
2436
                   'bzrlib.tests.test_hooks',
 
2437
                   'bzrlib.tests.test_http',
 
2438
                   'bzrlib.tests.test_http_response',
 
2439
                   'bzrlib.tests.test_https_ca_bundle',
 
2440
                   'bzrlib.tests.test_identitymap',
 
2441
                   'bzrlib.tests.test_ignores',
 
2442
                   'bzrlib.tests.test_index',
 
2443
                   'bzrlib.tests.test_info',
 
2444
                   'bzrlib.tests.test_inv',
 
2445
                   'bzrlib.tests.test_knit',
 
2446
                   'bzrlib.tests.test_lazy_import',
 
2447
                   'bzrlib.tests.test_lazy_regex',
 
2448
                   'bzrlib.tests.test_lockdir',
 
2449
                   'bzrlib.tests.test_lockable_files',
 
2450
                   'bzrlib.tests.test_log',
 
2451
                   'bzrlib.tests.test_lsprof',
 
2452
                   'bzrlib.tests.test_lru_cache',
 
2453
                   'bzrlib.tests.test_mail_client',
 
2454
                   'bzrlib.tests.test_memorytree',
 
2455
                   'bzrlib.tests.test_merge',
 
2456
                   'bzrlib.tests.test_merge3',
 
2457
                   'bzrlib.tests.test_merge_core',
 
2458
                   'bzrlib.tests.test_merge_directive',
 
2459
                   'bzrlib.tests.test_missing',
 
2460
                   'bzrlib.tests.test_msgeditor',
 
2461
                   'bzrlib.tests.test_multiparent',
 
2462
                   'bzrlib.tests.test_nonascii',
 
2463
                   'bzrlib.tests.test_options',
 
2464
                   'bzrlib.tests.test_osutils',
 
2465
                   'bzrlib.tests.test_osutils_encodings',
 
2466
                   'bzrlib.tests.test_pack',
 
2467
                   'bzrlib.tests.test_patch',
 
2468
                   'bzrlib.tests.test_patches',
 
2469
                   'bzrlib.tests.test_permissions',
 
2470
                   'bzrlib.tests.test_plugins',
 
2471
                   'bzrlib.tests.test_progress',
 
2472
                   'bzrlib.tests.test_reconfigure',
 
2473
                   'bzrlib.tests.test_reconcile',
 
2474
                   'bzrlib.tests.test_registry',
 
2475
                   'bzrlib.tests.test_remote',
 
2476
                   'bzrlib.tests.test_repository',
 
2477
                   'bzrlib.tests.test_revert',
 
2478
                   'bzrlib.tests.test_revision',
 
2479
                   'bzrlib.tests.test_revisionnamespaces',
 
2480
                   'bzrlib.tests.test_revisiontree',
 
2481
                   'bzrlib.tests.test_rio',
 
2482
                   'bzrlib.tests.test_sampler',
 
2483
                   'bzrlib.tests.test_selftest',
 
2484
                   'bzrlib.tests.test_setup',
 
2485
                   'bzrlib.tests.test_sftp_transport',
 
2486
                   'bzrlib.tests.test_smart',
 
2487
                   'bzrlib.tests.test_smart_add',
 
2488
                   'bzrlib.tests.test_smart_transport',
 
2489
                   'bzrlib.tests.test_smtp_connection',
 
2490
                   'bzrlib.tests.test_source',
 
2491
                   'bzrlib.tests.test_ssh_transport',
 
2492
                   'bzrlib.tests.test_status',
 
2493
                   'bzrlib.tests.test_store',
 
2494
                   'bzrlib.tests.test_strace',
 
2495
                   'bzrlib.tests.test_subsume',
 
2496
                   'bzrlib.tests.test_switch',
 
2497
                   'bzrlib.tests.test_symbol_versioning',
 
2498
                   'bzrlib.tests.test_tag',
 
2499
                   'bzrlib.tests.test_testament',
 
2500
                   'bzrlib.tests.test_textfile',
 
2501
                   'bzrlib.tests.test_textmerge',
 
2502
                   'bzrlib.tests.test_timestamp',
 
2503
                   'bzrlib.tests.test_trace',
 
2504
                   'bzrlib.tests.test_transactions',
 
2505
                   'bzrlib.tests.test_transform',
 
2506
                   'bzrlib.tests.test_transport',
 
2507
                   'bzrlib.tests.test_tree',
 
2508
                   'bzrlib.tests.test_treebuilder',
 
2509
                   'bzrlib.tests.test_tsort',
 
2510
                   'bzrlib.tests.test_tuned_gzip',
 
2511
                   'bzrlib.tests.test_ui',
 
2512
                   'bzrlib.tests.test_upgrade',
 
2513
                   'bzrlib.tests.test_urlutils',
 
2514
                   'bzrlib.tests.test_versionedfile',
 
2515
                   'bzrlib.tests.test_version',
 
2516
                   'bzrlib.tests.test_version_info',
 
2517
                   'bzrlib.tests.test_weave',
 
2518
                   'bzrlib.tests.test_whitebox',
 
2519
                   'bzrlib.tests.test_win32utils',
 
2520
                   'bzrlib.tests.test_workingtree',
 
2521
                   'bzrlib.tests.test_workingtree_4',
 
2522
                   'bzrlib.tests.test_wsgi',
 
2523
                   'bzrlib.tests.test_xml',
 
2524
                   ]
 
2525
    test_transport_implementations = [
 
2526
        'bzrlib.tests.test_transport_implementations',
 
2527
        'bzrlib.tests.test_read_bundle',
 
2528
        ]
 
2529
    suite = TestUtil.TestSuite()
 
2530
    loader = TestUtil.TestLoader()
 
2531
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2532
    from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
 
2533
    adapter = TransportTestProviderAdapter()
 
2534
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
2535
    adapt_tests(
 
2536
        ["bzrlib.tests.test_msgeditor.MsgEditorTest."
 
2537
         "test__create_temp_file_with_commit_template_in_unicode_dir"],
 
2538
        EncodingTestAdapter(), loader, suite)
 
2539
    for package in packages_to_test():
 
2540
        suite.addTest(package.test_suite())
 
2541
    for m in MODULES_TO_TEST:
 
2542
        suite.addTest(loader.loadTestsFromModule(m))
 
2543
    for m in MODULES_TO_DOCTEST:
 
2544
        try:
 
2545
            suite.addTest(doctest.DocTestSuite(m))
 
2546
        except ValueError, e:
 
2547
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
2548
            raise
 
2549
    default_encoding = sys.getdefaultencoding()
 
2550
    for name, plugin in bzrlib.plugin.plugins().items():
 
2551
        try:
 
2552
            plugin_suite = plugin.test_suite()
 
2553
        except ImportError, e:
 
2554
            bzrlib.trace.warning(
 
2555
                'Unable to test plugin "%s": %s', name, e)
 
2556
        else:
 
2557
            if plugin_suite is not None:
 
2558
                suite.addTest(plugin_suite)
 
2559
        if default_encoding != sys.getdefaultencoding():
 
2560
            bzrlib.trace.warning(
 
2561
                'Plugin "%s" tried to reset default encoding to: %s', name,
 
2562
                sys.getdefaultencoding())
 
2563
            reload(sys)
 
2564
            sys.setdefaultencoding(default_encoding)
 
2565
    return suite
 
2566
 
 
2567
 
 
2568
def multiply_tests_from_modules(module_name_list, scenario_iter):
 
2569
    """Adapt all tests in some given modules to given scenarios.
 
2570
 
 
2571
    This is the recommended public interface for test parameterization.
 
2572
    Typically the test_suite() method for a per-implementation test
 
2573
    suite will call multiply_tests_from_modules and return the 
 
2574
    result.
 
2575
 
 
2576
    :param module_name_list: List of fully-qualified names of test
 
2577
        modules.
 
2578
    :param scenario_iter: Iterable of pairs of (scenario_name, 
 
2579
        scenario_param_dict).
 
2580
 
 
2581
    This returns a new TestSuite containing the cross product of
 
2582
    all the tests in all the modules, each repeated for each scenario.
 
2583
    Each test is adapted by adding the scenario name at the end 
 
2584
    of its name, and updating the test object's __dict__ with the
 
2585
    scenario_param_dict.
 
2586
 
 
2587
    >>> r = multiply_tests_from_modules(
 
2588
    ...     ['bzrlib.tests.test_sampler'],
 
2589
    ...     [('one', dict(param=1)), 
 
2590
    ...      ('two', dict(param=2))])
 
2591
    >>> tests = list(iter_suite_tests(r))
 
2592
    >>> len(tests)
 
2593
    2
 
2594
    >>> tests[0].id()
 
2595
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
 
2596
    >>> tests[0].param
 
2597
    1
 
2598
    >>> tests[1].param
 
2599
    2
 
2600
    """
 
2601
    loader = TestLoader()
 
2602
    suite = TestSuite()
 
2603
    adapter = TestScenarioApplier()
 
2604
    adapter.scenarios = list(scenario_iter)
 
2605
    adapt_modules(module_name_list, adapter, loader, suite)
 
2606
    return suite
 
2607
 
 
2608
 
 
2609
def multiply_scenarios(scenarios_left, scenarios_right):
 
2610
    """Multiply two sets of scenarios.
 
2611
 
 
2612
    :returns: the cartesian product of the two sets of scenarios, that is
 
2613
        a scenario for every possible combination of a left scenario and a
 
2614
        right scenario.
 
2615
    """
 
2616
    return [
 
2617
        ('%s,%s' % (left_name, right_name),
 
2618
         dict(left_dict.items() + right_dict.items()))
 
2619
        for left_name, left_dict in scenarios_left
 
2620
        for right_name, right_dict in scenarios_right]
 
2621
 
 
2622
 
 
2623
 
 
2624
def adapt_modules(mods_list, adapter, loader, suite):
 
2625
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2626
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2627
        suite.addTests(adapter.adapt(test))
 
2628
 
 
2629
 
 
2630
def adapt_tests(tests_list, adapter, loader, suite):
 
2631
    """Adapt the tests in tests_list using adapter and add to suite."""
 
2632
    for test in tests_list:
 
2633
        suite.addTests(adapter.adapt(loader.loadTestsFromName(test)))
 
2634
 
 
2635
 
 
2636
def _rmtree_temp_dir(dirname):
 
2637
    # If LANG=C we probably have created some bogus paths
 
2638
    # which rmtree(unicode) will fail to delete
 
2639
    # so make sure we are using rmtree(str) to delete everything
 
2640
    # except on win32, where rmtree(str) will fail
 
2641
    # since it doesn't have the property of byte-stream paths
 
2642
    # (they are either ascii or mbcs)
 
2643
    if sys.platform == 'win32':
 
2644
        # make sure we are using the unicode win32 api
 
2645
        dirname = unicode(dirname)
 
2646
    else:
 
2647
        dirname = dirname.encode(sys.getfilesystemencoding())
 
2648
    try:
 
2649
        osutils.rmtree(dirname)
 
2650
    except OSError, e:
 
2651
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
2652
            sys.stderr.write(('Permission denied: '
 
2653
                                 'unable to remove testing dir '
 
2654
                                 '%s\n' % os.path.basename(dirname)))
 
2655
        else:
 
2656
            raise
 
2657
 
 
2658
 
 
2659
class Feature(object):
 
2660
    """An operating system Feature."""
 
2661
 
 
2662
    def __init__(self):
 
2663
        self._available = None
 
2664
 
 
2665
    def available(self):
 
2666
        """Is the feature available?
 
2667
 
 
2668
        :return: True if the feature is available.
 
2669
        """
 
2670
        if self._available is None:
 
2671
            self._available = self._probe()
 
2672
        return self._available
 
2673
 
 
2674
    def _probe(self):
 
2675
        """Implement this method in concrete features.
 
2676
 
 
2677
        :return: True if the feature is available.
 
2678
        """
 
2679
        raise NotImplementedError
 
2680
 
 
2681
    def __str__(self):
 
2682
        if getattr(self, 'feature_name', None):
 
2683
            return self.feature_name()
 
2684
        return self.__class__.__name__
 
2685
 
 
2686
 
 
2687
class _SymlinkFeature(Feature):
 
2688
 
 
2689
    def _probe(self):
 
2690
        return osutils.has_symlinks()
 
2691
 
 
2692
    def feature_name(self):
 
2693
        return 'symlinks'
 
2694
 
 
2695
SymlinkFeature = _SymlinkFeature()
 
2696
 
 
2697
 
 
2698
class _OsFifoFeature(Feature):
 
2699
 
 
2700
    def _probe(self):
 
2701
        return getattr(os, 'mkfifo', None)
 
2702
 
 
2703
    def feature_name(self):
 
2704
        return 'filesystem fifos'
 
2705
 
 
2706
OsFifoFeature = _OsFifoFeature()
 
2707
 
 
2708
 
 
2709
class TestScenarioApplier(object):
 
2710
    """A tool to apply scenarios to tests."""
 
2711
 
 
2712
    def adapt(self, test):
 
2713
        """Return a TestSuite containing a copy of test for each scenario."""
 
2714
        result = unittest.TestSuite()
 
2715
        for scenario in self.scenarios:
 
2716
            result.addTest(self.adapt_test_to_scenario(test, scenario))
 
2717
        return result
 
2718
 
 
2719
    def adapt_test_to_scenario(self, test, scenario):
 
2720
        """Copy test and apply scenario to it.
 
2721
 
 
2722
        :param test: A test to adapt.
 
2723
        :param scenario: A tuple describing the scenarion.
 
2724
            The first element of the tuple is the new test id.
 
2725
            The second element is a dict containing attributes to set on the
 
2726
            test.
 
2727
        :return: The adapted test.
 
2728
        """
 
2729
        from copy import deepcopy
 
2730
        new_test = deepcopy(test)
 
2731
        for name, value in scenario[1].items():
 
2732
            setattr(new_test, name, value)
 
2733
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
 
2734
        new_test.id = lambda: new_id
 
2735
        return new_test
 
2736
 
 
2737
 
 
2738
def probe_unicode_in_user_encoding():
 
2739
    """Try to encode several unicode strings to use in unicode-aware tests.
 
2740
    Return first successfull match.
 
2741
 
 
2742
    :return:  (unicode value, encoded plain string value) or (None, None)
 
2743
    """
 
2744
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
2745
    for uni_val in possible_vals:
 
2746
        try:
 
2747
            str_val = uni_val.encode(bzrlib.user_encoding)
 
2748
        except UnicodeEncodeError:
 
2749
            # Try a different character
 
2750
            pass
 
2751
        else:
 
2752
            return uni_val, str_val
 
2753
    return None, None
 
2754
 
 
2755
 
 
2756
def probe_bad_non_ascii(encoding):
 
2757
    """Try to find [bad] character with code [128..255]
 
2758
    that cannot be decoded to unicode in some encoding.
 
2759
    Return None if all non-ascii characters is valid
 
2760
    for given encoding.
 
2761
    """
 
2762
    for i in xrange(128, 256):
 
2763
        char = chr(i)
 
2764
        try:
 
2765
            char.decode(encoding)
 
2766
        except UnicodeDecodeError:
 
2767
            return char
 
2768
    return None
 
2769
 
 
2770
 
 
2771
class _FTPServerFeature(Feature):
 
2772
    """Some tests want an FTP Server, check if one is available.
 
2773
 
 
2774
    Right now, the only way this is available is if 'medusa' is installed.
 
2775
    http://www.amk.ca/python/code/medusa.html
 
2776
    """
 
2777
 
 
2778
    def _probe(self):
 
2779
        try:
 
2780
            import bzrlib.tests.ftp_server
 
2781
            return True
 
2782
        except ImportError:
 
2783
            return False
 
2784
 
 
2785
    def feature_name(self):
 
2786
        return 'FTPServer'
 
2787
 
 
2788
FTPServerFeature = _FTPServerFeature()
 
2789
 
 
2790
 
 
2791
class _CaseInsensitiveFilesystemFeature(Feature):
 
2792
    """Check if underlined filesystem is case-insensitive
 
2793
    (e.g. on Windows, Cygwin, MacOS)
 
2794
    """
 
2795
 
 
2796
    def _probe(self):
 
2797
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
 
2798
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
2799
            TestCaseWithMemoryTransport.TEST_ROOT = root
 
2800
        else:
 
2801
            root = TestCaseWithMemoryTransport.TEST_ROOT
 
2802
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
 
2803
            dir=root)
 
2804
        name_a = osutils.pathjoin(tdir, 'a')
 
2805
        name_A = osutils.pathjoin(tdir, 'A')
 
2806
        os.mkdir(name_a)
 
2807
        result = osutils.isdir(name_A)
 
2808
        _rmtree_temp_dir(tdir)
 
2809
        return result
 
2810
 
 
2811
    def feature_name(self):
 
2812
        return 'case-insensitive filesystem'
 
2813
 
 
2814
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()