~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: mbp at sourcefrog
  • Date: 2005-04-04 13:57:54 UTC
  • Revision ID: mbp@sourcefrog.net-20050404135754-ae2e4e5fb0094c91
- Write .bzr.log in utf8

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
2
 
 
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
 
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
 
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
# TODO: Perhaps there should be an API to find out if bzr running under the
19
 
# test suite -- some plugins might want to avoid making intrusive changes if
20
 
# this is the case.  However, we want behaviour under to test to diverge as
21
 
# little as possible, so this should be used rarely if it's added at all.
22
 
# (Suggestion from j-a-meinel, 2005-11-24)
23
 
 
24
 
# NOTE: Some classes in here use camelCaseNaming() rather than
25
 
# underscore_naming().  That's for consistency with unittest; it's not the
26
 
# general style of bzrlib.  Please continue that consistency when adding e.g.
27
 
# new assertFoo() methods.
28
 
 
29
 
import codecs
30
 
from cStringIO import StringIO
31
 
import difflib
32
 
import errno
33
 
import logging
34
 
import os
35
 
import re
36
 
import shutil
37
 
import stat
38
 
import sys
39
 
import tempfile
40
 
import unittest
41
 
import time
42
 
 
43
 
 
44
 
import bzrlib.branch
45
 
import bzrlib.bzrdir as bzrdir
46
 
import bzrlib.commands
47
 
import bzrlib.errors as errors
48
 
import bzrlib.inventory
49
 
import bzrlib.iterablefile
50
 
import bzrlib.lockdir
51
 
import bzrlib.merge3
52
 
import bzrlib.osutils
53
 
import bzrlib.osutils as osutils
54
 
import bzrlib.plugin
55
 
import bzrlib.store
56
 
import bzrlib.trace
57
 
from bzrlib.transport import urlescape, get_transport
58
 
import bzrlib.transport
59
 
from bzrlib.transport.local import LocalRelpathServer
60
 
from bzrlib.transport.readonly import ReadonlyServer
61
 
from bzrlib.trace import mutter
62
 
from bzrlib.tests.TestUtil import TestLoader, TestSuite
63
 
from bzrlib.tests.treeshape import build_tree_contents
64
 
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
65
 
 
66
 
default_transport = LocalRelpathServer
67
 
 
68
 
MODULES_TO_TEST = []
69
 
MODULES_TO_DOCTEST = [
70
 
                      bzrlib.branch,
71
 
                      bzrlib.commands,
72
 
                      bzrlib.errors,
73
 
                      bzrlib.inventory,
74
 
                      bzrlib.iterablefile,
75
 
                      bzrlib.lockdir,
76
 
                      bzrlib.merge3,
77
 
                      bzrlib.option,
78
 
                      bzrlib.osutils,
79
 
                      bzrlib.store
80
 
                      ]
81
 
def packages_to_test():
82
 
    """Return a list of packages to test.
83
 
 
84
 
    The packages are not globally imported so that import failures are
85
 
    triggered when running selftest, not when importing the command.
86
 
    """
87
 
    import bzrlib.doc
88
 
    import bzrlib.tests.blackbox
89
 
    import bzrlib.tests.branch_implementations
90
 
    import bzrlib.tests.bzrdir_implementations
91
 
    import bzrlib.tests.interrepository_implementations
92
 
    import bzrlib.tests.repository_implementations
93
 
    import bzrlib.tests.workingtree_implementations
94
 
    return [
95
 
            bzrlib.doc,
96
 
            bzrlib.tests.blackbox,
97
 
            bzrlib.tests.branch_implementations,
98
 
            bzrlib.tests.bzrdir_implementations,
99
 
            bzrlib.tests.interrepository_implementations,
100
 
            bzrlib.tests.repository_implementations,
101
 
            bzrlib.tests.workingtree_implementations,
102
 
            ]
103
 
 
104
 
 
105
 
class _MyResult(unittest._TextTestResult):
106
 
    """Custom TestResult.
107
 
 
108
 
    Shows output in a different format, including displaying runtime for tests.
109
 
    """
110
 
    stop_early = False
111
 
 
112
 
    def _elapsedTime(self):
113
 
        return "%5dms" % (1000 * (time.time() - self._start_time))
114
 
 
115
 
    def startTest(self, test):
116
 
        unittest.TestResult.startTest(self, test)
117
 
        # In a short description, the important words are in
118
 
        # the beginning, but in an id, the important words are
119
 
        # at the end
120
 
        SHOW_DESCRIPTIONS = False
121
 
        if self.showAll:
122
 
            width = osutils.terminal_width()
123
 
            name_width = width - 15
124
 
            what = None
125
 
            if SHOW_DESCRIPTIONS:
126
 
                what = test.shortDescription()
127
 
                if what:
128
 
                    if len(what) > name_width:
129
 
                        what = what[:name_width-3] + '...'
130
 
            if what is None:
131
 
                what = test.id()
132
 
                if what.startswith('bzrlib.tests.'):
133
 
                    what = what[13:]
134
 
                if len(what) > name_width:
135
 
                    what = '...' + what[3-name_width:]
136
 
            what = what.ljust(name_width)
137
 
            self.stream.write(what)
138
 
        self.stream.flush()
139
 
        self._start_time = time.time()
140
 
 
141
 
    def addError(self, test, err):
142
 
        if isinstance(err[1], TestSkipped):
143
 
            return self.addSkipped(test, err)    
144
 
        unittest.TestResult.addError(self, test, err)
145
 
        if self.showAll:
146
 
            self.stream.writeln("ERROR %s" % self._elapsedTime())
147
 
        elif self.dots:
148
 
            self.stream.write('E')
149
 
        self.stream.flush()
150
 
        if self.stop_early:
151
 
            self.stop()
152
 
 
153
 
    def addFailure(self, test, err):
154
 
        unittest.TestResult.addFailure(self, test, err)
155
 
        if self.showAll:
156
 
            self.stream.writeln(" FAIL %s" % self._elapsedTime())
157
 
        elif self.dots:
158
 
            self.stream.write('F')
159
 
        self.stream.flush()
160
 
        if self.stop_early:
161
 
            self.stop()
162
 
 
163
 
    def addSuccess(self, test):
164
 
        if self.showAll:
165
 
            self.stream.writeln('   OK %s' % self._elapsedTime())
166
 
        elif self.dots:
167
 
            self.stream.write('~')
168
 
        self.stream.flush()
169
 
        unittest.TestResult.addSuccess(self, test)
170
 
 
171
 
    def addSkipped(self, test, skip_excinfo):
172
 
        if self.showAll:
173
 
            print >>self.stream, ' SKIP %s' % self._elapsedTime()
174
 
            print >>self.stream, '     %s' % skip_excinfo[1]
175
 
        elif self.dots:
176
 
            self.stream.write('S')
177
 
        self.stream.flush()
178
 
        # seems best to treat this as success from point-of-view of unittest
179
 
        # -- it actually does nothing so it barely matters :)
180
 
        unittest.TestResult.addSuccess(self, test)
181
 
 
182
 
    def printErrorList(self, flavour, errors):
183
 
        for test, err in errors:
184
 
            self.stream.writeln(self.separator1)
185
 
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
186
 
            if getattr(test, '_get_log', None) is not None:
187
 
                print >>self.stream
188
 
                print >>self.stream, \
189
 
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
190
 
                print >>self.stream, test._get_log()
191
 
                print >>self.stream, \
192
 
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
193
 
            self.stream.writeln(self.separator2)
194
 
            self.stream.writeln("%s" % err)
195
 
 
196
 
 
197
 
class TextTestRunner(unittest.TextTestRunner):
198
 
    stop_on_failure = False
199
 
 
200
 
    def _makeResult(self):
201
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
202
 
        result.stop_early = self.stop_on_failure
203
 
        return result
204
 
 
205
 
 
206
 
def iter_suite_tests(suite):
207
 
    """Return all tests in a suite, recursing through nested suites"""
208
 
    for item in suite._tests:
209
 
        if isinstance(item, unittest.TestCase):
210
 
            yield item
211
 
        elif isinstance(item, unittest.TestSuite):
212
 
            for r in iter_suite_tests(item):
213
 
                yield r
214
 
        else:
215
 
            raise Exception('unknown object %r inside test suite %r'
216
 
                            % (item, suite))
217
 
 
218
 
 
219
 
class TestSkipped(Exception):
220
 
    """Indicates that a test was intentionally skipped, rather than failing."""
221
 
    # XXX: Not used yet
222
 
 
223
 
 
224
 
class CommandFailed(Exception):
225
 
    pass
226
 
 
227
 
class TestCase(unittest.TestCase):
228
 
    """Base class for bzr unit tests.
229
 
    
230
 
    Tests that need access to disk resources should subclass 
231
 
    TestCaseInTempDir not TestCase.
232
 
 
233
 
    Error and debug log messages are redirected from their usual
234
 
    location into a temporary file, the contents of which can be
235
 
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
236
 
    so that it can also capture file IO.  When the test completes this file
237
 
    is read into memory and removed from disk.
238
 
       
239
 
    There are also convenience functions to invoke bzr's command-line
240
 
    routine, and to build and check bzr trees.
241
 
   
242
 
    In addition to the usual method of overriding tearDown(), this class also
243
 
    allows subclasses to register functions into the _cleanups list, which is
244
 
    run in order as the object is torn down.  It's less likely this will be
245
 
    accidentally overlooked.
246
 
    """
247
 
 
248
 
    BZRPATH = 'bzr'
249
 
    _log_file_name = None
250
 
    _log_contents = ''
251
 
 
252
 
    def __init__(self, methodName='testMethod'):
253
 
        super(TestCase, self).__init__(methodName)
254
 
        self._cleanups = []
255
 
 
256
 
    def setUp(self):
257
 
        unittest.TestCase.setUp(self)
258
 
        self._cleanEnvironment()
259
 
        bzrlib.trace.disable_default_logging()
260
 
        self._startLogFile()
261
 
 
262
 
    def _ndiff_strings(self, a, b):
263
 
        """Return ndiff between two strings containing lines.
264
 
        
265
 
        A trailing newline is added if missing to make the strings
266
 
        print properly."""
267
 
        if b and b[-1] != '\n':
268
 
            b += '\n'
269
 
        if a and a[-1] != '\n':
270
 
            a += '\n'
271
 
        difflines = difflib.ndiff(a.splitlines(True),
272
 
                                  b.splitlines(True),
273
 
                                  linejunk=lambda x: False,
274
 
                                  charjunk=lambda x: False)
275
 
        return ''.join(difflines)
276
 
 
277
 
    def assertEqualDiff(self, a, b, message=None):
278
 
        """Assert two texts are equal, if not raise an exception.
279
 
        
280
 
        This is intended for use with multi-line strings where it can 
281
 
        be hard to find the differences by eye.
282
 
        """
283
 
        # TODO: perhaps override assertEquals to call this for strings?
284
 
        if a == b:
285
 
            return
286
 
        if message is None:
287
 
            message = "texts not equal:\n"
288
 
        raise AssertionError(message + 
289
 
                             self._ndiff_strings(a, b))      
290
 
        
291
 
    def assertEqualMode(self, mode, mode_test):
292
 
        self.assertEqual(mode, mode_test,
293
 
                         'mode mismatch %o != %o' % (mode, mode_test))
294
 
 
295
 
    def assertStartsWith(self, s, prefix):
296
 
        if not s.startswith(prefix):
297
 
            raise AssertionError('string %r does not start with %r' % (s, prefix))
298
 
 
299
 
    def assertEndsWith(self, s, suffix):
300
 
        if not s.endswith(prefix):
301
 
            raise AssertionError('string %r does not end with %r' % (s, suffix))
302
 
 
303
 
    def assertContainsRe(self, haystack, needle_re):
304
 
        """Assert that a contains something matching a regular expression."""
305
 
        if not re.search(needle_re, haystack):
306
 
            raise AssertionError('pattern "%s" not found in "%s"'
307
 
                    % (needle_re, haystack))
308
 
 
309
 
    def assertSubset(self, sublist, superlist):
310
 
        """Assert that every entry in sublist is present in superlist."""
311
 
        missing = []
312
 
        for entry in sublist:
313
 
            if entry not in superlist:
314
 
                missing.append(entry)
315
 
        if len(missing) > 0:
316
 
            raise AssertionError("value(s) %r not present in container %r" % 
317
 
                                 (missing, superlist))
318
 
 
319
 
    def assertIs(self, left, right):
320
 
        if not (left is right):
321
 
            raise AssertionError("%r is not %r." % (left, right))
322
 
 
323
 
    def assertTransportMode(self, transport, path, mode):
324
 
        """Fail if a path does not have mode mode.
325
 
        
326
 
        If modes are not supported on this platform, the test is skipped.
327
 
        """
328
 
        if sys.platform == 'win32':
329
 
            return
330
 
        path_stat = transport.stat(path)
331
 
        actual_mode = stat.S_IMODE(path_stat.st_mode)
332
 
        self.assertEqual(mode, actual_mode,
333
 
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
334
 
 
335
 
    def _startLogFile(self):
336
 
        """Send bzr and test log messages to a temporary file.
337
 
 
338
 
        The file is removed as the test is torn down.
339
 
        """
340
 
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
341
 
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
342
 
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
343
 
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
344
 
        self._log_file_name = name
345
 
        self.addCleanup(self._finishLogFile)
346
 
 
347
 
    def _finishLogFile(self):
348
 
        """Finished with the log file.
349
 
 
350
 
        Read contents into memory, close, and delete.
351
 
        """
352
 
        bzrlib.trace.disable_test_log(self._log_nonce)
353
 
        self._log_file.seek(0)
354
 
        self._log_contents = self._log_file.read()
355
 
        self._log_file.close()
356
 
        os.remove(self._log_file_name)
357
 
        self._log_file = self._log_file_name = None
358
 
 
359
 
    def addCleanup(self, callable):
360
 
        """Arrange to run a callable when this case is torn down.
361
 
 
362
 
        Callables are run in the reverse of the order they are registered, 
363
 
        ie last-in first-out.
364
 
        """
365
 
        if callable in self._cleanups:
366
 
            raise ValueError("cleanup function %r already registered on %s" 
367
 
                    % (callable, self))
368
 
        self._cleanups.append(callable)
369
 
 
370
 
    def _cleanEnvironment(self):
371
 
        new_env = {
372
 
            'HOME': os.getcwd(),
373
 
            'APPDATA': os.getcwd(),
374
 
            'BZREMAIL': None,
375
 
            'EMAIL': None,
376
 
        }
377
 
        self.__old_env = {}
378
 
        self.addCleanup(self._restoreEnvironment)
379
 
        for name, value in new_env.iteritems():
380
 
            self._captureVar(name, value)
381
 
 
382
 
 
383
 
    def _captureVar(self, name, newvalue):
384
 
        """Set an environment variable, preparing it to be reset when finished."""
385
 
        self.__old_env[name] = os.environ.get(name, None)
386
 
        if newvalue is None:
387
 
            if name in os.environ:
388
 
                del os.environ[name]
389
 
        else:
390
 
            os.environ[name] = newvalue
391
 
 
392
 
    @staticmethod
393
 
    def _restoreVar(name, value):
394
 
        if value is None:
395
 
            if name in os.environ:
396
 
                del os.environ[name]
397
 
        else:
398
 
            os.environ[name] = value
399
 
 
400
 
    def _restoreEnvironment(self):
401
 
        for name, value in self.__old_env.iteritems():
402
 
            self._restoreVar(name, value)
403
 
 
404
 
    def tearDown(self):
405
 
        self._runCleanups()
406
 
        unittest.TestCase.tearDown(self)
407
 
 
408
 
    def _runCleanups(self):
409
 
        """Run registered cleanup functions. 
410
 
 
411
 
        This should only be called from TestCase.tearDown.
412
 
        """
413
 
        # TODO: Perhaps this should keep running cleanups even if 
414
 
        # one of them fails?
415
 
        for cleanup_fn in reversed(self._cleanups):
416
 
            cleanup_fn()
417
 
 
418
 
    def log(self, *args):
419
 
        mutter(*args)
420
 
 
421
 
    def _get_log(self):
422
 
        """Return as a string the log for this test"""
423
 
        if self._log_file_name:
424
 
            return open(self._log_file_name).read()
425
 
        else:
426
 
            return self._log_contents
427
 
        # TODO: Delete the log after it's been read in
428
 
 
429
 
    def capture(self, cmd, retcode=0):
430
 
        """Shortcut that splits cmd into words, runs, and returns stdout"""
431
 
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
432
 
 
433
 
    def run_bzr_captured(self, argv, retcode=0):
434
 
        """Invoke bzr and return (stdout, stderr).
435
 
 
436
 
        Useful for code that wants to check the contents of the
437
 
        output, the way error messages are presented, etc.
438
 
 
439
 
        This should be the main method for tests that want to exercise the
440
 
        overall behavior of the bzr application (rather than a unit test
441
 
        or a functional test of the library.)
442
 
 
443
 
        Much of the old code runs bzr by forking a new copy of Python, but
444
 
        that is slower, harder to debug, and generally not necessary.
445
 
 
446
 
        This runs bzr through the interface that catches and reports
447
 
        errors, and with logging set to something approximating the
448
 
        default, so that error reporting can be checked.
449
 
 
450
 
        argv -- arguments to invoke bzr
451
 
        retcode -- expected return code, or None for don't-care.
452
 
        """
453
 
        stdout = StringIO()
454
 
        stderr = StringIO()
455
 
        self.log('run bzr: %s', ' '.join(argv))
456
 
        # FIXME: don't call into logging here
457
 
        handler = logging.StreamHandler(stderr)
458
 
        handler.setFormatter(bzrlib.trace.QuietFormatter())
459
 
        handler.setLevel(logging.INFO)
460
 
        logger = logging.getLogger('')
461
 
        logger.addHandler(handler)
462
 
        try:
463
 
            result = self.apply_redirected(None, stdout, stderr,
464
 
                                           bzrlib.commands.run_bzr_catch_errors,
465
 
                                           argv)
466
 
        finally:
467
 
            logger.removeHandler(handler)
468
 
        out = stdout.getvalue()
469
 
        err = stderr.getvalue()
470
 
        if out:
471
 
            self.log('output:\n%s', out)
472
 
        if err:
473
 
            self.log('errors:\n%s', err)
474
 
        if retcode is not None:
475
 
            self.assertEquals(result, retcode)
476
 
        return out, err
477
 
 
478
 
    def run_bzr(self, *args, **kwargs):
479
 
        """Invoke bzr, as if it were run from the command line.
480
 
 
481
 
        This should be the main method for tests that want to exercise the
482
 
        overall behavior of the bzr application (rather than a unit test
483
 
        or a functional test of the library.)
484
 
 
485
 
        This sends the stdout/stderr results into the test's log,
486
 
        where it may be useful for debugging.  See also run_captured.
487
 
        """
488
 
        retcode = kwargs.pop('retcode', 0)
489
 
        return self.run_bzr_captured(args, retcode)
490
 
 
491
 
    def check_inventory_shape(self, inv, shape):
492
 
        """Compare an inventory to a list of expected names.
493
 
 
494
 
        Fail if they are not precisely equal.
495
 
        """
496
 
        extras = []
497
 
        shape = list(shape)             # copy
498
 
        for path, ie in inv.entries():
499
 
            name = path.replace('\\', '/')
500
 
            if ie.kind == 'dir':
501
 
                name = name + '/'
502
 
            if name in shape:
503
 
                shape.remove(name)
504
 
            else:
505
 
                extras.append(name)
506
 
        if shape:
507
 
            self.fail("expected paths not found in inventory: %r" % shape)
508
 
        if extras:
509
 
            self.fail("unexpected paths found in inventory: %r" % extras)
510
 
 
511
 
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
512
 
                         a_callable=None, *args, **kwargs):
513
 
        """Call callable with redirected std io pipes.
514
 
 
515
 
        Returns the return code."""
516
 
        if not callable(a_callable):
517
 
            raise ValueError("a_callable must be callable.")
518
 
        if stdin is None:
519
 
            stdin = StringIO("")
520
 
        if stdout is None:
521
 
            if getattr(self, "_log_file", None) is not None:
522
 
                stdout = self._log_file
523
 
            else:
524
 
                stdout = StringIO()
525
 
        if stderr is None:
526
 
            if getattr(self, "_log_file", None is not None):
527
 
                stderr = self._log_file
528
 
            else:
529
 
                stderr = StringIO()
530
 
        real_stdin = sys.stdin
531
 
        real_stdout = sys.stdout
532
 
        real_stderr = sys.stderr
533
 
        try:
534
 
            sys.stdout = stdout
535
 
            sys.stderr = stderr
536
 
            sys.stdin = stdin
537
 
            return a_callable(*args, **kwargs)
538
 
        finally:
539
 
            sys.stdout = real_stdout
540
 
            sys.stderr = real_stderr
541
 
            sys.stdin = real_stdin
542
 
 
543
 
 
544
 
BzrTestBase = TestCase
545
 
 
546
 
     
547
 
class TestCaseInTempDir(TestCase):
548
 
    """Derived class that runs a test within a temporary directory.
549
 
 
550
 
    This is useful for tests that need to create a branch, etc.
551
 
 
552
 
    The directory is created in a slightly complex way: for each
553
 
    Python invocation, a new temporary top-level directory is created.
554
 
    All test cases create their own directory within that.  If the
555
 
    tests complete successfully, the directory is removed.
556
 
 
557
 
    InTempDir is an old alias for FunctionalTestCase.
558
 
    """
559
 
 
560
 
    TEST_ROOT = None
561
 
    _TEST_NAME = 'test'
562
 
    OVERRIDE_PYTHON = 'python'
563
 
 
564
 
    def check_file_contents(self, filename, expect):
565
 
        self.log("check contents of file %s" % filename)
566
 
        contents = file(filename, 'r').read()
567
 
        if contents != expect:
568
 
            self.log("expected: %r" % expect)
569
 
            self.log("actually: %r" % contents)
570
 
            self.fail("contents of %s not as expected" % filename)
571
 
 
572
 
    def _make_test_root(self):
573
 
        if TestCaseInTempDir.TEST_ROOT is not None:
574
 
            return
575
 
        i = 0
576
 
        while True:
577
 
            root = u'test%04d.tmp' % i
578
 
            try:
579
 
                os.mkdir(root)
580
 
            except OSError, e:
581
 
                if e.errno == errno.EEXIST:
582
 
                    i += 1
583
 
                    continue
584
 
                else:
585
 
                    raise
586
 
            # successfully created
587
 
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
588
 
            break
589
 
        # make a fake bzr directory there to prevent any tests propagating
590
 
        # up onto the source directory's real branch
591
 
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
592
 
 
593
 
    def setUp(self):
594
 
        super(TestCaseInTempDir, self).setUp()
595
 
        self._make_test_root()
596
 
        _currentdir = os.getcwdu()
597
 
        short_id = self.id().replace('bzrlib.tests.', '') \
598
 
                   .replace('__main__.', '')
599
 
        self.test_dir = osutils.pathjoin(self.TEST_ROOT, short_id)
600
 
        os.mkdir(self.test_dir)
601
 
        os.chdir(self.test_dir)
602
 
        os.environ['HOME'] = self.test_dir
603
 
        os.environ['APPDATA'] = self.test_dir
604
 
        def _leaveDirectory():
605
 
            os.chdir(_currentdir)
606
 
        self.addCleanup(_leaveDirectory)
607
 
        
608
 
    def build_tree(self, shape, line_endings='native', transport=None):
609
 
        """Build a test tree according to a pattern.
610
 
 
611
 
        shape is a sequence of file specifications.  If the final
612
 
        character is '/', a directory is created.
613
 
 
614
 
        This doesn't add anything to a branch.
615
 
        :param line_endings: Either 'binary' or 'native'
616
 
                             in binary mode, exact contents are written
617
 
                             in native mode, the line endings match the
618
 
                             default platform endings.
619
 
 
620
 
        :param transport: A transport to write to, for building trees on 
621
 
                          VFS's. If the transport is readonly or None,
622
 
                          "." is opened automatically.
623
 
        """
624
 
        # XXX: It's OK to just create them using forward slashes on windows?
625
 
        if transport is None or transport.is_readonly():
626
 
            transport = get_transport(".")
627
 
        for name in shape:
628
 
            self.assert_(isinstance(name, basestring))
629
 
            if name[-1] == '/':
630
 
                transport.mkdir(urlescape(name[:-1]))
631
 
            else:
632
 
                if line_endings == 'binary':
633
 
                    end = '\n'
634
 
                elif line_endings == 'native':
635
 
                    end = os.linesep
636
 
                else:
637
 
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
638
 
                content = "contents of %s%s" % (name, end)
639
 
                transport.put(urlescape(name), StringIO(content))
640
 
 
641
 
    def build_tree_contents(self, shape):
642
 
        build_tree_contents(shape)
643
 
 
644
 
    def failUnlessExists(self, path):
645
 
        """Fail unless path, which may be abs or relative, exists."""
646
 
        self.failUnless(osutils.lexists(path))
647
 
 
648
 
    def failIfExists(self, path):
649
 
        """Fail if path, which may be abs or relative, exists."""
650
 
        self.failIf(osutils.lexists(path))
651
 
        
652
 
    def assertFileEqual(self, content, path):
653
 
        """Fail if path does not contain 'content'."""
654
 
        self.failUnless(osutils.lexists(path))
655
 
        self.assertEqualDiff(content, open(path, 'r').read())
656
 
 
657
 
 
658
 
class TestCaseWithTransport(TestCaseInTempDir):
659
 
    """A test case that provides get_url and get_readonly_url facilities.
660
 
 
661
 
    These back onto two transport servers, one for readonly access and one for
662
 
    read write access.
663
 
 
664
 
    If no explicit class is provided for readonly access, a
665
 
    ReadonlyTransportDecorator is used instead which allows the use of non disk
666
 
    based read write transports.
667
 
 
668
 
    If an explicit class is provided for readonly access, that server and the 
669
 
    readwrite one must both define get_url() as resolving to os.getcwd().
670
 
    """
671
 
 
672
 
    def __init__(self, methodName='testMethod'):
673
 
        super(TestCaseWithTransport, self).__init__(methodName)
674
 
        self.__readonly_server = None
675
 
        self.__server = None
676
 
        self.transport_server = default_transport
677
 
        self.transport_readonly_server = None
678
 
 
679
 
    def get_readonly_url(self, relpath=None):
680
 
        """Get a URL for the readonly transport.
681
 
 
682
 
        This will either be backed by '.' or a decorator to the transport 
683
 
        used by self.get_url()
684
 
        relpath provides for clients to get a path relative to the base url.
685
 
        These should only be downwards relative, not upwards.
686
 
        """
687
 
        base = self.get_readonly_server().get_url()
688
 
        if relpath is not None:
689
 
            if not base.endswith('/'):
690
 
                base = base + '/'
691
 
            base = base + relpath
692
 
        return base
693
 
 
694
 
    def get_readonly_server(self):
695
 
        """Get the server instance for the readonly transport
696
 
 
697
 
        This is useful for some tests with specific servers to do diagnostics.
698
 
        """
699
 
        if self.__readonly_server is None:
700
 
            if self.transport_readonly_server is None:
701
 
                # readonly decorator requested
702
 
                # bring up the server
703
 
                self.get_url()
704
 
                self.__readonly_server = ReadonlyServer()
705
 
                self.__readonly_server.setUp(self.__server)
706
 
            else:
707
 
                self.__readonly_server = self.transport_readonly_server()
708
 
                self.__readonly_server.setUp()
709
 
            self.addCleanup(self.__readonly_server.tearDown)
710
 
        return self.__readonly_server
711
 
 
712
 
    def get_server(self):
713
 
        """Get the read/write server instance.
714
 
 
715
 
        This is useful for some tests with specific servers that need
716
 
        diagnostics.
717
 
        """
718
 
        if self.__server is None:
719
 
            self.__server = self.transport_server()
720
 
            self.__server.setUp()
721
 
            self.addCleanup(self.__server.tearDown)
722
 
        return self.__server
723
 
 
724
 
    def get_url(self, relpath=None):
725
 
        """Get a URL for the readwrite transport.
726
 
 
727
 
        This will either be backed by '.' or to an equivalent non-file based
728
 
        facility.
729
 
        relpath provides for clients to get a path relative to the base url.
730
 
        These should only be downwards relative, not upwards.
731
 
        """
732
 
        base = self.get_server().get_url()
733
 
        if relpath is not None and relpath != '.':
734
 
            if not base.endswith('/'):
735
 
                base = base + '/'
736
 
            base = base + relpath
737
 
        return base
738
 
 
739
 
    def get_transport(self):
740
 
        """Return a writeable transport for the test scratch space"""
741
 
        t = get_transport(self.get_url())
742
 
        self.assertFalse(t.is_readonly())
743
 
        return t
744
 
 
745
 
    def get_readonly_transport(self):
746
 
        """Return a readonly transport for the test scratch space
747
 
        
748
 
        This can be used to test that operations which should only need
749
 
        readonly access in fact do not try to write.
750
 
        """
751
 
        t = get_transport(self.get_readonly_url())
752
 
        self.assertTrue(t.is_readonly())
753
 
        return t
754
 
 
755
 
    def make_branch(self, relpath):
756
 
        """Create a branch on the transport at relpath."""
757
 
        repo = self.make_repository(relpath)
758
 
        return repo.bzrdir.create_branch()
759
 
 
760
 
    def make_bzrdir(self, relpath):
761
 
        try:
762
 
            url = self.get_url(relpath)
763
 
            segments = relpath.split('/')
764
 
            if segments and segments[-1] not in ('', '.'):
765
 
                parent = self.get_url('/'.join(segments[:-1]))
766
 
                t = get_transport(parent)
767
 
                try:
768
 
                    t.mkdir(segments[-1])
769
 
                except errors.FileExists:
770
 
                    pass
771
 
            return bzrlib.bzrdir.BzrDir.create(url)
772
 
        except errors.UninitializableFormat:
773
 
            raise TestSkipped("Format %s is not initializable.")
774
 
 
775
 
    def make_repository(self, relpath, shared=False):
776
 
        """Create a repository on our default transport at relpath."""
777
 
        made_control = self.make_bzrdir(relpath)
778
 
        return made_control.create_repository(shared=shared)
779
 
 
780
 
    def make_branch_and_tree(self, relpath):
781
 
        """Create a branch on the transport and a tree locally.
782
 
 
783
 
        Returns the tree.
784
 
        """
785
 
        # TODO: always use the local disk path for the working tree,
786
 
        # this obviously requires a format that supports branch references
787
 
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
788
 
        # RBC 20060208
789
 
        b = self.make_branch(relpath)
790
 
        try:
791
 
            return b.bzrdir.create_workingtree()
792
 
        except errors.NotLocalUrl:
793
 
            # new formats - catch No tree error and create
794
 
            # a branch reference and a checkout.
795
 
            # old formats at that point - raise TestSkipped.
796
 
            # TODO: rbc 20060208
797
 
            return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
798
 
 
799
 
    def assertIsDirectory(self, relpath, transport):
800
 
        """Assert that relpath within transport is a directory.
801
 
 
802
 
        This may not be possible on all transports; in that case it propagates
803
 
        a TransportNotPossible.
804
 
        """
805
 
        try:
806
 
            mode = transport.stat(relpath).st_mode
807
 
        except errors.NoSuchFile:
808
 
            self.fail("path %s is not a directory; no such file"
809
 
                      % (relpath))
810
 
        if not stat.S_ISDIR(mode):
811
 
            self.fail("path %s is not a directory; has mode %#o"
812
 
                      % (relpath, mode))
813
 
 
814
 
 
815
 
class ChrootedTestCase(TestCaseWithTransport):
816
 
    """A support class that provides readonly urls outside the local namespace.
817
 
 
818
 
    This is done by checking if self.transport_server is a MemoryServer. if it
819
 
    is then we are chrooted already, if it is not then an HttpServer is used
820
 
    for readonly urls.
821
 
 
822
 
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
823
 
                       be used without needed to redo it when a different 
824
 
                       subclass is in use ?
825
 
    """
826
 
 
827
 
    def setUp(self):
828
 
        super(ChrootedTestCase, self).setUp()
829
 
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
830
 
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
831
 
 
832
 
 
833
 
def filter_suite_by_re(suite, pattern):
834
 
    result = TestSuite()
835
 
    filter_re = re.compile(pattern)
836
 
    for test in iter_suite_tests(suite):
837
 
        if filter_re.search(test.id()):
838
 
            result.addTest(test)
839
 
    return result
840
 
 
841
 
 
842
 
def run_suite(suite, name='test', verbose=False, pattern=".*",
843
 
              stop_on_failure=False, keep_output=False,
844
 
              transport=None):
845
 
    TestCaseInTempDir._TEST_NAME = name
846
 
    if verbose:
847
 
        verbosity = 2
848
 
    else:
849
 
        verbosity = 1
850
 
    runner = TextTestRunner(stream=sys.stdout,
851
 
                            descriptions=0,
852
 
                            verbosity=verbosity)
853
 
    runner.stop_on_failure=stop_on_failure
854
 
    if pattern != '.*':
855
 
        suite = filter_suite_by_re(suite, pattern)
856
 
    result = runner.run(suite)
857
 
    # This is still a little bogus, 
858
 
    # but only a little. Folk not using our testrunner will
859
 
    # have to delete their temp directories themselves.
860
 
    if result.wasSuccessful() or not keep_output:
861
 
        if TestCaseInTempDir.TEST_ROOT is not None:
862
 
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
863
 
    else:
864
 
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
865
 
    return result.wasSuccessful()
866
 
 
867
 
 
868
 
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
869
 
             keep_output=False,
870
 
             transport=None):
871
 
    """Run the whole test suite under the enhanced runner"""
872
 
    global default_transport
873
 
    if transport is None:
874
 
        transport = default_transport
875
 
    old_transport = default_transport
876
 
    default_transport = transport
877
 
    suite = test_suite()
878
 
    try:
879
 
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
880
 
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
881
 
                     transport=transport)
882
 
    finally:
883
 
        default_transport = old_transport
884
 
 
885
 
 
886
 
 
887
 
def test_suite():
888
 
    """Build and return TestSuite for the whole program."""
889
 
    from doctest import DocTestSuite
890
 
 
891
 
    global MODULES_TO_DOCTEST
892
 
 
893
 
    testmod_names = [ \
894
 
                   'bzrlib.tests.test_ancestry',
895
 
                   'bzrlib.tests.test_annotate',
896
 
                   'bzrlib.tests.test_api',
897
 
                   'bzrlib.tests.test_bad_files',
898
 
                   'bzrlib.tests.test_basis_inventory',
899
 
                   'bzrlib.tests.test_branch',
900
 
                   'bzrlib.tests.test_bzrdir',
901
 
                   'bzrlib.tests.test_command',
902
 
                   'bzrlib.tests.test_commit',
903
 
                   'bzrlib.tests.test_commit_merge',
904
 
                   'bzrlib.tests.test_config',
905
 
                   'bzrlib.tests.test_conflicts',
906
 
                   'bzrlib.tests.test_decorators',
907
 
                   'bzrlib.tests.test_diff',
908
 
                   'bzrlib.tests.test_doc_generate',
909
 
                   'bzrlib.tests.test_errors',
910
 
                   'bzrlib.tests.test_fetch',
911
 
                   'bzrlib.tests.test_gpg',
912
 
                   'bzrlib.tests.test_graph',
913
 
                   'bzrlib.tests.test_hashcache',
914
 
                   'bzrlib.tests.test_http',
915
 
                   'bzrlib.tests.test_identitymap',
916
 
                   'bzrlib.tests.test_inv',
917
 
                   'bzrlib.tests.test_lockdir',
918
 
                   'bzrlib.tests.test_lockable_files',
919
 
                   'bzrlib.tests.test_log',
920
 
                   'bzrlib.tests.test_merge',
921
 
                   'bzrlib.tests.test_merge3',
922
 
                   'bzrlib.tests.test_merge_core',
923
 
                   'bzrlib.tests.test_missing',
924
 
                   'bzrlib.tests.test_msgeditor',
925
 
                   'bzrlib.tests.test_nonascii',
926
 
                   'bzrlib.tests.test_options',
927
 
                   'bzrlib.tests.test_osutils',
928
 
                   'bzrlib.tests.test_permissions',
929
 
                   'bzrlib.tests.test_plugins',
930
 
                   'bzrlib.tests.test_reconcile',
931
 
                   'bzrlib.tests.test_repository',
932
 
                   'bzrlib.tests.test_revision',
933
 
                   'bzrlib.tests.test_revisionnamespaces',
934
 
                   'bzrlib.tests.test_revprops',
935
 
                   'bzrlib.tests.test_reweave',
936
 
                   'bzrlib.tests.test_rio',
937
 
                   'bzrlib.tests.test_sampler',
938
 
                   'bzrlib.tests.test_selftest',
939
 
                   'bzrlib.tests.test_setup',
940
 
                   'bzrlib.tests.test_sftp_transport',
941
 
                   'bzrlib.tests.test_smart_add',
942
 
                   'bzrlib.tests.test_source',
943
 
                   'bzrlib.tests.test_store',
944
 
                   'bzrlib.tests.test_symbol_versioning',
945
 
                   'bzrlib.tests.test_testament',
946
 
                   'bzrlib.tests.test_trace',
947
 
                   'bzrlib.tests.test_transactions',
948
 
                   'bzrlib.tests.test_transform',
949
 
                   'bzrlib.tests.test_transport',
950
 
                   'bzrlib.tests.test_tsort',
951
 
                   'bzrlib.tests.test_ui',
952
 
                   'bzrlib.tests.test_uncommit',
953
 
                   'bzrlib.tests.test_upgrade',
954
 
                   'bzrlib.tests.test_weave',
955
 
                   'bzrlib.tests.test_whitebox',
956
 
                   'bzrlib.tests.test_workingtree',
957
 
                   'bzrlib.tests.test_xml',
958
 
                   ]
959
 
    test_transport_implementations = [
960
 
        'bzrlib.tests.test_transport_implementations']
961
 
 
962
 
    TestCase.BZRPATH = osutils.pathjoin(
963
 
            osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
964
 
    print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
965
 
    print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
966
 
    print
967
 
    suite = TestSuite()
968
 
    # python2.4's TestLoader.loadTestsFromNames gives very poor 
969
 
    # errors if it fails to load a named module - no indication of what's
970
 
    # actually wrong, just "no such module".  We should probably override that
971
 
    # class, but for the moment just load them ourselves. (mbp 20051202)
972
 
    loader = TestLoader()
973
 
    from bzrlib.transport import TransportTestProviderAdapter
974
 
    adapter = TransportTestProviderAdapter()
975
 
    adapt_modules(test_transport_implementations, adapter, loader, suite)
976
 
    for mod_name in testmod_names:
977
 
        mod = _load_module_by_name(mod_name)
978
 
        suite.addTest(loader.loadTestsFromModule(mod))
979
 
    for package in packages_to_test():
980
 
        suite.addTest(package.test_suite())
981
 
    for m in MODULES_TO_TEST:
982
 
        suite.addTest(loader.loadTestsFromModule(m))
983
 
    for m in (MODULES_TO_DOCTEST):
984
 
        suite.addTest(DocTestSuite(m))
985
 
    for name, plugin in bzrlib.plugin.all_plugins().items():
986
 
        if getattr(plugin, 'test_suite', None) is not None:
987
 
            suite.addTest(plugin.test_suite())
988
 
    return suite
989
 
 
990
 
 
991
 
def adapt_modules(mods_list, adapter, loader, suite):
992
 
    """Adapt the modules in mods_list using adapter and add to suite."""
993
 
    for mod_name in mods_list:
994
 
        mod = _load_module_by_name(mod_name)
995
 
        for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
996
 
            suite.addTests(adapter.adapt(test))
997
 
 
998
 
 
999
 
def _load_module_by_name(mod_name):
1000
 
    parts = mod_name.split('.')
1001
 
    module = __import__(mod_name)
1002
 
    del parts[0]
1003
 
    # for historical reasons python returns the top-level module even though
1004
 
    # it loads the submodule; we need to walk down to get the one we want.
1005
 
    while parts:
1006
 
        module = getattr(module, parts.pop(0))
1007
 
    return module