~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/__init__.py

merge merge tweaks from aaron, which includes latest .dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
# TODO: Perhaps there should be an API to find out if bzr running under the
19
 
# test suite -- some plugins might want to avoid making intrusive changes if
20
 
# this is the case.  However, we want behaviour under to test to diverge as
21
 
# little as possible, so this should be used rarely if it's added at all.
22
 
# (Suggestion from j-a-meinel, 2005-11-24)
23
 
 
24
 
# NOTE: Some classes in here use camelCaseNaming() rather than
25
 
# underscore_naming().  That's for consistency with unittest; it's not the
26
 
# general style of bzrlib.  Please continue that consistency when adding e.g.
27
 
# new assertFoo() methods.
28
 
 
29
 
import atexit
30
 
import codecs
31
18
from cStringIO import StringIO
32
 
import difflib
33
 
import doctest
34
 
import errno
35
19
import logging
 
20
import unittest
 
21
import tempfile
36
22
import os
37
 
from pprint import pformat
38
 
import random
39
 
import re
40
 
import shlex
41
 
import stat
42
 
from subprocess import Popen, PIPE
43
23
import sys
44
 
import tempfile
45
 
import threading
46
 
import time
47
 
import unittest
48
 
import warnings
49
 
 
50
 
 
51
 
from bzrlib import (
52
 
    bzrdir,
53
 
    debug,
54
 
    errors,
55
 
    memorytree,
56
 
    osutils,
57
 
    progress,
58
 
    ui,
59
 
    urlutils,
60
 
    workingtree,
61
 
    )
62
 
import bzrlib.branch
 
24
import errno
 
25
import subprocess
 
26
import shutil
 
27
import testsweet
 
28
 
63
29
import bzrlib.commands
64
 
import bzrlib.timestamp
65
 
import bzrlib.export
66
 
import bzrlib.inventory
67
 
import bzrlib.iterablefile
68
 
import bzrlib.lockdir
69
 
try:
70
 
    import bzrlib.lsprof
71
 
except ImportError:
72
 
    # lsprof not available
73
 
    pass
74
 
from bzrlib.merge import merge_inner
75
 
import bzrlib.merge3
76
 
import bzrlib.plugin
77
 
import bzrlib.store
78
 
from bzrlib import symbol_versioning
79
 
from bzrlib.symbol_versioning import (
80
 
    DEPRECATED_PARAMETER,
81
 
    deprecated_function,
82
 
    deprecated_method,
83
 
    deprecated_passed,
84
 
    )
85
30
import bzrlib.trace
86
 
from bzrlib.transport import get_transport
87
 
import bzrlib.transport
88
 
from bzrlib.transport.local import LocalURLServer
89
 
from bzrlib.transport.memory import MemoryServer
90
 
from bzrlib.transport.readonly import ReadonlyServer
91
 
from bzrlib.trace import mutter, note
92
 
from bzrlib.tests import TestUtil
93
 
from bzrlib.tests.http_server import HttpServer
94
 
from bzrlib.tests.TestUtil import (
95
 
                          TestSuite,
96
 
                          TestLoader,
97
 
                          )
98
 
from bzrlib.tests.treeshape import build_tree_contents
99
 
import bzrlib.version_info_formats.format_custom
100
 
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
101
 
 
102
 
# Mark this python module as being part of the implementation
103
 
# of unittest: this gives us better tracebacks where the last
104
 
# shown frame is the test code, not our assertXYZ.
105
 
__unittest = 1
106
 
 
107
 
default_transport = LocalURLServer
108
 
 
109
 
 
110
 
class ExtendedTestResult(unittest._TextTestResult):
111
 
    """Accepts, reports and accumulates the results of running tests.
112
 
 
113
 
    Compared to the unittest version this class adds support for
114
 
    profiling, benchmarking, stopping as soon as a test fails,  and
115
 
    skipping tests.  There are further-specialized subclasses for
116
 
    different types of display.
117
 
 
118
 
    When a test finishes, in whatever way, it calls one of the addSuccess,
119
 
    addFailure or addError classes.  These in turn may redirect to a more
120
 
    specific case for the special test results supported by our extended
121
 
    tests.
122
 
 
123
 
    Note that just one of these objects is fed the results from many tests.
124
 
    """
125
 
 
126
 
    stop_early = False
127
 
    
128
 
    def __init__(self, stream, descriptions, verbosity,
129
 
                 bench_history=None,
130
 
                 num_tests=None,
131
 
                 ):
132
 
        """Construct new TestResult.
133
 
 
134
 
        :param bench_history: Optionally, a writable file object to accumulate
135
 
            benchmark results.
136
 
        """
137
 
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
138
 
        if bench_history is not None:
139
 
            from bzrlib.version import _get_bzr_source_tree
140
 
            src_tree = _get_bzr_source_tree()
141
 
            if src_tree:
142
 
                try:
143
 
                    revision_id = src_tree.get_parent_ids()[0]
144
 
                except IndexError:
145
 
                    # XXX: if this is a brand new tree, do the same as if there
146
 
                    # is no branch.
147
 
                    revision_id = ''
148
 
            else:
149
 
                # XXX: If there's no branch, what should we do?
150
 
                revision_id = ''
151
 
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
152
 
        self._bench_history = bench_history
153
 
        self.ui = ui.ui_factory
154
 
        self.num_tests = num_tests
155
 
        self.error_count = 0
156
 
        self.failure_count = 0
157
 
        self.known_failure_count = 0
158
 
        self.skip_count = 0
159
 
        self.not_applicable_count = 0
160
 
        self.unsupported = {}
161
 
        self.count = 0
162
 
        self._overall_start_time = time.time()
163
 
    
164
 
    def _extractBenchmarkTime(self, testCase):
165
 
        """Add a benchmark time for the current test case."""
166
 
        return getattr(testCase, "_benchtime", None)
167
 
    
168
 
    def _elapsedTestTimeString(self):
169
 
        """Return a time string for the overall time the current test has taken."""
170
 
        return self._formatTime(time.time() - self._start_time)
171
 
 
172
 
    def _testTimeString(self, testCase):
173
 
        benchmark_time = self._extractBenchmarkTime(testCase)
174
 
        if benchmark_time is not None:
175
 
            return "%s/%s" % (
176
 
                self._formatTime(benchmark_time),
177
 
                self._elapsedTestTimeString())
178
 
        else:
179
 
            return "           %s" % self._elapsedTestTimeString()
180
 
 
181
 
    def _formatTime(self, seconds):
182
 
        """Format seconds as milliseconds with leading spaces."""
183
 
        # some benchmarks can take thousands of seconds to run, so we need 8
184
 
        # places
185
 
        return "%8dms" % (1000 * seconds)
186
 
 
187
 
    def _shortened_test_description(self, test):
188
 
        what = test.id()
189
 
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
190
 
        return what
191
 
 
192
 
    def startTest(self, test):
193
 
        unittest.TestResult.startTest(self, test)
194
 
        self.report_test_start(test)
195
 
        test.number = self.count
196
 
        self._recordTestStartTime()
197
 
 
198
 
    def _recordTestStartTime(self):
199
 
        """Record that a test has started."""
200
 
        self._start_time = time.time()
201
 
 
202
 
    def _cleanupLogFile(self, test):
203
 
        # We can only do this if we have one of our TestCases, not if
204
 
        # we have a doctest.
205
 
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
206
 
        if setKeepLogfile is not None:
207
 
            setKeepLogfile()
208
 
 
209
 
    def addError(self, test, err):
210
 
        """Tell result that test finished with an error.
211
 
 
212
 
        Called from the TestCase run() method when the test
213
 
        fails with an unexpected error.
214
 
        """
215
 
        self._testConcluded(test)
216
 
        if isinstance(err[1], TestSkipped):
217
 
            return self._addSkipped(test, err)
218
 
        elif isinstance(err[1], UnavailableFeature):
219
 
            return self.addNotSupported(test, err[1].args[0])
220
 
        else:
221
 
            self._cleanupLogFile(test)
222
 
            unittest.TestResult.addError(self, test, err)
223
 
            self.error_count += 1
224
 
            self.report_error(test, err)
225
 
            if self.stop_early:
226
 
                self.stop()
227
 
 
228
 
    def addFailure(self, test, err):
229
 
        """Tell result that test failed.
230
 
 
231
 
        Called from the TestCase run() method when the test
232
 
        fails because e.g. an assert() method failed.
233
 
        """
234
 
        self._testConcluded(test)
235
 
        if isinstance(err[1], KnownFailure):
236
 
            return self._addKnownFailure(test, err)
237
 
        else:
238
 
            self._cleanupLogFile(test)
239
 
            unittest.TestResult.addFailure(self, test, err)
240
 
            self.failure_count += 1
241
 
            self.report_failure(test, err)
242
 
            if self.stop_early:
243
 
                self.stop()
244
 
 
245
 
    def addSuccess(self, test):
246
 
        """Tell result that test completed successfully.
247
 
 
248
 
        Called from the TestCase run()
249
 
        """
250
 
        self._testConcluded(test)
251
 
        if self._bench_history is not None:
252
 
            benchmark_time = self._extractBenchmarkTime(test)
253
 
            if benchmark_time is not None:
254
 
                self._bench_history.write("%s %s\n" % (
255
 
                    self._formatTime(benchmark_time),
256
 
                    test.id()))
257
 
        self.report_success(test)
258
 
        self._cleanupLogFile(test)
259
 
        unittest.TestResult.addSuccess(self, test)
260
 
        test._log_contents = ''
261
 
 
262
 
    def _testConcluded(self, test):
263
 
        """Common code when a test has finished.
264
 
 
265
 
        Called regardless of whether it succeded, failed, etc.
266
 
        """
267
 
        pass
268
 
 
269
 
    def _addKnownFailure(self, test, err):
270
 
        self.known_failure_count += 1
271
 
        self.report_known_failure(test, err)
272
 
 
273
 
    def addNotSupported(self, test, feature):
274
 
        """The test will not be run because of a missing feature.
275
 
        """
276
 
        # this can be called in two different ways: it may be that the
277
 
        # test started running, and then raised (through addError) 
278
 
        # UnavailableFeature.  Alternatively this method can be called
279
 
        # while probing for features before running the tests; in that
280
 
        # case we will see startTest and stopTest, but the test will never
281
 
        # actually run.
282
 
        self.unsupported.setdefault(str(feature), 0)
283
 
        self.unsupported[str(feature)] += 1
284
 
        self.report_unsupported(test, feature)
285
 
 
286
 
    def _addSkipped(self, test, skip_excinfo):
287
 
        if isinstance(skip_excinfo[1], TestNotApplicable):
288
 
            self.not_applicable_count += 1
289
 
            self.report_not_applicable(test, skip_excinfo)
290
 
        else:
291
 
            self.skip_count += 1
292
 
            self.report_skip(test, skip_excinfo)
293
 
        try:
294
 
            test.tearDown()
295
 
        except KeyboardInterrupt:
296
 
            raise
297
 
        except:
298
 
            self.addError(test, test._exc_info())
299
 
        else:
300
 
            # seems best to treat this as success from point-of-view of unittest
301
 
            # -- it actually does nothing so it barely matters :)
302
 
            unittest.TestResult.addSuccess(self, test)
303
 
            test._log_contents = ''
304
 
 
305
 
    def printErrorList(self, flavour, errors):
306
 
        for test, err in errors:
307
 
            self.stream.writeln(self.separator1)
308
 
            self.stream.write("%s: " % flavour)
309
 
            self.stream.writeln(self.getDescription(test))
310
 
            if getattr(test, '_get_log', None) is not None:
311
 
                self.stream.write('\n')
312
 
                self.stream.write(
313
 
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
314
 
                self.stream.write('\n')
315
 
                self.stream.write(test._get_log())
316
 
                self.stream.write('\n')
317
 
                self.stream.write(
318
 
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
319
 
                self.stream.write('\n')
320
 
            self.stream.writeln(self.separator2)
321
 
            self.stream.writeln("%s" % err)
322
 
 
323
 
    def finished(self):
324
 
        pass
325
 
 
326
 
    def report_cleaning_up(self):
327
 
        pass
328
 
 
329
 
    def report_success(self, test):
330
 
        pass
331
 
 
332
 
    def wasStrictlySuccessful(self):
333
 
        if self.unsupported or self.known_failure_count:
334
 
            return False
335
 
        return self.wasSuccessful()
336
 
 
337
 
 
338
 
class TextTestResult(ExtendedTestResult):
339
 
    """Displays progress and results of tests in text form"""
340
 
 
341
 
    def __init__(self, stream, descriptions, verbosity,
342
 
                 bench_history=None,
343
 
                 num_tests=None,
344
 
                 pb=None,
345
 
                 ):
346
 
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
347
 
            bench_history, num_tests)
348
 
        if pb is None:
349
 
            self.pb = self.ui.nested_progress_bar()
350
 
            self._supplied_pb = False
351
 
        else:
352
 
            self.pb = pb
353
 
            self._supplied_pb = True
354
 
        self.pb.show_pct = False
355
 
        self.pb.show_spinner = False
356
 
        self.pb.show_eta = False,
357
 
        self.pb.show_count = False
358
 
        self.pb.show_bar = False
359
 
 
360
 
    def report_starting(self):
361
 
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
362
 
 
363
 
    def _progress_prefix_text(self):
364
 
        # the longer this text, the less space we have to show the test
365
 
        # name...
366
 
        a = '[%d' % self.count              # total that have been run
367
 
        # tests skipped as known not to be relevant are not important enough
368
 
        # to show here
369
 
        ## if self.skip_count:
370
 
        ##     a += ', %d skip' % self.skip_count
371
 
        ## if self.known_failure_count:
372
 
        ##     a += '+%dX' % self.known_failure_count
373
 
        if self.num_tests is not None:
374
 
            a +='/%d' % self.num_tests
375
 
        a += ' in '
376
 
        runtime = time.time() - self._overall_start_time
377
 
        if runtime >= 60:
378
 
            a += '%dm%ds' % (runtime / 60, runtime % 60)
379
 
        else:
380
 
            a += '%ds' % runtime
381
 
        if self.error_count:
382
 
            a += ', %d err' % self.error_count
383
 
        if self.failure_count:
384
 
            a += ', %d fail' % self.failure_count
385
 
        if self.unsupported:
386
 
            a += ', %d missing' % len(self.unsupported)
387
 
        a += ']'
388
 
        return a
389
 
 
390
 
    def report_test_start(self, test):
391
 
        self.count += 1
392
 
        self.pb.update(
393
 
                self._progress_prefix_text()
394
 
                + ' ' 
395
 
                + self._shortened_test_description(test))
396
 
 
397
 
    def _test_description(self, test):
398
 
        return self._shortened_test_description(test)
399
 
 
400
 
    def report_error(self, test, err):
401
 
        self.pb.note('ERROR: %s\n    %s\n', 
402
 
            self._test_description(test),
403
 
            err[1],
404
 
            )
405
 
 
406
 
    def report_failure(self, test, err):
407
 
        self.pb.note('FAIL: %s\n    %s\n', 
408
 
            self._test_description(test),
409
 
            err[1],
410
 
            )
411
 
 
412
 
    def report_known_failure(self, test, err):
413
 
        self.pb.note('XFAIL: %s\n%s\n',
414
 
            self._test_description(test), err[1])
415
 
 
416
 
    def report_skip(self, test, skip_excinfo):
417
 
        pass
418
 
 
419
 
    def report_not_applicable(self, test, skip_excinfo):
420
 
        pass
421
 
 
422
 
    def report_unsupported(self, test, feature):
423
 
        """test cannot be run because feature is missing."""
424
 
                  
425
 
    def report_cleaning_up(self):
426
 
        self.pb.update('cleaning up...')
427
 
 
428
 
    def finished(self):
429
 
        if not self._supplied_pb:
430
 
            self.pb.finished()
431
 
 
432
 
 
433
 
class VerboseTestResult(ExtendedTestResult):
434
 
    """Produce long output, with one line per test run plus times"""
435
 
 
436
 
    def _ellipsize_to_right(self, a_string, final_width):
437
 
        """Truncate and pad a string, keeping the right hand side"""
438
 
        if len(a_string) > final_width:
439
 
            result = '...' + a_string[3-final_width:]
440
 
        else:
441
 
            result = a_string
442
 
        return result.ljust(final_width)
443
 
 
444
 
    def report_starting(self):
445
 
        self.stream.write('running %d tests...\n' % self.num_tests)
446
 
 
447
 
    def report_test_start(self, test):
448
 
        self.count += 1
449
 
        name = self._shortened_test_description(test)
450
 
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
451
 
        # numbers, plus a trailing blank
452
 
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
453
 
        self.stream.write(self._ellipsize_to_right(name,
454
 
                          osutils.terminal_width()-30))
455
 
        self.stream.flush()
456
 
 
457
 
    def _error_summary(self, err):
458
 
        indent = ' ' * 4
459
 
        return '%s%s' % (indent, err[1])
460
 
 
461
 
    def report_error(self, test, err):
462
 
        self.stream.writeln('ERROR %s\n%s'
463
 
                % (self._testTimeString(test),
464
 
                   self._error_summary(err)))
465
 
 
466
 
    def report_failure(self, test, err):
467
 
        self.stream.writeln(' FAIL %s\n%s'
468
 
                % (self._testTimeString(test),
469
 
                   self._error_summary(err)))
470
 
 
471
 
    def report_known_failure(self, test, err):
472
 
        self.stream.writeln('XFAIL %s\n%s'
473
 
                % (self._testTimeString(test),
474
 
                   self._error_summary(err)))
475
 
 
476
 
    def report_success(self, test):
477
 
        self.stream.writeln('   OK %s' % self._testTimeString(test))
478
 
        for bench_called, stats in getattr(test, '_benchcalls', []):
479
 
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
480
 
            stats.pprint(file=self.stream)
481
 
        # flush the stream so that we get smooth output. This verbose mode is
482
 
        # used to show the output in PQM.
483
 
        self.stream.flush()
484
 
 
485
 
    def report_skip(self, test, skip_excinfo):
486
 
        self.stream.writeln(' SKIP %s\n%s'
487
 
                % (self._testTimeString(test),
488
 
                   self._error_summary(skip_excinfo)))
489
 
 
490
 
    def report_not_applicable(self, test, skip_excinfo):
491
 
        self.stream.writeln('  N/A %s\n%s'
492
 
                % (self._testTimeString(test),
493
 
                   self._error_summary(skip_excinfo)))
494
 
 
495
 
    def report_unsupported(self, test, feature):
496
 
        """test cannot be run because feature is missing."""
497
 
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
498
 
                %(self._testTimeString(test), feature))
499
 
 
500
 
 
501
 
class TextTestRunner(object):
502
 
    stop_on_failure = False
503
 
 
504
 
    def __init__(self,
505
 
                 stream=sys.stderr,
506
 
                 descriptions=0,
507
 
                 verbosity=1,
508
 
                 bench_history=None,
509
 
                 list_only=False
510
 
                 ):
511
 
        self.stream = unittest._WritelnDecorator(stream)
512
 
        self.descriptions = descriptions
513
 
        self.verbosity = verbosity
514
 
        self._bench_history = bench_history
515
 
        self.list_only = list_only
516
 
 
517
 
    def run(self, test):
518
 
        "Run the given test case or test suite."
519
 
        startTime = time.time()
520
 
        if self.verbosity == 1:
521
 
            result_class = TextTestResult
522
 
        elif self.verbosity >= 2:
523
 
            result_class = VerboseTestResult
524
 
        result = result_class(self.stream,
525
 
                              self.descriptions,
526
 
                              self.verbosity,
527
 
                              bench_history=self._bench_history,
528
 
                              num_tests=test.countTestCases(),
529
 
                              )
530
 
        result.stop_early = self.stop_on_failure
531
 
        result.report_starting()
532
 
        if self.list_only:
533
 
            if self.verbosity >= 2:
534
 
                self.stream.writeln("Listing tests only ...\n")
535
 
            run = 0
536
 
            for t in iter_suite_tests(test):
537
 
                self.stream.writeln("%s" % (t.id()))
538
 
                run += 1
539
 
            actionTaken = "Listed"
540
 
        else: 
541
 
            test.run(result)
542
 
            run = result.testsRun
543
 
            actionTaken = "Ran"
544
 
        stopTime = time.time()
545
 
        timeTaken = stopTime - startTime
546
 
        result.printErrors()
547
 
        self.stream.writeln(result.separator2)
548
 
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
549
 
                            run, run != 1 and "s" or "", timeTaken))
550
 
        self.stream.writeln()
551
 
        if not result.wasSuccessful():
552
 
            self.stream.write("FAILED (")
553
 
            failed, errored = map(len, (result.failures, result.errors))
554
 
            if failed:
555
 
                self.stream.write("failures=%d" % failed)
556
 
            if errored:
557
 
                if failed: self.stream.write(", ")
558
 
                self.stream.write("errors=%d" % errored)
559
 
            if result.known_failure_count:
560
 
                if failed or errored: self.stream.write(", ")
561
 
                self.stream.write("known_failure_count=%d" %
562
 
                    result.known_failure_count)
563
 
            self.stream.writeln(")")
564
 
        else:
565
 
            if result.known_failure_count:
566
 
                self.stream.writeln("OK (known_failures=%d)" %
567
 
                    result.known_failure_count)
568
 
            else:
569
 
                self.stream.writeln("OK")
570
 
        if result.skip_count > 0:
571
 
            skipped = result.skip_count
572
 
            self.stream.writeln('%d test%s skipped' %
573
 
                                (skipped, skipped != 1 and "s" or ""))
574
 
        if result.unsupported:
575
 
            for feature, count in sorted(result.unsupported.items()):
576
 
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
577
 
                    (feature, count))
578
 
        result.finished()
579
 
        return result
580
 
 
581
 
 
582
 
def iter_suite_tests(suite):
583
 
    """Return all tests in a suite, recursing through nested suites"""
584
 
    for item in suite._tests:
585
 
        if isinstance(item, unittest.TestCase):
586
 
            yield item
587
 
        elif isinstance(item, unittest.TestSuite):
588
 
            for r in iter_suite_tests(item):
589
 
                yield r
590
 
        else:
591
 
            raise Exception('unknown object %r inside test suite %r'
592
 
                            % (item, suite))
593
 
 
594
 
 
595
 
class TestSkipped(Exception):
596
 
    """Indicates that a test was intentionally skipped, rather than failing."""
597
 
 
598
 
 
599
 
class TestNotApplicable(TestSkipped):
600
 
    """A test is not applicable to the situation where it was run.
601
 
 
602
 
    This is only normally raised by parameterized tests, if they find that 
603
 
    the instance they're constructed upon does not support one aspect 
604
 
    of its interface.
605
 
    """
606
 
 
607
 
 
608
 
class KnownFailure(AssertionError):
609
 
    """Indicates that a test failed in a precisely expected manner.
610
 
 
611
 
    Such failures dont block the whole test suite from passing because they are
612
 
    indicators of partially completed code or of future work. We have an
613
 
    explicit error for them so that we can ensure that they are always visible:
614
 
    KnownFailures are always shown in the output of bzr selftest.
615
 
    """
616
 
 
617
 
 
618
 
class UnavailableFeature(Exception):
619
 
    """A feature required for this test was not available.
620
 
 
621
 
    The feature should be used to construct the exception.
622
 
    """
623
 
 
 
31
import bzrlib.fetch
 
32
 
 
33
 
 
34
MODULES_TO_TEST = []
 
35
MODULES_TO_DOCTEST = []
 
36
 
 
37
from logging import debug, warning, error
624
38
 
625
39
class CommandFailed(Exception):
626
40
    pass
627
41
 
628
 
 
629
 
class StringIOWrapper(object):
630
 
    """A wrapper around cStringIO which just adds an encoding attribute.
631
 
    
632
 
    Internally we can check sys.stdout to see what the output encoding
633
 
    should be. However, cStringIO has no encoding attribute that we can
634
 
    set. So we wrap it instead.
635
 
    """
636
 
    encoding='ascii'
637
 
    _cstring = None
638
 
 
639
 
    def __init__(self, s=None):
640
 
        if s is not None:
641
 
            self.__dict__['_cstring'] = StringIO(s)
642
 
        else:
643
 
            self.__dict__['_cstring'] = StringIO()
644
 
 
645
 
    def __getattr__(self, name, getattr=getattr):
646
 
        return getattr(self.__dict__['_cstring'], name)
647
 
 
648
 
    def __setattr__(self, name, val):
649
 
        if name == 'encoding':
650
 
            self.__dict__['encoding'] = val
651
 
        else:
652
 
            return setattr(self._cstring, name, val)
653
 
 
654
 
 
655
 
class TestUIFactory(ui.CLIUIFactory):
656
 
    """A UI Factory for testing.
657
 
 
658
 
    Hide the progress bar but emit note()s.
659
 
    Redirect stdin.
660
 
    Allows get_password to be tested without real tty attached.
661
 
    """
662
 
 
663
 
    def __init__(self,
664
 
                 stdout=None,
665
 
                 stderr=None,
666
 
                 stdin=None):
667
 
        super(TestUIFactory, self).__init__()
668
 
        if stdin is not None:
669
 
            # We use a StringIOWrapper to be able to test various
670
 
            # encodings, but the user is still responsible to
671
 
            # encode the string and to set the encoding attribute
672
 
            # of StringIOWrapper.
673
 
            self.stdin = StringIOWrapper(stdin)
674
 
        if stdout is None:
675
 
            self.stdout = sys.stdout
676
 
        else:
677
 
            self.stdout = stdout
678
 
        if stderr is None:
679
 
            self.stderr = sys.stderr
680
 
        else:
681
 
            self.stderr = stderr
682
 
 
683
 
    def clear(self):
684
 
        """See progress.ProgressBar.clear()."""
685
 
 
686
 
    def clear_term(self):
687
 
        """See progress.ProgressBar.clear_term()."""
688
 
 
689
 
    def clear_term(self):
690
 
        """See progress.ProgressBar.clear_term()."""
691
 
 
692
 
    def finished(self):
693
 
        """See progress.ProgressBar.finished()."""
694
 
 
695
 
    def note(self, fmt_string, *args, **kwargs):
696
 
        """See progress.ProgressBar.note()."""
697
 
        self.stdout.write((fmt_string + "\n") % args)
698
 
 
699
 
    def progress_bar(self):
700
 
        return self
701
 
 
702
 
    def nested_progress_bar(self):
703
 
        return self
704
 
 
705
 
    def update(self, message, count=None, total=None):
706
 
        """See progress.ProgressBar.update()."""
707
 
 
708
 
    def get_non_echoed_password(self, prompt):
709
 
        """Get password from stdin without trying to handle the echo mode"""
710
 
        if prompt:
711
 
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
712
 
        password = self.stdin.readline()
713
 
        if not password:
714
 
            raise EOFError
715
 
        if password[-1] == '\n':
716
 
            password = password[:-1]
717
 
        return password
718
 
 
719
 
 
720
 
def _report_leaked_threads():
721
 
    bzrlib.trace.warning('%s is leaking threads among %d leaking tests',
722
 
                         TestCase._first_thread_leaker_id,
723
 
                         TestCase._leaking_threads_tests)
724
 
 
725
 
 
726
42
class TestCase(unittest.TestCase):
727
43
    """Base class for bzr unit tests.
728
44
    
731
47
 
732
48
    Error and debug log messages are redirected from their usual
733
49
    location into a temporary file, the contents of which can be
734
 
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
735
 
    so that it can also capture file IO.  When the test completes this file
736
 
    is read into memory and removed from disk.
 
50
    retrieved by _get_log().
737
51
       
738
52
    There are also convenience functions to invoke bzr's command-line
739
 
    routine, and to build and check bzr trees.
740
 
   
741
 
    In addition to the usual method of overriding tearDown(), this class also
742
 
    allows subclasses to register functions into the _cleanups list, which is
743
 
    run in order as the object is torn down.  It's less likely this will be
744
 
    accidentally overlooked.
745
 
    """
746
 
 
747
 
    _active_threads = None
748
 
    _leaking_threads_tests = 0
749
 
    _first_thread_leaker_id = None
750
 
    _log_file_name = None
751
 
    _log_contents = ''
752
 
    _keep_log_file = False
753
 
    # record lsprof data when performing benchmark calls.
754
 
    _gather_lsprof_in_benchmarks = False
755
 
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
756
 
                     '_log_contents', '_log_file_name', '_benchtime',
757
 
                     '_TestCase__testMethodName')
758
 
 
759
 
    def __init__(self, methodName='testMethod'):
760
 
        super(TestCase, self).__init__(methodName)
761
 
        self._cleanups = []
 
53
    routine, and to build and check bzr trees."""
 
54
 
 
55
    BZRPATH = 'bzr'
762
56
 
763
57
    def setUp(self):
 
58
        # this replaces the default testsweet.TestCase; we don't want logging changed
764
59
        unittest.TestCase.setUp(self)
765
 
        self._cleanEnvironment()
766
 
        self._silenceUI()
767
 
        self._startLogFile()
768
 
        self._benchcalls = []
769
 
        self._benchtime = None
770
 
        self._clear_hooks()
771
 
        self._clear_debug_flags()
772
 
        TestCase._active_threads = threading.activeCount()
773
 
        self.addCleanup(self._check_leaked_threads)
774
 
 
775
 
    def _check_leaked_threads(self):
776
 
        active = threading.activeCount()
777
 
        leaked_threads = active - TestCase._active_threads
778
 
        TestCase._active_threads = active
779
 
        if leaked_threads:
780
 
            TestCase._leaking_threads_tests += 1
781
 
            if TestCase._first_thread_leaker_id is None:
782
 
                TestCase._first_thread_leaker_id = self.id()
783
 
                # we're not specifically told when all tests are finished.
784
 
                # This will do. We use a function to avoid keeping a reference
785
 
                # to a TestCase object.
786
 
                atexit.register(_report_leaked_threads)
787
 
 
788
 
    def _clear_debug_flags(self):
789
 
        """Prevent externally set debug flags affecting tests.
790
 
        
791
 
        Tests that want to use debug flags can just set them in the
792
 
        debug_flags set during setup/teardown.
793
 
        """
794
 
        if 'allow_debug' not in selftest_debug_flags:
795
 
            self._preserved_debug_flags = set(debug.debug_flags)
796
 
            debug.debug_flags.clear()
797
 
            self.addCleanup(self._restore_debug_flags)
798
 
 
799
 
    def _clear_hooks(self):
800
 
        # prevent hooks affecting tests
801
 
        import bzrlib.branch
802
 
        import bzrlib.smart.server
803
 
        self._preserved_hooks = {
804
 
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
805
 
            bzrlib.mutabletree.MutableTree: bzrlib.mutabletree.MutableTree.hooks,
806
 
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
807
 
            }
808
 
        self.addCleanup(self._restoreHooks)
809
 
        # reset all hooks to an empty instance of the appropriate type
810
 
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
811
 
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
812
 
 
813
 
    def _silenceUI(self):
814
 
        """Turn off UI for duration of test"""
815
 
        # by default the UI is off; tests can turn it on if they want it.
816
 
        saved = ui.ui_factory
817
 
        def _restore():
818
 
            ui.ui_factory = saved
819
 
        ui.ui_factory = ui.SilentUIFactory()
820
 
        self.addCleanup(_restore)
821
 
 
822
 
    def _ndiff_strings(self, a, b):
823
 
        """Return ndiff between two strings containing lines.
824
 
        
825
 
        A trailing newline is added if missing to make the strings
826
 
        print properly."""
827
 
        if b and b[-1] != '\n':
828
 
            b += '\n'
829
 
        if a and a[-1] != '\n':
830
 
            a += '\n'
831
 
        difflines = difflib.ndiff(a.splitlines(True),
832
 
                                  b.splitlines(True),
833
 
                                  linejunk=lambda x: False,
834
 
                                  charjunk=lambda x: False)
835
 
        return ''.join(difflines)
836
 
 
837
 
    def assertEqual(self, a, b, message=''):
838
 
        try:
839
 
            if a == b:
840
 
                return
841
 
        except UnicodeError, e:
842
 
            # If we can't compare without getting a UnicodeError, then
843
 
            # obviously they are different
844
 
            mutter('UnicodeError: %s', e)
845
 
        if message:
846
 
            message += '\n'
847
 
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
848
 
            % (message,
849
 
               pformat(a), pformat(b)))
850
 
 
851
 
    assertEquals = assertEqual
852
 
 
853
 
    def assertEqualDiff(self, a, b, message=None):
854
 
        """Assert two texts are equal, if not raise an exception.
855
 
        
856
 
        This is intended for use with multi-line strings where it can 
857
 
        be hard to find the differences by eye.
858
 
        """
859
 
        # TODO: perhaps override assertEquals to call this for strings?
860
 
        if a == b:
861
 
            return
862
 
        if message is None:
863
 
            message = "texts not equal:\n"
864
 
        if a == b + '\n':
865
 
            message = 'first string is missing a final newline.\n'
866
 
        if a + '\n' == b:
867
 
            message = 'second string is missing a final newline.\n'
868
 
        raise AssertionError(message +
869
 
                             self._ndiff_strings(a, b))
870
 
        
871
 
    def assertEqualMode(self, mode, mode_test):
872
 
        self.assertEqual(mode, mode_test,
873
 
                         'mode mismatch %o != %o' % (mode, mode_test))
874
 
 
875
 
    def assertPositive(self, val):
876
 
        """Assert that val is greater than 0."""
877
 
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
878
 
 
879
 
    def assertNegative(self, val):
880
 
        """Assert that val is less than 0."""
881
 
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
882
 
 
883
 
    def assertStartsWith(self, s, prefix):
884
 
        if not s.startswith(prefix):
885
 
            raise AssertionError('string %r does not start with %r' % (s, prefix))
886
 
 
887
 
    def assertEndsWith(self, s, suffix):
888
 
        """Asserts that s ends with suffix."""
889
 
        if not s.endswith(suffix):
890
 
            raise AssertionError('string %r does not end with %r' % (s, suffix))
891
 
 
892
 
    def assertContainsRe(self, haystack, needle_re):
893
 
        """Assert that a contains something matching a regular expression."""
894
 
        if not re.search(needle_re, haystack):
895
 
            if '\n' in haystack or len(haystack) > 60:
896
 
                # a long string, format it in a more readable way
897
 
                raise AssertionError(
898
 
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
899
 
                        % (needle_re, haystack))
900
 
            else:
901
 
                raise AssertionError('pattern "%s" not found in "%s"'
902
 
                        % (needle_re, haystack))
903
 
 
904
 
    def assertNotContainsRe(self, haystack, needle_re):
905
 
        """Assert that a does not match a regular expression"""
906
 
        if re.search(needle_re, haystack):
907
 
            raise AssertionError('pattern "%s" found in "%s"'
908
 
                    % (needle_re, haystack))
909
 
 
910
 
    def assertSubset(self, sublist, superlist):
911
 
        """Assert that every entry in sublist is present in superlist."""
912
 
        missing = set(sublist) - set(superlist)
913
 
        if len(missing) > 0:
914
 
            raise AssertionError("value(s) %r not present in container %r" %
915
 
                                 (missing, superlist))
916
 
 
917
 
    def assertListRaises(self, excClass, func, *args, **kwargs):
918
 
        """Fail unless excClass is raised when the iterator from func is used.
919
 
        
920
 
        Many functions can return generators this makes sure
921
 
        to wrap them in a list() call to make sure the whole generator
922
 
        is run, and that the proper exception is raised.
923
 
        """
924
 
        try:
925
 
            list(func(*args, **kwargs))
926
 
        except excClass:
927
 
            return
928
 
        else:
929
 
            if getattr(excClass,'__name__', None) is not None:
930
 
                excName = excClass.__name__
931
 
            else:
932
 
                excName = str(excClass)
933
 
            raise self.failureException, "%s not raised" % excName
934
 
 
935
 
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
936
 
        """Assert that a callable raises a particular exception.
937
 
 
938
 
        :param excClass: As for the except statement, this may be either an
939
 
            exception class, or a tuple of classes.
940
 
        :param callableObj: A callable, will be passed ``*args`` and
941
 
            ``**kwargs``.
942
 
 
943
 
        Returns the exception so that you can examine it.
944
 
        """
945
 
        try:
946
 
            callableObj(*args, **kwargs)
947
 
        except excClass, e:
948
 
            return e
949
 
        else:
950
 
            if getattr(excClass,'__name__', None) is not None:
951
 
                excName = excClass.__name__
952
 
            else:
953
 
                # probably a tuple
954
 
                excName = str(excClass)
955
 
            raise self.failureException, "%s not raised" % excName
956
 
 
957
 
    def assertIs(self, left, right, message=None):
958
 
        if not (left is right):
959
 
            if message is not None:
960
 
                raise AssertionError(message)
961
 
            else:
962
 
                raise AssertionError("%r is not %r." % (left, right))
963
 
 
964
 
    def assertIsNot(self, left, right, message=None):
965
 
        if (left is right):
966
 
            if message is not None:
967
 
                raise AssertionError(message)
968
 
            else:
969
 
                raise AssertionError("%r is %r." % (left, right))
970
 
 
971
 
    def assertTransportMode(self, transport, path, mode):
972
 
        """Fail if a path does not have mode mode.
973
 
        
974
 
        If modes are not supported on this transport, the assertion is ignored.
975
 
        """
976
 
        if not transport._can_roundtrip_unix_modebits():
977
 
            return
978
 
        path_stat = transport.stat(path)
979
 
        actual_mode = stat.S_IMODE(path_stat.st_mode)
980
 
        self.assertEqual(mode, actual_mode,
981
 
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
982
 
 
983
 
    def assertIsSameRealPath(self, path1, path2):
984
 
        """Fail if path1 and path2 points to different files"""
985
 
        self.assertEqual(osutils.realpath(path1),
986
 
                         osutils.realpath(path2),
987
 
                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
988
 
 
989
 
    def assertIsInstance(self, obj, kls):
990
 
        """Fail if obj is not an instance of kls"""
991
 
        if not isinstance(obj, kls):
992
 
            self.fail("%r is an instance of %s rather than %s" % (
993
 
                obj, obj.__class__, kls))
994
 
 
995
 
    def expectFailure(self, reason, assertion, *args, **kwargs):
996
 
        """Invoke a test, expecting it to fail for the given reason.
997
 
 
998
 
        This is for assertions that ought to succeed, but currently fail.
999
 
        (The failure is *expected* but not *wanted*.)  Please be very precise
1000
 
        about the failure you're expecting.  If a new bug is introduced,
1001
 
        AssertionError should be raised, not KnownFailure.
1002
 
 
1003
 
        Frequently, expectFailure should be followed by an opposite assertion.
1004
 
        See example below.
1005
 
 
1006
 
        Intended to be used with a callable that raises AssertionError as the
1007
 
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
1008
 
 
1009
 
        Raises KnownFailure if the test fails.  Raises AssertionError if the
1010
 
        test succeeds.
1011
 
 
1012
 
        example usage::
1013
 
 
1014
 
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
1015
 
                             dynamic_val)
1016
 
          self.assertEqual(42, dynamic_val)
1017
 
 
1018
 
          This means that a dynamic_val of 54 will cause the test to raise
1019
 
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
1020
 
          only a dynamic_val of 42 will allow the test to pass.  Anything other
1021
 
          than 54 or 42 will cause an AssertionError.
1022
 
        """
1023
 
        try:
1024
 
            assertion(*args, **kwargs)
1025
 
        except AssertionError:
1026
 
            raise KnownFailure(reason)
1027
 
        else:
1028
 
            self.fail('Unexpected success.  Should have failed: %s' % reason)
1029
 
 
1030
 
    def assertFileEqual(self, content, path):
1031
 
        """Fail if path does not contain 'content'."""
1032
 
        self.failUnlessExists(path)
1033
 
        f = file(path, 'rb')
1034
 
        try:
1035
 
            s = f.read()
1036
 
        finally:
1037
 
            f.close()
1038
 
        self.assertEqualDiff(content, s)
1039
 
 
1040
 
    def failUnlessExists(self, path):
1041
 
        """Fail unless path or paths, which may be abs or relative, exist."""
1042
 
        if not isinstance(path, basestring):
1043
 
            for p in path:
1044
 
                self.failUnlessExists(p)
1045
 
        else:
1046
 
            self.failUnless(osutils.lexists(path),path+" does not exist")
1047
 
 
1048
 
    def failIfExists(self, path):
1049
 
        """Fail if path or paths, which may be abs or relative, exist."""
1050
 
        if not isinstance(path, basestring):
1051
 
            for p in path:
1052
 
                self.failIfExists(p)
1053
 
        else:
1054
 
            self.failIf(osutils.lexists(path),path+" exists")
1055
 
 
1056
 
    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1057
 
        """A helper for callDeprecated and applyDeprecated.
1058
 
 
1059
 
        :param a_callable: A callable to call.
1060
 
        :param args: The positional arguments for the callable
1061
 
        :param kwargs: The keyword arguments for the callable
1062
 
        :return: A tuple (warnings, result). result is the result of calling
1063
 
            a_callable(``*args``, ``**kwargs``).
1064
 
        """
1065
 
        local_warnings = []
1066
 
        def capture_warnings(msg, cls=None, stacklevel=None):
1067
 
            # we've hooked into a deprecation specific callpath,
1068
 
            # only deprecations should getting sent via it.
1069
 
            self.assertEqual(cls, DeprecationWarning)
1070
 
            local_warnings.append(msg)
1071
 
        original_warning_method = symbol_versioning.warn
1072
 
        symbol_versioning.set_warning_method(capture_warnings)
1073
 
        try:
1074
 
            result = a_callable(*args, **kwargs)
1075
 
        finally:
1076
 
            symbol_versioning.set_warning_method(original_warning_method)
1077
 
        return (local_warnings, result)
1078
 
 
1079
 
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1080
 
        """Call a deprecated callable without warning the user.
1081
 
 
1082
 
        Note that this only captures warnings raised by symbol_versioning.warn,
1083
 
        not other callers that go direct to the warning module.
1084
 
 
1085
 
        To test that a deprecated method raises an error, do something like
1086
 
        this::
1087
 
 
1088
 
            self.assertRaises(errors.ReservedId,
1089
 
                self.applyDeprecated,
1090
 
                deprecated_in((1, 5, 0)),
1091
 
                br.append_revision,
1092
 
                'current:')
1093
 
 
1094
 
        :param deprecation_format: The deprecation format that the callable
1095
 
            should have been deprecated with. This is the same type as the
1096
 
            parameter to deprecated_method/deprecated_function. If the
1097
 
            callable is not deprecated with this format, an assertion error
1098
 
            will be raised.
1099
 
        :param a_callable: A callable to call. This may be a bound method or
1100
 
            a regular function. It will be called with ``*args`` and
1101
 
            ``**kwargs``.
1102
 
        :param args: The positional arguments for the callable
1103
 
        :param kwargs: The keyword arguments for the callable
1104
 
        :return: The result of a_callable(``*args``, ``**kwargs``)
1105
 
        """
1106
 
        call_warnings, result = self._capture_deprecation_warnings(a_callable,
1107
 
            *args, **kwargs)
1108
 
        expected_first_warning = symbol_versioning.deprecation_string(
1109
 
            a_callable, deprecation_format)
1110
 
        if len(call_warnings) == 0:
1111
 
            self.fail("No deprecation warning generated by call to %s" %
1112
 
                a_callable)
1113
 
        self.assertEqual(expected_first_warning, call_warnings[0])
1114
 
        return result
1115
 
 
1116
 
    def callCatchWarnings(self, fn, *args, **kw):
1117
 
        """Call a callable that raises python warnings.
1118
 
 
1119
 
        The caller's responsible for examining the returned warnings.
1120
 
 
1121
 
        If the callable raises an exception, the exception is not
1122
 
        caught and propagates up to the caller.  In that case, the list
1123
 
        of warnings is not available.
1124
 
 
1125
 
        :returns: ([warning_object, ...], fn_result)
1126
 
        """
1127
 
        # XXX: This is not perfect, because it completely overrides the
1128
 
        # warnings filters, and some code may depend on suppressing particular
1129
 
        # warnings.  It's the easiest way to insulate ourselves from -Werror,
1130
 
        # though.  -- Andrew, 20071062
1131
 
        wlist = []
1132
 
        def _catcher(message, category, filename, lineno, file=None):
1133
 
            # despite the name, 'message' is normally(?) a Warning subclass
1134
 
            # instance
1135
 
            wlist.append(message)
1136
 
        saved_showwarning = warnings.showwarning
1137
 
        saved_filters = warnings.filters
1138
 
        try:
1139
 
            warnings.showwarning = _catcher
1140
 
            warnings.filters = []
1141
 
            result = fn(*args, **kw)
1142
 
        finally:
1143
 
            warnings.showwarning = saved_showwarning
1144
 
            warnings.filters = saved_filters
1145
 
        return wlist, result
1146
 
 
1147
 
    def callDeprecated(self, expected, callable, *args, **kwargs):
1148
 
        """Assert that a callable is deprecated in a particular way.
1149
 
 
1150
 
        This is a very precise test for unusual requirements. The 
1151
 
        applyDeprecated helper function is probably more suited for most tests
1152
 
        as it allows you to simply specify the deprecation format being used
1153
 
        and will ensure that that is issued for the function being called.
1154
 
 
1155
 
        Note that this only captures warnings raised by symbol_versioning.warn,
1156
 
        not other callers that go direct to the warning module.  To catch
1157
 
        general warnings, use callCatchWarnings.
1158
 
 
1159
 
        :param expected: a list of the deprecation warnings expected, in order
1160
 
        :param callable: The callable to call
1161
 
        :param args: The positional arguments for the callable
1162
 
        :param kwargs: The keyword arguments for the callable
1163
 
        """
1164
 
        call_warnings, result = self._capture_deprecation_warnings(callable,
1165
 
            *args, **kwargs)
1166
 
        self.assertEqual(expected, call_warnings)
1167
 
        return result
1168
 
 
1169
 
    def _startLogFile(self):
1170
 
        """Send bzr and test log messages to a temporary file.
1171
 
 
1172
 
        The file is removed as the test is torn down.
1173
 
        """
 
60
        bzrlib.trace.disable_default_logging()
 
61
        self._enable_file_logging()
 
62
 
 
63
 
 
64
    def _enable_file_logging(self):
1174
65
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
66
 
1175
67
        self._log_file = os.fdopen(fileno, 'w+')
1176
 
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
68
 
 
69
        hdlr = logging.StreamHandler(self._log_file)
 
70
        hdlr.setLevel(logging.DEBUG)
 
71
        hdlr.setFormatter(logging.Formatter('%(levelname)4.4s  %(message)s'))
 
72
        logging.getLogger('').addHandler(hdlr)
 
73
        logging.getLogger('').setLevel(logging.DEBUG)
 
74
        self._log_hdlr = hdlr
 
75
        debug('opened log file %s', name)
 
76
        
1177
77
        self._log_file_name = name
1178
 
        self.addCleanup(self._finishLogFile)
1179
 
 
1180
 
    def _finishLogFile(self):
1181
 
        """Finished with the log file.
1182
 
 
1183
 
        Close the file and delete it, unless setKeepLogfile was called.
1184
 
        """
1185
 
        if self._log_file is None:
1186
 
            return
1187
 
        bzrlib.trace.pop_log_file(self._log_memento)
 
78
 
 
79
    def tearDown(self):
 
80
        logging.getLogger('').removeHandler(self._log_hdlr)
 
81
        bzrlib.trace.enable_default_logging()
 
82
        logging.debug('%s teardown', self.id())
1188
83
        self._log_file.close()
1189
 
        self._log_file = None
1190
 
        if not self._keep_log_file:
1191
 
            os.remove(self._log_file_name)
1192
 
            self._log_file_name = None
1193
 
 
1194
 
    def setKeepLogfile(self):
1195
 
        """Make the logfile not be deleted when _finishLogFile is called."""
1196
 
        self._keep_log_file = True
1197
 
 
1198
 
    def addCleanup(self, callable):
1199
 
        """Arrange to run a callable when this case is torn down.
1200
 
 
1201
 
        Callables are run in the reverse of the order they are registered, 
1202
 
        ie last-in first-out.
1203
 
        """
1204
 
        if callable in self._cleanups:
1205
 
            raise ValueError("cleanup function %r already registered on %s" 
1206
 
                    % (callable, self))
1207
 
        self._cleanups.append(callable)
1208
 
 
1209
 
    def _cleanEnvironment(self):
1210
 
        new_env = {
1211
 
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1212
 
            'HOME': os.getcwd(),
1213
 
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
1214
 
            'BZR_EDITOR': None, # test_msgeditor manipulates this variable
1215
 
            'BZR_EMAIL': None,
1216
 
            'BZREMAIL': None, # may still be present in the environment
1217
 
            'EMAIL': None,
1218
 
            'BZR_PROGRESS_BAR': None,
1219
 
            'BZR_LOG': None,
1220
 
            # SSH Agent
1221
 
            'SSH_AUTH_SOCK': None,
1222
 
            # Proxies
1223
 
            'http_proxy': None,
1224
 
            'HTTP_PROXY': None,
1225
 
            'https_proxy': None,
1226
 
            'HTTPS_PROXY': None,
1227
 
            'no_proxy': None,
1228
 
            'NO_PROXY': None,
1229
 
            'all_proxy': None,
1230
 
            'ALL_PROXY': None,
1231
 
            # Nobody cares about these ones AFAIK. So far at
1232
 
            # least. If you do (care), please update this comment
1233
 
            # -- vila 20061212
1234
 
            'ftp_proxy': None,
1235
 
            'FTP_PROXY': None,
1236
 
            'BZR_REMOTE_PATH': None,
1237
 
        }
1238
 
        self.__old_env = {}
1239
 
        self.addCleanup(self._restoreEnvironment)
1240
 
        for name, value in new_env.iteritems():
1241
 
            self._captureVar(name, value)
1242
 
 
1243
 
    def _captureVar(self, name, newvalue):
1244
 
        """Set an environment variable, and reset it when finished."""
1245
 
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1246
 
 
1247
 
    def _restore_debug_flags(self):
1248
 
        debug.debug_flags.clear()
1249
 
        debug.debug_flags.update(self._preserved_debug_flags)
1250
 
 
1251
 
    def _restoreEnvironment(self):
1252
 
        for name, value in self.__old_env.iteritems():
1253
 
            osutils.set_or_unset_env(name, value)
1254
 
 
1255
 
    def _restoreHooks(self):
1256
 
        for klass, hooks in self._preserved_hooks.items():
1257
 
            setattr(klass, 'hooks', hooks)
1258
 
 
1259
 
    def knownFailure(self, reason):
1260
 
        """This test has failed for some known reason."""
1261
 
        raise KnownFailure(reason)
1262
 
 
1263
 
    def run(self, result=None):
1264
 
        if result is None: result = self.defaultTestResult()
1265
 
        for feature in getattr(self, '_test_needs_features', []):
1266
 
            if not feature.available():
1267
 
                result.startTest(self)
1268
 
                if getattr(result, 'addNotSupported', None):
1269
 
                    result.addNotSupported(self, feature)
1270
 
                else:
1271
 
                    result.addSuccess(self)
1272
 
                result.stopTest(self)
1273
 
                return
1274
 
        try:
1275
 
            return unittest.TestCase.run(self, result)
1276
 
        finally:
1277
 
            saved_attrs = {}
1278
 
            absent_attr = object()
1279
 
            for attr_name in self.attrs_to_keep:
1280
 
                attr = getattr(self, attr_name, absent_attr)
1281
 
                if attr is not absent_attr:
1282
 
                    saved_attrs[attr_name] = attr
1283
 
            self.__dict__ = saved_attrs
1284
 
 
1285
 
    def tearDown(self):
1286
 
        self._runCleanups()
1287
84
        unittest.TestCase.tearDown(self)
1288
85
 
1289
 
    def time(self, callable, *args, **kwargs):
1290
 
        """Run callable and accrue the time it takes to the benchmark time.
1291
 
        
1292
 
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1293
 
        this will cause lsprofile statistics to be gathered and stored in
1294
 
        self._benchcalls.
1295
 
        """
1296
 
        if self._benchtime is None:
1297
 
            self._benchtime = 0
1298
 
        start = time.time()
1299
 
        try:
1300
 
            if not self._gather_lsprof_in_benchmarks:
1301
 
                return callable(*args, **kwargs)
1302
 
            else:
1303
 
                # record this benchmark
1304
 
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1305
 
                stats.sort()
1306
 
                self._benchcalls.append(((callable, args, kwargs), stats))
1307
 
                return ret
1308
 
        finally:
1309
 
            self._benchtime += time.time() - start
1310
 
 
1311
 
    def _runCleanups(self):
1312
 
        """Run registered cleanup functions. 
1313
 
 
1314
 
        This should only be called from TestCase.tearDown.
1315
 
        """
1316
 
        # TODO: Perhaps this should keep running cleanups even if 
1317
 
        # one of them fails?
1318
 
 
1319
 
        # Actually pop the cleanups from the list so tearDown running
1320
 
        # twice is safe (this happens for skipped tests).
1321
 
        while self._cleanups:
1322
 
            self._cleanups.pop()()
1323
 
 
1324
86
    def log(self, *args):
1325
 
        mutter(*args)
1326
 
 
1327
 
    def _get_log(self, keep_log_file=False):
1328
 
        """Get the log from bzrlib.trace calls from this test.
1329
 
 
1330
 
        :param keep_log_file: When True, if the log is still a file on disk
1331
 
            leave it as a file on disk. When False, if the log is still a file
1332
 
            on disk, the log file is deleted and the log preserved as
1333
 
            self._log_contents.
1334
 
        :return: A string containing the log.
1335
 
        """
1336
 
        # flush the log file, to get all content
1337
 
        import bzrlib.trace
1338
 
        bzrlib.trace._trace_file.flush()
1339
 
        if self._log_contents:
1340
 
            # XXX: this can hardly contain the content flushed above --vila
1341
 
            # 20080128
1342
 
            return self._log_contents
1343
 
        if self._log_file_name is not None:
1344
 
            logfile = open(self._log_file_name)
1345
 
            try:
1346
 
                log_contents = logfile.read()
1347
 
            finally:
1348
 
                logfile.close()
1349
 
            if not keep_log_file:
1350
 
                self._log_contents = log_contents
1351
 
                try:
1352
 
                    os.remove(self._log_file_name)
1353
 
                except OSError, e:
1354
 
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
1355
 
                        sys.stderr.write(('Unable to delete log file '
1356
 
                                             ' %r\n' % self._log_file_name))
1357
 
                    else:
1358
 
                        raise
1359
 
            return log_contents
1360
 
        else:
1361
 
            return "DELETED log file to reduce memory footprint"
1362
 
 
1363
 
    def requireFeature(self, feature):
1364
 
        """This test requires a specific feature is available.
1365
 
 
1366
 
        :raises UnavailableFeature: When feature is not available.
1367
 
        """
1368
 
        if not feature.available():
1369
 
            raise UnavailableFeature(feature)
1370
 
 
1371
 
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1372
 
            working_dir):
1373
 
        """Run bazaar command line, splitting up a string command line."""
1374
 
        if isinstance(args, basestring):
1375
 
            # shlex don't understand unicode strings,
1376
 
            # so args should be plain string (bialix 20070906)
1377
 
            args = list(shlex.split(str(args)))
1378
 
        return self._run_bzr_core(args, retcode=retcode,
1379
 
                encoding=encoding, stdin=stdin, working_dir=working_dir,
1380
 
                )
1381
 
 
1382
 
    def _run_bzr_core(self, args, retcode, encoding, stdin,
1383
 
            working_dir):
1384
 
        if encoding is None:
1385
 
            encoding = bzrlib.user_encoding
1386
 
        stdout = StringIOWrapper()
1387
 
        stderr = StringIOWrapper()
1388
 
        stdout.encoding = encoding
1389
 
        stderr.encoding = encoding
1390
 
 
1391
 
        self.log('run bzr: %r', args)
1392
 
        # FIXME: don't call into logging here
 
87
        logging.debug(*args)
 
88
 
 
89
    def _get_log(self):
 
90
        """Return as a string the log for this test"""
 
91
        return open(self._log_file_name).read()
 
92
 
 
93
 
 
94
    def capture(self, cmd):
 
95
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
96
        return self.run_bzr_captured(cmd.split())[0]
 
97
 
 
98
    def run_bzr_captured(self, argv, retcode=0):
 
99
        """Invoke bzr and return (result, stdout, stderr).
 
100
 
 
101
        Useful for code that wants to check the contents of the
 
102
        output, the way error messages are presented, etc.
 
103
 
 
104
        This should be the main method for tests that want to exercise the
 
105
        overall behavior of the bzr application (rather than a unit test
 
106
        or a functional test of the library.)
 
107
 
 
108
        Much of the old code runs bzr by forking a new copy of Python, but
 
109
        that is slower, harder to debug, and generally not necessary.
 
110
 
 
111
        This runs bzr through the interface that catches and reports
 
112
        errors, and with logging set to something approximating the
 
113
        default, so that error reporting can be checked.
 
114
 
 
115
        argv -- arguments to invoke bzr
 
116
        retcode -- expected return code, or None for don't-care.
 
117
        """
 
118
        stdout = StringIO()
 
119
        stderr = StringIO()
 
120
        self.log('run bzr: %s', ' '.join(argv))
1393
121
        handler = logging.StreamHandler(stderr)
 
122
        handler.setFormatter(bzrlib.trace.QuietFormatter())
1394
123
        handler.setLevel(logging.INFO)
1395
124
        logger = logging.getLogger('')
1396
125
        logger.addHandler(handler)
1397
 
        old_ui_factory = ui.ui_factory
1398
 
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1399
 
 
1400
 
        cwd = None
1401
 
        if working_dir is not None:
1402
 
            cwd = osutils.getcwd()
1403
 
            os.chdir(working_dir)
1404
 
 
1405
126
        try:
1406
 
            result = self.apply_redirected(ui.ui_factory.stdin,
1407
 
                stdout, stderr,
1408
 
                bzrlib.commands.run_bzr_catch_user_errors,
1409
 
                args)
 
127
            result = self.apply_redirected(None, stdout, stderr,
 
128
                                           bzrlib.commands.run_bzr_catch_errors,
 
129
                                           argv)
1410
130
        finally:
1411
131
            logger.removeHandler(handler)
1412
 
            ui.ui_factory = old_ui_factory
1413
 
            if cwd is not None:
1414
 
                os.chdir(cwd)
1415
 
 
1416
132
        out = stdout.getvalue()
1417
133
        err = stderr.getvalue()
1418
134
        if out:
1419
 
            self.log('output:\n%r', out)
 
135
            self.log('output:\n%s', out)
1420
136
        if err:
1421
 
            self.log('errors:\n%r', err)
 
137
            self.log('errors:\n%s', err)
1422
138
        if retcode is not None:
1423
 
            self.assertEquals(retcode, result,
1424
 
                              message='Unexpected return code')
 
139
            self.assertEquals(result, retcode)
1425
140
        return out, err
1426
141
 
1427
 
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1428
 
                working_dir=None, error_regexes=[], output_encoding=None):
 
142
    def run_bzr(self, *args, **kwargs):
1429
143
        """Invoke bzr, as if it were run from the command line.
1430
144
 
1431
 
        The argument list should not include the bzr program name - the
1432
 
        first argument is normally the bzr command.  Arguments may be
1433
 
        passed in three ways:
1434
 
 
1435
 
        1- A list of strings, eg ["commit", "a"].  This is recommended
1436
 
        when the command contains whitespace or metacharacters, or 
1437
 
        is built up at run time.
1438
 
 
1439
 
        2- A single string, eg "add a".  This is the most convenient 
1440
 
        for hardcoded commands.
1441
 
 
1442
 
        This runs bzr through the interface that catches and reports
1443
 
        errors, and with logging set to something approximating the
1444
 
        default, so that error reporting can be checked.
1445
 
 
1446
145
        This should be the main method for tests that want to exercise the
1447
146
        overall behavior of the bzr application (rather than a unit test
1448
147
        or a functional test of the library.)
1449
148
 
1450
149
        This sends the stdout/stderr results into the test's log,
1451
150
        where it may be useful for debugging.  See also run_captured.
1452
 
 
1453
 
        :keyword stdin: A string to be used as stdin for the command.
1454
 
        :keyword retcode: The status code the command should return;
1455
 
            default 0.
1456
 
        :keyword working_dir: The directory to run the command in
1457
 
        :keyword error_regexes: A list of expected error messages.  If
1458
 
            specified they must be seen in the error output of the command.
1459
 
        """
1460
 
        out, err = self._run_bzr_autosplit(
1461
 
            args=args,
1462
 
            retcode=retcode,
1463
 
            encoding=encoding,
1464
 
            stdin=stdin,
1465
 
            working_dir=working_dir,
1466
 
            )
1467
 
        for regex in error_regexes:
1468
 
            self.assertContainsRe(err, regex)
1469
 
        return out, err
1470
 
 
1471
 
    def run_bzr_error(self, error_regexes, *args, **kwargs):
1472
 
        """Run bzr, and check that stderr contains the supplied regexes
1473
 
 
1474
 
        :param error_regexes: Sequence of regular expressions which
1475
 
            must each be found in the error output. The relative ordering
1476
 
            is not enforced.
1477
 
        :param args: command-line arguments for bzr
1478
 
        :param kwargs: Keyword arguments which are interpreted by run_bzr
1479
 
            This function changes the default value of retcode to be 3,
1480
 
            since in most cases this is run when you expect bzr to fail.
1481
 
 
1482
 
        :return: (out, err) The actual output of running the command (in case
1483
 
            you want to do more inspection)
1484
 
 
1485
 
        Examples of use::
1486
 
 
1487
 
            # Make sure that commit is failing because there is nothing to do
1488
 
            self.run_bzr_error(['no changes to commit'],
1489
 
                               ['commit', '-m', 'my commit comment'])
1490
 
            # Make sure --strict is handling an unknown file, rather than
1491
 
            # giving us the 'nothing to do' error
1492
 
            self.build_tree(['unknown'])
1493
 
            self.run_bzr_error(['Commit refused because there are unknown files'],
1494
 
                               ['commit', --strict', '-m', 'my commit comment'])
1495
 
        """
1496
 
        kwargs.setdefault('retcode', 3)
1497
 
        kwargs['error_regexes'] = error_regexes
1498
 
        out, err = self.run_bzr(*args, **kwargs)
1499
 
        return out, err
1500
 
 
1501
 
    def run_bzr_subprocess(self, *args, **kwargs):
1502
 
        """Run bzr in a subprocess for testing.
1503
 
 
1504
 
        This starts a new Python interpreter and runs bzr in there. 
1505
 
        This should only be used for tests that have a justifiable need for
1506
 
        this isolation: e.g. they are testing startup time, or signal
1507
 
        handling, or early startup code, etc.  Subprocess code can't be 
1508
 
        profiled or debugged so easily.
1509
 
 
1510
 
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
1511
 
            None is supplied, the status code is not checked.
1512
 
        :keyword env_changes: A dictionary which lists changes to environment
1513
 
            variables. A value of None will unset the env variable.
1514
 
            The values must be strings. The change will only occur in the
1515
 
            child, so you don't need to fix the environment after running.
1516
 
        :keyword universal_newlines: Convert CRLF => LF
1517
 
        :keyword allow_plugins: By default the subprocess is run with
1518
 
            --no-plugins to ensure test reproducibility. Also, it is possible
1519
 
            for system-wide plugins to create unexpected output on stderr,
1520
 
            which can cause unnecessary test failures.
1521
 
        """
1522
 
        env_changes = kwargs.get('env_changes', {})
1523
 
        working_dir = kwargs.get('working_dir', None)
1524
 
        allow_plugins = kwargs.get('allow_plugins', False)
1525
 
        if len(args) == 1:
1526
 
            if isinstance(args[0], list):
1527
 
                args = args[0]
1528
 
            elif isinstance(args[0], basestring):
1529
 
                args = list(shlex.split(args[0]))
1530
 
        else:
1531
 
            raise ValueError("passing varargs to run_bzr_subprocess")
1532
 
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
1533
 
                                            working_dir=working_dir,
1534
 
                                            allow_plugins=allow_plugins)
1535
 
        # We distinguish between retcode=None and retcode not passed.
1536
 
        supplied_retcode = kwargs.get('retcode', 0)
1537
 
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1538
 
            universal_newlines=kwargs.get('universal_newlines', False),
1539
 
            process_args=args)
1540
 
 
1541
 
    def start_bzr_subprocess(self, process_args, env_changes=None,
1542
 
                             skip_if_plan_to_signal=False,
1543
 
                             working_dir=None,
1544
 
                             allow_plugins=False):
1545
 
        """Start bzr in a subprocess for testing.
1546
 
 
1547
 
        This starts a new Python interpreter and runs bzr in there.
1548
 
        This should only be used for tests that have a justifiable need for
1549
 
        this isolation: e.g. they are testing startup time, or signal
1550
 
        handling, or early startup code, etc.  Subprocess code can't be
1551
 
        profiled or debugged so easily.
1552
 
 
1553
 
        :param process_args: a list of arguments to pass to the bzr executable,
1554
 
            for example ``['--version']``.
1555
 
        :param env_changes: A dictionary which lists changes to environment
1556
 
            variables. A value of None will unset the env variable.
1557
 
            The values must be strings. The change will only occur in the
1558
 
            child, so you don't need to fix the environment after running.
1559
 
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1560
 
            is not available.
1561
 
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
1562
 
 
1563
 
        :returns: Popen object for the started process.
1564
 
        """
1565
 
        if skip_if_plan_to_signal:
1566
 
            if not getattr(os, 'kill', None):
1567
 
                raise TestSkipped("os.kill not available.")
1568
 
 
1569
 
        if env_changes is None:
1570
 
            env_changes = {}
1571
 
        old_env = {}
1572
 
 
1573
 
        def cleanup_environment():
1574
 
            for env_var, value in env_changes.iteritems():
1575
 
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1576
 
 
1577
 
        def restore_environment():
1578
 
            for env_var, value in old_env.iteritems():
1579
 
                osutils.set_or_unset_env(env_var, value)
1580
 
 
1581
 
        bzr_path = self.get_bzr_path()
1582
 
 
1583
 
        cwd = None
1584
 
        if working_dir is not None:
1585
 
            cwd = osutils.getcwd()
1586
 
            os.chdir(working_dir)
1587
 
 
1588
 
        try:
1589
 
            # win32 subprocess doesn't support preexec_fn
1590
 
            # so we will avoid using it on all platforms, just to
1591
 
            # make sure the code path is used, and we don't break on win32
1592
 
            cleanup_environment()
1593
 
            command = [sys.executable, bzr_path]
1594
 
            if not allow_plugins:
1595
 
                command.append('--no-plugins')
1596
 
            command.extend(process_args)
1597
 
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1598
 
        finally:
1599
 
            restore_environment()
1600
 
            if cwd is not None:
1601
 
                os.chdir(cwd)
1602
 
 
1603
 
        return process
1604
 
 
1605
 
    def _popen(self, *args, **kwargs):
1606
 
        """Place a call to Popen.
1607
 
 
1608
 
        Allows tests to override this method to intercept the calls made to
1609
 
        Popen for introspection.
1610
 
        """
1611
 
        return Popen(*args, **kwargs)
1612
 
 
1613
 
    def get_bzr_path(self):
1614
 
        """Return the path of the 'bzr' executable for this test suite."""
1615
 
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1616
 
        if not os.path.isfile(bzr_path):
1617
 
            # We are probably installed. Assume sys.argv is the right file
1618
 
            bzr_path = sys.argv[0]
1619
 
        return bzr_path
1620
 
 
1621
 
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1622
 
                              universal_newlines=False, process_args=None):
1623
 
        """Finish the execution of process.
1624
 
 
1625
 
        :param process: the Popen object returned from start_bzr_subprocess.
1626
 
        :param retcode: The status code that is expected.  Defaults to 0.  If
1627
 
            None is supplied, the status code is not checked.
1628
 
        :param send_signal: an optional signal to send to the process.
1629
 
        :param universal_newlines: Convert CRLF => LF
1630
 
        :returns: (stdout, stderr)
1631
 
        """
1632
 
        if send_signal is not None:
1633
 
            os.kill(process.pid, send_signal)
1634
 
        out, err = process.communicate()
1635
 
 
1636
 
        if universal_newlines:
1637
 
            out = out.replace('\r\n', '\n')
1638
 
            err = err.replace('\r\n', '\n')
1639
 
 
1640
 
        if retcode is not None and retcode != process.returncode:
1641
 
            if process_args is None:
1642
 
                process_args = "(unknown args)"
1643
 
            mutter('Output of bzr %s:\n%s', process_args, out)
1644
 
            mutter('Error for bzr %s:\n%s', process_args, err)
1645
 
            self.fail('Command bzr %s failed with retcode %s != %s'
1646
 
                      % (process_args, retcode, process.returncode))
1647
 
        return [out, err]
 
151
        """
 
152
        retcode = kwargs.pop('retcode', 0)
 
153
        return self.run_bzr_captured(args, retcode)
1648
154
 
1649
155
    def check_inventory_shape(self, inv, shape):
1650
 
        """Compare an inventory to a list of expected names.
 
156
        """
 
157
        Compare an inventory to a list of expected names.
1651
158
 
1652
159
        Fail if they are not precisely equal.
1653
160
        """
1655
162
        shape = list(shape)             # copy
1656
163
        for path, ie in inv.entries():
1657
164
            name = path.replace('\\', '/')
1658
 
            if ie.kind == 'directory':
 
165
            if ie.kind == 'dir':
1659
166
                name = name + '/'
1660
167
            if name in shape:
1661
168
                shape.remove(name)
1676
183
        if stdin is None:
1677
184
            stdin = StringIO("")
1678
185
        if stdout is None:
1679
 
            if getattr(self, "_log_file", None) is not None:
 
186
            if hasattr(self, "_log_file"):
1680
187
                stdout = self._log_file
1681
188
            else:
1682
189
                stdout = StringIO()
1683
190
        if stderr is None:
1684
 
            if getattr(self, "_log_file", None is not None):
 
191
            if hasattr(self, "_log_file"):
1685
192
                stderr = self._log_file
1686
193
            else:
1687
194
                stderr = StringIO()
1698
205
            sys.stderr = real_stderr
1699
206
            sys.stdin = real_stdin
1700
207
 
1701
 
    def reduceLockdirTimeout(self):
1702
 
        """Reduce the default lock timeout for the duration of the test, so that
1703
 
        if LockContention occurs during a test, it does so quickly.
1704
 
 
1705
 
        Tests that expect to provoke LockContention errors should call this.
1706
 
        """
1707
 
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1708
 
        def resetTimeout():
1709
 
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1710
 
        self.addCleanup(resetTimeout)
1711
 
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1712
 
 
1713
 
    def make_utf8_encoded_stringio(self, encoding_type=None):
1714
 
        """Return a StringIOWrapper instance, that will encode Unicode
1715
 
        input to UTF-8.
1716
 
        """
1717
 
        if encoding_type is None:
1718
 
            encoding_type = 'strict'
1719
 
        sio = StringIO()
1720
 
        output_encoding = 'utf-8'
1721
 
        sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
1722
 
        sio.encoding = output_encoding
1723
 
        return sio
1724
 
 
1725
 
 
1726
 
class TestCaseWithMemoryTransport(TestCase):
1727
 
    """Common test class for tests that do not need disk resources.
1728
 
 
1729
 
    Tests that need disk resources should derive from TestCaseWithTransport.
1730
 
 
1731
 
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1732
 
 
1733
 
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1734
 
    a directory which does not exist. This serves to help ensure test isolation
1735
 
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1736
 
    must exist. However, TestCaseWithMemoryTransport does not offer local
1737
 
    file defaults for the transport in tests, nor does it obey the command line
1738
 
    override, so tests that accidentally write to the common directory should
1739
 
    be rare.
1740
 
 
1741
 
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
1742
 
    a .bzr directory that stops us ascending higher into the filesystem.
1743
 
    """
1744
 
 
1745
 
    TEST_ROOT = None
1746
 
    _TEST_NAME = 'test'
1747
 
 
1748
 
    def __init__(self, methodName='runTest'):
1749
 
        # allow test parameterization after test construction and before test
1750
 
        # execution. Variables that the parameterizer sets need to be 
1751
 
        # ones that are not set by setUp, or setUp will trash them.
1752
 
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
1753
 
        self.vfs_transport_factory = default_transport
1754
 
        self.transport_server = None
1755
 
        self.transport_readonly_server = None
1756
 
        self.__vfs_server = None
1757
 
 
1758
 
    def get_transport(self, relpath=None):
1759
 
        """Return a writeable transport.
1760
 
 
1761
 
        This transport is for the test scratch space relative to
1762
 
        "self._test_root"
1763
 
        
1764
 
        :param relpath: a path relative to the base url.
1765
 
        """
1766
 
        t = get_transport(self.get_url(relpath))
1767
 
        self.assertFalse(t.is_readonly())
1768
 
        return t
1769
 
 
1770
 
    def get_readonly_transport(self, relpath=None):
1771
 
        """Return a readonly transport for the test scratch space
1772
 
        
1773
 
        This can be used to test that operations which should only need
1774
 
        readonly access in fact do not try to write.
1775
 
 
1776
 
        :param relpath: a path relative to the base url.
1777
 
        """
1778
 
        t = get_transport(self.get_readonly_url(relpath))
1779
 
        self.assertTrue(t.is_readonly())
1780
 
        return t
1781
 
 
1782
 
    def create_transport_readonly_server(self):
1783
 
        """Create a transport server from class defined at init.
1784
 
 
1785
 
        This is mostly a hook for daughter classes.
1786
 
        """
1787
 
        return self.transport_readonly_server()
1788
 
 
1789
 
    def get_readonly_server(self):
1790
 
        """Get the server instance for the readonly transport
1791
 
 
1792
 
        This is useful for some tests with specific servers to do diagnostics.
1793
 
        """
1794
 
        if self.__readonly_server is None:
1795
 
            if self.transport_readonly_server is None:
1796
 
                # readonly decorator requested
1797
 
                # bring up the server
1798
 
                self.__readonly_server = ReadonlyServer()
1799
 
                self.__readonly_server.setUp(self.get_vfs_only_server())
1800
 
            else:
1801
 
                self.__readonly_server = self.create_transport_readonly_server()
1802
 
                self.__readonly_server.setUp(self.get_vfs_only_server())
1803
 
            self.addCleanup(self.__readonly_server.tearDown)
1804
 
        return self.__readonly_server
1805
 
 
1806
 
    def get_readonly_url(self, relpath=None):
1807
 
        """Get a URL for the readonly transport.
1808
 
 
1809
 
        This will either be backed by '.' or a decorator to the transport 
1810
 
        used by self.get_url()
1811
 
        relpath provides for clients to get a path relative to the base url.
1812
 
        These should only be downwards relative, not upwards.
1813
 
        """
1814
 
        base = self.get_readonly_server().get_url()
1815
 
        return self._adjust_url(base, relpath)
1816
 
 
1817
 
    def get_vfs_only_server(self):
1818
 
        """Get the vfs only read/write server instance.
1819
 
 
1820
 
        This is useful for some tests with specific servers that need
1821
 
        diagnostics.
1822
 
 
1823
 
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1824
 
        is no means to override it.
1825
 
        """
1826
 
        if self.__vfs_server is None:
1827
 
            self.__vfs_server = MemoryServer()
1828
 
            self.__vfs_server.setUp()
1829
 
            self.addCleanup(self.__vfs_server.tearDown)
1830
 
        return self.__vfs_server
1831
 
 
1832
 
    def get_server(self):
1833
 
        """Get the read/write server instance.
1834
 
 
1835
 
        This is useful for some tests with specific servers that need
1836
 
        diagnostics.
1837
 
 
1838
 
        This is built from the self.transport_server factory. If that is None,
1839
 
        then the self.get_vfs_server is returned.
1840
 
        """
1841
 
        if self.__server is None:
1842
 
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1843
 
                return self.get_vfs_only_server()
1844
 
            else:
1845
 
                # bring up a decorated means of access to the vfs only server.
1846
 
                self.__server = self.transport_server()
1847
 
                try:
1848
 
                    self.__server.setUp(self.get_vfs_only_server())
1849
 
                except TypeError, e:
1850
 
                    # This should never happen; the try:Except here is to assist
1851
 
                    # developers having to update code rather than seeing an
1852
 
                    # uninformative TypeError.
1853
 
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1854
 
            self.addCleanup(self.__server.tearDown)
1855
 
        return self.__server
1856
 
 
1857
 
    def _adjust_url(self, base, relpath):
1858
 
        """Get a URL (or maybe a path) for the readwrite transport.
1859
 
 
1860
 
        This will either be backed by '.' or to an equivalent non-file based
1861
 
        facility.
1862
 
        relpath provides for clients to get a path relative to the base url.
1863
 
        These should only be downwards relative, not upwards.
1864
 
        """
1865
 
        if relpath is not None and relpath != '.':
1866
 
            if not base.endswith('/'):
1867
 
                base = base + '/'
1868
 
            # XXX: Really base should be a url; we did after all call
1869
 
            # get_url()!  But sometimes it's just a path (from
1870
 
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
1871
 
            # to a non-escaped local path.
1872
 
            if base.startswith('./') or base.startswith('/'):
1873
 
                base += relpath
1874
 
            else:
1875
 
                base += urlutils.escape(relpath)
1876
 
        return base
1877
 
 
1878
 
    def get_url(self, relpath=None):
1879
 
        """Get a URL (or maybe a path) for the readwrite transport.
1880
 
 
1881
 
        This will either be backed by '.' or to an equivalent non-file based
1882
 
        facility.
1883
 
        relpath provides for clients to get a path relative to the base url.
1884
 
        These should only be downwards relative, not upwards.
1885
 
        """
1886
 
        base = self.get_server().get_url()
1887
 
        return self._adjust_url(base, relpath)
1888
 
 
1889
 
    def get_vfs_only_url(self, relpath=None):
1890
 
        """Get a URL (or maybe a path for the plain old vfs transport.
1891
 
 
1892
 
        This will never be a smart protocol.  It always has all the
1893
 
        capabilities of the local filesystem, but it might actually be a
1894
 
        MemoryTransport or some other similar virtual filesystem.
1895
 
 
1896
 
        This is the backing transport (if any) of the server returned by
1897
 
        get_url and get_readonly_url.
1898
 
 
1899
 
        :param relpath: provides for clients to get a path relative to the base
1900
 
            url.  These should only be downwards relative, not upwards.
1901
 
        :return: A URL
1902
 
        """
1903
 
        base = self.get_vfs_only_server().get_url()
1904
 
        return self._adjust_url(base, relpath)
1905
 
 
1906
 
    def _create_safety_net(self):
1907
 
        """Make a fake bzr directory.
1908
 
 
1909
 
        This prevents any tests propagating up onto the TEST_ROOT directory's
1910
 
        real branch.
1911
 
        """
1912
 
        root = TestCaseWithMemoryTransport.TEST_ROOT
1913
 
        bzrdir.BzrDir.create_standalone_workingtree(root)
1914
 
 
1915
 
    def _check_safety_net(self):
1916
 
        """Check that the safety .bzr directory have not been touched.
1917
 
 
1918
 
        _make_test_root have created a .bzr directory to prevent tests from
1919
 
        propagating. This method ensures than a test did not leaked.
1920
 
        """
1921
 
        root = TestCaseWithMemoryTransport.TEST_ROOT
1922
 
        wt = workingtree.WorkingTree.open(root)
1923
 
        last_rev = wt.last_revision()
1924
 
        if last_rev != 'null:':
1925
 
            # The current test have modified the /bzr directory, we need to
1926
 
            # recreate a new one or all the followng tests will fail.
1927
 
            # If you need to inspect its content uncomment the following line
1928
 
            # import pdb; pdb.set_trace()
1929
 
            _rmtree_temp_dir(root + '/.bzr')
1930
 
            self._create_safety_net()
1931
 
            raise AssertionError('%s/.bzr should not be modified' % root)
1932
 
 
1933
 
    def _make_test_root(self):
1934
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
1935
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
1936
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
1937
 
 
1938
 
            self._create_safety_net()
1939
 
 
1940
 
            # The same directory is used by all tests, and we're not
1941
 
            # specifically told when all tests are finished.  This will do.
1942
 
            atexit.register(_rmtree_temp_dir, root)
1943
 
 
1944
 
        self.addCleanup(self._check_safety_net)
1945
 
 
1946
 
    def makeAndChdirToTestDir(self):
1947
 
        """Create a temporary directories for this one test.
1948
 
        
1949
 
        This must set self.test_home_dir and self.test_dir and chdir to
1950
 
        self.test_dir.
1951
 
        
1952
 
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1953
 
        """
1954
 
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1955
 
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1956
 
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1957
 
        
1958
 
    def make_branch(self, relpath, format=None):
1959
 
        """Create a branch on the transport at relpath."""
1960
 
        repo = self.make_repository(relpath, format=format)
1961
 
        return repo.bzrdir.create_branch()
1962
 
 
1963
 
    def make_bzrdir(self, relpath, format=None):
1964
 
        try:
1965
 
            # might be a relative or absolute path
1966
 
            maybe_a_url = self.get_url(relpath)
1967
 
            segments = maybe_a_url.rsplit('/', 1)
1968
 
            t = get_transport(maybe_a_url)
1969
 
            if len(segments) > 1 and segments[-1] not in ('', '.'):
1970
 
                t.ensure_base()
1971
 
            if format is None:
1972
 
                format = 'default'
1973
 
            if isinstance(format, basestring):
1974
 
                format = bzrdir.format_registry.make_bzrdir(format)
1975
 
            return format.initialize_on_transport(t)
1976
 
        except errors.UninitializableFormat:
1977
 
            raise TestSkipped("Format %s is not initializable." % format)
1978
 
 
1979
 
    def make_repository(self, relpath, shared=False, format=None):
1980
 
        """Create a repository on our default transport at relpath.
1981
 
        
1982
 
        Note that relpath must be a relative path, not a full url.
1983
 
        """
1984
 
        # FIXME: If you create a remoterepository this returns the underlying
1985
 
        # real format, which is incorrect.  Actually we should make sure that 
1986
 
        # RemoteBzrDir returns a RemoteRepository.
1987
 
        # maybe  mbp 20070410
1988
 
        made_control = self.make_bzrdir(relpath, format=format)
1989
 
        return made_control.create_repository(shared=shared)
1990
 
 
1991
 
    def make_branch_and_memory_tree(self, relpath, format=None):
1992
 
        """Create a branch on the default transport and a MemoryTree for it."""
1993
 
        b = self.make_branch(relpath, format=format)
1994
 
        return memorytree.MemoryTree.create_on_branch(b)
1995
 
 
1996
 
    def overrideEnvironmentForTesting(self):
1997
 
        os.environ['HOME'] = self.test_home_dir
1998
 
        os.environ['BZR_HOME'] = self.test_home_dir
1999
 
        
2000
 
    def setUp(self):
2001
 
        super(TestCaseWithMemoryTransport, self).setUp()
2002
 
        self._make_test_root()
2003
 
        _currentdir = os.getcwdu()
2004
 
        def _leaveDirectory():
2005
 
            os.chdir(_currentdir)
2006
 
        self.addCleanup(_leaveDirectory)
2007
 
        self.makeAndChdirToTestDir()
2008
 
        self.overrideEnvironmentForTesting()
2009
 
        self.__readonly_server = None
2010
 
        self.__server = None
2011
 
        self.reduceLockdirTimeout()
 
208
 
 
209
BzrTestBase = TestCase
2012
210
 
2013
211
     
2014
 
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
212
class TestCaseInTempDir(TestCase):
2015
213
    """Derived class that runs a test within a temporary directory.
2016
214
 
2017
215
    This is useful for tests that need to create a branch, etc.
2021
219
    All test cases create their own directory within that.  If the
2022
220
    tests complete successfully, the directory is removed.
2023
221
 
2024
 
    :ivar test_base_dir: The path of the top-level directory for this 
2025
 
    test, which contains a home directory and a work directory.
2026
 
 
2027
 
    :ivar test_home_dir: An initially empty directory under test_base_dir
2028
 
    which is used as $HOME for this test.
2029
 
 
2030
 
    :ivar test_dir: A directory under test_base_dir used as the current
2031
 
    directory when the test proper is run.
 
222
    InTempDir is an old alias for FunctionalTestCase.
2032
223
    """
2033
224
 
 
225
    TEST_ROOT = None
 
226
    _TEST_NAME = 'test'
2034
227
    OVERRIDE_PYTHON = 'python'
2035
228
 
2036
229
    def check_file_contents(self, filename, expect):
2039
232
        if contents != expect:
2040
233
            self.log("expected: %r" % expect)
2041
234
            self.log("actually: %r" % contents)
2042
 
            self.fail("contents of %s not as expected" % filename)
2043
 
 
2044
 
    def makeAndChdirToTestDir(self):
2045
 
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2046
 
        
2047
 
        For TestCaseInTempDir we create a temporary directory based on the test
2048
 
        name and then create two subdirs - test and home under it.
2049
 
        """
2050
 
        # create a directory within the top level test directory
2051
 
        candidate_dir = osutils.mkdtemp(dir=self.TEST_ROOT)
2052
 
        # now create test and home directories within this dir
2053
 
        self.test_base_dir = candidate_dir
2054
 
        self.test_home_dir = self.test_base_dir + '/home'
2055
 
        os.mkdir(self.test_home_dir)
2056
 
        self.test_dir = self.test_base_dir + '/work'
 
235
            self.fail("contents of %s not as expected")
 
236
 
 
237
    def _make_test_root(self):
 
238
        if TestCaseInTempDir.TEST_ROOT is not None:
 
239
            return
 
240
        i = 0
 
241
        while True:
 
242
            root = 'test%04d.tmp' % i
 
243
            try:
 
244
                os.mkdir(root)
 
245
            except OSError, e:
 
246
                if e.errno == errno.EEXIST:
 
247
                    i += 1
 
248
                    continue
 
249
                else:
 
250
                    raise
 
251
            # successfully created
 
252
            TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
 
253
            break
 
254
        # make a fake bzr directory there to prevent any tests propagating
 
255
        # up onto the source directory's real branch
 
256
        os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
 
257
 
 
258
    def setUp(self):
 
259
        super(TestCaseInTempDir, self).setUp()
 
260
        self._make_test_root()
 
261
        self._currentdir = os.getcwdu()
 
262
        self.test_dir = os.path.join(self.TEST_ROOT, self.id())
2057
263
        os.mkdir(self.test_dir)
2058
264
        os.chdir(self.test_dir)
2059
 
        # put name of test inside
2060
 
        f = file(self.test_base_dir + '/name', 'w')
2061
 
        try:
2062
 
            f.write(self.id())
2063
 
        finally:
2064
 
            f.close()
2065
 
        self.addCleanup(self.deleteTestDir)
2066
 
 
2067
 
    def deleteTestDir(self):
2068
 
        os.chdir(self.TEST_ROOT)
2069
 
        _rmtree_temp_dir(self.test_base_dir)
2070
 
 
2071
 
    def build_tree(self, shape, line_endings='binary', transport=None):
 
265
        
 
266
    def tearDown(self):
 
267
        os.chdir(self._currentdir)
 
268
        super(TestCaseInTempDir, self).tearDown()
 
269
 
 
270
    def build_tree(self, shape):
2072
271
        """Build a test tree according to a pattern.
2073
272
 
2074
273
        shape is a sequence of file specifications.  If the final
2075
274
        character is '/', a directory is created.
2076
275
 
2077
 
        This assumes that all the elements in the tree being built are new.
2078
 
 
2079
276
        This doesn't add anything to a branch.
2080
 
 
2081
 
        :type shape:    list or tuple.
2082
 
        :param line_endings: Either 'binary' or 'native'
2083
 
            in binary mode, exact contents are written in native mode, the
2084
 
            line endings match the default platform endings.
2085
 
        :param transport: A transport to write to, for building trees on VFS's.
2086
 
            If the transport is readonly or None, "." is opened automatically.
2087
 
        :return: None
2088
277
        """
2089
 
        if type(shape) not in (list, tuple):
2090
 
            raise AssertionError("Parameter 'shape' should be "
2091
 
                "a list or a tuple. Got %r instead" % (shape,))
2092
 
        # It's OK to just create them using forward slashes on windows.
2093
 
        if transport is None or transport.is_readonly():
2094
 
            transport = get_transport(".")
 
278
        # XXX: It's OK to just create them using forward slashes on windows?
2095
279
        for name in shape:
2096
 
            self.assert_(isinstance(name, basestring))
 
280
            assert isinstance(name, basestring)
2097
281
            if name[-1] == '/':
2098
 
                transport.mkdir(urlutils.escape(name[:-1]))
2099
 
            else:
2100
 
                if line_endings == 'binary':
2101
 
                    end = '\n'
2102
 
                elif line_endings == 'native':
2103
 
                    end = os.linesep
2104
 
                else:
2105
 
                    raise errors.BzrError(
2106
 
                        'Invalid line ending request %r' % line_endings)
2107
 
                content = "contents of %s%s" % (name.encode('utf-8'), end)
2108
 
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
2109
 
 
2110
 
    def build_tree_contents(self, shape):
2111
 
        build_tree_contents(shape)
2112
 
 
2113
 
    def assertInWorkingTree(self, path, root_path='.', tree=None):
2114
 
        """Assert whether path or paths are in the WorkingTree"""
2115
 
        if tree is None:
2116
 
            tree = workingtree.WorkingTree.open(root_path)
2117
 
        if not isinstance(path, basestring):
2118
 
            for p in path:
2119
 
                self.assertInWorkingTree(p,tree=tree)
2120
 
        else:
2121
 
            self.assertIsNot(tree.path2id(path), None,
2122
 
                path+' not in working tree.')
2123
 
 
2124
 
    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2125
 
        """Assert whether path or paths are not in the WorkingTree"""
2126
 
        if tree is None:
2127
 
            tree = workingtree.WorkingTree.open(root_path)
2128
 
        if not isinstance(path, basestring):
2129
 
            for p in path:
2130
 
                self.assertNotInWorkingTree(p,tree=tree)
2131
 
        else:
2132
 
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
2133
 
 
2134
 
 
2135
 
class TestCaseWithTransport(TestCaseInTempDir):
2136
 
    """A test case that provides get_url and get_readonly_url facilities.
2137
 
 
2138
 
    These back onto two transport servers, one for readonly access and one for
2139
 
    read write access.
2140
 
 
2141
 
    If no explicit class is provided for readonly access, a
2142
 
    ReadonlyTransportDecorator is used instead which allows the use of non disk
2143
 
    based read write transports.
2144
 
 
2145
 
    If an explicit class is provided for readonly access, that server and the 
2146
 
    readwrite one must both define get_url() as resolving to os.getcwd().
2147
 
    """
2148
 
 
2149
 
    def get_vfs_only_server(self):
2150
 
        """See TestCaseWithMemoryTransport.
2151
 
 
2152
 
        This is useful for some tests with specific servers that need
2153
 
        diagnostics.
2154
 
        """
2155
 
        if self.__vfs_server is None:
2156
 
            self.__vfs_server = self.vfs_transport_factory()
2157
 
            self.__vfs_server.setUp()
2158
 
            self.addCleanup(self.__vfs_server.tearDown)
2159
 
        return self.__vfs_server
2160
 
 
2161
 
    def make_branch_and_tree(self, relpath, format=None):
2162
 
        """Create a branch on the transport and a tree locally.
2163
 
 
2164
 
        If the transport is not a LocalTransport, the Tree can't be created on
2165
 
        the transport.  In that case if the vfs_transport_factory is
2166
 
        LocalURLServer the working tree is created in the local
2167
 
        directory backing the transport, and the returned tree's branch and
2168
 
        repository will also be accessed locally. Otherwise a lightweight
2169
 
        checkout is created and returned.
2170
 
 
2171
 
        :param format: The BzrDirFormat.
2172
 
        :returns: the WorkingTree.
2173
 
        """
2174
 
        # TODO: always use the local disk path for the working tree,
2175
 
        # this obviously requires a format that supports branch references
2176
 
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2177
 
        # RBC 20060208
2178
 
        b = self.make_branch(relpath, format=format)
2179
 
        try:
2180
 
            return b.bzrdir.create_workingtree()
2181
 
        except errors.NotLocalUrl:
2182
 
            # We can only make working trees locally at the moment.  If the
2183
 
            # transport can't support them, then we keep the non-disk-backed
2184
 
            # branch and create a local checkout.
2185
 
            if self.vfs_transport_factory is LocalURLServer:
2186
 
                # the branch is colocated on disk, we cannot create a checkout.
2187
 
                # hopefully callers will expect this.
2188
 
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2189
 
                return local_controldir.create_workingtree()
2190
 
            else:
2191
 
                return b.create_checkout(relpath, lightweight=True)
2192
 
 
2193
 
    def assertIsDirectory(self, relpath, transport):
2194
 
        """Assert that relpath within transport is a directory.
2195
 
 
2196
 
        This may not be possible on all transports; in that case it propagates
2197
 
        a TransportNotPossible.
2198
 
        """
2199
 
        try:
2200
 
            mode = transport.stat(relpath).st_mode
2201
 
        except errors.NoSuchFile:
2202
 
            self.fail("path %s is not a directory; no such file"
2203
 
                      % (relpath))
2204
 
        if not stat.S_ISDIR(mode):
2205
 
            self.fail("path %s is not a directory; has mode %#o"
2206
 
                      % (relpath, mode))
2207
 
 
2208
 
    def assertTreesEqual(self, left, right):
2209
 
        """Check that left and right have the same content and properties."""
2210
 
        # we use a tree delta to check for equality of the content, and we
2211
 
        # manually check for equality of other things such as the parents list.
2212
 
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2213
 
        differences = left.changes_from(right)
2214
 
        self.assertFalse(differences.has_changed(),
2215
 
            "Trees %r and %r are different: %r" % (left, right, differences))
2216
 
 
2217
 
    def setUp(self):
2218
 
        super(TestCaseWithTransport, self).setUp()
2219
 
        self.__vfs_server = None
2220
 
 
2221
 
 
2222
 
class ChrootedTestCase(TestCaseWithTransport):
2223
 
    """A support class that provides readonly urls outside the local namespace.
2224
 
 
2225
 
    This is done by checking if self.transport_server is a MemoryServer. if it
2226
 
    is then we are chrooted already, if it is not then an HttpServer is used
2227
 
    for readonly urls.
2228
 
 
2229
 
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2230
 
                       be used without needed to redo it when a different 
2231
 
                       subclass is in use ?
2232
 
    """
2233
 
 
2234
 
    def setUp(self):
2235
 
        super(ChrootedTestCase, self).setUp()
2236
 
        if not self.vfs_transport_factory == MemoryServer:
2237
 
            self.transport_readonly_server = HttpServer
2238
 
 
2239
 
 
2240
 
def condition_id_re(pattern):
2241
 
    """Create a condition filter which performs a re check on a test's id.
2242
 
    
2243
 
    :param pattern: A regular expression string.
2244
 
    :return: A callable that returns True if the re matches.
2245
 
    """
2246
 
    filter_re = re.compile(pattern)
2247
 
    def condition(test):
2248
 
        test_id = test.id()
2249
 
        return filter_re.search(test_id)
2250
 
    return condition
2251
 
 
2252
 
 
2253
 
def condition_isinstance(klass_or_klass_list):
2254
 
    """Create a condition filter which returns isinstance(param, klass).
2255
 
    
2256
 
    :return: A callable which when called with one parameter obj return the
2257
 
        result of isinstance(obj, klass_or_klass_list).
2258
 
    """
2259
 
    def condition(obj):
2260
 
        return isinstance(obj, klass_or_klass_list)
2261
 
    return condition
2262
 
 
2263
 
 
2264
 
def condition_id_in_list(id_list):
2265
 
    """Create a condition filter which verify that test's id in a list.
2266
 
    
2267
 
    :param id_list: A TestIdList object.
2268
 
    :return: A callable that returns True if the test's id appears in the list.
2269
 
    """
2270
 
    def condition(test):
2271
 
        return id_list.includes(test.id())
2272
 
    return condition
2273
 
 
2274
 
 
2275
 
def condition_id_startswith(start):
2276
 
    """Create a condition filter verifying that test's id starts with a string.
2277
 
    
2278
 
    :param start: A string.
2279
 
    :return: A callable that returns True if the test's id starts with the
2280
 
        given string.
2281
 
    """
2282
 
    def condition(test):
2283
 
        return test.id().startswith(start)
2284
 
    return condition
2285
 
 
2286
 
 
2287
 
def exclude_tests_by_condition(suite, condition):
2288
 
    """Create a test suite which excludes some tests from suite.
2289
 
 
2290
 
    :param suite: The suite to get tests from.
2291
 
    :param condition: A callable whose result evaluates True when called with a
2292
 
        test case which should be excluded from the result.
2293
 
    :return: A suite which contains the tests found in suite that fail
2294
 
        condition.
2295
 
    """
2296
 
    result = []
2297
 
    for test in iter_suite_tests(suite):
2298
 
        if not condition(test):
2299
 
            result.append(test)
2300
 
    return TestUtil.TestSuite(result)
2301
 
 
2302
 
 
2303
 
def filter_suite_by_condition(suite, condition):
2304
 
    """Create a test suite by filtering another one.
2305
 
    
2306
 
    :param suite: The source suite.
2307
 
    :param condition: A callable whose result evaluates True when called with a
2308
 
        test case which should be included in the result.
2309
 
    :return: A suite which contains the tests found in suite that pass
2310
 
        condition.
2311
 
    """ 
2312
 
    result = []
2313
 
    for test in iter_suite_tests(suite):
2314
 
        if condition(test):
2315
 
            result.append(test)
2316
 
    return TestUtil.TestSuite(result)
2317
 
 
2318
 
 
2319
 
def filter_suite_by_re(suite, pattern):
2320
 
    """Create a test suite by filtering another one.
2321
 
    
2322
 
    :param suite:           the source suite
2323
 
    :param pattern:         pattern that names must match
2324
 
    :returns: the newly created suite
2325
 
    """ 
2326
 
    condition = condition_id_re(pattern)
2327
 
    result_suite = filter_suite_by_condition(suite, condition)
2328
 
    return result_suite
2329
 
 
2330
 
 
2331
 
def filter_suite_by_id_list(suite, test_id_list):
2332
 
    """Create a test suite by filtering another one.
2333
 
 
2334
 
    :param suite: The source suite.
2335
 
    :param test_id_list: A list of the test ids to keep as strings.
2336
 
    :returns: the newly created suite
2337
 
    """
2338
 
    condition = condition_id_in_list(test_id_list)
2339
 
    result_suite = filter_suite_by_condition(suite, condition)
2340
 
    return result_suite
2341
 
 
2342
 
 
2343
 
def filter_suite_by_id_startswith(suite, start):
2344
 
    """Create a test suite by filtering another one.
2345
 
 
2346
 
    :param suite: The source suite.
2347
 
    :param start: A string the test id must start with.
2348
 
    :returns: the newly created suite
2349
 
    """
2350
 
    condition = condition_id_startswith(start)
2351
 
    result_suite = filter_suite_by_condition(suite, condition)
2352
 
    return result_suite
2353
 
 
2354
 
 
2355
 
def exclude_tests_by_re(suite, pattern):
2356
 
    """Create a test suite which excludes some tests from suite.
2357
 
 
2358
 
    :param suite: The suite to get tests from.
2359
 
    :param pattern: A regular expression string. Test ids that match this
2360
 
        pattern will be excluded from the result.
2361
 
    :return: A TestSuite that contains all the tests from suite without the
2362
 
        tests that matched pattern. The order of tests is the same as it was in
2363
 
        suite.
2364
 
    """
2365
 
    return exclude_tests_by_condition(suite, condition_id_re(pattern))
2366
 
 
2367
 
 
2368
 
def preserve_input(something):
2369
 
    """A helper for performing test suite transformation chains.
2370
 
 
2371
 
    :param something: Anything you want to preserve.
2372
 
    :return: Something.
2373
 
    """
2374
 
    return something
2375
 
 
2376
 
 
2377
 
def randomize_suite(suite):
2378
 
    """Return a new TestSuite with suite's tests in random order.
2379
 
    
2380
 
    The tests in the input suite are flattened into a single suite in order to
2381
 
    accomplish this. Any nested TestSuites are removed to provide global
2382
 
    randomness.
2383
 
    """
2384
 
    tests = list(iter_suite_tests(suite))
2385
 
    random.shuffle(tests)
2386
 
    return TestUtil.TestSuite(tests)
2387
 
 
2388
 
 
2389
 
def split_suite_by_condition(suite, condition):
2390
 
    """Split a test suite into two by a condition.
2391
 
    
2392
 
    :param suite: The suite to split.
2393
 
    :param condition: The condition to match on. Tests that match this
2394
 
        condition are returned in the first test suite, ones that do not match
2395
 
        are in the second suite.
2396
 
    :return: A tuple of two test suites, where the first contains tests from
2397
 
        suite matching the condition, and the second contains the remainder
2398
 
        from suite. The order within each output suite is the same as it was in
2399
 
        suite.
2400
 
    """ 
2401
 
    matched = []
2402
 
    did_not_match = []
2403
 
    for test in iter_suite_tests(suite):
2404
 
        if condition(test):
2405
 
            matched.append(test)
2406
 
        else:
2407
 
            did_not_match.append(test)
2408
 
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2409
 
 
2410
 
 
2411
 
def split_suite_by_re(suite, pattern):
2412
 
    """Split a test suite into two by a regular expression.
2413
 
    
2414
 
    :param suite: The suite to split.
2415
 
    :param pattern: A regular expression string. Test ids that match this
2416
 
        pattern will be in the first test suite returned, and the others in the
2417
 
        second test suite returned.
2418
 
    :return: A tuple of two test suites, where the first contains tests from
2419
 
        suite matching pattern, and the second contains the remainder from
2420
 
        suite. The order within each output suite is the same as it was in
2421
 
        suite.
2422
 
    """ 
2423
 
    return split_suite_by_condition(suite, condition_id_re(pattern))
2424
 
 
2425
 
 
2426
 
def run_suite(suite, name='test', verbose=False, pattern=".*",
2427
 
              stop_on_failure=False,
2428
 
              transport=None, lsprof_timed=None, bench_history=None,
2429
 
              matching_tests_first=None,
2430
 
              list_only=False,
2431
 
              random_seed=None,
2432
 
              exclude_pattern=None,
2433
 
              strict=False):
2434
 
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2435
 
    if verbose:
2436
 
        verbosity = 2
2437
 
    else:
2438
 
        verbosity = 1
2439
 
    runner = TextTestRunner(stream=sys.stdout,
2440
 
                            descriptions=0,
2441
 
                            verbosity=verbosity,
2442
 
                            bench_history=bench_history,
2443
 
                            list_only=list_only,
2444
 
                            )
2445
 
    runner.stop_on_failure=stop_on_failure
2446
 
    # Initialise the random number generator and display the seed used.
2447
 
    # We convert the seed to a long to make it reuseable across invocations.
2448
 
    random_order = False
2449
 
    if random_seed is not None:
2450
 
        random_order = True
2451
 
        if random_seed == "now":
2452
 
            random_seed = long(time.time())
2453
 
        else:
2454
 
            # Convert the seed to a long if we can
2455
 
            try:
2456
 
                random_seed = long(random_seed)
2457
 
            except:
2458
 
                pass
2459
 
        runner.stream.writeln("Randomizing test order using seed %s\n" %
2460
 
            (random_seed))
2461
 
        random.seed(random_seed)
2462
 
    # Customise the list of tests if requested
2463
 
    if exclude_pattern is not None:
2464
 
        suite = exclude_tests_by_re(suite, exclude_pattern)
2465
 
    if random_order:
2466
 
        order_changer = randomize_suite
2467
 
    else:
2468
 
        order_changer = preserve_input
2469
 
    if pattern != '.*' or random_order:
2470
 
        if matching_tests_first:
2471
 
            suites = map(order_changer, split_suite_by_re(suite, pattern))
2472
 
            suite = TestUtil.TestSuite(suites)
2473
 
        else:
2474
 
            suite = order_changer(filter_suite_by_re(suite, pattern))
2475
 
 
2476
 
    result = runner.run(suite)
2477
 
 
2478
 
    if strict:
2479
 
        return result.wasStrictlySuccessful()
2480
 
 
2481
 
    return result.wasSuccessful()
2482
 
 
2483
 
 
2484
 
# Controlled by "bzr selftest -E=..." option
2485
 
selftest_debug_flags = set()
2486
 
 
2487
 
 
2488
 
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2489
 
             transport=None,
2490
 
             test_suite_factory=None,
2491
 
             lsprof_timed=None,
2492
 
             bench_history=None,
2493
 
             matching_tests_first=None,
2494
 
             list_only=False,
2495
 
             random_seed=None,
2496
 
             exclude_pattern=None,
2497
 
             strict=False,
2498
 
             load_list=None,
2499
 
             debug_flags=None,
2500
 
             starting_with=None,
2501
 
             ):
2502
 
    """Run the whole test suite under the enhanced runner"""
2503
 
    # XXX: Very ugly way to do this...
2504
 
    # Disable warning about old formats because we don't want it to disturb
2505
 
    # any blackbox tests.
2506
 
    from bzrlib import repository
2507
 
    repository._deprecation_warning_done = True
2508
 
 
2509
 
    global default_transport
2510
 
    if transport is None:
2511
 
        transport = default_transport
2512
 
    old_transport = default_transport
2513
 
    default_transport = transport
2514
 
    global selftest_debug_flags
2515
 
    old_debug_flags = selftest_debug_flags
2516
 
    if debug_flags is not None:
2517
 
        selftest_debug_flags = set(debug_flags)
2518
 
    try:
2519
 
        if load_list is None:
2520
 
            keep_only = None
2521
 
        else:
2522
 
            keep_only = load_test_id_list(load_list)
2523
 
        if test_suite_factory is None:
2524
 
            suite = test_suite(keep_only, starting_with)
2525
 
        else:
2526
 
            suite = test_suite_factory()
2527
 
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2528
 
                     stop_on_failure=stop_on_failure,
2529
 
                     transport=transport,
2530
 
                     lsprof_timed=lsprof_timed,
2531
 
                     bench_history=bench_history,
2532
 
                     matching_tests_first=matching_tests_first,
2533
 
                     list_only=list_only,
2534
 
                     random_seed=random_seed,
2535
 
                     exclude_pattern=exclude_pattern,
2536
 
                     strict=strict)
2537
 
    finally:
2538
 
        default_transport = old_transport
2539
 
        selftest_debug_flags = old_debug_flags
2540
 
 
2541
 
 
2542
 
def load_test_id_list(file_name):
2543
 
    """Load a test id list from a text file.
2544
 
 
2545
 
    The format is one test id by line.  No special care is taken to impose
2546
 
    strict rules, these test ids are used to filter the test suite so a test id
2547
 
    that do not match an existing test will do no harm. This allows user to add
2548
 
    comments, leave blank lines, etc.
2549
 
    """
2550
 
    test_list = []
2551
 
    try:
2552
 
        ftest = open(file_name, 'rt')
2553
 
    except IOError, e:
2554
 
        if e.errno != errno.ENOENT:
2555
 
            raise
2556
 
        else:
2557
 
            raise errors.NoSuchFile(file_name)
2558
 
 
2559
 
    for test_name in ftest.readlines():
2560
 
        test_list.append(test_name.strip())
2561
 
    ftest.close()
2562
 
    return test_list
2563
 
 
2564
 
 
2565
 
def suite_matches_id_list(test_suite, id_list):
2566
 
    """Warns about tests not appearing or appearing more than once.
2567
 
 
2568
 
    :param test_suite: A TestSuite object.
2569
 
    :param test_id_list: The list of test ids that should be found in 
2570
 
         test_suite.
2571
 
 
2572
 
    :return: (absents, duplicates) absents is a list containing the test found
2573
 
        in id_list but not in test_suite, duplicates is a list containing the
2574
 
        test found multiple times in test_suite.
2575
 
 
2576
 
    When using a prefined test id list, it may occurs that some tests do not
2577
 
    exist anymore or that some tests use the same id. This function warns the
2578
 
    tester about potential problems in his workflow (test lists are volatile)
2579
 
    or in the test suite itself (using the same id for several tests does not
2580
 
    help to localize defects).
2581
 
    """
2582
 
    # Build a dict counting id occurrences
2583
 
    tests = dict()
2584
 
    for test in iter_suite_tests(test_suite):
2585
 
        id = test.id()
2586
 
        tests[id] = tests.get(id, 0) + 1
2587
 
 
2588
 
    not_found = []
2589
 
    duplicates = []
2590
 
    for id in id_list:
2591
 
        occurs = tests.get(id, 0)
2592
 
        if not occurs:
2593
 
            not_found.append(id)
2594
 
        elif occurs > 1:
2595
 
            duplicates.append(id)
2596
 
 
2597
 
    return not_found, duplicates
2598
 
 
2599
 
 
2600
 
class TestIdList(object):
2601
 
    """Test id list to filter a test suite.
2602
 
 
2603
 
    Relying on the assumption that test ids are built as:
2604
 
    <module>[.<class>.<method>][(<param>+)], <module> being in python dotted
2605
 
    notation, this class offers methods to :
2606
 
    - avoid building a test suite for modules not refered to in the test list,
2607
 
    - keep only the tests listed from the module test suite.
2608
 
    """
2609
 
 
2610
 
    def __init__(self, test_id_list):
2611
 
        # When a test suite needs to be filtered against us we compare test ids
2612
 
        # for equality, so a simple dict offers a quick and simple solution.
2613
 
        self.tests = dict().fromkeys(test_id_list, True)
2614
 
 
2615
 
        # While unittest.TestCase have ids like:
2616
 
        # <module>.<class>.<method>[(<param+)],
2617
 
        # doctest.DocTestCase can have ids like:
2618
 
        # <module>
2619
 
        # <module>.<class>
2620
 
        # <module>.<function>
2621
 
        # <module>.<class>.<method>
2622
 
 
2623
 
        # Since we can't predict a test class from its name only, we settle on
2624
 
        # a simple constraint: a test id always begins with its module name.
2625
 
 
2626
 
        modules = {}
2627
 
        for test_id in test_id_list:
2628
 
            parts = test_id.split('.')
2629
 
            mod_name = parts.pop(0)
2630
 
            modules[mod_name] = True
2631
 
            for part in parts:
2632
 
                mod_name += '.' + part
2633
 
                modules[mod_name] = True
2634
 
        self.modules = modules
2635
 
 
2636
 
    def refers_to(self, module_name):
2637
 
        """Is there tests for the module or one of its sub modules."""
2638
 
        return self.modules.has_key(module_name)
2639
 
 
2640
 
    def includes(self, test_id):
2641
 
        return self.tests.has_key(test_id)
2642
 
 
2643
 
 
2644
 
def test_suite(keep_only=None, starting_with=None):
2645
 
    """Build and return TestSuite for the whole of bzrlib.
2646
 
 
2647
 
    :param keep_only: A list of test ids limiting the suite returned.
2648
 
 
2649
 
    :param starting_with: An id limiting the suite returned to the tests
2650
 
         starting with it.
2651
 
 
2652
 
    This function can be replaced if you need to change the default test
2653
 
    suite on a global basis, but it is not encouraged.
2654
 
    """
2655
 
    testmod_names = [
2656
 
                   'bzrlib.doc',
2657
 
                   'bzrlib.util.tests.test_bencode',
2658
 
                   'bzrlib.tests.blackbox',
2659
 
                   'bzrlib.tests.branch_implementations',
2660
 
                   'bzrlib.tests.bzrdir_implementations',
2661
 
                   'bzrlib.tests.commands',
2662
 
                   'bzrlib.tests.inventory_implementations',
2663
 
                   'bzrlib.tests.interrepository_implementations',
2664
 
                   'bzrlib.tests.intertree_implementations',
2665
 
                   'bzrlib.tests.interversionedfile_implementations',
2666
 
                   'bzrlib.tests.per_lock',
2667
 
                   'bzrlib.tests.repository_implementations',
2668
 
                   'bzrlib.tests.revisionstore_implementations',
2669
 
                   'bzrlib.tests.test__dirstate_helpers',
2670
 
                   'bzrlib.tests.test_ancestry',
2671
 
                   'bzrlib.tests.test_annotate',
2672
 
                   'bzrlib.tests.test_api',
2673
 
                   'bzrlib.tests.test_atomicfile',
2674
 
                   'bzrlib.tests.test_bad_files',
2675
 
                   'bzrlib.tests.test_bisect_multi',
2676
 
                   'bzrlib.tests.test_branch',
2677
 
                   'bzrlib.tests.test_branchbuilder',
2678
 
                   'bzrlib.tests.test_bugtracker',
2679
 
                   'bzrlib.tests.test_bundle',
2680
 
                   'bzrlib.tests.test_bzrdir',
2681
 
                   'bzrlib.tests.test_cache_utf8',
2682
 
                   'bzrlib.tests.test_commands',
2683
 
                   'bzrlib.tests.test_commit',
2684
 
                   'bzrlib.tests.test_commit_merge',
2685
 
                   'bzrlib.tests.test_config',
2686
 
                   'bzrlib.tests.test_conflicts',
2687
 
                   'bzrlib.tests.test_counted_lock',
2688
 
                   'bzrlib.tests.test_decorators',
2689
 
                   'bzrlib.tests.test_delta',
2690
 
                   'bzrlib.tests.test_deprecated_graph',
2691
 
                   'bzrlib.tests.test_diff',
2692
 
                   'bzrlib.tests.test_dirstate',
2693
 
                   'bzrlib.tests.test_directory_service',
2694
 
                   'bzrlib.tests.test_email_message',
2695
 
                   'bzrlib.tests.test_errors',
2696
 
                   'bzrlib.tests.test_escaped_store',
2697
 
                   'bzrlib.tests.test_extract',
2698
 
                   'bzrlib.tests.test_fetch',
2699
 
                   'bzrlib.tests.test_ftp_transport',
2700
 
                   'bzrlib.tests.test_generate_docs',
2701
 
                   'bzrlib.tests.test_generate_ids',
2702
 
                   'bzrlib.tests.test_globbing',
2703
 
                   'bzrlib.tests.test_gpg',
2704
 
                   'bzrlib.tests.test_graph',
2705
 
                   'bzrlib.tests.test_hashcache',
2706
 
                   'bzrlib.tests.test_help',
2707
 
                   'bzrlib.tests.test_hooks',
2708
 
                   'bzrlib.tests.test_http',
2709
 
                   'bzrlib.tests.test_http_implementations',
2710
 
                   'bzrlib.tests.test_http_response',
2711
 
                   'bzrlib.tests.test_https_ca_bundle',
2712
 
                   'bzrlib.tests.test_identitymap',
2713
 
                   'bzrlib.tests.test_ignores',
2714
 
                   'bzrlib.tests.test_index',
2715
 
                   'bzrlib.tests.test_info',
2716
 
                   'bzrlib.tests.test_inv',
2717
 
                   'bzrlib.tests.test_knit',
2718
 
                   'bzrlib.tests.test_lazy_import',
2719
 
                   'bzrlib.tests.test_lazy_regex',
2720
 
                   'bzrlib.tests.test_lockdir',
2721
 
                   'bzrlib.tests.test_lockable_files',
2722
 
                   'bzrlib.tests.test_log',
2723
 
                   'bzrlib.tests.test_lsprof',
2724
 
                   'bzrlib.tests.test_lru_cache',
2725
 
                   'bzrlib.tests.test_mail_client',
2726
 
                   'bzrlib.tests.test_memorytree',
2727
 
                   'bzrlib.tests.test_merge',
2728
 
                   'bzrlib.tests.test_merge3',
2729
 
                   'bzrlib.tests.test_merge_core',
2730
 
                   'bzrlib.tests.test_merge_directive',
2731
 
                   'bzrlib.tests.test_missing',
2732
 
                   'bzrlib.tests.test_msgeditor',
2733
 
                   'bzrlib.tests.test_multiparent',
2734
 
                   'bzrlib.tests.test_mutabletree',
2735
 
                   'bzrlib.tests.test_nonascii',
2736
 
                   'bzrlib.tests.test_options',
2737
 
                   'bzrlib.tests.test_osutils',
2738
 
                   'bzrlib.tests.test_osutils_encodings',
2739
 
                   'bzrlib.tests.test_pack',
2740
 
                   'bzrlib.tests.test_patch',
2741
 
                   'bzrlib.tests.test_patches',
2742
 
                   'bzrlib.tests.test_permissions',
2743
 
                   'bzrlib.tests.test_plugins',
2744
 
                   'bzrlib.tests.test_progress',
2745
 
                   'bzrlib.tests.test_read_bundle',
2746
 
                   'bzrlib.tests.test_reconfigure',
2747
 
                   'bzrlib.tests.test_reconcile',
2748
 
                   'bzrlib.tests.test_registry',
2749
 
                   'bzrlib.tests.test_remote',
2750
 
                   'bzrlib.tests.test_repository',
2751
 
                   'bzrlib.tests.per_repository_reference',
2752
 
                   'bzrlib.tests.test_revert',
2753
 
                   'bzrlib.tests.test_revision',
2754
 
                   'bzrlib.tests.test_revisionspec',
2755
 
                   'bzrlib.tests.test_revisiontree',
2756
 
                   'bzrlib.tests.test_rio',
2757
 
                   'bzrlib.tests.test_sampler',
2758
 
                   'bzrlib.tests.test_selftest',
2759
 
                   'bzrlib.tests.test_setup',
2760
 
                   'bzrlib.tests.test_sftp_transport',
2761
 
                   'bzrlib.tests.test_smart',
2762
 
                   'bzrlib.tests.test_smart_add',
2763
 
                   'bzrlib.tests.test_smart_transport',
2764
 
                   'bzrlib.tests.test_smtp_connection',
2765
 
                   'bzrlib.tests.test_source',
2766
 
                   'bzrlib.tests.test_ssh_transport',
2767
 
                   'bzrlib.tests.test_status',
2768
 
                   'bzrlib.tests.test_store',
2769
 
                   'bzrlib.tests.test_strace',
2770
 
                   'bzrlib.tests.test_subsume',
2771
 
                   'bzrlib.tests.test_switch',
2772
 
                   'bzrlib.tests.test_symbol_versioning',
2773
 
                   'bzrlib.tests.test_tag',
2774
 
                   'bzrlib.tests.test_testament',
2775
 
                   'bzrlib.tests.test_textfile',
2776
 
                   'bzrlib.tests.test_textmerge',
2777
 
                   'bzrlib.tests.test_timestamp',
2778
 
                   'bzrlib.tests.test_trace',
2779
 
                   'bzrlib.tests.test_transactions',
2780
 
                   'bzrlib.tests.test_transform',
2781
 
                   'bzrlib.tests.test_transport',
2782
 
                   'bzrlib.tests.test_transport_implementations',
2783
 
                   'bzrlib.tests.test_tree',
2784
 
                   'bzrlib.tests.test_treebuilder',
2785
 
                   'bzrlib.tests.test_tsort',
2786
 
                   'bzrlib.tests.test_tuned_gzip',
2787
 
                   'bzrlib.tests.test_ui',
2788
 
                   'bzrlib.tests.test_uncommit',
2789
 
                   'bzrlib.tests.test_upgrade',
2790
 
                   'bzrlib.tests.test_urlutils',
2791
 
                   'bzrlib.tests.test_versionedfile',
2792
 
                   'bzrlib.tests.test_version',
2793
 
                   'bzrlib.tests.test_version_info',
2794
 
                   'bzrlib.tests.test_weave',
2795
 
                   'bzrlib.tests.test_whitebox',
2796
 
                   'bzrlib.tests.test_win32utils',
2797
 
                   'bzrlib.tests.test_workingtree',
2798
 
                   'bzrlib.tests.test_workingtree_4',
2799
 
                   'bzrlib.tests.test_wsgi',
2800
 
                   'bzrlib.tests.test_xml',
2801
 
                   'bzrlib.tests.tree_implementations',
2802
 
                   'bzrlib.tests.workingtree_implementations',
 
282
                os.mkdir(name[:-1])
 
283
            else:
 
284
                f = file(name, 'wt')
 
285
                print >>f, "contents of", name
 
286
                f.close()
 
287
 
 
288
 
 
289
class MetaTestLog(TestCase):
 
290
    def test_logging(self):
 
291
        """Test logs are captured when a test fails."""
 
292
        logging.info('an info message')
 
293
        warning('something looks dodgy...')
 
294
        logging.debug('hello, test is running')
 
295
        ##assert 0
 
296
 
 
297
 
 
298
def selftest(verbose=False, pattern=".*"):
 
299
    return testsweet.run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern)
 
300
 
 
301
 
 
302
def test_suite():
 
303
    from bzrlib.selftest.TestUtil import TestLoader, TestSuite
 
304
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
 
305
    import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
 
306
    from doctest import DocTestSuite
 
307
 
 
308
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
 
309
 
 
310
    testmod_names = \
 
311
                  ['bzrlib.selftest.MetaTestLog',
 
312
                   'bzrlib.selftest.test_parent',
 
313
                   'bzrlib.selftest.testinv',
 
314
                   'bzrlib.selftest.testfetch',
 
315
                   'bzrlib.selftest.versioning',
 
316
                   'bzrlib.selftest.whitebox',
 
317
                   'bzrlib.selftest.testmerge3',
 
318
                   'bzrlib.selftest.testmerge',
 
319
                   'bzrlib.selftest.testhashcache',
 
320
                   'bzrlib.selftest.teststatus',
 
321
                   'bzrlib.selftest.testlog',
 
322
                   'bzrlib.selftest.blackbox',
 
323
                   'bzrlib.selftest.testrevisionnamespaces',
 
324
                   'bzrlib.selftest.testbranch',
 
325
                   'bzrlib.selftest.testremotebranch',
 
326
                   'bzrlib.selftest.testrevision',
 
327
                   'bzrlib.selftest.test_revision_info',
 
328
                   'bzrlib.selftest.test_merge_core',
 
329
                   'bzrlib.selftest.test_smart_add',
 
330
                   'bzrlib.selftest.testdiff',
 
331
                   'bzrlib.selftest.test_xml',
 
332
                   'bzrlib.fetch',
 
333
                   'bzrlib.selftest.teststore',
 
334
                   'bzrlib.selftest.testgraph',
2803
335
                   ]
2804
336
 
2805
 
    loader = TestUtil.TestLoader()
2806
 
 
2807
 
    if starting_with is not None:
2808
 
        # We take precedence over keep_only because *at loading time* using
2809
 
        # both options means we will load less tests for the same final result.
2810
 
        def interesting_module(name):
2811
 
            return (
2812
 
                # Either the module name starts with the specified string
2813
 
                name.startswith(starting_with)
2814
 
                # or it may contain tests starting with the specified string
2815
 
                or starting_with.startswith(name)
2816
 
                )
2817
 
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
2818
 
 
2819
 
    elif keep_only is not None:
2820
 
        id_filter = TestIdList(keep_only)
2821
 
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
2822
 
        def interesting_module(name):
2823
 
            return id_filter.refers_to(name)
2824
 
 
2825
 
    else:
2826
 
        loader = TestUtil.TestLoader()
2827
 
        def interesting_module(name):
2828
 
            # No filtering, all modules are interesting
2829
 
            return True
2830
 
 
2831
 
    suite = loader.suiteClass()
2832
 
 
2833
 
    # modules building their suite with loadTestsFromModuleNames
2834
 
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2835
 
 
2836
 
    modules_to_doctest = [
2837
 
        'bzrlib',
2838
 
        'bzrlib.errors',
2839
 
        'bzrlib.export',
2840
 
        'bzrlib.inventory',
2841
 
        'bzrlib.iterablefile',
2842
 
        'bzrlib.lockdir',
2843
 
        'bzrlib.merge3',
2844
 
        'bzrlib.option',
2845
 
        'bzrlib.store',
2846
 
        'bzrlib.symbol_versioning',
2847
 
        'bzrlib.tests',
2848
 
        'bzrlib.timestamp',
2849
 
        'bzrlib.version_info_formats.format_custom',
2850
 
        ]
2851
 
 
2852
 
    for mod in modules_to_doctest:
2853
 
        if not interesting_module(mod):
2854
 
            # No tests to keep here, move along
2855
 
            continue
2856
 
        try:
2857
 
            doc_suite = doctest.DocTestSuite(mod)
2858
 
        except ValueError, e:
2859
 
            print '**failed to get doctest for: %s\n%s' % (mod, e)
2860
 
            raise
2861
 
        suite.addTest(doc_suite)
2862
 
 
2863
 
    default_encoding = sys.getdefaultencoding()
2864
 
    for name, plugin in bzrlib.plugin.plugins().items():
2865
 
        if not interesting_module(plugin.module.__name__):
2866
 
            continue
2867
 
        plugin_suite = plugin.test_suite()
2868
 
        # We used to catch ImportError here and turn it into just a warning,
2869
 
        # but really if you don't have --no-plugins this should be a failure.
2870
 
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
2871
 
        if plugin_suite is None:
2872
 
            plugin_suite = plugin.load_plugin_tests(loader)
2873
 
        if plugin_suite is not None:
2874
 
            suite.addTest(plugin_suite)
2875
 
        if default_encoding != sys.getdefaultencoding():
2876
 
            bzrlib.trace.warning(
2877
 
                'Plugin "%s" tried to reset default encoding to: %s', name,
2878
 
                sys.getdefaultencoding())
2879
 
            reload(sys)
2880
 
            sys.setdefaultencoding(default_encoding)
2881
 
 
2882
 
    if starting_with is not None:
2883
 
        suite = filter_suite_by_id_startswith(suite, starting_with)
2884
 
 
2885
 
    if keep_only is not None:
2886
 
        # Now that the referred modules have loaded their tests, keep only the
2887
 
        # requested ones.
2888
 
        suite = filter_suite_by_id_list(suite, id_filter)
2889
 
        # Do some sanity checks on the id_list filtering
2890
 
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
2891
 
        if starting_with is not None:
2892
 
            # The tester has used both keep_only and starting_with, so he is
2893
 
            # already aware that some tests are excluded from the list, there
2894
 
            # is no need to tell him which.
2895
 
            pass
2896
 
        else:
2897
 
            # Some tests mentioned in the list are not in the test suite. The
2898
 
            # list may be out of date, report to the tester.
2899
 
            for id in not_found:
2900
 
                bzrlib.trace.warning('"%s" not found in the test suite', id)
2901
 
        for id in duplicates:
2902
 
            bzrlib.trace.warning('"%s" is used as an id by several tests', id)
2903
 
 
2904
 
    return suite
2905
 
 
2906
 
 
2907
 
def multiply_tests_from_modules(module_name_list, scenario_iter, loader=None):
2908
 
    """Adapt all tests in some given modules to given scenarios.
2909
 
 
2910
 
    This is the recommended public interface for test parameterization.
2911
 
    Typically the test_suite() method for a per-implementation test
2912
 
    suite will call multiply_tests_from_modules and return the 
2913
 
    result.
2914
 
 
2915
 
    :param module_name_list: List of fully-qualified names of test
2916
 
        modules.
2917
 
    :param scenario_iter: Iterable of pairs of (scenario_name, 
2918
 
        scenario_param_dict).
2919
 
    :param loader: If provided, will be used instead of a new 
2920
 
        bzrlib.tests.TestLoader() instance.
2921
 
 
2922
 
    This returns a new TestSuite containing the cross product of
2923
 
    all the tests in all the modules, each repeated for each scenario.
2924
 
    Each test is adapted by adding the scenario name at the end 
2925
 
    of its name, and updating the test object's __dict__ with the
2926
 
    scenario_param_dict.
2927
 
 
2928
 
    >>> r = multiply_tests_from_modules(
2929
 
    ...     ['bzrlib.tests.test_sampler'],
2930
 
    ...     [('one', dict(param=1)), 
2931
 
    ...      ('two', dict(param=2))])
2932
 
    >>> tests = list(iter_suite_tests(r))
2933
 
    >>> len(tests)
2934
 
    2
2935
 
    >>> tests[0].id()
2936
 
    'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
2937
 
    >>> tests[0].param
2938
 
    1
2939
 
    >>> tests[1].param
2940
 
    2
2941
 
    """
2942
 
    # XXX: Isn't load_tests() a better way to provide the same functionality
2943
 
    # without forcing a predefined TestScenarioApplier ? --vila 080215
2944
 
    if loader is None:
2945
 
        loader = TestUtil.TestLoader()
2946
 
 
2947
 
    suite = loader.suiteClass()
2948
 
 
2949
 
    adapter = TestScenarioApplier()
2950
 
    adapter.scenarios = list(scenario_iter)
2951
 
    adapt_modules(module_name_list, adapter, loader, suite)
2952
 
    return suite
2953
 
 
2954
 
 
2955
 
def multiply_scenarios(scenarios_left, scenarios_right):
2956
 
    """Multiply two sets of scenarios.
2957
 
 
2958
 
    :returns: the cartesian product of the two sets of scenarios, that is
2959
 
        a scenario for every possible combination of a left scenario and a
2960
 
        right scenario.
2961
 
    """
2962
 
    return [
2963
 
        ('%s,%s' % (left_name, right_name),
2964
 
         dict(left_dict.items() + right_dict.items()))
2965
 
        for left_name, left_dict in scenarios_left
2966
 
        for right_name, right_dict in scenarios_right]
2967
 
 
2968
 
 
2969
 
 
2970
 
def adapt_modules(mods_list, adapter, loader, suite):
2971
 
    """Adapt the modules in mods_list using adapter and add to suite."""
2972
 
    tests = loader.loadTestsFromModuleNames(mods_list)
2973
 
    adapt_tests(tests, adapter, suite)
2974
 
 
2975
 
 
2976
 
def adapt_tests(tests_list, adapter, suite):
2977
 
    """Adapt the tests in tests_list using adapter and add to suite."""
2978
 
    for test in iter_suite_tests(tests_list):
2979
 
        suite.addTests(adapter.adapt(test))
2980
 
 
2981
 
 
2982
 
def _rmtree_temp_dir(dirname):
2983
 
    # If LANG=C we probably have created some bogus paths
2984
 
    # which rmtree(unicode) will fail to delete
2985
 
    # so make sure we are using rmtree(str) to delete everything
2986
 
    # except on win32, where rmtree(str) will fail
2987
 
    # since it doesn't have the property of byte-stream paths
2988
 
    # (they are either ascii or mbcs)
2989
 
    if sys.platform == 'win32':
2990
 
        # make sure we are using the unicode win32 api
2991
 
        dirname = unicode(dirname)
2992
 
    else:
2993
 
        dirname = dirname.encode(sys.getfilesystemencoding())
2994
 
    try:
2995
 
        osutils.rmtree(dirname)
2996
 
    except OSError, e:
2997
 
        if sys.platform == 'win32' and e.errno == errno.EACCES:
2998
 
            sys.stderr.write(('Permission denied: '
2999
 
                                 'unable to remove testing dir '
3000
 
                                 '%s\n' % os.path.basename(dirname)))
3001
 
        else:
3002
 
            raise
3003
 
 
3004
 
 
3005
 
class Feature(object):
3006
 
    """An operating system Feature."""
3007
 
 
3008
 
    def __init__(self):
3009
 
        self._available = None
3010
 
 
3011
 
    def available(self):
3012
 
        """Is the feature available?
3013
 
 
3014
 
        :return: True if the feature is available.
3015
 
        """
3016
 
        if self._available is None:
3017
 
            self._available = self._probe()
3018
 
        return self._available
3019
 
 
3020
 
    def _probe(self):
3021
 
        """Implement this method in concrete features.
3022
 
 
3023
 
        :return: True if the feature is available.
3024
 
        """
3025
 
        raise NotImplementedError
3026
 
 
3027
 
    def __str__(self):
3028
 
        if getattr(self, 'feature_name', None):
3029
 
            return self.feature_name()
3030
 
        return self.__class__.__name__
3031
 
 
3032
 
 
3033
 
class _SymlinkFeature(Feature):
3034
 
 
3035
 
    def _probe(self):
3036
 
        return osutils.has_symlinks()
3037
 
 
3038
 
    def feature_name(self):
3039
 
        return 'symlinks'
3040
 
 
3041
 
SymlinkFeature = _SymlinkFeature()
3042
 
 
3043
 
 
3044
 
class _HardlinkFeature(Feature):
3045
 
 
3046
 
    def _probe(self):
3047
 
        return osutils.has_hardlinks()
3048
 
 
3049
 
    def feature_name(self):
3050
 
        return 'hardlinks'
3051
 
 
3052
 
HardlinkFeature = _HardlinkFeature()
3053
 
 
3054
 
 
3055
 
class _OsFifoFeature(Feature):
3056
 
 
3057
 
    def _probe(self):
3058
 
        return getattr(os, 'mkfifo', None)
3059
 
 
3060
 
    def feature_name(self):
3061
 
        return 'filesystem fifos'
3062
 
 
3063
 
OsFifoFeature = _OsFifoFeature()
3064
 
 
3065
 
 
3066
 
class _UnicodeFilenameFeature(Feature):
3067
 
    """Does the filesystem support Unicode filenames?"""
3068
 
 
3069
 
    def _probe(self):
3070
 
        try:
3071
 
            os.stat(u'\u03b1')
3072
 
        except UnicodeEncodeError:
3073
 
            return False
3074
 
        except (IOError, OSError):
3075
 
            # The filesystem allows the Unicode filename but the file doesn't
3076
 
            # exist.
3077
 
            return True
3078
 
        else:
3079
 
            # The filesystem allows the Unicode filename and the file exists,
3080
 
            # for some reason.
3081
 
            return True
3082
 
 
3083
 
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3084
 
 
3085
 
 
3086
 
class TestScenarioApplier(object):
3087
 
    """A tool to apply scenarios to tests."""
3088
 
 
3089
 
    def adapt(self, test):
3090
 
        """Return a TestSuite containing a copy of test for each scenario."""
3091
 
        result = unittest.TestSuite()
3092
 
        for scenario in self.scenarios:
3093
 
            result.addTest(self.adapt_test_to_scenario(test, scenario))
3094
 
        return result
3095
 
 
3096
 
    def adapt_test_to_scenario(self, test, scenario):
3097
 
        """Copy test and apply scenario to it.
3098
 
 
3099
 
        :param test: A test to adapt.
3100
 
        :param scenario: A tuple describing the scenarion.
3101
 
            The first element of the tuple is the new test id.
3102
 
            The second element is a dict containing attributes to set on the
3103
 
            test.
3104
 
        :return: The adapted test.
3105
 
        """
3106
 
        from copy import deepcopy
3107
 
        new_test = deepcopy(test)
3108
 
        for name, value in scenario[1].items():
3109
 
            setattr(new_test, name, value)
3110
 
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
3111
 
        new_test.id = lambda: new_id
3112
 
        return new_test
3113
 
 
3114
 
 
3115
 
def probe_unicode_in_user_encoding():
3116
 
    """Try to encode several unicode strings to use in unicode-aware tests.
3117
 
    Return first successfull match.
3118
 
 
3119
 
    :return:  (unicode value, encoded plain string value) or (None, None)
3120
 
    """
3121
 
    possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3122
 
    for uni_val in possible_vals:
3123
 
        try:
3124
 
            str_val = uni_val.encode(bzrlib.user_encoding)
3125
 
        except UnicodeEncodeError:
3126
 
            # Try a different character
3127
 
            pass
3128
 
        else:
3129
 
            return uni_val, str_val
3130
 
    return None, None
3131
 
 
3132
 
 
3133
 
def probe_bad_non_ascii(encoding):
3134
 
    """Try to find [bad] character with code [128..255]
3135
 
    that cannot be decoded to unicode in some encoding.
3136
 
    Return None if all non-ascii characters is valid
3137
 
    for given encoding.
3138
 
    """
3139
 
    for i in xrange(128, 256):
3140
 
        char = chr(i)
3141
 
        try:
3142
 
            char.decode(encoding)
3143
 
        except UnicodeDecodeError:
3144
 
            return char
3145
 
    return None
3146
 
 
3147
 
 
3148
 
class _FTPServerFeature(Feature):
3149
 
    """Some tests want an FTP Server, check if one is available.
3150
 
 
3151
 
    Right now, the only way this is available is if 'medusa' is installed.
3152
 
    http://www.amk.ca/python/code/medusa.html
3153
 
    """
3154
 
 
3155
 
    def _probe(self):
3156
 
        try:
3157
 
            import bzrlib.tests.ftp_server
3158
 
            return True
3159
 
        except ImportError:
3160
 
            return False
3161
 
 
3162
 
    def feature_name(self):
3163
 
        return 'FTPServer'
3164
 
 
3165
 
FTPServerFeature = _FTPServerFeature()
3166
 
 
3167
 
 
3168
 
class _CaseInsensitiveFilesystemFeature(Feature):
3169
 
    """Check if underlined filesystem is case-insensitive
3170
 
    (e.g. on Windows, Cygwin, MacOS)
3171
 
    """
3172
 
 
3173
 
    def _probe(self):
3174
 
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
3175
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
3176
 
            TestCaseWithMemoryTransport.TEST_ROOT = root
3177
 
        else:
3178
 
            root = TestCaseWithMemoryTransport.TEST_ROOT
3179
 
        tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
3180
 
            dir=root)
3181
 
        name_a = osutils.pathjoin(tdir, 'a')
3182
 
        name_A = osutils.pathjoin(tdir, 'A')
3183
 
        os.mkdir(name_a)
3184
 
        result = osutils.isdir(name_A)
3185
 
        _rmtree_temp_dir(tdir)
3186
 
        return result
3187
 
 
3188
 
    def feature_name(self):
3189
 
        return 'case-insensitive filesystem'
3190
 
 
3191
 
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
 
337
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
 
338
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
 
339
        if m not in MODULES_TO_DOCTEST:
 
340
            MODULES_TO_DOCTEST.append(m)
 
341
 
 
342
    TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
 
343
    print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
 
344
    print
 
345
    suite = TestSuite()
 
346
    suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
 
347
    for m in MODULES_TO_TEST:
 
348
         suite.addTest(TestLoader().loadTestsFromModule(m))
 
349
    for m in (MODULES_TO_DOCTEST):
 
350
        suite.addTest(DocTestSuite(m))
 
351
    for p in bzrlib.plugin.all_plugins:
 
352
        if hasattr(p, 'test_suite'):
 
353
            suite.addTest(p.test_suite())
 
354
    return suite
 
355