~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: John Arbash Meinel
  • Date: 2007-04-12 21:18:29 UTC
  • mto: This revision was merged to the branch mainline in revision 2415.
  • Revision ID: john@arbash-meinel.com-20070412211829-dduwktk8in0e6zpl
Clean up test ordering

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
from testsweet import TestBase, run_suite, InTempDir
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
 
30
from cStringIO import StringIO
 
31
import difflib
 
32
import doctest
 
33
import errno
 
34
import logging
 
35
import os
 
36
from pprint import pformat
 
37
import re
 
38
import shlex
 
39
import stat
 
40
from subprocess import Popen, PIPE
 
41
import sys
 
42
import tempfile
 
43
import unittest
 
44
import time
 
45
 
 
46
 
 
47
from bzrlib import (
 
48
    bzrdir,
 
49
    debug,
 
50
    errors,
 
51
    memorytree,
 
52
    osutils,
 
53
    progress,
 
54
    ui,
 
55
    urlutils,
 
56
    )
 
57
import bzrlib.branch
 
58
import bzrlib.commands
 
59
import bzrlib.timestamp
 
60
import bzrlib.export
 
61
import bzrlib.inventory
 
62
import bzrlib.iterablefile
 
63
import bzrlib.lockdir
 
64
try:
 
65
    import bzrlib.lsprof
 
66
except ImportError:
 
67
    # lsprof not available
 
68
    pass
 
69
from bzrlib.merge import merge_inner
 
70
import bzrlib.merge3
 
71
import bzrlib.osutils
 
72
import bzrlib.plugin
 
73
from bzrlib.revision import common_ancestor
 
74
import bzrlib.store
 
75
from bzrlib import symbol_versioning
 
76
import bzrlib.trace
 
77
from bzrlib.transport import get_transport
 
78
import bzrlib.transport
 
79
from bzrlib.transport.local import LocalURLServer
 
80
from bzrlib.transport.memory import MemoryServer
 
81
from bzrlib.transport.readonly import ReadonlyServer
 
82
from bzrlib.trace import mutter, note
 
83
from bzrlib.tests import TestUtil
 
84
from bzrlib.tests.HttpServer import HttpServer
 
85
from bzrlib.tests.TestUtil import (
 
86
                          TestSuite,
 
87
                          TestLoader,
 
88
                          )
 
89
from bzrlib.tests.treeshape import build_tree_contents
 
90
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
91
 
 
92
default_transport = LocalURLServer
19
93
 
20
94
MODULES_TO_TEST = []
21
 
MODULES_TO_DOCTEST = []
22
 
 
23
 
def selftest():
24
 
    from unittest import TestLoader, TestSuite
25
 
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
26
 
    import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
27
 
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
28
 
 
29
 
    import bzrlib.selftest.whitebox
30
 
    import bzrlib.selftest.blackbox
31
 
    import bzrlib.selftest.versioning
32
 
    import bzrlib.selftest.testmerge3
33
 
    import bzrlib.selftest.testhashcache
34
 
    import bzrlib.selftest.testrevisionnamespaces
35
 
    import bzrlib.selftest.testbranch
36
 
    import bzrlib.selftest.teststatus
37
 
    import bzrlib.selftest.testinv
38
 
    import bzrlib.merge_core
39
 
    from doctest import DocTestSuite
40
 
    import os
 
95
MODULES_TO_DOCTEST = [
 
96
                      bzrlib.timestamp,
 
97
                      bzrlib.errors,
 
98
                      bzrlib.export,
 
99
                      bzrlib.inventory,
 
100
                      bzrlib.iterablefile,
 
101
                      bzrlib.lockdir,
 
102
                      bzrlib.merge3,
 
103
                      bzrlib.option,
 
104
                      bzrlib.store,
 
105
                      ]
 
106
 
 
107
 
 
108
def packages_to_test():
 
109
    """Return a list of packages to test.
 
110
 
 
111
    The packages are not globally imported so that import failures are
 
112
    triggered when running selftest, not when importing the command.
 
113
    """
 
114
    import bzrlib.doc
 
115
    import bzrlib.tests.blackbox
 
116
    import bzrlib.tests.branch_implementations
 
117
    import bzrlib.tests.bzrdir_implementations
 
118
    import bzrlib.tests.interrepository_implementations
 
119
    import bzrlib.tests.interversionedfile_implementations
 
120
    import bzrlib.tests.intertree_implementations
 
121
    import bzrlib.tests.per_lock
 
122
    import bzrlib.tests.repository_implementations
 
123
    import bzrlib.tests.revisionstore_implementations
 
124
    import bzrlib.tests.tree_implementations
 
125
    import bzrlib.tests.workingtree_implementations
 
126
    return [
 
127
            bzrlib.doc,
 
128
            bzrlib.tests.blackbox,
 
129
            bzrlib.tests.branch_implementations,
 
130
            bzrlib.tests.bzrdir_implementations,
 
131
            bzrlib.tests.interrepository_implementations,
 
132
            bzrlib.tests.interversionedfile_implementations,
 
133
            bzrlib.tests.intertree_implementations,
 
134
            bzrlib.tests.per_lock,
 
135
            bzrlib.tests.repository_implementations,
 
136
            bzrlib.tests.revisionstore_implementations,
 
137
            bzrlib.tests.tree_implementations,
 
138
            bzrlib.tests.workingtree_implementations,
 
139
            ]
 
140
 
 
141
 
 
142
class ExtendedTestResult(unittest._TextTestResult):
 
143
    """Accepts, reports and accumulates the results of running tests.
 
144
 
 
145
    Compared to this unittest version this class adds support for profiling,
 
146
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
147
    There are further-specialized subclasses for different types of display.
 
148
    """
 
149
 
 
150
    stop_early = False
 
151
    
 
152
    def __init__(self, stream, descriptions, verbosity,
 
153
                 bench_history=None,
 
154
                 num_tests=None,
 
155
                 use_numbered_dirs=False,
 
156
                 ):
 
157
        """Construct new TestResult.
 
158
 
 
159
        :param bench_history: Optionally, a writable file object to accumulate
 
160
            benchmark results.
 
161
        """
 
162
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
163
        if bench_history is not None:
 
164
            from bzrlib.version import _get_bzr_source_tree
 
165
            src_tree = _get_bzr_source_tree()
 
166
            if src_tree:
 
167
                try:
 
168
                    revision_id = src_tree.get_parent_ids()[0]
 
169
                except IndexError:
 
170
                    # XXX: if this is a brand new tree, do the same as if there
 
171
                    # is no branch.
 
172
                    revision_id = ''
 
173
            else:
 
174
                # XXX: If there's no branch, what should we do?
 
175
                revision_id = ''
 
176
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
177
        self._bench_history = bench_history
 
178
        self.ui = ui.ui_factory
 
179
        self.num_tests = num_tests
 
180
        self.error_count = 0
 
181
        self.failure_count = 0
 
182
        self.known_failure_count = 0
 
183
        self.skip_count = 0
 
184
        self.unsupported = {}
 
185
        self.count = 0
 
186
        self.use_numbered_dirs = use_numbered_dirs
 
187
        self._overall_start_time = time.time()
 
188
    
 
189
    def extractBenchmarkTime(self, testCase):
 
190
        """Add a benchmark time for the current test case."""
 
191
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
192
    
 
193
    def _elapsedTestTimeString(self):
 
194
        """Return a time string for the overall time the current test has taken."""
 
195
        return self._formatTime(time.time() - self._start_time)
 
196
 
 
197
    def _testTimeString(self):
 
198
        if self._benchmarkTime is not None:
 
199
            return "%s/%s" % (
 
200
                self._formatTime(self._benchmarkTime),
 
201
                self._elapsedTestTimeString())
 
202
        else:
 
203
            return "           %s" % self._elapsedTestTimeString()
 
204
 
 
205
    def _formatTime(self, seconds):
 
206
        """Format seconds as milliseconds with leading spaces."""
 
207
        # some benchmarks can take thousands of seconds to run, so we need 8
 
208
        # places
 
209
        return "%8dms" % (1000 * seconds)
 
210
 
 
211
    def _shortened_test_description(self, test):
 
212
        what = test.id()
 
213
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
214
        return what
 
215
 
 
216
    def startTest(self, test):
 
217
        unittest.TestResult.startTest(self, test)
 
218
        self.report_test_start(test)
 
219
        test.number = self.count
 
220
        self._recordTestStartTime()
 
221
 
 
222
    def _recordTestStartTime(self):
 
223
        """Record that a test has started."""
 
224
        self._start_time = time.time()
 
225
 
 
226
    def _cleanupLogFile(self, test):
 
227
        # We can only do this if we have one of our TestCases, not if
 
228
        # we have a doctest.
 
229
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
230
        if setKeepLogfile is not None:
 
231
            setKeepLogfile()
 
232
 
 
233
    def addError(self, test, err):
 
234
        self.extractBenchmarkTime(test)
 
235
        self._cleanupLogFile(test)
 
236
        if isinstance(err[1], TestSkipped):
 
237
            return self.addSkipped(test, err)
 
238
        elif isinstance(err[1], UnavailableFeature):
 
239
            return self.addNotSupported(test, err[1].args[0])
 
240
        unittest.TestResult.addError(self, test, err)
 
241
        self.error_count += 1
 
242
        self.report_error(test, err)
 
243
        if self.stop_early:
 
244
            self.stop()
 
245
 
 
246
    def addFailure(self, test, err):
 
247
        self._cleanupLogFile(test)
 
248
        self.extractBenchmarkTime(test)
 
249
        if isinstance(err[1], KnownFailure):
 
250
            return self.addKnownFailure(test, err)
 
251
        unittest.TestResult.addFailure(self, test, err)
 
252
        self.failure_count += 1
 
253
        self.report_failure(test, err)
 
254
        if self.stop_early:
 
255
            self.stop()
 
256
 
 
257
    def addKnownFailure(self, test, err):
 
258
        self.known_failure_count += 1
 
259
        self.report_known_failure(test, err)
 
260
 
 
261
    def addNotSupported(self, test, feature):
 
262
        self.unsupported.setdefault(str(feature), 0)
 
263
        self.unsupported[str(feature)] += 1
 
264
        self.report_unsupported(test, feature)
 
265
 
 
266
    def addSuccess(self, test):
 
267
        self.extractBenchmarkTime(test)
 
268
        if self._bench_history is not None:
 
269
            if self._benchmarkTime is not None:
 
270
                self._bench_history.write("%s %s\n" % (
 
271
                    self._formatTime(self._benchmarkTime),
 
272
                    test.id()))
 
273
        self.report_success(test)
 
274
        unittest.TestResult.addSuccess(self, test)
 
275
 
 
276
    def addSkipped(self, test, skip_excinfo):
 
277
        self.report_skip(test, skip_excinfo)
 
278
        # seems best to treat this as success from point-of-view of unittest
 
279
        # -- it actually does nothing so it barely matters :)
 
280
        try:
 
281
            test.tearDown()
 
282
        except KeyboardInterrupt:
 
283
            raise
 
284
        except:
 
285
            self.addError(test, test.__exc_info())
 
286
        else:
 
287
            unittest.TestResult.addSuccess(self, test)
 
288
 
 
289
    def printErrorList(self, flavour, errors):
 
290
        for test, err in errors:
 
291
            self.stream.writeln(self.separator1)
 
292
            self.stream.write("%s: " % flavour)
 
293
            if self.use_numbered_dirs:
 
294
                self.stream.write('#%d ' % test.number)
 
295
            self.stream.writeln(self.getDescription(test))
 
296
            if getattr(test, '_get_log', None) is not None:
 
297
                print >>self.stream
 
298
                print >>self.stream, \
 
299
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
300
                print >>self.stream, test._get_log()
 
301
                print >>self.stream, \
 
302
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
303
            self.stream.writeln(self.separator2)
 
304
            self.stream.writeln("%s" % err)
 
305
 
 
306
    def finished(self):
 
307
        pass
 
308
 
 
309
    def report_cleaning_up(self):
 
310
        pass
 
311
 
 
312
    def report_success(self, test):
 
313
        pass
 
314
 
 
315
 
 
316
class TextTestResult(ExtendedTestResult):
 
317
    """Displays progress and results of tests in text form"""
 
318
 
 
319
    def __init__(self, stream, descriptions, verbosity,
 
320
                 bench_history=None,
 
321
                 num_tests=None,
 
322
                 pb=None,
 
323
                 use_numbered_dirs=False,
 
324
                 ):
 
325
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
326
            bench_history, num_tests, use_numbered_dirs)
 
327
        if pb is None:
 
328
            self.pb = self.ui.nested_progress_bar()
 
329
            self._supplied_pb = False
 
330
        else:
 
331
            self.pb = pb
 
332
            self._supplied_pb = True
 
333
        self.pb.show_pct = False
 
334
        self.pb.show_spinner = False
 
335
        self.pb.show_eta = False,
 
336
        self.pb.show_count = False
 
337
        self.pb.show_bar = False
 
338
 
 
339
    def report_starting(self):
 
340
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
341
 
 
342
    def _progress_prefix_text(self):
 
343
        a = '[%d' % self.count
 
344
        if self.num_tests is not None:
 
345
            a +='/%d' % self.num_tests
 
346
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
347
        if self.error_count:
 
348
            a += ', %d errors' % self.error_count
 
349
        if self.failure_count:
 
350
            a += ', %d failed' % self.failure_count
 
351
        if self.known_failure_count:
 
352
            a += ', %d known failures' % self.known_failure_count
 
353
        if self.skip_count:
 
354
            a += ', %d skipped' % self.skip_count
 
355
        if self.unsupported:
 
356
            a += ', %d missing features' % len(self.unsupported)
 
357
        a += ']'
 
358
        return a
 
359
 
 
360
    def report_test_start(self, test):
 
361
        self.count += 1
 
362
        self.pb.update(
 
363
                self._progress_prefix_text()
 
364
                + ' ' 
 
365
                + self._shortened_test_description(test))
 
366
 
 
367
    def _test_description(self, test):
 
368
        if self.use_numbered_dirs:
 
369
            return '#%d %s' % (self.count,
 
370
                               self._shortened_test_description(test))
 
371
        else:
 
372
            return self._shortened_test_description(test)
 
373
 
 
374
    def report_error(self, test, err):
 
375
        self.pb.note('ERROR: %s\n    %s\n', 
 
376
            self._test_description(test),
 
377
            err[1],
 
378
            )
 
379
 
 
380
    def report_failure(self, test, err):
 
381
        self.pb.note('FAIL: %s\n    %s\n', 
 
382
            self._test_description(test),
 
383
            err[1],
 
384
            )
 
385
 
 
386
    def report_known_failure(self, test, err):
 
387
        self.pb.note('XFAIL: %s\n%s\n',
 
388
            self._test_description(test), err[1])
 
389
 
 
390
    def report_skip(self, test, skip_excinfo):
 
391
        self.skip_count += 1
 
392
        if False:
 
393
            # at the moment these are mostly not things we can fix
 
394
            # and so they just produce stipple; use the verbose reporter
 
395
            # to see them.
 
396
            if False:
 
397
                # show test and reason for skip
 
398
                self.pb.note('SKIP: %s\n    %s\n', 
 
399
                    self._shortened_test_description(test),
 
400
                    skip_excinfo[1])
 
401
            else:
 
402
                # since the class name was left behind in the still-visible
 
403
                # progress bar...
 
404
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
405
 
 
406
    def report_unsupported(self, test, feature):
 
407
        """test cannot be run because feature is missing."""
 
408
                  
 
409
    def report_cleaning_up(self):
 
410
        self.pb.update('cleaning up...')
 
411
 
 
412
    def finished(self):
 
413
        if not self._supplied_pb:
 
414
            self.pb.finished()
 
415
 
 
416
 
 
417
class VerboseTestResult(ExtendedTestResult):
 
418
    """Produce long output, with one line per test run plus times"""
 
419
 
 
420
    def _ellipsize_to_right(self, a_string, final_width):
 
421
        """Truncate and pad a string, keeping the right hand side"""
 
422
        if len(a_string) > final_width:
 
423
            result = '...' + a_string[3-final_width:]
 
424
        else:
 
425
            result = a_string
 
426
        return result.ljust(final_width)
 
427
 
 
428
    def report_starting(self):
 
429
        self.stream.write('running %d tests...\n' % self.num_tests)
 
430
 
 
431
    def report_test_start(self, test):
 
432
        self.count += 1
 
433
        name = self._shortened_test_description(test)
 
434
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
435
        # numbers, plus a trailing blank
 
436
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
437
        if self.use_numbered_dirs:
 
438
            self.stream.write('%5d ' % self.count)
 
439
            self.stream.write(self._ellipsize_to_right(name,
 
440
                                osutils.terminal_width()-36))
 
441
        else:
 
442
            self.stream.write(self._ellipsize_to_right(name,
 
443
                                osutils.terminal_width()-30))
 
444
        self.stream.flush()
 
445
 
 
446
    def _error_summary(self, err):
 
447
        indent = ' ' * 4
 
448
        if self.use_numbered_dirs:
 
449
            indent += ' ' * 6
 
450
        return '%s%s' % (indent, err[1])
 
451
 
 
452
    def report_error(self, test, err):
 
453
        self.stream.writeln('ERROR %s\n%s'
 
454
                % (self._testTimeString(),
 
455
                   self._error_summary(err)))
 
456
 
 
457
    def report_failure(self, test, err):
 
458
        self.stream.writeln(' FAIL %s\n%s'
 
459
                % (self._testTimeString(),
 
460
                   self._error_summary(err)))
 
461
 
 
462
    def report_known_failure(self, test, err):
 
463
        self.stream.writeln('XFAIL %s\n%s'
 
464
                % (self._testTimeString(),
 
465
                   self._error_summary(err)))
 
466
 
 
467
    def report_success(self, test):
 
468
        self.stream.writeln('   OK %s' % self._testTimeString())
 
469
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
470
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
471
            stats.pprint(file=self.stream)
 
472
        # flush the stream so that we get smooth output. This verbose mode is
 
473
        # used to show the output in PQM.
 
474
        self.stream.flush()
 
475
 
 
476
    def report_skip(self, test, skip_excinfo):
 
477
        self.skip_count += 1
 
478
        self.stream.writeln(' SKIP %s\n%s'
 
479
                % (self._testTimeString(),
 
480
                   self._error_summary(skip_excinfo)))
 
481
 
 
482
    def report_unsupported(self, test, feature):
 
483
        """test cannot be run because feature is missing."""
 
484
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
485
                %(self._testTimeString(), feature))
 
486
                  
 
487
 
 
488
 
 
489
class TextTestRunner(object):
 
490
    stop_on_failure = False
 
491
 
 
492
    def __init__(self,
 
493
                 stream=sys.stderr,
 
494
                 descriptions=0,
 
495
                 verbosity=1,
 
496
                 keep_output=False,
 
497
                 bench_history=None,
 
498
                 use_numbered_dirs=False,
 
499
                 ):
 
500
        self.stream = unittest._WritelnDecorator(stream)
 
501
        self.descriptions = descriptions
 
502
        self.verbosity = verbosity
 
503
        self.keep_output = keep_output
 
504
        self._bench_history = bench_history
 
505
        self.use_numbered_dirs = use_numbered_dirs
 
506
 
 
507
    def run(self, test):
 
508
        "Run the given test case or test suite."
 
509
        startTime = time.time()
 
510
        if self.verbosity == 1:
 
511
            result_class = TextTestResult
 
512
        elif self.verbosity >= 2:
 
513
            result_class = VerboseTestResult
 
514
        result = result_class(self.stream,
 
515
                              self.descriptions,
 
516
                              self.verbosity,
 
517
                              bench_history=self._bench_history,
 
518
                              num_tests=test.countTestCases(),
 
519
                              use_numbered_dirs=self.use_numbered_dirs,
 
520
                              )
 
521
        result.stop_early = self.stop_on_failure
 
522
        result.report_starting()
 
523
        test.run(result)
 
524
        stopTime = time.time()
 
525
        timeTaken = stopTime - startTime
 
526
        result.printErrors()
 
527
        self.stream.writeln(result.separator2)
 
528
        run = result.testsRun
 
529
        self.stream.writeln("Ran %d test%s in %.3fs" %
 
530
                            (run, run != 1 and "s" or "", timeTaken))
 
531
        self.stream.writeln()
 
532
        if not result.wasSuccessful():
 
533
            self.stream.write("FAILED (")
 
534
            failed, errored = map(len, (result.failures, result.errors))
 
535
            if failed:
 
536
                self.stream.write("failures=%d" % failed)
 
537
            if errored:
 
538
                if failed: self.stream.write(", ")
 
539
                self.stream.write("errors=%d" % errored)
 
540
            if result.known_failure_count:
 
541
                if failed or errored: self.stream.write(", ")
 
542
                self.stream.write("known_failure_count=%d" %
 
543
                    result.known_failure_count)
 
544
            self.stream.writeln(")")
 
545
        else:
 
546
            if result.known_failure_count:
 
547
                self.stream.writeln("OK (known_failures=%d)" %
 
548
                    result.known_failure_count)
 
549
            else:
 
550
                self.stream.writeln("OK")
 
551
        if result.skip_count > 0:
 
552
            skipped = result.skip_count
 
553
            self.stream.writeln('%d test%s skipped' %
 
554
                                (skipped, skipped != 1 and "s" or ""))
 
555
        if result.unsupported:
 
556
            for feature, count in sorted(result.unsupported.items()):
 
557
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
558
                    (feature, count))
 
559
        result.report_cleaning_up()
 
560
        # This is still a little bogus, 
 
561
        # but only a little. Folk not using our testrunner will
 
562
        # have to delete their temp directories themselves.
 
563
        test_root = TestCaseWithMemoryTransport.TEST_ROOT
 
564
        if result.wasSuccessful() or not self.keep_output:
 
565
            if test_root is not None:
 
566
                # If LANG=C we probably have created some bogus paths
 
567
                # which rmtree(unicode) will fail to delete
 
568
                # so make sure we are using rmtree(str) to delete everything
 
569
                # except on win32, where rmtree(str) will fail
 
570
                # since it doesn't have the property of byte-stream paths
 
571
                # (they are either ascii or mbcs)
 
572
                if sys.platform == 'win32':
 
573
                    # make sure we are using the unicode win32 api
 
574
                    test_root = unicode(test_root)
 
575
                else:
 
576
                    test_root = test_root.encode(
 
577
                        sys.getfilesystemencoding())
 
578
                _rmtree_temp_dir(test_root)
 
579
        else:
 
580
            note("Failed tests working directories are in '%s'\n", test_root)
 
581
        TestCaseWithMemoryTransport.TEST_ROOT = None
 
582
        result.finished()
 
583
        return result
 
584
 
 
585
 
 
586
def iter_suite_tests(suite):
 
587
    """Return all tests in a suite, recursing through nested suites"""
 
588
    for item in suite._tests:
 
589
        if isinstance(item, unittest.TestCase):
 
590
            yield item
 
591
        elif isinstance(item, unittest.TestSuite):
 
592
            for r in iter_suite_tests(item):
 
593
                yield r
 
594
        else:
 
595
            raise Exception('unknown object %r inside test suite %r'
 
596
                            % (item, suite))
 
597
 
 
598
 
 
599
class TestSkipped(Exception):
 
600
    """Indicates that a test was intentionally skipped, rather than failing."""
 
601
 
 
602
 
 
603
class KnownFailure(AssertionError):
 
604
    """Indicates that a test failed in a precisely expected manner.
 
605
 
 
606
    Such failures dont block the whole test suite from passing because they are
 
607
    indicators of partially completed code or of future work. We have an
 
608
    explicit error for them so that we can ensure that they are always visible:
 
609
    KnownFailures are always shown in the output of bzr selftest.
 
610
    """
 
611
 
 
612
 
 
613
class UnavailableFeature(Exception):
 
614
    """A feature required for this test was not available.
 
615
 
 
616
    The feature should be used to construct the exception.
 
617
    """
 
618
 
 
619
 
 
620
class CommandFailed(Exception):
 
621
    pass
 
622
 
 
623
 
 
624
class StringIOWrapper(object):
 
625
    """A wrapper around cStringIO which just adds an encoding attribute.
 
626
    
 
627
    Internally we can check sys.stdout to see what the output encoding
 
628
    should be. However, cStringIO has no encoding attribute that we can
 
629
    set. So we wrap it instead.
 
630
    """
 
631
    encoding='ascii'
 
632
    _cstring = None
 
633
 
 
634
    def __init__(self, s=None):
 
635
        if s is not None:
 
636
            self.__dict__['_cstring'] = StringIO(s)
 
637
        else:
 
638
            self.__dict__['_cstring'] = StringIO()
 
639
 
 
640
    def __getattr__(self, name, getattr=getattr):
 
641
        return getattr(self.__dict__['_cstring'], name)
 
642
 
 
643
    def __setattr__(self, name, val):
 
644
        if name == 'encoding':
 
645
            self.__dict__['encoding'] = val
 
646
        else:
 
647
            return setattr(self._cstring, name, val)
 
648
 
 
649
 
 
650
class TestUIFactory(ui.CLIUIFactory):
 
651
    """A UI Factory for testing.
 
652
 
 
653
    Hide the progress bar but emit note()s.
 
654
    Redirect stdin.
 
655
    Allows get_password to be tested without real tty attached.
 
656
    """
 
657
 
 
658
    def __init__(self,
 
659
                 stdout=None,
 
660
                 stderr=None,
 
661
                 stdin=None):
 
662
        super(TestUIFactory, self).__init__()
 
663
        if stdin is not None:
 
664
            # We use a StringIOWrapper to be able to test various
 
665
            # encodings, but the user is still responsible to
 
666
            # encode the string and to set the encoding attribute
 
667
            # of StringIOWrapper.
 
668
            self.stdin = StringIOWrapper(stdin)
 
669
        if stdout is None:
 
670
            self.stdout = sys.stdout
 
671
        else:
 
672
            self.stdout = stdout
 
673
        if stderr is None:
 
674
            self.stderr = sys.stderr
 
675
        else:
 
676
            self.stderr = stderr
 
677
 
 
678
    def clear(self):
 
679
        """See progress.ProgressBar.clear()."""
 
680
 
 
681
    def clear_term(self):
 
682
        """See progress.ProgressBar.clear_term()."""
 
683
 
 
684
    def clear_term(self):
 
685
        """See progress.ProgressBar.clear_term()."""
 
686
 
 
687
    def finished(self):
 
688
        """See progress.ProgressBar.finished()."""
 
689
 
 
690
    def note(self, fmt_string, *args, **kwargs):
 
691
        """See progress.ProgressBar.note()."""
 
692
        self.stdout.write((fmt_string + "\n") % args)
 
693
 
 
694
    def progress_bar(self):
 
695
        return self
 
696
 
 
697
    def nested_progress_bar(self):
 
698
        return self
 
699
 
 
700
    def update(self, message, count=None, total=None):
 
701
        """See progress.ProgressBar.update()."""
 
702
 
 
703
    def get_non_echoed_password(self, prompt):
 
704
        """Get password from stdin without trying to handle the echo mode"""
 
705
        if prompt:
 
706
            self.stdout.write(prompt)
 
707
        password = self.stdin.readline()
 
708
        if not password:
 
709
            raise EOFError
 
710
        if password[-1] == '\n':
 
711
            password = password[:-1]
 
712
        return password
 
713
 
 
714
 
 
715
class TestCase(unittest.TestCase):
 
716
    """Base class for bzr unit tests.
 
717
    
 
718
    Tests that need access to disk resources should subclass 
 
719
    TestCaseInTempDir not TestCase.
 
720
 
 
721
    Error and debug log messages are redirected from their usual
 
722
    location into a temporary file, the contents of which can be
 
723
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
724
    so that it can also capture file IO.  When the test completes this file
 
725
    is read into memory and removed from disk.
 
726
       
 
727
    There are also convenience functions to invoke bzr's command-line
 
728
    routine, and to build and check bzr trees.
 
729
   
 
730
    In addition to the usual method of overriding tearDown(), this class also
 
731
    allows subclasses to register functions into the _cleanups list, which is
 
732
    run in order as the object is torn down.  It's less likely this will be
 
733
    accidentally overlooked.
 
734
    """
 
735
 
 
736
    _log_file_name = None
 
737
    _log_contents = ''
 
738
    _keep_log_file = False
 
739
    # record lsprof data when performing benchmark calls.
 
740
    _gather_lsprof_in_benchmarks = False
 
741
 
 
742
    def __init__(self, methodName='testMethod'):
 
743
        super(TestCase, self).__init__(methodName)
 
744
        self._cleanups = []
 
745
 
 
746
    def setUp(self):
 
747
        unittest.TestCase.setUp(self)
 
748
        self._cleanEnvironment()
 
749
        bzrlib.trace.disable_default_logging()
 
750
        self._silenceUI()
 
751
        self._startLogFile()
 
752
        self._benchcalls = []
 
753
        self._benchtime = None
 
754
        # prevent hooks affecting tests
 
755
        self._preserved_hooks = {
 
756
            bzrlib.branch.Branch:bzrlib.branch.Branch.hooks,
 
757
            bzrlib.smart.server.SmartTCPServer:bzrlib.smart.server.SmartTCPServer.hooks,
 
758
            }
 
759
        self.addCleanup(self._restoreHooks)
 
760
        # this list of hooks must be kept in sync with the defaults
 
761
        # in branch.py
 
762
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
763
 
 
764
    def _silenceUI(self):
 
765
        """Turn off UI for duration of test"""
 
766
        # by default the UI is off; tests can turn it on if they want it.
 
767
        saved = ui.ui_factory
 
768
        def _restore():
 
769
            ui.ui_factory = saved
 
770
        ui.ui_factory = ui.SilentUIFactory()
 
771
        self.addCleanup(_restore)
 
772
 
 
773
    def _ndiff_strings(self, a, b):
 
774
        """Return ndiff between two strings containing lines.
 
775
        
 
776
        A trailing newline is added if missing to make the strings
 
777
        print properly."""
 
778
        if b and b[-1] != '\n':
 
779
            b += '\n'
 
780
        if a and a[-1] != '\n':
 
781
            a += '\n'
 
782
        difflines = difflib.ndiff(a.splitlines(True),
 
783
                                  b.splitlines(True),
 
784
                                  linejunk=lambda x: False,
 
785
                                  charjunk=lambda x: False)
 
786
        return ''.join(difflines)
 
787
 
 
788
    def assertEqual(self, a, b, message=''):
 
789
        try:
 
790
            if a == b:
 
791
                return
 
792
        except UnicodeError, e:
 
793
            # If we can't compare without getting a UnicodeError, then
 
794
            # obviously they are different
 
795
            mutter('UnicodeError: %s', e)
 
796
        if message:
 
797
            message += '\n'
 
798
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
799
            % (message,
 
800
               pformat(a, indent=4), pformat(b, indent=4)))
 
801
 
 
802
    assertEquals = assertEqual
 
803
 
 
804
    def assertEqualDiff(self, a, b, message=None):
 
805
        """Assert two texts are equal, if not raise an exception.
 
806
        
 
807
        This is intended for use with multi-line strings where it can 
 
808
        be hard to find the differences by eye.
 
809
        """
 
810
        # TODO: perhaps override assertEquals to call this for strings?
 
811
        if a == b:
 
812
            return
 
813
        if message is None:
 
814
            message = "texts not equal:\n"
 
815
        raise AssertionError(message + 
 
816
                             self._ndiff_strings(a, b))      
 
817
        
 
818
    def assertEqualMode(self, mode, mode_test):
 
819
        self.assertEqual(mode, mode_test,
 
820
                         'mode mismatch %o != %o' % (mode, mode_test))
 
821
 
 
822
    def assertStartsWith(self, s, prefix):
 
823
        if not s.startswith(prefix):
 
824
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
825
 
 
826
    def assertEndsWith(self, s, suffix):
 
827
        """Asserts that s ends with suffix."""
 
828
        if not s.endswith(suffix):
 
829
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
830
 
 
831
    def assertContainsRe(self, haystack, needle_re):
 
832
        """Assert that a contains something matching a regular expression."""
 
833
        if not re.search(needle_re, haystack):
 
834
            raise AssertionError('pattern "%r" not found in "%r"'
 
835
                    % (needle_re, haystack))
 
836
 
 
837
    def assertNotContainsRe(self, haystack, needle_re):
 
838
        """Assert that a does not match a regular expression"""
 
839
        if re.search(needle_re, haystack):
 
840
            raise AssertionError('pattern "%s" found in "%s"'
 
841
                    % (needle_re, haystack))
 
842
 
 
843
    def assertSubset(self, sublist, superlist):
 
844
        """Assert that every entry in sublist is present in superlist."""
 
845
        missing = []
 
846
        for entry in sublist:
 
847
            if entry not in superlist:
 
848
                missing.append(entry)
 
849
        if len(missing) > 0:
 
850
            raise AssertionError("value(s) %r not present in container %r" % 
 
851
                                 (missing, superlist))
 
852
 
 
853
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
854
        """Fail unless excClass is raised when the iterator from func is used.
 
855
        
 
856
        Many functions can return generators this makes sure
 
857
        to wrap them in a list() call to make sure the whole generator
 
858
        is run, and that the proper exception is raised.
 
859
        """
 
860
        try:
 
861
            list(func(*args, **kwargs))
 
862
        except excClass:
 
863
            return
 
864
        else:
 
865
            if getattr(excClass,'__name__', None) is not None:
 
866
                excName = excClass.__name__
 
867
            else:
 
868
                excName = str(excClass)
 
869
            raise self.failureException, "%s not raised" % excName
 
870
 
 
871
    def assertRaises(self, excClass, func, *args, **kwargs):
 
872
        """Assert that a callable raises a particular exception.
 
873
 
 
874
        :param excClass: As for the except statement, this may be either an
 
875
        exception class, or a tuple of classes.
 
876
 
 
877
        Returns the exception so that you can examine it.
 
878
        """
 
879
        try:
 
880
            func(*args, **kwargs)
 
881
        except excClass, e:
 
882
            return e
 
883
        else:
 
884
            if getattr(excClass,'__name__', None) is not None:
 
885
                excName = excClass.__name__
 
886
            else:
 
887
                # probably a tuple
 
888
                excName = str(excClass)
 
889
            raise self.failureException, "%s not raised" % excName
 
890
 
 
891
    def assertIs(self, left, right, message=None):
 
892
        if not (left is right):
 
893
            if message is not None:
 
894
                raise AssertionError(message)
 
895
            else:
 
896
                raise AssertionError("%r is not %r." % (left, right))
 
897
 
 
898
    def assertIsNot(self, left, right, message=None):
 
899
        if (left is right):
 
900
            if message is not None:
 
901
                raise AssertionError(message)
 
902
            else:
 
903
                raise AssertionError("%r is %r." % (left, right))
 
904
 
 
905
    def assertTransportMode(self, transport, path, mode):
 
906
        """Fail if a path does not have mode mode.
 
907
        
 
908
        If modes are not supported on this transport, the assertion is ignored.
 
909
        """
 
910
        if not transport._can_roundtrip_unix_modebits():
 
911
            return
 
912
        path_stat = transport.stat(path)
 
913
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
914
        self.assertEqual(mode, actual_mode,
 
915
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
916
 
 
917
    def assertIsInstance(self, obj, kls):
 
918
        """Fail if obj is not an instance of kls"""
 
919
        if not isinstance(obj, kls):
 
920
            self.fail("%r is an instance of %s rather than %s" % (
 
921
                obj, obj.__class__, kls))
 
922
 
 
923
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
924
        """Invoke a test, expecting it to fail for the given reason.
 
925
 
 
926
        This is for assertions that ought to succeed, but currently fail.
 
927
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
928
        about the failure you're expecting.  If a new bug is introduced,
 
929
        AssertionError should be raised, not KnownFailure.
 
930
 
 
931
        Frequently, expectFailure should be followed by an opposite assertion.
 
932
        See example below.
 
933
 
 
934
        Intended to be used with a callable that raises AssertionError as the
 
935
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
936
 
 
937
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
938
        test succeeds.
 
939
 
 
940
        example usage::
 
941
 
 
942
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
943
                             dynamic_val)
 
944
          self.assertEqual(42, dynamic_val)
 
945
 
 
946
          This means that a dynamic_val of 54 will cause the test to raise
 
947
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
948
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
949
          than 54 or 42 will cause an AssertionError.
 
950
        """
 
951
        try:
 
952
            assertion(*args, **kwargs)
 
953
        except AssertionError:
 
954
            raise KnownFailure(reason)
 
955
        else:
 
956
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
957
 
 
958
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
959
        """A helper for callDeprecated and applyDeprecated.
 
960
 
 
961
        :param a_callable: A callable to call.
 
962
        :param args: The positional arguments for the callable
 
963
        :param kwargs: The keyword arguments for the callable
 
964
        :return: A tuple (warnings, result). result is the result of calling
 
965
            a_callable(*args, **kwargs).
 
966
        """
 
967
        local_warnings = []
 
968
        def capture_warnings(msg, cls=None, stacklevel=None):
 
969
            # we've hooked into a deprecation specific callpath,
 
970
            # only deprecations should getting sent via it.
 
971
            self.assertEqual(cls, DeprecationWarning)
 
972
            local_warnings.append(msg)
 
973
        original_warning_method = symbol_versioning.warn
 
974
        symbol_versioning.set_warning_method(capture_warnings)
 
975
        try:
 
976
            result = a_callable(*args, **kwargs)
 
977
        finally:
 
978
            symbol_versioning.set_warning_method(original_warning_method)
 
979
        return (local_warnings, result)
 
980
 
 
981
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
982
        """Call a deprecated callable without warning the user.
 
983
 
 
984
        :param deprecation_format: The deprecation format that the callable
 
985
            should have been deprecated with. This is the same type as the 
 
986
            parameter to deprecated_method/deprecated_function. If the 
 
987
            callable is not deprecated with this format, an assertion error
 
988
            will be raised.
 
989
        :param a_callable: A callable to call. This may be a bound method or
 
990
            a regular function. It will be called with *args and **kwargs.
 
991
        :param args: The positional arguments for the callable
 
992
        :param kwargs: The keyword arguments for the callable
 
993
        :return: The result of a_callable(*args, **kwargs)
 
994
        """
 
995
        call_warnings, result = self._capture_warnings(a_callable,
 
996
            *args, **kwargs)
 
997
        expected_first_warning = symbol_versioning.deprecation_string(
 
998
            a_callable, deprecation_format)
 
999
        if len(call_warnings) == 0:
 
1000
            self.fail("No deprecation warning generated by call to %s" %
 
1001
                a_callable)
 
1002
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1003
        return result
 
1004
 
 
1005
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1006
        """Assert that a callable is deprecated in a particular way.
 
1007
 
 
1008
        This is a very precise test for unusual requirements. The 
 
1009
        applyDeprecated helper function is probably more suited for most tests
 
1010
        as it allows you to simply specify the deprecation format being used
 
1011
        and will ensure that that is issued for the function being called.
 
1012
 
 
1013
        :param expected: a list of the deprecation warnings expected, in order
 
1014
        :param callable: The callable to call
 
1015
        :param args: The positional arguments for the callable
 
1016
        :param kwargs: The keyword arguments for the callable
 
1017
        """
 
1018
        call_warnings, result = self._capture_warnings(callable,
 
1019
            *args, **kwargs)
 
1020
        self.assertEqual(expected, call_warnings)
 
1021
        return result
 
1022
 
 
1023
    def _startLogFile(self):
 
1024
        """Send bzr and test log messages to a temporary file.
 
1025
 
 
1026
        The file is removed as the test is torn down.
 
1027
        """
 
1028
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1029
        self._log_file = os.fdopen(fileno, 'w+')
 
1030
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
1031
        self._log_file_name = name
 
1032
        self.addCleanup(self._finishLogFile)
 
1033
 
 
1034
    def _finishLogFile(self):
 
1035
        """Finished with the log file.
 
1036
 
 
1037
        Close the file and delete it, unless setKeepLogfile was called.
 
1038
        """
 
1039
        if self._log_file is None:
 
1040
            return
 
1041
        bzrlib.trace.disable_test_log(self._log_nonce)
 
1042
        self._log_file.close()
 
1043
        self._log_file = None
 
1044
        if not self._keep_log_file:
 
1045
            os.remove(self._log_file_name)
 
1046
            self._log_file_name = None
 
1047
 
 
1048
    def setKeepLogfile(self):
 
1049
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1050
        self._keep_log_file = True
 
1051
 
 
1052
    def addCleanup(self, callable):
 
1053
        """Arrange to run a callable when this case is torn down.
 
1054
 
 
1055
        Callables are run in the reverse of the order they are registered, 
 
1056
        ie last-in first-out.
 
1057
        """
 
1058
        if callable in self._cleanups:
 
1059
            raise ValueError("cleanup function %r already registered on %s" 
 
1060
                    % (callable, self))
 
1061
        self._cleanups.append(callable)
 
1062
 
 
1063
    def _cleanEnvironment(self):
 
1064
        new_env = {
 
1065
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1066
            'HOME': os.getcwd(),
 
1067
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1068
            'BZR_EMAIL': None,
 
1069
            'BZREMAIL': None, # may still be present in the environment
 
1070
            'EMAIL': None,
 
1071
            'BZR_PROGRESS_BAR': None,
 
1072
            # Proxies
 
1073
            'http_proxy': None,
 
1074
            'HTTP_PROXY': None,
 
1075
            'https_proxy': None,
 
1076
            'HTTPS_PROXY': None,
 
1077
            'no_proxy': None,
 
1078
            'NO_PROXY': None,
 
1079
            'all_proxy': None,
 
1080
            'ALL_PROXY': None,
 
1081
            # Nobody cares about these ones AFAIK. So far at
 
1082
            # least. If you do (care), please update this comment
 
1083
            # -- vila 20061212
 
1084
            'ftp_proxy': None,
 
1085
            'FTP_PROXY': None,
 
1086
        }
 
1087
        self.__old_env = {}
 
1088
        self.addCleanup(self._restoreEnvironment)
 
1089
        for name, value in new_env.iteritems():
 
1090
            self._captureVar(name, value)
 
1091
 
 
1092
    def _captureVar(self, name, newvalue):
 
1093
        """Set an environment variable, and reset it when finished."""
 
1094
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1095
 
 
1096
    def _restoreEnvironment(self):
 
1097
        for name, value in self.__old_env.iteritems():
 
1098
            osutils.set_or_unset_env(name, value)
 
1099
 
 
1100
    def _restoreHooks(self):
 
1101
        for klass, hooks in self._preserved_hooks.items():
 
1102
            setattr(klass, 'hooks', hooks)
 
1103
 
 
1104
    def knownFailure(self, reason):
 
1105
        """This test has failed for some known reason."""
 
1106
        raise KnownFailure(reason)
 
1107
 
 
1108
    def run(self, result=None):
 
1109
        if result is None: result = self.defaultTestResult()
 
1110
        for feature in getattr(self, '_test_needs_features', []):
 
1111
            if not feature.available():
 
1112
                result.startTest(self)
 
1113
                if getattr(result, 'addNotSupported', None):
 
1114
                    result.addNotSupported(self, feature)
 
1115
                else:
 
1116
                    result.addSuccess(self)
 
1117
                result.stopTest(self)
 
1118
                return
 
1119
        return unittest.TestCase.run(self, result)
 
1120
 
 
1121
    def tearDown(self):
 
1122
        self._runCleanups()
 
1123
        unittest.TestCase.tearDown(self)
 
1124
 
 
1125
    def time(self, callable, *args, **kwargs):
 
1126
        """Run callable and accrue the time it takes to the benchmark time.
 
1127
        
 
1128
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1129
        this will cause lsprofile statistics to be gathered and stored in
 
1130
        self._benchcalls.
 
1131
        """
 
1132
        if self._benchtime is None:
 
1133
            self._benchtime = 0
 
1134
        start = time.time()
 
1135
        try:
 
1136
            if not self._gather_lsprof_in_benchmarks:
 
1137
                return callable(*args, **kwargs)
 
1138
            else:
 
1139
                # record this benchmark
 
1140
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1141
                stats.sort()
 
1142
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1143
                return ret
 
1144
        finally:
 
1145
            self._benchtime += time.time() - start
 
1146
 
 
1147
    def _runCleanups(self):
 
1148
        """Run registered cleanup functions. 
 
1149
 
 
1150
        This should only be called from TestCase.tearDown.
 
1151
        """
 
1152
        # TODO: Perhaps this should keep running cleanups even if 
 
1153
        # one of them fails?
 
1154
 
 
1155
        # Actually pop the cleanups from the list so tearDown running
 
1156
        # twice is safe (this happens for skipped tests).
 
1157
        while self._cleanups:
 
1158
            self._cleanups.pop()()
 
1159
 
 
1160
    def log(self, *args):
 
1161
        mutter(*args)
 
1162
 
 
1163
    def _get_log(self, keep_log_file=False):
 
1164
        """Return as a string the log for this test. If the file is still
 
1165
        on disk and keep_log_file=False, delete the log file and store the
 
1166
        content in self._log_contents."""
 
1167
        # flush the log file, to get all content
 
1168
        import bzrlib.trace
 
1169
        bzrlib.trace._trace_file.flush()
 
1170
        if self._log_contents:
 
1171
            return self._log_contents
 
1172
        if self._log_file_name is not None:
 
1173
            logfile = open(self._log_file_name)
 
1174
            try:
 
1175
                log_contents = logfile.read()
 
1176
            finally:
 
1177
                logfile.close()
 
1178
            if not keep_log_file:
 
1179
                self._log_contents = log_contents
 
1180
                try:
 
1181
                    os.remove(self._log_file_name)
 
1182
                except OSError, e:
 
1183
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1184
                        print >>sys.stderr, ('Unable to delete log file '
 
1185
                                             ' %r' % self._log_file_name)
 
1186
                    else:
 
1187
                        raise
 
1188
            return log_contents
 
1189
        else:
 
1190
            return "DELETED log file to reduce memory footprint"
 
1191
 
 
1192
    def capture(self, cmd, retcode=0):
 
1193
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
1194
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
1195
 
 
1196
    def requireFeature(self, feature):
 
1197
        """This test requires a specific feature is available.
 
1198
 
 
1199
        :raises UnavailableFeature: When feature is not available.
 
1200
        """
 
1201
        if not feature.available():
 
1202
            raise UnavailableFeature(feature)
 
1203
 
 
1204
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
1205
                         working_dir=None):
 
1206
        """Invoke bzr and return (stdout, stderr).
 
1207
 
 
1208
        Useful for code that wants to check the contents of the
 
1209
        output, the way error messages are presented, etc.
 
1210
 
 
1211
        This should be the main method for tests that want to exercise the
 
1212
        overall behavior of the bzr application (rather than a unit test
 
1213
        or a functional test of the library.)
 
1214
 
 
1215
        Much of the old code runs bzr by forking a new copy of Python, but
 
1216
        that is slower, harder to debug, and generally not necessary.
 
1217
 
 
1218
        This runs bzr through the interface that catches and reports
 
1219
        errors, and with logging set to something approximating the
 
1220
        default, so that error reporting can be checked.
 
1221
 
 
1222
        :param argv: arguments to invoke bzr
 
1223
        :param retcode: expected return code, or None for don't-care.
 
1224
        :param encoding: encoding for sys.stdout and sys.stderr
 
1225
        :param stdin: A string to be used as stdin for the command.
 
1226
        :param working_dir: Change to this directory before running
 
1227
        """
 
1228
        if encoding is None:
 
1229
            encoding = bzrlib.user_encoding
 
1230
        stdout = StringIOWrapper()
 
1231
        stderr = StringIOWrapper()
 
1232
        stdout.encoding = encoding
 
1233
        stderr.encoding = encoding
 
1234
 
 
1235
        self.log('run bzr: %r', argv)
 
1236
        # FIXME: don't call into logging here
 
1237
        handler = logging.StreamHandler(stderr)
 
1238
        handler.setLevel(logging.INFO)
 
1239
        logger = logging.getLogger('')
 
1240
        logger.addHandler(handler)
 
1241
        old_ui_factory = ui.ui_factory
 
1242
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1243
 
 
1244
        cwd = None
 
1245
        if working_dir is not None:
 
1246
            cwd = osutils.getcwd()
 
1247
            os.chdir(working_dir)
 
1248
 
 
1249
        try:
 
1250
            saved_debug_flags = frozenset(debug.debug_flags)
 
1251
            debug.debug_flags.clear()
 
1252
            try:
 
1253
                result = self.apply_redirected(ui.ui_factory.stdin,
 
1254
                                               stdout, stderr,
 
1255
                                               bzrlib.commands.run_bzr_catch_errors,
 
1256
                                               argv)
 
1257
            finally:
 
1258
                debug.debug_flags.update(saved_debug_flags)
 
1259
        finally:
 
1260
            logger.removeHandler(handler)
 
1261
            ui.ui_factory = old_ui_factory
 
1262
            if cwd is not None:
 
1263
                os.chdir(cwd)
 
1264
 
 
1265
        out = stdout.getvalue()
 
1266
        err = stderr.getvalue()
 
1267
        if out:
 
1268
            self.log('output:\n%r', out)
 
1269
        if err:
 
1270
            self.log('errors:\n%r', err)
 
1271
        if retcode is not None:
 
1272
            self.assertEquals(retcode, result)
 
1273
        return out, err
 
1274
 
 
1275
    def run_bzr(self, *args, **kwargs):
 
1276
        """Invoke bzr, as if it were run from the command line.
 
1277
 
 
1278
        This should be the main method for tests that want to exercise the
 
1279
        overall behavior of the bzr application (rather than a unit test
 
1280
        or a functional test of the library.)
 
1281
 
 
1282
        This sends the stdout/stderr results into the test's log,
 
1283
        where it may be useful for debugging.  See also run_captured.
 
1284
 
 
1285
        :param stdin: A string to be used as stdin for the command.
 
1286
        :param retcode: The status code the command should return
 
1287
        :param working_dir: The directory to run the command in
 
1288
        """
 
1289
        retcode = kwargs.pop('retcode', 0)
 
1290
        encoding = kwargs.pop('encoding', None)
 
1291
        stdin = kwargs.pop('stdin', None)
 
1292
        working_dir = kwargs.pop('working_dir', None)
 
1293
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
 
1294
                                     stdin=stdin, working_dir=working_dir)
 
1295
 
 
1296
    def run_bzr_decode(self, *args, **kwargs):
 
1297
        if 'encoding' in kwargs:
 
1298
            encoding = kwargs['encoding']
 
1299
        else:
 
1300
            encoding = bzrlib.user_encoding
 
1301
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
1302
 
 
1303
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1304
        """Run bzr, and check that stderr contains the supplied regexes
 
1305
        
 
1306
        :param error_regexes: Sequence of regular expressions which 
 
1307
            must each be found in the error output. The relative ordering
 
1308
            is not enforced.
 
1309
        :param args: command-line arguments for bzr
 
1310
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1311
            This function changes the default value of retcode to be 3,
 
1312
            since in most cases this is run when you expect bzr to fail.
 
1313
        :return: (out, err) The actual output of running the command (in case you
 
1314
                 want to do more inspection)
 
1315
 
 
1316
        Examples of use:
 
1317
            # Make sure that commit is failing because there is nothing to do
 
1318
            self.run_bzr_error(['no changes to commit'],
 
1319
                               'commit', '-m', 'my commit comment')
 
1320
            # Make sure --strict is handling an unknown file, rather than
 
1321
            # giving us the 'nothing to do' error
 
1322
            self.build_tree(['unknown'])
 
1323
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1324
                               'commit', '--strict', '-m', 'my commit comment')
 
1325
        """
 
1326
        kwargs.setdefault('retcode', 3)
 
1327
        out, err = self.run_bzr(*args, **kwargs)
 
1328
        for regex in error_regexes:
 
1329
            self.assertContainsRe(err, regex)
 
1330
        return out, err
 
1331
 
 
1332
    def run_bzr_subprocess(self, *args, **kwargs):
 
1333
        """Run bzr in a subprocess for testing.
 
1334
 
 
1335
        This starts a new Python interpreter and runs bzr in there. 
 
1336
        This should only be used for tests that have a justifiable need for
 
1337
        this isolation: e.g. they are testing startup time, or signal
 
1338
        handling, or early startup code, etc.  Subprocess code can't be 
 
1339
        profiled or debugged so easily.
 
1340
 
 
1341
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1342
            None is supplied, the status code is not checked.
 
1343
        :param env_changes: A dictionary which lists changes to environment
 
1344
            variables. A value of None will unset the env variable.
 
1345
            The values must be strings. The change will only occur in the
 
1346
            child, so you don't need to fix the environment after running.
 
1347
        :param universal_newlines: Convert CRLF => LF
 
1348
        :param allow_plugins: By default the subprocess is run with
 
1349
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1350
            for system-wide plugins to create unexpected output on stderr,
 
1351
            which can cause unnecessary test failures.
 
1352
        """
 
1353
        env_changes = kwargs.get('env_changes', {})
 
1354
        working_dir = kwargs.get('working_dir', None)
 
1355
        allow_plugins = kwargs.get('allow_plugins', False)
 
1356
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1357
                                            working_dir=working_dir,
 
1358
                                            allow_plugins=allow_plugins)
 
1359
        # We distinguish between retcode=None and retcode not passed.
 
1360
        supplied_retcode = kwargs.get('retcode', 0)
 
1361
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1362
            universal_newlines=kwargs.get('universal_newlines', False),
 
1363
            process_args=args)
 
1364
 
 
1365
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1366
                             skip_if_plan_to_signal=False,
 
1367
                             working_dir=None,
 
1368
                             allow_plugins=False):
 
1369
        """Start bzr in a subprocess for testing.
 
1370
 
 
1371
        This starts a new Python interpreter and runs bzr in there.
 
1372
        This should only be used for tests that have a justifiable need for
 
1373
        this isolation: e.g. they are testing startup time, or signal
 
1374
        handling, or early startup code, etc.  Subprocess code can't be
 
1375
        profiled or debugged so easily.
 
1376
 
 
1377
        :param process_args: a list of arguments to pass to the bzr executable,
 
1378
            for example `['--version']`.
 
1379
        :param env_changes: A dictionary which lists changes to environment
 
1380
            variables. A value of None will unset the env variable.
 
1381
            The values must be strings. The change will only occur in the
 
1382
            child, so you don't need to fix the environment after running.
 
1383
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1384
            is not available.
 
1385
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1386
 
 
1387
        :returns: Popen object for the started process.
 
1388
        """
 
1389
        if skip_if_plan_to_signal:
 
1390
            if not getattr(os, 'kill', None):
 
1391
                raise TestSkipped("os.kill not available.")
 
1392
 
 
1393
        if env_changes is None:
 
1394
            env_changes = {}
 
1395
        old_env = {}
 
1396
 
 
1397
        def cleanup_environment():
 
1398
            for env_var, value in env_changes.iteritems():
 
1399
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1400
 
 
1401
        def restore_environment():
 
1402
            for env_var, value in old_env.iteritems():
 
1403
                osutils.set_or_unset_env(env_var, value)
 
1404
 
 
1405
        bzr_path = self.get_bzr_path()
 
1406
 
 
1407
        cwd = None
 
1408
        if working_dir is not None:
 
1409
            cwd = osutils.getcwd()
 
1410
            os.chdir(working_dir)
 
1411
 
 
1412
        try:
 
1413
            # win32 subprocess doesn't support preexec_fn
 
1414
            # so we will avoid using it on all platforms, just to
 
1415
            # make sure the code path is used, and we don't break on win32
 
1416
            cleanup_environment()
 
1417
            command = [sys.executable, bzr_path]
 
1418
            if not allow_plugins:
 
1419
                command.append('--no-plugins')
 
1420
            command.extend(process_args)
 
1421
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1422
        finally:
 
1423
            restore_environment()
 
1424
            if cwd is not None:
 
1425
                os.chdir(cwd)
 
1426
 
 
1427
        return process
 
1428
 
 
1429
    def _popen(self, *args, **kwargs):
 
1430
        """Place a call to Popen.
 
1431
 
 
1432
        Allows tests to override this method to intercept the calls made to
 
1433
        Popen for introspection.
 
1434
        """
 
1435
        return Popen(*args, **kwargs)
 
1436
 
 
1437
    def get_bzr_path(self):
 
1438
        """Return the path of the 'bzr' executable for this test suite."""
 
1439
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1440
        if not os.path.isfile(bzr_path):
 
1441
            # We are probably installed. Assume sys.argv is the right file
 
1442
            bzr_path = sys.argv[0]
 
1443
        return bzr_path
 
1444
 
 
1445
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1446
                              universal_newlines=False, process_args=None):
 
1447
        """Finish the execution of process.
 
1448
 
 
1449
        :param process: the Popen object returned from start_bzr_subprocess.
 
1450
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1451
            None is supplied, the status code is not checked.
 
1452
        :param send_signal: an optional signal to send to the process.
 
1453
        :param universal_newlines: Convert CRLF => LF
 
1454
        :returns: (stdout, stderr)
 
1455
        """
 
1456
        if send_signal is not None:
 
1457
            os.kill(process.pid, send_signal)
 
1458
        out, err = process.communicate()
 
1459
 
 
1460
        if universal_newlines:
 
1461
            out = out.replace('\r\n', '\n')
 
1462
            err = err.replace('\r\n', '\n')
 
1463
 
 
1464
        if retcode is not None and retcode != process.returncode:
 
1465
            if process_args is None:
 
1466
                process_args = "(unknown args)"
 
1467
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1468
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1469
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1470
                      % (process_args, retcode, process.returncode))
 
1471
        return [out, err]
 
1472
 
 
1473
    def check_inventory_shape(self, inv, shape):
 
1474
        """Compare an inventory to a list of expected names.
 
1475
 
 
1476
        Fail if they are not precisely equal.
 
1477
        """
 
1478
        extras = []
 
1479
        shape = list(shape)             # copy
 
1480
        for path, ie in inv.entries():
 
1481
            name = path.replace('\\', '/')
 
1482
            if ie.kind == 'dir':
 
1483
                name = name + '/'
 
1484
            if name in shape:
 
1485
                shape.remove(name)
 
1486
            else:
 
1487
                extras.append(name)
 
1488
        if shape:
 
1489
            self.fail("expected paths not found in inventory: %r" % shape)
 
1490
        if extras:
 
1491
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1492
 
 
1493
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1494
                         a_callable=None, *args, **kwargs):
 
1495
        """Call callable with redirected std io pipes.
 
1496
 
 
1497
        Returns the return code."""
 
1498
        if not callable(a_callable):
 
1499
            raise ValueError("a_callable must be callable.")
 
1500
        if stdin is None:
 
1501
            stdin = StringIO("")
 
1502
        if stdout is None:
 
1503
            if getattr(self, "_log_file", None) is not None:
 
1504
                stdout = self._log_file
 
1505
            else:
 
1506
                stdout = StringIO()
 
1507
        if stderr is None:
 
1508
            if getattr(self, "_log_file", None is not None):
 
1509
                stderr = self._log_file
 
1510
            else:
 
1511
                stderr = StringIO()
 
1512
        real_stdin = sys.stdin
 
1513
        real_stdout = sys.stdout
 
1514
        real_stderr = sys.stderr
 
1515
        try:
 
1516
            sys.stdout = stdout
 
1517
            sys.stderr = stderr
 
1518
            sys.stdin = stdin
 
1519
            return a_callable(*args, **kwargs)
 
1520
        finally:
 
1521
            sys.stdout = real_stdout
 
1522
            sys.stderr = real_stderr
 
1523
            sys.stdin = real_stdin
 
1524
 
 
1525
    @symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
 
1526
    def merge(self, branch_from, wt_to):
 
1527
        """A helper for tests to do a ui-less merge.
 
1528
 
 
1529
        This should move to the main library when someone has time to integrate
 
1530
        it in.
 
1531
        """
 
1532
        # minimal ui-less merge.
 
1533
        wt_to.branch.fetch(branch_from)
 
1534
        base_rev = common_ancestor(branch_from.last_revision(),
 
1535
                                   wt_to.branch.last_revision(),
 
1536
                                   wt_to.branch.repository)
 
1537
        merge_inner(wt_to.branch, branch_from.basis_tree(),
 
1538
                    wt_to.branch.repository.revision_tree(base_rev),
 
1539
                    this_tree=wt_to)
 
1540
        wt_to.add_parent_tree_id(branch_from.last_revision())
 
1541
 
 
1542
    def reduceLockdirTimeout(self):
 
1543
        """Reduce the default lock timeout for the duration of the test, so that
 
1544
        if LockContention occurs during a test, it does so quickly.
 
1545
 
 
1546
        Tests that expect to provoke LockContention errors should call this.
 
1547
        """
 
1548
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1549
        def resetTimeout():
 
1550
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1551
        self.addCleanup(resetTimeout)
 
1552
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1553
 
 
1554
BzrTestBase = TestCase
 
1555
 
 
1556
 
 
1557
class TestCaseWithMemoryTransport(TestCase):
 
1558
    """Common test class for tests that do not need disk resources.
 
1559
 
 
1560
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1561
 
 
1562
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1563
 
 
1564
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1565
    a directory which does not exist. This serves to help ensure test isolation
 
1566
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1567
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1568
    file defaults for the transport in tests, nor does it obey the command line
 
1569
    override, so tests that accidentally write to the common directory should
 
1570
    be rare.
 
1571
    """
 
1572
 
 
1573
    TEST_ROOT = None
 
1574
    _TEST_NAME = 'test'
 
1575
 
 
1576
 
 
1577
    def __init__(self, methodName='runTest'):
 
1578
        # allow test parameterisation after test construction and before test
 
1579
        # execution. Variables that the parameteriser sets need to be 
 
1580
        # ones that are not set by setUp, or setUp will trash them.
 
1581
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1582
        self.vfs_transport_factory = default_transport
 
1583
        self.transport_server = None
 
1584
        self.transport_readonly_server = None
 
1585
        self.__vfs_server = None
 
1586
 
 
1587
    def get_transport(self):
 
1588
        """Return a writeable transport for the test scratch space"""
 
1589
        t = get_transport(self.get_url())
 
1590
        self.assertFalse(t.is_readonly())
 
1591
        return t
 
1592
 
 
1593
    def get_readonly_transport(self):
 
1594
        """Return a readonly transport for the test scratch space
 
1595
        
 
1596
        This can be used to test that operations which should only need
 
1597
        readonly access in fact do not try to write.
 
1598
        """
 
1599
        t = get_transport(self.get_readonly_url())
 
1600
        self.assertTrue(t.is_readonly())
 
1601
        return t
 
1602
 
 
1603
    def create_transport_readonly_server(self):
 
1604
        """Create a transport server from class defined at init.
 
1605
 
 
1606
        This is mostly a hook for daughter classes.
 
1607
        """
 
1608
        return self.transport_readonly_server()
 
1609
 
 
1610
    def get_readonly_server(self):
 
1611
        """Get the server instance for the readonly transport
 
1612
 
 
1613
        This is useful for some tests with specific servers to do diagnostics.
 
1614
        """
 
1615
        if self.__readonly_server is None:
 
1616
            if self.transport_readonly_server is None:
 
1617
                # readonly decorator requested
 
1618
                # bring up the server
 
1619
                self.__readonly_server = ReadonlyServer()
 
1620
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1621
            else:
 
1622
                self.__readonly_server = self.create_transport_readonly_server()
 
1623
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1624
            self.addCleanup(self.__readonly_server.tearDown)
 
1625
        return self.__readonly_server
 
1626
 
 
1627
    def get_readonly_url(self, relpath=None):
 
1628
        """Get a URL for the readonly transport.
 
1629
 
 
1630
        This will either be backed by '.' or a decorator to the transport 
 
1631
        used by self.get_url()
 
1632
        relpath provides for clients to get a path relative to the base url.
 
1633
        These should only be downwards relative, not upwards.
 
1634
        """
 
1635
        base = self.get_readonly_server().get_url()
 
1636
        if relpath is not None:
 
1637
            if not base.endswith('/'):
 
1638
                base = base + '/'
 
1639
            base = base + relpath
 
1640
        return base
 
1641
 
 
1642
    def get_vfs_only_server(self):
 
1643
        """Get the vfs only read/write server instance.
 
1644
 
 
1645
        This is useful for some tests with specific servers that need
 
1646
        diagnostics.
 
1647
 
 
1648
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1649
        is no means to override it.
 
1650
        """
 
1651
        if self.__vfs_server is None:
 
1652
            self.__vfs_server = MemoryServer()
 
1653
            self.__vfs_server.setUp()
 
1654
            self.addCleanup(self.__vfs_server.tearDown)
 
1655
        return self.__vfs_server
 
1656
 
 
1657
    def get_server(self):
 
1658
        """Get the read/write server instance.
 
1659
 
 
1660
        This is useful for some tests with specific servers that need
 
1661
        diagnostics.
 
1662
 
 
1663
        This is built from the self.transport_server factory. If that is None,
 
1664
        then the self.get_vfs_server is returned.
 
1665
        """
 
1666
        if self.__server is None:
 
1667
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1668
                return self.get_vfs_only_server()
 
1669
            else:
 
1670
                # bring up a decorated means of access to the vfs only server.
 
1671
                self.__server = self.transport_server()
 
1672
                try:
 
1673
                    self.__server.setUp(self.get_vfs_only_server())
 
1674
                except TypeError, e:
 
1675
                    # This should never happen; the try:Except here is to assist
 
1676
                    # developers having to update code rather than seeing an
 
1677
                    # uninformative TypeError.
 
1678
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1679
            self.addCleanup(self.__server.tearDown)
 
1680
        return self.__server
 
1681
 
 
1682
    def _adjust_url(self, base, relpath):
 
1683
        """Get a URL (or maybe a path) for the readwrite transport.
 
1684
 
 
1685
        This will either be backed by '.' or to an equivalent non-file based
 
1686
        facility.
 
1687
        relpath provides for clients to get a path relative to the base url.
 
1688
        These should only be downwards relative, not upwards.
 
1689
        """
 
1690
        if relpath is not None and relpath != '.':
 
1691
            if not base.endswith('/'):
 
1692
                base = base + '/'
 
1693
            # XXX: Really base should be a url; we did after all call
 
1694
            # get_url()!  But sometimes it's just a path (from
 
1695
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1696
            # to a non-escaped local path.
 
1697
            if base.startswith('./') or base.startswith('/'):
 
1698
                base += relpath
 
1699
            else:
 
1700
                base += urlutils.escape(relpath)
 
1701
        return base
 
1702
 
 
1703
    def get_url(self, relpath=None):
 
1704
        """Get a URL (or maybe a path) for the readwrite transport.
 
1705
 
 
1706
        This will either be backed by '.' or to an equivalent non-file based
 
1707
        facility.
 
1708
        relpath provides for clients to get a path relative to the base url.
 
1709
        These should only be downwards relative, not upwards.
 
1710
        """
 
1711
        base = self.get_server().get_url()
 
1712
        return self._adjust_url(base, relpath)
 
1713
 
 
1714
    def get_vfs_only_url(self, relpath=None):
 
1715
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1716
 
 
1717
        This will never be a smart protocol.
 
1718
        :param relpath: provides for clients to get a path relative to the base
 
1719
            url.  These should only be downwards relative, not upwards.
 
1720
        """
 
1721
        base = self.get_vfs_only_server().get_url()
 
1722
        return self._adjust_url(base, relpath)
 
1723
 
 
1724
    def _make_test_root(self):
 
1725
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1726
            return
 
1727
        i = 0
 
1728
        while True:
 
1729
            root = u'test%04d.tmp' % i
 
1730
            try:
 
1731
                os.mkdir(root)
 
1732
            except OSError, e:
 
1733
                if e.errno == errno.EEXIST:
 
1734
                    i += 1
 
1735
                    continue
 
1736
                else:
 
1737
                    raise
 
1738
            # successfully created
 
1739
            TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
 
1740
            break
 
1741
        # make a fake bzr directory there to prevent any tests propagating
 
1742
        # up onto the source directory's real branch
 
1743
        bzrdir.BzrDir.create_standalone_workingtree(
 
1744
            TestCaseWithMemoryTransport.TEST_ROOT)
 
1745
 
 
1746
    def makeAndChdirToTestDir(self):
 
1747
        """Create a temporary directories for this one test.
 
1748
        
 
1749
        This must set self.test_home_dir and self.test_dir and chdir to
 
1750
        self.test_dir.
 
1751
        
 
1752
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1753
        """
 
1754
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1755
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1756
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1757
        
 
1758
    def make_branch(self, relpath, format=None):
 
1759
        """Create a branch on the transport at relpath."""
 
1760
        repo = self.make_repository(relpath, format=format)
 
1761
        return repo.bzrdir.create_branch()
 
1762
 
 
1763
    def make_bzrdir(self, relpath, format=None):
 
1764
        try:
 
1765
            # might be a relative or absolute path
 
1766
            maybe_a_url = self.get_url(relpath)
 
1767
            segments = maybe_a_url.rsplit('/', 1)
 
1768
            t = get_transport(maybe_a_url)
 
1769
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1770
                try:
 
1771
                    t.mkdir('.')
 
1772
                except errors.FileExists:
 
1773
                    pass
 
1774
            if format is None:
 
1775
                format = 'default'
 
1776
            if isinstance(format, basestring):
 
1777
                format = bzrdir.format_registry.make_bzrdir(format)
 
1778
            return format.initialize_on_transport(t)
 
1779
        except errors.UninitializableFormat:
 
1780
            raise TestSkipped("Format %s is not initializable." % format)
 
1781
 
 
1782
    def make_repository(self, relpath, shared=False, format=None):
 
1783
        """Create a repository on our default transport at relpath."""
 
1784
        made_control = self.make_bzrdir(relpath, format=format)
 
1785
        return made_control.create_repository(shared=shared)
 
1786
 
 
1787
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1788
        """Create a branch on the default transport and a MemoryTree for it."""
 
1789
        b = self.make_branch(relpath, format=format)
 
1790
        return memorytree.MemoryTree.create_on_branch(b)
 
1791
 
 
1792
    def overrideEnvironmentForTesting(self):
 
1793
        os.environ['HOME'] = self.test_home_dir
 
1794
        os.environ['BZR_HOME'] = self.test_home_dir
 
1795
        
 
1796
    def setUp(self):
 
1797
        super(TestCaseWithMemoryTransport, self).setUp()
 
1798
        self._make_test_root()
 
1799
        _currentdir = os.getcwdu()
 
1800
        def _leaveDirectory():
 
1801
            os.chdir(_currentdir)
 
1802
        self.addCleanup(_leaveDirectory)
 
1803
        self.makeAndChdirToTestDir()
 
1804
        self.overrideEnvironmentForTesting()
 
1805
        self.__readonly_server = None
 
1806
        self.__server = None
 
1807
        self.reduceLockdirTimeout()
 
1808
 
 
1809
     
 
1810
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
1811
    """Derived class that runs a test within a temporary directory.
 
1812
 
 
1813
    This is useful for tests that need to create a branch, etc.
 
1814
 
 
1815
    The directory is created in a slightly complex way: for each
 
1816
    Python invocation, a new temporary top-level directory is created.
 
1817
    All test cases create their own directory within that.  If the
 
1818
    tests complete successfully, the directory is removed.
 
1819
 
 
1820
    InTempDir is an old alias for FunctionalTestCase.
 
1821
    """
 
1822
 
 
1823
    OVERRIDE_PYTHON = 'python'
 
1824
    use_numbered_dirs = False
 
1825
 
 
1826
    def check_file_contents(self, filename, expect):
 
1827
        self.log("check contents of file %s" % filename)
 
1828
        contents = file(filename, 'r').read()
 
1829
        if contents != expect:
 
1830
            self.log("expected: %r" % expect)
 
1831
            self.log("actually: %r" % contents)
 
1832
            self.fail("contents of %s not as expected" % filename)
 
1833
 
 
1834
    def makeAndChdirToTestDir(self):
 
1835
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1836
        
 
1837
        For TestCaseInTempDir we create a temporary directory based on the test
 
1838
        name and then create two subdirs - test and home under it.
 
1839
        """
 
1840
        if self.use_numbered_dirs:  # strongly recommended on Windows
 
1841
                                    # due the path length limitation (260 ch.)
 
1842
            candidate_dir = '%s/%dK/%05d' % (self.TEST_ROOT,
 
1843
                                             int(self.number/1000),
 
1844
                                             self.number)
 
1845
            os.makedirs(candidate_dir)
 
1846
            self.test_home_dir = candidate_dir + '/home'
 
1847
            os.mkdir(self.test_home_dir)
 
1848
            self.test_dir = candidate_dir + '/work'
 
1849
            os.mkdir(self.test_dir)
 
1850
            os.chdir(self.test_dir)
 
1851
            # put name of test inside
 
1852
            f = file(candidate_dir + '/name', 'w')
 
1853
            f.write(self.id())
 
1854
            f.close()
 
1855
            return
 
1856
        # Else NAMED DIRS
 
1857
        # shorten the name, to avoid test failures due to path length
 
1858
        short_id = self.id().replace('bzrlib.tests.', '') \
 
1859
                   .replace('__main__.', '')[-100:]
 
1860
        # it's possible the same test class is run several times for
 
1861
        # parameterized tests, so make sure the names don't collide.  
 
1862
        i = 0
 
1863
        while True:
 
1864
            if i > 0:
 
1865
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
1866
            else:
 
1867
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
1868
            if os.path.exists(candidate_dir):
 
1869
                i = i + 1
 
1870
                continue
 
1871
            else:
 
1872
                os.mkdir(candidate_dir)
 
1873
                self.test_home_dir = candidate_dir + '/home'
 
1874
                os.mkdir(self.test_home_dir)
 
1875
                self.test_dir = candidate_dir + '/work'
 
1876
                os.mkdir(self.test_dir)
 
1877
                os.chdir(self.test_dir)
 
1878
                break
 
1879
 
 
1880
    def build_tree(self, shape, line_endings='binary', transport=None):
 
1881
        """Build a test tree according to a pattern.
 
1882
 
 
1883
        shape is a sequence of file specifications.  If the final
 
1884
        character is '/', a directory is created.
 
1885
 
 
1886
        This assumes that all the elements in the tree being built are new.
 
1887
 
 
1888
        This doesn't add anything to a branch.
 
1889
        :param line_endings: Either 'binary' or 'native'
 
1890
                             in binary mode, exact contents are written
 
1891
                             in native mode, the line endings match the
 
1892
                             default platform endings.
 
1893
 
 
1894
        :param transport: A transport to write to, for building trees on 
 
1895
                          VFS's. If the transport is readonly or None,
 
1896
                          "." is opened automatically.
 
1897
        """
 
1898
        # It's OK to just create them using forward slashes on windows.
 
1899
        if transport is None or transport.is_readonly():
 
1900
            transport = get_transport(".")
 
1901
        for name in shape:
 
1902
            self.assert_(isinstance(name, basestring))
 
1903
            if name[-1] == '/':
 
1904
                transport.mkdir(urlutils.escape(name[:-1]))
 
1905
            else:
 
1906
                if line_endings == 'binary':
 
1907
                    end = '\n'
 
1908
                elif line_endings == 'native':
 
1909
                    end = os.linesep
 
1910
                else:
 
1911
                    raise errors.BzrError(
 
1912
                        'Invalid line ending request %r' % line_endings)
 
1913
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1914
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
1915
 
 
1916
    def build_tree_contents(self, shape):
 
1917
        build_tree_contents(shape)
 
1918
 
 
1919
    def assertFileEqual(self, content, path):
 
1920
        """Fail if path does not contain 'content'."""
 
1921
        self.failUnlessExists(path)
 
1922
        # TODO: jam 20060427 Shouldn't this be 'rb'?
 
1923
        f = file(path, 'r')
 
1924
        try:
 
1925
            s = f.read()
 
1926
        finally:
 
1927
            f.close()
 
1928
        self.assertEqualDiff(content, s)
 
1929
 
 
1930
    def failUnlessExists(self, path):
 
1931
        """Fail unless path, which may be abs or relative, exists."""
 
1932
        self.failUnless(osutils.lexists(path),path+" does not exist")
 
1933
 
 
1934
    def failIfExists(self, path):
 
1935
        """Fail if path, which may be abs or relative, exists."""
 
1936
        self.failIf(osutils.lexists(path),path+" exists")
 
1937
 
 
1938
 
 
1939
class TestCaseWithTransport(TestCaseInTempDir):
 
1940
    """A test case that provides get_url and get_readonly_url facilities.
 
1941
 
 
1942
    These back onto two transport servers, one for readonly access and one for
 
1943
    read write access.
 
1944
 
 
1945
    If no explicit class is provided for readonly access, a
 
1946
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
1947
    based read write transports.
 
1948
 
 
1949
    If an explicit class is provided for readonly access, that server and the 
 
1950
    readwrite one must both define get_url() as resolving to os.getcwd().
 
1951
    """
 
1952
 
 
1953
    def get_vfs_only_server(self):
 
1954
        """See TestCaseWithMemoryTransport.
 
1955
 
 
1956
        This is useful for some tests with specific servers that need
 
1957
        diagnostics.
 
1958
        """
 
1959
        if self.__vfs_server is None:
 
1960
            self.__vfs_server = self.vfs_transport_factory()
 
1961
            self.__vfs_server.setUp()
 
1962
            self.addCleanup(self.__vfs_server.tearDown)
 
1963
        return self.__vfs_server
 
1964
 
 
1965
    def make_branch_and_tree(self, relpath, format=None):
 
1966
        """Create a branch on the transport and a tree locally.
 
1967
 
 
1968
        If the transport is not a LocalTransport, the Tree can't be created on
 
1969
        the transport.  In that case if the vfs_transport_factory is
 
1970
        LocalURLServer the working tree is created in the local
 
1971
        directory backing the transport, and the returned tree's branch and
 
1972
        repository will also be accessed locally. Otherwise a lightweight
 
1973
        checkout is created and returned.
 
1974
 
 
1975
        :param format: The BzrDirFormat.
 
1976
        :returns: the WorkingTree.
 
1977
        """
 
1978
        # TODO: always use the local disk path for the working tree,
 
1979
        # this obviously requires a format that supports branch references
 
1980
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
1981
        # RBC 20060208
 
1982
        b = self.make_branch(relpath, format=format)
 
1983
        try:
 
1984
            return b.bzrdir.create_workingtree()
 
1985
        except errors.NotLocalUrl:
 
1986
            # We can only make working trees locally at the moment.  If the
 
1987
            # transport can't support them, then we keep the non-disk-backed
 
1988
            # branch and create a local checkout.
 
1989
            if self.vfs_transport_factory is LocalURLServer:
 
1990
                # the branch is colocated on disk, we cannot create a checkout.
 
1991
                # hopefully callers will expect this.
 
1992
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
1993
                return local_controldir.create_workingtree()
 
1994
            else:
 
1995
                return b.create_checkout(relpath, lightweight=True)
 
1996
 
 
1997
    def assertIsDirectory(self, relpath, transport):
 
1998
        """Assert that relpath within transport is a directory.
 
1999
 
 
2000
        This may not be possible on all transports; in that case it propagates
 
2001
        a TransportNotPossible.
 
2002
        """
 
2003
        try:
 
2004
            mode = transport.stat(relpath).st_mode
 
2005
        except errors.NoSuchFile:
 
2006
            self.fail("path %s is not a directory; no such file"
 
2007
                      % (relpath))
 
2008
        if not stat.S_ISDIR(mode):
 
2009
            self.fail("path %s is not a directory; has mode %#o"
 
2010
                      % (relpath, mode))
 
2011
 
 
2012
    def assertTreesEqual(self, left, right):
 
2013
        """Check that left and right have the same content and properties."""
 
2014
        # we use a tree delta to check for equality of the content, and we
 
2015
        # manually check for equality of other things such as the parents list.
 
2016
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2017
        differences = left.changes_from(right)
 
2018
        self.assertFalse(differences.has_changed(),
 
2019
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2020
 
 
2021
    def setUp(self):
 
2022
        super(TestCaseWithTransport, self).setUp()
 
2023
        self.__vfs_server = None
 
2024
 
 
2025
 
 
2026
class ChrootedTestCase(TestCaseWithTransport):
 
2027
    """A support class that provides readonly urls outside the local namespace.
 
2028
 
 
2029
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2030
    is then we are chrooted already, if it is not then an HttpServer is used
 
2031
    for readonly urls.
 
2032
 
 
2033
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2034
                       be used without needed to redo it when a different 
 
2035
                       subclass is in use ?
 
2036
    """
 
2037
 
 
2038
    def setUp(self):
 
2039
        super(ChrootedTestCase, self).setUp()
 
2040
        if not self.vfs_transport_factory == MemoryServer:
 
2041
            self.transport_readonly_server = HttpServer
 
2042
 
 
2043
 
 
2044
def filter_suite_by_re(suite, pattern):
 
2045
    result = TestUtil.TestSuite()
 
2046
    filter_re = re.compile(pattern)
 
2047
    for test in iter_suite_tests(suite):
 
2048
        if filter_re.search(test.id()):
 
2049
            result.addTest(test)
 
2050
    return result
 
2051
 
 
2052
 
 
2053
def sort_suite_by_re(suite, pattern):
 
2054
    first = []
 
2055
    second = []
 
2056
    filter_re = re.compile(pattern)
 
2057
    for test in iter_suite_tests(suite):
 
2058
        if filter_re.search(test.id()):
 
2059
            first.append(test)
 
2060
        else:
 
2061
            second.append(test)
 
2062
    return TestUtil.TestSuite(first + second)
 
2063
 
 
2064
 
 
2065
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2066
              stop_on_failure=False, keep_output=False,
 
2067
              transport=None, lsprof_timed=None, bench_history=None,
 
2068
              matching_tests_first=None,
 
2069
              numbered_dirs=None):
 
2070
    use_numbered_dirs = bool(numbered_dirs)
 
2071
 
 
2072
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2073
    if numbered_dirs is not None:
 
2074
        TestCaseInTempDir.use_numbered_dirs = use_numbered_dirs
 
2075
    if verbose:
 
2076
        verbosity = 2
 
2077
    else:
 
2078
        verbosity = 1
 
2079
    runner = TextTestRunner(stream=sys.stdout,
 
2080
                            descriptions=0,
 
2081
                            verbosity=verbosity,
 
2082
                            keep_output=keep_output,
 
2083
                            bench_history=bench_history,
 
2084
                            use_numbered_dirs=use_numbered_dirs,
 
2085
                            )
 
2086
    runner.stop_on_failure=stop_on_failure
 
2087
    if pattern != '.*':
 
2088
        if matching_tests_first:
 
2089
            suite = sort_suite_by_re(suite, pattern)
 
2090
        else:
 
2091
            suite = filter_suite_by_re(suite, pattern)
 
2092
    result = runner.run(suite)
 
2093
    return result.wasSuccessful()
 
2094
 
 
2095
 
 
2096
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2097
             keep_output=False,
 
2098
             transport=None,
 
2099
             test_suite_factory=None,
 
2100
             lsprof_timed=None,
 
2101
             bench_history=None,
 
2102
             matching_tests_first=None,
 
2103
             numbered_dirs=None):
 
2104
    """Run the whole test suite under the enhanced runner"""
 
2105
    # XXX: Very ugly way to do this...
 
2106
    # Disable warning about old formats because we don't want it to disturb
 
2107
    # any blackbox tests.
 
2108
    from bzrlib import repository
 
2109
    repository._deprecation_warning_done = True
 
2110
 
 
2111
    global default_transport
 
2112
    if transport is None:
 
2113
        transport = default_transport
 
2114
    old_transport = default_transport
 
2115
    default_transport = transport
 
2116
    try:
 
2117
        if test_suite_factory is None:
 
2118
            suite = test_suite()
 
2119
        else:
 
2120
            suite = test_suite_factory()
 
2121
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2122
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
2123
                     transport=transport,
 
2124
                     lsprof_timed=lsprof_timed,
 
2125
                     bench_history=bench_history,
 
2126
                     matching_tests_first=matching_tests_first,
 
2127
                     numbered_dirs=numbered_dirs)
 
2128
    finally:
 
2129
        default_transport = old_transport
 
2130
 
 
2131
 
 
2132
def test_suite():
 
2133
    """Build and return TestSuite for the whole of bzrlib.
 
2134
    
 
2135
    This function can be replaced if you need to change the default test
 
2136
    suite on a global basis, but it is not encouraged.
 
2137
    """
 
2138
    testmod_names = [
 
2139
                   'bzrlib.tests.test_ancestry',
 
2140
                   'bzrlib.tests.test_annotate',
 
2141
                   'bzrlib.tests.test_api',
 
2142
                   'bzrlib.tests.test_atomicfile',
 
2143
                   'bzrlib.tests.test_bad_files',
 
2144
                   'bzrlib.tests.test_branch',
 
2145
                   'bzrlib.tests.test_bundle',
 
2146
                   'bzrlib.tests.test_bzrdir',
 
2147
                   'bzrlib.tests.test_cache_utf8',
 
2148
                   'bzrlib.tests.test_commands',
 
2149
                   'bzrlib.tests.test_commit',
 
2150
                   'bzrlib.tests.test_commit_merge',
 
2151
                   'bzrlib.tests.test_config',
 
2152
                   'bzrlib.tests.test_conflicts',
 
2153
                   'bzrlib.tests.test_decorators',
 
2154
                   'bzrlib.tests.test_delta',
 
2155
                   'bzrlib.tests.test_diff',
 
2156
                   'bzrlib.tests.test_dirstate',
 
2157
                   'bzrlib.tests.test_doc_generate',
 
2158
                   'bzrlib.tests.test_errors',
 
2159
                   'bzrlib.tests.test_escaped_store',
 
2160
                   'bzrlib.tests.test_extract',
 
2161
                   'bzrlib.tests.test_fetch',
 
2162
                   'bzrlib.tests.test_ftp_transport',
 
2163
                   'bzrlib.tests.test_generate_docs',
 
2164
                   'bzrlib.tests.test_generate_ids',
 
2165
                   'bzrlib.tests.test_globbing',
 
2166
                   'bzrlib.tests.test_gpg',
 
2167
                   'bzrlib.tests.test_graph',
 
2168
                   'bzrlib.tests.test_hashcache',
 
2169
                   'bzrlib.tests.test_http',
 
2170
                   'bzrlib.tests.test_http_response',
 
2171
                   'bzrlib.tests.test_https_ca_bundle',
 
2172
                   'bzrlib.tests.test_identitymap',
 
2173
                   'bzrlib.tests.test_ignores',
 
2174
                   'bzrlib.tests.test_inv',
 
2175
                   'bzrlib.tests.test_knit',
 
2176
                   'bzrlib.tests.test_lazy_import',
 
2177
                   'bzrlib.tests.test_lazy_regex',
 
2178
                   'bzrlib.tests.test_lockdir',
 
2179
                   'bzrlib.tests.test_lockable_files',
 
2180
                   'bzrlib.tests.test_log',
 
2181
                   'bzrlib.tests.test_memorytree',
 
2182
                   'bzrlib.tests.test_merge',
 
2183
                   'bzrlib.tests.test_merge3',
 
2184
                   'bzrlib.tests.test_merge_core',
 
2185
                   'bzrlib.tests.test_merge_directive',
 
2186
                   'bzrlib.tests.test_missing',
 
2187
                   'bzrlib.tests.test_msgeditor',
 
2188
                   'bzrlib.tests.test_nonascii',
 
2189
                   'bzrlib.tests.test_options',
 
2190
                   'bzrlib.tests.test_osutils',
 
2191
                   'bzrlib.tests.test_osutils_encodings',
 
2192
                   'bzrlib.tests.test_patch',
 
2193
                   'bzrlib.tests.test_patches',
 
2194
                   'bzrlib.tests.test_permissions',
 
2195
                   'bzrlib.tests.test_plugins',
 
2196
                   'bzrlib.tests.test_progress',
 
2197
                   'bzrlib.tests.test_reconcile',
 
2198
                   'bzrlib.tests.test_registry',
 
2199
                   'bzrlib.tests.test_repository',
 
2200
                   'bzrlib.tests.test_revert',
 
2201
                   'bzrlib.tests.test_revision',
 
2202
                   'bzrlib.tests.test_revisionnamespaces',
 
2203
                   'bzrlib.tests.test_revisiontree',
 
2204
                   'bzrlib.tests.test_rio',
 
2205
                   'bzrlib.tests.test_sampler',
 
2206
                   'bzrlib.tests.test_selftest',
 
2207
                   'bzrlib.tests.test_setup',
 
2208
                   'bzrlib.tests.test_sftp_transport',
 
2209
                   'bzrlib.tests.test_smart_add',
 
2210
                   'bzrlib.tests.test_smart_transport',
 
2211
                   'bzrlib.tests.test_source',
 
2212
                   'bzrlib.tests.test_ssh_transport',
 
2213
                   'bzrlib.tests.test_status',
 
2214
                   'bzrlib.tests.test_store',
 
2215
                   'bzrlib.tests.test_strace',
 
2216
                   'bzrlib.tests.test_subsume',
 
2217
                   'bzrlib.tests.test_symbol_versioning',
 
2218
                   'bzrlib.tests.test_tag',
 
2219
                   'bzrlib.tests.test_testament',
 
2220
                   'bzrlib.tests.test_textfile',
 
2221
                   'bzrlib.tests.test_textmerge',
 
2222
                   'bzrlib.tests.test_timestamp',
 
2223
                   'bzrlib.tests.test_trace',
 
2224
                   'bzrlib.tests.test_transactions',
 
2225
                   'bzrlib.tests.test_transform',
 
2226
                   'bzrlib.tests.test_transport',
 
2227
                   'bzrlib.tests.test_tree',
 
2228
                   'bzrlib.tests.test_treebuilder',
 
2229
                   'bzrlib.tests.test_tsort',
 
2230
                   'bzrlib.tests.test_tuned_gzip',
 
2231
                   'bzrlib.tests.test_ui',
 
2232
                   'bzrlib.tests.test_upgrade',
 
2233
                   'bzrlib.tests.test_urlutils',
 
2234
                   'bzrlib.tests.test_versionedfile',
 
2235
                   'bzrlib.tests.test_version',
 
2236
                   'bzrlib.tests.test_version_info',
 
2237
                   'bzrlib.tests.test_weave',
 
2238
                   'bzrlib.tests.test_whitebox',
 
2239
                   'bzrlib.tests.test_workingtree',
 
2240
                   'bzrlib.tests.test_workingtree_4',
 
2241
                   'bzrlib.tests.test_wsgi',
 
2242
                   'bzrlib.tests.test_xml',
 
2243
                   ]
 
2244
    test_transport_implementations = [
 
2245
        'bzrlib.tests.test_transport_implementations',
 
2246
        'bzrlib.tests.test_read_bundle',
 
2247
        ]
 
2248
    suite = TestUtil.TestSuite()
 
2249
    loader = TestUtil.TestLoader()
 
2250
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2251
    from bzrlib.transport import TransportTestProviderAdapter
 
2252
    adapter = TransportTestProviderAdapter()
 
2253
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
2254
    for package in packages_to_test():
 
2255
        suite.addTest(package.test_suite())
 
2256
    for m in MODULES_TO_TEST:
 
2257
        suite.addTest(loader.loadTestsFromModule(m))
 
2258
    for m in MODULES_TO_DOCTEST:
 
2259
        try:
 
2260
            suite.addTest(doctest.DocTestSuite(m))
 
2261
        except ValueError, e:
 
2262
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
2263
            raise
 
2264
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
2265
        if getattr(plugin, 'test_suite', None) is not None:
 
2266
            default_encoding = sys.getdefaultencoding()
 
2267
            try:
 
2268
                plugin_suite = plugin.test_suite()
 
2269
            except ImportError, e:
 
2270
                bzrlib.trace.warning(
 
2271
                    'Unable to test plugin "%s": %s', name, e)
 
2272
            else:
 
2273
                suite.addTest(plugin_suite)
 
2274
            if default_encoding != sys.getdefaultencoding():
 
2275
                bzrlib.trace.warning(
 
2276
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
2277
                    sys.getdefaultencoding())
 
2278
                reload(sys)
 
2279
                sys.setdefaultencoding(default_encoding)
 
2280
    return suite
 
2281
 
 
2282
 
 
2283
def adapt_modules(mods_list, adapter, loader, suite):
 
2284
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2285
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2286
        suite.addTests(adapter.adapt(test))
 
2287
 
 
2288
 
 
2289
def _rmtree_temp_dir(dirname):
 
2290
    try:
 
2291
        osutils.rmtree(dirname)
 
2292
    except OSError, e:
 
2293
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
2294
            print >>sys.stderr, ('Permission denied: '
 
2295
                                 'unable to remove testing dir '
 
2296
                                 '%s' % os.path.basename(test_root))
 
2297
        else:
 
2298
            raise
 
2299
 
 
2300
 
 
2301
def clean_selftest_output(root=None, quiet=False):
 
2302
    """Remove all selftest output directories from root directory.
 
2303
 
 
2304
    :param  root:   root directory for clean
 
2305
                    (if ommitted or None then clean current directory).
 
2306
    :param  quiet:  suppress report about deleting directories
 
2307
    """
 
2308
    import re
41
2309
    import shutil
42
 
    import time
43
 
    import sys
44
 
    import unittest
45
 
 
46
 
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
47
 
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
48
 
        if m not in MODULES_TO_DOCTEST:
49
 
            MODULES_TO_DOCTEST.append(m)
50
 
            
51
 
    for m in (bzrlib.selftest.whitebox,
52
 
              bzrlib.selftest.versioning,
53
 
              bzrlib.selftest.testinv,
54
 
              bzrlib.selftest.testmerge3,
55
 
              bzrlib.selftest.testhashcache,
56
 
              bzrlib.selftest.teststatus,
57
 
              bzrlib.selftest.blackbox,
58
 
              bzrlib.selftest.testhashcache,
59
 
              bzrlib.selftest.testrevisionnamespaces,
60
 
              bzrlib.selftest.testbranch,
61
 
              ):
62
 
        if m not in MODULES_TO_TEST:
63
 
            MODULES_TO_TEST.append(m)
64
 
 
65
 
 
66
 
    TestBase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
67
 
    print '%-30s %s' % ('bzr binary', TestBase.BZRPATH)
68
 
 
69
 
    print
70
 
 
71
 
    suite = TestSuite()
72
 
 
73
 
    # should also test bzrlib.merge_core, but they seem to be out of date with
74
 
    # the code.
75
 
 
76
 
 
77
 
    # XXX: python2.3's TestLoader() doesn't seem to find all the
78
 
    # tests; don't know why
79
 
    for m in MODULES_TO_TEST:
80
 
         suite.addTest(TestLoader().loadTestsFromModule(m))
81
 
 
82
 
    for m in (MODULES_TO_DOCTEST):
83
 
        suite.addTest(DocTestSuite(m))
84
 
 
85
 
    for p in bzrlib.plugin.all_plugins:
86
 
        if hasattr(p, 'test_suite'):
87
 
            suite.addTest(p.test_suite())
88
 
 
89
 
    suite.addTest(unittest.makeSuite(bzrlib.merge_core.MergeTest, 'test_'))
90
 
 
91
 
    return run_suite(suite, 'testbzr')
92
 
 
93
 
 
94
 
 
 
2310
 
 
2311
    re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
 
2312
    if root is None:
 
2313
        root = u'.'
 
2314
    for i in os.listdir(root):
 
2315
        if os.path.isdir(i) and re_dir.match(i):
 
2316
            if not quiet:
 
2317
                print 'delete directory:', i
 
2318
            _rmtree_temp_dir(i)
 
2319
 
 
2320
 
 
2321
class Feature(object):
 
2322
    """An operating system Feature."""
 
2323
 
 
2324
    def __init__(self):
 
2325
        self._available = None
 
2326
 
 
2327
    def available(self):
 
2328
        """Is the feature available?
 
2329
 
 
2330
        :return: True if the feature is available.
 
2331
        """
 
2332
        if self._available is None:
 
2333
            self._available = self._probe()
 
2334
        return self._available
 
2335
 
 
2336
    def _probe(self):
 
2337
        """Implement this method in concrete features.
 
2338
 
 
2339
        :return: True if the feature is available.
 
2340
        """
 
2341
        raise NotImplementedError
 
2342
 
 
2343
    def __str__(self):
 
2344
        if getattr(self, 'feature_name', None):
 
2345
            return self.feature_name()
 
2346
        return self.__class__.__name__