~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/__init__.py

  • Committer: Martin Pool
  • Date: 2005-11-04 01:46:31 UTC
  • mto: (1185.33.49 bzr.dev)
  • mto: This revision was merged to the branch mainline in revision 1512.
  • Revision ID: mbp@sourcefrog.net-20051104014631-750e0ad4172c952c
Make biobench directly executable

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
# TODO: Perhaps there should be an API to find out if bzr running under the
19
 
# test suite -- some plugins might want to avoid making intrusive changes if
20
 
# this is the case.  However, we want behaviour under to test to diverge as
21
 
# little as possible, so this should be used rarely if it's added at all.
22
 
# (Suggestion from j-a-meinel, 2005-11-24)
23
 
 
24
 
# NOTE: Some classes in here use camelCaseNaming() rather than
25
 
# underscore_naming().  That's for consistency with unittest; it's not the
26
 
# general style of bzrlib.  Please continue that consistency when adding e.g.
27
 
# new assertFoo() methods.
28
 
 
29
 
import atexit
30
 
import codecs
31
18
from cStringIO import StringIO
32
19
import difflib
33
 
import doctest
34
20
import errno
35
21
import logging
36
22
import os
37
 
from pprint import pformat
38
 
import random
39
23
import re
40
 
import shlex
41
 
import stat
42
 
from subprocess import Popen, PIPE
 
24
import shutil
43
25
import sys
44
26
import tempfile
 
27
import unittest
45
28
import time
46
 
import unittest
47
 
import warnings
48
 
 
49
 
 
50
 
from bzrlib import (
51
 
    bzrdir,
52
 
    debug,
53
 
    errors,
54
 
    memorytree,
55
 
    osutils,
56
 
    progress,
57
 
    ui,
58
 
    urlutils,
59
 
    workingtree,
60
 
    )
61
 
import bzrlib.branch
 
29
 
62
30
import bzrlib.commands
63
 
import bzrlib.timestamp
64
 
import bzrlib.export
65
 
import bzrlib.inventory
66
 
import bzrlib.iterablefile
67
 
import bzrlib.lockdir
68
 
try:
69
 
    import bzrlib.lsprof
70
 
except ImportError:
71
 
    # lsprof not available
72
 
    pass
73
 
from bzrlib.merge import merge_inner
74
 
import bzrlib.merge3
75
 
import bzrlib.plugin
76
 
from bzrlib.revision import common_ancestor
77
 
import bzrlib.store
78
 
from bzrlib import symbol_versioning
79
 
from bzrlib.symbol_versioning import (
80
 
    DEPRECATED_PARAMETER,
81
 
    deprecated_function,
82
 
    deprecated_method,
83
 
    deprecated_passed,
84
 
    zero_ninetyone,
85
 
    zero_ninetytwo,
86
 
    one_zero,
87
 
    )
88
31
import bzrlib.trace
89
 
from bzrlib.transport import get_transport
90
 
import bzrlib.transport
91
 
from bzrlib.transport.local import LocalURLServer
92
 
from bzrlib.transport.memory import MemoryServer
93
 
from bzrlib.transport.readonly import ReadonlyServer
94
 
from bzrlib.trace import mutter, note
95
 
from bzrlib.tests import TestUtil
96
 
from bzrlib.tests.http_server import HttpServer
97
 
from bzrlib.tests.TestUtil import (
98
 
                          TestSuite,
99
 
                          TestLoader,
100
 
                          )
101
 
from bzrlib.tests.treeshape import build_tree_contents
102
 
import bzrlib.version_info_formats.format_custom
103
 
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
104
 
 
105
 
# Mark this python module as being part of the implementation
106
 
# of unittest: this gives us better tracebacks where the last
107
 
# shown frame is the test code, not our assertXYZ.
108
 
__unittest = 1
109
 
 
110
 
default_transport = LocalURLServer
 
32
import bzrlib.fetch
 
33
import bzrlib.osutils as osutils
 
34
from bzrlib.selftest import TestUtil
 
35
from bzrlib.selftest.TestUtil import TestLoader, TestSuite
 
36
from bzrlib.selftest.treeshape import build_tree_contents
111
37
 
112
38
MODULES_TO_TEST = []
113
 
MODULES_TO_DOCTEST = [
114
 
        bzrlib,
115
 
        bzrlib.timestamp,
116
 
        bzrlib.errors,
117
 
        bzrlib.export,
118
 
        bzrlib.inventory,
119
 
        bzrlib.iterablefile,
120
 
        bzrlib.lockdir,
121
 
        bzrlib.merge3,
122
 
        bzrlib.option,
123
 
        bzrlib.store,
124
 
        bzrlib.version_info_formats.format_custom,
125
 
        # quoted to avoid module-loading circularity
126
 
        'bzrlib.tests',
127
 
        ]
128
 
 
129
 
 
130
 
def packages_to_test():
131
 
    """Return a list of packages to test.
132
 
 
133
 
    The packages are not globally imported so that import failures are
134
 
    triggered when running selftest, not when importing the command.
135
 
    """
136
 
    import bzrlib.doc
137
 
    import bzrlib.tests.blackbox
138
 
    import bzrlib.tests.branch_implementations
139
 
    import bzrlib.tests.bzrdir_implementations
140
 
    import bzrlib.tests.commands
141
 
    import bzrlib.tests.interrepository_implementations
142
 
    import bzrlib.tests.interversionedfile_implementations
143
 
    import bzrlib.tests.intertree_implementations
144
 
    import bzrlib.tests.inventory_implementations
145
 
    import bzrlib.tests.per_lock
146
 
    import bzrlib.tests.repository_implementations
147
 
    import bzrlib.tests.revisionstore_implementations
148
 
    import bzrlib.tests.tree_implementations
149
 
    import bzrlib.tests.workingtree_implementations
150
 
    return [
151
 
            bzrlib.doc,
152
 
            bzrlib.tests.blackbox,
153
 
            bzrlib.tests.branch_implementations,
154
 
            bzrlib.tests.bzrdir_implementations,
155
 
            bzrlib.tests.commands,
156
 
            bzrlib.tests.interrepository_implementations,
157
 
            bzrlib.tests.interversionedfile_implementations,
158
 
            bzrlib.tests.intertree_implementations,
159
 
            bzrlib.tests.inventory_implementations,
160
 
            bzrlib.tests.per_lock,
161
 
            bzrlib.tests.repository_implementations,
162
 
            bzrlib.tests.revisionstore_implementations,
163
 
            bzrlib.tests.tree_implementations,
164
 
            bzrlib.tests.workingtree_implementations,
165
 
            ]
166
 
 
167
 
 
168
 
class ExtendedTestResult(unittest._TextTestResult):
169
 
    """Accepts, reports and accumulates the results of running tests.
170
 
 
171
 
    Compared to the unittest version this class adds support for
172
 
    profiling, benchmarking, stopping as soon as a test fails,  and
173
 
    skipping tests.  There are further-specialized subclasses for
174
 
    different types of display.
175
 
 
176
 
    When a test finishes, in whatever way, it calls one of the addSuccess,
177
 
    addFailure or addError classes.  These in turn may redirect to a more
178
 
    specific case for the special test results supported by our extended
179
 
    tests.
180
 
 
181
 
    Note that just one of these objects is fed the results from many tests.
182
 
    """
183
 
 
184
 
    stop_early = False
185
 
    
186
 
    def __init__(self, stream, descriptions, verbosity,
187
 
                 bench_history=None,
188
 
                 num_tests=None,
189
 
                 ):
190
 
        """Construct new TestResult.
191
 
 
192
 
        :param bench_history: Optionally, a writable file object to accumulate
193
 
            benchmark results.
194
 
        """
195
 
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
196
 
        if bench_history is not None:
197
 
            from bzrlib.version import _get_bzr_source_tree
198
 
            src_tree = _get_bzr_source_tree()
199
 
            if src_tree:
200
 
                try:
201
 
                    revision_id = src_tree.get_parent_ids()[0]
202
 
                except IndexError:
203
 
                    # XXX: if this is a brand new tree, do the same as if there
204
 
                    # is no branch.
205
 
                    revision_id = ''
206
 
            else:
207
 
                # XXX: If there's no branch, what should we do?
208
 
                revision_id = ''
209
 
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
210
 
        self._bench_history = bench_history
211
 
        self.ui = ui.ui_factory
212
 
        self.num_tests = num_tests
213
 
        self.error_count = 0
214
 
        self.failure_count = 0
215
 
        self.known_failure_count = 0
216
 
        self.skip_count = 0
217
 
        self.not_applicable_count = 0
218
 
        self.unsupported = {}
219
 
        self.count = 0
220
 
        self._overall_start_time = time.time()
221
 
    
222
 
    def _extractBenchmarkTime(self, testCase):
223
 
        """Add a benchmark time for the current test case."""
224
 
        return getattr(testCase, "_benchtime", None)
225
 
    
226
 
    def _elapsedTestTimeString(self):
227
 
        """Return a time string for the overall time the current test has taken."""
228
 
        return self._formatTime(time.time() - self._start_time)
229
 
 
230
 
    def _testTimeString(self, testCase):
231
 
        benchmark_time = self._extractBenchmarkTime(testCase)
232
 
        if benchmark_time is not None:
233
 
            return "%s/%s" % (
234
 
                self._formatTime(benchmark_time),
235
 
                self._elapsedTestTimeString())
236
 
        else:
237
 
            return "           %s" % self._elapsedTestTimeString()
238
 
 
239
 
    def _formatTime(self, seconds):
240
 
        """Format seconds as milliseconds with leading spaces."""
241
 
        # some benchmarks can take thousands of seconds to run, so we need 8
242
 
        # places
243
 
        return "%8dms" % (1000 * seconds)
244
 
 
245
 
    def _shortened_test_description(self, test):
246
 
        what = test.id()
247
 
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
248
 
        return what
 
39
MODULES_TO_DOCTEST = []
 
40
 
 
41
from logging import debug, warning, error
 
42
 
 
43
 
 
44
class EarlyStoppingTestResultAdapter(object):
 
45
    """An adapter for TestResult to stop at the first first failure or error"""
 
46
 
 
47
    def __init__(self, result):
 
48
        self._result = result
 
49
 
 
50
    def addError(self, test, err):
 
51
        self._result.addError(test, err)
 
52
        self._result.stop()
 
53
 
 
54
    def addFailure(self, test, err):
 
55
        self._result.addFailure(test, err)
 
56
        self._result.stop()
 
57
 
 
58
    def __getattr__(self, name):
 
59
        return getattr(self._result, name)
 
60
 
 
61
    def __setattr__(self, name, value):
 
62
        if name == '_result':
 
63
            object.__setattr__(self, name, value)
 
64
        return setattr(self._result, name, value)
 
65
 
 
66
 
 
67
class _MyResult(unittest._TextTestResult):
 
68
    """
 
69
    Custom TestResult.
 
70
 
 
71
    No special behaviour for now.
 
72
    """
 
73
 
 
74
    def _elapsedTime(self):
 
75
        return "(Took %.3fs)" % (time.time() - self._start_time)
249
76
 
250
77
    def startTest(self, test):
251
78
        unittest.TestResult.startTest(self, test)
252
 
        self.report_test_start(test)
253
 
        test.number = self.count
254
 
        self._recordTestStartTime()
255
 
 
256
 
    def _recordTestStartTime(self):
257
 
        """Record that a test has started."""
 
79
        # TODO: Maybe show test.shortDescription somewhere?
 
80
        what = test.shortDescription() or test.id()        
 
81
        if self.showAll:
 
82
            self.stream.write('%-70.70s' % what)
 
83
        self.stream.flush()
258
84
        self._start_time = time.time()
259
85
 
260
 
    def _cleanupLogFile(self, test):
261
 
        # We can only do this if we have one of our TestCases, not if
262
 
        # we have a doctest.
263
 
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
264
 
        if setKeepLogfile is not None:
265
 
            setKeepLogfile()
266
 
 
267
86
    def addError(self, test, err):
268
 
        """Tell result that test finished with an error.
269
 
 
270
 
        Called from the TestCase run() method when the test
271
 
        fails with an unexpected error.
272
 
        """
273
 
        self._testConcluded(test)
274
 
        if isinstance(err[1], TestSkipped):
275
 
            return self._addSkipped(test, err)
276
 
        elif isinstance(err[1], UnavailableFeature):
277
 
            return self.addNotSupported(test, err[1].args[0])
278
 
        else:
279
 
            self._cleanupLogFile(test)
280
 
            unittest.TestResult.addError(self, test, err)
281
 
            self.error_count += 1
282
 
            self.report_error(test, err)
283
 
            if self.stop_early:
284
 
                self.stop()
 
87
        unittest.TestResult.addError(self, test, err)
 
88
        if self.showAll:
 
89
            self.stream.writeln("ERROR %s" % self._elapsedTime())
 
90
        elif self.dots:
 
91
            self.stream.write('E')
 
92
        self.stream.flush()
285
93
 
286
94
    def addFailure(self, test, err):
287
 
        """Tell result that test failed.
288
 
 
289
 
        Called from the TestCase run() method when the test
290
 
        fails because e.g. an assert() method failed.
291
 
        """
292
 
        self._testConcluded(test)
293
 
        if isinstance(err[1], KnownFailure):
294
 
            return self._addKnownFailure(test, err)
295
 
        else:
296
 
            self._cleanupLogFile(test)
297
 
            unittest.TestResult.addFailure(self, test, err)
298
 
            self.failure_count += 1
299
 
            self.report_failure(test, err)
300
 
            if self.stop_early:
301
 
                self.stop()
 
95
        unittest.TestResult.addFailure(self, test, err)
 
96
        if self.showAll:
 
97
            self.stream.writeln("FAIL %s" % self._elapsedTime())
 
98
        elif self.dots:
 
99
            self.stream.write('F')
 
100
        self.stream.flush()
302
101
 
303
102
    def addSuccess(self, test):
304
 
        """Tell result that test completed successfully.
305
 
 
306
 
        Called from the TestCase run()
307
 
        """
308
 
        self._testConcluded(test)
309
 
        if self._bench_history is not None:
310
 
            benchmark_time = self._extractBenchmarkTime(test)
311
 
            if benchmark_time is not None:
312
 
                self._bench_history.write("%s %s\n" % (
313
 
                    self._formatTime(benchmark_time),
314
 
                    test.id()))
315
 
        self.report_success(test)
316
 
        self._cleanupLogFile(test)
 
103
        if self.showAll:
 
104
            self.stream.writeln('OK %s' % self._elapsedTime())
 
105
        elif self.dots:
 
106
            self.stream.write('~')
 
107
        self.stream.flush()
317
108
        unittest.TestResult.addSuccess(self, test)
318
109
 
319
 
    def _testConcluded(self, test):
320
 
        """Common code when a test has finished.
321
 
 
322
 
        Called regardless of whether it succeded, failed, etc.
323
 
        """
324
 
        pass
325
 
 
326
 
    def _addKnownFailure(self, test, err):
327
 
        self.known_failure_count += 1
328
 
        self.report_known_failure(test, err)
329
 
 
330
 
    def addNotSupported(self, test, feature):
331
 
        """The test will not be run because of a missing feature.
332
 
        """
333
 
        # this can be called in two different ways: it may be that the
334
 
        # test started running, and then raised (through addError) 
335
 
        # UnavailableFeature.  Alternatively this method can be called
336
 
        # while probing for features before running the tests; in that
337
 
        # case we will see startTest and stopTest, but the test will never
338
 
        # actually run.
339
 
        self.unsupported.setdefault(str(feature), 0)
340
 
        self.unsupported[str(feature)] += 1
341
 
        self.report_unsupported(test, feature)
342
 
 
343
 
    def _addSkipped(self, test, skip_excinfo):
344
 
        if isinstance(skip_excinfo[1], TestNotApplicable):
345
 
            self.not_applicable_count += 1
346
 
            self.report_not_applicable(test, skip_excinfo)
347
 
        else:
348
 
            self.skip_count += 1
349
 
            self.report_skip(test, skip_excinfo)
350
 
        try:
351
 
            test.tearDown()
352
 
        except KeyboardInterrupt:
353
 
            raise
354
 
        except:
355
 
            self.addError(test, test._exc_info())
356
 
        else:
357
 
            # seems best to treat this as success from point-of-view of unittest
358
 
            # -- it actually does nothing so it barely matters :)
359
 
            unittest.TestResult.addSuccess(self, test)
360
 
 
361
110
    def printErrorList(self, flavour, errors):
362
111
        for test, err in errors:
363
112
            self.stream.writeln(self.separator1)
364
 
            self.stream.write("%s: " % flavour)
365
 
            self.stream.writeln(self.getDescription(test))
366
 
            if getattr(test, '_get_log', None) is not None:
367
 
                self.stream.write('\n')
368
 
                self.stream.write(
369
 
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
370
 
                self.stream.write('\n')
371
 
                self.stream.write(test._get_log())
372
 
                self.stream.write('\n')
373
 
                self.stream.write(
374
 
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
375
 
                self.stream.write('\n')
 
113
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
 
114
            if hasattr(test, '_get_log'):
 
115
                self.stream.writeln()
 
116
                self.stream.writeln('log from this test:')
 
117
                print >>self.stream, test._get_log()
376
118
            self.stream.writeln(self.separator2)
377
119
            self.stream.writeln("%s" % err)
378
120
 
379
 
    def finished(self):
380
 
        pass
381
 
 
382
 
    def report_cleaning_up(self):
383
 
        pass
384
 
 
385
 
    def report_success(self, test):
386
 
        pass
387
 
 
388
 
    def wasStrictlySuccessful(self):
389
 
        if self.unsupported or self.known_failure_count:
390
 
            return False
391
 
        return self.wasSuccessful()
392
 
 
393
 
 
394
 
class TextTestResult(ExtendedTestResult):
395
 
    """Displays progress and results of tests in text form"""
396
 
 
397
 
    def __init__(self, stream, descriptions, verbosity,
398
 
                 bench_history=None,
399
 
                 num_tests=None,
400
 
                 pb=None,
401
 
                 ):
402
 
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
403
 
            bench_history, num_tests)
404
 
        if pb is None:
405
 
            self.pb = self.ui.nested_progress_bar()
406
 
            self._supplied_pb = False
407
 
        else:
408
 
            self.pb = pb
409
 
            self._supplied_pb = True
410
 
        self.pb.show_pct = False
411
 
        self.pb.show_spinner = False
412
 
        self.pb.show_eta = False,
413
 
        self.pb.show_count = False
414
 
        self.pb.show_bar = False
415
 
 
416
 
    def report_starting(self):
417
 
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
418
 
 
419
 
    def _progress_prefix_text(self):
420
 
        a = '[%d' % self.count
421
 
        if self.num_tests is not None:
422
 
            a +='/%d' % self.num_tests
423
 
        a += ' in %ds' % (time.time() - self._overall_start_time)
424
 
        if self.error_count:
425
 
            a += ', %d errors' % self.error_count
426
 
        if self.failure_count:
427
 
            a += ', %d failed' % self.failure_count
428
 
        if self.known_failure_count:
429
 
            a += ', %d known failures' % self.known_failure_count
430
 
        if self.skip_count:
431
 
            a += ', %d skipped' % self.skip_count
432
 
        if self.unsupported:
433
 
            a += ', %d missing features' % len(self.unsupported)
434
 
        a += ']'
435
 
        return a
436
 
 
437
 
    def report_test_start(self, test):
438
 
        self.count += 1
439
 
        self.pb.update(
440
 
                self._progress_prefix_text()
441
 
                + ' ' 
442
 
                + self._shortened_test_description(test))
443
 
 
444
 
    def _test_description(self, test):
445
 
        return self._shortened_test_description(test)
446
 
 
447
 
    def report_error(self, test, err):
448
 
        self.pb.note('ERROR: %s\n    %s\n', 
449
 
            self._test_description(test),
450
 
            err[1],
451
 
            )
452
 
 
453
 
    def report_failure(self, test, err):
454
 
        self.pb.note('FAIL: %s\n    %s\n', 
455
 
            self._test_description(test),
456
 
            err[1],
457
 
            )
458
 
 
459
 
    def report_known_failure(self, test, err):
460
 
        self.pb.note('XFAIL: %s\n%s\n',
461
 
            self._test_description(test), err[1])
462
 
 
463
 
    def report_skip(self, test, skip_excinfo):
464
 
        pass
465
 
 
466
 
    def report_not_applicable(self, test, skip_excinfo):
467
 
        pass
468
 
 
469
 
    def report_unsupported(self, test, feature):
470
 
        """test cannot be run because feature is missing."""
471
 
                  
472
 
    def report_cleaning_up(self):
473
 
        self.pb.update('cleaning up...')
474
 
 
475
 
    def finished(self):
476
 
        if not self._supplied_pb:
477
 
            self.pb.finished()
478
 
 
479
 
 
480
 
class VerboseTestResult(ExtendedTestResult):
481
 
    """Produce long output, with one line per test run plus times"""
482
 
 
483
 
    def _ellipsize_to_right(self, a_string, final_width):
484
 
        """Truncate and pad a string, keeping the right hand side"""
485
 
        if len(a_string) > final_width:
486
 
            result = '...' + a_string[3-final_width:]
487
 
        else:
488
 
            result = a_string
489
 
        return result.ljust(final_width)
490
 
 
491
 
    def report_starting(self):
492
 
        self.stream.write('running %d tests...\n' % self.num_tests)
493
 
 
494
 
    def report_test_start(self, test):
495
 
        self.count += 1
496
 
        name = self._shortened_test_description(test)
497
 
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
498
 
        # numbers, plus a trailing blank
499
 
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
500
 
        self.stream.write(self._ellipsize_to_right(name,
501
 
                          osutils.terminal_width()-30))
502
 
        self.stream.flush()
503
 
 
504
 
    def _error_summary(self, err):
505
 
        indent = ' ' * 4
506
 
        return '%s%s' % (indent, err[1])
507
 
 
508
 
    def report_error(self, test, err):
509
 
        self.stream.writeln('ERROR %s\n%s'
510
 
                % (self._testTimeString(test),
511
 
                   self._error_summary(err)))
512
 
 
513
 
    def report_failure(self, test, err):
514
 
        self.stream.writeln(' FAIL %s\n%s'
515
 
                % (self._testTimeString(test),
516
 
                   self._error_summary(err)))
517
 
 
518
 
    def report_known_failure(self, test, err):
519
 
        self.stream.writeln('XFAIL %s\n%s'
520
 
                % (self._testTimeString(test),
521
 
                   self._error_summary(err)))
522
 
 
523
 
    def report_success(self, test):
524
 
        self.stream.writeln('   OK %s' % self._testTimeString(test))
525
 
        for bench_called, stats in getattr(test, '_benchcalls', []):
526
 
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
527
 
            stats.pprint(file=self.stream)
528
 
        # flush the stream so that we get smooth output. This verbose mode is
529
 
        # used to show the output in PQM.
530
 
        self.stream.flush()
531
 
 
532
 
    def report_skip(self, test, skip_excinfo):
533
 
        self.stream.writeln(' SKIP %s\n%s'
534
 
                % (self._testTimeString(test),
535
 
                   self._error_summary(skip_excinfo)))
536
 
 
537
 
    def report_not_applicable(self, test, skip_excinfo):
538
 
        self.stream.writeln('  N/A %s\n%s'
539
 
                % (self._testTimeString(test),
540
 
                   self._error_summary(skip_excinfo)))
541
 
 
542
 
    def report_unsupported(self, test, feature):
543
 
        """test cannot be run because feature is missing."""
544
 
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
545
 
                %(self._testTimeString(test), feature))
546
 
 
547
 
 
548
 
class TextTestRunner(object):
 
121
 
 
122
class TextTestRunner(unittest.TextTestRunner):
549
123
    stop_on_failure = False
550
124
 
551
 
    def __init__(self,
552
 
                 stream=sys.stderr,
553
 
                 descriptions=0,
554
 
                 verbosity=1,
555
 
                 bench_history=None,
556
 
                 list_only=False
557
 
                 ):
558
 
        self.stream = unittest._WritelnDecorator(stream)
559
 
        self.descriptions = descriptions
560
 
        self.verbosity = verbosity
561
 
        self._bench_history = bench_history
562
 
        self.list_only = list_only
563
 
 
564
 
    def run(self, test):
565
 
        "Run the given test case or test suite."
566
 
        startTime = time.time()
567
 
        if self.verbosity == 1:
568
 
            result_class = TextTestResult
569
 
        elif self.verbosity >= 2:
570
 
            result_class = VerboseTestResult
571
 
        result = result_class(self.stream,
572
 
                              self.descriptions,
573
 
                              self.verbosity,
574
 
                              bench_history=self._bench_history,
575
 
                              num_tests=test.countTestCases(),
576
 
                              )
577
 
        result.stop_early = self.stop_on_failure
578
 
        result.report_starting()
579
 
        if self.list_only:
580
 
            if self.verbosity >= 2:
581
 
                self.stream.writeln("Listing tests only ...\n")
582
 
            run = 0
583
 
            for t in iter_suite_tests(test):
584
 
                self.stream.writeln("%s" % (t.id()))
585
 
                run += 1
586
 
            actionTaken = "Listed"
587
 
        else: 
588
 
            test.run(result)
589
 
            run = result.testsRun
590
 
            actionTaken = "Ran"
591
 
        stopTime = time.time()
592
 
        timeTaken = stopTime - startTime
593
 
        result.printErrors()
594
 
        self.stream.writeln(result.separator2)
595
 
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
596
 
                            run, run != 1 and "s" or "", timeTaken))
597
 
        self.stream.writeln()
598
 
        if not result.wasSuccessful():
599
 
            self.stream.write("FAILED (")
600
 
            failed, errored = map(len, (result.failures, result.errors))
601
 
            if failed:
602
 
                self.stream.write("failures=%d" % failed)
603
 
            if errored:
604
 
                if failed: self.stream.write(", ")
605
 
                self.stream.write("errors=%d" % errored)
606
 
            if result.known_failure_count:
607
 
                if failed or errored: self.stream.write(", ")
608
 
                self.stream.write("known_failure_count=%d" %
609
 
                    result.known_failure_count)
610
 
            self.stream.writeln(")")
611
 
        else:
612
 
            if result.known_failure_count:
613
 
                self.stream.writeln("OK (known_failures=%d)" %
614
 
                    result.known_failure_count)
615
 
            else:
616
 
                self.stream.writeln("OK")
617
 
        if result.skip_count > 0:
618
 
            skipped = result.skip_count
619
 
            self.stream.writeln('%d test%s skipped' %
620
 
                                (skipped, skipped != 1 and "s" or ""))
621
 
        if result.unsupported:
622
 
            for feature, count in sorted(result.unsupported.items()):
623
 
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
624
 
                    (feature, count))
625
 
        result.finished()
 
125
    def _makeResult(self):
 
126
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
127
        if self.stop_on_failure:
 
128
            result = EarlyStoppingTestResultAdapter(result)
626
129
        return result
627
130
 
628
131
 
641
144
 
642
145
class TestSkipped(Exception):
643
146
    """Indicates that a test was intentionally skipped, rather than failing."""
644
 
 
645
 
 
646
 
class TestNotApplicable(TestSkipped):
647
 
    """A test is not applicable to the situation where it was run.
648
 
 
649
 
    This is only normally raised by parameterized tests, if they find that 
650
 
    the instance they're constructed upon does not support one aspect 
651
 
    of its interface.
652
 
    """
653
 
 
654
 
 
655
 
class KnownFailure(AssertionError):
656
 
    """Indicates that a test failed in a precisely expected manner.
657
 
 
658
 
    Such failures dont block the whole test suite from passing because they are
659
 
    indicators of partially completed code or of future work. We have an
660
 
    explicit error for them so that we can ensure that they are always visible:
661
 
    KnownFailures are always shown in the output of bzr selftest.
662
 
    """
663
 
 
664
 
 
665
 
class UnavailableFeature(Exception):
666
 
    """A feature required for this test was not available.
667
 
 
668
 
    The feature should be used to construct the exception.
669
 
    """
 
147
    # XXX: Not used yet
670
148
 
671
149
 
672
150
class CommandFailed(Exception):
673
151
    pass
674
152
 
675
 
 
676
 
class StringIOWrapper(object):
677
 
    """A wrapper around cStringIO which just adds an encoding attribute.
678
 
    
679
 
    Internally we can check sys.stdout to see what the output encoding
680
 
    should be. However, cStringIO has no encoding attribute that we can
681
 
    set. So we wrap it instead.
682
 
    """
683
 
    encoding='ascii'
684
 
    _cstring = None
685
 
 
686
 
    def __init__(self, s=None):
687
 
        if s is not None:
688
 
            self.__dict__['_cstring'] = StringIO(s)
689
 
        else:
690
 
            self.__dict__['_cstring'] = StringIO()
691
 
 
692
 
    def __getattr__(self, name, getattr=getattr):
693
 
        return getattr(self.__dict__['_cstring'], name)
694
 
 
695
 
    def __setattr__(self, name, val):
696
 
        if name == 'encoding':
697
 
            self.__dict__['encoding'] = val
698
 
        else:
699
 
            return setattr(self._cstring, name, val)
700
 
 
701
 
 
702
 
class TestUIFactory(ui.CLIUIFactory):
703
 
    """A UI Factory for testing.
704
 
 
705
 
    Hide the progress bar but emit note()s.
706
 
    Redirect stdin.
707
 
    Allows get_password to be tested without real tty attached.
708
 
    """
709
 
 
710
 
    def __init__(self,
711
 
                 stdout=None,
712
 
                 stderr=None,
713
 
                 stdin=None):
714
 
        super(TestUIFactory, self).__init__()
715
 
        if stdin is not None:
716
 
            # We use a StringIOWrapper to be able to test various
717
 
            # encodings, but the user is still responsible to
718
 
            # encode the string and to set the encoding attribute
719
 
            # of StringIOWrapper.
720
 
            self.stdin = StringIOWrapper(stdin)
721
 
        if stdout is None:
722
 
            self.stdout = sys.stdout
723
 
        else:
724
 
            self.stdout = stdout
725
 
        if stderr is None:
726
 
            self.stderr = sys.stderr
727
 
        else:
728
 
            self.stderr = stderr
729
 
 
730
 
    def clear(self):
731
 
        """See progress.ProgressBar.clear()."""
732
 
 
733
 
    def clear_term(self):
734
 
        """See progress.ProgressBar.clear_term()."""
735
 
 
736
 
    def clear_term(self):
737
 
        """See progress.ProgressBar.clear_term()."""
738
 
 
739
 
    def finished(self):
740
 
        """See progress.ProgressBar.finished()."""
741
 
 
742
 
    def note(self, fmt_string, *args, **kwargs):
743
 
        """See progress.ProgressBar.note()."""
744
 
        self.stdout.write((fmt_string + "\n") % args)
745
 
 
746
 
    def progress_bar(self):
747
 
        return self
748
 
 
749
 
    def nested_progress_bar(self):
750
 
        return self
751
 
 
752
 
    def update(self, message, count=None, total=None):
753
 
        """See progress.ProgressBar.update()."""
754
 
 
755
 
    def get_non_echoed_password(self, prompt):
756
 
        """Get password from stdin without trying to handle the echo mode"""
757
 
        if prompt:
758
 
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
759
 
        password = self.stdin.readline()
760
 
        if not password:
761
 
            raise EOFError
762
 
        if password[-1] == '\n':
763
 
            password = password[:-1]
764
 
        return password
765
 
 
766
 
 
767
153
class TestCase(unittest.TestCase):
768
154
    """Base class for bzr unit tests.
769
155
    
785
171
    accidentally overlooked.
786
172
    """
787
173
 
 
174
    BZRPATH = 'bzr'
788
175
    _log_file_name = None
789
176
    _log_contents = ''
790
 
    _keep_log_file = False
791
 
    # record lsprof data when performing benchmark calls.
792
 
    _gather_lsprof_in_benchmarks = False
793
177
 
794
 
    def __init__(self, methodName='testMethod'):
795
 
        super(TestCase, self).__init__(methodName)
 
178
    def setUp(self):
 
179
        unittest.TestCase.setUp(self)
796
180
        self._cleanups = []
797
 
 
798
 
    def setUp(self):
799
 
        unittest.TestCase.setUp(self)
800
181
        self._cleanEnvironment()
801
 
        self._silenceUI()
 
182
        bzrlib.trace.disable_default_logging()
802
183
        self._startLogFile()
803
 
        self._benchcalls = []
804
 
        self._benchtime = None
805
 
        self._clear_hooks()
806
 
        self._clear_debug_flags()
807
 
 
808
 
    def _clear_debug_flags(self):
809
 
        """Prevent externally set debug flags affecting tests.
810
 
        
811
 
        Tests that want to use debug flags can just set them in the
812
 
        debug_flags set during setup/teardown.
813
 
        """
814
 
        self._preserved_debug_flags = set(debug.debug_flags)
815
 
        debug.debug_flags.clear()
816
 
        self.addCleanup(self._restore_debug_flags)
817
 
 
818
 
    def _clear_hooks(self):
819
 
        # prevent hooks affecting tests
820
 
        import bzrlib.branch
821
 
        import bzrlib.smart.server
822
 
        self._preserved_hooks = {
823
 
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
824
 
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
825
 
            }
826
 
        self.addCleanup(self._restoreHooks)
827
 
        # reset all hooks to an empty instance of the appropriate type
828
 
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
829
 
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
830
 
 
831
 
    def _silenceUI(self):
832
 
        """Turn off UI for duration of test"""
833
 
        # by default the UI is off; tests can turn it on if they want it.
834
 
        saved = ui.ui_factory
835
 
        def _restore():
836
 
            ui.ui_factory = saved
837
 
        ui.ui_factory = ui.SilentUIFactory()
838
 
        self.addCleanup(_restore)
839
184
 
840
185
    def _ndiff_strings(self, a, b):
841
186
        """Return ndiff between two strings containing lines.
852
197
                                  charjunk=lambda x: False)
853
198
        return ''.join(difflines)
854
199
 
855
 
    def assertEqual(self, a, b, message=''):
856
 
        try:
857
 
            if a == b:
858
 
                return
859
 
        except UnicodeError, e:
860
 
            # If we can't compare without getting a UnicodeError, then
861
 
            # obviously they are different
862
 
            mutter('UnicodeError: %s', e)
863
 
        if message:
864
 
            message += '\n'
865
 
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
866
 
            % (message,
867
 
               pformat(a), pformat(b)))
868
 
 
869
 
    assertEquals = assertEqual
870
 
 
871
 
    def assertEqualDiff(self, a, b, message=None):
 
200
    def assertEqualDiff(self, a, b):
872
201
        """Assert two texts are equal, if not raise an exception.
873
202
        
874
203
        This is intended for use with multi-line strings where it can 
877
206
        # TODO: perhaps override assertEquals to call this for strings?
878
207
        if a == b:
879
208
            return
880
 
        if message is None:
881
 
            message = "texts not equal:\n"
882
 
        raise AssertionError(message +
883
 
                             self._ndiff_strings(a, b))
884
 
        
885
 
    def assertEqualMode(self, mode, mode_test):
886
 
        self.assertEqual(mode, mode_test,
887
 
                         'mode mismatch %o != %o' % (mode, mode_test))
888
 
 
889
 
    def assertPositive(self, val):
890
 
        """Assert that val is greater than 0."""
891
 
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
892
 
 
893
 
    def assertNegative(self, val):
894
 
        """Assert that val is less than 0."""
895
 
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
896
 
 
897
 
    def assertStartsWith(self, s, prefix):
898
 
        if not s.startswith(prefix):
899
 
            raise AssertionError('string %r does not start with %r' % (s, prefix))
900
 
 
901
 
    def assertEndsWith(self, s, suffix):
902
 
        """Asserts that s ends with suffix."""
903
 
        if not s.endswith(suffix):
904
 
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
209
        raise AssertionError("texts not equal:\n" + 
 
210
                             self._ndiff_strings(a, b))      
905
211
 
906
212
    def assertContainsRe(self, haystack, needle_re):
907
213
        """Assert that a contains something matching a regular expression."""
908
214
        if not re.search(needle_re, haystack):
909
 
            if '\n' in haystack or len(haystack) > 60:
910
 
                # a long string, format it in a more readable way
911
 
                raise AssertionError(
912
 
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
913
 
                        % (needle_re, haystack))
914
 
            else:
915
 
                raise AssertionError('pattern "%s" not found in "%s"'
916
 
                        % (needle_re, haystack))
917
 
 
918
 
    def assertNotContainsRe(self, haystack, needle_re):
919
 
        """Assert that a does not match a regular expression"""
920
 
        if re.search(needle_re, haystack):
921
 
            raise AssertionError('pattern "%s" found in "%s"'
 
215
            raise AssertionError('pattern "%s" not found in "%s"'
922
216
                    % (needle_re, haystack))
923
217
 
924
 
    def assertSubset(self, sublist, superlist):
925
 
        """Assert that every entry in sublist is present in superlist."""
926
 
        missing = set(sublist) - set(superlist)
927
 
        if len(missing) > 0:
928
 
            raise AssertionError("value(s) %r not present in container %r" %
929
 
                                 (missing, superlist))
930
 
 
931
 
    def assertListRaises(self, excClass, func, *args, **kwargs):
932
 
        """Fail unless excClass is raised when the iterator from func is used.
933
 
        
934
 
        Many functions can return generators this makes sure
935
 
        to wrap them in a list() call to make sure the whole generator
936
 
        is run, and that the proper exception is raised.
937
 
        """
938
 
        try:
939
 
            list(func(*args, **kwargs))
940
 
        except excClass:
941
 
            return
942
 
        else:
943
 
            if getattr(excClass,'__name__', None) is not None:
944
 
                excName = excClass.__name__
945
 
            else:
946
 
                excName = str(excClass)
947
 
            raise self.failureException, "%s not raised" % excName
948
 
 
949
 
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
950
 
        """Assert that a callable raises a particular exception.
951
 
 
952
 
        :param excClass: As for the except statement, this may be either an
953
 
            exception class, or a tuple of classes.
954
 
        :param callableObj: A callable, will be passed ``*args`` and
955
 
            ``**kwargs``.
956
 
 
957
 
        Returns the exception so that you can examine it.
958
 
        """
959
 
        try:
960
 
            callableObj(*args, **kwargs)
961
 
        except excClass, e:
962
 
            return e
963
 
        else:
964
 
            if getattr(excClass,'__name__', None) is not None:
965
 
                excName = excClass.__name__
966
 
            else:
967
 
                # probably a tuple
968
 
                excName = str(excClass)
969
 
            raise self.failureException, "%s not raised" % excName
970
 
 
971
 
    def assertIs(self, left, right, message=None):
972
 
        if not (left is right):
973
 
            if message is not None:
974
 
                raise AssertionError(message)
975
 
            else:
976
 
                raise AssertionError("%r is not %r." % (left, right))
977
 
 
978
 
    def assertIsNot(self, left, right, message=None):
979
 
        if (left is right):
980
 
            if message is not None:
981
 
                raise AssertionError(message)
982
 
            else:
983
 
                raise AssertionError("%r is %r." % (left, right))
984
 
 
985
 
    def assertTransportMode(self, transport, path, mode):
986
 
        """Fail if a path does not have mode mode.
987
 
        
988
 
        If modes are not supported on this transport, the assertion is ignored.
989
 
        """
990
 
        if not transport._can_roundtrip_unix_modebits():
991
 
            return
992
 
        path_stat = transport.stat(path)
993
 
        actual_mode = stat.S_IMODE(path_stat.st_mode)
994
 
        self.assertEqual(mode, actual_mode,
995
 
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
996
 
 
997
 
    def assertIsSameRealPath(self, path1, path2):
998
 
        """Fail if path1 and path2 points to different files"""
999
 
        self.assertEqual(osutils.realpath(path1),
1000
 
                         osutils.realpath(path2),
1001
 
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1002
 
 
1003
 
    def assertIsInstance(self, obj, kls):
1004
 
        """Fail if obj is not an instance of kls"""
1005
 
        if not isinstance(obj, kls):
1006
 
            self.fail("%r is an instance of %s rather than %s" % (
1007
 
                obj, obj.__class__, kls))
1008
 
 
1009
 
    def expectFailure(self, reason, assertion, *args, **kwargs):
1010
 
        """Invoke a test, expecting it to fail for the given reason.
1011
 
 
1012
 
        This is for assertions that ought to succeed, but currently fail.
1013
 
        (The failure is *expected* but not *wanted*.)  Please be very precise
1014
 
        about the failure you're expecting.  If a new bug is introduced,
1015
 
        AssertionError should be raised, not KnownFailure.
1016
 
 
1017
 
        Frequently, expectFailure should be followed by an opposite assertion.
1018
 
        See example below.
1019
 
 
1020
 
        Intended to be used with a callable that raises AssertionError as the
1021
 
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
1022
 
 
1023
 
        Raises KnownFailure if the test fails.  Raises AssertionError if the
1024
 
        test succeeds.
1025
 
 
1026
 
        example usage::
1027
 
 
1028
 
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
1029
 
                             dynamic_val)
1030
 
          self.assertEqual(42, dynamic_val)
1031
 
 
1032
 
          This means that a dynamic_val of 54 will cause the test to raise
1033
 
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
1034
 
          only a dynamic_val of 42 will allow the test to pass.  Anything other
1035
 
          than 54 or 42 will cause an AssertionError.
1036
 
        """
1037
 
        try:
1038
 
            assertion(*args, **kwargs)
1039
 
        except AssertionError:
1040
 
            raise KnownFailure(reason)
1041
 
        else:
1042
 
            self.fail('Unexpected success.  Should have failed: %s' % reason)
1043
 
 
1044
 
    def assertFileEqual(self, content, path):
1045
 
        """Fail if path does not contain 'content'."""
1046
 
        self.failUnlessExists(path)
1047
 
        f = file(path, 'rb')
1048
 
        try:
1049
 
            s = f.read()
1050
 
        finally:
1051
 
            f.close()
1052
 
        self.assertEqualDiff(content, s)
1053
 
 
1054
 
    def failUnlessExists(self, path):
1055
 
        """Fail unless path or paths, which may be abs or relative, exist."""
1056
 
        if not isinstance(path, basestring):
1057
 
            for p in path:
1058
 
                self.failUnlessExists(p)
1059
 
        else:
1060
 
            self.failUnless(osutils.lexists(path),path+" does not exist")
1061
 
 
1062
 
    def failIfExists(self, path):
1063
 
        """Fail if path or paths, which may be abs or relative, exist."""
1064
 
        if not isinstance(path, basestring):
1065
 
            for p in path:
1066
 
                self.failIfExists(p)
1067
 
        else:
1068
 
            self.failIf(osutils.lexists(path),path+" exists")
1069
 
 
1070
 
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1071
 
        """A helper for callDeprecated and applyDeprecated.
1072
 
 
1073
 
        :param a_callable: A callable to call.
1074
 
        :param args: The positional arguments for the callable
1075
 
        :param kwargs: The keyword arguments for the callable
1076
 
        :return: A tuple (warnings, result). result is the result of calling
1077
 
            a_callable(``*args``, ``**kwargs``).
1078
 
        """
1079
 
        local_warnings = []
1080
 
        def capture_warnings(msg, cls=None, stacklevel=None):
1081
 
            # we've hooked into a deprecation specific callpath,
1082
 
            # only deprecations should getting sent via it.
1083
 
            self.assertEqual(cls, DeprecationWarning)
1084
 
            local_warnings.append(msg)
1085
 
        original_warning_method = symbol_versioning.warn
1086
 
        symbol_versioning.set_warning_method(capture_warnings)
1087
 
        try:
1088
 
            result = a_callable(*args, **kwargs)
1089
 
        finally:
1090
 
            symbol_versioning.set_warning_method(original_warning_method)
1091
 
        return (local_warnings, result)
1092
 
 
1093
 
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1094
 
        """Call a deprecated callable without warning the user.
1095
 
 
1096
 
        Note that this only captures warnings raised by symbol_versioning.warn,
1097
 
        not other callers that go direct to the warning module.
1098
 
 
1099
 
        To test that a deprecated method raises an error, do something like
1100
 
        this::
1101
 
 
1102
 
        self.assertRaises(errors.ReservedId,
1103
 
            self.applyDeprecated, zero_ninetyone,
1104
 
                br.append_revision, 'current:')
1105
 
 
1106
 
        :param deprecation_format: The deprecation format that the callable
1107
 
            should have been deprecated with. This is the same type as the
1108
 
            parameter to deprecated_method/deprecated_function. If the
1109
 
            callable is not deprecated with this format, an assertion error
1110
 
            will be raised.
1111
 
        :param a_callable: A callable to call. This may be a bound method or
1112
 
            a regular function. It will be called with ``*args`` and
1113
 
            ``**kwargs``.
1114
 
        :param args: The positional arguments for the callable
1115
 
        :param kwargs: The keyword arguments for the callable
1116
 
        :return: The result of a_callable(``*args``, ``**kwargs``)
1117
 
        """
1118
 
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
1119
 
            *args, **kwargs)
1120
 
        expected_first_warning = symbol_versioning.deprecation_string(
1121
 
            a_callable, deprecation_format)
1122
 
        if len(call_warnings) == 0:
1123
 
            self.fail("No deprecation warning generated by call to %s" %
1124
 
                a_callable)
1125
 
        self.assertEqual(expected_first_warning, call_warnings[0])
1126
 
        return result
1127
 
 
1128
 
    def callCatchWarnings(self, fn, *args, **kw):
1129
 
        """Call a callable that raises python warnings.
1130
 
 
1131
 
        The caller's responsible for examining the returned warnings.
1132
 
 
1133
 
        If the callable raises an exception, the exception is not
1134
 
        caught and propagates up to the caller.  In that case, the list
1135
 
        of warnings is not available.
1136
 
 
1137
 
        :returns: ([warning_object, ...], fn_result)
1138
 
        """
1139
 
        # XXX: This is not perfect, because it completely overrides the
1140
 
        # warnings filters, and some code may depend on suppressing particular
1141
 
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
1142
 
        # though.  -- Andrew, 20071062
1143
 
        wlist = []
1144
 
        def _catcher(message, category, filename, lineno, file=None):
1145
 
            # despite the name, 'message' is normally(?) a Warning subclass
1146
 
            # instance
1147
 
            wlist.append(message)
1148
 
        saved_showwarning = warnings.showwarning
1149
 
        saved_filters = warnings.filters
1150
 
        try:
1151
 
            warnings.showwarning = _catcher
1152
 
            warnings.filters = []
1153
 
            result = fn(*args, **kw)
1154
 
        finally:
1155
 
            warnings.showwarning = saved_showwarning
1156
 
            warnings.filters = saved_filters
1157
 
        return wlist, result
1158
 
 
1159
 
    def callDeprecated(self, expected, callable, *args, **kwargs):
1160
 
        """Assert that a callable is deprecated in a particular way.
1161
 
 
1162
 
        This is a very precise test for unusual requirements. The 
1163
 
        applyDeprecated helper function is probably more suited for most tests
1164
 
        as it allows you to simply specify the deprecation format being used
1165
 
        and will ensure that that is issued for the function being called.
1166
 
 
1167
 
        Note that this only captures warnings raised by symbol_versioning.warn,
1168
 
        not other callers that go direct to the warning module.  To catch
1169
 
        general warnings, use callCatchWarnings.
1170
 
 
1171
 
        :param expected: a list of the deprecation warnings expected, in order
1172
 
        :param callable: The callable to call
1173
 
        :param args: The positional arguments for the callable
1174
 
        :param kwargs: The keyword arguments for the callable
1175
 
        """
1176
 
        call_warnings, result = self._capture_deprecation_warnings(callable,
1177
 
            *args, **kwargs)
1178
 
        self.assertEqual(expected, call_warnings)
1179
 
        return result
1180
 
 
1181
218
    def _startLogFile(self):
1182
219
        """Send bzr and test log messages to a temporary file.
1183
220
 
1185
222
        """
1186
223
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1187
224
        self._log_file = os.fdopen(fileno, 'w+')
1188
 
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
225
        hdlr = logging.StreamHandler(self._log_file)
 
226
        hdlr.setLevel(logging.DEBUG)
 
227
        hdlr.setFormatter(logging.Formatter('%(levelname)8s  %(message)s'))
 
228
        logging.getLogger('').addHandler(hdlr)
 
229
        logging.getLogger('').setLevel(logging.DEBUG)
 
230
        self._log_hdlr = hdlr
 
231
        debug('opened log file %s', name)
1189
232
        self._log_file_name = name
1190
233
        self.addCleanup(self._finishLogFile)
1191
234
 
1192
235
    def _finishLogFile(self):
1193
236
        """Finished with the log file.
1194
237
 
1195
 
        Close the file and delete it, unless setKeepLogfile was called.
 
238
        Read contents into memory, close, and delete.
1196
239
        """
1197
 
        if self._log_file is None:
1198
 
            return
1199
 
        bzrlib.trace.pop_log_file(self._log_memento)
 
240
        self._log_file.seek(0)
 
241
        self._log_contents = self._log_file.read()
1200
242
        self._log_file.close()
1201
 
        self._log_file = None
1202
 
        if not self._keep_log_file:
1203
 
            os.remove(self._log_file_name)
1204
 
            self._log_file_name = None
1205
 
 
1206
 
    def setKeepLogfile(self):
1207
 
        """Make the logfile not be deleted when _finishLogFile is called."""
1208
 
        self._keep_log_file = True
 
243
        os.remove(self._log_file_name)
 
244
        self._log_file = self._log_file_name = None
1209
245
 
1210
246
    def addCleanup(self, callable):
1211
247
        """Arrange to run a callable when this case is torn down.
1219
255
        self._cleanups.append(callable)
1220
256
 
1221
257
    def _cleanEnvironment(self):
1222
 
        new_env = {
1223
 
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1224
 
            'HOME': os.getcwd(),
1225
 
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
1226
 
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1227
 
            'BZR_EMAIL': None,
1228
 
            'BZREMAIL': None, # may still be present in the environment
1229
 
            'EMAIL': None,
1230
 
            'BZR_PROGRESS_BAR': None,
1231
 
            'BZR_LOG': None,
1232
 
            # SSH Agent
1233
 
            'SSH_AUTH_SOCK': None,
1234
 
            # Proxies
1235
 
            'http_proxy': None,
1236
 
            'HTTP_PROXY': None,
1237
 
            'https_proxy': None,
1238
 
            'HTTPS_PROXY': None,
1239
 
            'no_proxy': None,
1240
 
            'NO_PROXY': None,
1241
 
            'all_proxy': None,
1242
 
            'ALL_PROXY': None,
1243
 
            # Nobody cares about these ones AFAIK. So far at
1244
 
            # least. If you do (care), please update this comment
1245
 
            # -- vila 20061212
1246
 
            'ftp_proxy': None,
1247
 
            'FTP_PROXY': None,
1248
 
            'BZR_REMOTE_PATH': None,
1249
 
        }
1250
 
        self.__old_env = {}
 
258
        self.oldenv = os.environ.get('HOME', None)
 
259
        os.environ['HOME'] = os.getcwd()
 
260
        self.bzr_email = os.environ.get('BZREMAIL')
 
261
        if self.bzr_email is not None:
 
262
            del os.environ['BZREMAIL']
 
263
        self.email = os.environ.get('EMAIL')
 
264
        if self.email is not None:
 
265
            del os.environ['EMAIL']
1251
266
        self.addCleanup(self._restoreEnvironment)
1252
 
        for name, value in new_env.iteritems():
1253
 
            self._captureVar(name, value)
1254
 
 
1255
 
    def _captureVar(self, name, newvalue):
1256
 
        """Set an environment variable, and reset it when finished."""
1257
 
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1258
 
 
1259
 
    def _restore_debug_flags(self):
1260
 
        debug.debug_flags.clear()
1261
 
        debug.debug_flags.update(self._preserved_debug_flags)
1262
267
 
1263
268
    def _restoreEnvironment(self):
1264
 
        for name, value in self.__old_env.iteritems():
1265
 
            osutils.set_or_unset_env(name, value)
1266
 
 
1267
 
    def _restoreHooks(self):
1268
 
        for klass, hooks in self._preserved_hooks.items():
1269
 
            setattr(klass, 'hooks', hooks)
1270
 
 
1271
 
    def knownFailure(self, reason):
1272
 
        """This test has failed for some known reason."""
1273
 
        raise KnownFailure(reason)
1274
 
 
1275
 
    def run(self, result=None):
1276
 
        if result is None: result = self.defaultTestResult()
1277
 
        for feature in getattr(self, '_test_needs_features', []):
1278
 
            if not feature.available():
1279
 
                result.startTest(self)
1280
 
                if getattr(result, 'addNotSupported', None):
1281
 
                    result.addNotSupported(self, feature)
1282
 
                else:
1283
 
                    result.addSuccess(self)
1284
 
                result.stopTest(self)
1285
 
                return
1286
 
        return unittest.TestCase.run(self, result)
 
269
        os.environ['HOME'] = self.oldenv
 
270
        if os.environ.get('BZREMAIL') is not None:
 
271
            del os.environ['BZREMAIL']
 
272
        if self.bzr_email is not None:
 
273
            os.environ['BZREMAIL'] = self.bzr_email
 
274
        if os.environ.get('EMAIL') is not None:
 
275
            del os.environ['EMAIL']
 
276
        if self.email is not None:
 
277
            os.environ['EMAIL'] = self.email
1287
278
 
1288
279
    def tearDown(self):
 
280
        logging.getLogger('').removeHandler(self._log_hdlr)
 
281
        bzrlib.trace.enable_default_logging()
 
282
        logging.debug('%s teardown', self.id())
1289
283
        self._runCleanups()
1290
284
        unittest.TestCase.tearDown(self)
1291
285
 
1292
 
    def time(self, callable, *args, **kwargs):
1293
 
        """Run callable and accrue the time it takes to the benchmark time.
1294
 
        
1295
 
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1296
 
        this will cause lsprofile statistics to be gathered and stored in
1297
 
        self._benchcalls.
1298
 
        """
1299
 
        if self._benchtime is None:
1300
 
            self._benchtime = 0
1301
 
        start = time.time()
1302
 
        try:
1303
 
            if not self._gather_lsprof_in_benchmarks:
1304
 
                return callable(*args, **kwargs)
1305
 
            else:
1306
 
                # record this benchmark
1307
 
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1308
 
                stats.sort()
1309
 
                self._benchcalls.append(((callable, args, kwargs), stats))
1310
 
                return ret
1311
 
        finally:
1312
 
            self._benchtime += time.time() - start
1313
 
 
1314
286
    def _runCleanups(self):
1315
287
        """Run registered cleanup functions. 
1316
288
 
1317
289
        This should only be called from TestCase.tearDown.
1318
290
        """
1319
 
        # TODO: Perhaps this should keep running cleanups even if 
1320
 
        # one of them fails?
1321
 
 
1322
 
        # Actually pop the cleanups from the list so tearDown running
1323
 
        # twice is safe (this happens for skipped tests).
1324
 
        while self._cleanups:
1325
 
            self._cleanups.pop()()
 
291
        for callable in reversed(self._cleanups):
 
292
            callable()
1326
293
 
1327
294
    def log(self, *args):
1328
 
        mutter(*args)
1329
 
 
1330
 
    def _get_log(self, keep_log_file=False):
1331
 
        """Get the log from bzrlib.trace calls from this test.
1332
 
 
1333
 
        :param keep_log_file: When True, if the log is still a file on disk
1334
 
            leave it as a file on disk. When False, if the log is still a file
1335
 
            on disk, the log file is deleted and the log preserved as
1336
 
            self._log_contents.
1337
 
        :return: A string containing the log.
1338
 
        """
1339
 
        # flush the log file, to get all content
1340
 
        import bzrlib.trace
1341
 
        bzrlib.trace._trace_file.flush()
1342
 
        if self._log_contents:
1343
 
            # XXX: this can hardly contain the content flushed above --vila
1344
 
            # 20080128
 
295
        logging.debug(*args)
 
296
 
 
297
    def _get_log(self):
 
298
        """Return as a string the log for this test"""
 
299
        if self._log_file_name:
 
300
            return open(self._log_file_name).read()
 
301
        else:
1345
302
            return self._log_contents
1346
 
        if self._log_file_name is not None:
1347
 
            logfile = open(self._log_file_name)
1348
 
            try:
1349
 
                log_contents = logfile.read()
1350
 
            finally:
1351
 
                logfile.close()
1352
 
            if not keep_log_file:
1353
 
                self._log_contents = log_contents
1354
 
                try:
1355
 
                    os.remove(self._log_file_name)
1356
 
                except OSError, e:
1357
 
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
1358
 
                        sys.stderr.write(('Unable to delete log file '
1359
 
                                             ' %r\n' % self._log_file_name))
1360
 
                    else:
1361
 
                        raise
1362
 
            return log_contents
1363
 
        else:
1364
 
            return "DELETED log file to reduce memory footprint"
1365
 
 
1366
 
    def requireFeature(self, feature):
1367
 
        """This test requires a specific feature is available.
1368
 
 
1369
 
        :raises UnavailableFeature: When feature is not available.
 
303
 
 
304
    def capture(self, cmd, retcode=0):
 
305
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
306
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
307
 
 
308
    def run_bzr_captured(self, argv, retcode=0):
 
309
        """Invoke bzr and return (stdout, stderr).
 
310
 
 
311
        Useful for code that wants to check the contents of the
 
312
        output, the way error messages are presented, etc.
 
313
 
 
314
        This should be the main method for tests that want to exercise the
 
315
        overall behavior of the bzr application (rather than a unit test
 
316
        or a functional test of the library.)
 
317
 
 
318
        Much of the old code runs bzr by forking a new copy of Python, but
 
319
        that is slower, harder to debug, and generally not necessary.
 
320
 
 
321
        This runs bzr through the interface that catches and reports
 
322
        errors, and with logging set to something approximating the
 
323
        default, so that error reporting can be checked.
 
324
 
 
325
        argv -- arguments to invoke bzr
 
326
        retcode -- expected return code, or None for don't-care.
1370
327
        """
1371
 
        if not feature.available():
1372
 
            raise UnavailableFeature(feature)
1373
 
 
1374
 
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1375
 
            working_dir):
1376
 
        """Run bazaar command line, splitting up a string command line."""
1377
 
        if isinstance(args, basestring):
1378
 
            # shlex don't understand unicode strings,
1379
 
            # so args should be plain string (bialix 20070906)
1380
 
            args = list(shlex.split(str(args)))
1381
 
        return self._run_bzr_core(args, retcode=retcode,
1382
 
                encoding=encoding, stdin=stdin, working_dir=working_dir,
1383
 
                )
1384
 
 
1385
 
    def _run_bzr_core(self, args, retcode, encoding, stdin,
1386
 
            working_dir):
1387
 
        if encoding is None:
1388
 
            encoding = bzrlib.user_encoding
1389
 
        stdout = StringIOWrapper()
1390
 
        stderr = StringIOWrapper()
1391
 
        stdout.encoding = encoding
1392
 
        stderr.encoding = encoding
1393
 
 
1394
 
        self.log('run bzr: %r', args)
1395
 
        # FIXME: don't call into logging here
 
328
        stdout = StringIO()
 
329
        stderr = StringIO()
 
330
        self.log('run bzr: %s', ' '.join(argv))
1396
331
        handler = logging.StreamHandler(stderr)
 
332
        handler.setFormatter(bzrlib.trace.QuietFormatter())
1397
333
        handler.setLevel(logging.INFO)
1398
334
        logger = logging.getLogger('')
1399
335
        logger.addHandler(handler)
1400
 
        old_ui_factory = ui.ui_factory
1401
 
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1402
 
 
1403
 
        cwd = None
1404
 
        if working_dir is not None:
1405
 
            cwd = osutils.getcwd()
1406
 
            os.chdir(working_dir)
1407
 
 
1408
336
        try:
1409
 
            result = self.apply_redirected(ui.ui_factory.stdin,
1410
 
                stdout, stderr,
1411
 
                bzrlib.commands.run_bzr_catch_user_errors,
1412
 
                args)
 
337
            result = self.apply_redirected(None, stdout, stderr,
 
338
                                           bzrlib.commands.run_bzr_catch_errors,
 
339
                                           argv)
1413
340
        finally:
1414
341
            logger.removeHandler(handler)
1415
 
            ui.ui_factory = old_ui_factory
1416
 
            if cwd is not None:
1417
 
                os.chdir(cwd)
1418
 
 
1419
342
        out = stdout.getvalue()
1420
343
        err = stderr.getvalue()
1421
344
        if out:
1422
 
            self.log('output:\n%r', out)
 
345
            self.log('output:\n%s', out)
1423
346
        if err:
1424
 
            self.log('errors:\n%r', err)
 
347
            self.log('errors:\n%s', err)
1425
348
        if retcode is not None:
1426
 
            self.assertEquals(retcode, result,
1427
 
                              message='Unexpected return code')
 
349
            self.assertEquals(result, retcode)
1428
350
        return out, err
1429
351
 
1430
 
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1431
 
                working_dir=None, error_regexes=[], output_encoding=None):
 
352
    def run_bzr(self, *args, **kwargs):
1432
353
        """Invoke bzr, as if it were run from the command line.
1433
354
 
1434
 
        The argument list should not include the bzr program name - the
1435
 
        first argument is normally the bzr command.  Arguments may be
1436
 
        passed in three ways:
1437
 
 
1438
 
        1- A list of strings, eg ["commit", "a"].  This is recommended
1439
 
        when the command contains whitespace or metacharacters, or 
1440
 
        is built up at run time.
1441
 
 
1442
 
        2- A single string, eg "add a".  This is the most convenient 
1443
 
        for hardcoded commands.
1444
 
 
1445
 
        This runs bzr through the interface that catches and reports
1446
 
        errors, and with logging set to something approximating the
1447
 
        default, so that error reporting can be checked.
1448
 
 
1449
355
        This should be the main method for tests that want to exercise the
1450
356
        overall behavior of the bzr application (rather than a unit test
1451
357
        or a functional test of the library.)
1452
358
 
1453
359
        This sends the stdout/stderr results into the test's log,
1454
360
        where it may be useful for debugging.  See also run_captured.
1455
 
 
1456
 
        :keyword stdin: A string to be used as stdin for the command.
1457
 
        :keyword retcode: The status code the command should return;
1458
 
            default 0.
1459
 
        :keyword working_dir: The directory to run the command in
1460
 
        :keyword error_regexes: A list of expected error messages.  If
1461
 
            specified they must be seen in the error output of the command.
1462
 
        """
1463
 
        out, err = self._run_bzr_autosplit(
1464
 
            args=args,
1465
 
            retcode=retcode,
1466
 
            encoding=encoding,
1467
 
            stdin=stdin,
1468
 
            working_dir=working_dir,
1469
 
            )
1470
 
        for regex in error_regexes:
1471
 
            self.assertContainsRe(err, regex)
1472
 
        return out, err
1473
 
 
1474
 
    def run_bzr_error(self, error_regexes, *args, **kwargs):
1475
 
        """Run bzr, and check that stderr contains the supplied regexes
1476
 
 
1477
 
        :param error_regexes: Sequence of regular expressions which
1478
 
            must each be found in the error output. The relative ordering
1479
 
            is not enforced.
1480
 
        :param args: command-line arguments for bzr
1481
 
        :param kwargs: Keyword arguments which are interpreted by run_bzr
1482
 
            This function changes the default value of retcode to be 3,
1483
 
            since in most cases this is run when you expect bzr to fail.
1484
 
 
1485
 
        :return: (out, err) The actual output of running the command (in case
1486
 
            you want to do more inspection)
1487
 
 
1488
 
        Examples of use::
1489
 
 
1490
 
            # Make sure that commit is failing because there is nothing to do
1491
 
            self.run_bzr_error(['no changes to commit'],
1492
 
                               ['commit', '-m', 'my commit comment'])
1493
 
            # Make sure --strict is handling an unknown file, rather than
1494
 
            # giving us the 'nothing to do' error
1495
 
            self.build_tree(['unknown'])
1496
 
            self.run_bzr_error(['Commit refused because there are unknown files'],
1497
 
                               ['commit', --strict', '-m', 'my commit comment'])
1498
 
        """
1499
 
        kwargs.setdefault('retcode', 3)
1500
 
        kwargs['error_regexes'] = error_regexes
1501
 
        out, err = self.run_bzr(*args, **kwargs)
1502
 
        return out, err
1503
 
 
1504
 
    def run_bzr_subprocess(self, *args, **kwargs):
1505
 
        """Run bzr in a subprocess for testing.
1506
 
 
1507
 
        This starts a new Python interpreter and runs bzr in there. 
1508
 
        This should only be used for tests that have a justifiable need for
1509
 
        this isolation: e.g. they are testing startup time, or signal
1510
 
        handling, or early startup code, etc.  Subprocess code can't be 
1511
 
        profiled or debugged so easily.
1512
 
 
1513
 
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
1514
 
            None is supplied, the status code is not checked.
1515
 
        :keyword env_changes: A dictionary which lists changes to environment
1516
 
            variables. A value of None will unset the env variable.
1517
 
            The values must be strings. The change will only occur in the
1518
 
            child, so you don't need to fix the environment after running.
1519
 
        :keyword universal_newlines: Convert CRLF => LF
1520
 
        :keyword allow_plugins: By default the subprocess is run with
1521
 
            --no-plugins to ensure test reproducibility. Also, it is possible
1522
 
            for system-wide plugins to create unexpected output on stderr,
1523
 
            which can cause unnecessary test failures.
1524
 
        """
1525
 
        env_changes = kwargs.get('env_changes', {})
1526
 
        working_dir = kwargs.get('working_dir', None)
1527
 
        allow_plugins = kwargs.get('allow_plugins', False)
1528
 
        if len(args) == 1:
1529
 
            if isinstance(args[0], list):
1530
 
                args = args[0]
1531
 
            elif isinstance(args[0], basestring):
1532
 
                args = list(shlex.split(args[0]))
1533
 
        else:
1534
 
            symbol_versioning.warn(zero_ninetyone %
1535
 
                                   "passing varargs to run_bzr_subprocess",
1536
 
                                   DeprecationWarning, stacklevel=3)
1537
 
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
1538
 
                                            working_dir=working_dir,
1539
 
                                            allow_plugins=allow_plugins)
1540
 
        # We distinguish between retcode=None and retcode not passed.
1541
 
        supplied_retcode = kwargs.get('retcode', 0)
1542
 
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1543
 
            universal_newlines=kwargs.get('universal_newlines', False),
1544
 
            process_args=args)
1545
 
 
1546
 
    def start_bzr_subprocess(self, process_args, env_changes=None,
1547
 
                             skip_if_plan_to_signal=False,
1548
 
                             working_dir=None,
1549
 
                             allow_plugins=False):
1550
 
        """Start bzr in a subprocess for testing.
1551
 
 
1552
 
        This starts a new Python interpreter and runs bzr in there.
1553
 
        This should only be used for tests that have a justifiable need for
1554
 
        this isolation: e.g. they are testing startup time, or signal
1555
 
        handling, or early startup code, etc.  Subprocess code can't be
1556
 
        profiled or debugged so easily.
1557
 
 
1558
 
        :param process_args: a list of arguments to pass to the bzr executable,
1559
 
            for example ``['--version']``.
1560
 
        :param env_changes: A dictionary which lists changes to environment
1561
 
            variables. A value of None will unset the env variable.
1562
 
            The values must be strings. The change will only occur in the
1563
 
            child, so you don't need to fix the environment after running.
1564
 
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1565
 
            is not available.
1566
 
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
1567
 
 
1568
 
        :returns: Popen object for the started process.
1569
 
        """
1570
 
        if skip_if_plan_to_signal:
1571
 
            if not getattr(os, 'kill', None):
1572
 
                raise TestSkipped("os.kill not available.")
1573
 
 
1574
 
        if env_changes is None:
1575
 
            env_changes = {}
1576
 
        old_env = {}
1577
 
 
1578
 
        def cleanup_environment():
1579
 
            for env_var, value in env_changes.iteritems():
1580
 
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1581
 
 
1582
 
        def restore_environment():
1583
 
            for env_var, value in old_env.iteritems():
1584
 
                osutils.set_or_unset_env(env_var, value)
1585
 
 
1586
 
        bzr_path = self.get_bzr_path()
1587
 
 
1588
 
        cwd = None
1589
 
        if working_dir is not None:
1590
 
            cwd = osutils.getcwd()
1591
 
            os.chdir(working_dir)
1592
 
 
1593
 
        try:
1594
 
            # win32 subprocess doesn't support preexec_fn
1595
 
            # so we will avoid using it on all platforms, just to
1596
 
            # make sure the code path is used, and we don't break on win32
1597
 
            cleanup_environment()
1598
 
            command = [sys.executable, bzr_path]
1599
 
            if not allow_plugins:
1600
 
                command.append('--no-plugins')
1601
 
            command.extend(process_args)
1602
 
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1603
 
        finally:
1604
 
            restore_environment()
1605
 
            if cwd is not None:
1606
 
                os.chdir(cwd)
1607
 
 
1608
 
        return process
1609
 
 
1610
 
    def _popen(self, *args, **kwargs):
1611
 
        """Place a call to Popen.
1612
 
 
1613
 
        Allows tests to override this method to intercept the calls made to
1614
 
        Popen for introspection.
1615
 
        """
1616
 
        return Popen(*args, **kwargs)
1617
 
 
1618
 
    def get_bzr_path(self):
1619
 
        """Return the path of the 'bzr' executable for this test suite."""
1620
 
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1621
 
        if not os.path.isfile(bzr_path):
1622
 
            # We are probably installed. Assume sys.argv is the right file
1623
 
            bzr_path = sys.argv[0]
1624
 
        return bzr_path
1625
 
 
1626
 
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1627
 
                              universal_newlines=False, process_args=None):
1628
 
        """Finish the execution of process.
1629
 
 
1630
 
        :param process: the Popen object returned from start_bzr_subprocess.
1631
 
        :param retcode: The status code that is expected.  Defaults to 0.  If
1632
 
            None is supplied, the status code is not checked.
1633
 
        :param send_signal: an optional signal to send to the process.
1634
 
        :param universal_newlines: Convert CRLF => LF
1635
 
        :returns: (stdout, stderr)
1636
 
        """
1637
 
        if send_signal is not None:
1638
 
            os.kill(process.pid, send_signal)
1639
 
        out, err = process.communicate()
1640
 
 
1641
 
        if universal_newlines:
1642
 
            out = out.replace('\r\n', '\n')
1643
 
            err = err.replace('\r\n', '\n')
1644
 
 
1645
 
        if retcode is not None and retcode != process.returncode:
1646
 
            if process_args is None:
1647
 
                process_args = "(unknown args)"
1648
 
            mutter('Output of bzr %s:\n%s', process_args, out)
1649
 
            mutter('Error for bzr %s:\n%s', process_args, err)
1650
 
            self.fail('Command bzr %s failed with retcode %s != %s'
1651
 
                      % (process_args, retcode, process.returncode))
1652
 
        return [out, err]
 
361
        """
 
362
        retcode = kwargs.pop('retcode', 0)
 
363
        return self.run_bzr_captured(args, retcode)
1653
364
 
1654
365
    def check_inventory_shape(self, inv, shape):
1655
366
        """Compare an inventory to a list of expected names.
1660
371
        shape = list(shape)             # copy
1661
372
        for path, ie in inv.entries():
1662
373
            name = path.replace('\\', '/')
1663
 
            if ie.kind == 'directory':
 
374
            if ie.kind == 'dir':
1664
375
                name = name + '/'
1665
376
            if name in shape:
1666
377
                shape.remove(name)
1681
392
        if stdin is None:
1682
393
            stdin = StringIO("")
1683
394
        if stdout is None:
1684
 
            if getattr(self, "_log_file", None) is not None:
 
395
            if hasattr(self, "_log_file"):
1685
396
                stdout = self._log_file
1686
397
            else:
1687
398
                stdout = StringIO()
1688
399
        if stderr is None:
1689
 
            if getattr(self, "_log_file", None is not None):
 
400
            if hasattr(self, "_log_file"):
1690
401
                stderr = self._log_file
1691
402
            else:
1692
403
                stderr = StringIO()
1703
414
            sys.stderr = real_stderr
1704
415
            sys.stdin = real_stdin
1705
416
 
1706
 
    def reduceLockdirTimeout(self):
1707
 
        """Reduce the default lock timeout for the duration of the test, so that
1708
 
        if LockContention occurs during a test, it does so quickly.
1709
 
 
1710
 
        Tests that expect to provoke LockContention errors should call this.
1711
 
        """
1712
 
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1713
 
        def resetTimeout():
1714
 
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1715
 
        self.addCleanup(resetTimeout)
1716
 
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1717
 
 
1718
 
    def make_utf8_encoded_stringio(self, encoding_type=None):
1719
 
        """Return a StringIOWrapper instance, that will encode Unicode
1720
 
        input to UTF-8.
1721
 
        """
1722
 
        if encoding_type is None:
1723
 
            encoding_type = 'strict'
1724
 
        sio = StringIO()
1725
 
        output_encoding = 'utf-8'
1726
 
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
1727
 
        sio.encoding = output_encoding
1728
 
        return sio
1729
 
 
1730
 
 
1731
 
class TestCaseWithMemoryTransport(TestCase):
1732
 
    """Common test class for tests that do not need disk resources.
1733
 
 
1734
 
    Tests that need disk resources should derive from TestCaseWithTransport.
1735
 
 
1736
 
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1737
 
 
1738
 
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1739
 
    a directory which does not exist. This serves to help ensure test isolation
1740
 
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1741
 
    must exist. However, TestCaseWithMemoryTransport does not offer local
1742
 
    file defaults for the transport in tests, nor does it obey the command line
1743
 
    override, so tests that accidentally write to the common directory should
1744
 
    be rare.
1745
 
 
1746
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
1747
 
    a .bzr directory that stops us ascending higher into the filesystem.
1748
 
    """
1749
 
 
1750
 
    TEST_ROOT = None
1751
 
    _TEST_NAME = 'test'
1752
 
 
1753
 
    def __init__(self, methodName='runTest'):
1754
 
        # allow test parameterization after test construction and before test
1755
 
        # execution. Variables that the parameterizer sets need to be 
1756
 
        # ones that are not set by setUp, or setUp will trash them.
1757
 
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
1758
 
        self.vfs_transport_factory = default_transport
1759
 
        self.transport_server = None
1760
 
        self.transport_readonly_server = None
1761
 
        self.__vfs_server = None
1762
 
 
1763
 
    def get_transport(self, relpath=None):
1764
 
        """Return a writeable transport.
1765
 
 
1766
 
        This transport is for the test scratch space relative to
1767
 
        "self._test_root"
1768
 
        
1769
 
        :param relpath: a path relative to the base url.
1770
 
        """
1771
 
        t = get_transport(self.get_url(relpath))
1772
 
        self.assertFalse(t.is_readonly())
1773
 
        return t
1774
 
 
1775
 
    def get_readonly_transport(self, relpath=None):
1776
 
        """Return a readonly transport for the test scratch space
1777
 
        
1778
 
        This can be used to test that operations which should only need
1779
 
        readonly access in fact do not try to write.
1780
 
 
1781
 
        :param relpath: a path relative to the base url.
1782
 
        """
1783
 
        t = get_transport(self.get_readonly_url(relpath))
1784
 
        self.assertTrue(t.is_readonly())
1785
 
        return t
1786
 
 
1787
 
    def create_transport_readonly_server(self):
1788
 
        """Create a transport server from class defined at init.
1789
 
 
1790
 
        This is mostly a hook for daughter classes.
1791
 
        """
1792
 
        return self.transport_readonly_server()
1793
 
 
1794
 
    def get_readonly_server(self):
1795
 
        """Get the server instance for the readonly transport
1796
 
 
1797
 
        This is useful for some tests with specific servers to do diagnostics.
1798
 
        """
1799
 
        if self.__readonly_server is None:
1800
 
            if self.transport_readonly_server is None:
1801
 
                # readonly decorator requested
1802
 
                # bring up the server
1803
 
                self.__readonly_server = ReadonlyServer()
1804
 
                self.__readonly_server.setUp(self.get_vfs_only_server())
1805
 
            else:
1806
 
                self.__readonly_server = self.create_transport_readonly_server()
1807
 
                self.__readonly_server.setUp(self.get_vfs_only_server())
1808
 
            self.addCleanup(self.__readonly_server.tearDown)
1809
 
        return self.__readonly_server
1810
 
 
1811
 
    def get_readonly_url(self, relpath=None):
1812
 
        """Get a URL for the readonly transport.
1813
 
 
1814
 
        This will either be backed by '.' or a decorator to the transport 
1815
 
        used by self.get_url()
1816
 
        relpath provides for clients to get a path relative to the base url.
1817
 
        These should only be downwards relative, not upwards.
1818
 
        """
1819
 
        base = self.get_readonly_server().get_url()
1820
 
        return self._adjust_url(base, relpath)
1821
 
 
1822
 
    def get_vfs_only_server(self):
1823
 
        """Get the vfs only read/write server instance.
1824
 
 
1825
 
        This is useful for some tests with specific servers that need
1826
 
        diagnostics.
1827
 
 
1828
 
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1829
 
        is no means to override it.
1830
 
        """
1831
 
        if self.__vfs_server is None:
1832
 
            self.__vfs_server = MemoryServer()
1833
 
            self.__vfs_server.setUp()
1834
 
            self.addCleanup(self.__vfs_server.tearDown)
1835
 
        return self.__vfs_server
1836
 
 
1837
 
    def get_server(self):
1838
 
        """Get the read/write server instance.
1839
 
 
1840
 
        This is useful for some tests with specific servers that need
1841
 
        diagnostics.
1842
 
 
1843
 
        This is built from the self.transport_server factory. If that is None,
1844
 
        then the self.get_vfs_server is returned.
1845
 
        """
1846
 
        if self.__server is None:
1847
 
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1848
 
                return self.get_vfs_only_server()
1849
 
            else:
1850
 
                # bring up a decorated means of access to the vfs only server.
1851
 
                self.__server = self.transport_server()
1852
 
                try:
1853
 
                    self.__server.setUp(self.get_vfs_only_server())
1854
 
                except TypeError, e:
1855
 
                    # This should never happen; the try:Except here is to assist
1856
 
                    # developers having to update code rather than seeing an
1857
 
                    # uninformative TypeError.
1858
 
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1859
 
            self.addCleanup(self.__server.tearDown)
1860
 
        return self.__server
1861
 
 
1862
 
    def _adjust_url(self, base, relpath):
1863
 
        """Get a URL (or maybe a path) for the readwrite transport.
1864
 
 
1865
 
        This will either be backed by '.' or to an equivalent non-file based
1866
 
        facility.
1867
 
        relpath provides for clients to get a path relative to the base url.
1868
 
        These should only be downwards relative, not upwards.
1869
 
        """
1870
 
        if relpath is not None and relpath != '.':
1871
 
            if not base.endswith('/'):
1872
 
                base = base + '/'
1873
 
            # XXX: Really base should be a url; we did after all call
1874
 
            # get_url()!  But sometimes it's just a path (from
1875
 
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
1876
 
            # to a non-escaped local path.
1877
 
            if base.startswith('./') or base.startswith('/'):
1878
 
                base += relpath
1879
 
            else:
1880
 
                base += urlutils.escape(relpath)
1881
 
        return base
1882
 
 
1883
 
    def get_url(self, relpath=None):
1884
 
        """Get a URL (or maybe a path) for the readwrite transport.
1885
 
 
1886
 
        This will either be backed by '.' or to an equivalent non-file based
1887
 
        facility.
1888
 
        relpath provides for clients to get a path relative to the base url.
1889
 
        These should only be downwards relative, not upwards.
1890
 
        """
1891
 
        base = self.get_server().get_url()
1892
 
        return self._adjust_url(base, relpath)
1893
 
 
1894
 
    def get_vfs_only_url(self, relpath=None):
1895
 
        """Get a URL (or maybe a path for the plain old vfs transport.
1896
 
 
1897
 
        This will never be a smart protocol.  It always has all the
1898
 
        capabilities of the local filesystem, but it might actually be a
1899
 
        MemoryTransport or some other similar virtual filesystem.
1900
 
 
1901
 
        This is the backing transport (if any) of the server returned by
1902
 
        get_url and get_readonly_url.
1903
 
 
1904
 
        :param relpath: provides for clients to get a path relative to the base
1905
 
            url.  These should only be downwards relative, not upwards.
1906
 
        :return: A URL
1907
 
        """
1908
 
        base = self.get_vfs_only_server().get_url()
1909
 
        return self._adjust_url(base, relpath)
1910
 
 
1911
 
    def _create_safety_net(self):
1912
 
        """Make a fake bzr directory.
1913
 
 
1914
 
        This prevents any tests propagating up onto the TEST_ROOT directory's
1915
 
        real branch.
1916
 
        """
1917
 
        root = TestCaseWithMemoryTransport.TEST_ROOT
1918
 
        bzrdir.BzrDir.create_standalone_workingtree(root)
1919
 
 
1920
 
    def _check_safety_net(self):
1921
 
        """Check that the safety .bzr directory have not been touched.
1922
 
 
1923
 
        _make_test_root have created a .bzr directory to prevent tests from
1924
 
        propagating. This method ensures than a test did not leaked.
1925
 
        """
1926
 
        root = TestCaseWithMemoryTransport.TEST_ROOT
1927
 
        wt = workingtree.WorkingTree.open(root)
1928
 
        last_rev = wt.last_revision()
1929
 
        if last_rev != 'null:':
1930
 
            # The current test have modified the /bzr directory, we need to
1931
 
            # recreate a new one or all the followng tests will fail.
1932
 
            # If you need to inspect its content uncomment the following line
1933
 
            # import pdb; pdb.set_trace()
1934
 
            _rmtree_temp_dir(root + '/.bzr')
1935
 
            self._create_safety_net()
1936
 
            raise AssertionError('%s/.bzr should not be modified' % root)
1937
 
 
1938
 
    def _make_test_root(self):
1939
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
1940
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
1941
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
1942
 
 
1943
 
            self._create_safety_net()
1944
 
 
1945
 
            # The same directory is used by all tests, and we're not
1946
 
            # specifically told when all tests are finished.  This will do.
1947
 
            atexit.register(_rmtree_temp_dir, root)
1948
 
 
1949
 
        self.addCleanup(self._check_safety_net)
1950
 
 
1951
 
    def makeAndChdirToTestDir(self):
1952
 
        """Create a temporary directories for this one test.
1953
 
        
1954
 
        This must set self.test_home_dir and self.test_dir and chdir to
1955
 
        self.test_dir.
1956
 
        
1957
 
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1958
 
        """
1959
 
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1960
 
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1961
 
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1962
 
        
1963
 
    def make_branch(self, relpath, format=None):
1964
 
        """Create a branch on the transport at relpath."""
1965
 
        repo = self.make_repository(relpath, format=format)
1966
 
        return repo.bzrdir.create_branch()
1967
 
 
1968
 
    def make_bzrdir(self, relpath, format=None):
1969
 
        try:
1970
 
            # might be a relative or absolute path
1971
 
            maybe_a_url = self.get_url(relpath)
1972
 
            segments = maybe_a_url.rsplit('/', 1)
1973
 
            t = get_transport(maybe_a_url)
1974
 
            if len(segments) > 1 and segments[-1] not in ('', '.'):
1975
 
                t.ensure_base()
1976
 
            if format is None:
1977
 
                format = 'default'
1978
 
            if isinstance(format, basestring):
1979
 
                format = bzrdir.format_registry.make_bzrdir(format)
1980
 
            return format.initialize_on_transport(t)
1981
 
        except errors.UninitializableFormat:
1982
 
            raise TestSkipped("Format %s is not initializable." % format)
1983
 
 
1984
 
    def make_repository(self, relpath, shared=False, format=None):
1985
 
        """Create a repository on our default transport at relpath.
1986
 
        
1987
 
        Note that relpath must be a relative path, not a full url.
1988
 
        """
1989
 
        # FIXME: If you create a remoterepository this returns the underlying
1990
 
        # real format, which is incorrect.  Actually we should make sure that 
1991
 
        # RemoteBzrDir returns a RemoteRepository.
1992
 
        # maybe  mbp 20070410
1993
 
        made_control = self.make_bzrdir(relpath, format=format)
1994
 
        return made_control.create_repository(shared=shared)
1995
 
 
1996
 
    def make_branch_and_memory_tree(self, relpath, format=None):
1997
 
        """Create a branch on the default transport and a MemoryTree for it."""
1998
 
        b = self.make_branch(relpath, format=format)
1999
 
        return memorytree.MemoryTree.create_on_branch(b)
2000
 
 
2001
 
    def overrideEnvironmentForTesting(self):
2002
 
        os.environ['HOME'] = self.test_home_dir
2003
 
        os.environ['BZR_HOME'] = self.test_home_dir
2004
 
        
2005
 
    def setUp(self):
2006
 
        super(TestCaseWithMemoryTransport, self).setUp()
2007
 
        self._make_test_root()
2008
 
        _currentdir = os.getcwdu()
2009
 
        def _leaveDirectory():
2010
 
            os.chdir(_currentdir)
2011
 
        self.addCleanup(_leaveDirectory)
2012
 
        self.makeAndChdirToTestDir()
2013
 
        self.overrideEnvironmentForTesting()
2014
 
        self.__readonly_server = None
2015
 
        self.__server = None
2016
 
        self.reduceLockdirTimeout()
 
417
 
 
418
BzrTestBase = TestCase
2017
419
 
2018
420
     
2019
 
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
421
class TestCaseInTempDir(TestCase):
2020
422
    """Derived class that runs a test within a temporary directory.
2021
423
 
2022
424
    This is useful for tests that need to create a branch, etc.
2026
428
    All test cases create their own directory within that.  If the
2027
429
    tests complete successfully, the directory is removed.
2028
430
 
2029
 
    :ivar test_base_dir: The path of the top-level directory for this 
2030
 
    test, which contains a home directory and a work directory.
2031
 
 
2032
 
    :ivar test_home_dir: An initially empty directory under test_base_dir
2033
 
    which is used as $HOME for this test.
2034
 
 
2035
 
    :ivar test_dir: A directory under test_base_dir used as the current
2036
 
    directory when the test proper is run.
 
431
    InTempDir is an old alias for FunctionalTestCase.
2037
432
    """
2038
433
 
 
434
    TEST_ROOT = None
 
435
    _TEST_NAME = 'test'
2039
436
    OVERRIDE_PYTHON = 'python'
2040
437
 
2041
438
    def check_file_contents(self, filename, expect):
2046
443
            self.log("actually: %r" % contents)
2047
444
            self.fail("contents of %s not as expected" % filename)
2048
445
 
2049
 
    def makeAndChdirToTestDir(self):
2050
 
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2051
 
        
2052
 
        For TestCaseInTempDir we create a temporary directory based on the test
2053
 
        name and then create two subdirs - test and home under it.
2054
 
        """
2055
 
        # create a directory within the top level test directory
2056
 
        candidate_dir = osutils.mkdtemp(dir=self.TEST_ROOT)
2057
 
        # now create test and home directories within this dir
2058
 
        self.test_base_dir = candidate_dir
2059
 
        self.test_home_dir = self.test_base_dir + '/home'
2060
 
        os.mkdir(self.test_home_dir)
2061
 
        self.test_dir = self.test_base_dir + '/work'
 
446
    def _make_test_root(self):
 
447
        if TestCaseInTempDir.TEST_ROOT is not None:
 
448
            return
 
449
        i = 0
 
450
        while True:
 
451
            root = u'test%04d.tmp' % i
 
452
            try:
 
453
                os.mkdir(root)
 
454
            except OSError, e:
 
455
                if e.errno == errno.EEXIST:
 
456
                    i += 1
 
457
                    continue
 
458
                else:
 
459
                    raise
 
460
            # successfully created
 
461
            TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
 
462
            break
 
463
        # make a fake bzr directory there to prevent any tests propagating
 
464
        # up onto the source directory's real branch
 
465
        os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
 
466
 
 
467
    def setUp(self):
 
468
        super(TestCaseInTempDir, self).setUp()
 
469
        self._make_test_root()
 
470
        _currentdir = os.getcwdu()
 
471
        short_id = self.id().replace('bzrlib.selftest.', '') \
 
472
                   .replace('__main__.', '')
 
473
        self.test_dir = os.path.join(self.TEST_ROOT, short_id)
2062
474
        os.mkdir(self.test_dir)
2063
475
        os.chdir(self.test_dir)
2064
 
        # put name of test inside
2065
 
        f = file(self.test_base_dir + '/name', 'w')
2066
 
        try:
2067
 
            f.write(self.id())
2068
 
        finally:
2069
 
            f.close()
2070
 
        self.addCleanup(self.deleteTestDir)
2071
 
 
2072
 
    def deleteTestDir(self):
2073
 
        os.chdir(self.TEST_ROOT)
2074
 
        _rmtree_temp_dir(self.test_base_dir)
2075
 
 
2076
 
    def build_tree(self, shape, line_endings='binary', transport=None):
 
476
        os.environ['HOME'] = self.test_dir
 
477
        def _leaveDirectory():
 
478
            os.chdir(_currentdir)
 
479
        self.addCleanup(_leaveDirectory)
 
480
        
 
481
    def build_tree(self, shape):
2077
482
        """Build a test tree according to a pattern.
2078
483
 
2079
484
        shape is a sequence of file specifications.  If the final
2080
485
        character is '/', a directory is created.
2081
486
 
2082
 
        This assumes that all the elements in the tree being built are new.
2083
 
 
2084
487
        This doesn't add anything to a branch.
2085
 
 
2086
 
        :type shape:    list or tuple.
2087
 
        :param line_endings: Either 'binary' or 'native'
2088
 
            in binary mode, exact contents are written in native mode, the
2089
 
            line endings match the default platform endings.
2090
 
        :param transport: A transport to write to, for building trees on VFS's.
2091
 
            If the transport is readonly or None, "." is opened automatically.
2092
 
        :return: None
2093
488
        """
2094
 
        if type(shape) not in (list, tuple):
2095
 
            raise AssertionError("Parameter 'shape' should be "
2096
 
                "a list or a tuple. Got %r instead" % (shape,))
2097
 
        # It's OK to just create them using forward slashes on windows.
2098
 
        if transport is None or transport.is_readonly():
2099
 
            transport = get_transport(".")
 
489
        # XXX: It's OK to just create them using forward slashes on windows?
2100
490
        for name in shape:
2101
491
            self.assert_(isinstance(name, basestring))
2102
492
            if name[-1] == '/':
2103
 
                transport.mkdir(urlutils.escape(name[:-1]))
 
493
                os.mkdir(name[:-1])
2104
494
            else:
2105
 
                if line_endings == 'binary':
2106
 
                    end = '\n'
2107
 
                elif line_endings == 'native':
2108
 
                    end = os.linesep
2109
 
                else:
2110
 
                    raise errors.BzrError(
2111
 
                        'Invalid line ending request %r' % line_endings)
2112
 
                content = "contents of %s%s" % (name.encode('utf-8'), end)
2113
 
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
495
                f = file(name, 'wt')
 
496
                print >>f, "contents of", name
 
497
                f.close()
2114
498
 
2115
499
    def build_tree_contents(self, shape):
2116
 
        build_tree_contents(shape)
2117
 
 
2118
 
    def assertInWorkingTree(self, path, root_path='.', tree=None):
2119
 
        """Assert whether path or paths are in the WorkingTree"""
2120
 
        if tree is None:
2121
 
            tree = workingtree.WorkingTree.open(root_path)
2122
 
        if not isinstance(path, basestring):
2123
 
            for p in path:
2124
 
                self.assertInWorkingTree(p,tree=tree)
2125
 
        else:
2126
 
            self.assertIsNot(tree.path2id(path), None,
2127
 
                path+' not in working tree.')
2128
 
 
2129
 
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2130
 
        """Assert whether path or paths are not in the WorkingTree"""
2131
 
        if tree is None:
2132
 
            tree = workingtree.WorkingTree.open(root_path)
2133
 
        if not isinstance(path, basestring):
2134
 
            for p in path:
2135
 
                self.assertNotInWorkingTree(p,tree=tree)
2136
 
        else:
2137
 
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
2138
 
 
2139
 
 
2140
 
class TestCaseWithTransport(TestCaseInTempDir):
2141
 
    """A test case that provides get_url and get_readonly_url facilities.
2142
 
 
2143
 
    These back onto two transport servers, one for readonly access and one for
2144
 
    read write access.
2145
 
 
2146
 
    If no explicit class is provided for readonly access, a
2147
 
    ReadonlyTransportDecorator is used instead which allows the use of non disk
2148
 
    based read write transports.
2149
 
 
2150
 
    If an explicit class is provided for readonly access, that server and the 
2151
 
    readwrite one must both define get_url() as resolving to os.getcwd().
2152
 
    """
2153
 
 
2154
 
    def get_vfs_only_server(self):
2155
 
        """See TestCaseWithMemoryTransport.
2156
 
 
2157
 
        This is useful for some tests with specific servers that need
2158
 
        diagnostics.
2159
 
        """
2160
 
        if self.__vfs_server is None:
2161
 
            self.__vfs_server = self.vfs_transport_factory()
2162
 
            self.__vfs_server.setUp()
2163
 
            self.addCleanup(self.__vfs_server.tearDown)
2164
 
        return self.__vfs_server
2165
 
 
2166
 
    def make_branch_and_tree(self, relpath, format=None):
2167
 
        """Create a branch on the transport and a tree locally.
2168
 
 
2169
 
        If the transport is not a LocalTransport, the Tree can't be created on
2170
 
        the transport.  In that case if the vfs_transport_factory is
2171
 
        LocalURLServer the working tree is created in the local
2172
 
        directory backing the transport, and the returned tree's branch and
2173
 
        repository will also be accessed locally. Otherwise a lightweight
2174
 
        checkout is created and returned.
2175
 
 
2176
 
        :param format: The BzrDirFormat.
2177
 
        :returns: the WorkingTree.
2178
 
        """
2179
 
        # TODO: always use the local disk path for the working tree,
2180
 
        # this obviously requires a format that supports branch references
2181
 
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2182
 
        # RBC 20060208
2183
 
        b = self.make_branch(relpath, format=format)
2184
 
        try:
2185
 
            return b.bzrdir.create_workingtree()
2186
 
        except errors.NotLocalUrl:
2187
 
            # We can only make working trees locally at the moment.  If the
2188
 
            # transport can't support them, then we keep the non-disk-backed
2189
 
            # branch and create a local checkout.
2190
 
            if self.vfs_transport_factory is LocalURLServer:
2191
 
                # the branch is colocated on disk, we cannot create a checkout.
2192
 
                # hopefully callers will expect this.
2193
 
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2194
 
                return local_controldir.create_workingtree()
2195
 
            else:
2196
 
                return b.create_checkout(relpath, lightweight=True)
2197
 
 
2198
 
    def assertIsDirectory(self, relpath, transport):
2199
 
        """Assert that relpath within transport is a directory.
2200
 
 
2201
 
        This may not be possible on all transports; in that case it propagates
2202
 
        a TransportNotPossible.
2203
 
        """
2204
 
        try:
2205
 
            mode = transport.stat(relpath).st_mode
2206
 
        except errors.NoSuchFile:
2207
 
            self.fail("path %s is not a directory; no such file"
2208
 
                      % (relpath))
2209
 
        if not stat.S_ISDIR(mode):
2210
 
            self.fail("path %s is not a directory; has mode %#o"
2211
 
                      % (relpath, mode))
2212
 
 
2213
 
    def assertTreesEqual(self, left, right):
2214
 
        """Check that left and right have the same content and properties."""
2215
 
        # we use a tree delta to check for equality of the content, and we
2216
 
        # manually check for equality of other things such as the parents list.
2217
 
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2218
 
        differences = left.changes_from(right)
2219
 
        self.assertFalse(differences.has_changed(),
2220
 
            "Trees %r and %r are different: %r" % (left, right, differences))
2221
 
 
2222
 
    def setUp(self):
2223
 
        super(TestCaseWithTransport, self).setUp()
2224
 
        self.__vfs_server = None
2225
 
 
2226
 
 
2227
 
class ChrootedTestCase(TestCaseWithTransport):
2228
 
    """A support class that provides readonly urls outside the local namespace.
2229
 
 
2230
 
    This is done by checking if self.transport_server is a MemoryServer. if it
2231
 
    is then we are chrooted already, if it is not then an HttpServer is used
2232
 
    for readonly urls.
2233
 
 
2234
 
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2235
 
                       be used without needed to redo it when a different 
2236
 
                       subclass is in use ?
2237
 
    """
2238
 
 
2239
 
    def setUp(self):
2240
 
        super(ChrootedTestCase, self).setUp()
2241
 
        if not self.vfs_transport_factory == MemoryServer:
2242
 
            self.transport_readonly_server = HttpServer
2243
 
 
2244
 
 
2245
 
def condition_id_re(pattern):
2246
 
    """Create a condition filter which performs a re check on a test's id.
2247
 
    
2248
 
    :param pattern: A regular expression string.
2249
 
    :return: A callable that returns True if the re matches.
2250
 
    """
2251
 
    filter_re = re.compile(pattern)
2252
 
    def condition(test):
2253
 
        test_id = test.id()
2254
 
        return filter_re.search(test_id)
2255
 
    return condition
2256
 
 
2257
 
 
2258
 
def condition_isinstance(klass_or_klass_list):
2259
 
    """Create a condition filter which returns isinstance(param, klass).
2260
 
    
2261
 
    :return: A callable which when called with one parameter obj return the
2262
 
        result of isinstance(obj, klass_or_klass_list).
2263
 
    """
2264
 
    def condition(obj):
2265
 
        return isinstance(obj, klass_or_klass_list)
2266
 
    return condition
2267
 
 
2268
 
 
2269
 
def condition_id_in_list(id_list):
2270
 
    """Create a condition filter which verify that test's id in a list.
2271
 
    
2272
 
    :param name: A TestIdList object.
2273
 
    :return: A callable that returns True if the test's id appears in the list.
2274
 
    """
2275
 
    def condition(test):
2276
 
        return id_list.test_in(test.id())
2277
 
    return condition
2278
 
 
2279
 
 
2280
 
def exclude_tests_by_condition(suite, condition):
2281
 
    """Create a test suite which excludes some tests from suite.
2282
 
 
2283
 
    :param suite: The suite to get tests from.
2284
 
    :param condition: A callable whose result evaluates True when called with a
2285
 
        test case which should be excluded from the result.
2286
 
    :return: A suite which contains the tests found in suite that fail
2287
 
        condition.
2288
 
    """
2289
 
    result = []
2290
 
    for test in iter_suite_tests(suite):
2291
 
        if not condition(test):
2292
 
            result.append(test)
2293
 
    return TestUtil.TestSuite(result)
2294
 
 
2295
 
 
2296
 
def filter_suite_by_condition(suite, condition):
2297
 
    """Create a test suite by filtering another one.
2298
 
    
2299
 
    :param suite: The source suite.
2300
 
    :param condition: A callable whose result evaluates True when called with a
2301
 
        test case which should be included in the result.
2302
 
    :return: A suite which contains the tests found in suite that pass
2303
 
        condition.
2304
 
    """ 
2305
 
    result = []
2306
 
    for test in iter_suite_tests(suite):
2307
 
        if condition(test):
2308
 
            result.append(test)
2309
 
    return TestUtil.TestSuite(result)
2310
 
 
2311
 
 
2312
 
def filter_suite_by_re(suite, pattern, exclude_pattern=DEPRECATED_PARAMETER,
2313
 
                       random_order=DEPRECATED_PARAMETER):
2314
 
    """Create a test suite by filtering another one.
2315
 
    
2316
 
    :param suite:           the source suite
2317
 
    :param pattern:         pattern that names must match
2318
 
    :param exclude_pattern: A pattern that names must not match. This parameter
2319
 
        is deprecated as of bzrlib 1.0. Please use the separate function
2320
 
        exclude_tests_by_re instead.
2321
 
    :param random_order:    If True, tests in the new suite will be put in
2322
 
        random order. This parameter is deprecated as of bzrlib 1.0. Please
2323
 
        use the separate function randomize_suite instead.
2324
 
    :returns: the newly created suite
2325
 
    """ 
2326
 
    if deprecated_passed(exclude_pattern):
2327
 
        symbol_versioning.warn(
2328
 
            one_zero % "passing exclude_pattern to filter_suite_by_re",
2329
 
                DeprecationWarning, stacklevel=2)
2330
 
        if exclude_pattern is not None:
2331
 
            suite = exclude_tests_by_re(suite, exclude_pattern)
2332
 
    condition = condition_id_re(pattern)
2333
 
    result_suite = filter_suite_by_condition(suite, condition)
2334
 
    if deprecated_passed(random_order):
2335
 
        symbol_versioning.warn(
2336
 
            one_zero % "passing random_order to filter_suite_by_re",
2337
 
                DeprecationWarning, stacklevel=2)
2338
 
        if random_order:
2339
 
            result_suite = randomize_suite(result_suite)
2340
 
    return result_suite
2341
 
 
2342
 
 
2343
 
def filter_suite_by_id_list(suite, test_id_list):
2344
 
    """Create a test suite by filtering another one.
2345
 
 
2346
 
    :param suite: The source suite.
2347
 
    :param test_id_list: A list of the test ids to keep as strings.
2348
 
    :returns: the newly created suite
2349
 
    """
2350
 
    condition = condition_id_in_list(test_id_list)
2351
 
    result_suite = filter_suite_by_condition(suite, condition)
2352
 
    return result_suite
2353
 
 
2354
 
 
2355
 
def exclude_tests_by_re(suite, pattern):
2356
 
    """Create a test suite which excludes some tests from suite.
2357
 
 
2358
 
    :param suite: The suite to get tests from.
2359
 
    :param pattern: A regular expression string. Test ids that match this
2360
 
        pattern will be excluded from the result.
2361
 
    :return: A TestSuite that contains all the tests from suite without the
2362
 
        tests that matched pattern. The order of tests is the same as it was in
2363
 
        suite.
2364
 
    """
2365
 
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
2366
 
 
2367
 
 
2368
 
def preserve_input(something):
2369
 
    """A helper for performing test suite transformation chains.
2370
 
 
2371
 
    :param something: Anything you want to preserve.
2372
 
    :return: Something.
2373
 
    """
2374
 
    return something
2375
 
 
2376
 
 
2377
 
def randomize_suite(suite):
2378
 
    """Return a new TestSuite with suite's tests in random order.
2379
 
    
2380
 
    The tests in the input suite are flattened into a single suite in order to
2381
 
    accomplish this. Any nested TestSuites are removed to provide global
2382
 
    randomness.
2383
 
    """
2384
 
    tests = list(iter_suite_tests(suite))
2385
 
    random.shuffle(tests)
2386
 
    return TestUtil.TestSuite(tests)
2387
 
 
2388
 
 
2389
 
@deprecated_function(one_zero)
2390
 
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2391
 
                     random_order=False, append_rest=True):
2392
 
    """DEPRECATED: Create a test suite by sorting another one.
2393
 
 
2394
 
    This method has been decomposed into separate helper methods that should be
2395
 
    called directly:
2396
 
     - filter_suite_by_re
2397
 
     - exclude_tests_by_re
2398
 
     - randomize_suite
2399
 
     - split_suite_by_re
2400
 
    
2401
 
    :param suite:           the source suite
2402
 
    :param pattern:         pattern that names must match in order to go
2403
 
                            first in the new suite
2404
 
    :param exclude_pattern: pattern that names must not match, if any
2405
 
    :param random_order:    if True, tests in the new suite will be put in
2406
 
                            random order (with all tests matching pattern
2407
 
                            first).
2408
 
    :param append_rest:     if False, pattern is a strict filter and not
2409
 
                            just an ordering directive
2410
 
    :returns: the newly created suite
2411
 
    """ 
2412
 
    if exclude_pattern is not None:
2413
 
        suite = exclude_tests_by_re(suite, exclude_pattern)
2414
 
    if random_order:
2415
 
        order_changer = randomize_suite
2416
 
    else:
2417
 
        order_changer = preserve_input
2418
 
    if append_rest:
2419
 
        suites = map(order_changer, split_suite_by_re(suite, pattern))
2420
 
        return TestUtil.TestSuite(suites)
2421
 
    else:
2422
 
        return order_changer(filter_suite_by_re(suite, pattern))
2423
 
 
2424
 
 
2425
 
def split_suite_by_re(suite, pattern):
2426
 
    """Split a test suite into two by a regular expression.
2427
 
    
2428
 
    :param suite: The suite to split.
2429
 
    :param pattern: A regular expression string. Test ids that match this
2430
 
        pattern will be in the first test suite returned, and the others in the
2431
 
        second test suite returned.
2432
 
    :return: A tuple of two test suites, where the first contains tests from
2433
 
        suite matching pattern, and the second contains the remainder from
2434
 
        suite. The order within each output suite is the same as it was in
2435
 
        suite.
2436
 
    """ 
2437
 
    matched = []
2438
 
    did_not_match = []
2439
 
    filter_re = re.compile(pattern)
2440
 
    for test in iter_suite_tests(suite):
2441
 
        test_id = test.id()
2442
 
        if filter_re.search(test_id):
2443
 
            matched.append(test)
2444
 
        else:
2445
 
            did_not_match.append(test)
2446
 
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
 
500
        bzrlib.selftest.build_tree_contents(shape)
 
501
 
 
502
    def failUnlessExists(self, path):
 
503
        """Fail unless path, which may be abs or relative, exists."""
 
504
        self.failUnless(osutils.lexists(path))
 
505
        
 
506
    def assertFileEqual(self, content, path):
 
507
        """Fail if path does not contain 'content'."""
 
508
        self.failUnless(osutils.lexists(path))
 
509
        self.assertEqualDiff(content, open(path, 'r').read())
 
510
        
 
511
 
 
512
class MetaTestLog(TestCase):
 
513
    def test_logging(self):
 
514
        """Test logs are captured when a test fails."""
 
515
        logging.info('an info message')
 
516
        warning('something looks dodgy...')
 
517
        logging.debug('hello, test is running')
 
518
 
 
519
 
 
520
def filter_suite_by_re(suite, pattern):
 
521
    result = TestUtil.TestSuite()
 
522
    filter_re = re.compile(pattern)
 
523
    for test in iter_suite_tests(suite):
 
524
        if filter_re.search(test.id()):
 
525
            result.addTest(test)
 
526
    return result
2447
527
 
2448
528
 
2449
529
def run_suite(suite, name='test', verbose=False, pattern=".*",
2450
 
              stop_on_failure=False,
2451
 
              transport=None, lsprof_timed=None, bench_history=None,
2452
 
              matching_tests_first=None,
2453
 
              list_only=False,
2454
 
              random_seed=None,
2455
 
              exclude_pattern=None,
2456
 
              strict=False):
2457
 
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
530
              stop_on_failure=False):
 
531
    TestCaseInTempDir._TEST_NAME = name
2458
532
    if verbose:
2459
533
        verbosity = 2
2460
534
    else:
2461
535
        verbosity = 1
2462
536
    runner = TextTestRunner(stream=sys.stdout,
2463
537
                            descriptions=0,
2464
 
                            verbosity=verbosity,
2465
 
                            bench_history=bench_history,
2466
 
                            list_only=list_only,
2467
 
                            )
 
538
                            verbosity=verbosity)
2468
539
    runner.stop_on_failure=stop_on_failure
2469
 
    # Initialise the random number generator and display the seed used.
2470
 
    # We convert the seed to a long to make it reuseable across invocations.
2471
 
    random_order = False
2472
 
    if random_seed is not None:
2473
 
        random_order = True
2474
 
        if random_seed == "now":
2475
 
            random_seed = long(time.time())
2476
 
        else:
2477
 
            # Convert the seed to a long if we can
2478
 
            try:
2479
 
                random_seed = long(random_seed)
2480
 
            except:
2481
 
                pass
2482
 
        runner.stream.writeln("Randomizing test order using seed %s\n" %
2483
 
            (random_seed))
2484
 
        random.seed(random_seed)
2485
 
    # Customise the list of tests if requested
2486
 
    if exclude_pattern is not None:
2487
 
        suite = exclude_tests_by_re(suite, exclude_pattern)
2488
 
    if random_order:
2489
 
        order_changer = randomize_suite
2490
 
    else:
2491
 
        order_changer = preserve_input
2492
 
    if pattern != '.*' or random_order:
2493
 
        if matching_tests_first:
2494
 
            suites = map(order_changer, split_suite_by_re(suite, pattern))
2495
 
            suite = TestUtil.TestSuite(suites)
2496
 
        else:
2497
 
            suite = order_changer(filter_suite_by_re(suite, pattern))
2498
 
 
 
540
    if pattern != '.*':
 
541
        suite = filter_suite_by_re(suite, pattern)
2499
542
    result = runner.run(suite)
2500
 
 
2501
 
    if strict:
2502
 
        return result.wasStrictlySuccessful()
2503
 
 
 
543
    # This is still a little bogus, 
 
544
    # but only a little. Folk not using our testrunner will
 
545
    # have to delete their temp directories themselves.
 
546
    if result.wasSuccessful():
 
547
        if TestCaseInTempDir.TEST_ROOT is not None:
 
548
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
 
549
    else:
 
550
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
2504
551
    return result.wasSuccessful()
2505
552
 
2506
553
 
2507
 
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2508
 
             transport=None,
2509
 
             test_suite_factory=None,
2510
 
             lsprof_timed=None,
2511
 
             bench_history=None,
2512
 
             matching_tests_first=None,
2513
 
             list_only=False,
2514
 
             random_seed=None,
2515
 
             exclude_pattern=None,
2516
 
             strict=False,
2517
 
             load_list=None,
2518
 
             ):
 
554
def selftest(verbose=False, pattern=".*", stop_on_failure=True):
2519
555
    """Run the whole test suite under the enhanced runner"""
2520
 
    # XXX: Very ugly way to do this...
2521
 
    # Disable warning about old formats because we don't want it to disturb
2522
 
    # any blackbox tests.
2523
 
    from bzrlib import repository
2524
 
    repository._deprecation_warning_done = True
2525
 
 
2526
 
    global default_transport
2527
 
    if transport is None:
2528
 
        transport = default_transport
2529
 
    old_transport = default_transport
2530
 
    default_transport = transport
2531
 
    try:
2532
 
        if load_list is None:
2533
 
            keep_only = None
2534
 
        else:
2535
 
            keep_only = load_test_id_list(load_list)
2536
 
        if test_suite_factory is None:
2537
 
            suite = test_suite(keep_only)
2538
 
        else:
2539
 
            suite = test_suite_factory()
2540
 
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2541
 
                     stop_on_failure=stop_on_failure,
2542
 
                     transport=transport,
2543
 
                     lsprof_timed=lsprof_timed,
2544
 
                     bench_history=bench_history,
2545
 
                     matching_tests_first=matching_tests_first,
2546
 
                     list_only=list_only,
2547
 
                     random_seed=random_seed,
2548
 
                     exclude_pattern=exclude_pattern,
2549
 
                     strict=strict)
2550
 
    finally:
2551
 
        default_transport = old_transport
2552
 
 
2553
 
 
2554
 
def load_test_id_list(file_name):
2555
 
    """Load a test id list from a text file.
2556
 
 
2557
 
    The format is one test id by line.  No special care is taken to impose
2558
 
    strict rules, these test ids are used to filter the test suite so a test id
2559
 
    that do not match an existing test will do no harm. This allows user to add
2560
 
    comments, leave blank lines, etc.
2561
 
    """
2562
 
    test_list = []
2563
 
    try:
2564
 
        ftest = open(file_name, 'rt')
2565
 
    except IOError, e:
2566
 
        if e.errno != errno.ENOENT:
2567
 
            raise
2568
 
        else:
2569
 
            raise errors.NoSuchFile(file_name)
2570
 
 
2571
 
    for test_name in ftest.readlines():
2572
 
        test_list.append(test_name.strip())
2573
 
    ftest.close()
2574
 
    return test_list
2575
 
 
2576
 
 
2577
 
class TestIdList(object):
2578
 
    """Test id list to filter a test suite.
2579
 
 
2580
 
    Relying on the assumption that test ids are built as:
2581
 
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
2582
 
    notation, this class offers methods to :
2583
 
    - avoid building a test suite for modules not refered to in the test list,
2584
 
    - keep only the tests listed from the module test suite.
2585
 
    """
2586
 
 
2587
 
    def __init__(self, test_id_list):
2588
 
        # When a test suite needs to be filtered against us we compare test ids
2589
 
        # for equality, so a simple dict offers a quick and simple solution.
2590
 
        self.tests = dict().fromkeys(test_id_list, True)
2591
 
 
2592
 
        # While unittest.TestCase have ids like:
2593
 
        # <module>.<class>.<method>[(<param+)],
2594
 
        # doctest.DocTestCase can have ids like:
2595
 
        # <module>
2596
 
        # <module>.<class>
2597
 
        # <module>.<function>
2598
 
        # <module>.<class>.<method>
2599
 
 
2600
 
        # Since we can't predict a test class from its name only, we settle on
2601
 
        # a simple constraint: a test id always begins with its module name.
2602
 
 
2603
 
        modules = {}
2604
 
        for test_id in test_id_list:
2605
 
            parts = test_id.split('.')
2606
 
            mod_name = parts.pop(0)
2607
 
            modules[mod_name] = True
2608
 
            for part in parts:
2609
 
                mod_name += '.' + part
2610
 
                modules[mod_name] = True
2611
 
        self.modules = modules
2612
 
 
2613
 
    def is_module_name_used(self, module_name):
2614
 
        """Is there tests for the module or one of its sub modules."""
2615
 
        return self.modules.has_key(module_name)
2616
 
 
2617
 
    def test_in(self, test_id):
2618
 
        return self.tests.has_key(test_id)
2619
 
 
2620
 
 
2621
 
def test_suite(keep_only=None):
2622
 
    """Build and return TestSuite for the whole of bzrlib.
2623
 
 
2624
 
    :param keep_only: A list of test ids limiting the suite returned.
2625
 
 
2626
 
    This function can be replaced if you need to change the default test
2627
 
    suite on a global basis, but it is not encouraged.
2628
 
    """
2629
 
    testmod_names = [
2630
 
                   'bzrlib.util.tests.test_bencode',
2631
 
                   'bzrlib.tests.test__dirstate_helpers',
2632
 
                   'bzrlib.tests.test_ancestry',
2633
 
                   'bzrlib.tests.test_annotate',
2634
 
                   'bzrlib.tests.test_api',
2635
 
                   'bzrlib.tests.test_atomicfile',
2636
 
                   'bzrlib.tests.test_bad_files',
2637
 
                   'bzrlib.tests.test_bisect_multi',
2638
 
                   'bzrlib.tests.test_branch',
2639
 
                   'bzrlib.tests.test_branchbuilder',
2640
 
                   'bzrlib.tests.test_bugtracker',
2641
 
                   'bzrlib.tests.test_bundle',
2642
 
                   'bzrlib.tests.test_bzrdir',
2643
 
                   'bzrlib.tests.test_cache_utf8',
2644
 
                   'bzrlib.tests.test_commands',
2645
 
                   'bzrlib.tests.test_commit',
2646
 
                   'bzrlib.tests.test_commit_merge',
2647
 
                   'bzrlib.tests.test_config',
2648
 
                   'bzrlib.tests.test_conflicts',
2649
 
                   'bzrlib.tests.test_counted_lock',
2650
 
                   'bzrlib.tests.test_decorators',
2651
 
                   'bzrlib.tests.test_delta',
2652
 
                   'bzrlib.tests.test_deprecated_graph',
2653
 
                   'bzrlib.tests.test_diff',
2654
 
                   'bzrlib.tests.test_dirstate',
2655
 
                   'bzrlib.tests.test_directory_service',
2656
 
                   'bzrlib.tests.test_email_message',
2657
 
                   'bzrlib.tests.test_errors',
2658
 
                   'bzrlib.tests.test_escaped_store',
2659
 
                   'bzrlib.tests.test_extract',
2660
 
                   'bzrlib.tests.test_fetch',
2661
 
                   'bzrlib.tests.test_ftp_transport',
2662
 
                   'bzrlib.tests.test_generate_docs',
2663
 
                   'bzrlib.tests.test_generate_ids',
2664
 
                   'bzrlib.tests.test_globbing',
2665
 
                   'bzrlib.tests.test_gpg',
2666
 
                   'bzrlib.tests.test_graph',
2667
 
                   'bzrlib.tests.test_hashcache',
2668
 
                   'bzrlib.tests.test_help',
2669
 
                   'bzrlib.tests.test_hooks',
2670
 
                   'bzrlib.tests.test_http',
2671
 
                   'bzrlib.tests.test_http_implementations',
2672
 
                   'bzrlib.tests.test_http_response',
2673
 
                   'bzrlib.tests.test_https_ca_bundle',
2674
 
                   'bzrlib.tests.test_identitymap',
2675
 
                   'bzrlib.tests.test_ignores',
2676
 
                   'bzrlib.tests.test_index',
2677
 
                   'bzrlib.tests.test_info',
2678
 
                   'bzrlib.tests.test_inv',
2679
 
                   'bzrlib.tests.test_knit',
2680
 
                   'bzrlib.tests.test_lazy_import',
2681
 
                   'bzrlib.tests.test_lazy_regex',
2682
 
                   'bzrlib.tests.test_lockdir',
2683
 
                   'bzrlib.tests.test_lockable_files',
2684
 
                   'bzrlib.tests.test_log',
2685
 
                   'bzrlib.tests.test_lsprof',
2686
 
                   'bzrlib.tests.test_lru_cache',
2687
 
                   'bzrlib.tests.test_mail_client',
2688
 
                   'bzrlib.tests.test_memorytree',
2689
 
                   'bzrlib.tests.test_merge',
2690
 
                   'bzrlib.tests.test_merge3',
2691
 
                   'bzrlib.tests.test_merge_core',
2692
 
                   'bzrlib.tests.test_merge_directive',
2693
 
                   'bzrlib.tests.test_missing',
2694
 
                   'bzrlib.tests.test_msgeditor',
2695
 
                   'bzrlib.tests.test_multiparent',
2696
 
                   'bzrlib.tests.test_nonascii',
2697
 
                   'bzrlib.tests.test_options',
2698
 
                   'bzrlib.tests.test_osutils',
2699
 
                   'bzrlib.tests.test_osutils_encodings',
2700
 
                   'bzrlib.tests.test_pack',
2701
 
                   'bzrlib.tests.test_patch',
2702
 
                   'bzrlib.tests.test_patches',
2703
 
                   'bzrlib.tests.test_permissions',
2704
 
                   'bzrlib.tests.test_plugins',
2705
 
                   'bzrlib.tests.test_progress',
2706
 
                   'bzrlib.tests.test_reconfigure',
2707
 
                   'bzrlib.tests.test_reconcile',
2708
 
                   'bzrlib.tests.test_registry',
2709
 
                   'bzrlib.tests.test_remote',
2710
 
                   'bzrlib.tests.test_repository',
2711
 
                   'bzrlib.tests.test_revert',
2712
 
                   'bzrlib.tests.test_revision',
2713
 
                   'bzrlib.tests.test_revisionnamespaces',
2714
 
                   'bzrlib.tests.test_revisiontree',
2715
 
                   'bzrlib.tests.test_rio',
2716
 
                   'bzrlib.tests.test_sampler',
2717
 
                   'bzrlib.tests.test_selftest',
2718
 
                   'bzrlib.tests.test_setup',
2719
 
                   'bzrlib.tests.test_sftp_transport',
2720
 
                   'bzrlib.tests.test_smart',
2721
 
                   'bzrlib.tests.test_smart_add',
2722
 
                   'bzrlib.tests.test_smart_transport',
2723
 
                   'bzrlib.tests.test_smtp_connection',
2724
 
                   'bzrlib.tests.test_source',
2725
 
                   'bzrlib.tests.test_ssh_transport',
2726
 
                   'bzrlib.tests.test_status',
2727
 
                   'bzrlib.tests.test_store',
2728
 
                   'bzrlib.tests.test_strace',
2729
 
                   'bzrlib.tests.test_subsume',
2730
 
                   'bzrlib.tests.test_switch',
2731
 
                   'bzrlib.tests.test_symbol_versioning',
2732
 
                   'bzrlib.tests.test_tag',
2733
 
                   'bzrlib.tests.test_testament',
2734
 
                   'bzrlib.tests.test_textfile',
2735
 
                   'bzrlib.tests.test_textmerge',
2736
 
                   'bzrlib.tests.test_timestamp',
2737
 
                   'bzrlib.tests.test_trace',
2738
 
                   'bzrlib.tests.test_transactions',
2739
 
                   'bzrlib.tests.test_transform',
2740
 
                   'bzrlib.tests.test_transport',
2741
 
                   'bzrlib.tests.test_tree',
2742
 
                   'bzrlib.tests.test_treebuilder',
2743
 
                   'bzrlib.tests.test_tsort',
2744
 
                   'bzrlib.tests.test_tuned_gzip',
2745
 
                   'bzrlib.tests.test_ui',
2746
 
                   'bzrlib.tests.test_upgrade',
2747
 
                   'bzrlib.tests.test_urlutils',
2748
 
                   'bzrlib.tests.test_versionedfile',
2749
 
                   'bzrlib.tests.test_version',
2750
 
                   'bzrlib.tests.test_version_info',
2751
 
                   'bzrlib.tests.test_weave',
2752
 
                   'bzrlib.tests.test_whitebox',
2753
 
                   'bzrlib.tests.test_win32utils',
2754
 
                   'bzrlib.tests.test_workingtree',
2755
 
                   'bzrlib.tests.test_workingtree_4',
2756
 
                   'bzrlib.tests.test_wsgi',
2757
 
                   'bzrlib.tests.test_xml',
 
556
    return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
 
557
                     stop_on_failure=stop_on_failure)
 
558
 
 
559
 
 
560
def test_suite():
 
561
    """Build and return TestSuite for the whole program."""
 
562
    import bzrlib.store, bzrlib.inventory, bzrlib.branch
 
563
    import bzrlib.osutils, bzrlib.merge3, bzrlib.plugin
 
564
    from doctest import DocTestSuite
 
565
 
 
566
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
 
567
 
 
568
    testmod_names = \
 
569
                  ['bzrlib.selftest.MetaTestLog',
 
570
                   'bzrlib.selftest.testapi',
 
571
                   'bzrlib.selftest.testgpg',
 
572
                   'bzrlib.selftest.testidentitymap',
 
573
                   'bzrlib.selftest.testinv',
 
574
                   'bzrlib.selftest.test_ancestry',
 
575
                   'bzrlib.selftest.test_commit',
 
576
                   'bzrlib.selftest.test_command',
 
577
                   'bzrlib.selftest.test_commit_merge',
 
578
                   'bzrlib.selftest.testconfig',
 
579
                   'bzrlib.selftest.versioning',
 
580
                   'bzrlib.selftest.testmerge3',
 
581
                   'bzrlib.selftest.testmerge',
 
582
                   'bzrlib.selftest.testhashcache',
 
583
                   'bzrlib.selftest.teststatus',
 
584
                   'bzrlib.selftest.testlog',
 
585
                   'bzrlib.selftest.testrevisionnamespaces',
 
586
                   'bzrlib.selftest.testbranch',
 
587
                   'bzrlib.selftest.testrevision',
 
588
                   'bzrlib.selftest.test_revision_info',
 
589
                   'bzrlib.selftest.test_merge_core',
 
590
                   'bzrlib.selftest.test_smart_add',
 
591
                   'bzrlib.selftest.test_bad_files',
 
592
                   'bzrlib.selftest.testdiff',
 
593
                   'bzrlib.selftest.test_parent',
 
594
                   'bzrlib.selftest.test_xml',
 
595
                   'bzrlib.selftest.test_weave',
 
596
                   'bzrlib.selftest.testfetch',
 
597
                   'bzrlib.selftest.whitebox',
 
598
                   'bzrlib.selftest.teststore',
 
599
                   'bzrlib.selftest.blackbox',
 
600
                   'bzrlib.selftest.testsampler',
 
601
                   'bzrlib.selftest.testtransactions',
 
602
                   'bzrlib.selftest.testtransport',
 
603
                   'bzrlib.selftest.testsftp',
 
604
                   'bzrlib.selftest.testgraph',
 
605
                   'bzrlib.selftest.testworkingtree',
 
606
                   'bzrlib.selftest.test_upgrade',
 
607
                   'bzrlib.selftest.test_conflicts',
 
608
                   'bzrlib.selftest.testtestament',
 
609
                   'bzrlib.selftest.testannotate',
 
610
                   'bzrlib.selftest.testrevprops',
 
611
                   'bzrlib.selftest.testoptions',
 
612
                   'bzrlib.selftest.testhttp',
 
613
                   'bzrlib.selftest.testnonascii',
 
614
                   'bzrlib.selftest.testreweave',
 
615
                   'bzrlib.selftest.testtsort',
 
616
                   'bzrlib.selftest.testbasicio',
2758
617
                   ]
2759
 
    test_transport_implementations = [
2760
 
        'bzrlib.tests.test_transport_implementations',
2761
 
        'bzrlib.tests.test_read_bundle',
2762
 
        ]
2763
 
    suite = TestUtil.TestSuite()
2764
 
    loader = TestUtil.TestLoader()
2765
 
 
2766
 
    if keep_only is not None:
2767
 
        id_filter = TestIdList(keep_only)
2768
 
 
2769
 
    # modules building their suite with loadTestsFromModuleNames
2770
 
    if keep_only is None:
2771
 
        suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2772
 
    else:
2773
 
        for mod in [m for m in testmod_names
2774
 
                    if id_filter.is_module_name_used(m)]:
2775
 
            mod_suite = loader.loadTestsFromModuleNames([mod])
2776
 
            mod_suite = filter_suite_by_id_list(mod_suite, id_filter)
2777
 
            suite.addTest(mod_suite)
2778
 
 
2779
 
    # modules adapted for transport implementations
2780
 
    from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
2781
 
    adapter = TransportTestProviderAdapter()
2782
 
    if keep_only is None:
2783
 
        adapt_modules(test_transport_implementations, adapter, loader, suite)
2784
 
    else:
2785
 
        for mod in [m for m in test_transport_implementations
2786
 
                    if id_filter.is_module_name_used(m)]:
2787
 
            mod_suite = TestUtil.TestSuite()
2788
 
            adapt_modules([mod], adapter, loader, mod_suite)
2789
 
            mod_suite = filter_suite_by_id_list(mod_suite, id_filter)
2790
 
            suite.addTest(mod_suite)
2791
 
 
2792
 
    # modules defining their own test_suite()
2793
 
    for package in [p for p in packages_to_test()
2794
 
                    if (keep_only is None
2795
 
                        or id_filter.is_module_name_used(p.__name__))]:
2796
 
        pack_suite = package.test_suite()
2797
 
        if keep_only is not None:
2798
 
            pack_suite = filter_suite_by_id_list(pack_suite, id_filter)
2799
 
        suite.addTest(pack_suite)
2800
 
 
2801
 
    # XXX: MODULES_TO_TEST should be obsoleted ?
2802
 
    for mod in [m for m in MODULES_TO_TEST
2803
 
                if keep_only is None or id_filter.is_module_name_used(m)]:
2804
 
        mod_suite = loader.loadTestsFromModule(mod)
2805
 
        if keep_only is not None:
2806
 
            mod_suite = filter_suite_by_id_list(mod_suite, id_filter)
2807
 
        suite.addTest(mod_suite)
2808
 
 
2809
 
    for mod in MODULES_TO_DOCTEST:
2810
 
        try:
2811
 
            doc_suite = doctest.DocTestSuite(mod)
2812
 
        except ValueError, e:
2813
 
            print '**failed to get doctest for: %s\n%s' % (mod, e)
2814
 
            raise
2815
 
        if keep_only is not None:
2816
 
            # DocTests may use ids which doesn't contain the module name
2817
 
            doc_suite = filter_suite_by_id_list(doc_suite, id_filter)
2818
 
        suite.addTest(doc_suite)
2819
 
 
2820
 
    default_encoding = sys.getdefaultencoding()
2821
 
    for name, plugin in bzrlib.plugin.plugins().items():
2822
 
        if keep_only is not None:
2823
 
            if not id_filter.is_module_name_used(plugin.module.__name__):
2824
 
                continue
2825
 
        plugin_suite = plugin.test_suite()
2826
 
        # We used to catch ImportError here and turn it into just a warning,
2827
 
        # but really if you don't have --no-plugins this should be a failure.
2828
 
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
2829
 
        if plugin_suite is not None:
2830
 
            if keep_only is not None:
2831
 
                plugin_suite = filter_suite_by_id_list(plugin_suite,
2832
 
                                                       id_filter)
2833
 
            suite.addTest(plugin_suite)
2834
 
        if default_encoding != sys.getdefaultencoding():
2835
 
            bzrlib.trace.warning(
2836
 
                'Plugin "%s" tried to reset default encoding to: %s', name,
2837
 
                sys.getdefaultencoding())
2838
 
            reload(sys)
2839
 
            sys.setdefaultencoding(default_encoding)
2840
 
    return suite
2841
 
 
2842
 
 
2843
 
def multiply_tests_from_modules(module_name_list, scenario_iter):
2844
 
    """Adapt all tests in some given modules to given scenarios.
2845
 
 
2846
 
    This is the recommended public interface for test parameterization.
2847
 
    Typically the test_suite() method for a per-implementation test
2848
 
    suite will call multiply_tests_from_modules and return the 
2849
 
    result.
2850
 
 
2851
 
    :param module_name_list: List of fully-qualified names of test
2852
 
        modules.
2853
 
    :param scenario_iter: Iterable of pairs of (scenario_name, 
2854
 
        scenario_param_dict).
2855
 
 
2856
 
    This returns a new TestSuite containing the cross product of
2857
 
    all the tests in all the modules, each repeated for each scenario.
2858
 
    Each test is adapted by adding the scenario name at the end 
2859
 
    of its name, and updating the test object's __dict__ with the
2860
 
    scenario_param_dict.
2861
 
 
2862
 
    >>> r = multiply_tests_from_modules(
2863
 
    ...     ['bzrlib.tests.test_sampler'],
2864
 
    ...     [('one', dict(param=1)), 
2865
 
    ...      ('two', dict(param=2))])
2866
 
    >>> tests = list(iter_suite_tests(r))
2867
 
    >>> len(tests)
2868
 
    2
2869
 
    >>> tests[0].id()
2870
 
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
2871
 
    >>> tests[0].param
2872
 
    1
2873
 
    >>> tests[1].param
2874
 
    2
2875
 
    """
2876
 
    loader = TestLoader()
 
618
 
 
619
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
 
620
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3,
 
621
              bzrlib.errors,
 
622
              ):
 
623
        if m not in MODULES_TO_DOCTEST:
 
624
            MODULES_TO_DOCTEST.append(m)
 
625
 
 
626
    TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
 
627
    print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
 
628
    print
2877
629
    suite = TestSuite()
2878
 
    adapter = TestScenarioApplier()
2879
 
    adapter.scenarios = list(scenario_iter)
2880
 
    adapt_modules(module_name_list, adapter, loader, suite)
 
630
    suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
 
631
    for m in MODULES_TO_TEST:
 
632
         suite.addTest(TestLoader().loadTestsFromModule(m))
 
633
    for m in (MODULES_TO_DOCTEST):
 
634
        suite.addTest(DocTestSuite(m))
 
635
    for p in bzrlib.plugin.all_plugins:
 
636
        if hasattr(p, 'test_suite'):
 
637
            suite.addTest(p.test_suite())
2881
638
    return suite
2882
639
 
2883
 
 
2884
 
def multiply_scenarios(scenarios_left, scenarios_right):
2885
 
    """Multiply two sets of scenarios.
2886
 
 
2887
 
    :returns: the cartesian product of the two sets of scenarios, that is
2888
 
        a scenario for every possible combination of a left scenario and a
2889
 
        right scenario.
2890
 
    """
2891
 
    return [
2892
 
        ('%s,%s' % (left_name, right_name),
2893
 
         dict(left_dict.items() + right_dict.items()))
2894
 
        for left_name, left_dict in scenarios_left
2895
 
        for right_name, right_dict in scenarios_right]
2896
 
 
2897
 
 
2898
 
 
2899
 
def adapt_modules(mods_list, adapter, loader, suite):
2900
 
    """Adapt the modules in mods_list using adapter and add to suite."""
2901
 
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
2902
 
        suite.addTests(adapter.adapt(test))
2903
 
 
2904
 
 
2905
 
def adapt_tests(tests_list, adapter, loader, suite):
2906
 
    """Adapt the tests in tests_list using adapter and add to suite."""
2907
 
    for test in tests_list:
2908
 
        suite.addTests(adapter.adapt(loader.loadTestsFromName(test)))
2909
 
 
2910
 
 
2911
 
def _rmtree_temp_dir(dirname):
2912
 
    # If LANG=C we probably have created some bogus paths
2913
 
    # which rmtree(unicode) will fail to delete
2914
 
    # so make sure we are using rmtree(str) to delete everything
2915
 
    # except on win32, where rmtree(str) will fail
2916
 
    # since it doesn't have the property of byte-stream paths
2917
 
    # (they are either ascii or mbcs)
2918
 
    if sys.platform == 'win32':
2919
 
        # make sure we are using the unicode win32 api
2920
 
        dirname = unicode(dirname)
2921
 
    else:
2922
 
        dirname = dirname.encode(sys.getfilesystemencoding())
2923
 
    try:
2924
 
        osutils.rmtree(dirname)
2925
 
    except OSError, e:
2926
 
        if sys.platform == 'win32' and e.errno == errno.EACCES:
2927
 
            sys.stderr.write(('Permission denied: '
2928
 
                                 'unable to remove testing dir '
2929
 
                                 '%s\n' % os.path.basename(dirname)))
2930
 
        else:
2931
 
            raise
2932
 
 
2933
 
 
2934
 
class Feature(object):
2935
 
    """An operating system Feature."""
2936
 
 
2937
 
    def __init__(self):
2938
 
        self._available = None
2939
 
 
2940
 
    def available(self):
2941
 
        """Is the feature available?
2942
 
 
2943
 
        :return: True if the feature is available.
2944
 
        """
2945
 
        if self._available is None:
2946
 
            self._available = self._probe()
2947
 
        return self._available
2948
 
 
2949
 
    def _probe(self):
2950
 
        """Implement this method in concrete features.
2951
 
 
2952
 
        :return: True if the feature is available.
2953
 
        """
2954
 
        raise NotImplementedError
2955
 
 
2956
 
    def __str__(self):
2957
 
        if getattr(self, 'feature_name', None):
2958
 
            return self.feature_name()
2959
 
        return self.__class__.__name__
2960
 
 
2961
 
 
2962
 
class _SymlinkFeature(Feature):
2963
 
 
2964
 
    def _probe(self):
2965
 
        return osutils.has_symlinks()
2966
 
 
2967
 
    def feature_name(self):
2968
 
        return 'symlinks'
2969
 
 
2970
 
SymlinkFeature = _SymlinkFeature()
2971
 
 
2972
 
 
2973
 
class _HardlinkFeature(Feature):
2974
 
 
2975
 
    def _probe(self):
2976
 
        return osutils.has_hardlinks()
2977
 
 
2978
 
    def feature_name(self):
2979
 
        return 'hardlinks'
2980
 
 
2981
 
HardlinkFeature = _HardlinkFeature()
2982
 
 
2983
 
 
2984
 
class _OsFifoFeature(Feature):
2985
 
 
2986
 
    def _probe(self):
2987
 
        return getattr(os, 'mkfifo', None)
2988
 
 
2989
 
    def feature_name(self):
2990
 
        return 'filesystem fifos'
2991
 
 
2992
 
OsFifoFeature = _OsFifoFeature()
2993
 
 
2994
 
 
2995
 
class TestScenarioApplier(object):
2996
 
    """A tool to apply scenarios to tests."""
2997
 
 
2998
 
    def adapt(self, test):
2999
 
        """Return a TestSuite containing a copy of test for each scenario."""
3000
 
        result = unittest.TestSuite()
3001
 
        for scenario in self.scenarios:
3002
 
            result.addTest(self.adapt_test_to_scenario(test, scenario))
3003
 
        return result
3004
 
 
3005
 
    def adapt_test_to_scenario(self, test, scenario):
3006
 
        """Copy test and apply scenario to it.
3007
 
 
3008
 
        :param test: A test to adapt.
3009
 
        :param scenario: A tuple describing the scenarion.
3010
 
            The first element of the tuple is the new test id.
3011
 
            The second element is a dict containing attributes to set on the
3012
 
            test.
3013
 
        :return: The adapted test.
3014
 
        """
3015
 
        from copy import deepcopy
3016
 
        new_test = deepcopy(test)
3017
 
        for name, value in scenario[1].items():
3018
 
            setattr(new_test, name, value)
3019
 
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
3020
 
        new_test.id = lambda: new_id
3021
 
        return new_test
3022
 
 
3023
 
 
3024
 
def probe_unicode_in_user_encoding():
3025
 
    """Try to encode several unicode strings to use in unicode-aware tests.
3026
 
    Return first successfull match.
3027
 
 
3028
 
    :return:  (unicode value, encoded plain string value) or (None, None)
3029
 
    """
3030
 
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3031
 
    for uni_val in possible_vals:
3032
 
        try:
3033
 
            str_val = uni_val.encode(bzrlib.user_encoding)
3034
 
        except UnicodeEncodeError:
3035
 
            # Try a different character
3036
 
            pass
3037
 
        else:
3038
 
            return uni_val, str_val
3039
 
    return None, None
3040
 
 
3041
 
 
3042
 
def probe_bad_non_ascii(encoding):
3043
 
    """Try to find [bad] character with code [128..255]
3044
 
    that cannot be decoded to unicode in some encoding.
3045
 
    Return None if all non-ascii characters is valid
3046
 
    for given encoding.
3047
 
    """
3048
 
    for i in xrange(128, 256):
3049
 
        char = chr(i)
3050
 
        try:
3051
 
            char.decode(encoding)
3052
 
        except UnicodeDecodeError:
3053
 
            return char
3054
 
    return None
3055
 
 
3056
 
 
3057
 
class _FTPServerFeature(Feature):
3058
 
    """Some tests want an FTP Server, check if one is available.
3059
 
 
3060
 
    Right now, the only way this is available is if 'medusa' is installed.
3061
 
    http://www.amk.ca/python/code/medusa.html
3062
 
    """
3063
 
 
3064
 
    def _probe(self):
3065
 
        try:
3066
 
            import bzrlib.tests.ftp_server
3067
 
            return True
3068
 
        except ImportError:
3069
 
            return False
3070
 
 
3071
 
    def feature_name(self):
3072
 
        return 'FTPServer'
3073
 
 
3074
 
FTPServerFeature = _FTPServerFeature()
3075
 
 
3076
 
 
3077
 
class _CaseInsensitiveFilesystemFeature(Feature):
3078
 
    """Check if underlined filesystem is case-insensitive
3079
 
    (e.g. on Windows, Cygwin, MacOS)
3080
 
    """
3081
 
 
3082
 
    def _probe(self):
3083
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
3084
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
3085
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
3086
 
        else:
3087
 
            root = TestCaseWithMemoryTransport.TEST_ROOT
3088
 
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
3089
 
            dir=root)
3090
 
        name_a = osutils.pathjoin(tdir, 'a')
3091
 
        name_A = osutils.pathjoin(tdir, 'A')
3092
 
        os.mkdir(name_a)
3093
 
        result = osutils.isdir(name_A)
3094
 
        _rmtree_temp_dir(tdir)
3095
 
        return result
3096
 
 
3097
 
    def feature_name(self):
3098
 
        return 'case-insensitive filesystem'
3099
 
 
3100
 
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()