~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Martin Pool
  • Date: 2006-11-02 10:20:19 UTC
  • mfrom: (2114 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2119.
  • Revision ID: mbp@sourcefrog.net-20061102102019-9a5a02f485dff6f6
merge bzr.dev and reconcile several changes, also some test fixes

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
18
30
from cStringIO import StringIO
19
31
import difflib
 
32
import doctest
20
33
import errno
21
34
import logging
22
35
import os
23
36
import re
24
 
import shutil
 
37
import shlex
 
38
import stat
 
39
from subprocess import Popen, PIPE
25
40
import sys
26
41
import tempfile
27
42
import unittest
28
43
import time
29
44
 
 
45
 
 
46
from bzrlib import memorytree
 
47
import bzrlib.branch
 
48
import bzrlib.bzrdir as bzrdir
30
49
import bzrlib.commands
 
50
import bzrlib.bundle.serializer
 
51
import bzrlib.errors as errors
 
52
import bzrlib.export
 
53
import bzrlib.inventory
 
54
import bzrlib.iterablefile
 
55
import bzrlib.lockdir
 
56
try:
 
57
    import bzrlib.lsprof
 
58
except ImportError:
 
59
    # lsprof not available
 
60
    pass
 
61
from bzrlib.merge import merge_inner
 
62
import bzrlib.merge3
 
63
import bzrlib.osutils
 
64
import bzrlib.osutils as osutils
 
65
import bzrlib.plugin
 
66
import bzrlib.progress as progress
 
67
from bzrlib.revision import common_ancestor
 
68
import bzrlib.store
 
69
from bzrlib import symbol_versioning
31
70
import bzrlib.trace
32
 
import bzrlib.fetch
33
 
import bzrlib.osutils as osutils
34
 
from bzrlib.selftest import TestUtil
35
 
from bzrlib.selftest.TestUtil import TestLoader, TestSuite
36
 
from bzrlib.selftest.treeshape import build_tree_contents
 
71
from bzrlib.transport import get_transport
 
72
import bzrlib.transport
 
73
from bzrlib.transport.local import LocalURLServer
 
74
from bzrlib.transport.memory import MemoryServer
 
75
from bzrlib.transport.readonly import ReadonlyServer
 
76
from bzrlib.trace import mutter, note
 
77
from bzrlib.tests import TestUtil
 
78
from bzrlib.tests.TestUtil import (
 
79
                          TestSuite,
 
80
                          TestLoader,
 
81
                          )
 
82
from bzrlib.tests.treeshape import build_tree_contents
 
83
import bzrlib.urlutils as urlutils
 
84
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
85
 
 
86
default_transport = LocalURLServer
37
87
 
38
88
MODULES_TO_TEST = []
39
 
MODULES_TO_DOCTEST = []
40
 
 
41
 
from logging import debug, warning, error
42
 
 
43
 
 
44
 
 
45
 
class EarlyStoppingTestResultAdapter(object):
46
 
    """An adapter for TestResult to stop at the first first failure or error"""
47
 
 
48
 
    def __init__(self, result):
49
 
        self._result = result
50
 
 
51
 
    def addError(self, test, err):
52
 
        self._result.addError(test, err)
53
 
        self._result.stop()
54
 
 
55
 
    def addFailure(self, test, err):
56
 
        self._result.addFailure(test, err)
57
 
        self._result.stop()
58
 
 
59
 
    def __getattr__(self, name):
60
 
        return getattr(self._result, name)
61
 
 
62
 
    def __setattr__(self, name, value):
63
 
        if name == '_result':
64
 
            object.__setattr__(self, name, value)
65
 
        return setattr(self._result, name, value)
66
 
 
67
 
 
68
 
class _MyResult(unittest._TextTestResult):
69
 
    """
70
 
    Custom TestResult.
71
 
 
72
 
    No special behaviour for now.
73
 
    """
74
 
 
75
 
    def _elapsedTime(self):
76
 
        return "(Took %.3fs)" % (time.time() - self._start_time)
 
89
MODULES_TO_DOCTEST = [
 
90
                      bzrlib.bundle.serializer,
 
91
                      bzrlib.errors,
 
92
                      bzrlib.export,
 
93
                      bzrlib.inventory,
 
94
                      bzrlib.iterablefile,
 
95
                      bzrlib.lockdir,
 
96
                      bzrlib.merge3,
 
97
                      bzrlib.option,
 
98
                      bzrlib.store,
 
99
                      ]
 
100
 
 
101
 
 
102
def packages_to_test():
 
103
    """Return a list of packages to test.
 
104
 
 
105
    The packages are not globally imported so that import failures are
 
106
    triggered when running selftest, not when importing the command.
 
107
    """
 
108
    import bzrlib.doc
 
109
    import bzrlib.tests.blackbox
 
110
    import bzrlib.tests.branch_implementations
 
111
    import bzrlib.tests.bzrdir_implementations
 
112
    import bzrlib.tests.interrepository_implementations
 
113
    import bzrlib.tests.interversionedfile_implementations
 
114
    import bzrlib.tests.intertree_implementations
 
115
    import bzrlib.tests.repository_implementations
 
116
    import bzrlib.tests.revisionstore_implementations
 
117
    import bzrlib.tests.tree_implementations
 
118
    import bzrlib.tests.workingtree_implementations
 
119
    return [
 
120
            bzrlib.doc,
 
121
            bzrlib.tests.blackbox,
 
122
            bzrlib.tests.branch_implementations,
 
123
            bzrlib.tests.bzrdir_implementations,
 
124
            bzrlib.tests.interrepository_implementations,
 
125
            bzrlib.tests.interversionedfile_implementations,
 
126
            bzrlib.tests.intertree_implementations,
 
127
            bzrlib.tests.repository_implementations,
 
128
            bzrlib.tests.revisionstore_implementations,
 
129
            bzrlib.tests.tree_implementations,
 
130
            bzrlib.tests.workingtree_implementations,
 
131
            ]
 
132
 
 
133
 
 
134
class ExtendedTestResult(unittest._TextTestResult):
 
135
    """Accepts, reports and accumulates the results of running tests.
 
136
 
 
137
    Compared to this unittest version this class adds support for profiling,
 
138
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
139
    There are further-specialized subclasses for different types of display.
 
140
    """
 
141
 
 
142
    stop_early = False
 
143
    
 
144
    def __init__(self, stream, descriptions, verbosity,
 
145
                 bench_history=None,
 
146
                 num_tests=None,
 
147
                 ):
 
148
        """Construct new TestResult.
 
149
 
 
150
        :param bench_history: Optionally, a writable file object to accumulate
 
151
            benchmark results.
 
152
        """
 
153
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
154
        if bench_history is not None:
 
155
            from bzrlib.version import _get_bzr_source_tree
 
156
            src_tree = _get_bzr_source_tree()
 
157
            if src_tree:
 
158
                try:
 
159
                    revision_id = src_tree.get_parent_ids()[0]
 
160
                except IndexError:
 
161
                    # XXX: if this is a brand new tree, do the same as if there
 
162
                    # is no branch.
 
163
                    revision_id = ''
 
164
            else:
 
165
                # XXX: If there's no branch, what should we do?
 
166
                revision_id = ''
 
167
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
168
        self._bench_history = bench_history
 
169
        self.ui = bzrlib.ui.ui_factory
 
170
        self.num_tests = num_tests
 
171
        self.error_count = 0
 
172
        self.failure_count = 0
 
173
        self.skip_count = 0
 
174
        self.count = 0
 
175
        self._overall_start_time = time.time()
 
176
    
 
177
    def extractBenchmarkTime(self, testCase):
 
178
        """Add a benchmark time for the current test case."""
 
179
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
180
    
 
181
    def _elapsedTestTimeString(self):
 
182
        """Return a time string for the overall time the current test has taken."""
 
183
        return self._formatTime(time.time() - self._start_time)
 
184
 
 
185
    def _testTimeString(self):
 
186
        if self._benchmarkTime is not None:
 
187
            return "%s/%s" % (
 
188
                self._formatTime(self._benchmarkTime),
 
189
                self._elapsedTestTimeString())
 
190
        else:
 
191
            return "      %s" % self._elapsedTestTimeString()
 
192
 
 
193
    def _formatTime(self, seconds):
 
194
        """Format seconds as milliseconds with leading spaces."""
 
195
        return "%5dms" % (1000 * seconds)
 
196
 
 
197
    def _shortened_test_description(self, test):
 
198
        what = test.id()
 
199
        what = re.sub(r'^bzrlib\.(tests|benchmark)\.', '', what)
 
200
        return what
77
201
 
78
202
    def startTest(self, test):
79
203
        unittest.TestResult.startTest(self, test)
80
 
        # TODO: Maybe show test.shortDescription somewhere?
81
 
        what = test.shortDescription() or test.id()        
82
 
        if self.showAll:
83
 
            self.stream.write('%-70.70s' % what)
84
 
        self.stream.flush()
 
204
        self.report_test_start(test)
 
205
        self._recordTestStartTime()
 
206
 
 
207
    def _recordTestStartTime(self):
 
208
        """Record that a test has started."""
85
209
        self._start_time = time.time()
86
210
 
87
211
    def addError(self, test, err):
 
212
        if isinstance(err[1], TestSkipped):
 
213
            return self.addSkipped(test, err)    
88
214
        unittest.TestResult.addError(self, test, err)
89
 
        if self.showAll:
90
 
            self.stream.writeln("ERROR %s" % self._elapsedTime())
91
 
        elif self.dots:
92
 
            self.stream.write('E')
93
 
        self.stream.flush()
 
215
        # We can only do this if we have one of our TestCases, not if
 
216
        # we have a doctest.
 
217
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
218
        if setKeepLogfile is not None:
 
219
            setKeepLogfile()
 
220
        self.extractBenchmarkTime(test)
 
221
        self.report_error(test, err)
 
222
        if self.stop_early:
 
223
            self.stop()
94
224
 
95
225
    def addFailure(self, test, err):
96
226
        unittest.TestResult.addFailure(self, test, err)
97
 
        if self.showAll:
98
 
            self.stream.writeln("FAIL %s" % self._elapsedTime())
99
 
        elif self.dots:
100
 
            self.stream.write('F')
101
 
        self.stream.flush()
 
227
        # We can only do this if we have one of our TestCases, not if
 
228
        # we have a doctest.
 
229
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
230
        if setKeepLogfile is not None:
 
231
            setKeepLogfile()
 
232
        self.extractBenchmarkTime(test)
 
233
        self.report_failure(test, err)
 
234
        if self.stop_early:
 
235
            self.stop()
102
236
 
103
237
    def addSuccess(self, test):
104
 
        if self.showAll:
105
 
            self.stream.writeln('OK %s' % self._elapsedTime())
106
 
        elif self.dots:
107
 
            self.stream.write('~')
108
 
        self.stream.flush()
 
238
        self.extractBenchmarkTime(test)
 
239
        if self._bench_history is not None:
 
240
            if self._benchmarkTime is not None:
 
241
                self._bench_history.write("%s %s\n" % (
 
242
                    self._formatTime(self._benchmarkTime),
 
243
                    test.id()))
 
244
        self.report_success(test)
109
245
        unittest.TestResult.addSuccess(self, test)
110
246
 
 
247
    def addSkipped(self, test, skip_excinfo):
 
248
        self.extractBenchmarkTime(test)
 
249
        self.report_skip(test, skip_excinfo)
 
250
        # seems best to treat this as success from point-of-view of unittest
 
251
        # -- it actually does nothing so it barely matters :)
 
252
        try:
 
253
            test.tearDown()
 
254
        except KeyboardInterrupt:
 
255
            raise
 
256
        except:
 
257
            self.addError(test, test.__exc_info())
 
258
        else:
 
259
            unittest.TestResult.addSuccess(self, test)
 
260
 
111
261
    def printErrorList(self, flavour, errors):
112
262
        for test, err in errors:
113
263
            self.stream.writeln(self.separator1)
114
 
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
115
 
            if hasattr(test, '_get_log'):
116
 
                self.stream.writeln()
117
 
                self.stream.writeln('log from this test:')
 
264
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
265
            if getattr(test, '_get_log', None) is not None:
 
266
                print >>self.stream
 
267
                print >>self.stream, \
 
268
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
118
269
                print >>self.stream, test._get_log()
 
270
                print >>self.stream, \
 
271
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
119
272
            self.stream.writeln(self.separator2)
120
273
            self.stream.writeln("%s" % err)
121
274
 
122
 
 
123
 
class TextTestRunner(unittest.TextTestRunner):
 
275
    def finished(self):
 
276
        pass
 
277
 
 
278
    def report_cleaning_up(self):
 
279
        pass
 
280
 
 
281
    def report_success(self, test):
 
282
        pass
 
283
 
 
284
 
 
285
class TextTestResult(ExtendedTestResult):
 
286
    """Displays progress and results of tests in text form"""
 
287
 
 
288
    def __init__(self, *args, **kw):
 
289
        ExtendedTestResult.__init__(self, *args, **kw)
 
290
        self.pb = self.ui.nested_progress_bar()
 
291
        self.pb.show_pct = False
 
292
        self.pb.show_spinner = False
 
293
        self.pb.show_eta = False, 
 
294
        self.pb.show_count = False
 
295
        self.pb.show_bar = False
 
296
 
 
297
    def report_starting(self):
 
298
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
299
 
 
300
    def _progress_prefix_text(self):
 
301
        a = '[%d' % self.count
 
302
        if self.num_tests is not None:
 
303
            a +='/%d' % self.num_tests
 
304
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
305
        if self.error_count:
 
306
            a += ', %d errors' % self.error_count
 
307
        if self.failure_count:
 
308
            a += ', %d failed' % self.failure_count
 
309
        if self.skip_count:
 
310
            a += ', %d skipped' % self.skip_count
 
311
        a += ']'
 
312
        return a
 
313
 
 
314
    def report_test_start(self, test):
 
315
        self.count += 1
 
316
        self.pb.update(
 
317
                self._progress_prefix_text()
 
318
                + ' ' 
 
319
                + self._shortened_test_description(test))
 
320
 
 
321
    def report_error(self, test, err):
 
322
        self.error_count += 1
 
323
        self.pb.note('ERROR: %s\n    %s\n' % (
 
324
            self._shortened_test_description(test),
 
325
            err[1],
 
326
            ))
 
327
 
 
328
    def report_failure(self, test, err):
 
329
        self.failure_count += 1
 
330
        self.pb.note('FAIL: %s\n    %s\n' % (
 
331
            self._shortened_test_description(test),
 
332
            err[1],
 
333
            ))
 
334
 
 
335
    def report_skip(self, test, skip_excinfo):
 
336
        self.skip_count += 1
 
337
        if False:
 
338
            # at the moment these are mostly not things we can fix
 
339
            # and so they just produce stipple; use the verbose reporter
 
340
            # to see them.
 
341
            if False:
 
342
                # show test and reason for skip
 
343
                self.pb.note('SKIP: %s\n    %s\n' % (
 
344
                    self._shortened_test_description(test),
 
345
                    skip_excinfo[1]))
 
346
            else:
 
347
                # since the class name was left behind in the still-visible
 
348
                # progress bar...
 
349
                self.pb.note('SKIP: %s' % (skip_excinfo[1]))
 
350
 
 
351
    def report_cleaning_up(self):
 
352
        self.pb.update('cleaning up...')
 
353
 
 
354
    def finished(self):
 
355
        self.pb.finished()
 
356
 
 
357
 
 
358
class VerboseTestResult(ExtendedTestResult):
 
359
    """Produce long output, with one line per test run plus times"""
 
360
 
 
361
    def _ellipsize_to_right(self, a_string, final_width):
 
362
        """Truncate and pad a string, keeping the right hand side"""
 
363
        if len(a_string) > final_width:
 
364
            result = '...' + a_string[3-final_width:]
 
365
        else:
 
366
            result = a_string
 
367
        return result.ljust(final_width)
 
368
 
 
369
    def report_starting(self):
 
370
        self.stream.write('running %d tests...\n' % self.num_tests)
 
371
 
 
372
    def report_test_start(self, test):
 
373
        self.count += 1
 
374
        name = self._shortened_test_description(test)
 
375
        self.stream.write(self._ellipsize_to_right(name, 60))
 
376
        self.stream.flush()
 
377
 
 
378
    def report_error(self, test, err):
 
379
        self.error_count += 1
 
380
        self.stream.writeln('ERROR %s\n    %s' 
 
381
                % (self._testTimeString(), err[1]))
 
382
 
 
383
    def report_failure(self, test, err):
 
384
        self.failure_count += 1
 
385
        self.stream.writeln('FAIL %s\n    %s'
 
386
                % (self._testTimeString(), err[1]))
 
387
 
 
388
    def report_success(self, test):
 
389
        self.stream.writeln('   OK %s' % self._testTimeString())
 
390
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
391
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
392
            stats.pprint(file=self.stream)
 
393
        self.stream.flush()
 
394
 
 
395
    def report_skip(self, test, skip_excinfo):
 
396
        print >>self.stream, ' SKIP %s' % self._testTimeString()
 
397
        print >>self.stream, '     %s' % skip_excinfo[1]
 
398
 
 
399
 
 
400
class TextTestRunner(object):
124
401
    stop_on_failure = False
125
402
 
126
 
    def _makeResult(self):
127
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
128
 
        if self.stop_on_failure:
129
 
            result = EarlyStoppingTestResultAdapter(result)
 
403
    def __init__(self,
 
404
                 stream=sys.stderr,
 
405
                 descriptions=0,
 
406
                 verbosity=1,
 
407
                 keep_output=False,
 
408
                 bench_history=None):
 
409
        self.stream = unittest._WritelnDecorator(stream)
 
410
        self.descriptions = descriptions
 
411
        self.verbosity = verbosity
 
412
        self.keep_output = keep_output
 
413
        self._bench_history = bench_history
 
414
 
 
415
    def run(self, test):
 
416
        "Run the given test case or test suite."
 
417
        startTime = time.time()
 
418
        if self.verbosity == 1:
 
419
            result_class = TextTestResult
 
420
        elif self.verbosity >= 2:
 
421
            result_class = VerboseTestResult
 
422
        result = result_class(self.stream,
 
423
                              self.descriptions,
 
424
                              self.verbosity,
 
425
                              bench_history=self._bench_history,
 
426
                              num_tests=test.countTestCases(),
 
427
                              )
 
428
        result.stop_early = self.stop_on_failure
 
429
        result.report_starting()
 
430
        test.run(result)
 
431
        stopTime = time.time()
 
432
        timeTaken = stopTime - startTime
 
433
        result.printErrors()
 
434
        self.stream.writeln(result.separator2)
 
435
        run = result.testsRun
 
436
        self.stream.writeln("Ran %d test%s in %.3fs" %
 
437
                            (run, run != 1 and "s" or "", timeTaken))
 
438
        self.stream.writeln()
 
439
        if not result.wasSuccessful():
 
440
            self.stream.write("FAILED (")
 
441
            failed, errored = map(len, (result.failures, result.errors))
 
442
            if failed:
 
443
                self.stream.write("failures=%d" % failed)
 
444
            if errored:
 
445
                if failed: self.stream.write(", ")
 
446
                self.stream.write("errors=%d" % errored)
 
447
            self.stream.writeln(")")
 
448
        else:
 
449
            self.stream.writeln("OK")
 
450
        result.report_cleaning_up()
 
451
        # This is still a little bogus, 
 
452
        # but only a little. Folk not using our testrunner will
 
453
        # have to delete their temp directories themselves.
 
454
        test_root = TestCaseWithMemoryTransport.TEST_ROOT
 
455
        if result.wasSuccessful() or not self.keep_output:
 
456
            if test_root is not None:
 
457
                # If LANG=C we probably have created some bogus paths
 
458
                # which rmtree(unicode) will fail to delete
 
459
                # so make sure we are using rmtree(str) to delete everything
 
460
                # except on win32, where rmtree(str) will fail
 
461
                # since it doesn't have the property of byte-stream paths
 
462
                # (they are either ascii or mbcs)
 
463
                if sys.platform == 'win32':
 
464
                    # make sure we are using the unicode win32 api
 
465
                    test_root = unicode(test_root)
 
466
                else:
 
467
                    test_root = test_root.encode(
 
468
                        sys.getfilesystemencoding())
 
469
                osutils.rmtree(test_root)
 
470
        else:
 
471
            note("Failed tests working directories are in '%s'\n", test_root)
 
472
        TestCaseWithMemoryTransport.TEST_ROOT = None
 
473
        result.finished()
130
474
        return result
131
475
 
132
476
 
145
489
 
146
490
class TestSkipped(Exception):
147
491
    """Indicates that a test was intentionally skipped, rather than failing."""
148
 
    # XXX: Not used yet
149
492
 
150
493
 
151
494
class CommandFailed(Exception):
152
495
    pass
153
496
 
 
497
 
 
498
class StringIOWrapper(object):
 
499
    """A wrapper around cStringIO which just adds an encoding attribute.
 
500
    
 
501
    Internally we can check sys.stdout to see what the output encoding
 
502
    should be. However, cStringIO has no encoding attribute that we can
 
503
    set. So we wrap it instead.
 
504
    """
 
505
    encoding='ascii'
 
506
    _cstring = None
 
507
 
 
508
    def __init__(self, s=None):
 
509
        if s is not None:
 
510
            self.__dict__['_cstring'] = StringIO(s)
 
511
        else:
 
512
            self.__dict__['_cstring'] = StringIO()
 
513
 
 
514
    def __getattr__(self, name, getattr=getattr):
 
515
        return getattr(self.__dict__['_cstring'], name)
 
516
 
 
517
    def __setattr__(self, name, val):
 
518
        if name == 'encoding':
 
519
            self.__dict__['encoding'] = val
 
520
        else:
 
521
            return setattr(self._cstring, name, val)
 
522
 
 
523
 
154
524
class TestCase(unittest.TestCase):
155
525
    """Base class for bzr unit tests.
156
526
    
159
529
 
160
530
    Error and debug log messages are redirected from their usual
161
531
    location into a temporary file, the contents of which can be
162
 
    retrieved by _get_log().
 
532
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
533
    so that it can also capture file IO.  When the test completes this file
 
534
    is read into memory and removed from disk.
163
535
       
164
536
    There are also convenience functions to invoke bzr's command-line
165
 
    routine, and to build and check bzr trees."""
 
537
    routine, and to build and check bzr trees.
 
538
   
 
539
    In addition to the usual method of overriding tearDown(), this class also
 
540
    allows subclasses to register functions into the _cleanups list, which is
 
541
    run in order as the object is torn down.  It's less likely this will be
 
542
    accidentally overlooked.
 
543
    """
166
544
 
167
 
    BZRPATH = 'bzr'
168
545
    _log_file_name = None
 
546
    _log_contents = ''
 
547
    _keep_log_file = False
 
548
    # record lsprof data when performing benchmark calls.
 
549
    _gather_lsprof_in_benchmarks = False
 
550
 
 
551
    def __init__(self, methodName='testMethod'):
 
552
        super(TestCase, self).__init__(methodName)
 
553
        self._cleanups = []
169
554
 
170
555
    def setUp(self):
171
556
        unittest.TestCase.setUp(self)
172
 
        self.oldenv = os.environ.get('HOME', None)
173
 
        os.environ['HOME'] = os.getcwd()
174
 
        self.bzr_email = os.environ.get('BZREMAIL')
175
 
        if self.bzr_email is not None:
176
 
            del os.environ['BZREMAIL']
177
 
        self.email = os.environ.get('EMAIL')
178
 
        if self.email is not None:
179
 
            del os.environ['EMAIL']
 
557
        self._cleanEnvironment()
180
558
        bzrlib.trace.disable_default_logging()
181
 
        self._enable_file_logging()
 
559
        self._silenceUI()
 
560
        self._startLogFile()
 
561
        self._benchcalls = []
 
562
        self._benchtime = None
 
563
 
 
564
    def _silenceUI(self):
 
565
        """Turn off UI for duration of test"""
 
566
        # by default the UI is off; tests can turn it on if they want it.
 
567
        saved = bzrlib.ui.ui_factory
 
568
        def _restore():
 
569
            bzrlib.ui.ui_factory = saved
 
570
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
571
        self.addCleanup(_restore)
182
572
 
183
573
    def _ndiff_strings(self, a, b):
184
574
        """Return ndiff between two strings containing lines.
195
585
                                  charjunk=lambda x: False)
196
586
        return ''.join(difflines)
197
587
 
198
 
    def assertEqualDiff(self, a, b):
 
588
    def assertEqualDiff(self, a, b, message=None):
199
589
        """Assert two texts are equal, if not raise an exception.
200
590
        
201
591
        This is intended for use with multi-line strings where it can 
204
594
        # TODO: perhaps override assertEquals to call this for strings?
205
595
        if a == b:
206
596
            return
207
 
        raise AssertionError("texts not equal:\n" + 
 
597
        if message is None:
 
598
            message = "texts not equal:\n"
 
599
        raise AssertionError(message + 
208
600
                             self._ndiff_strings(a, b))      
 
601
        
 
602
    def assertEqualMode(self, mode, mode_test):
 
603
        self.assertEqual(mode, mode_test,
 
604
                         'mode mismatch %o != %o' % (mode, mode_test))
 
605
 
 
606
    def assertStartsWith(self, s, prefix):
 
607
        if not s.startswith(prefix):
 
608
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
609
 
 
610
    def assertEndsWith(self, s, suffix):
 
611
        """Asserts that s ends with suffix."""
 
612
        if not s.endswith(suffix):
 
613
            raise AssertionError('string %r does not end with %r' % (s, suffix))
209
614
 
210
615
    def assertContainsRe(self, haystack, needle_re):
211
616
        """Assert that a contains something matching a regular expression."""
213
618
            raise AssertionError('pattern "%s" not found in "%s"'
214
619
                    % (needle_re, haystack))
215
620
 
216
 
    def _enable_file_logging(self):
 
621
    def assertNotContainsRe(self, haystack, needle_re):
 
622
        """Assert that a does not match a regular expression"""
 
623
        if re.search(needle_re, haystack):
 
624
            raise AssertionError('pattern "%s" found in "%s"'
 
625
                    % (needle_re, haystack))
 
626
 
 
627
    def assertSubset(self, sublist, superlist):
 
628
        """Assert that every entry in sublist is present in superlist."""
 
629
        missing = []
 
630
        for entry in sublist:
 
631
            if entry not in superlist:
 
632
                missing.append(entry)
 
633
        if len(missing) > 0:
 
634
            raise AssertionError("value(s) %r not present in container %r" % 
 
635
                                 (missing, superlist))
 
636
 
 
637
    def assertIs(self, left, right):
 
638
        if not (left is right):
 
639
            raise AssertionError("%r is not %r." % (left, right))
 
640
 
 
641
    def assertTransportMode(self, transport, path, mode):
 
642
        """Fail if a path does not have mode mode.
 
643
        
 
644
        If modes are not supported on this transport, the assertion is ignored.
 
645
        """
 
646
        if not transport._can_roundtrip_unix_modebits():
 
647
            return
 
648
        path_stat = transport.stat(path)
 
649
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
650
        self.assertEqual(mode, actual_mode,
 
651
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
652
 
 
653
    def assertIsInstance(self, obj, kls):
 
654
        """Fail if obj is not an instance of kls"""
 
655
        if not isinstance(obj, kls):
 
656
            self.fail("%r is an instance of %s rather than %s" % (
 
657
                obj, obj.__class__, kls))
 
658
 
 
659
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
660
        """A helper for callDeprecated and applyDeprecated.
 
661
 
 
662
        :param a_callable: A callable to call.
 
663
        :param args: The positional arguments for the callable
 
664
        :param kwargs: The keyword arguments for the callable
 
665
        :return: A tuple (warnings, result). result is the result of calling
 
666
            a_callable(*args, **kwargs).
 
667
        """
 
668
        local_warnings = []
 
669
        def capture_warnings(msg, cls=None, stacklevel=None):
 
670
            # we've hooked into a deprecation specific callpath,
 
671
            # only deprecations should getting sent via it.
 
672
            self.assertEqual(cls, DeprecationWarning)
 
673
            local_warnings.append(msg)
 
674
        original_warning_method = symbol_versioning.warn
 
675
        symbol_versioning.set_warning_method(capture_warnings)
 
676
        try:
 
677
            result = a_callable(*args, **kwargs)
 
678
        finally:
 
679
            symbol_versioning.set_warning_method(original_warning_method)
 
680
        return (local_warnings, result)
 
681
 
 
682
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
683
        """Call a deprecated callable without warning the user.
 
684
 
 
685
        :param deprecation_format: The deprecation format that the callable
 
686
            should have been deprecated with. This is the same type as the 
 
687
            parameter to deprecated_method/deprecated_function. If the 
 
688
            callable is not deprecated with this format, an assertion error
 
689
            will be raised.
 
690
        :param a_callable: A callable to call. This may be a bound method or
 
691
            a regular function. It will be called with *args and **kwargs.
 
692
        :param args: The positional arguments for the callable
 
693
        :param kwargs: The keyword arguments for the callable
 
694
        :return: The result of a_callable(*args, **kwargs)
 
695
        """
 
696
        call_warnings, result = self._capture_warnings(a_callable,
 
697
            *args, **kwargs)
 
698
        expected_first_warning = symbol_versioning.deprecation_string(
 
699
            a_callable, deprecation_format)
 
700
        if len(call_warnings) == 0:
 
701
            self.fail("No assertion generated by call to %s" %
 
702
                a_callable)
 
703
        self.assertEqual(expected_first_warning, call_warnings[0])
 
704
        return result
 
705
 
 
706
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
707
        """Assert that a callable is deprecated in a particular way.
 
708
 
 
709
        This is a very precise test for unusual requirements. The 
 
710
        applyDeprecated helper function is probably more suited for most tests
 
711
        as it allows you to simply specify the deprecation format being used
 
712
        and will ensure that that is issued for the function being called.
 
713
 
 
714
        :param expected: a list of the deprecation warnings expected, in order
 
715
        :param callable: The callable to call
 
716
        :param args: The positional arguments for the callable
 
717
        :param kwargs: The keyword arguments for the callable
 
718
        """
 
719
        call_warnings, result = self._capture_warnings(callable,
 
720
            *args, **kwargs)
 
721
        self.assertEqual(expected, call_warnings)
 
722
        return result
 
723
 
 
724
    def _startLogFile(self):
 
725
        """Send bzr and test log messages to a temporary file.
 
726
 
 
727
        The file is removed as the test is torn down.
 
728
        """
217
729
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
218
 
 
219
730
        self._log_file = os.fdopen(fileno, 'w+')
220
 
 
221
 
        hdlr = logging.StreamHandler(self._log_file)
222
 
        hdlr.setLevel(logging.DEBUG)
223
 
        hdlr.setFormatter(logging.Formatter('%(levelname)8s  %(message)s'))
224
 
        logging.getLogger('').addHandler(hdlr)
225
 
        logging.getLogger('').setLevel(logging.DEBUG)
226
 
        self._log_hdlr = hdlr
227
 
        debug('opened log file %s', name)
228
 
        
 
731
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
229
732
        self._log_file_name = name
230
 
 
231
 
    def tearDown(self):
232
 
        os.environ['HOME'] = self.oldenv
233
 
        if os.environ.get('BZREMAIL') is not None:
234
 
            del os.environ['BZREMAIL']
235
 
        if self.bzr_email is not None:
236
 
            os.environ['BZREMAIL'] = self.bzr_email
237
 
        if os.environ.get('EMAIL') is not None:
238
 
            del os.environ['EMAIL']
239
 
        if self.email is not None:
240
 
            os.environ['EMAIL'] = self.email
241
 
        logging.getLogger('').removeHandler(self._log_hdlr)
242
 
        bzrlib.trace.enable_default_logging()
243
 
        logging.debug('%s teardown', self.id())
 
733
        self.addCleanup(self._finishLogFile)
 
734
 
 
735
    def _finishLogFile(self):
 
736
        """Finished with the log file.
 
737
 
 
738
        Close the file and delete it, unless setKeepLogfile was called.
 
739
        """
 
740
        if self._log_file is None:
 
741
            return
 
742
        bzrlib.trace.disable_test_log(self._log_nonce)
244
743
        self._log_file.close()
 
744
        self._log_file = None
 
745
        if not self._keep_log_file:
 
746
            os.remove(self._log_file_name)
 
747
            self._log_file_name = None
 
748
 
 
749
    def setKeepLogfile(self):
 
750
        """Make the logfile not be deleted when _finishLogFile is called."""
 
751
        self._keep_log_file = True
 
752
 
 
753
    def addCleanup(self, callable):
 
754
        """Arrange to run a callable when this case is torn down.
 
755
 
 
756
        Callables are run in the reverse of the order they are registered, 
 
757
        ie last-in first-out.
 
758
        """
 
759
        if callable in self._cleanups:
 
760
            raise ValueError("cleanup function %r already registered on %s" 
 
761
                    % (callable, self))
 
762
        self._cleanups.append(callable)
 
763
 
 
764
    def _cleanEnvironment(self):
 
765
        new_env = {
 
766
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
767
            'HOME': os.getcwd(),
 
768
            'APPDATA': os.getcwd(),
 
769
            'BZR_EMAIL': None,
 
770
            'BZREMAIL': None, # may still be present in the environment
 
771
            'EMAIL': None,
 
772
            'BZR_PROGRESS_BAR': None,
 
773
        }
 
774
        self.__old_env = {}
 
775
        self.addCleanup(self._restoreEnvironment)
 
776
        for name, value in new_env.iteritems():
 
777
            self._captureVar(name, value)
 
778
 
 
779
    def _captureVar(self, name, newvalue):
 
780
        """Set an environment variable, and reset it when finished."""
 
781
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
782
 
 
783
    def _restoreEnvironment(self):
 
784
        for name, value in self.__old_env.iteritems():
 
785
            osutils.set_or_unset_env(name, value)
 
786
 
 
787
    def tearDown(self):
 
788
        self._runCleanups()
245
789
        unittest.TestCase.tearDown(self)
246
790
 
 
791
    def time(self, callable, *args, **kwargs):
 
792
        """Run callable and accrue the time it takes to the benchmark time.
 
793
        
 
794
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
795
        this will cause lsprofile statistics to be gathered and stored in
 
796
        self._benchcalls.
 
797
        """
 
798
        if self._benchtime is None:
 
799
            self._benchtime = 0
 
800
        start = time.time()
 
801
        try:
 
802
            if not self._gather_lsprof_in_benchmarks:
 
803
                return callable(*args, **kwargs)
 
804
            else:
 
805
                # record this benchmark
 
806
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
807
                stats.sort()
 
808
                self._benchcalls.append(((callable, args, kwargs), stats))
 
809
                return ret
 
810
        finally:
 
811
            self._benchtime += time.time() - start
 
812
 
 
813
    def _runCleanups(self):
 
814
        """Run registered cleanup functions. 
 
815
 
 
816
        This should only be called from TestCase.tearDown.
 
817
        """
 
818
        # TODO: Perhaps this should keep running cleanups even if 
 
819
        # one of them fails?
 
820
        for cleanup_fn in reversed(self._cleanups):
 
821
            cleanup_fn()
 
822
 
247
823
    def log(self, *args):
248
 
        logging.debug(*args)
 
824
        mutter(*args)
249
825
 
250
 
    def _get_log(self):
251
 
        """Return as a string the log for this test"""
252
 
        if self._log_file_name:
253
 
            return open(self._log_file_name).read()
 
826
    def _get_log(self, keep_log_file=False):
 
827
        """Return as a string the log for this test. If the file is still
 
828
        on disk and keep_log_file=False, delete the log file and store the
 
829
        content in self._log_contents."""
 
830
        # flush the log file, to get all content
 
831
        import bzrlib.trace
 
832
        bzrlib.trace._trace_file.flush()
 
833
        if self._log_contents:
 
834
            return self._log_contents
 
835
        if self._log_file_name is not None:
 
836
            logfile = open(self._log_file_name)
 
837
            try:
 
838
                log_contents = logfile.read()
 
839
            finally:
 
840
                logfile.close()
 
841
            if not keep_log_file:
 
842
                self._log_contents = log_contents
 
843
                os.remove(self._log_file_name)
 
844
            return log_contents
254
845
        else:
255
 
            return ''
 
846
            return "DELETED log file to reduce memory footprint"
256
847
 
257
 
    def capture(self, cmd):
 
848
    def capture(self, cmd, retcode=0):
258
849
        """Shortcut that splits cmd into words, runs, and returns stdout"""
259
 
        return self.run_bzr_captured(cmd.split())[0]
 
850
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
260
851
 
261
 
    def run_bzr_captured(self, argv, retcode=0):
262
 
        """Invoke bzr and return (result, stdout, stderr).
 
852
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
853
                         working_dir=None):
 
854
        """Invoke bzr and return (stdout, stderr).
263
855
 
264
856
        Useful for code that wants to check the contents of the
265
857
        output, the way error messages are presented, etc.
275
867
        errors, and with logging set to something approximating the
276
868
        default, so that error reporting can be checked.
277
869
 
278
 
        argv -- arguments to invoke bzr
279
 
        retcode -- expected return code, or None for don't-care.
 
870
        :param argv: arguments to invoke bzr
 
871
        :param retcode: expected return code, or None for don't-care.
 
872
        :param encoding: encoding for sys.stdout and sys.stderr
 
873
        :param stdin: A string to be used as stdin for the command.
 
874
        :param working_dir: Change to this directory before running
280
875
        """
281
 
        stdout = StringIO()
282
 
        stderr = StringIO()
283
 
        self.log('run bzr: %s', ' '.join(argv))
 
876
        if encoding is None:
 
877
            encoding = bzrlib.user_encoding
 
878
        if stdin is not None:
 
879
            stdin = StringIO(stdin)
 
880
        stdout = StringIOWrapper()
 
881
        stderr = StringIOWrapper()
 
882
        stdout.encoding = encoding
 
883
        stderr.encoding = encoding
 
884
 
 
885
        self.log('run bzr: %r', argv)
 
886
        # FIXME: don't call into logging here
284
887
        handler = logging.StreamHandler(stderr)
285
 
        handler.setFormatter(bzrlib.trace.QuietFormatter())
286
888
        handler.setLevel(logging.INFO)
287
889
        logger = logging.getLogger('')
288
890
        logger.addHandler(handler)
 
891
        old_ui_factory = bzrlib.ui.ui_factory
 
892
        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
 
893
            stdout=stdout,
 
894
            stderr=stderr)
 
895
        bzrlib.ui.ui_factory.stdin = stdin
 
896
 
 
897
        cwd = None
 
898
        if working_dir is not None:
 
899
            cwd = osutils.getcwd()
 
900
            os.chdir(working_dir)
 
901
 
289
902
        try:
290
 
            result = self.apply_redirected(None, stdout, stderr,
 
903
            result = self.apply_redirected(stdin, stdout, stderr,
291
904
                                           bzrlib.commands.run_bzr_catch_errors,
292
905
                                           argv)
293
906
        finally:
294
907
            logger.removeHandler(handler)
 
908
            bzrlib.ui.ui_factory = old_ui_factory
 
909
            if cwd is not None:
 
910
                os.chdir(cwd)
 
911
 
295
912
        out = stdout.getvalue()
296
913
        err = stderr.getvalue()
297
914
        if out:
298
 
            self.log('output:\n%s', out)
 
915
            self.log('output:\n%r', out)
299
916
        if err:
300
 
            self.log('errors:\n%s', err)
 
917
            self.log('errors:\n%r', err)
301
918
        if retcode is not None:
302
 
            self.assertEquals(result, retcode)
 
919
            self.assertEquals(retcode, result)
303
920
        return out, err
304
921
 
305
922
    def run_bzr(self, *args, **kwargs):
311
928
 
312
929
        This sends the stdout/stderr results into the test's log,
313
930
        where it may be useful for debugging.  See also run_captured.
 
931
 
 
932
        :param stdin: A string to be used as stdin for the command.
314
933
        """
315
934
        retcode = kwargs.pop('retcode', 0)
316
 
        return self.run_bzr_captured(args, retcode)
 
935
        encoding = kwargs.pop('encoding', None)
 
936
        stdin = kwargs.pop('stdin', None)
 
937
        working_dir = kwargs.pop('working_dir', None)
 
938
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
 
939
                                     stdin=stdin, working_dir=working_dir)
 
940
 
 
941
    def run_bzr_decode(self, *args, **kwargs):
 
942
        if 'encoding' in kwargs:
 
943
            encoding = kwargs['encoding']
 
944
        else:
 
945
            encoding = bzrlib.user_encoding
 
946
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
947
 
 
948
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
949
        """Run bzr, and check that stderr contains the supplied regexes
 
950
        
 
951
        :param error_regexes: Sequence of regular expressions which 
 
952
            must each be found in the error output. The relative ordering
 
953
            is not enforced.
 
954
        :param args: command-line arguments for bzr
 
955
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
956
            This function changes the default value of retcode to be 3,
 
957
            since in most cases this is run when you expect bzr to fail.
 
958
        :return: (out, err) The actual output of running the command (in case you
 
959
                 want to do more inspection)
 
960
 
 
961
        Examples of use:
 
962
            # Make sure that commit is failing because there is nothing to do
 
963
            self.run_bzr_error(['no changes to commit'],
 
964
                               'commit', '-m', 'my commit comment')
 
965
            # Make sure --strict is handling an unknown file, rather than
 
966
            # giving us the 'nothing to do' error
 
967
            self.build_tree(['unknown'])
 
968
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
969
                               'commit', '--strict', '-m', 'my commit comment')
 
970
        """
 
971
        kwargs.setdefault('retcode', 3)
 
972
        out, err = self.run_bzr(*args, **kwargs)
 
973
        for regex in error_regexes:
 
974
            self.assertContainsRe(err, regex)
 
975
        return out, err
 
976
 
 
977
    def run_bzr_subprocess(self, *args, **kwargs):
 
978
        """Run bzr in a subprocess for testing.
 
979
 
 
980
        This starts a new Python interpreter and runs bzr in there. 
 
981
        This should only be used for tests that have a justifiable need for
 
982
        this isolation: e.g. they are testing startup time, or signal
 
983
        handling, or early startup code, etc.  Subprocess code can't be 
 
984
        profiled or debugged so easily.
 
985
 
 
986
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
987
            None is supplied, the status code is not checked.
 
988
        :param env_changes: A dictionary which lists changes to environment
 
989
            variables. A value of None will unset the env variable.
 
990
            The values must be strings. The change will only occur in the
 
991
            child, so you don't need to fix the environment after running.
 
992
        :param universal_newlines: Convert CRLF => LF
 
993
        :param allow_plugins: By default the subprocess is run with
 
994
            --no-plugins to ensure test reproducibility. Also, it is possible
 
995
            for system-wide plugins to create unexpected output on stderr,
 
996
            which can cause unnecessary test failures.
 
997
        """
 
998
        env_changes = kwargs.get('env_changes', {})
 
999
        working_dir = kwargs.get('working_dir', None)
 
1000
        allow_plugins = kwargs.get('allow_plugins', False)
 
1001
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1002
                                            working_dir=working_dir,
 
1003
                                            allow_plugins=allow_plugins)
 
1004
        # We distinguish between retcode=None and retcode not passed.
 
1005
        supplied_retcode = kwargs.get('retcode', 0)
 
1006
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1007
            universal_newlines=kwargs.get('universal_newlines', False),
 
1008
            process_args=args)
 
1009
 
 
1010
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1011
                             skip_if_plan_to_signal=False,
 
1012
                             working_dir=None,
 
1013
                             allow_plugins=False):
 
1014
        """Start bzr in a subprocess for testing.
 
1015
 
 
1016
        This starts a new Python interpreter and runs bzr in there.
 
1017
        This should only be used for tests that have a justifiable need for
 
1018
        this isolation: e.g. they are testing startup time, or signal
 
1019
        handling, or early startup code, etc.  Subprocess code can't be
 
1020
        profiled or debugged so easily.
 
1021
 
 
1022
        :param process_args: a list of arguments to pass to the bzr executable,
 
1023
            for example `['--version']`.
 
1024
        :param env_changes: A dictionary which lists changes to environment
 
1025
            variables. A value of None will unset the env variable.
 
1026
            The values must be strings. The change will only occur in the
 
1027
            child, so you don't need to fix the environment after running.
 
1028
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1029
            is not available.
 
1030
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1031
 
 
1032
        :returns: Popen object for the started process.
 
1033
        """
 
1034
        if skip_if_plan_to_signal:
 
1035
            if not getattr(os, 'kill', None):
 
1036
                raise TestSkipped("os.kill not available.")
 
1037
 
 
1038
        if env_changes is None:
 
1039
            env_changes = {}
 
1040
        old_env = {}
 
1041
 
 
1042
        def cleanup_environment():
 
1043
            for env_var, value in env_changes.iteritems():
 
1044
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1045
 
 
1046
        def restore_environment():
 
1047
            for env_var, value in old_env.iteritems():
 
1048
                osutils.set_or_unset_env(env_var, value)
 
1049
 
 
1050
        bzr_path = self.get_bzr_path()
 
1051
 
 
1052
        cwd = None
 
1053
        if working_dir is not None:
 
1054
            cwd = osutils.getcwd()
 
1055
            os.chdir(working_dir)
 
1056
 
 
1057
        try:
 
1058
            # win32 subprocess doesn't support preexec_fn
 
1059
            # so we will avoid using it on all platforms, just to
 
1060
            # make sure the code path is used, and we don't break on win32
 
1061
            cleanup_environment()
 
1062
            command = [sys.executable, bzr_path]
 
1063
            if not allow_plugins:
 
1064
                command.append('--no-plugins')
 
1065
            command.extend(process_args)
 
1066
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1067
        finally:
 
1068
            restore_environment()
 
1069
            if cwd is not None:
 
1070
                os.chdir(cwd)
 
1071
 
 
1072
        return process
 
1073
 
 
1074
    def _popen(self, *args, **kwargs):
 
1075
        """Place a call to Popen.
 
1076
 
 
1077
        Allows tests to override this method to intercept the calls made to
 
1078
        Popen for introspection.
 
1079
        """
 
1080
        return Popen(*args, **kwargs)
 
1081
 
 
1082
    def get_bzr_path(self):
 
1083
        """Return the path of the 'bzr' executable for this test suite."""
 
1084
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1085
        if not os.path.isfile(bzr_path):
 
1086
            # We are probably installed. Assume sys.argv is the right file
 
1087
            bzr_path = sys.argv[0]
 
1088
        return bzr_path
 
1089
 
 
1090
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1091
                              universal_newlines=False, process_args=None):
 
1092
        """Finish the execution of process.
 
1093
 
 
1094
        :param process: the Popen object returned from start_bzr_subprocess.
 
1095
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1096
            None is supplied, the status code is not checked.
 
1097
        :param send_signal: an optional signal to send to the process.
 
1098
        :param universal_newlines: Convert CRLF => LF
 
1099
        :returns: (stdout, stderr)
 
1100
        """
 
1101
        if send_signal is not None:
 
1102
            os.kill(process.pid, send_signal)
 
1103
        out, err = process.communicate()
 
1104
 
 
1105
        if universal_newlines:
 
1106
            out = out.replace('\r\n', '\n')
 
1107
            err = err.replace('\r\n', '\n')
 
1108
 
 
1109
        if retcode is not None and retcode != process.returncode:
 
1110
            if process_args is None:
 
1111
                process_args = "(unknown args)"
 
1112
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1113
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1114
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1115
                      % (process_args, retcode, process.returncode))
 
1116
        return [out, err]
317
1117
 
318
1118
    def check_inventory_shape(self, inv, shape):
319
1119
        """Compare an inventory to a list of expected names.
345
1145
        if stdin is None:
346
1146
            stdin = StringIO("")
347
1147
        if stdout is None:
348
 
            if hasattr(self, "_log_file"):
 
1148
            if getattr(self, "_log_file", None) is not None:
349
1149
                stdout = self._log_file
350
1150
            else:
351
1151
                stdout = StringIO()
352
1152
        if stderr is None:
353
 
            if hasattr(self, "_log_file"):
 
1153
            if getattr(self, "_log_file", None is not None):
354
1154
                stderr = self._log_file
355
1155
            else:
356
1156
                stderr = StringIO()
367
1167
            sys.stderr = real_stderr
368
1168
            sys.stdin = real_stdin
369
1169
 
 
1170
    @symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
 
1171
    def merge(self, branch_from, wt_to):
 
1172
        """A helper for tests to do a ui-less merge.
 
1173
 
 
1174
        This should move to the main library when someone has time to integrate
 
1175
        it in.
 
1176
        """
 
1177
        # minimal ui-less merge.
 
1178
        wt_to.branch.fetch(branch_from)
 
1179
        base_rev = common_ancestor(branch_from.last_revision(),
 
1180
                                   wt_to.branch.last_revision(),
 
1181
                                   wt_to.branch.repository)
 
1182
        merge_inner(wt_to.branch, branch_from.basis_tree(),
 
1183
                    wt_to.branch.repository.revision_tree(base_rev),
 
1184
                    this_tree=wt_to)
 
1185
        wt_to.add_parent_tree_id(branch_from.last_revision())
 
1186
 
370
1187
 
371
1188
BzrTestBase = TestCase
372
1189
 
 
1190
 
 
1191
class TestCaseWithMemoryTransport(TestCase):
 
1192
    """Common test class for tests that do not need disk resources.
 
1193
 
 
1194
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1195
 
 
1196
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1197
 
 
1198
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1199
    a directory which does not exist. This serves to help ensure test isolation
 
1200
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1201
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1202
    file defaults for the transport in tests, nor does it obey the command line
 
1203
    override, so tests that accidentally write to the common directory should
 
1204
    be rare.
 
1205
    """
 
1206
 
 
1207
    TEST_ROOT = None
 
1208
    _TEST_NAME = 'test'
 
1209
 
 
1210
 
 
1211
    def __init__(self, methodName='runTest'):
 
1212
        # allow test parameterisation after test construction and before test
 
1213
        # execution. Variables that the parameteriser sets need to be 
 
1214
        # ones that are not set by setUp, or setUp will trash them.
 
1215
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1216
        self.transport_server = default_transport
 
1217
        self.transport_readonly_server = None
 
1218
 
 
1219
    def failUnlessExists(self, path):
 
1220
        """Fail unless path, which may be abs or relative, exists."""
 
1221
        self.failUnless(osutils.lexists(path))
 
1222
 
 
1223
    def failIfExists(self, path):
 
1224
        """Fail if path, which may be abs or relative, exists."""
 
1225
        self.failIf(osutils.lexists(path))
 
1226
        
 
1227
    def get_transport(self):
 
1228
        """Return a writeable transport for the test scratch space"""
 
1229
        t = get_transport(self.get_url())
 
1230
        self.assertFalse(t.is_readonly())
 
1231
        return t
 
1232
 
 
1233
    def get_readonly_transport(self):
 
1234
        """Return a readonly transport for the test scratch space
 
1235
        
 
1236
        This can be used to test that operations which should only need
 
1237
        readonly access in fact do not try to write.
 
1238
        """
 
1239
        t = get_transport(self.get_readonly_url())
 
1240
        self.assertTrue(t.is_readonly())
 
1241
        return t
 
1242
 
 
1243
    def get_readonly_server(self):
 
1244
        """Get the server instance for the readonly transport
 
1245
 
 
1246
        This is useful for some tests with specific servers to do diagnostics.
 
1247
        """
 
1248
        if self.__readonly_server is None:
 
1249
            if self.transport_readonly_server is None:
 
1250
                # readonly decorator requested
 
1251
                # bring up the server
 
1252
                self.get_url()
 
1253
                self.__readonly_server = ReadonlyServer()
 
1254
                self.__readonly_server.setUp(self.__server)
 
1255
            else:
 
1256
                self.__readonly_server = self.transport_readonly_server()
 
1257
                self.__readonly_server.setUp()
 
1258
            self.addCleanup(self.__readonly_server.tearDown)
 
1259
        return self.__readonly_server
 
1260
 
 
1261
    def get_readonly_url(self, relpath=None):
 
1262
        """Get a URL for the readonly transport.
 
1263
 
 
1264
        This will either be backed by '.' or a decorator to the transport 
 
1265
        used by self.get_url()
 
1266
        relpath provides for clients to get a path relative to the base url.
 
1267
        These should only be downwards relative, not upwards.
 
1268
        """
 
1269
        base = self.get_readonly_server().get_url()
 
1270
        if relpath is not None:
 
1271
            if not base.endswith('/'):
 
1272
                base = base + '/'
 
1273
            base = base + relpath
 
1274
        return base
 
1275
 
 
1276
    def get_server(self):
 
1277
        """Get the read/write server instance.
 
1278
 
 
1279
        This is useful for some tests with specific servers that need
 
1280
        diagnostics.
 
1281
 
 
1282
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1283
        is no means to override it.
 
1284
        """
 
1285
        if self.__server is None:
 
1286
            self.__server = MemoryServer()
 
1287
            self.__server.setUp()
 
1288
            self.addCleanup(self.__server.tearDown)
 
1289
        return self.__server
 
1290
 
 
1291
    def get_url(self, relpath=None):
 
1292
        """Get a URL (or maybe a path) for the readwrite transport.
 
1293
 
 
1294
        This will either be backed by '.' or to an equivalent non-file based
 
1295
        facility.
 
1296
        relpath provides for clients to get a path relative to the base url.
 
1297
        These should only be downwards relative, not upwards.
 
1298
        """
 
1299
        base = self.get_server().get_url()
 
1300
        if relpath is not None and relpath != '.':
 
1301
            if not base.endswith('/'):
 
1302
                base = base + '/'
 
1303
            # XXX: Really base should be a url; we did after all call
 
1304
            # get_url()!  But sometimes it's just a path (from
 
1305
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1306
            # to a non-escaped local path.
 
1307
            if base.startswith('./') or base.startswith('/'):
 
1308
                base += relpath
 
1309
            else:
 
1310
                base += urlutils.escape(relpath)
 
1311
        return base
 
1312
 
 
1313
    def _make_test_root(self):
 
1314
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1315
            return
 
1316
        i = 0
 
1317
        while True:
 
1318
            root = u'test%04d.tmp' % i
 
1319
            try:
 
1320
                os.mkdir(root)
 
1321
            except OSError, e:
 
1322
                if e.errno == errno.EEXIST:
 
1323
                    i += 1
 
1324
                    continue
 
1325
                else:
 
1326
                    raise
 
1327
            # successfully created
 
1328
            TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
 
1329
            break
 
1330
        # make a fake bzr directory there to prevent any tests propagating
 
1331
        # up onto the source directory's real branch
 
1332
        bzrdir.BzrDir.create_standalone_workingtree(
 
1333
            TestCaseWithMemoryTransport.TEST_ROOT)
 
1334
 
 
1335
    def makeAndChdirToTestDir(self):
 
1336
        """Create a temporary directories for this one test.
 
1337
        
 
1338
        This must set self.test_home_dir and self.test_dir and chdir to
 
1339
        self.test_dir.
 
1340
        
 
1341
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1342
        """
 
1343
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1344
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1345
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1346
        
 
1347
    def make_branch(self, relpath, format=None):
 
1348
        """Create a branch on the transport at relpath."""
 
1349
        repo = self.make_repository(relpath, format=format)
 
1350
        return repo.bzrdir.create_branch()
 
1351
 
 
1352
    def make_bzrdir(self, relpath, format=None):
 
1353
        try:
 
1354
            # might be a relative or absolute path
 
1355
            maybe_a_url = self.get_url(relpath)
 
1356
            segments = maybe_a_url.rsplit('/', 1)
 
1357
            t = get_transport(maybe_a_url)
 
1358
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1359
                try:
 
1360
                    t.mkdir('.')
 
1361
                except errors.FileExists:
 
1362
                    pass
 
1363
            if format is None:
 
1364
                format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
 
1365
            return format.initialize_on_transport(t)
 
1366
        except errors.UninitializableFormat:
 
1367
            raise TestSkipped("Format %s is not initializable." % format)
 
1368
 
 
1369
    def make_repository(self, relpath, shared=False, format=None):
 
1370
        """Create a repository on our default transport at relpath."""
 
1371
        made_control = self.make_bzrdir(relpath, format=format)
 
1372
        return made_control.create_repository(shared=shared)
 
1373
 
 
1374
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1375
        """Create a branch on the default transport and a MemoryTree for it."""
 
1376
        b = self.make_branch(relpath, format=format)
 
1377
        return memorytree.MemoryTree.create_on_branch(b)
 
1378
 
 
1379
    def overrideEnvironmentForTesting(self):
 
1380
        os.environ['HOME'] = self.test_home_dir
 
1381
        os.environ['APPDATA'] = self.test_home_dir
 
1382
        
 
1383
    def setUp(self):
 
1384
        super(TestCaseWithMemoryTransport, self).setUp()
 
1385
        self._make_test_root()
 
1386
        _currentdir = os.getcwdu()
 
1387
        def _leaveDirectory():
 
1388
            os.chdir(_currentdir)
 
1389
        self.addCleanup(_leaveDirectory)
 
1390
        self.makeAndChdirToTestDir()
 
1391
        self.overrideEnvironmentForTesting()
 
1392
        self.__readonly_server = None
 
1393
        self.__server = None
 
1394
 
373
1395
     
374
 
class TestCaseInTempDir(TestCase):
 
1396
class TestCaseInTempDir(TestCaseWithMemoryTransport):
375
1397
    """Derived class that runs a test within a temporary directory.
376
1398
 
377
1399
    This is useful for tests that need to create a branch, etc.
384
1406
    InTempDir is an old alias for FunctionalTestCase.
385
1407
    """
386
1408
 
387
 
    TEST_ROOT = None
388
 
    _TEST_NAME = 'test'
389
1409
    OVERRIDE_PYTHON = 'python'
390
1410
 
391
1411
    def check_file_contents(self, filename, expect):
396
1416
            self.log("actually: %r" % contents)
397
1417
            self.fail("contents of %s not as expected" % filename)
398
1418
 
399
 
    def _make_test_root(self):
400
 
        if TestCaseInTempDir.TEST_ROOT is not None:
401
 
            return
 
1419
    def makeAndChdirToTestDir(self):
 
1420
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1421
        
 
1422
        For TestCaseInTempDir we create a temporary directory based on the test
 
1423
        name and then create two subdirs - test and home under it.
 
1424
        """
 
1425
        # shorten the name, to avoid test failures due to path length
 
1426
        short_id = self.id().replace('bzrlib.tests.', '') \
 
1427
                   .replace('__main__.', '')[-100:]
 
1428
        # it's possible the same test class is run several times for
 
1429
        # parameterized tests, so make sure the names don't collide.  
402
1430
        i = 0
403
1431
        while True:
404
 
            root = 'test%04d.tmp' % i
405
 
            try:
406
 
                os.mkdir(root)
407
 
            except OSError, e:
408
 
                if e.errno == errno.EEXIST:
409
 
                    i += 1
410
 
                    continue
411
 
                else:
412
 
                    raise
413
 
            # successfully created
414
 
            TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
415
 
            break
416
 
        # make a fake bzr directory there to prevent any tests propagating
417
 
        # up onto the source directory's real branch
418
 
        os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
419
 
 
420
 
    def setUp(self):
421
 
        self._make_test_root()
422
 
        self._currentdir = os.getcwdu()
423
 
        short_id = self.id().replace('bzrlib.selftest.', '') \
424
 
                   .replace('__main__.', '')
425
 
        self.test_dir = os.path.join(self.TEST_ROOT, short_id)
426
 
        os.mkdir(self.test_dir)
427
 
        os.chdir(self.test_dir)
428
 
        super(TestCaseInTempDir, self).setUp()
429
 
        
430
 
    def tearDown(self):
431
 
        os.chdir(self._currentdir)
432
 
        super(TestCaseInTempDir, self).tearDown()
433
 
 
434
 
    def build_tree(self, shape):
 
1432
            if i > 0:
 
1433
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
1434
            else:
 
1435
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
1436
            if os.path.exists(candidate_dir):
 
1437
                i = i + 1
 
1438
                continue
 
1439
            else:
 
1440
                os.mkdir(candidate_dir)
 
1441
                self.test_home_dir = candidate_dir + '/home'
 
1442
                os.mkdir(self.test_home_dir)
 
1443
                self.test_dir = candidate_dir + '/work'
 
1444
                os.mkdir(self.test_dir)
 
1445
                os.chdir(self.test_dir)
 
1446
                break
 
1447
 
 
1448
    def build_tree(self, shape, line_endings='native', transport=None):
435
1449
        """Build a test tree according to a pattern.
436
1450
 
437
1451
        shape is a sequence of file specifications.  If the final
438
1452
        character is '/', a directory is created.
439
1453
 
 
1454
        This assumes that all the elements in the tree being built are new.
 
1455
 
440
1456
        This doesn't add anything to a branch.
 
1457
        :param line_endings: Either 'binary' or 'native'
 
1458
                             in binary mode, exact contents are written
 
1459
                             in native mode, the line endings match the
 
1460
                             default platform endings.
 
1461
 
 
1462
        :param transport: A transport to write to, for building trees on 
 
1463
                          VFS's. If the transport is readonly or None,
 
1464
                          "." is opened automatically.
441
1465
        """
442
 
        # XXX: It's OK to just create them using forward slashes on windows?
 
1466
        # It's OK to just create them using forward slashes on windows.
 
1467
        if transport is None or transport.is_readonly():
 
1468
            transport = get_transport(".")
443
1469
        for name in shape:
444
 
            assert isinstance(name, basestring)
 
1470
            self.assert_(isinstance(name, basestring))
445
1471
            if name[-1] == '/':
446
 
                os.mkdir(name[:-1])
 
1472
                transport.mkdir(urlutils.escape(name[:-1]))
447
1473
            else:
448
 
                f = file(name, 'wt')
449
 
                print >>f, "contents of", name
450
 
                f.close()
 
1474
                if line_endings == 'binary':
 
1475
                    end = '\n'
 
1476
                elif line_endings == 'native':
 
1477
                    end = os.linesep
 
1478
                else:
 
1479
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
 
1480
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1481
                # Technically 'put()' is the right command. However, put
 
1482
                # uses an AtomicFile, which requires an extra rename into place
 
1483
                # As long as the files didn't exist in the past, append() will
 
1484
                # do the same thing as put()
 
1485
                # On jam's machine, make_kernel_like_tree is:
 
1486
                #   put:    4.5-7.5s (averaging 6s)
 
1487
                #   append: 2.9-4.5s
 
1488
                #   put_non_atomic: 2.9-4.5s
 
1489
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
451
1490
 
452
1491
    def build_tree_contents(self, shape):
453
 
        bzrlib.selftest.build_tree_contents(shape)
 
1492
        build_tree_contents(shape)
454
1493
 
455
 
    def failUnlessExists(self, path):
456
 
        """Fail unless path, which may be abs or relative, exists."""
457
 
        self.failUnless(osutils.lexists(path))
458
 
        
459
1494
    def assertFileEqual(self, content, path):
460
1495
        """Fail if path does not contain 'content'."""
461
1496
        self.failUnless(osutils.lexists(path))
 
1497
        # TODO: jam 20060427 Shouldn't this be 'rb'?
462
1498
        self.assertEqualDiff(content, open(path, 'r').read())
463
 
        
464
 
 
465
 
class MetaTestLog(TestCase):
466
 
    def test_logging(self):
467
 
        """Test logs are captured when a test fails."""
468
 
        logging.info('an info message')
469
 
        warning('something looks dodgy...')
470
 
        logging.debug('hello, test is running')
471
 
        ## assert 0
 
1499
 
 
1500
 
 
1501
class TestCaseWithTransport(TestCaseInTempDir):
 
1502
    """A test case that provides get_url and get_readonly_url facilities.
 
1503
 
 
1504
    These back onto two transport servers, one for readonly access and one for
 
1505
    read write access.
 
1506
 
 
1507
    If no explicit class is provided for readonly access, a
 
1508
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
1509
    based read write transports.
 
1510
 
 
1511
    If an explicit class is provided for readonly access, that server and the 
 
1512
    readwrite one must both define get_url() as resolving to os.getcwd().
 
1513
    """
 
1514
 
 
1515
    def get_server(self):
 
1516
        """See TestCaseWithMemoryTransport.
 
1517
 
 
1518
        This is useful for some tests with specific servers that need
 
1519
        diagnostics.
 
1520
        """
 
1521
        if self.__server is None:
 
1522
            self.__server = self.transport_server()
 
1523
            self.__server.setUp()
 
1524
            self.addCleanup(self.__server.tearDown)
 
1525
        return self.__server
 
1526
 
 
1527
    def make_branch_and_tree(self, relpath, format=None):
 
1528
        """Create a branch on the transport and a tree locally.
 
1529
 
 
1530
        If the transport is not a LocalTransport, the Tree can't be created on
 
1531
        the transport.  In that case the working tree is created in the local
 
1532
        directory, and the returned tree's branch and repository will also be
 
1533
        accessed locally.
 
1534
 
 
1535
        This will fail if the original default transport for this test
 
1536
        case wasn't backed by the working directory, as the branch won't
 
1537
        be on disk for us to open it.  
 
1538
 
 
1539
        :param format: The BzrDirFormat.
 
1540
        :returns: the WorkingTree.
 
1541
        """
 
1542
        # TODO: always use the local disk path for the working tree,
 
1543
        # this obviously requires a format that supports branch references
 
1544
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
1545
        # RBC 20060208
 
1546
        b = self.make_branch(relpath, format=format)
 
1547
        try:
 
1548
            return b.bzrdir.create_workingtree()
 
1549
        except errors.NotLocalUrl:
 
1550
            # We can only make working trees locally at the moment.  If the
 
1551
            # transport can't support them, then reopen the branch on a local
 
1552
            # transport, and create the working tree there.  
 
1553
            #
 
1554
            # Possibly we should instead keep
 
1555
            # the non-disk-backed branch and create a local checkout?
 
1556
            bd = bzrdir.BzrDir.open(relpath)
 
1557
            return bd.create_workingtree()
 
1558
 
 
1559
    def assertIsDirectory(self, relpath, transport):
 
1560
        """Assert that relpath within transport is a directory.
 
1561
 
 
1562
        This may not be possible on all transports; in that case it propagates
 
1563
        a TransportNotPossible.
 
1564
        """
 
1565
        try:
 
1566
            mode = transport.stat(relpath).st_mode
 
1567
        except errors.NoSuchFile:
 
1568
            self.fail("path %s is not a directory; no such file"
 
1569
                      % (relpath))
 
1570
        if not stat.S_ISDIR(mode):
 
1571
            self.fail("path %s is not a directory; has mode %#o"
 
1572
                      % (relpath, mode))
 
1573
 
 
1574
    def setUp(self):
 
1575
        super(TestCaseWithTransport, self).setUp()
 
1576
        self.__server = None
 
1577
 
 
1578
 
 
1579
class ChrootedTestCase(TestCaseWithTransport):
 
1580
    """A support class that provides readonly urls outside the local namespace.
 
1581
 
 
1582
    This is done by checking if self.transport_server is a MemoryServer. if it
 
1583
    is then we are chrooted already, if it is not then an HttpServer is used
 
1584
    for readonly urls.
 
1585
 
 
1586
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
1587
                       be used without needed to redo it when a different 
 
1588
                       subclass is in use ?
 
1589
    """
 
1590
 
 
1591
    def setUp(self):
 
1592
        super(ChrootedTestCase, self).setUp()
 
1593
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
 
1594
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
472
1595
 
473
1596
 
474
1597
def filter_suite_by_re(suite, pattern):
481
1604
 
482
1605
 
483
1606
def run_suite(suite, name='test', verbose=False, pattern=".*",
484
 
              stop_on_failure=False):
485
 
    TestCaseInTempDir._TEST_NAME = name
 
1607
              stop_on_failure=False, keep_output=False,
 
1608
              transport=None, lsprof_timed=None, bench_history=None):
 
1609
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
486
1610
    if verbose:
487
1611
        verbosity = 2
488
1612
    else:
489
1613
        verbosity = 1
490
1614
    runner = TextTestRunner(stream=sys.stdout,
491
1615
                            descriptions=0,
492
 
                            verbosity=verbosity)
 
1616
                            verbosity=verbosity,
 
1617
                            keep_output=keep_output,
 
1618
                            bench_history=bench_history)
493
1619
    runner.stop_on_failure=stop_on_failure
494
1620
    if pattern != '.*':
495
1621
        suite = filter_suite_by_re(suite, pattern)
496
1622
    result = runner.run(suite)
497
 
    # This is still a little bogus, 
498
 
    # but only a little. Folk not using our testrunner will
499
 
    # have to delete their temp directories themselves.
500
 
    if result.wasSuccessful():
501
 
        if TestCaseInTempDir.TEST_ROOT is not None:
502
 
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
503
 
    else:
504
 
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
505
1623
    return result.wasSuccessful()
506
1624
 
507
1625
 
508
 
def selftest(verbose=False, pattern=".*", stop_on_failure=True):
 
1626
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
1627
             keep_output=False,
 
1628
             transport=None,
 
1629
             test_suite_factory=None,
 
1630
             lsprof_timed=None,
 
1631
             bench_history=None):
509
1632
    """Run the whole test suite under the enhanced runner"""
510
 
    return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
511
 
                     stop_on_failure=stop_on_failure)
 
1633
    # XXX: Very ugly way to do this...
 
1634
    # Disable warning about old formats because we don't want it to disturb
 
1635
    # any blackbox tests.
 
1636
    from bzrlib import repository
 
1637
    repository._deprecation_warning_done = True
 
1638
 
 
1639
    global default_transport
 
1640
    if transport is None:
 
1641
        transport = default_transport
 
1642
    old_transport = default_transport
 
1643
    default_transport = transport
 
1644
    try:
 
1645
        if test_suite_factory is None:
 
1646
            suite = test_suite()
 
1647
        else:
 
1648
            suite = test_suite_factory()
 
1649
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
1650
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
1651
                     transport=transport,
 
1652
                     lsprof_timed=lsprof_timed,
 
1653
                     bench_history=bench_history)
 
1654
    finally:
 
1655
        default_transport = old_transport
512
1656
 
513
1657
 
514
1658
def test_suite():
515
 
    """Build and return TestSuite for the whole program."""
516
 
    import bzrlib.store, bzrlib.inventory, bzrlib.branch
517
 
    import bzrlib.osutils, bzrlib.merge3, bzrlib.plugin
518
 
    from doctest import DocTestSuite
519
 
 
520
 
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
521
 
 
522
 
    testmod_names = \
523
 
                  ['bzrlib.selftest.MetaTestLog',
524
 
                   'bzrlib.selftest.testgpg',
525
 
                   'bzrlib.selftest.testidentitymap',
526
 
                   'bzrlib.selftest.testinv',
527
 
                   'bzrlib.selftest.test_ancestry',
528
 
                   'bzrlib.selftest.test_commit',
529
 
                   'bzrlib.selftest.test_commit_merge',
530
 
                   'bzrlib.selftest.testconfig',
531
 
                   'bzrlib.selftest.versioning',
532
 
                   'bzrlib.selftest.testmerge3',
533
 
                   'bzrlib.selftest.testmerge',
534
 
                   'bzrlib.selftest.testhashcache',
535
 
                   'bzrlib.selftest.teststatus',
536
 
                   'bzrlib.selftest.testlog',
537
 
                   'bzrlib.selftest.testrevisionnamespaces',
538
 
                   'bzrlib.selftest.testbranch',
539
 
                   'bzrlib.selftest.testrevision',
540
 
                   'bzrlib.selftest.test_revision_info',
541
 
                   'bzrlib.selftest.test_merge_core',
542
 
                   'bzrlib.selftest.test_smart_add',
543
 
                   'bzrlib.selftest.test_bad_files',
544
 
                   'bzrlib.selftest.testdiff',
545
 
                   'bzrlib.selftest.test_parent',
546
 
                   'bzrlib.selftest.test_xml',
547
 
                   'bzrlib.selftest.test_weave',
548
 
                   'bzrlib.selftest.testfetch',
549
 
                   'bzrlib.selftest.whitebox',
550
 
                   'bzrlib.selftest.teststore',
551
 
                   'bzrlib.selftest.blackbox',
552
 
                   'bzrlib.selftest.testsampler',
553
 
                   'bzrlib.selftest.testtransactions',
554
 
                   'bzrlib.selftest.testtransport',
555
 
                   'bzrlib.selftest.testgraph',
556
 
                   'bzrlib.selftest.testworkingtree',
557
 
                   'bzrlib.selftest.test_upgrade',
558
 
                   'bzrlib.selftest.test_conflicts',
559
 
                   'bzrlib.selftest.testtestament',
560
 
                   'bzrlib.selftest.testannotate',
561
 
                   'bzrlib.selftest.testrevprops',
562
 
                   'bzrlib.selftest.testoptions',
563
 
                   'bzrlib.selftest.testhttp',
564
 
                   'bzrlib.selftest.testnonascii',
 
1659
    """Build and return TestSuite for the whole of bzrlib.
 
1660
    
 
1661
    This function can be replaced if you need to change the default test
 
1662
    suite on a global basis, but it is not encouraged.
 
1663
    """
 
1664
    testmod_names = [
 
1665
                   'bzrlib.tests.test_ancestry',
 
1666
                   'bzrlib.tests.test_api',
 
1667
                   'bzrlib.tests.test_atomicfile',
 
1668
                   'bzrlib.tests.test_bad_files',
 
1669
                   'bzrlib.tests.test_branch',
 
1670
                   'bzrlib.tests.test_bundle',
 
1671
                   'bzrlib.tests.test_bzrdir',
 
1672
                   'bzrlib.tests.test_cache_utf8',
 
1673
                   'bzrlib.tests.test_command',
 
1674
                   'bzrlib.tests.test_commit',
 
1675
                   'bzrlib.tests.test_commit_merge',
 
1676
                   'bzrlib.tests.test_config',
 
1677
                   'bzrlib.tests.test_conflicts',
 
1678
                   'bzrlib.tests.test_decorators',
 
1679
                   'bzrlib.tests.test_diff',
 
1680
                   'bzrlib.tests.test_doc_generate',
 
1681
                   'bzrlib.tests.test_errors',
 
1682
                   'bzrlib.tests.test_escaped_store',
 
1683
                   'bzrlib.tests.test_fetch',
 
1684
                   'bzrlib.tests.test_ftp_transport',
 
1685
                   'bzrlib.tests.test_gpg',
 
1686
                   'bzrlib.tests.test_graph',
 
1687
                   'bzrlib.tests.test_hashcache',
 
1688
                   'bzrlib.tests.test_http',
 
1689
                   'bzrlib.tests.test_http_response',
 
1690
                   'bzrlib.tests.test_identitymap',
 
1691
                   'bzrlib.tests.test_ignores',
 
1692
                   'bzrlib.tests.test_inv',
 
1693
                   'bzrlib.tests.test_knit',
 
1694
                   'bzrlib.tests.test_lazy_import',
 
1695
                   'bzrlib.tests.test_lazy_regex',
 
1696
                   'bzrlib.tests.test_lockdir',
 
1697
                   'bzrlib.tests.test_lockable_files',
 
1698
                   'bzrlib.tests.test_log',
 
1699
                   'bzrlib.tests.test_memorytree',
 
1700
                   'bzrlib.tests.test_merge',
 
1701
                   'bzrlib.tests.test_merge3',
 
1702
                   'bzrlib.tests.test_merge_core',
 
1703
                   'bzrlib.tests.test_missing',
 
1704
                   'bzrlib.tests.test_msgeditor',
 
1705
                   'bzrlib.tests.test_nonascii',
 
1706
                   'bzrlib.tests.test_options',
 
1707
                   'bzrlib.tests.test_osutils',
 
1708
                   'bzrlib.tests.test_patch',
 
1709
                   'bzrlib.tests.test_patches',
 
1710
                   'bzrlib.tests.test_permissions',
 
1711
                   'bzrlib.tests.test_plugins',
 
1712
                   'bzrlib.tests.test_progress',
 
1713
                   'bzrlib.tests.test_reconcile',
 
1714
                   'bzrlib.tests.test_registry',
 
1715
                   'bzrlib.tests.test_repository',
 
1716
                   'bzrlib.tests.test_revert',
 
1717
                   'bzrlib.tests.test_revision',
 
1718
                   'bzrlib.tests.test_revisionnamespaces',
 
1719
                   'bzrlib.tests.test_revisiontree',
 
1720
                   'bzrlib.tests.test_rio',
 
1721
                   'bzrlib.tests.test_sampler',
 
1722
                   'bzrlib.tests.test_selftest',
 
1723
                   'bzrlib.tests.test_setup',
 
1724
                   'bzrlib.tests.test_sftp_transport',
 
1725
                   'bzrlib.tests.test_smart_add',
 
1726
                   'bzrlib.tests.test_smart_transport',
 
1727
                   'bzrlib.tests.test_source',
 
1728
                   'bzrlib.tests.test_status',
 
1729
                   'bzrlib.tests.test_store',
 
1730
                   'bzrlib.tests.test_symbol_versioning',
 
1731
                   'bzrlib.tests.test_testament',
 
1732
                   'bzrlib.tests.test_textfile',
 
1733
                   'bzrlib.tests.test_textmerge',
 
1734
                   'bzrlib.tests.test_trace',
 
1735
                   'bzrlib.tests.test_transactions',
 
1736
                   'bzrlib.tests.test_transform',
 
1737
                   'bzrlib.tests.test_transport',
 
1738
                   'bzrlib.tests.test_tree',
 
1739
                   'bzrlib.tests.test_treebuilder',
 
1740
                   'bzrlib.tests.test_tsort',
 
1741
                   'bzrlib.tests.test_tuned_gzip',
 
1742
                   'bzrlib.tests.test_ui',
 
1743
                   'bzrlib.tests.test_upgrade',
 
1744
                   'bzrlib.tests.test_urlutils',
 
1745
                   'bzrlib.tests.test_versionedfile',
 
1746
                   'bzrlib.tests.test_version',
 
1747
                   'bzrlib.tests.test_version_info',
 
1748
                   'bzrlib.tests.test_weave',
 
1749
                   'bzrlib.tests.test_whitebox',
 
1750
                   'bzrlib.tests.test_workingtree',
 
1751
                   'bzrlib.tests.test_xml',
565
1752
                   ]
566
 
 
567
 
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
568
 
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3,
569
 
              bzrlib.errors,
570
 
              ):
571
 
        if m not in MODULES_TO_DOCTEST:
572
 
            MODULES_TO_DOCTEST.append(m)
573
 
 
574
 
    TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
575
 
    print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
576
 
    print
577
 
    suite = TestSuite()
578
 
    suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
 
1753
    test_transport_implementations = [
 
1754
        'bzrlib.tests.test_transport_implementations',
 
1755
        'bzrlib.tests.test_read_bundle',
 
1756
        ]
 
1757
    suite = TestUtil.TestSuite()
 
1758
    loader = TestUtil.TestLoader()
 
1759
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
1760
    from bzrlib.transport import TransportTestProviderAdapter
 
1761
    adapter = TransportTestProviderAdapter()
 
1762
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
1763
    for package in packages_to_test():
 
1764
        suite.addTest(package.test_suite())
579
1765
    for m in MODULES_TO_TEST:
580
 
         suite.addTest(TestLoader().loadTestsFromModule(m))
581
 
    for m in (MODULES_TO_DOCTEST):
582
 
        suite.addTest(DocTestSuite(m))
583
 
    for p in bzrlib.plugin.all_plugins:
584
 
        if hasattr(p, 'test_suite'):
585
 
            suite.addTest(p.test_suite())
 
1766
        suite.addTest(loader.loadTestsFromModule(m))
 
1767
    for m in MODULES_TO_DOCTEST:
 
1768
        try:
 
1769
            suite.addTest(doctest.DocTestSuite(m))
 
1770
        except ValueError, e:
 
1771
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
1772
            raise
 
1773
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
1774
        if getattr(plugin, 'test_suite', None) is not None:
 
1775
            suite.addTest(plugin.test_suite())
586
1776
    return suite
587
1777
 
 
1778
 
 
1779
def adapt_modules(mods_list, adapter, loader, suite):
 
1780
    """Adapt the modules in mods_list using adapter and add to suite."""
 
1781
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
1782
        suite.addTests(adapter.adapt(test))