~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/__init__.py

  • Committer: Robert Collins
  • Date: 2005-10-06 00:52:53 UTC
  • Revision ID: robertc@robertcollins.net-20051006005253-415c38ad22094f13
define some expected behaviour for inventory_entry.snapshot

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
 
1
# Copyright (C) 2005 by Canonical Ltd
2
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
# TODO: Perhaps there should be an API to find out if bzr running under the
19
 
# test suite -- some plugins might want to avoid making intrusive changes if
20
 
# this is the case.  However, we want behaviour under to test to diverge as
21
 
# little as possible, so this should be used rarely if it's added at all.
22
 
# (Suggestion from j-a-meinel, 2005-11-24)
23
 
 
24
 
# NOTE: Some classes in here use camelCaseNaming() rather than
25
 
# underscore_naming().  That's for consistency with unittest; it's not the
26
 
# general style of bzrlib.  Please continue that consistency when adding e.g.
27
 
# new assertFoo() methods.
28
 
 
29
 
import codecs
30
18
from cStringIO import StringIO
31
 
import difflib
32
 
import doctest
33
 
import errno
34
19
import logging
 
20
import unittest
 
21
import tempfile
35
22
import os
 
23
import sys
 
24
import errno
 
25
import subprocess
 
26
import shutil
36
27
import re
37
 
import shlex
38
 
import stat
39
 
from subprocess import Popen, PIPE
40
 
import sys
41
 
import tempfile
42
 
import unittest
43
 
import time
44
 
 
45
 
 
46
 
import bzrlib.branch
47
 
import bzrlib.bzrdir as bzrdir
 
28
 
48
29
import bzrlib.commands
49
 
import bzrlib.bundle.serializer
50
 
import bzrlib.errors as errors
51
 
import bzrlib.inventory
52
 
import bzrlib.iterablefile
53
 
import bzrlib.lockdir
54
 
try:
55
 
    import bzrlib.lsprof
56
 
except ImportError:
57
 
    # lsprof not available
58
 
    pass
59
 
from bzrlib.merge import merge_inner
60
 
import bzrlib.merge3
61
 
import bzrlib.osutils
62
 
import bzrlib.osutils as osutils
63
 
import bzrlib.plugin
64
 
import bzrlib.progress as progress
65
 
from bzrlib.revision import common_ancestor
66
 
import bzrlib.store
67
30
import bzrlib.trace
68
 
from bzrlib.transport import get_transport
69
 
import bzrlib.transport
70
 
from bzrlib.transport.local import LocalRelpathServer
71
 
from bzrlib.transport.readonly import ReadonlyServer
72
 
from bzrlib.trace import mutter
73
 
from bzrlib.tests import TestUtil
74
 
from bzrlib.tests.TestUtil import (
75
 
                          TestSuite,
76
 
                          TestLoader,
77
 
                          )
78
 
from bzrlib.tests.treeshape import build_tree_contents
79
 
import bzrlib.urlutils as urlutils
80
 
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
31
import bzrlib.fetch
 
32
from bzrlib.selftest import TestUtil
 
33
from bzrlib.selftest.TestUtil import TestLoader, TestSuite
81
34
 
82
 
default_transport = LocalRelpathServer
83
35
 
84
36
MODULES_TO_TEST = []
85
 
MODULES_TO_DOCTEST = [
86
 
                      bzrlib.branch,
87
 
                      bzrlib.bundle.serializer,
88
 
                      bzrlib.commands,
89
 
                      bzrlib.errors,
90
 
                      bzrlib.inventory,
91
 
                      bzrlib.iterablefile,
92
 
                      bzrlib.lockdir,
93
 
                      bzrlib.merge3,
94
 
                      bzrlib.option,
95
 
                      bzrlib.osutils,
96
 
                      bzrlib.store
97
 
                      ]
98
 
 
99
 
 
100
 
def packages_to_test():
101
 
    """Return a list of packages to test.
102
 
 
103
 
    The packages are not globally imported so that import failures are
104
 
    triggered when running selftest, not when importing the command.
105
 
    """
106
 
    import bzrlib.doc
107
 
    import bzrlib.tests.blackbox
108
 
    import bzrlib.tests.branch_implementations
109
 
    import bzrlib.tests.bzrdir_implementations
110
 
    import bzrlib.tests.interrepository_implementations
111
 
    import bzrlib.tests.interversionedfile_implementations
112
 
    import bzrlib.tests.repository_implementations
113
 
    import bzrlib.tests.revisionstore_implementations
114
 
    import bzrlib.tests.workingtree_implementations
115
 
    return [
116
 
            bzrlib.doc,
117
 
            bzrlib.tests.blackbox,
118
 
            bzrlib.tests.branch_implementations,
119
 
            bzrlib.tests.bzrdir_implementations,
120
 
            bzrlib.tests.interrepository_implementations,
121
 
            bzrlib.tests.interversionedfile_implementations,
122
 
            bzrlib.tests.repository_implementations,
123
 
            bzrlib.tests.revisionstore_implementations,
124
 
            bzrlib.tests.workingtree_implementations,
125
 
            ]
 
37
MODULES_TO_DOCTEST = []
 
38
 
 
39
from logging import debug, warning, error
 
40
 
 
41
 
 
42
 
 
43
class EarlyStoppingTestResultAdapter(object):
 
44
    """An adapter for TestResult to stop at the first first failure or error"""
 
45
 
 
46
    def __init__(self, result):
 
47
        self._result = result
 
48
 
 
49
    def addError(self, test, err):
 
50
        self._result.addError(test, err)
 
51
        self._result.stop()
 
52
 
 
53
    def addFailure(self, test, err):
 
54
        self._result.addFailure(test, err)
 
55
        self._result.stop()
 
56
 
 
57
    def __getattr__(self, name):
 
58
        return getattr(self._result, name)
 
59
 
 
60
    def __setattr__(self, name, value):
 
61
        if name == '_result':
 
62
            object.__setattr__(self, name, value)
 
63
        return setattr(self._result, name, value)
126
64
 
127
65
 
128
66
class _MyResult(unittest._TextTestResult):
129
 
    """Custom TestResult.
130
 
 
131
 
    Shows output in a different format, including displaying runtime for tests.
132
 
    """
133
 
    stop_early = False
134
 
    
135
 
    def __init__(self, stream, descriptions, verbosity, pb=None):
136
 
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
137
 
        self.pb = pb
138
 
    
139
 
    def extractBenchmarkTime(self, testCase):
140
 
        """Add a benchmark time for the current test case."""
141
 
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
142
 
    
143
 
    def _elapsedTestTimeString(self):
144
 
        """Return a time string for the overall time the current test has taken."""
145
 
        return self._formatTime(time.time() - self._start_time)
146
 
 
147
 
    def _testTimeString(self):
148
 
        if self._benchmarkTime is not None:
149
 
            return "%s/%s" % (
150
 
                self._formatTime(self._benchmarkTime),
151
 
                self._elapsedTestTimeString())
152
 
        else:
153
 
            return "      %s" % self._elapsedTestTimeString()
154
 
 
155
 
    def _formatTime(self, seconds):
156
 
        """Format seconds as milliseconds with leading spaces."""
157
 
        return "%5dms" % (1000 * seconds)
158
 
 
159
 
    def _ellipsise_unimportant_words(self, a_string, final_width,
160
 
                                   keep_start=False):
161
 
        """Add ellipses (sp?) for overly long strings.
162
 
        
163
 
        :param keep_start: If true preserve the start of a_string rather
164
 
                           than the end of it.
165
 
        """
166
 
        if keep_start:
167
 
            if len(a_string) > final_width:
168
 
                result = a_string[:final_width-3] + '...'
169
 
            else:
170
 
                result = a_string
171
 
        else:
172
 
            if len(a_string) > final_width:
173
 
                result = '...' + a_string[3-final_width:]
174
 
            else:
175
 
                result = a_string
176
 
        return result.ljust(final_width)
 
67
    """
 
68
    Custom TestResult.
 
69
 
 
70
    No special behaviour for now.
 
71
    """
177
72
 
178
73
    def startTest(self, test):
179
74
        unittest.TestResult.startTest(self, test)
180
 
        # In a short description, the important words are in
181
 
        # the beginning, but in an id, the important words are
182
 
        # at the end
183
 
        SHOW_DESCRIPTIONS = False
184
 
 
185
 
        if not self.showAll and self.dots and self.pb is not None:
186
 
            final_width = 13
187
 
        else:
188
 
            final_width = osutils.terminal_width()
189
 
            final_width = final_width - 15 - 8
190
 
        what = None
191
 
        if SHOW_DESCRIPTIONS:
192
 
            what = test.shortDescription()
193
 
            if what:
194
 
                what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
195
 
        if what is None:
196
 
            what = test.id()
197
 
            if what.startswith('bzrlib.tests.'):
198
 
                what = what[13:]
199
 
            what = self._ellipsise_unimportant_words(what, final_width)
 
75
        # TODO: Maybe show test.shortDescription somewhere?
 
76
        what = test.shortDescription() or test.id()        
200
77
        if self.showAll:
201
 
            self.stream.write(what)
202
 
        elif self.dots and self.pb is not None:
203
 
            self.pb.update(what, self.testsRun - 1, None)
 
78
            self.stream.write('%-70.70s' % what)
204
79
        self.stream.flush()
205
 
        self._recordTestStartTime()
206
 
 
207
 
    def _recordTestStartTime(self):
208
 
        """Record that a test has started."""
209
 
        self._start_time = time.time()
210
80
 
211
81
    def addError(self, test, err):
212
 
        if isinstance(err[1], TestSkipped):
213
 
            return self.addSkipped(test, err)    
214
 
        unittest.TestResult.addError(self, test, err)
215
 
        self.extractBenchmarkTime(test)
216
 
        if self.showAll:
217
 
            self.stream.writeln("ERROR %s" % self._testTimeString())
218
 
        elif self.dots and self.pb is None:
219
 
            self.stream.write('E')
220
 
        elif self.dots:
221
 
            self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
 
82
        super(_MyResult, self).addError(test, err)
222
83
        self.stream.flush()
223
 
        if self.stop_early:
224
 
            self.stop()
225
84
 
226
85
    def addFailure(self, test, err):
227
 
        unittest.TestResult.addFailure(self, test, err)
228
 
        self.extractBenchmarkTime(test)
229
 
        if self.showAll:
230
 
            self.stream.writeln(" FAIL %s" % self._testTimeString())
231
 
        elif self.dots and self.pb is None:
232
 
            self.stream.write('F')
233
 
        elif self.dots:
234
 
            self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
 
86
        super(_MyResult, self).addFailure(test, err)
235
87
        self.stream.flush()
236
 
        if self.stop_early:
237
 
            self.stop()
238
88
 
239
89
    def addSuccess(self, test):
240
 
        self.extractBenchmarkTime(test)
241
90
        if self.showAll:
242
 
            self.stream.writeln('   OK %s' % self._testTimeString())
243
 
            for bench_called, stats in getattr(test, '_benchcalls', []):
244
 
                self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
245
 
                stats.pprint(file=self.stream)
246
 
        elif self.dots and self.pb is None:
 
91
            self.stream.writeln('OK')
 
92
        elif self.dots:
247
93
            self.stream.write('~')
248
 
        elif self.dots:
249
 
            self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
250
 
        self.stream.flush()
251
 
        unittest.TestResult.addSuccess(self, test)
252
 
 
253
 
    def addSkipped(self, test, skip_excinfo):
254
 
        self.extractBenchmarkTime(test)
255
 
        if self.showAll:
256
 
            print >>self.stream, ' SKIP %s' % self._testTimeString()
257
 
            print >>self.stream, '     %s' % skip_excinfo[1]
258
 
        elif self.dots and self.pb is None:
259
 
            self.stream.write('S')
260
 
        elif self.dots:
261
 
            self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
262
 
        self.stream.flush()
263
 
        # seems best to treat this as success from point-of-view of unittest
264
 
        # -- it actually does nothing so it barely matters :)
 
94
        self.stream.flush()
265
95
        unittest.TestResult.addSuccess(self, test)
266
96
 
267
97
    def printErrorList(self, flavour, errors):
268
98
        for test, err in errors:
269
99
            self.stream.writeln(self.separator1)
270
 
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
271
 
            if getattr(test, '_get_log', None) is not None:
272
 
                print >>self.stream
273
 
                print >>self.stream, \
274
 
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
100
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
 
101
            if hasattr(test, '_get_log'):
 
102
                self.stream.writeln()
 
103
                self.stream.writeln('log from this test:')
275
104
                print >>self.stream, test._get_log()
276
 
                print >>self.stream, \
277
 
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
278
105
            self.stream.writeln(self.separator2)
279
106
            self.stream.writeln("%s" % err)
280
107
 
281
108
 
282
 
class TextTestRunner(object):
283
 
    stop_on_failure = False
284
 
 
285
 
    def __init__(self,
286
 
                 stream=sys.stderr,
287
 
                 descriptions=0,
288
 
                 verbosity=1,
289
 
                 keep_output=False,
290
 
                 pb=None):
291
 
        self.stream = unittest._WritelnDecorator(stream)
292
 
        self.descriptions = descriptions
293
 
        self.verbosity = verbosity
294
 
        self.keep_output = keep_output
295
 
        self.pb = pb
 
109
class TextTestRunner(unittest.TextTestRunner):
296
110
 
297
111
    def _makeResult(self):
298
 
        result = _MyResult(self.stream,
299
 
                           self.descriptions,
300
 
                           self.verbosity,
301
 
                           pb=self.pb)
302
 
        result.stop_early = self.stop_on_failure
303
 
        return result
304
 
 
305
 
    def run(self, test):
306
 
        "Run the given test case or test suite."
307
 
        result = self._makeResult()
308
 
        startTime = time.time()
309
 
        if self.pb is not None:
310
 
            self.pb.update('Running tests', 0, test.countTestCases())
311
 
        test.run(result)
312
 
        stopTime = time.time()
313
 
        timeTaken = stopTime - startTime
314
 
        result.printErrors()
315
 
        self.stream.writeln(result.separator2)
316
 
        run = result.testsRun
317
 
        self.stream.writeln("Ran %d test%s in %.3fs" %
318
 
                            (run, run != 1 and "s" or "", timeTaken))
319
 
        self.stream.writeln()
320
 
        if not result.wasSuccessful():
321
 
            self.stream.write("FAILED (")
322
 
            failed, errored = map(len, (result.failures, result.errors))
323
 
            if failed:
324
 
                self.stream.write("failures=%d" % failed)
325
 
            if errored:
326
 
                if failed: self.stream.write(", ")
327
 
                self.stream.write("errors=%d" % errored)
328
 
            self.stream.writeln(")")
329
 
        else:
330
 
            self.stream.writeln("OK")
331
 
        if self.pb is not None:
332
 
            self.pb.update('Cleaning up', 0, 1)
333
 
        # This is still a little bogus, 
334
 
        # but only a little. Folk not using our testrunner will
335
 
        # have to delete their temp directories themselves.
336
 
        test_root = TestCaseInTempDir.TEST_ROOT
337
 
        if result.wasSuccessful() or not self.keep_output:
338
 
            if test_root is not None:
339
 
                # If LANG=C we probably have created some bogus paths
340
 
                # which rmtree(unicode) will fail to delete
341
 
                # so make sure we are using rmtree(str) to delete everything
342
 
                # except on win32, where rmtree(str) will fail
343
 
                # since it doesn't have the property of byte-stream paths
344
 
                # (they are either ascii or mbcs)
345
 
                if sys.platform == 'win32':
346
 
                    # make sure we are using the unicode win32 api
347
 
                    test_root = unicode(test_root)
348
 
                else:
349
 
                    test_root = test_root.encode(
350
 
                        sys.getfilesystemencoding())
351
 
                osutils.rmtree(test_root)
352
 
        else:
353
 
            if self.pb is not None:
354
 
                self.pb.note("Failed tests working directories are in '%s'\n",
355
 
                             test_root)
356
 
            else:
357
 
                self.stream.writeln(
358
 
                    "Failed tests working directories are in '%s'\n" %
359
 
                    test_root)
360
 
        TestCaseInTempDir.TEST_ROOT = None
361
 
        if self.pb is not None:
362
 
            self.pb.clear()
363
 
        return result
 
112
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
 
113
        return EarlyStoppingTestResultAdapter(result)
364
114
 
365
115
 
366
116
def iter_suite_tests(suite):
384
134
class CommandFailed(Exception):
385
135
    pass
386
136
 
387
 
 
388
 
class StringIOWrapper(object):
389
 
    """A wrapper around cStringIO which just adds an encoding attribute.
390
 
    
391
 
    Internally we can check sys.stdout to see what the output encoding
392
 
    should be. However, cStringIO has no encoding attribute that we can
393
 
    set. So we wrap it instead.
394
 
    """
395
 
    encoding='ascii'
396
 
    _cstring = None
397
 
 
398
 
    def __init__(self, s=None):
399
 
        if s is not None:
400
 
            self.__dict__['_cstring'] = StringIO(s)
401
 
        else:
402
 
            self.__dict__['_cstring'] = StringIO()
403
 
 
404
 
    def __getattr__(self, name, getattr=getattr):
405
 
        return getattr(self.__dict__['_cstring'], name)
406
 
 
407
 
    def __setattr__(self, name, val):
408
 
        if name == 'encoding':
409
 
            self.__dict__['encoding'] = val
410
 
        else:
411
 
            return setattr(self._cstring, name, val)
412
 
 
413
 
 
414
137
class TestCase(unittest.TestCase):
415
138
    """Base class for bzr unit tests.
416
139
    
419
142
 
420
143
    Error and debug log messages are redirected from their usual
421
144
    location into a temporary file, the contents of which can be
422
 
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
423
 
    so that it can also capture file IO.  When the test completes this file
424
 
    is read into memory and removed from disk.
 
145
    retrieved by _get_log().
425
146
       
426
147
    There are also convenience functions to invoke bzr's command-line
427
 
    routine, and to build and check bzr trees.
428
 
   
429
 
    In addition to the usual method of overriding tearDown(), this class also
430
 
    allows subclasses to register functions into the _cleanups list, which is
431
 
    run in order as the object is torn down.  It's less likely this will be
432
 
    accidentally overlooked.
433
 
    """
434
 
 
435
 
    _log_file_name = None
436
 
    _log_contents = ''
437
 
    # record lsprof data when performing benchmark calls.
438
 
    _gather_lsprof_in_benchmarks = False
439
 
 
440
 
    def __init__(self, methodName='testMethod'):
441
 
        super(TestCase, self).__init__(methodName)
442
 
        self._cleanups = []
 
148
    routine, and to build and check bzr trees."""
 
149
 
 
150
    BZRPATH = 'bzr'
443
151
 
444
152
    def setUp(self):
445
153
        unittest.TestCase.setUp(self)
446
 
        self._cleanEnvironment()
447
154
        bzrlib.trace.disable_default_logging()
448
 
        self._startLogFile()
449
 
        self._benchcalls = []
450
 
        self._benchtime = None
451
 
 
452
 
    def _ndiff_strings(self, a, b):
453
 
        """Return ndiff between two strings containing lines.
454
 
        
455
 
        A trailing newline is added if missing to make the strings
456
 
        print properly."""
457
 
        if b and b[-1] != '\n':
458
 
            b += '\n'
459
 
        if a and a[-1] != '\n':
460
 
            a += '\n'
461
 
        difflines = difflib.ndiff(a.splitlines(True),
462
 
                                  b.splitlines(True),
463
 
                                  linejunk=lambda x: False,
464
 
                                  charjunk=lambda x: False)
465
 
        return ''.join(difflines)
466
 
 
467
 
    def assertEqualDiff(self, a, b, message=None):
468
 
        """Assert two texts are equal, if not raise an exception.
469
 
        
470
 
        This is intended for use with multi-line strings where it can 
471
 
        be hard to find the differences by eye.
472
 
        """
473
 
        # TODO: perhaps override assertEquals to call this for strings?
474
 
        if a == b:
475
 
            return
476
 
        if message is None:
477
 
            message = "texts not equal:\n"
478
 
        raise AssertionError(message + 
479
 
                             self._ndiff_strings(a, b))      
480
 
        
481
 
    def assertEqualMode(self, mode, mode_test):
482
 
        self.assertEqual(mode, mode_test,
483
 
                         'mode mismatch %o != %o' % (mode, mode_test))
484
 
 
485
 
    def assertStartsWith(self, s, prefix):
486
 
        if not s.startswith(prefix):
487
 
            raise AssertionError('string %r does not start with %r' % (s, prefix))
488
 
 
489
 
    def assertEndsWith(self, s, suffix):
490
 
        """Asserts that s ends with suffix."""
491
 
        if not s.endswith(suffix):
492
 
            raise AssertionError('string %r does not end with %r' % (s, suffix))
493
 
 
494
 
    def assertContainsRe(self, haystack, needle_re):
495
 
        """Assert that a contains something matching a regular expression."""
496
 
        if not re.search(needle_re, haystack):
497
 
            raise AssertionError('pattern "%s" not found in "%s"'
498
 
                    % (needle_re, haystack))
499
 
 
500
 
    def assertNotContainsRe(self, haystack, needle_re):
501
 
        """Assert that a does not match a regular expression"""
502
 
        if re.search(needle_re, haystack):
503
 
            raise AssertionError('pattern "%s" found in "%s"'
504
 
                    % (needle_re, haystack))
505
 
 
506
 
    def assertSubset(self, sublist, superlist):
507
 
        """Assert that every entry in sublist is present in superlist."""
508
 
        missing = []
509
 
        for entry in sublist:
510
 
            if entry not in superlist:
511
 
                missing.append(entry)
512
 
        if len(missing) > 0:
513
 
            raise AssertionError("value(s) %r not present in container %r" % 
514
 
                                 (missing, superlist))
515
 
 
516
 
    def assertIs(self, left, right):
517
 
        if not (left is right):
518
 
            raise AssertionError("%r is not %r." % (left, right))
519
 
 
520
 
    def assertTransportMode(self, transport, path, mode):
521
 
        """Fail if a path does not have mode mode.
522
 
        
523
 
        If modes are not supported on this transport, the assertion is ignored.
524
 
        """
525
 
        if not transport._can_roundtrip_unix_modebits():
526
 
            return
527
 
        path_stat = transport.stat(path)
528
 
        actual_mode = stat.S_IMODE(path_stat.st_mode)
529
 
        self.assertEqual(mode, actual_mode,
530
 
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
531
 
 
532
 
    def assertIsInstance(self, obj, kls):
533
 
        """Fail if obj is not an instance of kls"""
534
 
        if not isinstance(obj, kls):
535
 
            self.fail("%r is an instance of %s rather than %s" % (
536
 
                obj, obj.__class__, kls))
537
 
 
538
 
    def _startLogFile(self):
539
 
        """Send bzr and test log messages to a temporary file.
540
 
 
541
 
        The file is removed as the test is torn down.
542
 
        """
 
155
        self._enable_file_logging()
 
156
 
 
157
 
 
158
    def _enable_file_logging(self):
543
159
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
544
 
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
545
 
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
546
 
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
160
 
 
161
        self._log_file = os.fdopen(fileno, 'w+')
 
162
 
 
163
        hdlr = logging.StreamHandler(self._log_file)
 
164
        hdlr.setLevel(logging.DEBUG)
 
165
        hdlr.setFormatter(logging.Formatter('%(levelname)8s  %(message)s'))
 
166
        logging.getLogger('').addHandler(hdlr)
 
167
        logging.getLogger('').setLevel(logging.DEBUG)
 
168
        self._log_hdlr = hdlr
 
169
        debug('opened log file %s', name)
 
170
        
547
171
        self._log_file_name = name
548
 
        self.addCleanup(self._finishLogFile)
549
 
 
550
 
    def _finishLogFile(self):
551
 
        """Finished with the log file.
552
 
 
553
 
        Read contents into memory, close, and delete.
554
 
        """
555
 
        bzrlib.trace.disable_test_log(self._log_nonce)
556
 
        self._log_file.seek(0)
557
 
        self._log_contents = self._log_file.read()
 
172
 
 
173
    def tearDown(self):
 
174
        logging.getLogger('').removeHandler(self._log_hdlr)
 
175
        bzrlib.trace.enable_default_logging()
 
176
        logging.debug('%s teardown', self.id())
558
177
        self._log_file.close()
559
 
        os.remove(self._log_file_name)
560
 
        self._log_file = self._log_file_name = None
561
 
 
562
 
    def addCleanup(self, callable):
563
 
        """Arrange to run a callable when this case is torn down.
564
 
 
565
 
        Callables are run in the reverse of the order they are registered, 
566
 
        ie last-in first-out.
567
 
        """
568
 
        if callable in self._cleanups:
569
 
            raise ValueError("cleanup function %r already registered on %s" 
570
 
                    % (callable, self))
571
 
        self._cleanups.append(callable)
572
 
 
573
 
    def _cleanEnvironment(self):
574
 
        new_env = {
575
 
            'HOME': os.getcwd(),
576
 
            'APPDATA': os.getcwd(),
577
 
            'BZREMAIL': None,
578
 
            'EMAIL': None,
579
 
        }
580
 
        self.__old_env = {}
581
 
        self.addCleanup(self._restoreEnvironment)
582
 
        for name, value in new_env.iteritems():
583
 
            self._captureVar(name, value)
584
 
 
585
 
 
586
 
    def _captureVar(self, name, newvalue):
587
 
        """Set an environment variable, preparing it to be reset when finished."""
588
 
        self.__old_env[name] = os.environ.get(name, None)
589
 
        if newvalue is None:
590
 
            if name in os.environ:
591
 
                del os.environ[name]
592
 
        else:
593
 
            os.environ[name] = newvalue
594
 
 
595
 
    @staticmethod
596
 
    def _restoreVar(name, value):
597
 
        if value is None:
598
 
            if name in os.environ:
599
 
                del os.environ[name]
600
 
        else:
601
 
            os.environ[name] = value
602
 
 
603
 
    def _restoreEnvironment(self):
604
 
        for name, value in self.__old_env.iteritems():
605
 
            self._restoreVar(name, value)
606
 
 
607
 
    def tearDown(self):
608
 
        self._runCleanups()
609
178
        unittest.TestCase.tearDown(self)
610
179
 
611
 
    def time(self, callable, *args, **kwargs):
612
 
        """Run callable and accrue the time it takes to the benchmark time.
613
 
        
614
 
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
615
 
        this will cause lsprofile statistics to be gathered and stored in
616
 
        self._benchcalls.
617
 
        """
618
 
        if self._benchtime is None:
619
 
            self._benchtime = 0
620
 
        start = time.time()
621
 
        try:
622
 
            if not self._gather_lsprof_in_benchmarks:
623
 
                return callable(*args, **kwargs)
624
 
            else:
625
 
                # record this benchmark
626
 
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
627
 
                stats.sort()
628
 
                self._benchcalls.append(((callable, args, kwargs), stats))
629
 
                return ret
630
 
        finally:
631
 
            self._benchtime += time.time() - start
632
 
 
633
 
    def _runCleanups(self):
634
 
        """Run registered cleanup functions. 
635
 
 
636
 
        This should only be called from TestCase.tearDown.
637
 
        """
638
 
        # TODO: Perhaps this should keep running cleanups even if 
639
 
        # one of them fails?
640
 
        for cleanup_fn in reversed(self._cleanups):
641
 
            cleanup_fn()
642
 
 
643
180
    def log(self, *args):
644
 
        mutter(*args)
 
181
        logging.debug(*args)
645
182
 
646
183
    def _get_log(self):
647
184
        """Return as a string the log for this test"""
648
 
        if self._log_file_name:
649
 
            return open(self._log_file_name).read()
650
 
        else:
651
 
            return self._log_contents
652
 
        # TODO: Delete the log after it's been read in
653
 
 
654
 
    def capture(self, cmd, retcode=0):
 
185
        return open(self._log_file_name).read()
 
186
 
 
187
 
 
188
    def capture(self, cmd):
655
189
        """Shortcut that splits cmd into words, runs, and returns stdout"""
656
 
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
190
        return self.run_bzr_captured(cmd.split())[0]
657
191
 
658
 
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
659
 
        """Invoke bzr and return (stdout, stderr).
 
192
    def run_bzr_captured(self, argv, retcode=0):
 
193
        """Invoke bzr and return (result, stdout, stderr).
660
194
 
661
195
        Useful for code that wants to check the contents of the
662
196
        output, the way error messages are presented, etc.
672
206
        errors, and with logging set to something approximating the
673
207
        default, so that error reporting can be checked.
674
208
 
675
 
        :param argv: arguments to invoke bzr
676
 
        :param retcode: expected return code, or None for don't-care.
677
 
        :param encoding: encoding for sys.stdout and sys.stderr
678
 
        :param stdin: A string to be used as stdin for the command.
 
209
        argv -- arguments to invoke bzr
 
210
        retcode -- expected return code, or None for don't-care.
679
211
        """
680
 
        if encoding is None:
681
 
            encoding = bzrlib.user_encoding
682
 
        if stdin is not None:
683
 
            stdin = StringIO(stdin)
684
 
        stdout = StringIOWrapper()
685
 
        stderr = StringIOWrapper()
686
 
        stdout.encoding = encoding
687
 
        stderr.encoding = encoding
688
 
 
689
 
        self.log('run bzr: %r', argv)
690
 
        # FIXME: don't call into logging here
 
212
        stdout = StringIO()
 
213
        stderr = StringIO()
 
214
        self.log('run bzr: %s', ' '.join(argv))
691
215
        handler = logging.StreamHandler(stderr)
 
216
        handler.setFormatter(bzrlib.trace.QuietFormatter())
692
217
        handler.setLevel(logging.INFO)
693
218
        logger = logging.getLogger('')
694
219
        logger.addHandler(handler)
695
 
        old_ui_factory = bzrlib.ui.ui_factory
696
 
        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
697
 
            stdout=stdout,
698
 
            stderr=stderr)
699
 
        bzrlib.ui.ui_factory.stdin = stdin
700
220
        try:
701
 
            result = self.apply_redirected(stdin, stdout, stderr,
 
221
            result = self.apply_redirected(None, stdout, stderr,
702
222
                                           bzrlib.commands.run_bzr_catch_errors,
703
223
                                           argv)
704
224
        finally:
705
225
            logger.removeHandler(handler)
706
 
            bzrlib.ui.ui_factory = old_ui_factory
707
 
 
708
226
        out = stdout.getvalue()
709
227
        err = stderr.getvalue()
710
228
        if out:
711
 
            self.log('output:\n%r', out)
 
229
            self.log('output:\n%s', out)
712
230
        if err:
713
 
            self.log('errors:\n%r', err)
 
231
            self.log('errors:\n%s', err)
714
232
        if retcode is not None:
715
 
            self.assertEquals(retcode, result)
 
233
            self.assertEquals(result, retcode)
716
234
        return out, err
717
235
 
718
236
    def run_bzr(self, *args, **kwargs):
724
242
 
725
243
        This sends the stdout/stderr results into the test's log,
726
244
        where it may be useful for debugging.  See also run_captured.
727
 
 
728
 
        :param stdin: A string to be used as stdin for the command.
729
245
        """
730
246
        retcode = kwargs.pop('retcode', 0)
731
 
        encoding = kwargs.pop('encoding', None)
732
 
        stdin = kwargs.pop('stdin', None)
733
 
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding, stdin=stdin)
734
 
 
735
 
    def run_bzr_decode(self, *args, **kwargs):
736
 
        if kwargs.has_key('encoding'):
737
 
            encoding = kwargs['encoding']
738
 
        else:
739
 
            encoding = bzrlib.user_encoding
740
 
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
741
 
 
742
 
    def run_bzr_error(self, error_regexes, *args, **kwargs):
743
 
        """Run bzr, and check that stderr contains the supplied regexes
744
 
        
745
 
        This defaults to retcode=3, so you must supply retcode=? if you expect
746
 
        a different value.
747
 
        :return: (out, err) The actual output of running the command (in case you
748
 
                 want to do more inspection)
749
 
        """
750
 
        kwargs.setdefault('retcode', 3)
751
 
        out, err = self.run_bzr(*args, **kwargs)
752
 
        for regex in error_regexes:
753
 
            self.assertContainsRe(err, regex)
754
 
        return out, err
755
 
 
756
 
    def run_bzr_subprocess(self, *args, **kwargs):
757
 
        """Run bzr in a subprocess for testing.
758
 
 
759
 
        This starts a new Python interpreter and runs bzr in there. 
760
 
        This should only be used for tests that have a justifiable need for
761
 
        this isolation: e.g. they are testing startup time, or signal
762
 
        handling, or early startup code, etc.  Subprocess code can't be 
763
 
        profiled or debugged so easily.
764
 
 
765
 
        :param retcode: The status code that is expected.  Defaults to 0.  If
766
 
        None is supplied, the status code is not checked.
767
 
        """
768
 
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
769
 
        args = list(args)
770
 
        process = Popen([sys.executable, bzr_path]+args, stdout=PIPE, 
771
 
                         stderr=PIPE)
772
 
        out = process.stdout.read()
773
 
        err = process.stderr.read()
774
 
        retcode = process.wait()
775
 
        supplied_retcode = kwargs.get('retcode', 0)
776
 
        if supplied_retcode is not None:
777
 
            assert supplied_retcode == retcode
778
 
        return [out, err]
 
247
        return self.run_bzr_captured(args, retcode)
779
248
 
780
249
    def check_inventory_shape(self, inv, shape):
781
250
        """Compare an inventory to a list of expected names.
807
276
        if stdin is None:
808
277
            stdin = StringIO("")
809
278
        if stdout is None:
810
 
            if getattr(self, "_log_file", None) is not None:
 
279
            if hasattr(self, "_log_file"):
811
280
                stdout = self._log_file
812
281
            else:
813
282
                stdout = StringIO()
814
283
        if stderr is None:
815
 
            if getattr(self, "_log_file", None is not None):
 
284
            if hasattr(self, "_log_file"):
816
285
                stderr = self._log_file
817
286
            else:
818
287
                stderr = StringIO()
829
298
            sys.stderr = real_stderr
830
299
            sys.stdin = real_stdin
831
300
 
832
 
    def merge(self, branch_from, wt_to):
833
 
        """A helper for tests to do a ui-less merge.
834
 
 
835
 
        This should move to the main library when someone has time to integrate
836
 
        it in.
837
 
        """
838
 
        # minimal ui-less merge.
839
 
        wt_to.branch.fetch(branch_from)
840
 
        base_rev = common_ancestor(branch_from.last_revision(),
841
 
                                   wt_to.branch.last_revision(),
842
 
                                   wt_to.branch.repository)
843
 
        merge_inner(wt_to.branch, branch_from.basis_tree(), 
844
 
                    wt_to.branch.repository.revision_tree(base_rev),
845
 
                    this_tree=wt_to)
846
 
        wt_to.add_pending_merge(branch_from.last_revision())
847
 
 
848
301
 
849
302
BzrTestBase = TestCase
850
303
 
879
332
            return
880
333
        i = 0
881
334
        while True:
882
 
            root = u'test%04d.tmp' % i
 
335
            root = 'test%04d.tmp' % i
883
336
            try:
884
337
                os.mkdir(root)
885
338
            except OSError, e:
889
342
                else:
890
343
                    raise
891
344
            # successfully created
892
 
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
 
345
            TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
893
346
            break
894
347
        # make a fake bzr directory there to prevent any tests propagating
895
348
        # up onto the source directory's real branch
896
 
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
 
349
        os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
897
350
 
898
351
    def setUp(self):
899
352
        super(TestCaseInTempDir, self).setUp()
900
353
        self._make_test_root()
901
 
        _currentdir = os.getcwdu()
902
 
        # shorten the name, to avoid test failures due to path length
903
 
        short_id = self.id().replace('bzrlib.tests.', '') \
904
 
                   .replace('__main__.', '')[-100:]
905
 
        # it's possible the same test class is run several times for
906
 
        # parameterized tests, so make sure the names don't collide.  
907
 
        i = 0
908
 
        while True:
909
 
            if i > 0:
910
 
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
911
 
            else:
912
 
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
913
 
            if os.path.exists(candidate_dir):
914
 
                i = i + 1
915
 
                continue
916
 
            else:
917
 
                self.test_dir = candidate_dir
918
 
                os.mkdir(self.test_dir)
919
 
                os.chdir(self.test_dir)
920
 
                break
921
 
        os.environ['HOME'] = self.test_dir
922
 
        os.environ['APPDATA'] = self.test_dir
923
 
        def _leaveDirectory():
924
 
            os.chdir(_currentdir)
925
 
        self.addCleanup(_leaveDirectory)
 
354
        self._currentdir = os.getcwdu()
 
355
        short_id = self.id().replace('bzrlib.selftest.', '') \
 
356
                   .replace('__main__.', '')
 
357
        self.test_dir = os.path.join(self.TEST_ROOT, short_id)
 
358
        os.mkdir(self.test_dir)
 
359
        os.chdir(self.test_dir)
926
360
        
927
 
    def build_tree(self, shape, line_endings='native', transport=None):
 
361
    def tearDown(self):
 
362
        os.chdir(self._currentdir)
 
363
        super(TestCaseInTempDir, self).tearDown()
 
364
 
 
365
    def build_tree(self, shape):
928
366
        """Build a test tree according to a pattern.
929
367
 
930
368
        shape is a sequence of file specifications.  If the final
931
369
        character is '/', a directory is created.
932
370
 
933
371
        This doesn't add anything to a branch.
934
 
        :param line_endings: Either 'binary' or 'native'
935
 
                             in binary mode, exact contents are written
936
 
                             in native mode, the line endings match the
937
 
                             default platform endings.
938
 
 
939
 
        :param transport: A transport to write to, for building trees on 
940
 
                          VFS's. If the transport is readonly or None,
941
 
                          "." is opened automatically.
942
372
        """
943
373
        # XXX: It's OK to just create them using forward slashes on windows?
944
 
        if transport is None or transport.is_readonly():
945
 
            transport = get_transport(".")
946
374
        for name in shape:
947
 
            self.assert_(isinstance(name, basestring))
 
375
            assert isinstance(name, basestring)
948
376
            if name[-1] == '/':
949
 
                transport.mkdir(urlutils.escape(name[:-1]))
 
377
                os.mkdir(name[:-1])
950
378
            else:
951
 
                if line_endings == 'binary':
952
 
                    end = '\n'
953
 
                elif line_endings == 'native':
954
 
                    end = os.linesep
955
 
                else:
956
 
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
957
 
                content = "contents of %s%s" % (name.encode('utf-8'), end)
958
 
                transport.put(urlutils.escape(name), StringIO(content))
959
 
 
960
 
    def build_tree_contents(self, shape):
961
 
        build_tree_contents(shape)
 
379
                f = file(name, 'wt')
 
380
                print >>f, "contents of", name
 
381
                f.close()
962
382
 
963
383
    def failUnlessExists(self, path):
964
384
        """Fail unless path, which may be abs or relative, exists."""
965
 
        self.failUnless(osutils.lexists(path))
966
 
 
967
 
    def failIfExists(self, path):
968
 
        """Fail if path, which may be abs or relative, exists."""
969
 
        self.failIf(osutils.lexists(path))
970
 
        
971
 
    def assertFileEqual(self, content, path):
972
 
        """Fail if path does not contain 'content'."""
973
 
        self.failUnless(osutils.lexists(path))
974
 
        # TODO: jam 20060427 Shouldn't this be 'rb'?
975
 
        self.assertEqualDiff(content, open(path, 'r').read())
976
 
 
977
 
 
978
 
class TestCaseWithTransport(TestCaseInTempDir):
979
 
    """A test case that provides get_url and get_readonly_url facilities.
980
 
 
981
 
    These back onto two transport servers, one for readonly access and one for
982
 
    read write access.
983
 
 
984
 
    If no explicit class is provided for readonly access, a
985
 
    ReadonlyTransportDecorator is used instead which allows the use of non disk
986
 
    based read write transports.
987
 
 
988
 
    If an explicit class is provided for readonly access, that server and the 
989
 
    readwrite one must both define get_url() as resolving to os.getcwd().
990
 
    """
991
 
 
992
 
    def __init__(self, methodName='testMethod'):
993
 
        super(TestCaseWithTransport, self).__init__(methodName)
994
 
        self.__readonly_server = None
995
 
        self.__server = None
996
 
        self.transport_server = default_transport
997
 
        self.transport_readonly_server = None
998
 
 
999
 
    def get_readonly_url(self, relpath=None):
1000
 
        """Get a URL for the readonly transport.
1001
 
 
1002
 
        This will either be backed by '.' or a decorator to the transport 
1003
 
        used by self.get_url()
1004
 
        relpath provides for clients to get a path relative to the base url.
1005
 
        These should only be downwards relative, not upwards.
1006
 
        """
1007
 
        base = self.get_readonly_server().get_url()
1008
 
        if relpath is not None:
1009
 
            if not base.endswith('/'):
1010
 
                base = base + '/'
1011
 
            base = base + relpath
1012
 
        return base
1013
 
 
1014
 
    def get_readonly_server(self):
1015
 
        """Get the server instance for the readonly transport
1016
 
 
1017
 
        This is useful for some tests with specific servers to do diagnostics.
1018
 
        """
1019
 
        if self.__readonly_server is None:
1020
 
            if self.transport_readonly_server is None:
1021
 
                # readonly decorator requested
1022
 
                # bring up the server
1023
 
                self.get_url()
1024
 
                self.__readonly_server = ReadonlyServer()
1025
 
                self.__readonly_server.setUp(self.__server)
1026
 
            else:
1027
 
                self.__readonly_server = self.transport_readonly_server()
1028
 
                self.__readonly_server.setUp()
1029
 
            self.addCleanup(self.__readonly_server.tearDown)
1030
 
        return self.__readonly_server
1031
 
 
1032
 
    def get_server(self):
1033
 
        """Get the read/write server instance.
1034
 
 
1035
 
        This is useful for some tests with specific servers that need
1036
 
        diagnostics.
1037
 
        """
1038
 
        if self.__server is None:
1039
 
            self.__server = self.transport_server()
1040
 
            self.__server.setUp()
1041
 
            self.addCleanup(self.__server.tearDown)
1042
 
        return self.__server
1043
 
 
1044
 
    def get_url(self, relpath=None):
1045
 
        """Get a URL for the readwrite transport.
1046
 
 
1047
 
        This will either be backed by '.' or to an equivalent non-file based
1048
 
        facility.
1049
 
        relpath provides for clients to get a path relative to the base url.
1050
 
        These should only be downwards relative, not upwards.
1051
 
        """
1052
 
        base = self.get_server().get_url()
1053
 
        if relpath is not None and relpath != '.':
1054
 
            if not base.endswith('/'):
1055
 
                base = base + '/'
1056
 
            base = base + urlutils.escape(relpath)
1057
 
        return base
1058
 
 
1059
 
    def get_transport(self):
1060
 
        """Return a writeable transport for the test scratch space"""
1061
 
        t = get_transport(self.get_url())
1062
 
        self.assertFalse(t.is_readonly())
1063
 
        return t
1064
 
 
1065
 
    def get_readonly_transport(self):
1066
 
        """Return a readonly transport for the test scratch space
1067
 
        
1068
 
        This can be used to test that operations which should only need
1069
 
        readonly access in fact do not try to write.
1070
 
        """
1071
 
        t = get_transport(self.get_readonly_url())
1072
 
        self.assertTrue(t.is_readonly())
1073
 
        return t
1074
 
 
1075
 
    def make_branch(self, relpath, format=None):
1076
 
        """Create a branch on the transport at relpath."""
1077
 
        repo = self.make_repository(relpath, format=format)
1078
 
        return repo.bzrdir.create_branch()
1079
 
 
1080
 
    def make_bzrdir(self, relpath, format=None):
1081
 
        try:
1082
 
            url = self.get_url(relpath)
1083
 
            mutter('relpath %r => url %r', relpath, url)
1084
 
            segments = url.split('/')
1085
 
            if segments and segments[-1] not in ('', '.'):
1086
 
                parent = '/'.join(segments[:-1])
1087
 
                t = get_transport(parent)
1088
 
                try:
1089
 
                    t.mkdir(segments[-1])
1090
 
                except errors.FileExists:
1091
 
                    pass
1092
 
            if format is None:
1093
 
                format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
1094
 
            # FIXME: make this use a single transport someday. RBC 20060418
1095
 
            return format.initialize_on_transport(get_transport(relpath))
1096
 
        except errors.UninitializableFormat:
1097
 
            raise TestSkipped("Format %s is not initializable." % format)
1098
 
 
1099
 
    def make_repository(self, relpath, shared=False, format=None):
1100
 
        """Create a repository on our default transport at relpath."""
1101
 
        made_control = self.make_bzrdir(relpath, format=format)
1102
 
        return made_control.create_repository(shared=shared)
1103
 
 
1104
 
    def make_branch_and_tree(self, relpath, format=None):
1105
 
        """Create a branch on the transport and a tree locally.
1106
 
 
1107
 
        Returns the tree.
1108
 
        """
1109
 
        # TODO: always use the local disk path for the working tree,
1110
 
        # this obviously requires a format that supports branch references
1111
 
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
1112
 
        # RBC 20060208
1113
 
        b = self.make_branch(relpath, format=format)
1114
 
        try:
1115
 
            return b.bzrdir.create_workingtree()
1116
 
        except errors.NotLocalUrl:
1117
 
            # new formats - catch No tree error and create
1118
 
            # a branch reference and a checkout.
1119
 
            # old formats at that point - raise TestSkipped.
1120
 
            # TODO: rbc 20060208
1121
 
            return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
1122
 
 
1123
 
    def assertIsDirectory(self, relpath, transport):
1124
 
        """Assert that relpath within transport is a directory.
1125
 
 
1126
 
        This may not be possible on all transports; in that case it propagates
1127
 
        a TransportNotPossible.
1128
 
        """
1129
 
        try:
1130
 
            mode = transport.stat(relpath).st_mode
1131
 
        except errors.NoSuchFile:
1132
 
            self.fail("path %s is not a directory; no such file"
1133
 
                      % (relpath))
1134
 
        if not stat.S_ISDIR(mode):
1135
 
            self.fail("path %s is not a directory; has mode %#o"
1136
 
                      % (relpath, mode))
1137
 
 
1138
 
 
1139
 
class ChrootedTestCase(TestCaseWithTransport):
1140
 
    """A support class that provides readonly urls outside the local namespace.
1141
 
 
1142
 
    This is done by checking if self.transport_server is a MemoryServer. if it
1143
 
    is then we are chrooted already, if it is not then an HttpServer is used
1144
 
    for readonly urls.
1145
 
 
1146
 
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1147
 
                       be used without needed to redo it when a different 
1148
 
                       subclass is in use ?
1149
 
    """
1150
 
 
1151
 
    def setUp(self):
1152
 
        super(ChrootedTestCase, self).setUp()
1153
 
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1154
 
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
385
        self.failUnless(os.path.exists(path))
 
386
        
 
387
 
 
388
class MetaTestLog(TestCase):
 
389
    def test_logging(self):
 
390
        """Test logs are captured when a test fails."""
 
391
        logging.info('an info message')
 
392
        warning('something looks dodgy...')
 
393
        logging.debug('hello, test is running')
 
394
        ##assert 0
1155
395
 
1156
396
 
1157
397
def filter_suite_by_re(suite, pattern):
1158
398
    result = TestUtil.TestSuite()
1159
399
    filter_re = re.compile(pattern)
1160
400
    for test in iter_suite_tests(suite):
1161
 
        if filter_re.search(test.id()):
 
401
        if filter_re.match(test.id()):
1162
402
            result.addTest(test)
1163
403
    return result
1164
404
 
1165
405
 
1166
 
def run_suite(suite, name='test', verbose=False, pattern=".*",
1167
 
              stop_on_failure=False, keep_output=False,
1168
 
              transport=None, lsprof_timed=None):
 
406
def filter_suite_by_names(suite, wanted_names):
 
407
    """Return a new suite containing only selected tests.
 
408
    
 
409
    Names are considered to match if any name is a substring of the 
 
410
    fully-qualified test id (i.e. the class ."""
 
411
    result = TestSuite()
 
412
    for test in iter_suite_tests(suite):
 
413
        this_id = test.id()
 
414
        for p in wanted_names:
 
415
            if this_id.find(p) != -1:
 
416
                result.addTest(test)
 
417
    return result
 
418
 
 
419
 
 
420
def run_suite(suite, name='test', verbose=False, pattern=".*", testnames=None):
1169
421
    TestCaseInTempDir._TEST_NAME = name
1170
 
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1171
422
    if verbose:
1172
423
        verbosity = 2
1173
 
        pb = None
1174
424
    else:
1175
425
        verbosity = 1
1176
 
        pb = progress.ProgressBar()
1177
426
    runner = TextTestRunner(stream=sys.stdout,
1178
427
                            descriptions=0,
1179
 
                            verbosity=verbosity,
1180
 
                            keep_output=keep_output,
1181
 
                            pb=pb)
1182
 
    runner.stop_on_failure=stop_on_failure
 
428
                            verbosity=verbosity)
 
429
    if testnames:
 
430
        suite = filter_suite_by_names(suite, testnames)
1183
431
    if pattern != '.*':
1184
432
        suite = filter_suite_by_re(suite, pattern)
1185
433
    result = runner.run(suite)
 
434
    # This is still a little bogus, 
 
435
    # but only a little. Folk not using our testrunner will
 
436
    # have to delete their temp directories themselves.
 
437
    if result.wasSuccessful():
 
438
        if TestCaseInTempDir.TEST_ROOT is not None:
 
439
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
 
440
    else:
 
441
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
1186
442
    return result.wasSuccessful()
1187
443
 
1188
444
 
1189
 
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1190
 
             keep_output=False,
1191
 
             transport=None,
1192
 
             test_suite_factory=None,
1193
 
             lsprof_timed=None):
 
445
def selftest(verbose=False, pattern=".*", testnames=None):
1194
446
    """Run the whole test suite under the enhanced runner"""
1195
 
    global default_transport
1196
 
    if transport is None:
1197
 
        transport = default_transport
1198
 
    old_transport = default_transport
1199
 
    default_transport = transport
1200
 
    try:
1201
 
        if test_suite_factory is None:
1202
 
            suite = test_suite()
1203
 
        else:
1204
 
            suite = test_suite_factory()
1205
 
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1206
 
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
1207
 
                     transport=transport,
1208
 
                     lsprof_timed=lsprof_timed)
1209
 
    finally:
1210
 
        default_transport = old_transport
 
447
    return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
 
448
                     testnames=testnames)
1211
449
 
1212
450
 
1213
451
def test_suite():
1214
 
    """Build and return TestSuite for the whole of bzrlib.
1215
 
    
1216
 
    This function can be replaced if you need to change the default test
1217
 
    suite on a global basis, but it is not encouraged.
1218
 
    """
1219
 
    testmod_names = [
1220
 
                   'bzrlib.tests.test_ancestry',
1221
 
                   'bzrlib.tests.test_api',
1222
 
                   'bzrlib.tests.test_bad_files',
1223
 
                   'bzrlib.tests.test_branch',
1224
 
                   'bzrlib.tests.test_bundle',
1225
 
                   'bzrlib.tests.test_bzrdir',
1226
 
                   'bzrlib.tests.test_command',
1227
 
                   'bzrlib.tests.test_commit',
1228
 
                   'bzrlib.tests.test_commit_merge',
1229
 
                   'bzrlib.tests.test_config',
1230
 
                   'bzrlib.tests.test_conflicts',
1231
 
                   'bzrlib.tests.test_decorators',
1232
 
                   'bzrlib.tests.test_diff',
1233
 
                   'bzrlib.tests.test_doc_generate',
1234
 
                   'bzrlib.tests.test_emptytree',
1235
 
                   'bzrlib.tests.test_errors',
1236
 
                   'bzrlib.tests.test_escaped_store',
1237
 
                   'bzrlib.tests.test_fetch',
1238
 
                   'bzrlib.tests.test_gpg',
1239
 
                   'bzrlib.tests.test_graph',
1240
 
                   'bzrlib.tests.test_hashcache',
1241
 
                   'bzrlib.tests.test_http',
1242
 
                   'bzrlib.tests.test_identitymap',
1243
 
                   'bzrlib.tests.test_inv',
1244
 
                   'bzrlib.tests.test_knit',
1245
 
                   'bzrlib.tests.test_lockdir',
1246
 
                   'bzrlib.tests.test_lockable_files',
1247
 
                   'bzrlib.tests.test_log',
1248
 
                   'bzrlib.tests.test_merge',
1249
 
                   'bzrlib.tests.test_merge3',
1250
 
                   'bzrlib.tests.test_merge_core',
1251
 
                   'bzrlib.tests.test_missing',
1252
 
                   'bzrlib.tests.test_msgeditor',
1253
 
                   'bzrlib.tests.test_nonascii',
1254
 
                   'bzrlib.tests.test_options',
1255
 
                   'bzrlib.tests.test_osutils',
1256
 
                   'bzrlib.tests.test_patch',
1257
 
                   'bzrlib.tests.test_patches',
1258
 
                   'bzrlib.tests.test_permissions',
1259
 
                   'bzrlib.tests.test_plugins',
1260
 
                   'bzrlib.tests.test_progress',
1261
 
                   'bzrlib.tests.test_reconcile',
1262
 
                   'bzrlib.tests.test_repository',
1263
 
                   'bzrlib.tests.test_revision',
1264
 
                   'bzrlib.tests.test_revisionnamespaces',
1265
 
                   'bzrlib.tests.test_revprops',
1266
 
                   'bzrlib.tests.test_revisiontree',
1267
 
                   'bzrlib.tests.test_rio',
1268
 
                   'bzrlib.tests.test_sampler',
1269
 
                   'bzrlib.tests.test_selftest',
1270
 
                   'bzrlib.tests.test_setup',
1271
 
                   'bzrlib.tests.test_sftp_transport',
1272
 
                   'bzrlib.tests.test_smart_add',
1273
 
                   'bzrlib.tests.test_source',
1274
 
                   'bzrlib.tests.test_status',
1275
 
                   'bzrlib.tests.test_store',
1276
 
                   'bzrlib.tests.test_symbol_versioning',
1277
 
                   'bzrlib.tests.test_testament',
1278
 
                   'bzrlib.tests.test_textfile',
1279
 
                   'bzrlib.tests.test_textmerge',
1280
 
                   'bzrlib.tests.test_trace',
1281
 
                   'bzrlib.tests.test_transactions',
1282
 
                   'bzrlib.tests.test_transform',
1283
 
                   'bzrlib.tests.test_transport',
1284
 
                   'bzrlib.tests.test_tsort',
1285
 
                   'bzrlib.tests.test_tuned_gzip',
1286
 
                   'bzrlib.tests.test_ui',
1287
 
                   'bzrlib.tests.test_upgrade',
1288
 
                   'bzrlib.tests.test_urlutils',
1289
 
                   'bzrlib.tests.test_versionedfile',
1290
 
                   'bzrlib.tests.test_weave',
1291
 
                   'bzrlib.tests.test_whitebox',
1292
 
                   'bzrlib.tests.test_workingtree',
1293
 
                   'bzrlib.tests.test_xml',
 
452
    """Build and return TestSuite for the whole program."""
 
453
    import bzrlib.store, bzrlib.inventory, bzrlib.branch
 
454
    import bzrlib.osutils, bzrlib.merge3, bzrlib.plugin
 
455
    from doctest import DocTestSuite
 
456
 
 
457
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
 
458
 
 
459
    testmod_names = \
 
460
                  ['bzrlib.selftest.MetaTestLog',
 
461
                   'bzrlib.selftest.testinv',
 
462
                   'bzrlib.selftest.test_ancestry',
 
463
                   'bzrlib.selftest.test_commit',
 
464
                   'bzrlib.selftest.test_commit_merge',
 
465
                   'bzrlib.selftest.versioning',
 
466
                   'bzrlib.selftest.testmerge3',
 
467
                   'bzrlib.selftest.testmerge',
 
468
                   'bzrlib.selftest.testhashcache',
 
469
                   'bzrlib.selftest.teststatus',
 
470
                   'bzrlib.selftest.testlog',
 
471
                   'bzrlib.selftest.testrevisionnamespaces',
 
472
                   'bzrlib.selftest.testbranch',
 
473
                   'bzrlib.selftest.testrevision',
 
474
                   'bzrlib.selftest.test_revision_info',
 
475
                   'bzrlib.selftest.test_merge_core',
 
476
                   'bzrlib.selftest.test_smart_add',
 
477
                   'bzrlib.selftest.test_bad_files',
 
478
                   'bzrlib.selftest.testdiff',
 
479
                   'bzrlib.selftest.test_parent',
 
480
                   'bzrlib.selftest.test_xml',
 
481
                   'bzrlib.selftest.test_weave',
 
482
                   'bzrlib.selftest.testfetch',
 
483
                   'bzrlib.selftest.whitebox',
 
484
                   'bzrlib.selftest.teststore',
 
485
                   'bzrlib.selftest.blackbox',
 
486
                   'bzrlib.selftest.testtransport',
 
487
                   'bzrlib.selftest.testgraph',
 
488
                   'bzrlib.selftest.testworkingtree',
 
489
                   'bzrlib.selftest.test_upgrade',
1294
490
                   ]
1295
 
    test_transport_implementations = [
1296
 
        'bzrlib.tests.test_transport_implementations',
1297
 
        'bzrlib.tests.test_read_bundle',
1298
 
        ]
1299
 
    suite = TestUtil.TestSuite()
1300
 
    loader = TestUtil.TestLoader()
1301
 
    from bzrlib.transport import TransportTestProviderAdapter
1302
 
    adapter = TransportTestProviderAdapter()
1303
 
    adapt_modules(test_transport_implementations, adapter, loader, suite)
1304
 
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1305
 
    for package in packages_to_test():
1306
 
        suite.addTest(package.test_suite())
 
491
 
 
492
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
 
493
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
 
494
        if m not in MODULES_TO_DOCTEST:
 
495
            MODULES_TO_DOCTEST.append(m)
 
496
 
 
497
    TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
 
498
    print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
 
499
    print
 
500
    suite = TestSuite()
 
501
    suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
1307
502
    for m in MODULES_TO_TEST:
1308
 
        suite.addTest(loader.loadTestsFromModule(m))
1309
 
    for m in MODULES_TO_DOCTEST:
1310
 
        suite.addTest(doctest.DocTestSuite(m))
1311
 
    for name, plugin in bzrlib.plugin.all_plugins().items():
1312
 
        if getattr(plugin, 'test_suite', None) is not None:
1313
 
            suite.addTest(plugin.test_suite())
 
503
         suite.addTest(TestLoader().loadTestsFromModule(m))
 
504
    for m in (MODULES_TO_DOCTEST):
 
505
        suite.addTest(DocTestSuite(m))
 
506
    for p in bzrlib.plugin.all_plugins:
 
507
        if hasattr(p, 'test_suite'):
 
508
            suite.addTest(p.test_suite())
1314
509
    return suite
1315
510
 
1316
 
 
1317
 
def adapt_modules(mods_list, adapter, loader, suite):
1318
 
    """Adapt the modules in mods_list using adapter and add to suite."""
1319
 
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
1320
 
        suite.addTests(adapter.adapt(test))