~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: John Arbash Meinel
  • Date: 2006-06-11 03:09:28 UTC
  • mto: (1711.7.2 win32)
  • mto: This revision was merged to the branch mainline in revision 1796.
  • Revision ID: john@arbash-meinel.com-20060611030928-502d4af47bd62fe1
Remove cp437 from the set of encodings, it isn't strictly needed

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
2
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
from testsweet import TestBase, run_suite, InTempDir
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
 
30
from cStringIO import StringIO
 
31
import difflib
 
32
import errno
 
33
import logging
 
34
import os
 
35
import re
 
36
import shlex
 
37
import stat
 
38
from subprocess import Popen, PIPE
 
39
import sys
 
40
import tempfile
 
41
import unittest
 
42
import time
 
43
 
 
44
 
 
45
import bzrlib.branch
 
46
import bzrlib.bzrdir as bzrdir
 
47
import bzrlib.commands
 
48
import bzrlib.bundle.serializer
 
49
import bzrlib.errors as errors
 
50
import bzrlib.inventory
 
51
import bzrlib.iterablefile
 
52
import bzrlib.lockdir
 
53
try:
 
54
    import bzrlib.lsprof
 
55
except ImportError:
 
56
    # lsprof not available
 
57
    pass
 
58
from bzrlib.merge import merge_inner
 
59
import bzrlib.merge3
 
60
import bzrlib.osutils
 
61
import bzrlib.osutils as osutils
 
62
import bzrlib.plugin
 
63
import bzrlib.progress as progress
 
64
from bzrlib.revision import common_ancestor
 
65
import bzrlib.store
 
66
import bzrlib.trace
 
67
from bzrlib.transport import get_transport
 
68
import bzrlib.transport
 
69
from bzrlib.transport.local import LocalRelpathServer
 
70
from bzrlib.transport.readonly import ReadonlyServer
 
71
from bzrlib.trace import mutter
 
72
from bzrlib.tests.TestUtil import TestLoader, TestSuite
 
73
from bzrlib.tests.treeshape import build_tree_contents
 
74
import bzrlib.urlutils as urlutils
 
75
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
76
 
 
77
default_transport = LocalRelpathServer
19
78
 
20
79
MODULES_TO_TEST = []
21
 
MODULES_TO_DOCTEST = []
22
 
 
23
 
def selftest():
24
 
    from unittest import TestLoader, TestSuite
25
 
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
26
 
    import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
27
 
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
28
 
 
29
 
    import bzrlib.selftest.whitebox
30
 
    import bzrlib.selftest.blackbox
31
 
    import bzrlib.selftest.versioning
32
 
    import bzrlib.selftest.testmerge3
33
 
    import bzrlib.selftest.testhashcache
34
 
    import bzrlib.selftest.testrevisionnamespaces
35
 
    import bzrlib.selftest.testbranch
36
 
    import bzrlib.selftest.teststatus
37
 
    import bzrlib.selftest.testinv
38
 
    import bzrlib.merge_core
 
80
MODULES_TO_DOCTEST = [
 
81
                      bzrlib.branch,
 
82
                      bzrlib.bundle.serializer,
 
83
                      bzrlib.commands,
 
84
                      bzrlib.errors,
 
85
                      bzrlib.inventory,
 
86
                      bzrlib.iterablefile,
 
87
                      bzrlib.lockdir,
 
88
                      bzrlib.merge3,
 
89
                      bzrlib.option,
 
90
                      bzrlib.osutils,
 
91
                      bzrlib.store
 
92
                      ]
 
93
 
 
94
 
 
95
def packages_to_test():
 
96
    """Return a list of packages to test.
 
97
 
 
98
    The packages are not globally imported so that import failures are
 
99
    triggered when running selftest, not when importing the command.
 
100
    """
 
101
    import bzrlib.doc
 
102
    import bzrlib.tests.blackbox
 
103
    import bzrlib.tests.branch_implementations
 
104
    import bzrlib.tests.bzrdir_implementations
 
105
    import bzrlib.tests.interrepository_implementations
 
106
    import bzrlib.tests.interversionedfile_implementations
 
107
    import bzrlib.tests.repository_implementations
 
108
    import bzrlib.tests.revisionstore_implementations
 
109
    import bzrlib.tests.workingtree_implementations
 
110
    return [
 
111
            bzrlib.doc,
 
112
            bzrlib.tests.blackbox,
 
113
            bzrlib.tests.branch_implementations,
 
114
            bzrlib.tests.bzrdir_implementations,
 
115
            bzrlib.tests.interrepository_implementations,
 
116
            bzrlib.tests.interversionedfile_implementations,
 
117
            bzrlib.tests.repository_implementations,
 
118
            bzrlib.tests.revisionstore_implementations,
 
119
            bzrlib.tests.workingtree_implementations,
 
120
            ]
 
121
 
 
122
 
 
123
class _MyResult(unittest._TextTestResult):
 
124
    """Custom TestResult.
 
125
 
 
126
    Shows output in a different format, including displaying runtime for tests.
 
127
    """
 
128
    stop_early = False
 
129
    
 
130
    def __init__(self, stream, descriptions, verbosity, pb=None):
 
131
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
132
        self.pb = pb
 
133
    
 
134
    def extractBenchmarkTime(self, testCase):
 
135
        """Add a benchmark time for the current test case."""
 
136
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
137
    
 
138
    def _elapsedTestTimeString(self):
 
139
        """Return a time string for the overall time the current test has taken."""
 
140
        return self._formatTime(time.time() - self._start_time)
 
141
 
 
142
    def _testTimeString(self):
 
143
        if self._benchmarkTime is not None:
 
144
            return "%s/%s" % (
 
145
                self._formatTime(self._benchmarkTime),
 
146
                self._elapsedTestTimeString())
 
147
        else:
 
148
            return "      %s" % self._elapsedTestTimeString()
 
149
 
 
150
    def _formatTime(self, seconds):
 
151
        """Format seconds as milliseconds with leading spaces."""
 
152
        return "%5dms" % (1000 * seconds)
 
153
 
 
154
    def _ellipsise_unimportant_words(self, a_string, final_width,
 
155
                                   keep_start=False):
 
156
        """Add ellipses (sp?) for overly long strings.
 
157
        
 
158
        :param keep_start: If true preserve the start of a_string rather
 
159
                           than the end of it.
 
160
        """
 
161
        if keep_start:
 
162
            if len(a_string) > final_width:
 
163
                result = a_string[:final_width-3] + '...'
 
164
            else:
 
165
                result = a_string
 
166
        else:
 
167
            if len(a_string) > final_width:
 
168
                result = '...' + a_string[3-final_width:]
 
169
            else:
 
170
                result = a_string
 
171
        return result.ljust(final_width)
 
172
 
 
173
    def startTest(self, test):
 
174
        unittest.TestResult.startTest(self, test)
 
175
        # In a short description, the important words are in
 
176
        # the beginning, but in an id, the important words are
 
177
        # at the end
 
178
        SHOW_DESCRIPTIONS = False
 
179
 
 
180
        if not self.showAll and self.dots and self.pb is not None:
 
181
            final_width = 13
 
182
        else:
 
183
            final_width = osutils.terminal_width()
 
184
            final_width = final_width - 15 - 8
 
185
        what = None
 
186
        if SHOW_DESCRIPTIONS:
 
187
            what = test.shortDescription()
 
188
            if what:
 
189
                what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
 
190
        if what is None:
 
191
            what = test.id()
 
192
            if what.startswith('bzrlib.tests.'):
 
193
                what = what[13:]
 
194
            what = self._ellipsise_unimportant_words(what, final_width)
 
195
        if self.showAll:
 
196
            self.stream.write(what)
 
197
        elif self.dots and self.pb is not None:
 
198
            self.pb.update(what, self.testsRun - 1, None)
 
199
        self.stream.flush()
 
200
        self._recordTestStartTime()
 
201
 
 
202
    def _recordTestStartTime(self):
 
203
        """Record that a test has started."""
 
204
        self._start_time = time.time()
 
205
 
 
206
    def addError(self, test, err):
 
207
        if isinstance(err[1], TestSkipped):
 
208
            return self.addSkipped(test, err)    
 
209
        unittest.TestResult.addError(self, test, err)
 
210
        self.extractBenchmarkTime(test)
 
211
        if self.showAll:
 
212
            self.stream.writeln("ERROR %s" % self._testTimeString())
 
213
        elif self.dots and self.pb is None:
 
214
            self.stream.write('E')
 
215
        elif self.dots:
 
216
            self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
 
217
        self.stream.flush()
 
218
        if self.stop_early:
 
219
            self.stop()
 
220
 
 
221
    def addFailure(self, test, err):
 
222
        unittest.TestResult.addFailure(self, test, err)
 
223
        self.extractBenchmarkTime(test)
 
224
        if self.showAll:
 
225
            self.stream.writeln(" FAIL %s" % self._testTimeString())
 
226
        elif self.dots and self.pb is None:
 
227
            self.stream.write('F')
 
228
        elif self.dots:
 
229
            self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
 
230
        self.stream.flush()
 
231
        if self.stop_early:
 
232
            self.stop()
 
233
 
 
234
    def addSuccess(self, test):
 
235
        self.extractBenchmarkTime(test)
 
236
        if self.showAll:
 
237
            self.stream.writeln('   OK %s' % self._testTimeString())
 
238
            for bench_called, stats in getattr(test, '_benchcalls', []):
 
239
                self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
240
                stats.pprint(file=self.stream)
 
241
        elif self.dots and self.pb is None:
 
242
            self.stream.write('~')
 
243
        elif self.dots:
 
244
            self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
 
245
        self.stream.flush()
 
246
        unittest.TestResult.addSuccess(self, test)
 
247
 
 
248
    def addSkipped(self, test, skip_excinfo):
 
249
        self.extractBenchmarkTime(test)
 
250
        if self.showAll:
 
251
            print >>self.stream, ' SKIP %s' % self._testTimeString()
 
252
            print >>self.stream, '     %s' % skip_excinfo[1]
 
253
        elif self.dots and self.pb is None:
 
254
            self.stream.write('S')
 
255
        elif self.dots:
 
256
            self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
 
257
        self.stream.flush()
 
258
        # seems best to treat this as success from point-of-view of unittest
 
259
        # -- it actually does nothing so it barely matters :)
 
260
        unittest.TestResult.addSuccess(self, test)
 
261
 
 
262
    def printErrorList(self, flavour, errors):
 
263
        for test, err in errors:
 
264
            self.stream.writeln(self.separator1)
 
265
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
266
            if getattr(test, '_get_log', None) is not None:
 
267
                print >>self.stream
 
268
                print >>self.stream, \
 
269
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
270
                print >>self.stream, test._get_log()
 
271
                print >>self.stream, \
 
272
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
273
            self.stream.writeln(self.separator2)
 
274
            self.stream.writeln("%s" % err)
 
275
 
 
276
 
 
277
class TextTestRunner(object):
 
278
    stop_on_failure = False
 
279
 
 
280
    def __init__(self,
 
281
                 stream=sys.stderr,
 
282
                 descriptions=0,
 
283
                 verbosity=1,
 
284
                 keep_output=False,
 
285
                 pb=None):
 
286
        self.stream = unittest._WritelnDecorator(stream)
 
287
        self.descriptions = descriptions
 
288
        self.verbosity = verbosity
 
289
        self.keep_output = keep_output
 
290
        self.pb = pb
 
291
 
 
292
    def _makeResult(self):
 
293
        result = _MyResult(self.stream,
 
294
                           self.descriptions,
 
295
                           self.verbosity,
 
296
                           pb=self.pb)
 
297
        result.stop_early = self.stop_on_failure
 
298
        return result
 
299
 
 
300
    def run(self, test):
 
301
        "Run the given test case or test suite."
 
302
        result = self._makeResult()
 
303
        startTime = time.time()
 
304
        if self.pb is not None:
 
305
            self.pb.update('Running tests', 0, test.countTestCases())
 
306
        test.run(result)
 
307
        stopTime = time.time()
 
308
        timeTaken = stopTime - startTime
 
309
        result.printErrors()
 
310
        self.stream.writeln(result.separator2)
 
311
        run = result.testsRun
 
312
        self.stream.writeln("Ran %d test%s in %.3fs" %
 
313
                            (run, run != 1 and "s" or "", timeTaken))
 
314
        self.stream.writeln()
 
315
        if not result.wasSuccessful():
 
316
            self.stream.write("FAILED (")
 
317
            failed, errored = map(len, (result.failures, result.errors))
 
318
            if failed:
 
319
                self.stream.write("failures=%d" % failed)
 
320
            if errored:
 
321
                if failed: self.stream.write(", ")
 
322
                self.stream.write("errors=%d" % errored)
 
323
            self.stream.writeln(")")
 
324
        else:
 
325
            self.stream.writeln("OK")
 
326
        if self.pb is not None:
 
327
            self.pb.update('Cleaning up', 0, 1)
 
328
        # This is still a little bogus, 
 
329
        # but only a little. Folk not using our testrunner will
 
330
        # have to delete their temp directories themselves.
 
331
        test_root = TestCaseInTempDir.TEST_ROOT
 
332
        if result.wasSuccessful() or not self.keep_output:
 
333
            if test_root is not None:
 
334
                # If LANG=C we probably have created some bogus paths
 
335
                # which rmtree(unicode) will fail to delete
 
336
                # so make sure we are using rmtree(str) to delete everything
 
337
                # except on win32, where rmtree(str) will fail
 
338
                # since it doesn't have the property of byte-stream paths
 
339
                # (they are either ascii or mbcs)
 
340
                if sys.platform == 'win32':
 
341
                    # make sure we are using the unicode win32 api
 
342
                    test_root = unicode(test_root)
 
343
                else:
 
344
                    test_root = test_root.encode(
 
345
                        sys.getfilesystemencoding())
 
346
                osutils.rmtree(test_root)
 
347
        else:
 
348
            if self.pb is not None:
 
349
                self.pb.note("Failed tests working directories are in '%s'\n",
 
350
                             test_root)
 
351
            else:
 
352
                self.stream.writeln(
 
353
                    "Failed tests working directories are in '%s'\n" %
 
354
                    test_root)
 
355
        TestCaseInTempDir.TEST_ROOT = None
 
356
        if self.pb is not None:
 
357
            self.pb.clear()
 
358
        return result
 
359
 
 
360
 
 
361
def iter_suite_tests(suite):
 
362
    """Return all tests in a suite, recursing through nested suites"""
 
363
    for item in suite._tests:
 
364
        if isinstance(item, unittest.TestCase):
 
365
            yield item
 
366
        elif isinstance(item, unittest.TestSuite):
 
367
            for r in iter_suite_tests(item):
 
368
                yield r
 
369
        else:
 
370
            raise Exception('unknown object %r inside test suite %r'
 
371
                            % (item, suite))
 
372
 
 
373
 
 
374
class TestSkipped(Exception):
 
375
    """Indicates that a test was intentionally skipped, rather than failing."""
 
376
    # XXX: Not used yet
 
377
 
 
378
 
 
379
class CommandFailed(Exception):
 
380
    pass
 
381
 
 
382
 
 
383
class StringIOWrapper(object):
 
384
    """A wrapper around cStringIO which just adds an encoding attribute.
 
385
    
 
386
    Internally we can check sys.stdout to see what the output encoding
 
387
    should be. However, cStringIO has no encoding attribute that we can
 
388
    set. So we wrap it instead.
 
389
    """
 
390
    encoding='ascii'
 
391
    _cstring = None
 
392
 
 
393
    def __init__(self, s=None):
 
394
        if s is not None:
 
395
            self.__dict__['_cstring'] = StringIO(s)
 
396
        else:
 
397
            self.__dict__['_cstring'] = StringIO()
 
398
 
 
399
    def __getattr__(self, name, getattr=getattr):
 
400
        return getattr(self.__dict__['_cstring'], name)
 
401
 
 
402
    def __setattr__(self, name, val):
 
403
        if name == 'encoding':
 
404
            self.__dict__['encoding'] = val
 
405
        else:
 
406
            return setattr(self._cstring, name, val)
 
407
 
 
408
 
 
409
class TestCase(unittest.TestCase):
 
410
    """Base class for bzr unit tests.
 
411
    
 
412
    Tests that need access to disk resources should subclass 
 
413
    TestCaseInTempDir not TestCase.
 
414
 
 
415
    Error and debug log messages are redirected from their usual
 
416
    location into a temporary file, the contents of which can be
 
417
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
418
    so that it can also capture file IO.  When the test completes this file
 
419
    is read into memory and removed from disk.
 
420
       
 
421
    There are also convenience functions to invoke bzr's command-line
 
422
    routine, and to build and check bzr trees.
 
423
   
 
424
    In addition to the usual method of overriding tearDown(), this class also
 
425
    allows subclasses to register functions into the _cleanups list, which is
 
426
    run in order as the object is torn down.  It's less likely this will be
 
427
    accidentally overlooked.
 
428
    """
 
429
 
 
430
    _log_file_name = None
 
431
    _log_contents = ''
 
432
    # record lsprof data when performing benchmark calls.
 
433
    _gather_lsprof_in_benchmarks = False
 
434
 
 
435
    def __init__(self, methodName='testMethod'):
 
436
        super(TestCase, self).__init__(methodName)
 
437
        self._cleanups = []
 
438
 
 
439
    def setUp(self):
 
440
        unittest.TestCase.setUp(self)
 
441
        self._cleanEnvironment()
 
442
        bzrlib.trace.disable_default_logging()
 
443
        self._startLogFile()
 
444
        self._benchcalls = []
 
445
        self._benchtime = None
 
446
 
 
447
    def _ndiff_strings(self, a, b):
 
448
        """Return ndiff between two strings containing lines.
 
449
        
 
450
        A trailing newline is added if missing to make the strings
 
451
        print properly."""
 
452
        if b and b[-1] != '\n':
 
453
            b += '\n'
 
454
        if a and a[-1] != '\n':
 
455
            a += '\n'
 
456
        difflines = difflib.ndiff(a.splitlines(True),
 
457
                                  b.splitlines(True),
 
458
                                  linejunk=lambda x: False,
 
459
                                  charjunk=lambda x: False)
 
460
        return ''.join(difflines)
 
461
 
 
462
    def assertEqualDiff(self, a, b, message=None):
 
463
        """Assert two texts are equal, if not raise an exception.
 
464
        
 
465
        This is intended for use with multi-line strings where it can 
 
466
        be hard to find the differences by eye.
 
467
        """
 
468
        # TODO: perhaps override assertEquals to call this for strings?
 
469
        if a == b:
 
470
            return
 
471
        if message is None:
 
472
            message = "texts not equal:\n"
 
473
        raise AssertionError(message + 
 
474
                             self._ndiff_strings(a, b))      
 
475
        
 
476
    def assertEqualMode(self, mode, mode_test):
 
477
        self.assertEqual(mode, mode_test,
 
478
                         'mode mismatch %o != %o' % (mode, mode_test))
 
479
 
 
480
    def assertStartsWith(self, s, prefix):
 
481
        if not s.startswith(prefix):
 
482
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
483
 
 
484
    def assertEndsWith(self, s, suffix):
 
485
        """Asserts that s ends with suffix."""
 
486
        if not s.endswith(suffix):
 
487
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
488
 
 
489
    def assertContainsRe(self, haystack, needle_re):
 
490
        """Assert that a contains something matching a regular expression."""
 
491
        if not re.search(needle_re, haystack):
 
492
            raise AssertionError('pattern "%s" not found in "%s"'
 
493
                    % (needle_re, haystack))
 
494
 
 
495
    def assertSubset(self, sublist, superlist):
 
496
        """Assert that every entry in sublist is present in superlist."""
 
497
        missing = []
 
498
        for entry in sublist:
 
499
            if entry not in superlist:
 
500
                missing.append(entry)
 
501
        if len(missing) > 0:
 
502
            raise AssertionError("value(s) %r not present in container %r" % 
 
503
                                 (missing, superlist))
 
504
 
 
505
    def assertIs(self, left, right):
 
506
        if not (left is right):
 
507
            raise AssertionError("%r is not %r." % (left, right))
 
508
 
 
509
    def assertTransportMode(self, transport, path, mode):
 
510
        """Fail if a path does not have mode mode.
 
511
        
 
512
        If modes are not supported on this transport, the assertion is ignored.
 
513
        """
 
514
        if not transport._can_roundtrip_unix_modebits():
 
515
            return
 
516
        path_stat = transport.stat(path)
 
517
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
518
        self.assertEqual(mode, actual_mode,
 
519
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
520
 
 
521
    def assertIsInstance(self, obj, kls):
 
522
        """Fail if obj is not an instance of kls"""
 
523
        if not isinstance(obj, kls):
 
524
            self.fail("%r is an instance of %s rather than %s" % (
 
525
                obj, obj.__class__, kls))
 
526
 
 
527
    def _startLogFile(self):
 
528
        """Send bzr and test log messages to a temporary file.
 
529
 
 
530
        The file is removed as the test is torn down.
 
531
        """
 
532
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
533
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
 
534
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
 
535
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
536
        self._log_file_name = name
 
537
        self.addCleanup(self._finishLogFile)
 
538
 
 
539
    def _finishLogFile(self):
 
540
        """Finished with the log file.
 
541
 
 
542
        Read contents into memory, close, and delete.
 
543
        """
 
544
        bzrlib.trace.disable_test_log(self._log_nonce)
 
545
        self._log_file.seek(0)
 
546
        self._log_contents = self._log_file.read()
 
547
        self._log_file.close()
 
548
        os.remove(self._log_file_name)
 
549
        self._log_file = self._log_file_name = None
 
550
 
 
551
    def addCleanup(self, callable):
 
552
        """Arrange to run a callable when this case is torn down.
 
553
 
 
554
        Callables are run in the reverse of the order they are registered, 
 
555
        ie last-in first-out.
 
556
        """
 
557
        if callable in self._cleanups:
 
558
            raise ValueError("cleanup function %r already registered on %s" 
 
559
                    % (callable, self))
 
560
        self._cleanups.append(callable)
 
561
 
 
562
    def _cleanEnvironment(self):
 
563
        new_env = {
 
564
            'HOME': os.getcwd(),
 
565
            'APPDATA': os.getcwd(),
 
566
            'BZREMAIL': None,
 
567
            'EMAIL': None,
 
568
        }
 
569
        self.__old_env = {}
 
570
        self.addCleanup(self._restoreEnvironment)
 
571
        for name, value in new_env.iteritems():
 
572
            self._captureVar(name, value)
 
573
 
 
574
 
 
575
    def _captureVar(self, name, newvalue):
 
576
        """Set an environment variable, preparing it to be reset when finished."""
 
577
        self.__old_env[name] = os.environ.get(name, None)
 
578
        if newvalue is None:
 
579
            if name in os.environ:
 
580
                del os.environ[name]
 
581
        else:
 
582
            os.environ[name] = newvalue
 
583
 
 
584
    @staticmethod
 
585
    def _restoreVar(name, value):
 
586
        if value is None:
 
587
            if name in os.environ:
 
588
                del os.environ[name]
 
589
        else:
 
590
            os.environ[name] = value
 
591
 
 
592
    def _restoreEnvironment(self):
 
593
        for name, value in self.__old_env.iteritems():
 
594
            self._restoreVar(name, value)
 
595
 
 
596
    def tearDown(self):
 
597
        self._runCleanups()
 
598
        unittest.TestCase.tearDown(self)
 
599
 
 
600
    def time(self, callable, *args, **kwargs):
 
601
        """Run callable and accrue the time it takes to the benchmark time.
 
602
        
 
603
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
604
        this will cause lsprofile statistics to be gathered and stored in
 
605
        self._benchcalls.
 
606
        """
 
607
        if self._benchtime is None:
 
608
            self._benchtime = 0
 
609
        start = time.time()
 
610
        try:
 
611
            if not self._gather_lsprof_in_benchmarks:
 
612
                return callable(*args, **kwargs)
 
613
            else:
 
614
                # record this benchmark
 
615
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
616
                stats.sort()
 
617
                self._benchcalls.append(((callable, args, kwargs), stats))
 
618
                return ret
 
619
        finally:
 
620
            self._benchtime += time.time() - start
 
621
 
 
622
    def _runCleanups(self):
 
623
        """Run registered cleanup functions. 
 
624
 
 
625
        This should only be called from TestCase.tearDown.
 
626
        """
 
627
        # TODO: Perhaps this should keep running cleanups even if 
 
628
        # one of them fails?
 
629
        for cleanup_fn in reversed(self._cleanups):
 
630
            cleanup_fn()
 
631
 
 
632
    def log(self, *args):
 
633
        mutter(*args)
 
634
 
 
635
    def _get_log(self):
 
636
        """Return as a string the log for this test"""
 
637
        if self._log_file_name:
 
638
            return open(self._log_file_name).read()
 
639
        else:
 
640
            return self._log_contents
 
641
        # TODO: Delete the log after it's been read in
 
642
 
 
643
    def capture(self, cmd, retcode=0):
 
644
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
645
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
646
 
 
647
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
 
648
        """Invoke bzr and return (stdout, stderr).
 
649
 
 
650
        Useful for code that wants to check the contents of the
 
651
        output, the way error messages are presented, etc.
 
652
 
 
653
        This should be the main method for tests that want to exercise the
 
654
        overall behavior of the bzr application (rather than a unit test
 
655
        or a functional test of the library.)
 
656
 
 
657
        Much of the old code runs bzr by forking a new copy of Python, but
 
658
        that is slower, harder to debug, and generally not necessary.
 
659
 
 
660
        This runs bzr through the interface that catches and reports
 
661
        errors, and with logging set to something approximating the
 
662
        default, so that error reporting can be checked.
 
663
 
 
664
        :param argv: arguments to invoke bzr
 
665
        :param retcode: expected return code, or None for don't-care.
 
666
        :param encoding: encoding for sys.stdout and sys.stderr
 
667
        :param stdin: A string to be used as stdin for the command.
 
668
        """
 
669
        if encoding is None:
 
670
            encoding = bzrlib.user_encoding
 
671
        if stdin is not None:
 
672
            stdin = StringIO(stdin)
 
673
        stdout = StringIOWrapper()
 
674
        stderr = StringIOWrapper()
 
675
        stdout.encoding = encoding
 
676
        stderr.encoding = encoding
 
677
 
 
678
        self.log('run bzr: %r', argv)
 
679
        # FIXME: don't call into logging here
 
680
        handler = logging.StreamHandler(stderr)
 
681
        handler.setFormatter(bzrlib.trace.QuietFormatter())
 
682
        handler.setLevel(logging.INFO)
 
683
        logger = logging.getLogger('')
 
684
        logger.addHandler(handler)
 
685
        old_ui_factory = bzrlib.ui.ui_factory
 
686
        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
 
687
            stdout=stdout,
 
688
            stderr=stderr)
 
689
        bzrlib.ui.ui_factory.stdin = stdin
 
690
        try:
 
691
            result = self.apply_redirected(stdin, stdout, stderr,
 
692
                                           bzrlib.commands.run_bzr_catch_errors,
 
693
                                           argv)
 
694
        finally:
 
695
            logger.removeHandler(handler)
 
696
            bzrlib.ui.ui_factory = old_ui_factory
 
697
 
 
698
        out = stdout.getvalue()
 
699
        err = stderr.getvalue()
 
700
        if out:
 
701
            self.log('output:\n%r', out)
 
702
        if err:
 
703
            self.log('errors:\n%r', err)
 
704
        if retcode is not None:
 
705
            self.assertEquals(retcode, result)
 
706
        return out, err
 
707
 
 
708
    def run_bzr(self, *args, **kwargs):
 
709
        """Invoke bzr, as if it were run from the command line.
 
710
 
 
711
        This should be the main method for tests that want to exercise the
 
712
        overall behavior of the bzr application (rather than a unit test
 
713
        or a functional test of the library.)
 
714
 
 
715
        This sends the stdout/stderr results into the test's log,
 
716
        where it may be useful for debugging.  See also run_captured.
 
717
 
 
718
        :param stdin: A string to be used as stdin for the command.
 
719
        """
 
720
        retcode = kwargs.pop('retcode', 0)
 
721
        encoding = kwargs.pop('encoding', None)
 
722
        stdin = kwargs.pop('stdin', None)
 
723
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding, stdin=stdin)
 
724
 
 
725
    def run_bzr_decode(self, *args, **kwargs):
 
726
        if kwargs.has_key('encoding'):
 
727
            encoding = kwargs['encoding']
 
728
        else:
 
729
            encoding = bzrlib.user_encoding
 
730
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
731
 
 
732
    def run_bzr_external(self, *args, **kwargs):
 
733
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
734
        if len(args) == 1:
 
735
            args = shlex.split(args[0])
 
736
        args = list(args)
 
737
        process = Popen([bzr_path]+args, stdout=PIPE, stderr=PIPE)
 
738
        out = process.stdout.read()
 
739
        err = process.stderr.read()
 
740
        retcode = process.wait()
 
741
        supplied_retcode = kwargs.get('retcode')
 
742
        if supplied_retcode is not None:
 
743
            assert supplied_retcode == retcode
 
744
        else:
 
745
            assert retcode == 0
 
746
        return [out, err]
 
747
 
 
748
    def check_inventory_shape(self, inv, shape):
 
749
        """Compare an inventory to a list of expected names.
 
750
 
 
751
        Fail if they are not precisely equal.
 
752
        """
 
753
        extras = []
 
754
        shape = list(shape)             # copy
 
755
        for path, ie in inv.entries():
 
756
            name = path.replace('\\', '/')
 
757
            if ie.kind == 'dir':
 
758
                name = name + '/'
 
759
            if name in shape:
 
760
                shape.remove(name)
 
761
            else:
 
762
                extras.append(name)
 
763
        if shape:
 
764
            self.fail("expected paths not found in inventory: %r" % shape)
 
765
        if extras:
 
766
            self.fail("unexpected paths found in inventory: %r" % extras)
 
767
 
 
768
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
769
                         a_callable=None, *args, **kwargs):
 
770
        """Call callable with redirected std io pipes.
 
771
 
 
772
        Returns the return code."""
 
773
        if not callable(a_callable):
 
774
            raise ValueError("a_callable must be callable.")
 
775
        if stdin is None:
 
776
            stdin = StringIO("")
 
777
        if stdout is None:
 
778
            if getattr(self, "_log_file", None) is not None:
 
779
                stdout = self._log_file
 
780
            else:
 
781
                stdout = StringIO()
 
782
        if stderr is None:
 
783
            if getattr(self, "_log_file", None is not None):
 
784
                stderr = self._log_file
 
785
            else:
 
786
                stderr = StringIO()
 
787
        real_stdin = sys.stdin
 
788
        real_stdout = sys.stdout
 
789
        real_stderr = sys.stderr
 
790
        try:
 
791
            sys.stdout = stdout
 
792
            sys.stderr = stderr
 
793
            sys.stdin = stdin
 
794
            return a_callable(*args, **kwargs)
 
795
        finally:
 
796
            sys.stdout = real_stdout
 
797
            sys.stderr = real_stderr
 
798
            sys.stdin = real_stdin
 
799
 
 
800
    def merge(self, branch_from, wt_to):
 
801
        """A helper for tests to do a ui-less merge.
 
802
 
 
803
        This should move to the main library when someone has time to integrate
 
804
        it in.
 
805
        """
 
806
        # minimal ui-less merge.
 
807
        wt_to.branch.fetch(branch_from)
 
808
        base_rev = common_ancestor(branch_from.last_revision(),
 
809
                                   wt_to.branch.last_revision(),
 
810
                                   wt_to.branch.repository)
 
811
        merge_inner(wt_to.branch, branch_from.basis_tree(), 
 
812
                    wt_to.branch.repository.revision_tree(base_rev),
 
813
                    this_tree=wt_to)
 
814
        wt_to.add_pending_merge(branch_from.last_revision())
 
815
 
 
816
 
 
817
BzrTestBase = TestCase
 
818
 
 
819
     
 
820
class TestCaseInTempDir(TestCase):
 
821
    """Derived class that runs a test within a temporary directory.
 
822
 
 
823
    This is useful for tests that need to create a branch, etc.
 
824
 
 
825
    The directory is created in a slightly complex way: for each
 
826
    Python invocation, a new temporary top-level directory is created.
 
827
    All test cases create their own directory within that.  If the
 
828
    tests complete successfully, the directory is removed.
 
829
 
 
830
    InTempDir is an old alias for FunctionalTestCase.
 
831
    """
 
832
 
 
833
    TEST_ROOT = None
 
834
    _TEST_NAME = 'test'
 
835
    OVERRIDE_PYTHON = 'python'
 
836
 
 
837
    def check_file_contents(self, filename, expect):
 
838
        self.log("check contents of file %s" % filename)
 
839
        contents = file(filename, 'r').read()
 
840
        if contents != expect:
 
841
            self.log("expected: %r" % expect)
 
842
            self.log("actually: %r" % contents)
 
843
            self.fail("contents of %s not as expected" % filename)
 
844
 
 
845
    def _make_test_root(self):
 
846
        if TestCaseInTempDir.TEST_ROOT is not None:
 
847
            return
 
848
        i = 0
 
849
        while True:
 
850
            root = u'test%04d.tmp' % i
 
851
            try:
 
852
                os.mkdir(root)
 
853
            except OSError, e:
 
854
                if e.errno == errno.EEXIST:
 
855
                    i += 1
 
856
                    continue
 
857
                else:
 
858
                    raise
 
859
            # successfully created
 
860
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
 
861
            break
 
862
        # make a fake bzr directory there to prevent any tests propagating
 
863
        # up onto the source directory's real branch
 
864
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
 
865
 
 
866
    def setUp(self):
 
867
        super(TestCaseInTempDir, self).setUp()
 
868
        self._make_test_root()
 
869
        _currentdir = os.getcwdu()
 
870
        # shorten the name, to avoid test failures due to path length
 
871
        short_id = self.id().replace('bzrlib.tests.', '') \
 
872
                   .replace('__main__.', '')[-100:]
 
873
        # it's possible the same test class is run several times for
 
874
        # parameterized tests, so make sure the names don't collide.  
 
875
        i = 0
 
876
        while True:
 
877
            if i > 0:
 
878
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
879
            else:
 
880
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
881
            if os.path.exists(candidate_dir):
 
882
                i = i + 1
 
883
                continue
 
884
            else:
 
885
                self.test_dir = candidate_dir
 
886
                os.mkdir(self.test_dir)
 
887
                os.chdir(self.test_dir)
 
888
                break
 
889
        os.environ['HOME'] = self.test_dir
 
890
        os.environ['APPDATA'] = self.test_dir
 
891
        def _leaveDirectory():
 
892
            os.chdir(_currentdir)
 
893
        self.addCleanup(_leaveDirectory)
 
894
        
 
895
    def build_tree(self, shape, line_endings='native', transport=None):
 
896
        """Build a test tree according to a pattern.
 
897
 
 
898
        shape is a sequence of file specifications.  If the final
 
899
        character is '/', a directory is created.
 
900
 
 
901
        This doesn't add anything to a branch.
 
902
        :param line_endings: Either 'binary' or 'native'
 
903
                             in binary mode, exact contents are written
 
904
                             in native mode, the line endings match the
 
905
                             default platform endings.
 
906
 
 
907
        :param transport: A transport to write to, for building trees on 
 
908
                          VFS's. If the transport is readonly or None,
 
909
                          "." is opened automatically.
 
910
        """
 
911
        # XXX: It's OK to just create them using forward slashes on windows?
 
912
        if transport is None or transport.is_readonly():
 
913
            transport = get_transport(".")
 
914
        for name in shape:
 
915
            self.assert_(isinstance(name, basestring))
 
916
            if name[-1] == '/':
 
917
                transport.mkdir(urlutils.escape(name[:-1]))
 
918
            else:
 
919
                if line_endings == 'binary':
 
920
                    end = '\n'
 
921
                elif line_endings == 'native':
 
922
                    end = os.linesep
 
923
                else:
 
924
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
 
925
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
926
                transport.put(urlutils.escape(name), StringIO(content))
 
927
 
 
928
    def build_tree_contents(self, shape):
 
929
        build_tree_contents(shape)
 
930
 
 
931
    def failUnlessExists(self, path):
 
932
        """Fail unless path, which may be abs or relative, exists."""
 
933
        self.failUnless(osutils.lexists(path))
 
934
 
 
935
    def failIfExists(self, path):
 
936
        """Fail if path, which may be abs or relative, exists."""
 
937
        self.failIf(osutils.lexists(path))
 
938
        
 
939
    def assertFileEqual(self, content, path):
 
940
        """Fail if path does not contain 'content'."""
 
941
        self.failUnless(osutils.lexists(path))
 
942
        # TODO: jam 20060427 Shouldn't this be 'rb'?
 
943
        self.assertEqualDiff(content, open(path, 'r').read())
 
944
 
 
945
 
 
946
class TestCaseWithTransport(TestCaseInTempDir):
 
947
    """A test case that provides get_url and get_readonly_url facilities.
 
948
 
 
949
    These back onto two transport servers, one for readonly access and one for
 
950
    read write access.
 
951
 
 
952
    If no explicit class is provided for readonly access, a
 
953
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
954
    based read write transports.
 
955
 
 
956
    If an explicit class is provided for readonly access, that server and the 
 
957
    readwrite one must both define get_url() as resolving to os.getcwd().
 
958
    """
 
959
 
 
960
    def __init__(self, methodName='testMethod'):
 
961
        super(TestCaseWithTransport, self).__init__(methodName)
 
962
        self.__readonly_server = None
 
963
        self.__server = None
 
964
        self.transport_server = default_transport
 
965
        self.transport_readonly_server = None
 
966
 
 
967
    def get_readonly_url(self, relpath=None):
 
968
        """Get a URL for the readonly transport.
 
969
 
 
970
        This will either be backed by '.' or a decorator to the transport 
 
971
        used by self.get_url()
 
972
        relpath provides for clients to get a path relative to the base url.
 
973
        These should only be downwards relative, not upwards.
 
974
        """
 
975
        base = self.get_readonly_server().get_url()
 
976
        if relpath is not None:
 
977
            if not base.endswith('/'):
 
978
                base = base + '/'
 
979
            base = base + relpath
 
980
        return base
 
981
 
 
982
    def get_readonly_server(self):
 
983
        """Get the server instance for the readonly transport
 
984
 
 
985
        This is useful for some tests with specific servers to do diagnostics.
 
986
        """
 
987
        if self.__readonly_server is None:
 
988
            if self.transport_readonly_server is None:
 
989
                # readonly decorator requested
 
990
                # bring up the server
 
991
                self.get_url()
 
992
                self.__readonly_server = ReadonlyServer()
 
993
                self.__readonly_server.setUp(self.__server)
 
994
            else:
 
995
                self.__readonly_server = self.transport_readonly_server()
 
996
                self.__readonly_server.setUp()
 
997
            self.addCleanup(self.__readonly_server.tearDown)
 
998
        return self.__readonly_server
 
999
 
 
1000
    def get_server(self):
 
1001
        """Get the read/write server instance.
 
1002
 
 
1003
        This is useful for some tests with specific servers that need
 
1004
        diagnostics.
 
1005
        """
 
1006
        if self.__server is None:
 
1007
            self.__server = self.transport_server()
 
1008
            self.__server.setUp()
 
1009
            self.addCleanup(self.__server.tearDown)
 
1010
        return self.__server
 
1011
 
 
1012
    def get_url(self, relpath=None):
 
1013
        """Get a URL for the readwrite transport.
 
1014
 
 
1015
        This will either be backed by '.' or to an equivalent non-file based
 
1016
        facility.
 
1017
        relpath provides for clients to get a path relative to the base url.
 
1018
        These should only be downwards relative, not upwards.
 
1019
        """
 
1020
        base = self.get_server().get_url()
 
1021
        if relpath is not None and relpath != '.':
 
1022
            if not base.endswith('/'):
 
1023
                base = base + '/'
 
1024
            base = base + urlutils.escape(relpath)
 
1025
        return base
 
1026
 
 
1027
    def get_transport(self):
 
1028
        """Return a writeable transport for the test scratch space"""
 
1029
        t = get_transport(self.get_url())
 
1030
        self.assertFalse(t.is_readonly())
 
1031
        return t
 
1032
 
 
1033
    def get_readonly_transport(self):
 
1034
        """Return a readonly transport for the test scratch space
 
1035
        
 
1036
        This can be used to test that operations which should only need
 
1037
        readonly access in fact do not try to write.
 
1038
        """
 
1039
        t = get_transport(self.get_readonly_url())
 
1040
        self.assertTrue(t.is_readonly())
 
1041
        return t
 
1042
 
 
1043
    def make_branch(self, relpath, format=None):
 
1044
        """Create a branch on the transport at relpath."""
 
1045
        repo = self.make_repository(relpath, format=format)
 
1046
        return repo.bzrdir.create_branch()
 
1047
 
 
1048
    def make_bzrdir(self, relpath, format=None):
 
1049
        try:
 
1050
            url = self.get_url(relpath)
 
1051
            mutter('relpath %r => url %r', relpath, url)
 
1052
            segments = url.split('/')
 
1053
            if segments and segments[-1] not in ('', '.'):
 
1054
                parent = '/'.join(segments[:-1])
 
1055
                t = get_transport(parent)
 
1056
                try:
 
1057
                    t.mkdir(segments[-1])
 
1058
                except errors.FileExists:
 
1059
                    pass
 
1060
            if format is None:
 
1061
                format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
 
1062
            # FIXME: make this use a single transport someday. RBC 20060418
 
1063
            return format.initialize_on_transport(get_transport(relpath))
 
1064
        except errors.UninitializableFormat:
 
1065
            raise TestSkipped("Format %s is not initializable." % format)
 
1066
 
 
1067
    def make_repository(self, relpath, shared=False, format=None):
 
1068
        """Create a repository on our default transport at relpath."""
 
1069
        made_control = self.make_bzrdir(relpath, format=format)
 
1070
        return made_control.create_repository(shared=shared)
 
1071
 
 
1072
    def make_branch_and_tree(self, relpath, format=None):
 
1073
        """Create a branch on the transport and a tree locally.
 
1074
 
 
1075
        Returns the tree.
 
1076
        """
 
1077
        # TODO: always use the local disk path for the working tree,
 
1078
        # this obviously requires a format that supports branch references
 
1079
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
1080
        # RBC 20060208
 
1081
        b = self.make_branch(relpath, format=format)
 
1082
        try:
 
1083
            return b.bzrdir.create_workingtree()
 
1084
        except errors.NotLocalUrl:
 
1085
            # new formats - catch No tree error and create
 
1086
            # a branch reference and a checkout.
 
1087
            # old formats at that point - raise TestSkipped.
 
1088
            # TODO: rbc 20060208
 
1089
            return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
 
1090
 
 
1091
    def assertIsDirectory(self, relpath, transport):
 
1092
        """Assert that relpath within transport is a directory.
 
1093
 
 
1094
        This may not be possible on all transports; in that case it propagates
 
1095
        a TransportNotPossible.
 
1096
        """
 
1097
        try:
 
1098
            mode = transport.stat(relpath).st_mode
 
1099
        except errors.NoSuchFile:
 
1100
            self.fail("path %s is not a directory; no such file"
 
1101
                      % (relpath))
 
1102
        if not stat.S_ISDIR(mode):
 
1103
            self.fail("path %s is not a directory; has mode %#o"
 
1104
                      % (relpath, mode))
 
1105
 
 
1106
 
 
1107
class ChrootedTestCase(TestCaseWithTransport):
 
1108
    """A support class that provides readonly urls outside the local namespace.
 
1109
 
 
1110
    This is done by checking if self.transport_server is a MemoryServer. if it
 
1111
    is then we are chrooted already, if it is not then an HttpServer is used
 
1112
    for readonly urls.
 
1113
 
 
1114
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
1115
                       be used without needed to redo it when a different 
 
1116
                       subclass is in use ?
 
1117
    """
 
1118
 
 
1119
    def setUp(self):
 
1120
        super(ChrootedTestCase, self).setUp()
 
1121
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
 
1122
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
1123
 
 
1124
 
 
1125
def filter_suite_by_re(suite, pattern):
 
1126
    result = TestSuite()
 
1127
    filter_re = re.compile(pattern)
 
1128
    for test in iter_suite_tests(suite):
 
1129
        if filter_re.search(test.id()):
 
1130
            result.addTest(test)
 
1131
    return result
 
1132
 
 
1133
 
 
1134
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
1135
              stop_on_failure=False, keep_output=False,
 
1136
              transport=None, lsprof_timed=None):
 
1137
    TestCaseInTempDir._TEST_NAME = name
 
1138
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
1139
    if verbose:
 
1140
        verbosity = 2
 
1141
        pb = None
 
1142
    else:
 
1143
        verbosity = 1
 
1144
        pb = progress.ProgressBar()
 
1145
    runner = TextTestRunner(stream=sys.stdout,
 
1146
                            descriptions=0,
 
1147
                            verbosity=verbosity,
 
1148
                            keep_output=keep_output,
 
1149
                            pb=pb)
 
1150
    runner.stop_on_failure=stop_on_failure
 
1151
    if pattern != '.*':
 
1152
        suite = filter_suite_by_re(suite, pattern)
 
1153
    result = runner.run(suite)
 
1154
    return result.wasSuccessful()
 
1155
 
 
1156
 
 
1157
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
1158
             keep_output=False,
 
1159
             transport=None,
 
1160
             test_suite_factory=None,
 
1161
             lsprof_timed=None):
 
1162
    """Run the whole test suite under the enhanced runner"""
 
1163
    global default_transport
 
1164
    if transport is None:
 
1165
        transport = default_transport
 
1166
    old_transport = default_transport
 
1167
    default_transport = transport
 
1168
    try:
 
1169
        if test_suite_factory is None:
 
1170
            suite = test_suite()
 
1171
        else:
 
1172
            suite = test_suite_factory()
 
1173
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
1174
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
1175
                     transport=transport,
 
1176
                     lsprof_timed=lsprof_timed)
 
1177
    finally:
 
1178
        default_transport = old_transport
 
1179
 
 
1180
 
 
1181
def test_suite():
 
1182
    """Build and return TestSuite for the whole of bzrlib.
 
1183
    
 
1184
    This function can be replaced if you need to change the default test
 
1185
    suite on a global basis, but it is not encouraged.
 
1186
    """
39
1187
    from doctest import DocTestSuite
40
 
    import os
41
 
    import shutil
42
 
    import time
43
 
    import sys
44
 
    import unittest
45
 
 
46
 
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
47
 
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
48
 
        if m not in MODULES_TO_DOCTEST:
49
 
            MODULES_TO_DOCTEST.append(m)
50
 
            
51
 
    for m in (bzrlib.selftest.whitebox,
52
 
              bzrlib.selftest.versioning,
53
 
              bzrlib.selftest.testinv,
54
 
              bzrlib.selftest.testmerge3,
55
 
              bzrlib.selftest.testhashcache,
56
 
              bzrlib.selftest.teststatus,
57
 
              bzrlib.selftest.blackbox,
58
 
              bzrlib.selftest.testhashcache,
59
 
              bzrlib.selftest.testrevisionnamespaces,
60
 
              bzrlib.selftest.testbranch,
61
 
              ):
62
 
        if m not in MODULES_TO_TEST:
63
 
            MODULES_TO_TEST.append(m)
64
 
 
65
 
 
66
 
    TestBase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
67
 
    print '%-30s %s' % ('bzr binary', TestBase.BZRPATH)
68
 
 
69
 
    print
 
1188
 
 
1189
    global MODULES_TO_DOCTEST
 
1190
 
 
1191
    testmod_names = [ \
 
1192
                   'bzrlib.tests.test_ancestry',
 
1193
                   'bzrlib.tests.test_api',
 
1194
                   'bzrlib.tests.test_bad_files',
 
1195
                   'bzrlib.tests.test_branch',
 
1196
                   'bzrlib.tests.test_bundle',
 
1197
                   'bzrlib.tests.test_bzrdir',
 
1198
                   'bzrlib.tests.test_command',
 
1199
                   'bzrlib.tests.test_commit',
 
1200
                   'bzrlib.tests.test_commit_merge',
 
1201
                   'bzrlib.tests.test_config',
 
1202
                   'bzrlib.tests.test_conflicts',
 
1203
                   'bzrlib.tests.test_decorators',
 
1204
                   'bzrlib.tests.test_diff',
 
1205
                   'bzrlib.tests.test_doc_generate',
 
1206
                   'bzrlib.tests.test_errors',
 
1207
                   'bzrlib.tests.test_escaped_store',
 
1208
                   'bzrlib.tests.test_fetch',
 
1209
                   'bzrlib.tests.test_gpg',
 
1210
                   'bzrlib.tests.test_graph',
 
1211
                   'bzrlib.tests.test_hashcache',
 
1212
                   'bzrlib.tests.test_http',
 
1213
                   'bzrlib.tests.test_identitymap',
 
1214
                   'bzrlib.tests.test_inv',
 
1215
                   'bzrlib.tests.test_knit',
 
1216
                   'bzrlib.tests.test_lockdir',
 
1217
                   'bzrlib.tests.test_lockable_files',
 
1218
                   'bzrlib.tests.test_log',
 
1219
                   'bzrlib.tests.test_merge',
 
1220
                   'bzrlib.tests.test_merge3',
 
1221
                   'bzrlib.tests.test_merge_core',
 
1222
                   'bzrlib.tests.test_missing',
 
1223
                   'bzrlib.tests.test_msgeditor',
 
1224
                   'bzrlib.tests.test_nonascii',
 
1225
                   'bzrlib.tests.test_options',
 
1226
                   'bzrlib.tests.test_osutils',
 
1227
                   'bzrlib.tests.test_patch',
 
1228
                   'bzrlib.tests.test_patches',
 
1229
                   'bzrlib.tests.test_permissions',
 
1230
                   'bzrlib.tests.test_plugins',
 
1231
                   'bzrlib.tests.test_progress',
 
1232
                   'bzrlib.tests.test_reconcile',
 
1233
                   'bzrlib.tests.test_repository',
 
1234
                   'bzrlib.tests.test_revision',
 
1235
                   'bzrlib.tests.test_revisionnamespaces',
 
1236
                   'bzrlib.tests.test_revprops',
 
1237
                   'bzrlib.tests.test_rio',
 
1238
                   'bzrlib.tests.test_sampler',
 
1239
                   'bzrlib.tests.test_selftest',
 
1240
                   'bzrlib.tests.test_setup',
 
1241
                   'bzrlib.tests.test_sftp_transport',
 
1242
                   'bzrlib.tests.test_smart_add',
 
1243
                   'bzrlib.tests.test_source',
 
1244
                   'bzrlib.tests.test_status',
 
1245
                   'bzrlib.tests.test_store',
 
1246
                   'bzrlib.tests.test_symbol_versioning',
 
1247
                   'bzrlib.tests.test_testament',
 
1248
                   'bzrlib.tests.test_textfile',
 
1249
                   'bzrlib.tests.test_textmerge',
 
1250
                   'bzrlib.tests.test_trace',
 
1251
                   'bzrlib.tests.test_transactions',
 
1252
                   'bzrlib.tests.test_transform',
 
1253
                   'bzrlib.tests.test_transport',
 
1254
                   'bzrlib.tests.test_tsort',
 
1255
                   'bzrlib.tests.test_tuned_gzip',
 
1256
                   'bzrlib.tests.test_ui',
 
1257
                   'bzrlib.tests.test_upgrade',
 
1258
                   'bzrlib.tests.test_urlutils',
 
1259
                   'bzrlib.tests.test_versionedfile',
 
1260
                   'bzrlib.tests.test_weave',
 
1261
                   'bzrlib.tests.test_whitebox',
 
1262
                   'bzrlib.tests.test_workingtree',
 
1263
                   'bzrlib.tests.test_xml',
 
1264
                   ]
 
1265
    test_transport_implementations = [
 
1266
        'bzrlib.tests.test_transport_implementations']
70
1267
 
71
1268
    suite = TestSuite()
72
 
 
73
 
    # should also test bzrlib.merge_core, but they seem to be out of date with
74
 
    # the code.
75
 
 
76
 
 
77
 
    # XXX: python2.3's TestLoader() doesn't seem to find all the
78
 
    # tests; don't know why
 
1269
    loader = TestUtil.TestLoader()
 
1270
    from bzrlib.transport import TransportTestProviderAdapter
 
1271
    adapter = TransportTestProviderAdapter()
 
1272
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
1273
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
1274
    for package in packages_to_test():
 
1275
        suite.addTest(package.test_suite())
79
1276
    for m in MODULES_TO_TEST:
80
 
         suite.addTest(TestLoader().loadTestsFromModule(m))
81
 
 
 
1277
        suite.addTest(loader.loadTestsFromModule(m))
82
1278
    for m in (MODULES_TO_DOCTEST):
83
1279
        suite.addTest(DocTestSuite(m))
84
 
 
85
 
    for p in bzrlib.plugin.all_plugins:
86
 
        if hasattr(p, 'test_suite'):
87
 
            suite.addTest(p.test_suite())
88
 
 
89
 
    suite.addTest(unittest.makeSuite(bzrlib.merge_core.MergeTest, 'test_'))
90
 
 
91
 
    return run_suite(suite, 'testbzr')
92
 
 
93
 
 
94
 
 
 
1280
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
1281
        if getattr(plugin, 'test_suite', None) is not None:
 
1282
            suite.addTest(plugin.test_suite())
 
1283
    return suite
 
1284
 
 
1285
 
 
1286
def adapt_modules(mods_list, adapter, loader, suite):
 
1287
    """Adapt the modules in mods_list using adapter and add to suite."""
 
1288
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
1289
        suite.addTests(adapter.adapt(test))