~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/__init__.py

  • Committer: Martin Pool
  • Date: 2005-07-22 22:37:53 UTC
  • Revision ID: mbp@sourcefrog.net-20050722223753-7dced4e32d3ce21d
- add the start of a test for inventory file-id matching

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
 
1
# Copyright (C) 2005 by Canonical Ltd
2
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
# TODO: Perhaps there should be an API to find out if bzr running under the
19
 
# test suite -- some plugins might want to avoid making intrusive changes if
20
 
# this is the case.  However, we want behaviour under to test to diverge as
21
 
# little as possible, so this should be used rarely if it's added at all.
22
 
# (Suggestion from j-a-meinel, 2005-11-24)
23
 
 
24
 
# NOTE: Some classes in here use camelCaseNaming() rather than
25
 
# underscore_naming().  That's for consistency with unittest; it's not the
26
 
# general style of bzrlib.  Please continue that consistency when adding e.g.
27
 
# new assertFoo() methods.
28
 
 
29
 
import codecs
30
 
from cStringIO import StringIO
31
 
import difflib
32
 
import doctest
33
 
import errno
34
 
import logging
35
 
import os
36
 
import re
37
 
import shlex
38
 
import stat
39
 
from subprocess import Popen, PIPE
40
 
import sys
41
 
import tempfile
42
 
import unittest
43
 
import time
44
 
 
45
 
 
46
 
import bzrlib.branch
47
 
import bzrlib.bzrdir as bzrdir
48
 
import bzrlib.commands
49
 
import bzrlib.bundle.serializer
50
 
import bzrlib.errors as errors
51
 
import bzrlib.inventory
52
 
import bzrlib.iterablefile
53
 
import bzrlib.lockdir
54
 
try:
55
 
    import bzrlib.lsprof
56
 
except ImportError:
57
 
    # lsprof not available
58
 
    pass
59
 
from bzrlib.merge import merge_inner
60
 
import bzrlib.merge3
61
 
import bzrlib.osutils
62
 
import bzrlib.osutils as osutils
63
 
import bzrlib.plugin
64
 
import bzrlib.progress as progress
65
 
from bzrlib.revision import common_ancestor
66
 
import bzrlib.store
67
 
import bzrlib.trace
68
 
from bzrlib.transport import get_transport
69
 
import bzrlib.transport
70
 
from bzrlib.transport.local import LocalRelpathServer
71
 
from bzrlib.transport.readonly import ReadonlyServer
72
 
from bzrlib.trace import mutter
73
 
from bzrlib.tests import TestUtil
74
 
from bzrlib.tests.TestUtil import (
75
 
                          TestSuite,
76
 
                          TestLoader,
77
 
                          )
78
 
from bzrlib.tests.treeshape import build_tree_contents
79
 
import bzrlib.urlutils as urlutils
80
 
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
81
 
 
82
 
default_transport = LocalRelpathServer
 
18
from testsweet import TestBase, run_suite, InTempDir
83
19
 
84
20
MODULES_TO_TEST = []
85
 
MODULES_TO_DOCTEST = [
86
 
                      bzrlib.branch,
87
 
                      bzrlib.bundle.serializer,
88
 
                      bzrlib.commands,
89
 
                      bzrlib.errors,
90
 
                      bzrlib.inventory,
91
 
                      bzrlib.iterablefile,
92
 
                      bzrlib.lockdir,
93
 
                      bzrlib.merge3,
94
 
                      bzrlib.option,
95
 
                      bzrlib.osutils,
96
 
                      bzrlib.store
97
 
                      ]
98
 
 
99
 
 
100
 
def packages_to_test():
101
 
    """Return a list of packages to test.
102
 
 
103
 
    The packages are not globally imported so that import failures are
104
 
    triggered when running selftest, not when importing the command.
105
 
    """
106
 
    import bzrlib.doc
107
 
    import bzrlib.tests.blackbox
108
 
    import bzrlib.tests.branch_implementations
109
 
    import bzrlib.tests.bzrdir_implementations
110
 
    import bzrlib.tests.interrepository_implementations
111
 
    import bzrlib.tests.interversionedfile_implementations
112
 
    import bzrlib.tests.repository_implementations
113
 
    import bzrlib.tests.revisionstore_implementations
114
 
    import bzrlib.tests.tree_implementations
115
 
    import bzrlib.tests.workingtree_implementations
116
 
    return [
117
 
            bzrlib.doc,
118
 
            bzrlib.tests.blackbox,
119
 
            bzrlib.tests.branch_implementations,
120
 
            bzrlib.tests.bzrdir_implementations,
121
 
            bzrlib.tests.interrepository_implementations,
122
 
            bzrlib.tests.interversionedfile_implementations,
123
 
            bzrlib.tests.repository_implementations,
124
 
            bzrlib.tests.revisionstore_implementations,
125
 
            bzrlib.tests.tree_implementations,
126
 
            bzrlib.tests.workingtree_implementations,
127
 
            ]
128
 
 
129
 
 
130
 
class _MyResult(unittest._TextTestResult):
131
 
    """Custom TestResult.
132
 
 
133
 
    Shows output in a different format, including displaying runtime for tests.
134
 
    """
135
 
    stop_early = False
136
 
    
137
 
    def __init__(self, stream, descriptions, verbosity, pb=None):
138
 
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
139
 
        self.pb = pb
140
 
    
141
 
    def extractBenchmarkTime(self, testCase):
142
 
        """Add a benchmark time for the current test case."""
143
 
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
144
 
    
145
 
    def _elapsedTestTimeString(self):
146
 
        """Return a time string for the overall time the current test has taken."""
147
 
        return self._formatTime(time.time() - self._start_time)
148
 
 
149
 
    def _testTimeString(self):
150
 
        if self._benchmarkTime is not None:
151
 
            return "%s/%s" % (
152
 
                self._formatTime(self._benchmarkTime),
153
 
                self._elapsedTestTimeString())
154
 
        else:
155
 
            return "      %s" % self._elapsedTestTimeString()
156
 
 
157
 
    def _formatTime(self, seconds):
158
 
        """Format seconds as milliseconds with leading spaces."""
159
 
        return "%5dms" % (1000 * seconds)
160
 
 
161
 
    def _ellipsise_unimportant_words(self, a_string, final_width,
162
 
                                   keep_start=False):
163
 
        """Add ellipses (sp?) for overly long strings.
164
 
        
165
 
        :param keep_start: If true preserve the start of a_string rather
166
 
                           than the end of it.
167
 
        """
168
 
        if keep_start:
169
 
            if len(a_string) > final_width:
170
 
                result = a_string[:final_width-3] + '...'
171
 
            else:
172
 
                result = a_string
173
 
        else:
174
 
            if len(a_string) > final_width:
175
 
                result = '...' + a_string[3-final_width:]
176
 
            else:
177
 
                result = a_string
178
 
        return result.ljust(final_width)
179
 
 
180
 
    def startTest(self, test):
181
 
        unittest.TestResult.startTest(self, test)
182
 
        # In a short description, the important words are in
183
 
        # the beginning, but in an id, the important words are
184
 
        # at the end
185
 
        SHOW_DESCRIPTIONS = False
186
 
 
187
 
        if not self.showAll and self.dots and self.pb is not None:
188
 
            final_width = 13
189
 
        else:
190
 
            final_width = osutils.terminal_width()
191
 
            final_width = final_width - 15 - 8
192
 
        what = None
193
 
        if SHOW_DESCRIPTIONS:
194
 
            what = test.shortDescription()
195
 
            if what:
196
 
                what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
197
 
        if what is None:
198
 
            what = test.id()
199
 
            if what.startswith('bzrlib.tests.'):
200
 
                what = what[13:]
201
 
            what = self._ellipsise_unimportant_words(what, final_width)
202
 
        if self.showAll:
203
 
            self.stream.write(what)
204
 
        elif self.dots and self.pb is not None:
205
 
            self.pb.update(what, self.testsRun - 1, None)
206
 
        self.stream.flush()
207
 
        self._recordTestStartTime()
208
 
 
209
 
    def _recordTestStartTime(self):
210
 
        """Record that a test has started."""
211
 
        self._start_time = time.time()
212
 
 
213
 
    def addError(self, test, err):
214
 
        if isinstance(err[1], TestSkipped):
215
 
            return self.addSkipped(test, err)    
216
 
        unittest.TestResult.addError(self, test, err)
217
 
        self.extractBenchmarkTime(test)
218
 
        if self.showAll:
219
 
            self.stream.writeln("ERROR %s" % self._testTimeString())
220
 
        elif self.dots and self.pb is None:
221
 
            self.stream.write('E')
222
 
        elif self.dots:
223
 
            self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
224
 
            self.pb.note(self._ellipsise_unimportant_words(
225
 
                            test.id() + ': ERROR',
226
 
                            osutils.terminal_width()))
227
 
        self.stream.flush()
228
 
        if self.stop_early:
229
 
            self.stop()
230
 
 
231
 
    def addFailure(self, test, err):
232
 
        unittest.TestResult.addFailure(self, test, err)
233
 
        self.extractBenchmarkTime(test)
234
 
        if self.showAll:
235
 
            self.stream.writeln(" FAIL %s" % self._testTimeString())
236
 
        elif self.dots and self.pb is None:
237
 
            self.stream.write('F')
238
 
        elif self.dots:
239
 
            self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
240
 
            self.pb.note(self._ellipsise_unimportant_words(
241
 
                            test.id() + ': FAIL',
242
 
                            osutils.terminal_width()))
243
 
        self.stream.flush()
244
 
        if self.stop_early:
245
 
            self.stop()
246
 
 
247
 
    def addSuccess(self, test):
248
 
        self.extractBenchmarkTime(test)
249
 
        if self.showAll:
250
 
            self.stream.writeln('   OK %s' % self._testTimeString())
251
 
            for bench_called, stats in getattr(test, '_benchcalls', []):
252
 
                self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
253
 
                stats.pprint(file=self.stream)
254
 
        elif self.dots and self.pb is None:
255
 
            self.stream.write('~')
256
 
        elif self.dots:
257
 
            self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
258
 
        self.stream.flush()
259
 
        unittest.TestResult.addSuccess(self, test)
260
 
 
261
 
    def addSkipped(self, test, skip_excinfo):
262
 
        self.extractBenchmarkTime(test)
263
 
        if self.showAll:
264
 
            print >>self.stream, ' SKIP %s' % self._testTimeString()
265
 
            print >>self.stream, '     %s' % skip_excinfo[1]
266
 
        elif self.dots and self.pb is None:
267
 
            self.stream.write('S')
268
 
        elif self.dots:
269
 
            self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
270
 
        self.stream.flush()
271
 
        # seems best to treat this as success from point-of-view of unittest
272
 
        # -- it actually does nothing so it barely matters :)
273
 
        try:
274
 
            test.tearDown()
275
 
        except KeyboardInterrupt:
276
 
            raise
277
 
        except:
278
 
            self.addError(test, test.__exc_info())
279
 
        else:
280
 
            unittest.TestResult.addSuccess(self, test)
281
 
 
282
 
    def printErrorList(self, flavour, errors):
283
 
        for test, err in errors:
284
 
            self.stream.writeln(self.separator1)
285
 
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
286
 
            if getattr(test, '_get_log', None) is not None:
287
 
                print >>self.stream
288
 
                print >>self.stream, \
289
 
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
290
 
                print >>self.stream, test._get_log()
291
 
                print >>self.stream, \
292
 
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
293
 
            self.stream.writeln(self.separator2)
294
 
            self.stream.writeln("%s" % err)
295
 
 
296
 
 
297
 
class TextTestRunner(object):
298
 
    stop_on_failure = False
299
 
 
300
 
    def __init__(self,
301
 
                 stream=sys.stderr,
302
 
                 descriptions=0,
303
 
                 verbosity=1,
304
 
                 keep_output=False,
305
 
                 pb=None):
306
 
        self.stream = unittest._WritelnDecorator(stream)
307
 
        self.descriptions = descriptions
308
 
        self.verbosity = verbosity
309
 
        self.keep_output = keep_output
310
 
        self.pb = pb
311
 
 
312
 
    def _makeResult(self):
313
 
        result = _MyResult(self.stream,
314
 
                           self.descriptions,
315
 
                           self.verbosity,
316
 
                           pb=self.pb)
317
 
        result.stop_early = self.stop_on_failure
318
 
        return result
319
 
 
320
 
    def run(self, test):
321
 
        "Run the given test case or test suite."
322
 
        result = self._makeResult()
323
 
        startTime = time.time()
324
 
        if self.pb is not None:
325
 
            self.pb.update('Running tests', 0, test.countTestCases())
326
 
        test.run(result)
327
 
        stopTime = time.time()
328
 
        timeTaken = stopTime - startTime
329
 
        result.printErrors()
330
 
        self.stream.writeln(result.separator2)
331
 
        run = result.testsRun
332
 
        self.stream.writeln("Ran %d test%s in %.3fs" %
333
 
                            (run, run != 1 and "s" or "", timeTaken))
334
 
        self.stream.writeln()
335
 
        if not result.wasSuccessful():
336
 
            self.stream.write("FAILED (")
337
 
            failed, errored = map(len, (result.failures, result.errors))
338
 
            if failed:
339
 
                self.stream.write("failures=%d" % failed)
340
 
            if errored:
341
 
                if failed: self.stream.write(", ")
342
 
                self.stream.write("errors=%d" % errored)
343
 
            self.stream.writeln(")")
344
 
        else:
345
 
            self.stream.writeln("OK")
346
 
        if self.pb is not None:
347
 
            self.pb.update('Cleaning up', 0, 1)
348
 
        # This is still a little bogus, 
349
 
        # but only a little. Folk not using our testrunner will
350
 
        # have to delete their temp directories themselves.
351
 
        test_root = TestCaseInTempDir.TEST_ROOT
352
 
        if result.wasSuccessful() or not self.keep_output:
353
 
            if test_root is not None:
354
 
                # If LANG=C we probably have created some bogus paths
355
 
                # which rmtree(unicode) will fail to delete
356
 
                # so make sure we are using rmtree(str) to delete everything
357
 
                # except on win32, where rmtree(str) will fail
358
 
                # since it doesn't have the property of byte-stream paths
359
 
                # (they are either ascii or mbcs)
360
 
                if sys.platform == 'win32':
361
 
                    # make sure we are using the unicode win32 api
362
 
                    test_root = unicode(test_root)
363
 
                else:
364
 
                    test_root = test_root.encode(
365
 
                        sys.getfilesystemencoding())
366
 
                osutils.rmtree(test_root)
367
 
        else:
368
 
            if self.pb is not None:
369
 
                self.pb.note("Failed tests working directories are in '%s'\n",
370
 
                             test_root)
371
 
            else:
372
 
                self.stream.writeln(
373
 
                    "Failed tests working directories are in '%s'\n" %
374
 
                    test_root)
375
 
        TestCaseInTempDir.TEST_ROOT = None
376
 
        if self.pb is not None:
377
 
            self.pb.clear()
378
 
        return result
379
 
 
380
 
 
381
 
def iter_suite_tests(suite):
382
 
    """Return all tests in a suite, recursing through nested suites"""
383
 
    for item in suite._tests:
384
 
        if isinstance(item, unittest.TestCase):
385
 
            yield item
386
 
        elif isinstance(item, unittest.TestSuite):
387
 
            for r in iter_suite_tests(item):
388
 
                yield r
389
 
        else:
390
 
            raise Exception('unknown object %r inside test suite %r'
391
 
                            % (item, suite))
392
 
 
393
 
 
394
 
class TestSkipped(Exception):
395
 
    """Indicates that a test was intentionally skipped, rather than failing."""
396
 
    # XXX: Not used yet
397
 
 
398
 
 
399
 
class CommandFailed(Exception):
400
 
    pass
401
 
 
402
 
 
403
 
class StringIOWrapper(object):
404
 
    """A wrapper around cStringIO which just adds an encoding attribute.
405
 
    
406
 
    Internally we can check sys.stdout to see what the output encoding
407
 
    should be. However, cStringIO has no encoding attribute that we can
408
 
    set. So we wrap it instead.
409
 
    """
410
 
    encoding='ascii'
411
 
    _cstring = None
412
 
 
413
 
    def __init__(self, s=None):
414
 
        if s is not None:
415
 
            self.__dict__['_cstring'] = StringIO(s)
416
 
        else:
417
 
            self.__dict__['_cstring'] = StringIO()
418
 
 
419
 
    def __getattr__(self, name, getattr=getattr):
420
 
        return getattr(self.__dict__['_cstring'], name)
421
 
 
422
 
    def __setattr__(self, name, val):
423
 
        if name == 'encoding':
424
 
            self.__dict__['encoding'] = val
425
 
        else:
426
 
            return setattr(self._cstring, name, val)
427
 
 
428
 
 
429
 
class TestCase(unittest.TestCase):
430
 
    """Base class for bzr unit tests.
431
 
    
432
 
    Tests that need access to disk resources should subclass 
433
 
    TestCaseInTempDir not TestCase.
434
 
 
435
 
    Error and debug log messages are redirected from their usual
436
 
    location into a temporary file, the contents of which can be
437
 
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
438
 
    so that it can also capture file IO.  When the test completes this file
439
 
    is read into memory and removed from disk.
440
 
       
441
 
    There are also convenience functions to invoke bzr's command-line
442
 
    routine, and to build and check bzr trees.
443
 
   
444
 
    In addition to the usual method of overriding tearDown(), this class also
445
 
    allows subclasses to register functions into the _cleanups list, which is
446
 
    run in order as the object is torn down.  It's less likely this will be
447
 
    accidentally overlooked.
448
 
    """
449
 
 
450
 
    _log_file_name = None
451
 
    _log_contents = ''
452
 
    # record lsprof data when performing benchmark calls.
453
 
    _gather_lsprof_in_benchmarks = False
454
 
 
455
 
    def __init__(self, methodName='testMethod'):
456
 
        super(TestCase, self).__init__(methodName)
457
 
        self._cleanups = []
458
 
 
459
 
    def setUp(self):
460
 
        unittest.TestCase.setUp(self)
461
 
        self._cleanEnvironment()
462
 
        bzrlib.trace.disable_default_logging()
463
 
        self._startLogFile()
464
 
        self._benchcalls = []
465
 
        self._benchtime = None
466
 
 
467
 
    def _ndiff_strings(self, a, b):
468
 
        """Return ndiff between two strings containing lines.
469
 
        
470
 
        A trailing newline is added if missing to make the strings
471
 
        print properly."""
472
 
        if b and b[-1] != '\n':
473
 
            b += '\n'
474
 
        if a and a[-1] != '\n':
475
 
            a += '\n'
476
 
        difflines = difflib.ndiff(a.splitlines(True),
477
 
                                  b.splitlines(True),
478
 
                                  linejunk=lambda x: False,
479
 
                                  charjunk=lambda x: False)
480
 
        return ''.join(difflines)
481
 
 
482
 
    def assertEqualDiff(self, a, b, message=None):
483
 
        """Assert two texts are equal, if not raise an exception.
484
 
        
485
 
        This is intended for use with multi-line strings where it can 
486
 
        be hard to find the differences by eye.
487
 
        """
488
 
        # TODO: perhaps override assertEquals to call this for strings?
489
 
        if a == b:
490
 
            return
491
 
        if message is None:
492
 
            message = "texts not equal:\n"
493
 
        raise AssertionError(message + 
494
 
                             self._ndiff_strings(a, b))      
495
 
        
496
 
    def assertEqualMode(self, mode, mode_test):
497
 
        self.assertEqual(mode, mode_test,
498
 
                         'mode mismatch %o != %o' % (mode, mode_test))
499
 
 
500
 
    def assertStartsWith(self, s, prefix):
501
 
        if not s.startswith(prefix):
502
 
            raise AssertionError('string %r does not start with %r' % (s, prefix))
503
 
 
504
 
    def assertEndsWith(self, s, suffix):
505
 
        """Asserts that s ends with suffix."""
506
 
        if not s.endswith(suffix):
507
 
            raise AssertionError('string %r does not end with %r' % (s, suffix))
508
 
 
509
 
    def assertContainsRe(self, haystack, needle_re):
510
 
        """Assert that a contains something matching a regular expression."""
511
 
        if not re.search(needle_re, haystack):
512
 
            raise AssertionError('pattern "%s" not found in "%s"'
513
 
                    % (needle_re, haystack))
514
 
 
515
 
    def assertNotContainsRe(self, haystack, needle_re):
516
 
        """Assert that a does not match a regular expression"""
517
 
        if re.search(needle_re, haystack):
518
 
            raise AssertionError('pattern "%s" found in "%s"'
519
 
                    % (needle_re, haystack))
520
 
 
521
 
    def assertSubset(self, sublist, superlist):
522
 
        """Assert that every entry in sublist is present in superlist."""
523
 
        missing = []
524
 
        for entry in sublist:
525
 
            if entry not in superlist:
526
 
                missing.append(entry)
527
 
        if len(missing) > 0:
528
 
            raise AssertionError("value(s) %r not present in container %r" % 
529
 
                                 (missing, superlist))
530
 
 
531
 
    def assertIs(self, left, right):
532
 
        if not (left is right):
533
 
            raise AssertionError("%r is not %r." % (left, right))
534
 
 
535
 
    def assertTransportMode(self, transport, path, mode):
536
 
        """Fail if a path does not have mode mode.
537
 
        
538
 
        If modes are not supported on this transport, the assertion is ignored.
539
 
        """
540
 
        if not transport._can_roundtrip_unix_modebits():
541
 
            return
542
 
        path_stat = transport.stat(path)
543
 
        actual_mode = stat.S_IMODE(path_stat.st_mode)
544
 
        self.assertEqual(mode, actual_mode,
545
 
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
546
 
 
547
 
    def assertIsInstance(self, obj, kls):
548
 
        """Fail if obj is not an instance of kls"""
549
 
        if not isinstance(obj, kls):
550
 
            self.fail("%r is an instance of %s rather than %s" % (
551
 
                obj, obj.__class__, kls))
552
 
 
553
 
    def _startLogFile(self):
554
 
        """Send bzr and test log messages to a temporary file.
555
 
 
556
 
        The file is removed as the test is torn down.
557
 
        """
558
 
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
559
 
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
560
 
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
561
 
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
562
 
        self._log_file_name = name
563
 
        self.addCleanup(self._finishLogFile)
564
 
 
565
 
    def _finishLogFile(self):
566
 
        """Finished with the log file.
567
 
 
568
 
        Read contents into memory, close, and delete.
569
 
        """
570
 
        if self._log_file is None:
571
 
            return
572
 
        bzrlib.trace.disable_test_log(self._log_nonce)
573
 
        self._log_file.seek(0)
574
 
        self._log_contents = self._log_file.read()
575
 
        self._log_file.close()
576
 
        os.remove(self._log_file_name)
577
 
        self._log_file = self._log_file_name = None
578
 
 
579
 
    def addCleanup(self, callable):
580
 
        """Arrange to run a callable when this case is torn down.
581
 
 
582
 
        Callables are run in the reverse of the order they are registered, 
583
 
        ie last-in first-out.
584
 
        """
585
 
        if callable in self._cleanups:
586
 
            raise ValueError("cleanup function %r already registered on %s" 
587
 
                    % (callable, self))
588
 
        self._cleanups.append(callable)
589
 
 
590
 
    def _cleanEnvironment(self):
591
 
        new_env = {
592
 
            'HOME': os.getcwd(),
593
 
            'APPDATA': os.getcwd(),
594
 
            'BZREMAIL': None,
595
 
            'EMAIL': None,
596
 
        }
597
 
        self.__old_env = {}
598
 
        self.addCleanup(self._restoreEnvironment)
599
 
        for name, value in new_env.iteritems():
600
 
            self._captureVar(name, value)
601
 
 
602
 
 
603
 
    def _captureVar(self, name, newvalue):
604
 
        """Set an environment variable, preparing it to be reset when finished."""
605
 
        self.__old_env[name] = os.environ.get(name, None)
606
 
        if newvalue is None:
607
 
            if name in os.environ:
608
 
                del os.environ[name]
609
 
        else:
610
 
            os.environ[name] = newvalue
611
 
 
612
 
    @staticmethod
613
 
    def _restoreVar(name, value):
614
 
        if value is None:
615
 
            if name in os.environ:
616
 
                del os.environ[name]
617
 
        else:
618
 
            os.environ[name] = value
619
 
 
620
 
    def _restoreEnvironment(self):
621
 
        for name, value in self.__old_env.iteritems():
622
 
            self._restoreVar(name, value)
623
 
 
624
 
    def tearDown(self):
625
 
        self._runCleanups()
626
 
        unittest.TestCase.tearDown(self)
627
 
 
628
 
    def time(self, callable, *args, **kwargs):
629
 
        """Run callable and accrue the time it takes to the benchmark time.
630
 
        
631
 
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
632
 
        this will cause lsprofile statistics to be gathered and stored in
633
 
        self._benchcalls.
634
 
        """
635
 
        if self._benchtime is None:
636
 
            self._benchtime = 0
637
 
        start = time.time()
638
 
        try:
639
 
            if not self._gather_lsprof_in_benchmarks:
640
 
                return callable(*args, **kwargs)
641
 
            else:
642
 
                # record this benchmark
643
 
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
644
 
                stats.sort()
645
 
                self._benchcalls.append(((callable, args, kwargs), stats))
646
 
                return ret
647
 
        finally:
648
 
            self._benchtime += time.time() - start
649
 
 
650
 
    def _runCleanups(self):
651
 
        """Run registered cleanup functions. 
652
 
 
653
 
        This should only be called from TestCase.tearDown.
654
 
        """
655
 
        # TODO: Perhaps this should keep running cleanups even if 
656
 
        # one of them fails?
657
 
        for cleanup_fn in reversed(self._cleanups):
658
 
            cleanup_fn()
659
 
 
660
 
    def log(self, *args):
661
 
        mutter(*args)
662
 
 
663
 
    def _get_log(self):
664
 
        """Return as a string the log for this test"""
665
 
        if self._log_file_name:
666
 
            return open(self._log_file_name).read()
667
 
        else:
668
 
            return self._log_contents
669
 
        # TODO: Delete the log after it's been read in
670
 
 
671
 
    def capture(self, cmd, retcode=0):
672
 
        """Shortcut that splits cmd into words, runs, and returns stdout"""
673
 
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
674
 
 
675
 
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
676
 
        """Invoke bzr and return (stdout, stderr).
677
 
 
678
 
        Useful for code that wants to check the contents of the
679
 
        output, the way error messages are presented, etc.
680
 
 
681
 
        This should be the main method for tests that want to exercise the
682
 
        overall behavior of the bzr application (rather than a unit test
683
 
        or a functional test of the library.)
684
 
 
685
 
        Much of the old code runs bzr by forking a new copy of Python, but
686
 
        that is slower, harder to debug, and generally not necessary.
687
 
 
688
 
        This runs bzr through the interface that catches and reports
689
 
        errors, and with logging set to something approximating the
690
 
        default, so that error reporting can be checked.
691
 
 
692
 
        :param argv: arguments to invoke bzr
693
 
        :param retcode: expected return code, or None for don't-care.
694
 
        :param encoding: encoding for sys.stdout and sys.stderr
695
 
        :param stdin: A string to be used as stdin for the command.
696
 
        """
697
 
        if encoding is None:
698
 
            encoding = bzrlib.user_encoding
699
 
        if stdin is not None:
700
 
            stdin = StringIO(stdin)
701
 
        stdout = StringIOWrapper()
702
 
        stderr = StringIOWrapper()
703
 
        stdout.encoding = encoding
704
 
        stderr.encoding = encoding
705
 
 
706
 
        self.log('run bzr: %r', argv)
707
 
        # FIXME: don't call into logging here
708
 
        handler = logging.StreamHandler(stderr)
709
 
        handler.setLevel(logging.INFO)
710
 
        logger = logging.getLogger('')
711
 
        logger.addHandler(handler)
712
 
        old_ui_factory = bzrlib.ui.ui_factory
713
 
        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
714
 
            stdout=stdout,
715
 
            stderr=stderr)
716
 
        bzrlib.ui.ui_factory.stdin = stdin
717
 
        try:
718
 
            result = self.apply_redirected(stdin, stdout, stderr,
719
 
                                           bzrlib.commands.run_bzr_catch_errors,
720
 
                                           argv)
721
 
        finally:
722
 
            logger.removeHandler(handler)
723
 
            bzrlib.ui.ui_factory = old_ui_factory
724
 
 
725
 
        out = stdout.getvalue()
726
 
        err = stderr.getvalue()
727
 
        if out:
728
 
            self.log('output:\n%r', out)
729
 
        if err:
730
 
            self.log('errors:\n%r', err)
731
 
        if retcode is not None:
732
 
            self.assertEquals(retcode, result)
733
 
        return out, err
734
 
 
735
 
    def run_bzr(self, *args, **kwargs):
736
 
        """Invoke bzr, as if it were run from the command line.
737
 
 
738
 
        This should be the main method for tests that want to exercise the
739
 
        overall behavior of the bzr application (rather than a unit test
740
 
        or a functional test of the library.)
741
 
 
742
 
        This sends the stdout/stderr results into the test's log,
743
 
        where it may be useful for debugging.  See also run_captured.
744
 
 
745
 
        :param stdin: A string to be used as stdin for the command.
746
 
        """
747
 
        retcode = kwargs.pop('retcode', 0)
748
 
        encoding = kwargs.pop('encoding', None)
749
 
        stdin = kwargs.pop('stdin', None)
750
 
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding, stdin=stdin)
751
 
 
752
 
    def run_bzr_decode(self, *args, **kwargs):
753
 
        if kwargs.has_key('encoding'):
754
 
            encoding = kwargs['encoding']
755
 
        else:
756
 
            encoding = bzrlib.user_encoding
757
 
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
758
 
 
759
 
    def run_bzr_error(self, error_regexes, *args, **kwargs):
760
 
        """Run bzr, and check that stderr contains the supplied regexes
761
 
        
762
 
        :param error_regexes: Sequence of regular expressions which 
763
 
            must each be found in the error output. The relative ordering
764
 
            is not enforced.
765
 
        :param args: command-line arguments for bzr
766
 
        :param kwargs: Keyword arguments which are interpreted by run_bzr
767
 
            This function changes the default value of retcode to be 3,
768
 
            since in most cases this is run when you expect bzr to fail.
769
 
        :return: (out, err) The actual output of running the command (in case you
770
 
                 want to do more inspection)
771
 
 
772
 
        Examples of use:
773
 
            # Make sure that commit is failing because there is nothing to do
774
 
            self.run_bzr_error(['no changes to commit'],
775
 
                               'commit', '-m', 'my commit comment')
776
 
            # Make sure --strict is handling an unknown file, rather than
777
 
            # giving us the 'nothing to do' error
778
 
            self.build_tree(['unknown'])
779
 
            self.run_bzr_error(['Commit refused because there are unknown files'],
780
 
                               'commit', '--strict', '-m', 'my commit comment')
781
 
        """
782
 
        kwargs.setdefault('retcode', 3)
783
 
        out, err = self.run_bzr(*args, **kwargs)
784
 
        for regex in error_regexes:
785
 
            self.assertContainsRe(err, regex)
786
 
        return out, err
787
 
 
788
 
    def run_bzr_subprocess(self, *args, **kwargs):
789
 
        """Run bzr in a subprocess for testing.
790
 
 
791
 
        This starts a new Python interpreter and runs bzr in there. 
792
 
        This should only be used for tests that have a justifiable need for
793
 
        this isolation: e.g. they are testing startup time, or signal
794
 
        handling, or early startup code, etc.  Subprocess code can't be 
795
 
        profiled or debugged so easily.
796
 
 
797
 
        :param retcode: The status code that is expected.  Defaults to 0.  If
798
 
        None is supplied, the status code is not checked.
799
 
        """
800
 
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
801
 
        args = list(args)
802
 
        process = Popen([sys.executable, bzr_path]+args, stdout=PIPE, 
803
 
                         stderr=PIPE)
804
 
        out = process.stdout.read()
805
 
        err = process.stderr.read()
806
 
        retcode = process.wait()
807
 
        supplied_retcode = kwargs.get('retcode', 0)
808
 
        if supplied_retcode is not None:
809
 
            assert supplied_retcode == retcode
810
 
        return [out, err]
811
 
 
812
 
    def check_inventory_shape(self, inv, shape):
813
 
        """Compare an inventory to a list of expected names.
814
 
 
815
 
        Fail if they are not precisely equal.
816
 
        """
817
 
        extras = []
818
 
        shape = list(shape)             # copy
819
 
        for path, ie in inv.entries():
820
 
            name = path.replace('\\', '/')
821
 
            if ie.kind == 'dir':
822
 
                name = name + '/'
823
 
            if name in shape:
824
 
                shape.remove(name)
825
 
            else:
826
 
                extras.append(name)
827
 
        if shape:
828
 
            self.fail("expected paths not found in inventory: %r" % shape)
829
 
        if extras:
830
 
            self.fail("unexpected paths found in inventory: %r" % extras)
831
 
 
832
 
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
833
 
                         a_callable=None, *args, **kwargs):
834
 
        """Call callable with redirected std io pipes.
835
 
 
836
 
        Returns the return code."""
837
 
        if not callable(a_callable):
838
 
            raise ValueError("a_callable must be callable.")
839
 
        if stdin is None:
840
 
            stdin = StringIO("")
841
 
        if stdout is None:
842
 
            if getattr(self, "_log_file", None) is not None:
843
 
                stdout = self._log_file
844
 
            else:
845
 
                stdout = StringIO()
846
 
        if stderr is None:
847
 
            if getattr(self, "_log_file", None is not None):
848
 
                stderr = self._log_file
849
 
            else:
850
 
                stderr = StringIO()
851
 
        real_stdin = sys.stdin
852
 
        real_stdout = sys.stdout
853
 
        real_stderr = sys.stderr
854
 
        try:
855
 
            sys.stdout = stdout
856
 
            sys.stderr = stderr
857
 
            sys.stdin = stdin
858
 
            return a_callable(*args, **kwargs)
859
 
        finally:
860
 
            sys.stdout = real_stdout
861
 
            sys.stderr = real_stderr
862
 
            sys.stdin = real_stdin
863
 
 
864
 
    def merge(self, branch_from, wt_to):
865
 
        """A helper for tests to do a ui-less merge.
866
 
 
867
 
        This should move to the main library when someone has time to integrate
868
 
        it in.
869
 
        """
870
 
        # minimal ui-less merge.
871
 
        wt_to.branch.fetch(branch_from)
872
 
        base_rev = common_ancestor(branch_from.last_revision(),
873
 
                                   wt_to.branch.last_revision(),
874
 
                                   wt_to.branch.repository)
875
 
        merge_inner(wt_to.branch, branch_from.basis_tree(), 
876
 
                    wt_to.branch.repository.revision_tree(base_rev),
877
 
                    this_tree=wt_to)
878
 
        wt_to.add_pending_merge(branch_from.last_revision())
879
 
 
880
 
 
881
 
BzrTestBase = TestCase
882
 
 
883
 
     
884
 
class TestCaseInTempDir(TestCase):
885
 
    """Derived class that runs a test within a temporary directory.
886
 
 
887
 
    This is useful for tests that need to create a branch, etc.
888
 
 
889
 
    The directory is created in a slightly complex way: for each
890
 
    Python invocation, a new temporary top-level directory is created.
891
 
    All test cases create their own directory within that.  If the
892
 
    tests complete successfully, the directory is removed.
893
 
 
894
 
    InTempDir is an old alias for FunctionalTestCase.
895
 
    """
896
 
 
897
 
    TEST_ROOT = None
898
 
    _TEST_NAME = 'test'
899
 
    OVERRIDE_PYTHON = 'python'
900
 
 
901
 
    def check_file_contents(self, filename, expect):
902
 
        self.log("check contents of file %s" % filename)
903
 
        contents = file(filename, 'r').read()
904
 
        if contents != expect:
905
 
            self.log("expected: %r" % expect)
906
 
            self.log("actually: %r" % contents)
907
 
            self.fail("contents of %s not as expected" % filename)
908
 
 
909
 
    def _make_test_root(self):
910
 
        if TestCaseInTempDir.TEST_ROOT is not None:
911
 
            return
912
 
        i = 0
913
 
        while True:
914
 
            root = u'test%04d.tmp' % i
915
 
            try:
916
 
                os.mkdir(root)
917
 
            except OSError, e:
918
 
                if e.errno == errno.EEXIST:
919
 
                    i += 1
920
 
                    continue
921
 
                else:
922
 
                    raise
923
 
            # successfully created
924
 
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
925
 
            break
926
 
        # make a fake bzr directory there to prevent any tests propagating
927
 
        # up onto the source directory's real branch
928
 
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
929
 
 
930
 
    def setUp(self):
931
 
        super(TestCaseInTempDir, self).setUp()
932
 
        self._make_test_root()
933
 
        _currentdir = os.getcwdu()
934
 
        # shorten the name, to avoid test failures due to path length
935
 
        short_id = self.id().replace('bzrlib.tests.', '') \
936
 
                   .replace('__main__.', '')[-100:]
937
 
        # it's possible the same test class is run several times for
938
 
        # parameterized tests, so make sure the names don't collide.  
939
 
        i = 0
940
 
        while True:
941
 
            if i > 0:
942
 
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
943
 
            else:
944
 
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
945
 
            if os.path.exists(candidate_dir):
946
 
                i = i + 1
947
 
                continue
948
 
            else:
949
 
                self.test_dir = candidate_dir
950
 
                os.mkdir(self.test_dir)
951
 
                os.chdir(self.test_dir)
952
 
                break
953
 
        os.environ['HOME'] = self.test_dir
954
 
        os.environ['APPDATA'] = self.test_dir
955
 
        def _leaveDirectory():
956
 
            os.chdir(_currentdir)
957
 
        self.addCleanup(_leaveDirectory)
958
 
        
959
 
    def build_tree(self, shape, line_endings='native', transport=None):
960
 
        """Build a test tree according to a pattern.
961
 
 
962
 
        shape is a sequence of file specifications.  If the final
963
 
        character is '/', a directory is created.
964
 
 
965
 
        This doesn't add anything to a branch.
966
 
        :param line_endings: Either 'binary' or 'native'
967
 
                             in binary mode, exact contents are written
968
 
                             in native mode, the line endings match the
969
 
                             default platform endings.
970
 
 
971
 
        :param transport: A transport to write to, for building trees on 
972
 
                          VFS's. If the transport is readonly or None,
973
 
                          "." is opened automatically.
974
 
        """
975
 
        # XXX: It's OK to just create them using forward slashes on windows?
976
 
        if transport is None or transport.is_readonly():
977
 
            transport = get_transport(".")
978
 
        for name in shape:
979
 
            self.assert_(isinstance(name, basestring))
980
 
            if name[-1] == '/':
981
 
                transport.mkdir(urlutils.escape(name[:-1]))
982
 
            else:
983
 
                if line_endings == 'binary':
984
 
                    end = '\n'
985
 
                elif line_endings == 'native':
986
 
                    end = os.linesep
987
 
                else:
988
 
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
989
 
                content = "contents of %s%s" % (name.encode('utf-8'), end)
990
 
                transport.put(urlutils.escape(name), StringIO(content))
991
 
 
992
 
    def build_tree_contents(self, shape):
993
 
        build_tree_contents(shape)
994
 
 
995
 
    def failUnlessExists(self, path):
996
 
        """Fail unless path, which may be abs or relative, exists."""
997
 
        self.failUnless(osutils.lexists(path))
998
 
 
999
 
    def failIfExists(self, path):
1000
 
        """Fail if path, which may be abs or relative, exists."""
1001
 
        self.failIf(osutils.lexists(path))
1002
 
        
1003
 
    def assertFileEqual(self, content, path):
1004
 
        """Fail if path does not contain 'content'."""
1005
 
        self.failUnless(osutils.lexists(path))
1006
 
        # TODO: jam 20060427 Shouldn't this be 'rb'?
1007
 
        self.assertEqualDiff(content, open(path, 'r').read())
1008
 
 
1009
 
 
1010
 
class TestCaseWithTransport(TestCaseInTempDir):
1011
 
    """A test case that provides get_url and get_readonly_url facilities.
1012
 
 
1013
 
    These back onto two transport servers, one for readonly access and one for
1014
 
    read write access.
1015
 
 
1016
 
    If no explicit class is provided for readonly access, a
1017
 
    ReadonlyTransportDecorator is used instead which allows the use of non disk
1018
 
    based read write transports.
1019
 
 
1020
 
    If an explicit class is provided for readonly access, that server and the 
1021
 
    readwrite one must both define get_url() as resolving to os.getcwd().
1022
 
    """
1023
 
 
1024
 
    def __init__(self, methodName='testMethod'):
1025
 
        super(TestCaseWithTransport, self).__init__(methodName)
1026
 
        self.__readonly_server = None
1027
 
        self.__server = None
1028
 
        self.transport_server = default_transport
1029
 
        self.transport_readonly_server = None
1030
 
 
1031
 
    def get_readonly_url(self, relpath=None):
1032
 
        """Get a URL for the readonly transport.
1033
 
 
1034
 
        This will either be backed by '.' or a decorator to the transport 
1035
 
        used by self.get_url()
1036
 
        relpath provides for clients to get a path relative to the base url.
1037
 
        These should only be downwards relative, not upwards.
1038
 
        """
1039
 
        base = self.get_readonly_server().get_url()
1040
 
        if relpath is not None:
1041
 
            if not base.endswith('/'):
1042
 
                base = base + '/'
1043
 
            base = base + relpath
1044
 
        return base
1045
 
 
1046
 
    def get_readonly_server(self):
1047
 
        """Get the server instance for the readonly transport
1048
 
 
1049
 
        This is useful for some tests with specific servers to do diagnostics.
1050
 
        """
1051
 
        if self.__readonly_server is None:
1052
 
            if self.transport_readonly_server is None:
1053
 
                # readonly decorator requested
1054
 
                # bring up the server
1055
 
                self.get_url()
1056
 
                self.__readonly_server = ReadonlyServer()
1057
 
                self.__readonly_server.setUp(self.__server)
1058
 
            else:
1059
 
                self.__readonly_server = self.transport_readonly_server()
1060
 
                self.__readonly_server.setUp()
1061
 
            self.addCleanup(self.__readonly_server.tearDown)
1062
 
        return self.__readonly_server
1063
 
 
1064
 
    def get_server(self):
1065
 
        """Get the read/write server instance.
1066
 
 
1067
 
        This is useful for some tests with specific servers that need
1068
 
        diagnostics.
1069
 
        """
1070
 
        if self.__server is None:
1071
 
            self.__server = self.transport_server()
1072
 
            self.__server.setUp()
1073
 
            self.addCleanup(self.__server.tearDown)
1074
 
        return self.__server
1075
 
 
1076
 
    def get_url(self, relpath=None):
1077
 
        """Get a URL for the readwrite transport.
1078
 
 
1079
 
        This will either be backed by '.' or to an equivalent non-file based
1080
 
        facility.
1081
 
        relpath provides for clients to get a path relative to the base url.
1082
 
        These should only be downwards relative, not upwards.
1083
 
        """
1084
 
        base = self.get_server().get_url()
1085
 
        if relpath is not None and relpath != '.':
1086
 
            if not base.endswith('/'):
1087
 
                base = base + '/'
1088
 
            base = base + urlutils.escape(relpath)
1089
 
        return base
1090
 
 
1091
 
    def get_transport(self):
1092
 
        """Return a writeable transport for the test scratch space"""
1093
 
        t = get_transport(self.get_url())
1094
 
        self.assertFalse(t.is_readonly())
1095
 
        return t
1096
 
 
1097
 
    def get_readonly_transport(self):
1098
 
        """Return a readonly transport for the test scratch space
1099
 
        
1100
 
        This can be used to test that operations which should only need
1101
 
        readonly access in fact do not try to write.
1102
 
        """
1103
 
        t = get_transport(self.get_readonly_url())
1104
 
        self.assertTrue(t.is_readonly())
1105
 
        return t
1106
 
 
1107
 
    def make_branch(self, relpath, format=None):
1108
 
        """Create a branch on the transport at relpath."""
1109
 
        repo = self.make_repository(relpath, format=format)
1110
 
        return repo.bzrdir.create_branch()
1111
 
 
1112
 
    def make_bzrdir(self, relpath, format=None):
1113
 
        try:
1114
 
            url = self.get_url(relpath)
1115
 
            mutter('relpath %r => url %r', relpath, url)
1116
 
            segments = url.split('/')
1117
 
            if segments and segments[-1] not in ('', '.'):
1118
 
                parent = '/'.join(segments[:-1])
1119
 
                t = get_transport(parent)
1120
 
                try:
1121
 
                    t.mkdir(segments[-1])
1122
 
                except errors.FileExists:
1123
 
                    pass
1124
 
            if format is None:
1125
 
                format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
1126
 
            # FIXME: make this use a single transport someday. RBC 20060418
1127
 
            return format.initialize_on_transport(get_transport(relpath))
1128
 
        except errors.UninitializableFormat:
1129
 
            raise TestSkipped("Format %s is not initializable." % format)
1130
 
 
1131
 
    def make_repository(self, relpath, shared=False, format=None):
1132
 
        """Create a repository on our default transport at relpath."""
1133
 
        made_control = self.make_bzrdir(relpath, format=format)
1134
 
        return made_control.create_repository(shared=shared)
1135
 
 
1136
 
    def make_branch_and_tree(self, relpath, format=None):
1137
 
        """Create a branch on the transport and a tree locally.
1138
 
 
1139
 
        Returns the tree.
1140
 
        """
1141
 
        # TODO: always use the local disk path for the working tree,
1142
 
        # this obviously requires a format that supports branch references
1143
 
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
1144
 
        # RBC 20060208
1145
 
        b = self.make_branch(relpath, format=format)
1146
 
        try:
1147
 
            return b.bzrdir.create_workingtree()
1148
 
        except errors.NotLocalUrl:
1149
 
            # new formats - catch No tree error and create
1150
 
            # a branch reference and a checkout.
1151
 
            # old formats at that point - raise TestSkipped.
1152
 
            # TODO: rbc 20060208
1153
 
            return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
1154
 
 
1155
 
    def assertIsDirectory(self, relpath, transport):
1156
 
        """Assert that relpath within transport is a directory.
1157
 
 
1158
 
        This may not be possible on all transports; in that case it propagates
1159
 
        a TransportNotPossible.
1160
 
        """
1161
 
        try:
1162
 
            mode = transport.stat(relpath).st_mode
1163
 
        except errors.NoSuchFile:
1164
 
            self.fail("path %s is not a directory; no such file"
1165
 
                      % (relpath))
1166
 
        if not stat.S_ISDIR(mode):
1167
 
            self.fail("path %s is not a directory; has mode %#o"
1168
 
                      % (relpath, mode))
1169
 
 
1170
 
 
1171
 
class ChrootedTestCase(TestCaseWithTransport):
1172
 
    """A support class that provides readonly urls outside the local namespace.
1173
 
 
1174
 
    This is done by checking if self.transport_server is a MemoryServer. if it
1175
 
    is then we are chrooted already, if it is not then an HttpServer is used
1176
 
    for readonly urls.
1177
 
 
1178
 
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1179
 
                       be used without needed to redo it when a different 
1180
 
                       subclass is in use ?
1181
 
    """
1182
 
 
1183
 
    def setUp(self):
1184
 
        super(ChrootedTestCase, self).setUp()
1185
 
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1186
 
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
1187
 
 
1188
 
 
1189
 
def filter_suite_by_re(suite, pattern):
1190
 
    result = TestUtil.TestSuite()
1191
 
    filter_re = re.compile(pattern)
1192
 
    for test in iter_suite_tests(suite):
1193
 
        if filter_re.search(test.id()):
1194
 
            result.addTest(test)
1195
 
    return result
1196
 
 
1197
 
 
1198
 
def run_suite(suite, name='test', verbose=False, pattern=".*",
1199
 
              stop_on_failure=False, keep_output=False,
1200
 
              transport=None, lsprof_timed=None):
1201
 
    TestCaseInTempDir._TEST_NAME = name
1202
 
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1203
 
    if verbose:
1204
 
        verbosity = 2
1205
 
        pb = None
1206
 
    else:
1207
 
        verbosity = 1
1208
 
        pb = progress.ProgressBar()
1209
 
    runner = TextTestRunner(stream=sys.stdout,
1210
 
                            descriptions=0,
1211
 
                            verbosity=verbosity,
1212
 
                            keep_output=keep_output,
1213
 
                            pb=pb)
1214
 
    runner.stop_on_failure=stop_on_failure
1215
 
    if pattern != '.*':
1216
 
        suite = filter_suite_by_re(suite, pattern)
1217
 
    result = runner.run(suite)
1218
 
    return result.wasSuccessful()
1219
 
 
1220
 
 
1221
 
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1222
 
             keep_output=False,
1223
 
             transport=None,
1224
 
             test_suite_factory=None,
1225
 
             lsprof_timed=None):
1226
 
    """Run the whole test suite under the enhanced runner"""
1227
 
    global default_transport
1228
 
    if transport is None:
1229
 
        transport = default_transport
1230
 
    old_transport = default_transport
1231
 
    default_transport = transport
1232
 
    try:
1233
 
        if test_suite_factory is None:
1234
 
            suite = test_suite()
1235
 
        else:
1236
 
            suite = test_suite_factory()
1237
 
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1238
 
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
1239
 
                     transport=transport,
1240
 
                     lsprof_timed=lsprof_timed)
1241
 
    finally:
1242
 
        default_transport = old_transport
1243
 
 
1244
 
 
1245
 
def test_suite():
1246
 
    """Build and return TestSuite for the whole of bzrlib.
1247
 
    
1248
 
    This function can be replaced if you need to change the default test
1249
 
    suite on a global basis, but it is not encouraged.
1250
 
    """
1251
 
    testmod_names = [
1252
 
                   'bzrlib.tests.test_ancestry',
1253
 
                   'bzrlib.tests.test_api',
1254
 
                   'bzrlib.tests.test_bad_files',
1255
 
                   'bzrlib.tests.test_branch',
1256
 
                   'bzrlib.tests.test_bundle',
1257
 
                   'bzrlib.tests.test_bzrdir',
1258
 
                   'bzrlib.tests.test_command',
1259
 
                   'bzrlib.tests.test_commit',
1260
 
                   'bzrlib.tests.test_commit_merge',
1261
 
                   'bzrlib.tests.test_config',
1262
 
                   'bzrlib.tests.test_conflicts',
1263
 
                   'bzrlib.tests.test_decorators',
1264
 
                   'bzrlib.tests.test_diff',
1265
 
                   'bzrlib.tests.test_doc_generate',
1266
 
                   'bzrlib.tests.test_errors',
1267
 
                   'bzrlib.tests.test_escaped_store',
1268
 
                   'bzrlib.tests.test_fetch',
1269
 
                   'bzrlib.tests.test_gpg',
1270
 
                   'bzrlib.tests.test_graph',
1271
 
                   'bzrlib.tests.test_hashcache',
1272
 
                   'bzrlib.tests.test_http',
1273
 
                   'bzrlib.tests.test_http_response',
1274
 
                   'bzrlib.tests.test_identitymap',
1275
 
                   'bzrlib.tests.test_ignores',
1276
 
                   'bzrlib.tests.test_inv',
1277
 
                   'bzrlib.tests.test_knit',
1278
 
                   'bzrlib.tests.test_lockdir',
1279
 
                   'bzrlib.tests.test_lockable_files',
1280
 
                   'bzrlib.tests.test_log',
1281
 
                   'bzrlib.tests.test_merge',
1282
 
                   'bzrlib.tests.test_merge3',
1283
 
                   'bzrlib.tests.test_merge_core',
1284
 
                   'bzrlib.tests.test_missing',
1285
 
                   'bzrlib.tests.test_msgeditor',
1286
 
                   'bzrlib.tests.test_nonascii',
1287
 
                   'bzrlib.tests.test_options',
1288
 
                   'bzrlib.tests.test_osutils',
1289
 
                   'bzrlib.tests.test_patch',
1290
 
                   'bzrlib.tests.test_patches',
1291
 
                   'bzrlib.tests.test_permissions',
1292
 
                   'bzrlib.tests.test_plugins',
1293
 
                   'bzrlib.tests.test_progress',
1294
 
                   'bzrlib.tests.test_reconcile',
1295
 
                   'bzrlib.tests.test_repository',
1296
 
                   'bzrlib.tests.test_revision',
1297
 
                   'bzrlib.tests.test_revisionnamespaces',
1298
 
                   'bzrlib.tests.test_revprops',
1299
 
                   'bzrlib.tests.test_revisiontree',
1300
 
                   'bzrlib.tests.test_rio',
1301
 
                   'bzrlib.tests.test_sampler',
1302
 
                   'bzrlib.tests.test_selftest',
1303
 
                   'bzrlib.tests.test_setup',
1304
 
                   'bzrlib.tests.test_sftp_transport',
1305
 
                   'bzrlib.tests.test_smart_add',
1306
 
                   'bzrlib.tests.test_source',
1307
 
                   'bzrlib.tests.test_status',
1308
 
                   'bzrlib.tests.test_store',
1309
 
                   'bzrlib.tests.test_symbol_versioning',
1310
 
                   'bzrlib.tests.test_testament',
1311
 
                   'bzrlib.tests.test_textfile',
1312
 
                   'bzrlib.tests.test_textmerge',
1313
 
                   'bzrlib.tests.test_trace',
1314
 
                   'bzrlib.tests.test_transactions',
1315
 
                   'bzrlib.tests.test_transform',
1316
 
                   'bzrlib.tests.test_transport',
1317
 
                   'bzrlib.tests.test_tsort',
1318
 
                   'bzrlib.tests.test_tuned_gzip',
1319
 
                   'bzrlib.tests.test_ui',
1320
 
                   'bzrlib.tests.test_upgrade',
1321
 
                   'bzrlib.tests.test_urlutils',
1322
 
                   'bzrlib.tests.test_versionedfile',
1323
 
                   'bzrlib.tests.test_weave',
1324
 
                   'bzrlib.tests.test_whitebox',
1325
 
                   'bzrlib.tests.test_workingtree',
1326
 
                   'bzrlib.tests.test_xml',
1327
 
                   ]
1328
 
    test_transport_implementations = [
1329
 
        'bzrlib.tests.test_transport_implementations',
1330
 
        'bzrlib.tests.test_read_bundle',
1331
 
        ]
1332
 
    suite = TestUtil.TestSuite()
1333
 
    loader = TestUtil.TestLoader()
1334
 
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1335
 
    from bzrlib.transport import TransportTestProviderAdapter
1336
 
    adapter = TransportTestProviderAdapter()
1337
 
    adapt_modules(test_transport_implementations, adapter, loader, suite)
1338
 
    for package in packages_to_test():
1339
 
        suite.addTest(package.test_suite())
 
21
MODULES_TO_DOCTEST = []
 
22
 
 
23
def selftest():
 
24
    from unittest import TestLoader, TestSuite
 
25
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
 
26
    import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
 
27
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
 
28
 
 
29
    import bzrlib.selftest.whitebox
 
30
    import bzrlib.selftest.blackbox
 
31
    import bzrlib.selftest.versioning
 
32
    import bzrlib.selftest.testmerge3
 
33
    import bzrlib.selftest.testhashcache
 
34
    import bzrlib.selftest.testrevisionnamespaces
 
35
    import bzrlib.selftest.testbranch
 
36
    import bzrlib.selftest.teststatus
 
37
    import bzrlib.selftest.testinv
 
38
    import bzrlib.merge_core
 
39
    from doctest import DocTestSuite
 
40
    import os
 
41
    import shutil
 
42
    import time
 
43
    import sys
 
44
    import unittest
 
45
 
 
46
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
 
47
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
 
48
        if m not in MODULES_TO_DOCTEST:
 
49
            MODULES_TO_DOCTEST.append(m)
 
50
            
 
51
    for m in (bzrlib.selftest.whitebox,
 
52
              bzrlib.selftest.versioning,
 
53
              bzrlib.selftest.testinv,
 
54
              bzrlib.selftest.testmerge3,
 
55
              bzrlib.selftest.testhashcache,
 
56
              bzrlib.selftest.teststatus,
 
57
              bzrlib.selftest.blackbox,
 
58
              bzrlib.selftest.testhashcache,
 
59
              bzrlib.selftest.testrevisionnamespaces,
 
60
              bzrlib.selftest.testbranch,
 
61
              ):
 
62
        if m not in MODULES_TO_TEST:
 
63
            MODULES_TO_TEST.append(m)
 
64
 
 
65
 
 
66
    TestBase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
 
67
    print '%-30s %s' % ('bzr binary', TestBase.BZRPATH)
 
68
 
 
69
    print
 
70
 
 
71
    suite = TestSuite()
 
72
 
 
73
    # should also test bzrlib.merge_core, but they seem to be out of date with
 
74
    # the code.
 
75
 
 
76
 
 
77
    # XXX: python2.3's TestLoader() doesn't seem to find all the
 
78
    # tests; don't know why
1340
79
    for m in MODULES_TO_TEST:
1341
 
        suite.addTest(loader.loadTestsFromModule(m))
1342
 
    for m in MODULES_TO_DOCTEST:
1343
 
        suite.addTest(doctest.DocTestSuite(m))
1344
 
    for name, plugin in bzrlib.plugin.all_plugins().items():
1345
 
        if getattr(plugin, 'test_suite', None) is not None:
1346
 
            suite.addTest(plugin.test_suite())
1347
 
    return suite
1348
 
 
1349
 
 
1350
 
def adapt_modules(mods_list, adapter, loader, suite):
1351
 
    """Adapt the modules in mods_list using adapter and add to suite."""
1352
 
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
1353
 
        suite.addTests(adapter.adapt(test))
 
80
         suite.addTest(TestLoader().loadTestsFromModule(m))
 
81
 
 
82
    for m in (MODULES_TO_DOCTEST):
 
83
        suite.addTest(DocTestSuite(m))
 
84
 
 
85
    for p in bzrlib.plugin.all_plugins:
 
86
        if hasattr(p, 'test_suite'):
 
87
            suite.addTest(p.test_suite())
 
88
 
 
89
    suite.addTest(unittest.makeSuite(bzrlib.merge_core.MergeTest, 'test_'))
 
90
 
 
91
    return run_suite(suite, 'testbzr')
 
92
 
 
93
 
 
94