~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Erik Bågfors
  • Date: 2006-02-03 19:50:59 UTC
  • mto: (1185.50.77 bzr-jam-integration)
  • mto: This revision was merged to the branch mainline in revision 1554.
  • Revision ID: erik@bagfors.nu-20060203195059-1cf8ff5aa68de0ea
Support for plugins to register log formatters and set default formatter
Also, change one command line option for "log"

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
from testsweet import TestBase, run_suite, InTempDir
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
from cStringIO import StringIO
 
25
import difflib
 
26
import errno
 
27
import logging
 
28
import os
 
29
import re
 
30
import shutil
 
31
import stat
 
32
import sys
 
33
import tempfile
 
34
import unittest
 
35
import time
 
36
import codecs
 
37
 
 
38
import bzrlib.branch
 
39
import bzrlib.commands
 
40
from bzrlib.errors import (BzrError,
 
41
                           FileExists,
 
42
                           UninitializableFormat,
 
43
                           )
 
44
import bzrlib.inventory
 
45
import bzrlib.iterablefile
 
46
import bzrlib.merge3
 
47
import bzrlib.osutils
 
48
import bzrlib.osutils as osutils
 
49
import bzrlib.plugin
 
50
import bzrlib.store
 
51
import bzrlib.trace
 
52
from bzrlib.transport import urlescape
 
53
import bzrlib.transport
 
54
from bzrlib.transport.local import LocalRelpathServer
 
55
from bzrlib.transport.readonly import ReadonlyServer
 
56
from bzrlib.trace import mutter
 
57
from bzrlib.tests.TestUtil import TestLoader, TestSuite
 
58
from bzrlib.tests.treeshape import build_tree_contents
 
59
from bzrlib.workingtree import WorkingTree
 
60
 
 
61
default_transport = LocalRelpathServer
19
62
 
20
63
MODULES_TO_TEST = []
21
 
MODULES_TO_DOCTEST = []
22
 
 
23
 
def selftest():
24
 
    from unittest import TestLoader, TestSuite
25
 
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
26
 
    import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
27
 
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
28
 
 
29
 
    import bzrlib.selftest.whitebox
30
 
    import bzrlib.selftest.blackbox
31
 
    import bzrlib.selftest.versioning
32
 
    import bzrlib.selftest.testmerge3
33
 
    import bzrlib.selftest.testhashcache
34
 
    import bzrlib.selftest.testrevisionnamespaces
35
 
    import bzrlib.selftest.testbranch
36
 
    import bzrlib.selftest.teststatus
37
 
    import bzrlib.selftest.testinv
38
 
    import bzrlib.merge_core
 
64
MODULES_TO_DOCTEST = [
 
65
                      bzrlib.branch,
 
66
                      bzrlib.commands,
 
67
                      bzrlib.errors,
 
68
                      bzrlib.inventory,
 
69
                      bzrlib.iterablefile,
 
70
                      bzrlib.merge3,
 
71
                      bzrlib.option,
 
72
                      bzrlib.osutils,
 
73
                      bzrlib.store
 
74
                      ]
 
75
def packages_to_test():
 
76
    """Return a list of packages to test.
 
77
 
 
78
    The packages are not globally imported so that import failures are
 
79
    triggered when running selftest, not when importing the command.
 
80
    """
 
81
    import bzrlib.doc
 
82
    import bzrlib.tests.blackbox
 
83
    import bzrlib.tests.branch_implementations
 
84
    return [
 
85
            bzrlib.doc,
 
86
            bzrlib.tests.branch_implementations,
 
87
            ]
 
88
 
 
89
 
 
90
class _MyResult(unittest._TextTestResult):
 
91
    """Custom TestResult.
 
92
 
 
93
    Shows output in a different format, including displaying runtime for tests.
 
94
    """
 
95
    stop_early = False
 
96
 
 
97
    def _elapsedTime(self):
 
98
        return "%5dms" % (1000 * (time.time() - self._start_time))
 
99
 
 
100
    def startTest(self, test):
 
101
        unittest.TestResult.startTest(self, test)
 
102
        # In a short description, the important words are in
 
103
        # the beginning, but in an id, the important words are
 
104
        # at the end
 
105
        SHOW_DESCRIPTIONS = False
 
106
        if self.showAll:
 
107
            width = osutils.terminal_width()
 
108
            name_width = width - 15
 
109
            what = None
 
110
            if SHOW_DESCRIPTIONS:
 
111
                what = test.shortDescription()
 
112
                if what:
 
113
                    if len(what) > name_width:
 
114
                        what = what[:name_width-3] + '...'
 
115
            if what is None:
 
116
                what = test.id()
 
117
                if what.startswith('bzrlib.tests.'):
 
118
                    what = what[13:]
 
119
                if len(what) > name_width:
 
120
                    what = '...' + what[3-name_width:]
 
121
            what = what.ljust(name_width)
 
122
            self.stream.write(what)
 
123
        self.stream.flush()
 
124
        self._start_time = time.time()
 
125
 
 
126
    def addError(self, test, err):
 
127
        if isinstance(err[1], TestSkipped):
 
128
            return self.addSkipped(test, err)    
 
129
        unittest.TestResult.addError(self, test, err)
 
130
        if self.showAll:
 
131
            self.stream.writeln("ERROR %s" % self._elapsedTime())
 
132
        elif self.dots:
 
133
            self.stream.write('E')
 
134
        self.stream.flush()
 
135
        if self.stop_early:
 
136
            self.stop()
 
137
 
 
138
    def addFailure(self, test, err):
 
139
        unittest.TestResult.addFailure(self, test, err)
 
140
        if self.showAll:
 
141
            self.stream.writeln(" FAIL %s" % self._elapsedTime())
 
142
        elif self.dots:
 
143
            self.stream.write('F')
 
144
        self.stream.flush()
 
145
        if self.stop_early:
 
146
            self.stop()
 
147
 
 
148
    def addSuccess(self, test):
 
149
        if self.showAll:
 
150
            self.stream.writeln('   OK %s' % self._elapsedTime())
 
151
        elif self.dots:
 
152
            self.stream.write('~')
 
153
        self.stream.flush()
 
154
        unittest.TestResult.addSuccess(self, test)
 
155
 
 
156
    def addSkipped(self, test, skip_excinfo):
 
157
        if self.showAll:
 
158
            print >>self.stream, ' SKIP %s' % self._elapsedTime()
 
159
            print >>self.stream, '     %s' % skip_excinfo[1]
 
160
        elif self.dots:
 
161
            self.stream.write('S')
 
162
        self.stream.flush()
 
163
        # seems best to treat this as success from point-of-view of unittest
 
164
        # -- it actually does nothing so it barely matters :)
 
165
        unittest.TestResult.addSuccess(self, test)
 
166
 
 
167
    def printErrorList(self, flavour, errors):
 
168
        for test, err in errors:
 
169
            self.stream.writeln(self.separator1)
 
170
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
171
            if getattr(test, '_get_log', None) is not None:
 
172
                print >>self.stream
 
173
                print >>self.stream, \
 
174
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
175
                print >>self.stream, test._get_log()
 
176
                print >>self.stream, \
 
177
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
178
            self.stream.writeln(self.separator2)
 
179
            self.stream.writeln("%s" % err)
 
180
 
 
181
 
 
182
class TextTestRunner(unittest.TextTestRunner):
 
183
    stop_on_failure = False
 
184
 
 
185
    def _makeResult(self):
 
186
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
187
        result.stop_early = self.stop_on_failure
 
188
        return result
 
189
 
 
190
 
 
191
def iter_suite_tests(suite):
 
192
    """Return all tests in a suite, recursing through nested suites"""
 
193
    for item in suite._tests:
 
194
        if isinstance(item, unittest.TestCase):
 
195
            yield item
 
196
        elif isinstance(item, unittest.TestSuite):
 
197
            for r in iter_suite_tests(item):
 
198
                yield r
 
199
        else:
 
200
            raise Exception('unknown object %r inside test suite %r'
 
201
                            % (item, suite))
 
202
 
 
203
 
 
204
class TestSkipped(Exception):
 
205
    """Indicates that a test was intentionally skipped, rather than failing."""
 
206
    # XXX: Not used yet
 
207
 
 
208
 
 
209
class CommandFailed(Exception):
 
210
    pass
 
211
 
 
212
class TestCase(unittest.TestCase):
 
213
    """Base class for bzr unit tests.
 
214
    
 
215
    Tests that need access to disk resources should subclass 
 
216
    TestCaseInTempDir not TestCase.
 
217
 
 
218
    Error and debug log messages are redirected from their usual
 
219
    location into a temporary file, the contents of which can be
 
220
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
221
    so that it can also capture file IO.  When the test completes this file
 
222
    is read into memory and removed from disk.
 
223
       
 
224
    There are also convenience functions to invoke bzr's command-line
 
225
    routine, and to build and check bzr trees.
 
226
   
 
227
    In addition to the usual method of overriding tearDown(), this class also
 
228
    allows subclasses to register functions into the _cleanups list, which is
 
229
    run in order as the object is torn down.  It's less likely this will be
 
230
    accidentally overlooked.
 
231
    """
 
232
 
 
233
    BZRPATH = 'bzr'
 
234
    _log_file_name = None
 
235
    _log_contents = ''
 
236
 
 
237
    def __init__(self, methodName='testMethod'):
 
238
        super(TestCase, self).__init__(methodName)
 
239
        self._cleanups = []
 
240
 
 
241
    def setUp(self):
 
242
        unittest.TestCase.setUp(self)
 
243
        self._cleanEnvironment()
 
244
        bzrlib.trace.disable_default_logging()
 
245
        self._startLogFile()
 
246
 
 
247
    def _ndiff_strings(self, a, b):
 
248
        """Return ndiff between two strings containing lines.
 
249
        
 
250
        A trailing newline is added if missing to make the strings
 
251
        print properly."""
 
252
        if b and b[-1] != '\n':
 
253
            b += '\n'
 
254
        if a and a[-1] != '\n':
 
255
            a += '\n'
 
256
        difflines = difflib.ndiff(a.splitlines(True),
 
257
                                  b.splitlines(True),
 
258
                                  linejunk=lambda x: False,
 
259
                                  charjunk=lambda x: False)
 
260
        return ''.join(difflines)
 
261
 
 
262
    def assertEqualDiff(self, a, b):
 
263
        """Assert two texts are equal, if not raise an exception.
 
264
        
 
265
        This is intended for use with multi-line strings where it can 
 
266
        be hard to find the differences by eye.
 
267
        """
 
268
        # TODO: perhaps override assertEquals to call this for strings?
 
269
        if a == b:
 
270
            return
 
271
        raise AssertionError("texts not equal:\n" + 
 
272
                             self._ndiff_strings(a, b))      
 
273
        
 
274
    def assertStartsWith(self, s, prefix):
 
275
        if not s.startswith(prefix):
 
276
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
277
 
 
278
    def assertEndsWith(self, s, suffix):
 
279
        if not s.endswith(prefix):
 
280
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
281
 
 
282
    def assertContainsRe(self, haystack, needle_re):
 
283
        """Assert that a contains something matching a regular expression."""
 
284
        if not re.search(needle_re, haystack):
 
285
            raise AssertionError('pattern "%s" not found in "%s"'
 
286
                    % (needle_re, haystack))
 
287
 
 
288
    def AssertSubset(self, sublist, superlist):
 
289
        """Assert that every entry in sublist is present in superlist."""
 
290
        missing = []
 
291
        for entry in sublist:
 
292
            if entry not in superlist:
 
293
                missing.append(entry)
 
294
        if len(missing) > 0:
 
295
            raise AssertionError("value(s) %r not present in container %r" % 
 
296
                                 (missing, superlist))
 
297
 
 
298
    def assertIs(self, left, right):
 
299
        if not (left is right):
 
300
            raise AssertionError("%r is not %r." % (left, right))
 
301
 
 
302
    def assertTransportMode(self, transport, path, mode):
 
303
        """Fail if a path does not have mode mode.
 
304
        
 
305
        If modes are not supported on this platform, the test is skipped.
 
306
        """
 
307
        if sys.platform == 'win32':
 
308
            return
 
309
        path_stat = transport.stat(path)
 
310
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
311
        self.assertEqual(mode, actual_mode,
 
312
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
313
 
 
314
    def _startLogFile(self):
 
315
        """Send bzr and test log messages to a temporary file.
 
316
 
 
317
        The file is removed as the test is torn down.
 
318
        """
 
319
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
320
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
 
321
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
 
322
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
323
        self._log_file_name = name
 
324
        self.addCleanup(self._finishLogFile)
 
325
 
 
326
    def _finishLogFile(self):
 
327
        """Finished with the log file.
 
328
 
 
329
        Read contents into memory, close, and delete.
 
330
        """
 
331
        bzrlib.trace.disable_test_log(self._log_nonce)
 
332
        self._log_file.seek(0)
 
333
        self._log_contents = self._log_file.read()
 
334
        self._log_file.close()
 
335
        os.remove(self._log_file_name)
 
336
        self._log_file = self._log_file_name = None
 
337
 
 
338
    def addCleanup(self, callable):
 
339
        """Arrange to run a callable when this case is torn down.
 
340
 
 
341
        Callables are run in the reverse of the order they are registered, 
 
342
        ie last-in first-out.
 
343
        """
 
344
        if callable in self._cleanups:
 
345
            raise ValueError("cleanup function %r already registered on %s" 
 
346
                    % (callable, self))
 
347
        self._cleanups.append(callable)
 
348
 
 
349
    def _cleanEnvironment(self):
 
350
        new_env = {
 
351
            'HOME': os.getcwd(),
 
352
            'APPDATA': os.getcwd(),
 
353
            'BZREMAIL': None,
 
354
            'EMAIL': None,
 
355
        }
 
356
        self.__old_env = {}
 
357
        self.addCleanup(self._restoreEnvironment)
 
358
        for name, value in new_env.iteritems():
 
359
            self._captureVar(name, value)
 
360
 
 
361
 
 
362
    def _captureVar(self, name, newvalue):
 
363
        """Set an environment variable, preparing it to be reset when finished."""
 
364
        self.__old_env[name] = os.environ.get(name, None)
 
365
        if newvalue is None:
 
366
            if name in os.environ:
 
367
                del os.environ[name]
 
368
        else:
 
369
            os.environ[name] = newvalue
 
370
 
 
371
    @staticmethod
 
372
    def _restoreVar(name, value):
 
373
        if value is None:
 
374
            if name in os.environ:
 
375
                del os.environ[name]
 
376
        else:
 
377
            os.environ[name] = value
 
378
 
 
379
    def _restoreEnvironment(self):
 
380
        for name, value in self.__old_env.iteritems():
 
381
            self._restoreVar(name, value)
 
382
 
 
383
    def tearDown(self):
 
384
        self._runCleanups()
 
385
        unittest.TestCase.tearDown(self)
 
386
 
 
387
    def _runCleanups(self):
 
388
        """Run registered cleanup functions. 
 
389
 
 
390
        This should only be called from TestCase.tearDown.
 
391
        """
 
392
        # TODO: Perhaps this should keep running cleanups even if 
 
393
        # one of them fails?
 
394
        for cleanup_fn in reversed(self._cleanups):
 
395
            cleanup_fn()
 
396
 
 
397
    def log(self, *args):
 
398
        mutter(*args)
 
399
 
 
400
    def _get_log(self):
 
401
        """Return as a string the log for this test"""
 
402
        if self._log_file_name:
 
403
            return open(self._log_file_name).read()
 
404
        else:
 
405
            return self._log_contents
 
406
        # TODO: Delete the log after it's been read in
 
407
 
 
408
    def capture(self, cmd, retcode=0):
 
409
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
410
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
411
 
 
412
    def run_bzr_captured(self, argv, retcode=0):
 
413
        """Invoke bzr and return (stdout, stderr).
 
414
 
 
415
        Useful for code that wants to check the contents of the
 
416
        output, the way error messages are presented, etc.
 
417
 
 
418
        This should be the main method for tests that want to exercise the
 
419
        overall behavior of the bzr application (rather than a unit test
 
420
        or a functional test of the library.)
 
421
 
 
422
        Much of the old code runs bzr by forking a new copy of Python, but
 
423
        that is slower, harder to debug, and generally not necessary.
 
424
 
 
425
        This runs bzr through the interface that catches and reports
 
426
        errors, and with logging set to something approximating the
 
427
        default, so that error reporting can be checked.
 
428
 
 
429
        argv -- arguments to invoke bzr
 
430
        retcode -- expected return code, or None for don't-care.
 
431
        """
 
432
        stdout = StringIO()
 
433
        stderr = StringIO()
 
434
        self.log('run bzr: %s', ' '.join(argv))
 
435
        # FIXME: don't call into logging here
 
436
        handler = logging.StreamHandler(stderr)
 
437
        handler.setFormatter(bzrlib.trace.QuietFormatter())
 
438
        handler.setLevel(logging.INFO)
 
439
        logger = logging.getLogger('')
 
440
        logger.addHandler(handler)
 
441
        try:
 
442
            result = self.apply_redirected(None, stdout, stderr,
 
443
                                           bzrlib.commands.run_bzr_catch_errors,
 
444
                                           argv)
 
445
        finally:
 
446
            logger.removeHandler(handler)
 
447
        out = stdout.getvalue()
 
448
        err = stderr.getvalue()
 
449
        if out:
 
450
            self.log('output:\n%s', out)
 
451
        if err:
 
452
            self.log('errors:\n%s', err)
 
453
        if retcode is not None:
 
454
            self.assertEquals(result, retcode)
 
455
        return out, err
 
456
 
 
457
    def run_bzr(self, *args, **kwargs):
 
458
        """Invoke bzr, as if it were run from the command line.
 
459
 
 
460
        This should be the main method for tests that want to exercise the
 
461
        overall behavior of the bzr application (rather than a unit test
 
462
        or a functional test of the library.)
 
463
 
 
464
        This sends the stdout/stderr results into the test's log,
 
465
        where it may be useful for debugging.  See also run_captured.
 
466
        """
 
467
        retcode = kwargs.pop('retcode', 0)
 
468
        return self.run_bzr_captured(args, retcode)
 
469
 
 
470
    def check_inventory_shape(self, inv, shape):
 
471
        """Compare an inventory to a list of expected names.
 
472
 
 
473
        Fail if they are not precisely equal.
 
474
        """
 
475
        extras = []
 
476
        shape = list(shape)             # copy
 
477
        for path, ie in inv.entries():
 
478
            name = path.replace('\\', '/')
 
479
            if ie.kind == 'dir':
 
480
                name = name + '/'
 
481
            if name in shape:
 
482
                shape.remove(name)
 
483
            else:
 
484
                extras.append(name)
 
485
        if shape:
 
486
            self.fail("expected paths not found in inventory: %r" % shape)
 
487
        if extras:
 
488
            self.fail("unexpected paths found in inventory: %r" % extras)
 
489
 
 
490
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
491
                         a_callable=None, *args, **kwargs):
 
492
        """Call callable with redirected std io pipes.
 
493
 
 
494
        Returns the return code."""
 
495
        if not callable(a_callable):
 
496
            raise ValueError("a_callable must be callable.")
 
497
        if stdin is None:
 
498
            stdin = StringIO("")
 
499
        if stdout is None:
 
500
            if getattr(self, "_log_file", None) is not None:
 
501
                stdout = self._log_file
 
502
            else:
 
503
                stdout = StringIO()
 
504
        if stderr is None:
 
505
            if getattr(self, "_log_file", None is not None):
 
506
                stderr = self._log_file
 
507
            else:
 
508
                stderr = StringIO()
 
509
        real_stdin = sys.stdin
 
510
        real_stdout = sys.stdout
 
511
        real_stderr = sys.stderr
 
512
        try:
 
513
            sys.stdout = stdout
 
514
            sys.stderr = stderr
 
515
            sys.stdin = stdin
 
516
            return a_callable(*args, **kwargs)
 
517
        finally:
 
518
            sys.stdout = real_stdout
 
519
            sys.stderr = real_stderr
 
520
            sys.stdin = real_stdin
 
521
 
 
522
 
 
523
BzrTestBase = TestCase
 
524
 
 
525
     
 
526
class TestCaseInTempDir(TestCase):
 
527
    """Derived class that runs a test within a temporary directory.
 
528
 
 
529
    This is useful for tests that need to create a branch, etc.
 
530
 
 
531
    The directory is created in a slightly complex way: for each
 
532
    Python invocation, a new temporary top-level directory is created.
 
533
    All test cases create their own directory within that.  If the
 
534
    tests complete successfully, the directory is removed.
 
535
 
 
536
    InTempDir is an old alias for FunctionalTestCase.
 
537
    """
 
538
 
 
539
    TEST_ROOT = None
 
540
    _TEST_NAME = 'test'
 
541
    OVERRIDE_PYTHON = 'python'
 
542
 
 
543
    def check_file_contents(self, filename, expect):
 
544
        self.log("check contents of file %s" % filename)
 
545
        contents = file(filename, 'r').read()
 
546
        if contents != expect:
 
547
            self.log("expected: %r" % expect)
 
548
            self.log("actually: %r" % contents)
 
549
            self.fail("contents of %s not as expected" % filename)
 
550
 
 
551
    def _make_test_root(self):
 
552
        if TestCaseInTempDir.TEST_ROOT is not None:
 
553
            return
 
554
        i = 0
 
555
        while True:
 
556
            root = u'test%04d.tmp' % i
 
557
            try:
 
558
                os.mkdir(root)
 
559
            except OSError, e:
 
560
                if e.errno == errno.EEXIST:
 
561
                    i += 1
 
562
                    continue
 
563
                else:
 
564
                    raise
 
565
            # successfully created
 
566
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
 
567
            break
 
568
        # make a fake bzr directory there to prevent any tests propagating
 
569
        # up onto the source directory's real branch
 
570
        os.mkdir(osutils.pathjoin(TestCaseInTempDir.TEST_ROOT, '.bzr'))
 
571
 
 
572
    def setUp(self):
 
573
        super(TestCaseInTempDir, self).setUp()
 
574
        self._make_test_root()
 
575
        _currentdir = os.getcwdu()
 
576
        short_id = self.id().replace('bzrlib.tests.', '') \
 
577
                   .replace('__main__.', '')
 
578
        self.test_dir = osutils.pathjoin(self.TEST_ROOT, short_id)
 
579
        os.mkdir(self.test_dir)
 
580
        os.chdir(self.test_dir)
 
581
        os.environ['HOME'] = self.test_dir
 
582
        os.environ['APPDATA'] = self.test_dir
 
583
        def _leaveDirectory():
 
584
            os.chdir(_currentdir)
 
585
        self.addCleanup(_leaveDirectory)
 
586
        
 
587
    def build_tree(self, shape, line_endings='native', transport=None):
 
588
        """Build a test tree according to a pattern.
 
589
 
 
590
        shape is a sequence of file specifications.  If the final
 
591
        character is '/', a directory is created.
 
592
 
 
593
        This doesn't add anything to a branch.
 
594
        :param line_endings: Either 'binary' or 'native'
 
595
                             in binary mode, exact contents are written
 
596
                             in native mode, the line endings match the
 
597
                             default platform endings.
 
598
 
 
599
        :param transport: A transport to write to, for building trees on 
 
600
                          VFS's. If the transport is readonly or None,
 
601
                          "." is opened automatically.
 
602
        """
 
603
        # XXX: It's OK to just create them using forward slashes on windows?
 
604
        if transport is None or transport.is_readonly():
 
605
            transport = bzrlib.transport.get_transport(".")
 
606
        for name in shape:
 
607
            self.assert_(isinstance(name, basestring))
 
608
            if name[-1] == '/':
 
609
                transport.mkdir(urlescape(name[:-1]))
 
610
            else:
 
611
                if line_endings == 'binary':
 
612
                    end = '\n'
 
613
                elif line_endings == 'native':
 
614
                    end = os.linesep
 
615
                else:
 
616
                    raise BzrError('Invalid line ending request %r' % (line_endings,))
 
617
                content = "contents of %s%s" % (name, end)
 
618
                transport.put(urlescape(name), StringIO(content))
 
619
 
 
620
    def build_tree_contents(self, shape):
 
621
        build_tree_contents(shape)
 
622
 
 
623
    def failUnlessExists(self, path):
 
624
        """Fail unless path, which may be abs or relative, exists."""
 
625
        self.failUnless(osutils.lexists(path))
 
626
 
 
627
    def failIfExists(self, path):
 
628
        """Fail if path, which may be abs or relative, exists."""
 
629
        self.failIf(osutils.lexists(path))
 
630
        
 
631
    def assertFileEqual(self, content, path):
 
632
        """Fail if path does not contain 'content'."""
 
633
        self.failUnless(osutils.lexists(path))
 
634
        self.assertEqualDiff(content, open(path, 'r').read())
 
635
 
 
636
 
 
637
class TestCaseWithTransport(TestCaseInTempDir):
 
638
    """A test case that provides get_url and get_readonly_url facilities.
 
639
 
 
640
    These back onto two transport servers, one for readonly access and one for
 
641
    read write access.
 
642
 
 
643
    If no explicit class is provided for readonly access, a
 
644
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
645
    based read write transports.
 
646
 
 
647
    If an explicit class is provided for readonly access, that server and the 
 
648
    readwrite one must both define get_url() as resolving to os.getcwd().
 
649
    """
 
650
 
 
651
    def __init__(self, methodName='testMethod'):
 
652
        super(TestCaseWithTransport, self).__init__(methodName)
 
653
        self.__readonly_server = None
 
654
        self.__server = None
 
655
        self.transport_server = default_transport
 
656
        self.transport_readonly_server = None
 
657
 
 
658
    def get_readonly_url(self, relpath=None):
 
659
        """Get a URL for the readonly transport.
 
660
 
 
661
        This will either be backed by '.' or a decorator to the transport 
 
662
        used by self.get_url()
 
663
        relpath provides for clients to get a path relative to the base url.
 
664
        These should only be downwards relative, not upwards.
 
665
        """
 
666
        if self.__readonly_server is None:
 
667
            if self.transport_readonly_server is None:
 
668
                # readonly decorator requested
 
669
                # bring up the server
 
670
                self.get_url()
 
671
                self.__readonly_server = ReadonlyServer()
 
672
                self.__readonly_server.setUp(self.__server)
 
673
            else:
 
674
                self.__readonly_server = self.transport_readonly_server()
 
675
                self.__readonly_server.setUp()
 
676
            self.addCleanup(self.__readonly_server.tearDown)
 
677
        base = self.__readonly_server.get_url()
 
678
        if relpath is not None:
 
679
            if not base.endswith('/'):
 
680
                base = base + '/'
 
681
            base = base + relpath
 
682
        return base
 
683
 
 
684
    def get_url(self, relpath=None):
 
685
        """Get a URL for the readwrite transport.
 
686
 
 
687
        This will either be backed by '.' or to an equivalent non-file based
 
688
        facility.
 
689
        relpath provides for clients to get a path relative to the base url.
 
690
        These should only be downwards relative, not upwards.
 
691
        """
 
692
        if self.__server is None:
 
693
            self.__server = self.transport_server()
 
694
            self.__server.setUp()
 
695
            self.addCleanup(self.__server.tearDown)
 
696
        base = self.__server.get_url()
 
697
        if relpath is not None and relpath != '.':
 
698
            if not base.endswith('/'):
 
699
                base = base + '/'
 
700
            base = base + relpath
 
701
        return base
 
702
 
 
703
    def make_branch(self, relpath):
 
704
        """Create a branch on the transport at relpath."""
 
705
        try:
 
706
            url = self.get_url(relpath)
 
707
            segments = relpath.split('/')
 
708
            if segments and segments[-1] not in ('', '.'):
 
709
                parent = self.get_url('/'.join(segments[:-1]))
 
710
                t = bzrlib.transport.get_transport(parent)
 
711
                try:
 
712
                    t.mkdir(segments[-1])
 
713
                except FileExists:
 
714
                    pass
 
715
            return bzrlib.branch.Branch.create(url)
 
716
        except UninitializableFormat:
 
717
            raise TestSkipped("Format %s is not initializable.")
 
718
 
 
719
    def make_branch_and_tree(self, relpath):
 
720
        """Create a branch on the transport and a tree locally.
 
721
 
 
722
        Returns the tree.
 
723
        """
 
724
        b = self.make_branch(relpath)
 
725
        return WorkingTree.create(b, relpath)
 
726
 
 
727
 
 
728
class ChrootedTestCase(TestCaseWithTransport):
 
729
    """A support class that provides readonly urls outside the local namespace.
 
730
 
 
731
    This is done by checking if self.transport_server is a MemoryServer. if it
 
732
    is then we are chrooted already, if it is not then an HttpServer is used
 
733
    for readonly urls.
 
734
 
 
735
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
736
                       be used without needed to redo it when a different 
 
737
                       subclass is in use ?
 
738
    """
 
739
 
 
740
    def setUp(self):
 
741
        super(ChrootedTestCase, self).setUp()
 
742
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
 
743
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
744
 
 
745
 
 
746
def filter_suite_by_re(suite, pattern):
 
747
    result = TestSuite()
 
748
    filter_re = re.compile(pattern)
 
749
    for test in iter_suite_tests(suite):
 
750
        if filter_re.search(test.id()):
 
751
            result.addTest(test)
 
752
    return result
 
753
 
 
754
 
 
755
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
756
              stop_on_failure=False, keep_output=False,
 
757
              transport=None):
 
758
    TestCaseInTempDir._TEST_NAME = name
 
759
    if verbose:
 
760
        verbosity = 2
 
761
    else:
 
762
        verbosity = 1
 
763
    runner = TextTestRunner(stream=sys.stdout,
 
764
                            descriptions=0,
 
765
                            verbosity=verbosity)
 
766
    runner.stop_on_failure=stop_on_failure
 
767
    if pattern != '.*':
 
768
        suite = filter_suite_by_re(suite, pattern)
 
769
    result = runner.run(suite)
 
770
    # This is still a little bogus, 
 
771
    # but only a little. Folk not using our testrunner will
 
772
    # have to delete their temp directories themselves.
 
773
    if result.wasSuccessful() or not keep_output:
 
774
        if TestCaseInTempDir.TEST_ROOT is not None:
 
775
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
 
776
    else:
 
777
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
 
778
    return result.wasSuccessful()
 
779
 
 
780
 
 
781
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
782
             keep_output=False,
 
783
             transport=None):
 
784
    """Run the whole test suite under the enhanced runner"""
 
785
    global default_transport
 
786
    if transport is None:
 
787
        transport = default_transport
 
788
    old_transport = default_transport
 
789
    default_transport = transport
 
790
    suite = test_suite()
 
791
    try:
 
792
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
793
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
794
                     transport=transport)
 
795
    finally:
 
796
        default_transport = old_transport
 
797
 
 
798
 
 
799
 
 
800
def test_suite():
 
801
    """Build and return TestSuite for the whole program."""
39
802
    from doctest import DocTestSuite
40
 
    import os
41
 
    import shutil
42
 
    import time
43
 
    import sys
44
 
    import unittest
45
 
 
46
 
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
47
 
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
48
 
        if m not in MODULES_TO_DOCTEST:
49
 
            MODULES_TO_DOCTEST.append(m)
50
 
            
51
 
    for m in (bzrlib.selftest.whitebox,
52
 
              bzrlib.selftest.versioning,
53
 
              bzrlib.selftest.testinv,
54
 
              bzrlib.selftest.testmerge3,
55
 
              bzrlib.selftest.testhashcache,
56
 
              bzrlib.selftest.teststatus,
57
 
              bzrlib.selftest.blackbox,
58
 
              bzrlib.selftest.testhashcache,
59
 
              bzrlib.selftest.testrevisionnamespaces,
60
 
              bzrlib.selftest.testbranch,
61
 
              ):
62
 
        if m not in MODULES_TO_TEST:
63
 
            MODULES_TO_TEST.append(m)
64
 
 
65
 
 
66
 
    TestBase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
67
 
    print '%-30s %s' % ('bzr binary', TestBase.BZRPATH)
68
 
 
 
803
 
 
804
    global MODULES_TO_DOCTEST
 
805
 
 
806
    testmod_names = [ \
 
807
                   'bzrlib.tests.test_ancestry',
 
808
                   'bzrlib.tests.test_annotate',
 
809
                   'bzrlib.tests.test_api',
 
810
                   'bzrlib.tests.test_bad_files',
 
811
                   'bzrlib.tests.test_basis_inventory',
 
812
                   'bzrlib.tests.test_branch',
 
813
                   'bzrlib.tests.test_command',
 
814
                   'bzrlib.tests.test_commit',
 
815
                   'bzrlib.tests.test_commit_merge',
 
816
                   'bzrlib.tests.test_config',
 
817
                   'bzrlib.tests.test_conflicts',
 
818
                   'bzrlib.tests.test_diff',
 
819
                   'bzrlib.tests.test_decorators',
 
820
                   'bzrlib.tests.test_fetch',
 
821
                   'bzrlib.tests.test_fileid_involved',
 
822
                   'bzrlib.tests.test_gpg',
 
823
                   'bzrlib.tests.test_graph',
 
824
                   'bzrlib.tests.test_hashcache',
 
825
                   'bzrlib.tests.test_http',
 
826
                   'bzrlib.tests.test_identitymap',
 
827
                   'bzrlib.tests.test_inv',
 
828
                   'bzrlib.tests.test_lockable_files',
 
829
                   'bzrlib.tests.test_log',
 
830
                   'bzrlib.tests.test_merge',
 
831
                   'bzrlib.tests.test_merge3',
 
832
                   'bzrlib.tests.test_merge_core',
 
833
                   'bzrlib.tests.test_missing',
 
834
                   'bzrlib.tests.test_msgeditor',
 
835
                   'bzrlib.tests.test_nonascii',
 
836
                   'bzrlib.tests.test_options',
 
837
                   'bzrlib.tests.test_osutils',
 
838
                   'bzrlib.tests.test_parent',
 
839
                   'bzrlib.tests.test_permissions',
 
840
                   'bzrlib.tests.test_plugins',
 
841
                   'bzrlib.tests.test_revision',
 
842
                   'bzrlib.tests.test_revisionnamespaces',
 
843
                   'bzrlib.tests.test_revprops',
 
844
                   'bzrlib.tests.test_reweave',
 
845
                   'bzrlib.tests.test_rio',
 
846
                   'bzrlib.tests.test_sampler',
 
847
                   'bzrlib.tests.test_selftest',
 
848
                   'bzrlib.tests.test_setup',
 
849
                   'bzrlib.tests.test_sftp_transport',
 
850
                   'bzrlib.tests.test_smart_add',
 
851
                   'bzrlib.tests.test_source',
 
852
                   'bzrlib.tests.test_store',
 
853
                   'bzrlib.tests.test_symbol_versioning',
 
854
                   'bzrlib.tests.test_testament',
 
855
                   'bzrlib.tests.test_trace',
 
856
                   'bzrlib.tests.test_transactions',
 
857
                   'bzrlib.tests.test_transport',
 
858
                   'bzrlib.tests.test_tsort',
 
859
                   'bzrlib.tests.test_ui',
 
860
                   'bzrlib.tests.test_uncommit',
 
861
                   'bzrlib.tests.test_upgrade',
 
862
                   'bzrlib.tests.test_weave',
 
863
                   'bzrlib.tests.test_whitebox',
 
864
                   'bzrlib.tests.test_workingtree',
 
865
                   'bzrlib.tests.test_xml',
 
866
                   ]
 
867
    test_transport_implementations = [
 
868
        'bzrlib.tests.test_transport_implementations']
 
869
 
 
870
    TestCase.BZRPATH = osutils.pathjoin(
 
871
            osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
 
872
    print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
 
873
    print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
69
874
    print
70
 
 
71
875
    suite = TestSuite()
72
 
 
73
 
    # should also test bzrlib.merge_core, but they seem to be out of date with
74
 
    # the code.
75
 
 
76
 
 
77
 
    # XXX: python2.3's TestLoader() doesn't seem to find all the
78
 
    # tests; don't know why
 
876
    # python2.4's TestLoader.loadTestsFromNames gives very poor 
 
877
    # errors if it fails to load a named module - no indication of what's
 
878
    # actually wrong, just "no such module".  We should probably override that
 
879
    # class, but for the moment just load them ourselves. (mbp 20051202)
 
880
    loader = TestLoader()
 
881
    from bzrlib.transport import TransportTestProviderAdapter
 
882
    adapter = TransportTestProviderAdapter()
 
883
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
884
    for mod_name in testmod_names:
 
885
        mod = _load_module_by_name(mod_name)
 
886
        suite.addTest(loader.loadTestsFromModule(mod))
 
887
    for package in packages_to_test():
 
888
        suite.addTest(package.test_suite())
79
889
    for m in MODULES_TO_TEST:
80
 
         suite.addTest(TestLoader().loadTestsFromModule(m))
81
 
 
 
890
        suite.addTest(loader.loadTestsFromModule(m))
82
891
    for m in (MODULES_TO_DOCTEST):
83
892
        suite.addTest(DocTestSuite(m))
84
 
 
85
 
    for p in bzrlib.plugin.all_plugins:
86
 
        if hasattr(p, 'test_suite'):
87
 
            suite.addTest(p.test_suite())
88
 
 
89
 
    suite.addTest(unittest.makeSuite(bzrlib.merge_core.MergeTest, 'test_'))
90
 
 
91
 
    return run_suite(suite, 'testbzr')
92
 
 
93
 
 
94
 
 
 
893
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
894
        if getattr(plugin, 'test_suite', None) is not None:
 
895
            suite.addTest(plugin.test_suite())
 
896
    return suite
 
897
 
 
898
 
 
899
def adapt_modules(mods_list, adapter, loader, suite):
 
900
    """Adapt the modules in mods_list using adapter and add to suite."""
 
901
    for mod_name in mods_list:
 
902
        mod = _load_module_by_name(mod_name)
 
903
        for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
 
904
            suite.addTests(adapter.adapt(test))
 
905
 
 
906
 
 
907
def _load_module_by_name(mod_name):
 
908
    parts = mod_name.split('.')
 
909
    module = __import__(mod_name)
 
910
    del parts[0]
 
911
    # for historical reasons python returns the top-level module even though
 
912
    # it loads the submodule; we need to walk down to get the one we want.
 
913
    while parts:
 
914
        module = getattr(module, parts.pop(0))
 
915
    return module