~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

[merge] Erik Bågfors: add --revision to bzr pull

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
from cStringIO import StringIO
 
25
import difflib
 
26
import errno
 
27
import logging
 
28
import os
 
29
import re
 
30
import shutil
 
31
import stat
 
32
import sys
 
33
import tempfile
 
34
import unittest
 
35
import time
 
36
import codecs
 
37
 
 
38
import bzrlib.branch
 
39
import bzrlib.commands
 
40
from bzrlib.errors import (BzrError,
 
41
                           FileExists,
 
42
                           UninitializableFormat,
 
43
                           )
 
44
import bzrlib.inventory
 
45
import bzrlib.iterablefile
 
46
import bzrlib.merge3
 
47
import bzrlib.osutils
 
48
import bzrlib.osutils as osutils
 
49
import bzrlib.plugin
 
50
import bzrlib.store
 
51
import bzrlib.trace
 
52
from bzrlib.transport import urlescape
 
53
import bzrlib.transport
 
54
from bzrlib.transport.local import LocalRelpathServer
 
55
from bzrlib.transport.readonly import ReadonlyServer
 
56
from bzrlib.trace import mutter
 
57
from bzrlib.tests.TestUtil import TestLoader, TestSuite
 
58
from bzrlib.tests.treeshape import build_tree_contents
 
59
from bzrlib.workingtree import WorkingTree
 
60
 
 
61
default_transport = LocalRelpathServer
 
62
 
 
63
MODULES_TO_TEST = []
 
64
MODULES_TO_DOCTEST = [
 
65
                      bzrlib.branch,
 
66
                      bzrlib.commands,
 
67
                      bzrlib.errors,
 
68
                      bzrlib.inventory,
 
69
                      bzrlib.iterablefile,
 
70
                      bzrlib.merge3,
 
71
                      bzrlib.option,
 
72
                      bzrlib.osutils,
 
73
                      bzrlib.store
 
74
                      ]
 
75
def packages_to_test():
 
76
    """Return a list of packages to test.
 
77
 
 
78
    The packages are not globally imported so that import failures are
 
79
    triggered when running selftest, not when importing the command.
 
80
    """
 
81
    import bzrlib.doc
 
82
    import bzrlib.tests.blackbox
 
83
    import bzrlib.tests.branch_implementations
 
84
    return [
 
85
            bzrlib.doc,
 
86
            bzrlib.tests.blackbox,
 
87
            bzrlib.tests.branch_implementations,
 
88
            ]
 
89
 
 
90
 
 
91
class _MyResult(unittest._TextTestResult):
 
92
    """Custom TestResult.
 
93
 
 
94
    Shows output in a different format, including displaying runtime for tests.
 
95
    """
 
96
    stop_early = False
 
97
 
 
98
    def _elapsedTime(self):
 
99
        return "%5dms" % (1000 * (time.time() - self._start_time))
 
100
 
 
101
    def startTest(self, test):
 
102
        unittest.TestResult.startTest(self, test)
 
103
        # In a short description, the important words are in
 
104
        # the beginning, but in an id, the important words are
 
105
        # at the end
 
106
        SHOW_DESCRIPTIONS = False
 
107
        if self.showAll:
 
108
            width = osutils.terminal_width()
 
109
            name_width = width - 15
 
110
            what = None
 
111
            if SHOW_DESCRIPTIONS:
 
112
                what = test.shortDescription()
 
113
                if what:
 
114
                    if len(what) > name_width:
 
115
                        what = what[:name_width-3] + '...'
 
116
            if what is None:
 
117
                what = test.id()
 
118
                if what.startswith('bzrlib.tests.'):
 
119
                    what = what[13:]
 
120
                if len(what) > name_width:
 
121
                    what = '...' + what[3-name_width:]
 
122
            what = what.ljust(name_width)
 
123
            self.stream.write(what)
 
124
        self.stream.flush()
 
125
        self._start_time = time.time()
 
126
 
 
127
    def addError(self, test, err):
 
128
        if isinstance(err[1], TestSkipped):
 
129
            return self.addSkipped(test, err)    
 
130
        unittest.TestResult.addError(self, test, err)
 
131
        if self.showAll:
 
132
            self.stream.writeln("ERROR %s" % self._elapsedTime())
 
133
        elif self.dots:
 
134
            self.stream.write('E')
 
135
        self.stream.flush()
 
136
        if self.stop_early:
 
137
            self.stop()
 
138
 
 
139
    def addFailure(self, test, err):
 
140
        unittest.TestResult.addFailure(self, test, err)
 
141
        if self.showAll:
 
142
            self.stream.writeln(" FAIL %s" % self._elapsedTime())
 
143
        elif self.dots:
 
144
            self.stream.write('F')
 
145
        self.stream.flush()
 
146
        if self.stop_early:
 
147
            self.stop()
 
148
 
 
149
    def addSuccess(self, test):
 
150
        if self.showAll:
 
151
            self.stream.writeln('   OK %s' % self._elapsedTime())
 
152
        elif self.dots:
 
153
            self.stream.write('~')
 
154
        self.stream.flush()
 
155
        unittest.TestResult.addSuccess(self, test)
 
156
 
 
157
    def addSkipped(self, test, skip_excinfo):
 
158
        if self.showAll:
 
159
            print >>self.stream, ' SKIP %s' % self._elapsedTime()
 
160
            print >>self.stream, '     %s' % skip_excinfo[1]
 
161
        elif self.dots:
 
162
            self.stream.write('S')
 
163
        self.stream.flush()
 
164
        # seems best to treat this as success from point-of-view of unittest
 
165
        # -- it actually does nothing so it barely matters :)
 
166
        unittest.TestResult.addSuccess(self, test)
 
167
 
 
168
    def printErrorList(self, flavour, errors):
 
169
        for test, err in errors:
 
170
            self.stream.writeln(self.separator1)
 
171
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
172
            if getattr(test, '_get_log', None) is not None:
 
173
                print >>self.stream
 
174
                print >>self.stream, \
 
175
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
176
                print >>self.stream, test._get_log()
 
177
                print >>self.stream, \
 
178
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
179
            self.stream.writeln(self.separator2)
 
180
            self.stream.writeln("%s" % err)
 
181
 
 
182
 
 
183
class TextTestRunner(unittest.TextTestRunner):
 
184
    stop_on_failure = False
 
185
 
 
186
    def _makeResult(self):
 
187
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
188
        result.stop_early = self.stop_on_failure
 
189
        return result
 
190
 
 
191
 
 
192
def iter_suite_tests(suite):
 
193
    """Return all tests in a suite, recursing through nested suites"""
 
194
    for item in suite._tests:
 
195
        if isinstance(item, unittest.TestCase):
 
196
            yield item
 
197
        elif isinstance(item, unittest.TestSuite):
 
198
            for r in iter_suite_tests(item):
 
199
                yield r
 
200
        else:
 
201
            raise Exception('unknown object %r inside test suite %r'
 
202
                            % (item, suite))
 
203
 
 
204
 
 
205
class TestSkipped(Exception):
 
206
    """Indicates that a test was intentionally skipped, rather than failing."""
 
207
    # XXX: Not used yet
 
208
 
 
209
 
 
210
class CommandFailed(Exception):
 
211
    pass
 
212
 
 
213
class TestCase(unittest.TestCase):
 
214
    """Base class for bzr unit tests.
 
215
    
 
216
    Tests that need access to disk resources should subclass 
 
217
    TestCaseInTempDir not TestCase.
 
218
 
 
219
    Error and debug log messages are redirected from their usual
 
220
    location into a temporary file, the contents of which can be
 
221
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
222
    so that it can also capture file IO.  When the test completes this file
 
223
    is read into memory and removed from disk.
 
224
       
 
225
    There are also convenience functions to invoke bzr's command-line
 
226
    routine, and to build and check bzr trees.
 
227
   
 
228
    In addition to the usual method of overriding tearDown(), this class also
 
229
    allows subclasses to register functions into the _cleanups list, which is
 
230
    run in order as the object is torn down.  It's less likely this will be
 
231
    accidentally overlooked.
 
232
    """
 
233
 
 
234
    BZRPATH = 'bzr'
 
235
    _log_file_name = None
 
236
    _log_contents = ''
 
237
 
 
238
    def __init__(self, methodName='testMethod'):
 
239
        super(TestCase, self).__init__(methodName)
 
240
        self._cleanups = []
 
241
 
 
242
    def setUp(self):
 
243
        unittest.TestCase.setUp(self)
 
244
        self._cleanEnvironment()
 
245
        bzrlib.trace.disable_default_logging()
 
246
        self._startLogFile()
 
247
 
 
248
    def _ndiff_strings(self, a, b):
 
249
        """Return ndiff between two strings containing lines.
 
250
        
 
251
        A trailing newline is added if missing to make the strings
 
252
        print properly."""
 
253
        if b and b[-1] != '\n':
 
254
            b += '\n'
 
255
        if a and a[-1] != '\n':
 
256
            a += '\n'
 
257
        difflines = difflib.ndiff(a.splitlines(True),
 
258
                                  b.splitlines(True),
 
259
                                  linejunk=lambda x: False,
 
260
                                  charjunk=lambda x: False)
 
261
        return ''.join(difflines)
 
262
 
 
263
    def assertEqualDiff(self, a, b):
 
264
        """Assert two texts are equal, if not raise an exception.
 
265
        
 
266
        This is intended for use with multi-line strings where it can 
 
267
        be hard to find the differences by eye.
 
268
        """
 
269
        # TODO: perhaps override assertEquals to call this for strings?
 
270
        if a == b:
 
271
            return
 
272
        raise AssertionError("texts not equal:\n" + 
 
273
                             self._ndiff_strings(a, b))      
 
274
        
 
275
    def assertStartsWith(self, s, prefix):
 
276
        if not s.startswith(prefix):
 
277
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
278
 
 
279
    def assertEndsWith(self, s, suffix):
 
280
        if not s.endswith(prefix):
 
281
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
282
 
 
283
    def assertContainsRe(self, haystack, needle_re):
 
284
        """Assert that a contains something matching a regular expression."""
 
285
        if not re.search(needle_re, haystack):
 
286
            raise AssertionError('pattern "%s" not found in "%s"'
 
287
                    % (needle_re, haystack))
 
288
 
 
289
    def AssertSubset(self, sublist, superlist):
 
290
        """Assert that every entry in sublist is present in superlist."""
 
291
        missing = []
 
292
        for entry in sublist:
 
293
            if entry not in superlist:
 
294
                missing.append(entry)
 
295
        if len(missing) > 0:
 
296
            raise AssertionError("value(s) %r not present in container %r" % 
 
297
                                 (missing, superlist))
 
298
 
 
299
    def assertIs(self, left, right):
 
300
        if not (left is right):
 
301
            raise AssertionError("%r is not %r." % (left, right))
 
302
 
 
303
    def assertTransportMode(self, transport, path, mode):
 
304
        """Fail if a path does not have mode mode.
 
305
        
 
306
        If modes are not supported on this platform, the test is skipped.
 
307
        """
 
308
        if sys.platform == 'win32':
 
309
            return
 
310
        path_stat = transport.stat(path)
 
311
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
312
        self.assertEqual(mode, actual_mode,
 
313
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
314
 
 
315
    def _startLogFile(self):
 
316
        """Send bzr and test log messages to a temporary file.
 
317
 
 
318
        The file is removed as the test is torn down.
 
319
        """
 
320
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
321
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
 
322
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
 
323
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
324
        self._log_file_name = name
 
325
        self.addCleanup(self._finishLogFile)
 
326
 
 
327
    def _finishLogFile(self):
 
328
        """Finished with the log file.
 
329
 
 
330
        Read contents into memory, close, and delete.
 
331
        """
 
332
        bzrlib.trace.disable_test_log(self._log_nonce)
 
333
        self._log_file.seek(0)
 
334
        self._log_contents = self._log_file.read()
 
335
        self._log_file.close()
 
336
        os.remove(self._log_file_name)
 
337
        self._log_file = self._log_file_name = None
 
338
 
 
339
    def addCleanup(self, callable):
 
340
        """Arrange to run a callable when this case is torn down.
 
341
 
 
342
        Callables are run in the reverse of the order they are registered, 
 
343
        ie last-in first-out.
 
344
        """
 
345
        if callable in self._cleanups:
 
346
            raise ValueError("cleanup function %r already registered on %s" 
 
347
                    % (callable, self))
 
348
        self._cleanups.append(callable)
 
349
 
 
350
    def _cleanEnvironment(self):
 
351
        new_env = {
 
352
            'HOME': os.getcwd(),
 
353
            'APPDATA': os.getcwd(),
 
354
            'BZREMAIL': None,
 
355
            'EMAIL': None,
 
356
        }
 
357
        self.__old_env = {}
 
358
        self.addCleanup(self._restoreEnvironment)
 
359
        for name, value in new_env.iteritems():
 
360
            self._captureVar(name, value)
 
361
 
 
362
 
 
363
    def _captureVar(self, name, newvalue):
 
364
        """Set an environment variable, preparing it to be reset when finished."""
 
365
        self.__old_env[name] = os.environ.get(name, None)
 
366
        if newvalue is None:
 
367
            if name in os.environ:
 
368
                del os.environ[name]
 
369
        else:
 
370
            os.environ[name] = newvalue
 
371
 
 
372
    @staticmethod
 
373
    def _restoreVar(name, value):
 
374
        if value is None:
 
375
            if name in os.environ:
 
376
                del os.environ[name]
 
377
        else:
 
378
            os.environ[name] = value
 
379
 
 
380
    def _restoreEnvironment(self):
 
381
        for name, value in self.__old_env.iteritems():
 
382
            self._restoreVar(name, value)
 
383
 
 
384
    def tearDown(self):
 
385
        self._runCleanups()
 
386
        unittest.TestCase.tearDown(self)
 
387
 
 
388
    def _runCleanups(self):
 
389
        """Run registered cleanup functions. 
 
390
 
 
391
        This should only be called from TestCase.tearDown.
 
392
        """
 
393
        # TODO: Perhaps this should keep running cleanups even if 
 
394
        # one of them fails?
 
395
        for cleanup_fn in reversed(self._cleanups):
 
396
            cleanup_fn()
 
397
 
 
398
    def log(self, *args):
 
399
        mutter(*args)
 
400
 
 
401
    def _get_log(self):
 
402
        """Return as a string the log for this test"""
 
403
        if self._log_file_name:
 
404
            return open(self._log_file_name).read()
 
405
        else:
 
406
            return self._log_contents
 
407
        # TODO: Delete the log after it's been read in
 
408
 
 
409
    def capture(self, cmd, retcode=0):
 
410
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
411
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
412
 
 
413
    def run_bzr_captured(self, argv, retcode=0):
 
414
        """Invoke bzr and return (stdout, stderr).
 
415
 
 
416
        Useful for code that wants to check the contents of the
 
417
        output, the way error messages are presented, etc.
 
418
 
 
419
        This should be the main method for tests that want to exercise the
 
420
        overall behavior of the bzr application (rather than a unit test
 
421
        or a functional test of the library.)
 
422
 
 
423
        Much of the old code runs bzr by forking a new copy of Python, but
 
424
        that is slower, harder to debug, and generally not necessary.
 
425
 
 
426
        This runs bzr through the interface that catches and reports
 
427
        errors, and with logging set to something approximating the
 
428
        default, so that error reporting can be checked.
 
429
 
 
430
        argv -- arguments to invoke bzr
 
431
        retcode -- expected return code, or None for don't-care.
 
432
        """
 
433
        stdout = StringIO()
 
434
        stderr = StringIO()
 
435
        self.log('run bzr: %s', ' '.join(argv))
 
436
        # FIXME: don't call into logging here
 
437
        handler = logging.StreamHandler(stderr)
 
438
        handler.setFormatter(bzrlib.trace.QuietFormatter())
 
439
        handler.setLevel(logging.INFO)
 
440
        logger = logging.getLogger('')
 
441
        logger.addHandler(handler)
 
442
        try:
 
443
            result = self.apply_redirected(None, stdout, stderr,
 
444
                                           bzrlib.commands.run_bzr_catch_errors,
 
445
                                           argv)
 
446
        finally:
 
447
            logger.removeHandler(handler)
 
448
        out = stdout.getvalue()
 
449
        err = stderr.getvalue()
 
450
        if out:
 
451
            self.log('output:\n%s', out)
 
452
        if err:
 
453
            self.log('errors:\n%s', err)
 
454
        if retcode is not None:
 
455
            self.assertEquals(result, retcode)
 
456
        return out, err
 
457
 
 
458
    def run_bzr(self, *args, **kwargs):
 
459
        """Invoke bzr, as if it were run from the command line.
 
460
 
 
461
        This should be the main method for tests that want to exercise the
 
462
        overall behavior of the bzr application (rather than a unit test
 
463
        or a functional test of the library.)
 
464
 
 
465
        This sends the stdout/stderr results into the test's log,
 
466
        where it may be useful for debugging.  See also run_captured.
 
467
        """
 
468
        retcode = kwargs.pop('retcode', 0)
 
469
        return self.run_bzr_captured(args, retcode)
 
470
 
 
471
    def check_inventory_shape(self, inv, shape):
 
472
        """Compare an inventory to a list of expected names.
 
473
 
 
474
        Fail if they are not precisely equal.
 
475
        """
 
476
        extras = []
 
477
        shape = list(shape)             # copy
 
478
        for path, ie in inv.entries():
 
479
            name = path.replace('\\', '/')
 
480
            if ie.kind == 'dir':
 
481
                name = name + '/'
 
482
            if name in shape:
 
483
                shape.remove(name)
 
484
            else:
 
485
                extras.append(name)
 
486
        if shape:
 
487
            self.fail("expected paths not found in inventory: %r" % shape)
 
488
        if extras:
 
489
            self.fail("unexpected paths found in inventory: %r" % extras)
 
490
 
 
491
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
492
                         a_callable=None, *args, **kwargs):
 
493
        """Call callable with redirected std io pipes.
 
494
 
 
495
        Returns the return code."""
 
496
        if not callable(a_callable):
 
497
            raise ValueError("a_callable must be callable.")
 
498
        if stdin is None:
 
499
            stdin = StringIO("")
 
500
        if stdout is None:
 
501
            if getattr(self, "_log_file", None) is not None:
 
502
                stdout = self._log_file
 
503
            else:
 
504
                stdout = StringIO()
 
505
        if stderr is None:
 
506
            if getattr(self, "_log_file", None is not None):
 
507
                stderr = self._log_file
 
508
            else:
 
509
                stderr = StringIO()
 
510
        real_stdin = sys.stdin
 
511
        real_stdout = sys.stdout
 
512
        real_stderr = sys.stderr
 
513
        try:
 
514
            sys.stdout = stdout
 
515
            sys.stderr = stderr
 
516
            sys.stdin = stdin
 
517
            return a_callable(*args, **kwargs)
 
518
        finally:
 
519
            sys.stdout = real_stdout
 
520
            sys.stderr = real_stderr
 
521
            sys.stdin = real_stdin
 
522
 
 
523
 
 
524
BzrTestBase = TestCase
 
525
 
 
526
     
 
527
class TestCaseInTempDir(TestCase):
 
528
    """Derived class that runs a test within a temporary directory.
 
529
 
 
530
    This is useful for tests that need to create a branch, etc.
 
531
 
 
532
    The directory is created in a slightly complex way: for each
 
533
    Python invocation, a new temporary top-level directory is created.
 
534
    All test cases create their own directory within that.  If the
 
535
    tests complete successfully, the directory is removed.
 
536
 
 
537
    InTempDir is an old alias for FunctionalTestCase.
 
538
    """
 
539
 
 
540
    TEST_ROOT = None
 
541
    _TEST_NAME = 'test'
 
542
    OVERRIDE_PYTHON = 'python'
 
543
 
 
544
    def check_file_contents(self, filename, expect):
 
545
        self.log("check contents of file %s" % filename)
 
546
        contents = file(filename, 'r').read()
 
547
        if contents != expect:
 
548
            self.log("expected: %r" % expect)
 
549
            self.log("actually: %r" % contents)
 
550
            self.fail("contents of %s not as expected" % filename)
 
551
 
 
552
    def _make_test_root(self):
 
553
        if TestCaseInTempDir.TEST_ROOT is not None:
 
554
            return
 
555
        i = 0
 
556
        while True:
 
557
            root = u'test%04d.tmp' % i
 
558
            try:
 
559
                os.mkdir(root)
 
560
            except OSError, e:
 
561
                if e.errno == errno.EEXIST:
 
562
                    i += 1
 
563
                    continue
 
564
                else:
 
565
                    raise
 
566
            # successfully created
 
567
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
 
568
            break
 
569
        # make a fake bzr directory there to prevent any tests propagating
 
570
        # up onto the source directory's real branch
 
571
        os.mkdir(osutils.pathjoin(TestCaseInTempDir.TEST_ROOT, '.bzr'))
 
572
 
 
573
    def setUp(self):
 
574
        super(TestCaseInTempDir, self).setUp()
 
575
        self._make_test_root()
 
576
        _currentdir = os.getcwdu()
 
577
        short_id = self.id().replace('bzrlib.tests.', '') \
 
578
                   .replace('__main__.', '')
 
579
        self.test_dir = osutils.pathjoin(self.TEST_ROOT, short_id)
 
580
        os.mkdir(self.test_dir)
 
581
        os.chdir(self.test_dir)
 
582
        os.environ['HOME'] = self.test_dir
 
583
        os.environ['APPDATA'] = self.test_dir
 
584
        def _leaveDirectory():
 
585
            os.chdir(_currentdir)
 
586
        self.addCleanup(_leaveDirectory)
 
587
        
 
588
    def build_tree(self, shape, line_endings='native', transport=None):
 
589
        """Build a test tree according to a pattern.
 
590
 
 
591
        shape is a sequence of file specifications.  If the final
 
592
        character is '/', a directory is created.
 
593
 
 
594
        This doesn't add anything to a branch.
 
595
        :param line_endings: Either 'binary' or 'native'
 
596
                             in binary mode, exact contents are written
 
597
                             in native mode, the line endings match the
 
598
                             default platform endings.
 
599
 
 
600
        :param transport: A transport to write to, for building trees on 
 
601
                          VFS's. If the transport is readonly or None,
 
602
                          "." is opened automatically.
 
603
        """
 
604
        # XXX: It's OK to just create them using forward slashes on windows?
 
605
        if transport is None or transport.is_readonly():
 
606
            transport = bzrlib.transport.get_transport(".")
 
607
        for name in shape:
 
608
            self.assert_(isinstance(name, basestring))
 
609
            if name[-1] == '/':
 
610
                transport.mkdir(urlescape(name[:-1]))
 
611
            else:
 
612
                if line_endings == 'binary':
 
613
                    end = '\n'
 
614
                elif line_endings == 'native':
 
615
                    end = os.linesep
 
616
                else:
 
617
                    raise BzrError('Invalid line ending request %r' % (line_endings,))
 
618
                content = "contents of %s%s" % (name, end)
 
619
                transport.put(urlescape(name), StringIO(content))
 
620
 
 
621
    def build_tree_contents(self, shape):
 
622
        build_tree_contents(shape)
 
623
 
 
624
    def failUnlessExists(self, path):
 
625
        """Fail unless path, which may be abs or relative, exists."""
 
626
        self.failUnless(osutils.lexists(path))
 
627
 
 
628
    def failIfExists(self, path):
 
629
        """Fail if path, which may be abs or relative, exists."""
 
630
        self.failIf(osutils.lexists(path))
 
631
        
 
632
    def assertFileEqual(self, content, path):
 
633
        """Fail if path does not contain 'content'."""
 
634
        self.failUnless(osutils.lexists(path))
 
635
        self.assertEqualDiff(content, open(path, 'r').read())
 
636
 
 
637
 
 
638
class TestCaseWithTransport(TestCaseInTempDir):
 
639
    """A test case that provides get_url and get_readonly_url facilities.
 
640
 
 
641
    These back onto two transport servers, one for readonly access and one for
 
642
    read write access.
 
643
 
 
644
    If no explicit class is provided for readonly access, a
 
645
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
646
    based read write transports.
 
647
 
 
648
    If an explicit class is provided for readonly access, that server and the 
 
649
    readwrite one must both define get_url() as resolving to os.getcwd().
 
650
    """
 
651
 
 
652
    def __init__(self, methodName='testMethod'):
 
653
        super(TestCaseWithTransport, self).__init__(methodName)
 
654
        self.__readonly_server = None
 
655
        self.__server = None
 
656
        self.transport_server = default_transport
 
657
        self.transport_readonly_server = None
 
658
 
 
659
    def get_readonly_url(self, relpath=None):
 
660
        """Get a URL for the readonly transport.
 
661
 
 
662
        This will either be backed by '.' or a decorator to the transport 
 
663
        used by self.get_url()
 
664
        relpath provides for clients to get a path relative to the base url.
 
665
        These should only be downwards relative, not upwards.
 
666
        """
 
667
        if self.__readonly_server is None:
 
668
            if self.transport_readonly_server is None:
 
669
                # readonly decorator requested
 
670
                # bring up the server
 
671
                self.get_url()
 
672
                self.__readonly_server = ReadonlyServer()
 
673
                self.__readonly_server.setUp(self.__server)
 
674
            else:
 
675
                self.__readonly_server = self.transport_readonly_server()
 
676
                self.__readonly_server.setUp()
 
677
            self.addCleanup(self.__readonly_server.tearDown)
 
678
        base = self.__readonly_server.get_url()
 
679
        if relpath is not None:
 
680
            if not base.endswith('/'):
 
681
                base = base + '/'
 
682
            base = base + relpath
 
683
        return base
 
684
 
 
685
    def get_url(self, relpath=None):
 
686
        """Get a URL for the readwrite transport.
 
687
 
 
688
        This will either be backed by '.' or to an equivalent non-file based
 
689
        facility.
 
690
        relpath provides for clients to get a path relative to the base url.
 
691
        These should only be downwards relative, not upwards.
 
692
        """
 
693
        if self.__server is None:
 
694
            self.__server = self.transport_server()
 
695
            self.__server.setUp()
 
696
            self.addCleanup(self.__server.tearDown)
 
697
        base = self.__server.get_url()
 
698
        if relpath is not None and relpath != '.':
 
699
            if not base.endswith('/'):
 
700
                base = base + '/'
 
701
            base = base + relpath
 
702
        return base
 
703
 
 
704
    def make_branch(self, relpath):
 
705
        """Create a branch on the transport at relpath."""
 
706
        try:
 
707
            url = self.get_url(relpath)
 
708
            segments = relpath.split('/')
 
709
            if segments and segments[-1] not in ('', '.'):
 
710
                parent = self.get_url('/'.join(segments[:-1]))
 
711
                t = bzrlib.transport.get_transport(parent)
 
712
                try:
 
713
                    t.mkdir(segments[-1])
 
714
                except FileExists:
 
715
                    pass
 
716
            return bzrlib.branch.Branch.create(url)
 
717
        except UninitializableFormat:
 
718
            raise TestSkipped("Format %s is not initializable.")
 
719
 
 
720
    def make_branch_and_tree(self, relpath):
 
721
        """Create a branch on the transport and a tree locally.
 
722
 
 
723
        Returns the tree.
 
724
        """
 
725
        b = self.make_branch(relpath)
 
726
        return WorkingTree.create(b, relpath)
 
727
 
 
728
 
 
729
class ChrootedTestCase(TestCaseWithTransport):
 
730
    """A support class that provides readonly urls outside the local namespace.
 
731
 
 
732
    This is done by checking if self.transport_server is a MemoryServer. if it
 
733
    is then we are chrooted already, if it is not then an HttpServer is used
 
734
    for readonly urls.
 
735
 
 
736
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
737
                       be used without needed to redo it when a different 
 
738
                       subclass is in use ?
 
739
    """
 
740
 
 
741
    def setUp(self):
 
742
        super(ChrootedTestCase, self).setUp()
 
743
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
 
744
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
745
 
 
746
 
 
747
def filter_suite_by_re(suite, pattern):
 
748
    result = TestSuite()
 
749
    filter_re = re.compile(pattern)
 
750
    for test in iter_suite_tests(suite):
 
751
        if filter_re.search(test.id()):
 
752
            result.addTest(test)
 
753
    return result
 
754
 
 
755
 
 
756
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
757
              stop_on_failure=False, keep_output=False,
 
758
              transport=None):
 
759
    TestCaseInTempDir._TEST_NAME = name
 
760
    if verbose:
 
761
        verbosity = 2
 
762
    else:
 
763
        verbosity = 1
 
764
    runner = TextTestRunner(stream=sys.stdout,
 
765
                            descriptions=0,
 
766
                            verbosity=verbosity)
 
767
    runner.stop_on_failure=stop_on_failure
 
768
    if pattern != '.*':
 
769
        suite = filter_suite_by_re(suite, pattern)
 
770
    result = runner.run(suite)
 
771
    # This is still a little bogus, 
 
772
    # but only a little. Folk not using our testrunner will
 
773
    # have to delete their temp directories themselves.
 
774
    if result.wasSuccessful() or not keep_output:
 
775
        if TestCaseInTempDir.TEST_ROOT is not None:
 
776
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
 
777
    else:
 
778
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
 
779
    return result.wasSuccessful()
 
780
 
 
781
 
 
782
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
783
             keep_output=False,
 
784
             transport=None):
 
785
    """Run the whole test suite under the enhanced runner"""
 
786
    global default_transport
 
787
    if transport is None:
 
788
        transport = default_transport
 
789
    old_transport = default_transport
 
790
    default_transport = transport
 
791
    suite = test_suite()
 
792
    try:
 
793
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
794
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
795
                     transport=transport)
 
796
    finally:
 
797
        default_transport = old_transport
 
798
 
 
799
 
 
800
 
 
801
def test_suite():
 
802
    """Build and return TestSuite for the whole program."""
 
803
    from doctest import DocTestSuite
 
804
 
 
805
    global MODULES_TO_DOCTEST
 
806
 
 
807
    testmod_names = [ \
 
808
                   'bzrlib.tests.test_ancestry',
 
809
                   'bzrlib.tests.test_annotate',
 
810
                   'bzrlib.tests.test_api',
 
811
                   'bzrlib.tests.test_bad_files',
 
812
                   'bzrlib.tests.test_basis_inventory',
 
813
                   'bzrlib.tests.test_branch',
 
814
                   'bzrlib.tests.test_command',
 
815
                   'bzrlib.tests.test_commit',
 
816
                   'bzrlib.tests.test_commit_merge',
 
817
                   'bzrlib.tests.test_config',
 
818
                   'bzrlib.tests.test_conflicts',
 
819
                   'bzrlib.tests.test_decorators',
 
820
                   'bzrlib.tests.test_diff',
 
821
                   'bzrlib.tests.test_doc_generate',
 
822
                   'bzrlib.tests.test_fetch',
 
823
                   'bzrlib.tests.test_fileid_involved',
 
824
                   'bzrlib.tests.test_gpg',
 
825
                   'bzrlib.tests.test_graph',
 
826
                   'bzrlib.tests.test_hashcache',
 
827
                   'bzrlib.tests.test_http',
 
828
                   'bzrlib.tests.test_identitymap',
 
829
                   'bzrlib.tests.test_inv',
 
830
                   'bzrlib.tests.test_lockable_files',
 
831
                   'bzrlib.tests.test_log',
 
832
                   'bzrlib.tests.test_merge',
 
833
                   'bzrlib.tests.test_merge3',
 
834
                   'bzrlib.tests.test_merge_core',
 
835
                   'bzrlib.tests.test_missing',
 
836
                   'bzrlib.tests.test_msgeditor',
 
837
                   'bzrlib.tests.test_nonascii',
 
838
                   'bzrlib.tests.test_options',
 
839
                   'bzrlib.tests.test_osutils',
 
840
                   'bzrlib.tests.test_parent',
 
841
                   'bzrlib.tests.test_permissions',
 
842
                   'bzrlib.tests.test_plugins',
 
843
                   'bzrlib.tests.test_revision',
 
844
                   'bzrlib.tests.test_revisionnamespaces',
 
845
                   'bzrlib.tests.test_revprops',
 
846
                   'bzrlib.tests.test_reweave',
 
847
                   'bzrlib.tests.test_rio',
 
848
                   'bzrlib.tests.test_sampler',
 
849
                   'bzrlib.tests.test_selftest',
 
850
                   'bzrlib.tests.test_setup',
 
851
                   'bzrlib.tests.test_sftp_transport',
 
852
                   'bzrlib.tests.test_smart_add',
 
853
                   'bzrlib.tests.test_source',
 
854
                   'bzrlib.tests.test_store',
 
855
                   'bzrlib.tests.test_symbol_versioning',
 
856
                   'bzrlib.tests.test_testament',
 
857
                   'bzrlib.tests.test_trace',
 
858
                   'bzrlib.tests.test_transactions',
 
859
                   'bzrlib.tests.test_transport',
 
860
                   'bzrlib.tests.test_tsort',
 
861
                   'bzrlib.tests.test_ui',
 
862
                   'bzrlib.tests.test_uncommit',
 
863
                   'bzrlib.tests.test_upgrade',
 
864
                   'bzrlib.tests.test_weave',
 
865
                   'bzrlib.tests.test_whitebox',
 
866
                   'bzrlib.tests.test_workingtree',
 
867
                   'bzrlib.tests.test_xml',
 
868
                   ]
 
869
    test_transport_implementations = [
 
870
        'bzrlib.tests.test_transport_implementations']
 
871
 
 
872
    TestCase.BZRPATH = osutils.pathjoin(
 
873
            osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
 
874
    print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
 
875
    print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
 
876
    print
 
877
    suite = TestSuite()
 
878
    # python2.4's TestLoader.loadTestsFromNames gives very poor 
 
879
    # errors if it fails to load a named module - no indication of what's
 
880
    # actually wrong, just "no such module".  We should probably override that
 
881
    # class, but for the moment just load them ourselves. (mbp 20051202)
 
882
    loader = TestLoader()
 
883
    from bzrlib.transport import TransportTestProviderAdapter
 
884
    adapter = TransportTestProviderAdapter()
 
885
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
886
    for mod_name in testmod_names:
 
887
        mod = _load_module_by_name(mod_name)
 
888
        suite.addTest(loader.loadTestsFromModule(mod))
 
889
    for package in packages_to_test():
 
890
        suite.addTest(package.test_suite())
 
891
    for m in MODULES_TO_TEST:
 
892
        suite.addTest(loader.loadTestsFromModule(m))
 
893
    for m in (MODULES_TO_DOCTEST):
 
894
        suite.addTest(DocTestSuite(m))
 
895
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
896
        if getattr(plugin, 'test_suite', None) is not None:
 
897
            suite.addTest(plugin.test_suite())
 
898
    return suite
 
899
 
 
900
 
 
901
def adapt_modules(mods_list, adapter, loader, suite):
 
902
    """Adapt the modules in mods_list using adapter and add to suite."""
 
903
    for mod_name in mods_list:
 
904
        mod = _load_module_by_name(mod_name)
 
905
        for test in iter_suite_tests(loader.loadTestsFromModule(mod)):
 
906
            suite.addTests(adapter.adapt(test))
 
907
 
 
908
 
 
909
def _load_module_by_name(mod_name):
 
910
    parts = mod_name.split('.')
 
911
    module = __import__(mod_name)
 
912
    del parts[0]
 
913
    # for historical reasons python returns the top-level module even though
 
914
    # it loads the submodule; we need to walk down to get the one we want.
 
915
    while parts:
 
916
        module = getattr(module, parts.pop(0))
 
917
    return module