~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2006-04-10 05:26:37 UTC
  • mfrom: (1645.1.1 mergeui)
  • Revision ID: pqm@pqm.ubuntu.com-20060410052637-9c6229889bc13061
Implement single-file merge

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
 
30
from cStringIO import StringIO
 
31
import difflib
 
32
import errno
 
33
import logging
 
34
import os
 
35
import re
 
36
import shutil
 
37
import stat
 
38
import sys
 
39
import tempfile
 
40
import unittest
 
41
import time
 
42
 
 
43
 
 
44
import bzrlib.branch
 
45
import bzrlib.bzrdir as bzrdir
 
46
import bzrlib.commands
 
47
import bzrlib.errors as errors
 
48
import bzrlib.inventory
 
49
import bzrlib.iterablefile
 
50
import bzrlib.lockdir
 
51
import bzrlib.merge3
 
52
import bzrlib.osutils
 
53
import bzrlib.osutils as osutils
 
54
import bzrlib.plugin
 
55
import bzrlib.store
 
56
import bzrlib.trace
 
57
from bzrlib.transport import urlescape, get_transport
 
58
import bzrlib.transport
 
59
from bzrlib.transport.local import LocalRelpathServer
 
60
from bzrlib.transport.readonly import ReadonlyServer
 
61
from bzrlib.trace import mutter
 
62
from bzrlib.tests.TestUtil import TestLoader, TestSuite
 
63
from bzrlib.tests.treeshape import build_tree_contents
 
64
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
65
 
 
66
default_transport = LocalRelpathServer
 
67
 
 
68
MODULES_TO_TEST = []
 
69
MODULES_TO_DOCTEST = [
 
70
                      bzrlib.branch,
 
71
                      bzrlib.commands,
 
72
                      bzrlib.errors,
 
73
                      bzrlib.inventory,
 
74
                      bzrlib.iterablefile,
 
75
                      bzrlib.lockdir,
 
76
                      bzrlib.merge3,
 
77
                      bzrlib.option,
 
78
                      bzrlib.osutils,
 
79
                      bzrlib.store
 
80
                      ]
 
81
def packages_to_test():
 
82
    """Return a list of packages to test.
 
83
 
 
84
    The packages are not globally imported so that import failures are
 
85
    triggered when running selftest, not when importing the command.
 
86
    """
 
87
    import bzrlib.doc
 
88
    import bzrlib.tests.blackbox
 
89
    import bzrlib.tests.branch_implementations
 
90
    import bzrlib.tests.bzrdir_implementations
 
91
    import bzrlib.tests.interrepository_implementations
 
92
    import bzrlib.tests.interversionedfile_implementations
 
93
    import bzrlib.tests.repository_implementations
 
94
    import bzrlib.tests.revisionstore_implementations
 
95
    import bzrlib.tests.workingtree_implementations
 
96
    return [
 
97
            bzrlib.doc,
 
98
            bzrlib.tests.blackbox,
 
99
            bzrlib.tests.branch_implementations,
 
100
            bzrlib.tests.bzrdir_implementations,
 
101
            bzrlib.tests.interrepository_implementations,
 
102
            bzrlib.tests.interversionedfile_implementations,
 
103
            bzrlib.tests.repository_implementations,
 
104
            bzrlib.tests.revisionstore_implementations,
 
105
            bzrlib.tests.workingtree_implementations,
 
106
            ]
 
107
 
 
108
 
 
109
class _MyResult(unittest._TextTestResult):
 
110
    """Custom TestResult.
 
111
 
 
112
    Shows output in a different format, including displaying runtime for tests.
 
113
    """
 
114
    stop_early = False
 
115
 
 
116
    def _elapsedTime(self):
 
117
        return "%5dms" % (1000 * (time.time() - self._start_time))
 
118
 
 
119
    def startTest(self, test):
 
120
        unittest.TestResult.startTest(self, test)
 
121
        # In a short description, the important words are in
 
122
        # the beginning, but in an id, the important words are
 
123
        # at the end
 
124
        SHOW_DESCRIPTIONS = False
 
125
        if self.showAll:
 
126
            width = osutils.terminal_width()
 
127
            name_width = width - 15
 
128
            what = None
 
129
            if SHOW_DESCRIPTIONS:
 
130
                what = test.shortDescription()
 
131
                if what:
 
132
                    if len(what) > name_width:
 
133
                        what = what[:name_width-3] + '...'
 
134
            if what is None:
 
135
                what = test.id()
 
136
                if what.startswith('bzrlib.tests.'):
 
137
                    what = what[13:]
 
138
                if len(what) > name_width:
 
139
                    what = '...' + what[3-name_width:]
 
140
            what = what.ljust(name_width)
 
141
            self.stream.write(what)
 
142
        self.stream.flush()
 
143
        self._start_time = time.time()
 
144
 
 
145
    def addError(self, test, err):
 
146
        if isinstance(err[1], TestSkipped):
 
147
            return self.addSkipped(test, err)    
 
148
        unittest.TestResult.addError(self, test, err)
 
149
        if self.showAll:
 
150
            self.stream.writeln("ERROR %s" % self._elapsedTime())
 
151
        elif self.dots:
 
152
            self.stream.write('E')
 
153
        self.stream.flush()
 
154
        if self.stop_early:
 
155
            self.stop()
 
156
 
 
157
    def addFailure(self, test, err):
 
158
        unittest.TestResult.addFailure(self, test, err)
 
159
        if self.showAll:
 
160
            self.stream.writeln(" FAIL %s" % self._elapsedTime())
 
161
        elif self.dots:
 
162
            self.stream.write('F')
 
163
        self.stream.flush()
 
164
        if self.stop_early:
 
165
            self.stop()
 
166
 
 
167
    def addSuccess(self, test):
 
168
        if self.showAll:
 
169
            self.stream.writeln('   OK %s' % self._elapsedTime())
 
170
        elif self.dots:
 
171
            self.stream.write('~')
 
172
        self.stream.flush()
 
173
        unittest.TestResult.addSuccess(self, test)
 
174
 
 
175
    def addSkipped(self, test, skip_excinfo):
 
176
        if self.showAll:
 
177
            print >>self.stream, ' SKIP %s' % self._elapsedTime()
 
178
            print >>self.stream, '     %s' % skip_excinfo[1]
 
179
        elif self.dots:
 
180
            self.stream.write('S')
 
181
        self.stream.flush()
 
182
        # seems best to treat this as success from point-of-view of unittest
 
183
        # -- it actually does nothing so it barely matters :)
 
184
        unittest.TestResult.addSuccess(self, test)
 
185
 
 
186
    def printErrorList(self, flavour, errors):
 
187
        for test, err in errors:
 
188
            self.stream.writeln(self.separator1)
 
189
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
190
            if getattr(test, '_get_log', None) is not None:
 
191
                print >>self.stream
 
192
                print >>self.stream, \
 
193
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
194
                print >>self.stream, test._get_log()
 
195
                print >>self.stream, \
 
196
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
197
            self.stream.writeln(self.separator2)
 
198
            self.stream.writeln("%s" % err)
 
199
 
 
200
 
 
201
class TextTestRunner(unittest.TextTestRunner):
 
202
    stop_on_failure = False
 
203
 
 
204
    def _makeResult(self):
 
205
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
206
        result.stop_early = self.stop_on_failure
 
207
        return result
 
208
 
 
209
 
 
210
def iter_suite_tests(suite):
 
211
    """Return all tests in a suite, recursing through nested suites"""
 
212
    for item in suite._tests:
 
213
        if isinstance(item, unittest.TestCase):
 
214
            yield item
 
215
        elif isinstance(item, unittest.TestSuite):
 
216
            for r in iter_suite_tests(item):
 
217
                yield r
 
218
        else:
 
219
            raise Exception('unknown object %r inside test suite %r'
 
220
                            % (item, suite))
 
221
 
 
222
 
 
223
class TestSkipped(Exception):
 
224
    """Indicates that a test was intentionally skipped, rather than failing."""
 
225
    # XXX: Not used yet
 
226
 
 
227
 
 
228
class CommandFailed(Exception):
 
229
    pass
 
230
 
 
231
class TestCase(unittest.TestCase):
 
232
    """Base class for bzr unit tests.
 
233
    
 
234
    Tests that need access to disk resources should subclass 
 
235
    TestCaseInTempDir not TestCase.
 
236
 
 
237
    Error and debug log messages are redirected from their usual
 
238
    location into a temporary file, the contents of which can be
 
239
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
240
    so that it can also capture file IO.  When the test completes this file
 
241
    is read into memory and removed from disk.
 
242
       
 
243
    There are also convenience functions to invoke bzr's command-line
 
244
    routine, and to build and check bzr trees.
 
245
   
 
246
    In addition to the usual method of overriding tearDown(), this class also
 
247
    allows subclasses to register functions into the _cleanups list, which is
 
248
    run in order as the object is torn down.  It's less likely this will be
 
249
    accidentally overlooked.
 
250
    """
 
251
 
 
252
    BZRPATH = 'bzr'
 
253
    _log_file_name = None
 
254
    _log_contents = ''
 
255
 
 
256
    def __init__(self, methodName='testMethod'):
 
257
        super(TestCase, self).__init__(methodName)
 
258
        self._cleanups = []
 
259
 
 
260
    def setUp(self):
 
261
        unittest.TestCase.setUp(self)
 
262
        self._cleanEnvironment()
 
263
        bzrlib.trace.disable_default_logging()
 
264
        self._startLogFile()
 
265
 
 
266
    def _ndiff_strings(self, a, b):
 
267
        """Return ndiff between two strings containing lines.
 
268
        
 
269
        A trailing newline is added if missing to make the strings
 
270
        print properly."""
 
271
        if b and b[-1] != '\n':
 
272
            b += '\n'
 
273
        if a and a[-1] != '\n':
 
274
            a += '\n'
 
275
        difflines = difflib.ndiff(a.splitlines(True),
 
276
                                  b.splitlines(True),
 
277
                                  linejunk=lambda x: False,
 
278
                                  charjunk=lambda x: False)
 
279
        return ''.join(difflines)
 
280
 
 
281
    def assertEqualDiff(self, a, b, message=None):
 
282
        """Assert two texts are equal, if not raise an exception.
 
283
        
 
284
        This is intended for use with multi-line strings where it can 
 
285
        be hard to find the differences by eye.
 
286
        """
 
287
        # TODO: perhaps override assertEquals to call this for strings?
 
288
        if a == b:
 
289
            return
 
290
        if message is None:
 
291
            message = "texts not equal:\n"
 
292
        raise AssertionError(message + 
 
293
                             self._ndiff_strings(a, b))      
 
294
        
 
295
    def assertEqualMode(self, mode, mode_test):
 
296
        self.assertEqual(mode, mode_test,
 
297
                         'mode mismatch %o != %o' % (mode, mode_test))
 
298
 
 
299
    def assertStartsWith(self, s, prefix):
 
300
        if not s.startswith(prefix):
 
301
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
302
 
 
303
    def assertEndsWith(self, s, suffix):
 
304
        if not s.endswith(prefix):
 
305
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
306
 
 
307
    def assertContainsRe(self, haystack, needle_re):
 
308
        """Assert that a contains something matching a regular expression."""
 
309
        if not re.search(needle_re, haystack):
 
310
            raise AssertionError('pattern "%s" not found in "%s"'
 
311
                    % (needle_re, haystack))
 
312
 
 
313
    def assertSubset(self, sublist, superlist):
 
314
        """Assert that every entry in sublist is present in superlist."""
 
315
        missing = []
 
316
        for entry in sublist:
 
317
            if entry not in superlist:
 
318
                missing.append(entry)
 
319
        if len(missing) > 0:
 
320
            raise AssertionError("value(s) %r not present in container %r" % 
 
321
                                 (missing, superlist))
 
322
 
 
323
    def assertIs(self, left, right):
 
324
        if not (left is right):
 
325
            raise AssertionError("%r is not %r." % (left, right))
 
326
 
 
327
    def assertTransportMode(self, transport, path, mode):
 
328
        """Fail if a path does not have mode mode.
 
329
        
 
330
        If modes are not supported on this platform, the test is skipped.
 
331
        """
 
332
        if sys.platform == 'win32':
 
333
            return
 
334
        path_stat = transport.stat(path)
 
335
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
336
        self.assertEqual(mode, actual_mode,
 
337
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
338
 
 
339
    def assertIsInstance(self, obj, kls):
 
340
        """Fail if obj is not an instance of kls"""
 
341
        if not isinstance(obj, kls):
 
342
            self.fail("%r is not an instance of %s" % (obj, kls))
 
343
 
 
344
    def _startLogFile(self):
 
345
        """Send bzr and test log messages to a temporary file.
 
346
 
 
347
        The file is removed as the test is torn down.
 
348
        """
 
349
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
350
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
 
351
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
 
352
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
353
        self._log_file_name = name
 
354
        self.addCleanup(self._finishLogFile)
 
355
 
 
356
    def _finishLogFile(self):
 
357
        """Finished with the log file.
 
358
 
 
359
        Read contents into memory, close, and delete.
 
360
        """
 
361
        bzrlib.trace.disable_test_log(self._log_nonce)
 
362
        self._log_file.seek(0)
 
363
        self._log_contents = self._log_file.read()
 
364
        self._log_file.close()
 
365
        os.remove(self._log_file_name)
 
366
        self._log_file = self._log_file_name = None
 
367
 
 
368
    def addCleanup(self, callable):
 
369
        """Arrange to run a callable when this case is torn down.
 
370
 
 
371
        Callables are run in the reverse of the order they are registered, 
 
372
        ie last-in first-out.
 
373
        """
 
374
        if callable in self._cleanups:
 
375
            raise ValueError("cleanup function %r already registered on %s" 
 
376
                    % (callable, self))
 
377
        self._cleanups.append(callable)
 
378
 
 
379
    def _cleanEnvironment(self):
 
380
        new_env = {
 
381
            'HOME': os.getcwd(),
 
382
            'APPDATA': os.getcwd(),
 
383
            'BZREMAIL': None,
 
384
            'EMAIL': None,
 
385
        }
 
386
        self.__old_env = {}
 
387
        self.addCleanup(self._restoreEnvironment)
 
388
        for name, value in new_env.iteritems():
 
389
            self._captureVar(name, value)
 
390
 
 
391
 
 
392
    def _captureVar(self, name, newvalue):
 
393
        """Set an environment variable, preparing it to be reset when finished."""
 
394
        self.__old_env[name] = os.environ.get(name, None)
 
395
        if newvalue is None:
 
396
            if name in os.environ:
 
397
                del os.environ[name]
 
398
        else:
 
399
            os.environ[name] = newvalue
 
400
 
 
401
    @staticmethod
 
402
    def _restoreVar(name, value):
 
403
        if value is None:
 
404
            if name in os.environ:
 
405
                del os.environ[name]
 
406
        else:
 
407
            os.environ[name] = value
 
408
 
 
409
    def _restoreEnvironment(self):
 
410
        for name, value in self.__old_env.iteritems():
 
411
            self._restoreVar(name, value)
 
412
 
 
413
    def tearDown(self):
 
414
        self._runCleanups()
 
415
        unittest.TestCase.tearDown(self)
 
416
 
 
417
    def _runCleanups(self):
 
418
        """Run registered cleanup functions. 
 
419
 
 
420
        This should only be called from TestCase.tearDown.
 
421
        """
 
422
        # TODO: Perhaps this should keep running cleanups even if 
 
423
        # one of them fails?
 
424
        for cleanup_fn in reversed(self._cleanups):
 
425
            cleanup_fn()
 
426
 
 
427
    def log(self, *args):
 
428
        mutter(*args)
 
429
 
 
430
    def _get_log(self):
 
431
        """Return as a string the log for this test"""
 
432
        if self._log_file_name:
 
433
            return open(self._log_file_name).read()
 
434
        else:
 
435
            return self._log_contents
 
436
        # TODO: Delete the log after it's been read in
 
437
 
 
438
    def capture(self, cmd, retcode=0):
 
439
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
440
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
441
 
 
442
    def run_bzr_captured(self, argv, retcode=0):
 
443
        """Invoke bzr and return (stdout, stderr).
 
444
 
 
445
        Useful for code that wants to check the contents of the
 
446
        output, the way error messages are presented, etc.
 
447
 
 
448
        This should be the main method for tests that want to exercise the
 
449
        overall behavior of the bzr application (rather than a unit test
 
450
        or a functional test of the library.)
 
451
 
 
452
        Much of the old code runs bzr by forking a new copy of Python, but
 
453
        that is slower, harder to debug, and generally not necessary.
 
454
 
 
455
        This runs bzr through the interface that catches and reports
 
456
        errors, and with logging set to something approximating the
 
457
        default, so that error reporting can be checked.
 
458
 
 
459
        argv -- arguments to invoke bzr
 
460
        retcode -- expected return code, or None for don't-care.
 
461
        """
 
462
        stdout = StringIO()
 
463
        stderr = StringIO()
 
464
        self.log('run bzr: %s', ' '.join(argv))
 
465
        # FIXME: don't call into logging here
 
466
        handler = logging.StreamHandler(stderr)
 
467
        handler.setFormatter(bzrlib.trace.QuietFormatter())
 
468
        handler.setLevel(logging.INFO)
 
469
        logger = logging.getLogger('')
 
470
        logger.addHandler(handler)
 
471
        try:
 
472
            result = self.apply_redirected(None, stdout, stderr,
 
473
                                           bzrlib.commands.run_bzr_catch_errors,
 
474
                                           argv)
 
475
        finally:
 
476
            logger.removeHandler(handler)
 
477
        out = stdout.getvalue()
 
478
        err = stderr.getvalue()
 
479
        if out:
 
480
            self.log('output:\n%s', out)
 
481
        if err:
 
482
            self.log('errors:\n%s', err)
 
483
        if retcode is not None:
 
484
            self.assertEquals(result, retcode)
 
485
        return out, err
 
486
 
 
487
    def run_bzr(self, *args, **kwargs):
 
488
        """Invoke bzr, as if it were run from the command line.
 
489
 
 
490
        This should be the main method for tests that want to exercise the
 
491
        overall behavior of the bzr application (rather than a unit test
 
492
        or a functional test of the library.)
 
493
 
 
494
        This sends the stdout/stderr results into the test's log,
 
495
        where it may be useful for debugging.  See also run_captured.
 
496
        """
 
497
        retcode = kwargs.pop('retcode', 0)
 
498
        return self.run_bzr_captured(args, retcode)
 
499
 
 
500
    def check_inventory_shape(self, inv, shape):
 
501
        """Compare an inventory to a list of expected names.
 
502
 
 
503
        Fail if they are not precisely equal.
 
504
        """
 
505
        extras = []
 
506
        shape = list(shape)             # copy
 
507
        for path, ie in inv.entries():
 
508
            name = path.replace('\\', '/')
 
509
            if ie.kind == 'dir':
 
510
                name = name + '/'
 
511
            if name in shape:
 
512
                shape.remove(name)
 
513
            else:
 
514
                extras.append(name)
 
515
        if shape:
 
516
            self.fail("expected paths not found in inventory: %r" % shape)
 
517
        if extras:
 
518
            self.fail("unexpected paths found in inventory: %r" % extras)
 
519
 
 
520
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
521
                         a_callable=None, *args, **kwargs):
 
522
        """Call callable with redirected std io pipes.
 
523
 
 
524
        Returns the return code."""
 
525
        if not callable(a_callable):
 
526
            raise ValueError("a_callable must be callable.")
 
527
        if stdin is None:
 
528
            stdin = StringIO("")
 
529
        if stdout is None:
 
530
            if getattr(self, "_log_file", None) is not None:
 
531
                stdout = self._log_file
 
532
            else:
 
533
                stdout = StringIO()
 
534
        if stderr is None:
 
535
            if getattr(self, "_log_file", None is not None):
 
536
                stderr = self._log_file
 
537
            else:
 
538
                stderr = StringIO()
 
539
        real_stdin = sys.stdin
 
540
        real_stdout = sys.stdout
 
541
        real_stderr = sys.stderr
 
542
        try:
 
543
            sys.stdout = stdout
 
544
            sys.stderr = stderr
 
545
            sys.stdin = stdin
 
546
            return a_callable(*args, **kwargs)
 
547
        finally:
 
548
            sys.stdout = real_stdout
 
549
            sys.stderr = real_stderr
 
550
            sys.stdin = real_stdin
 
551
 
 
552
 
 
553
BzrTestBase = TestCase
 
554
 
 
555
     
 
556
class TestCaseInTempDir(TestCase):
 
557
    """Derived class that runs a test within a temporary directory.
 
558
 
 
559
    This is useful for tests that need to create a branch, etc.
 
560
 
 
561
    The directory is created in a slightly complex way: for each
 
562
    Python invocation, a new temporary top-level directory is created.
 
563
    All test cases create their own directory within that.  If the
 
564
    tests complete successfully, the directory is removed.
 
565
 
 
566
    InTempDir is an old alias for FunctionalTestCase.
 
567
    """
 
568
 
 
569
    TEST_ROOT = None
 
570
    _TEST_NAME = 'test'
 
571
    OVERRIDE_PYTHON = 'python'
 
572
 
 
573
    def check_file_contents(self, filename, expect):
 
574
        self.log("check contents of file %s" % filename)
 
575
        contents = file(filename, 'r').read()
 
576
        if contents != expect:
 
577
            self.log("expected: %r" % expect)
 
578
            self.log("actually: %r" % contents)
 
579
            self.fail("contents of %s not as expected" % filename)
 
580
 
 
581
    def _make_test_root(self):
 
582
        if TestCaseInTempDir.TEST_ROOT is not None:
 
583
            return
 
584
        i = 0
 
585
        while True:
 
586
            root = u'test%04d.tmp' % i
 
587
            try:
 
588
                os.mkdir(root)
 
589
            except OSError, e:
 
590
                if e.errno == errno.EEXIST:
 
591
                    i += 1
 
592
                    continue
 
593
                else:
 
594
                    raise
 
595
            # successfully created
 
596
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
 
597
            break
 
598
        # make a fake bzr directory there to prevent any tests propagating
 
599
        # up onto the source directory's real branch
 
600
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
 
601
 
 
602
    def setUp(self):
 
603
        super(TestCaseInTempDir, self).setUp()
 
604
        self._make_test_root()
 
605
        _currentdir = os.getcwdu()
 
606
        # shorten the name, to avoid test failures due to path length
 
607
        short_id = self.id().replace('bzrlib.tests.', '') \
 
608
                   .replace('__main__.', '')[-100:]
 
609
        # it's possible the same test class is run several times for
 
610
        # parameterized tests, so make sure the names don't collide.  
 
611
        i = 0
 
612
        while True:
 
613
            if i > 0:
 
614
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
615
            else:
 
616
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
617
            if os.path.exists(candidate_dir):
 
618
                i = i + 1
 
619
                continue
 
620
            else:
 
621
                self.test_dir = candidate_dir
 
622
                os.mkdir(self.test_dir)
 
623
                os.chdir(self.test_dir)
 
624
                break
 
625
        os.environ['HOME'] = self.test_dir
 
626
        os.environ['APPDATA'] = self.test_dir
 
627
        def _leaveDirectory():
 
628
            os.chdir(_currentdir)
 
629
        self.addCleanup(_leaveDirectory)
 
630
        
 
631
    def build_tree(self, shape, line_endings='native', transport=None):
 
632
        """Build a test tree according to a pattern.
 
633
 
 
634
        shape is a sequence of file specifications.  If the final
 
635
        character is '/', a directory is created.
 
636
 
 
637
        This doesn't add anything to a branch.
 
638
        :param line_endings: Either 'binary' or 'native'
 
639
                             in binary mode, exact contents are written
 
640
                             in native mode, the line endings match the
 
641
                             default platform endings.
 
642
 
 
643
        :param transport: A transport to write to, for building trees on 
 
644
                          VFS's. If the transport is readonly or None,
 
645
                          "." is opened automatically.
 
646
        """
 
647
        # XXX: It's OK to just create them using forward slashes on windows?
 
648
        if transport is None or transport.is_readonly():
 
649
            transport = get_transport(".")
 
650
        for name in shape:
 
651
            self.assert_(isinstance(name, basestring))
 
652
            if name[-1] == '/':
 
653
                transport.mkdir(urlescape(name[:-1]))
 
654
            else:
 
655
                if line_endings == 'binary':
 
656
                    end = '\n'
 
657
                elif line_endings == 'native':
 
658
                    end = os.linesep
 
659
                else:
 
660
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
 
661
                content = "contents of %s%s" % (name, end)
 
662
                transport.put(urlescape(name), StringIO(content))
 
663
 
 
664
    def build_tree_contents(self, shape):
 
665
        build_tree_contents(shape)
 
666
 
 
667
    def failUnlessExists(self, path):
 
668
        """Fail unless path, which may be abs or relative, exists."""
 
669
        self.failUnless(osutils.lexists(path))
 
670
 
 
671
    def failIfExists(self, path):
 
672
        """Fail if path, which may be abs or relative, exists."""
 
673
        self.failIf(osutils.lexists(path))
 
674
        
 
675
    def assertFileEqual(self, content, path):
 
676
        """Fail if path does not contain 'content'."""
 
677
        self.failUnless(osutils.lexists(path))
 
678
        self.assertEqualDiff(content, open(path, 'r').read())
 
679
 
 
680
 
 
681
class TestCaseWithTransport(TestCaseInTempDir):
 
682
    """A test case that provides get_url and get_readonly_url facilities.
 
683
 
 
684
    These back onto two transport servers, one for readonly access and one for
 
685
    read write access.
 
686
 
 
687
    If no explicit class is provided for readonly access, a
 
688
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
689
    based read write transports.
 
690
 
 
691
    If an explicit class is provided for readonly access, that server and the 
 
692
    readwrite one must both define get_url() as resolving to os.getcwd().
 
693
    """
 
694
 
 
695
    def __init__(self, methodName='testMethod'):
 
696
        super(TestCaseWithTransport, self).__init__(methodName)
 
697
        self.__readonly_server = None
 
698
        self.__server = None
 
699
        self.transport_server = default_transport
 
700
        self.transport_readonly_server = None
 
701
 
 
702
    def get_readonly_url(self, relpath=None):
 
703
        """Get a URL for the readonly transport.
 
704
 
 
705
        This will either be backed by '.' or a decorator to the transport 
 
706
        used by self.get_url()
 
707
        relpath provides for clients to get a path relative to the base url.
 
708
        These should only be downwards relative, not upwards.
 
709
        """
 
710
        base = self.get_readonly_server().get_url()
 
711
        if relpath is not None:
 
712
            if not base.endswith('/'):
 
713
                base = base + '/'
 
714
            base = base + relpath
 
715
        return base
 
716
 
 
717
    def get_readonly_server(self):
 
718
        """Get the server instance for the readonly transport
 
719
 
 
720
        This is useful for some tests with specific servers to do diagnostics.
 
721
        """
 
722
        if self.__readonly_server is None:
 
723
            if self.transport_readonly_server is None:
 
724
                # readonly decorator requested
 
725
                # bring up the server
 
726
                self.get_url()
 
727
                self.__readonly_server = ReadonlyServer()
 
728
                self.__readonly_server.setUp(self.__server)
 
729
            else:
 
730
                self.__readonly_server = self.transport_readonly_server()
 
731
                self.__readonly_server.setUp()
 
732
            self.addCleanup(self.__readonly_server.tearDown)
 
733
        return self.__readonly_server
 
734
 
 
735
    def get_server(self):
 
736
        """Get the read/write server instance.
 
737
 
 
738
        This is useful for some tests with specific servers that need
 
739
        diagnostics.
 
740
        """
 
741
        if self.__server is None:
 
742
            self.__server = self.transport_server()
 
743
            self.__server.setUp()
 
744
            self.addCleanup(self.__server.tearDown)
 
745
        return self.__server
 
746
 
 
747
    def get_url(self, relpath=None):
 
748
        """Get a URL for the readwrite transport.
 
749
 
 
750
        This will either be backed by '.' or to an equivalent non-file based
 
751
        facility.
 
752
        relpath provides for clients to get a path relative to the base url.
 
753
        These should only be downwards relative, not upwards.
 
754
        """
 
755
        base = self.get_server().get_url()
 
756
        if relpath is not None and relpath != '.':
 
757
            if not base.endswith('/'):
 
758
                base = base + '/'
 
759
            base = base + relpath
 
760
        return base
 
761
 
 
762
    def get_transport(self):
 
763
        """Return a writeable transport for the test scratch space"""
 
764
        t = get_transport(self.get_url())
 
765
        self.assertFalse(t.is_readonly())
 
766
        return t
 
767
 
 
768
    def get_readonly_transport(self):
 
769
        """Return a readonly transport for the test scratch space
 
770
        
 
771
        This can be used to test that operations which should only need
 
772
        readonly access in fact do not try to write.
 
773
        """
 
774
        t = get_transport(self.get_readonly_url())
 
775
        self.assertTrue(t.is_readonly())
 
776
        return t
 
777
 
 
778
    def make_branch(self, relpath):
 
779
        """Create a branch on the transport at relpath."""
 
780
        repo = self.make_repository(relpath)
 
781
        return repo.bzrdir.create_branch()
 
782
 
 
783
    def make_bzrdir(self, relpath):
 
784
        try:
 
785
            url = self.get_url(relpath)
 
786
            segments = relpath.split('/')
 
787
            if segments and segments[-1] not in ('', '.'):
 
788
                parent = self.get_url('/'.join(segments[:-1]))
 
789
                t = get_transport(parent)
 
790
                try:
 
791
                    t.mkdir(segments[-1])
 
792
                except errors.FileExists:
 
793
                    pass
 
794
            return bzrlib.bzrdir.BzrDir.create(url)
 
795
        except errors.UninitializableFormat:
 
796
            raise TestSkipped("Format %s is not initializable.")
 
797
 
 
798
    def make_repository(self, relpath, shared=False):
 
799
        """Create a repository on our default transport at relpath."""
 
800
        made_control = self.make_bzrdir(relpath)
 
801
        return made_control.create_repository(shared=shared)
 
802
 
 
803
    def make_branch_and_tree(self, relpath):
 
804
        """Create a branch on the transport and a tree locally.
 
805
 
 
806
        Returns the tree.
 
807
        """
 
808
        # TODO: always use the local disk path for the working tree,
 
809
        # this obviously requires a format that supports branch references
 
810
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
811
        # RBC 20060208
 
812
        b = self.make_branch(relpath)
 
813
        try:
 
814
            return b.bzrdir.create_workingtree()
 
815
        except errors.NotLocalUrl:
 
816
            # new formats - catch No tree error and create
 
817
            # a branch reference and a checkout.
 
818
            # old formats at that point - raise TestSkipped.
 
819
            # TODO: rbc 20060208
 
820
            return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
 
821
 
 
822
    def assertIsDirectory(self, relpath, transport):
 
823
        """Assert that relpath within transport is a directory.
 
824
 
 
825
        This may not be possible on all transports; in that case it propagates
 
826
        a TransportNotPossible.
 
827
        """
 
828
        try:
 
829
            mode = transport.stat(relpath).st_mode
 
830
        except errors.NoSuchFile:
 
831
            self.fail("path %s is not a directory; no such file"
 
832
                      % (relpath))
 
833
        if not stat.S_ISDIR(mode):
 
834
            self.fail("path %s is not a directory; has mode %#o"
 
835
                      % (relpath, mode))
 
836
 
 
837
 
 
838
class ChrootedTestCase(TestCaseWithTransport):
 
839
    """A support class that provides readonly urls outside the local namespace.
 
840
 
 
841
    This is done by checking if self.transport_server is a MemoryServer. if it
 
842
    is then we are chrooted already, if it is not then an HttpServer is used
 
843
    for readonly urls.
 
844
 
 
845
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
846
                       be used without needed to redo it when a different 
 
847
                       subclass is in use ?
 
848
    """
 
849
 
 
850
    def setUp(self):
 
851
        super(ChrootedTestCase, self).setUp()
 
852
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
 
853
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
854
 
 
855
 
 
856
def filter_suite_by_re(suite, pattern):
 
857
    result = TestSuite()
 
858
    filter_re = re.compile(pattern)
 
859
    for test in iter_suite_tests(suite):
 
860
        if filter_re.search(test.id()):
 
861
            result.addTest(test)
 
862
    return result
 
863
 
 
864
 
 
865
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
866
              stop_on_failure=False, keep_output=False,
 
867
              transport=None):
 
868
    TestCaseInTempDir._TEST_NAME = name
 
869
    if verbose:
 
870
        verbosity = 2
 
871
    else:
 
872
        verbosity = 1
 
873
    runner = TextTestRunner(stream=sys.stdout,
 
874
                            descriptions=0,
 
875
                            verbosity=verbosity)
 
876
    runner.stop_on_failure=stop_on_failure
 
877
    if pattern != '.*':
 
878
        suite = filter_suite_by_re(suite, pattern)
 
879
    result = runner.run(suite)
 
880
    # This is still a little bogus, 
 
881
    # but only a little. Folk not using our testrunner will
 
882
    # have to delete their temp directories themselves.
 
883
    test_root = TestCaseInTempDir.TEST_ROOT
 
884
    if result.wasSuccessful() or not keep_output:
 
885
        if test_root is not None:
 
886
            print 'Deleting test root %s...' % test_root
 
887
            try:
 
888
                shutil.rmtree(test_root)
 
889
            finally:
 
890
                print
 
891
    else:
 
892
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
 
893
    return result.wasSuccessful()
 
894
 
 
895
 
 
896
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
897
             keep_output=False,
 
898
             transport=None):
 
899
    """Run the whole test suite under the enhanced runner"""
 
900
    global default_transport
 
901
    if transport is None:
 
902
        transport = default_transport
 
903
    old_transport = default_transport
 
904
    default_transport = transport
 
905
    suite = test_suite()
 
906
    try:
 
907
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
908
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
909
                     transport=transport)
 
910
    finally:
 
911
        default_transport = old_transport
 
912
 
 
913
 
 
914
 
 
915
def test_suite():
 
916
    """Build and return TestSuite for the whole program."""
 
917
    from doctest import DocTestSuite
 
918
 
 
919
    global MODULES_TO_DOCTEST
 
920
 
 
921
    testmod_names = [ \
 
922
                   'bzrlib.tests.test_ancestry',
 
923
                   'bzrlib.tests.test_annotate',
 
924
                   'bzrlib.tests.test_api',
 
925
                   'bzrlib.tests.test_bad_files',
 
926
                   'bzrlib.tests.test_basis_inventory',
 
927
                   'bzrlib.tests.test_branch',
 
928
                   'bzrlib.tests.test_bzrdir',
 
929
                   'bzrlib.tests.test_command',
 
930
                   'bzrlib.tests.test_commit',
 
931
                   'bzrlib.tests.test_commit_merge',
 
932
                   'bzrlib.tests.test_config',
 
933
                   'bzrlib.tests.test_conflicts',
 
934
                   'bzrlib.tests.test_decorators',
 
935
                   'bzrlib.tests.test_diff',
 
936
                   'bzrlib.tests.test_doc_generate',
 
937
                   'bzrlib.tests.test_errors',
 
938
                   'bzrlib.tests.test_fetch',
 
939
                   'bzrlib.tests.test_gpg',
 
940
                   'bzrlib.tests.test_graph',
 
941
                   'bzrlib.tests.test_hashcache',
 
942
                   'bzrlib.tests.test_http',
 
943
                   'bzrlib.tests.test_identitymap',
 
944
                   'bzrlib.tests.test_inv',
 
945
                   'bzrlib.tests.test_knit',
 
946
                   'bzrlib.tests.test_lockdir',
 
947
                   'bzrlib.tests.test_lockable_files',
 
948
                   'bzrlib.tests.test_log',
 
949
                   'bzrlib.tests.test_merge',
 
950
                   'bzrlib.tests.test_merge3',
 
951
                   'bzrlib.tests.test_merge_core',
 
952
                   'bzrlib.tests.test_missing',
 
953
                   'bzrlib.tests.test_msgeditor',
 
954
                   'bzrlib.tests.test_nonascii',
 
955
                   'bzrlib.tests.test_options',
 
956
                   'bzrlib.tests.test_osutils',
 
957
                   'bzrlib.tests.test_permissions',
 
958
                   'bzrlib.tests.test_plugins',
 
959
                   'bzrlib.tests.test_progress',
 
960
                   'bzrlib.tests.test_reconcile',
 
961
                   'bzrlib.tests.test_repository',
 
962
                   'bzrlib.tests.test_revision',
 
963
                   'bzrlib.tests.test_revisionnamespaces',
 
964
                   'bzrlib.tests.test_revprops',
 
965
                   'bzrlib.tests.test_rio',
 
966
                   'bzrlib.tests.test_sampler',
 
967
                   'bzrlib.tests.test_selftest',
 
968
                   'bzrlib.tests.test_setup',
 
969
                   'bzrlib.tests.test_sftp_transport',
 
970
                   'bzrlib.tests.test_smart_add',
 
971
                   'bzrlib.tests.test_source',
 
972
                   'bzrlib.tests.test_store',
 
973
                   'bzrlib.tests.test_symbol_versioning',
 
974
                   'bzrlib.tests.test_testament',
 
975
                   'bzrlib.tests.test_trace',
 
976
                   'bzrlib.tests.test_transactions',
 
977
                   'bzrlib.tests.test_transform',
 
978
                   'bzrlib.tests.test_transport',
 
979
                   'bzrlib.tests.test_tsort',
 
980
                   'bzrlib.tests.test_ui',
 
981
                   'bzrlib.tests.test_uncommit',
 
982
                   'bzrlib.tests.test_upgrade',
 
983
                   'bzrlib.tests.test_versionedfile',
 
984
                   'bzrlib.tests.test_weave',
 
985
                   'bzrlib.tests.test_whitebox',
 
986
                   'bzrlib.tests.test_workingtree',
 
987
                   'bzrlib.tests.test_xml',
 
988
                   ]
 
989
    test_transport_implementations = [
 
990
        'bzrlib.tests.test_transport_implementations']
 
991
 
 
992
    TestCase.BZRPATH = osutils.pathjoin(
 
993
            osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
 
994
    print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
 
995
    print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
 
996
    print
 
997
    suite = TestSuite()
 
998
    # python2.4's TestLoader.loadTestsFromNames gives very poor 
 
999
    # errors if it fails to load a named module - no indication of what's
 
1000
    # actually wrong, just "no such module".  We should probably override that
 
1001
    # class, but for the moment just load them ourselves. (mbp 20051202)
 
1002
    loader = TestLoader()
 
1003
    from bzrlib.transport import TransportTestProviderAdapter
 
1004
    adapter = TransportTestProviderAdapter()
 
1005
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
1006
    for mod_name in testmod_names:
 
1007
        mod = _load_module_by_name(mod_name)
 
1008
        suite.addTest(loader.loadTestsFromModule(mod))
 
1009
    for package in packages_to_test():
 
1010
        suite.addTest(package.test_suite())
 
1011
    for m in MODULES_TO_TEST:
 
1012
        suite.addTest(loader.loadTestsFromModule(m))
 
1013
    for m in (MODULES_TO_DOCTEST):
 
1014
        suite.addTest(DocTestSuite(m))
 
1015
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
1016
        if getattr(plugin, 'test_suite', None) is not None:
 
1017
            suite.addTest(plugin.test_suite())
 
1018
    return suite
 
1019
 
 
1020
 
 
1021
def adapt_modules(mods_list, adapter, loader, suite):
 
1022
    """Adapt the modules in mods_list using adapter and add to suite."""
 
1023
    for mod_name in mods_list:
 
1024
        mod = _load_module_by_name(mod_name)
 
1025
        for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
 
1026
            suite.addTests(adapter.adapt(test))
 
1027
 
 
1028
 
 
1029
def _load_module_by_name(mod_name):
 
1030
    parts = mod_name.split('.')
 
1031
    module = __import__(mod_name)
 
1032
    del parts[0]
 
1033
    # for historical reasons python returns the top-level module even though
 
1034
    # it loads the submodule; we need to walk down to get the one we want.
 
1035
    while parts:
 
1036
        module = getattr(module, parts.pop(0))
 
1037
    return module