~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

Compare URLs in RemoteRepository.__eq__, rather than '_client' attributes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
18
31
from cStringIO import StringIO
19
32
import difflib
 
33
import doctest
20
34
import errno
21
35
import logging
22
36
import os
 
37
from pprint import pformat
 
38
import random
23
39
import re
24
 
import shutil
 
40
import shlex
 
41
import stat
 
42
from subprocess import Popen, PIPE
25
43
import sys
26
44
import tempfile
27
45
import unittest
28
46
import time
29
 
import codecs
30
 
 
 
47
import warnings
 
48
 
 
49
 
 
50
from bzrlib import (
 
51
    bzrdir,
 
52
    debug,
 
53
    errors,
 
54
    memorytree,
 
55
    osutils,
 
56
    progress,
 
57
    ui,
 
58
    urlutils,
 
59
    workingtree,
 
60
    )
31
61
import bzrlib.branch
32
62
import bzrlib.commands
33
 
from bzrlib.errors import BzrError
 
63
import bzrlib.timestamp
 
64
import bzrlib.export
34
65
import bzrlib.inventory
35
66
import bzrlib.iterablefile
 
67
import bzrlib.lockdir
 
68
try:
 
69
    import bzrlib.lsprof
 
70
except ImportError:
 
71
    # lsprof not available
 
72
    pass
 
73
from bzrlib.merge import merge_inner
36
74
import bzrlib.merge3
37
75
import bzrlib.osutils
38
 
import bzrlib.osutils as osutils
39
76
import bzrlib.plugin
 
77
from bzrlib.revision import common_ancestor
40
78
import bzrlib.store
 
79
from bzrlib import symbol_versioning
 
80
from bzrlib.symbol_versioning import (
 
81
    deprecated_method,
 
82
    zero_eighteen,
 
83
    )
41
84
import bzrlib.trace
42
 
from bzrlib.trace import mutter
43
 
from bzrlib.tests.TestUtil import TestLoader, TestSuite
 
85
from bzrlib.transport import get_transport
 
86
import bzrlib.transport
 
87
from bzrlib.transport.local import LocalURLServer
 
88
from bzrlib.transport.memory import MemoryServer
 
89
from bzrlib.transport.readonly import ReadonlyServer
 
90
from bzrlib.trace import mutter, note
 
91
from bzrlib.tests import TestUtil
 
92
from bzrlib.tests.HttpServer import HttpServer
 
93
from bzrlib.tests.TestUtil import (
 
94
                          TestSuite,
 
95
                          TestLoader,
 
96
                          )
44
97
from bzrlib.tests.treeshape import build_tree_contents
 
98
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
99
 
 
100
# Mark this python module as being part of the implementation
 
101
# of unittest: this gives us better tracebacks where the last
 
102
# shown frame is the test code, not our assertXYZ.
 
103
__unittest = 1
 
104
 
 
105
default_transport = LocalURLServer
45
106
 
46
107
MODULES_TO_TEST = []
47
108
MODULES_TO_DOCTEST = [
48
 
                      bzrlib.branch,
49
 
                      bzrlib.commands,
 
109
                      bzrlib.timestamp,
50
110
                      bzrlib.errors,
 
111
                      bzrlib.export,
51
112
                      bzrlib.inventory,
52
113
                      bzrlib.iterablefile,
 
114
                      bzrlib.lockdir,
53
115
                      bzrlib.merge3,
54
 
                      bzrlib.osutils,
 
116
                      bzrlib.option,
55
117
                      bzrlib.store,
56
118
                      ]
 
119
 
 
120
 
57
121
def packages_to_test():
 
122
    """Return a list of packages to test.
 
123
 
 
124
    The packages are not globally imported so that import failures are
 
125
    triggered when running selftest, not when importing the command.
 
126
    """
 
127
    import bzrlib.doc
58
128
    import bzrlib.tests.blackbox
 
129
    import bzrlib.tests.branch_implementations
 
130
    import bzrlib.tests.bzrdir_implementations
 
131
    import bzrlib.tests.commands
 
132
    import bzrlib.tests.interrepository_implementations
 
133
    import bzrlib.tests.interversionedfile_implementations
 
134
    import bzrlib.tests.intertree_implementations
 
135
    import bzrlib.tests.per_lock
 
136
    import bzrlib.tests.repository_implementations
 
137
    import bzrlib.tests.revisionstore_implementations
 
138
    import bzrlib.tests.tree_implementations
 
139
    import bzrlib.tests.workingtree_implementations
59
140
    return [
60
 
            bzrlib.tests.blackbox
 
141
            bzrlib.doc,
 
142
            bzrlib.tests.blackbox,
 
143
            bzrlib.tests.branch_implementations,
 
144
            bzrlib.tests.bzrdir_implementations,
 
145
            bzrlib.tests.commands,
 
146
            bzrlib.tests.interrepository_implementations,
 
147
            bzrlib.tests.interversionedfile_implementations,
 
148
            bzrlib.tests.intertree_implementations,
 
149
            bzrlib.tests.per_lock,
 
150
            bzrlib.tests.repository_implementations,
 
151
            bzrlib.tests.revisionstore_implementations,
 
152
            bzrlib.tests.tree_implementations,
 
153
            bzrlib.tests.workingtree_implementations,
61
154
            ]
62
155
 
63
156
 
64
 
class EarlyStoppingTestResultAdapter(object):
65
 
    """An adapter for TestResult to stop at the first first failure or error"""
66
 
 
67
 
    def __init__(self, result):
68
 
        self._result = result
69
 
 
70
 
    def addError(self, test, err):
71
 
        self._result.addError(test, err)
72
 
        self._result.stop()
73
 
 
74
 
    def addFailure(self, test, err):
75
 
        self._result.addFailure(test, err)
76
 
        self._result.stop()
77
 
 
78
 
    def __getattr__(self, name):
79
 
        return getattr(self._result, name)
80
 
 
81
 
    def __setattr__(self, name, value):
82
 
        if name == '_result':
83
 
            object.__setattr__(self, name, value)
84
 
        return setattr(self._result, name, value)
85
 
 
86
 
 
87
 
class _MyResult(unittest._TextTestResult):
88
 
    """Custom TestResult.
89
 
 
90
 
    Shows output in a different format, including displaying runtime for tests.
 
157
class ExtendedTestResult(unittest._TextTestResult):
 
158
    """Accepts, reports and accumulates the results of running tests.
 
159
 
 
160
    Compared to this unittest version this class adds support for profiling,
 
161
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
162
    There are further-specialized subclasses for different types of display.
91
163
    """
92
164
 
93
 
    def _elapsedTime(self):
94
 
        return "%5dms" % (1000 * (time.time() - self._start_time))
 
165
    stop_early = False
 
166
    
 
167
    def __init__(self, stream, descriptions, verbosity,
 
168
                 bench_history=None,
 
169
                 num_tests=None,
 
170
                 ):
 
171
        """Construct new TestResult.
 
172
 
 
173
        :param bench_history: Optionally, a writable file object to accumulate
 
174
            benchmark results.
 
175
        """
 
176
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
177
        if bench_history is not None:
 
178
            from bzrlib.version import _get_bzr_source_tree
 
179
            src_tree = _get_bzr_source_tree()
 
180
            if src_tree:
 
181
                try:
 
182
                    revision_id = src_tree.get_parent_ids()[0]
 
183
                except IndexError:
 
184
                    # XXX: if this is a brand new tree, do the same as if there
 
185
                    # is no branch.
 
186
                    revision_id = ''
 
187
            else:
 
188
                # XXX: If there's no branch, what should we do?
 
189
                revision_id = ''
 
190
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
191
        self._bench_history = bench_history
 
192
        self.ui = ui.ui_factory
 
193
        self.num_tests = num_tests
 
194
        self.error_count = 0
 
195
        self.failure_count = 0
 
196
        self.known_failure_count = 0
 
197
        self.skip_count = 0
 
198
        self.unsupported = {}
 
199
        self.count = 0
 
200
        self._overall_start_time = time.time()
 
201
    
 
202
    def extractBenchmarkTime(self, testCase):
 
203
        """Add a benchmark time for the current test case."""
 
204
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
205
    
 
206
    def _elapsedTestTimeString(self):
 
207
        """Return a time string for the overall time the current test has taken."""
 
208
        return self._formatTime(time.time() - self._start_time)
 
209
 
 
210
    def _testTimeString(self):
 
211
        if self._benchmarkTime is not None:
 
212
            return "%s/%s" % (
 
213
                self._formatTime(self._benchmarkTime),
 
214
                self._elapsedTestTimeString())
 
215
        else:
 
216
            return "           %s" % self._elapsedTestTimeString()
 
217
 
 
218
    def _formatTime(self, seconds):
 
219
        """Format seconds as milliseconds with leading spaces."""
 
220
        # some benchmarks can take thousands of seconds to run, so we need 8
 
221
        # places
 
222
        return "%8dms" % (1000 * seconds)
 
223
 
 
224
    def _shortened_test_description(self, test):
 
225
        what = test.id()
 
226
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
227
        return what
95
228
 
96
229
    def startTest(self, test):
97
230
        unittest.TestResult.startTest(self, test)
98
 
        # In a short description, the important words are in
99
 
        # the beginning, but in an id, the important words are
100
 
        # at the end
101
 
        SHOW_DESCRIPTIONS = False
102
 
        if self.showAll:
103
 
            width = osutils.terminal_width()
104
 
            name_width = width - 15
105
 
            what = None
106
 
            if SHOW_DESCRIPTIONS:
107
 
                what = test.shortDescription()
108
 
                if what:
109
 
                    if len(what) > name_width:
110
 
                        what = what[:name_width-3] + '...'
111
 
            if what is None:
112
 
                what = test.id()
113
 
                if what.startswith('bzrlib.tests.'):
114
 
                    what = what[13:]
115
 
                if len(what) > name_width:
116
 
                    what = '...' + what[3-name_width:]
117
 
            what = what.ljust(name_width)
118
 
            self.stream.write(what)
119
 
        self.stream.flush()
 
231
        self.report_test_start(test)
 
232
        test.number = self.count
 
233
        self._recordTestStartTime()
 
234
 
 
235
    def _recordTestStartTime(self):
 
236
        """Record that a test has started."""
120
237
        self._start_time = time.time()
121
238
 
 
239
    def _cleanupLogFile(self, test):
 
240
        # We can only do this if we have one of our TestCases, not if
 
241
        # we have a doctest.
 
242
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
243
        if setKeepLogfile is not None:
 
244
            setKeepLogfile()
 
245
 
122
246
    def addError(self, test, err):
 
247
        self.extractBenchmarkTime(test)
 
248
        self._cleanupLogFile(test)
123
249
        if isinstance(err[1], TestSkipped):
124
 
            return self.addSkipped(test, err)    
 
250
            return self.addSkipped(test, err)
 
251
        elif isinstance(err[1], UnavailableFeature):
 
252
            return self.addNotSupported(test, err[1].args[0])
125
253
        unittest.TestResult.addError(self, test, err)
126
 
        if self.showAll:
127
 
            self.stream.writeln("ERROR %s" % self._elapsedTime())
128
 
        elif self.dots:
129
 
            self.stream.write('E')
130
 
        self.stream.flush()
 
254
        self.error_count += 1
 
255
        self.report_error(test, err)
 
256
        if self.stop_early:
 
257
            self.stop()
131
258
 
132
259
    def addFailure(self, test, err):
 
260
        self._cleanupLogFile(test)
 
261
        self.extractBenchmarkTime(test)
 
262
        if isinstance(err[1], KnownFailure):
 
263
            return self.addKnownFailure(test, err)
133
264
        unittest.TestResult.addFailure(self, test, err)
134
 
        if self.showAll:
135
 
            self.stream.writeln(" FAIL %s" % self._elapsedTime())
136
 
        elif self.dots:
137
 
            self.stream.write('F')
138
 
        self.stream.flush()
 
265
        self.failure_count += 1
 
266
        self.report_failure(test, err)
 
267
        if self.stop_early:
 
268
            self.stop()
 
269
 
 
270
    def addKnownFailure(self, test, err):
 
271
        self.known_failure_count += 1
 
272
        self.report_known_failure(test, err)
 
273
 
 
274
    def addNotSupported(self, test, feature):
 
275
        self.unsupported.setdefault(str(feature), 0)
 
276
        self.unsupported[str(feature)] += 1
 
277
        self.report_unsupported(test, feature)
139
278
 
140
279
    def addSuccess(self, test):
141
 
        if self.showAll:
142
 
            self.stream.writeln('   OK %s' % self._elapsedTime())
143
 
        elif self.dots:
144
 
            self.stream.write('~')
145
 
        self.stream.flush()
 
280
        self.extractBenchmarkTime(test)
 
281
        if self._bench_history is not None:
 
282
            if self._benchmarkTime is not None:
 
283
                self._bench_history.write("%s %s\n" % (
 
284
                    self._formatTime(self._benchmarkTime),
 
285
                    test.id()))
 
286
        self.report_success(test)
146
287
        unittest.TestResult.addSuccess(self, test)
147
288
 
148
289
    def addSkipped(self, test, skip_excinfo):
149
 
        if self.showAll:
150
 
            print >>self.stream, ' SKIP %s' % self._elapsedTime()
151
 
            print >>self.stream, '     %s' % skip_excinfo[1]
152
 
        elif self.dots:
153
 
            self.stream.write('S')
154
 
        self.stream.flush()
 
290
        self.report_skip(test, skip_excinfo)
155
291
        # seems best to treat this as success from point-of-view of unittest
156
292
        # -- it actually does nothing so it barely matters :)
157
 
        unittest.TestResult.addSuccess(self, test)
 
293
        try:
 
294
            test.tearDown()
 
295
        except KeyboardInterrupt:
 
296
            raise
 
297
        except:
 
298
            self.addError(test, test.__exc_info())
 
299
        else:
 
300
            unittest.TestResult.addSuccess(self, test)
158
301
 
159
302
    def printErrorList(self, flavour, errors):
160
303
        for test, err in errors:
161
304
            self.stream.writeln(self.separator1)
162
 
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
163
 
            if hasattr(test, '_get_log'):
 
305
            self.stream.write("%s: " % flavour)
 
306
            self.stream.writeln(self.getDescription(test))
 
307
            if getattr(test, '_get_log', None) is not None:
164
308
                print >>self.stream
165
309
                print >>self.stream, \
166
 
                        ('vvvv[log from %s]' % test).ljust(78,'-')
 
310
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
167
311
                print >>self.stream, test._get_log()
168
312
                print >>self.stream, \
169
 
                        ('^^^^[log from %s]' % test).ljust(78,'-')
 
313
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
170
314
            self.stream.writeln(self.separator2)
171
315
            self.stream.writeln("%s" % err)
172
316
 
173
 
 
174
 
class TextTestRunner(unittest.TextTestRunner):
 
317
    def finished(self):
 
318
        pass
 
319
 
 
320
    def report_cleaning_up(self):
 
321
        pass
 
322
 
 
323
    def report_success(self, test):
 
324
        pass
 
325
 
 
326
 
 
327
class TextTestResult(ExtendedTestResult):
 
328
    """Displays progress and results of tests in text form"""
 
329
 
 
330
    def __init__(self, stream, descriptions, verbosity,
 
331
                 bench_history=None,
 
332
                 num_tests=None,
 
333
                 pb=None,
 
334
                 ):
 
335
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
336
            bench_history, num_tests)
 
337
        if pb is None:
 
338
            self.pb = self.ui.nested_progress_bar()
 
339
            self._supplied_pb = False
 
340
        else:
 
341
            self.pb = pb
 
342
            self._supplied_pb = True
 
343
        self.pb.show_pct = False
 
344
        self.pb.show_spinner = False
 
345
        self.pb.show_eta = False,
 
346
        self.pb.show_count = False
 
347
        self.pb.show_bar = False
 
348
 
 
349
    def report_starting(self):
 
350
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
351
 
 
352
    def _progress_prefix_text(self):
 
353
        a = '[%d' % self.count
 
354
        if self.num_tests is not None:
 
355
            a +='/%d' % self.num_tests
 
356
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
357
        if self.error_count:
 
358
            a += ', %d errors' % self.error_count
 
359
        if self.failure_count:
 
360
            a += ', %d failed' % self.failure_count
 
361
        if self.known_failure_count:
 
362
            a += ', %d known failures' % self.known_failure_count
 
363
        if self.skip_count:
 
364
            a += ', %d skipped' % self.skip_count
 
365
        if self.unsupported:
 
366
            a += ', %d missing features' % len(self.unsupported)
 
367
        a += ']'
 
368
        return a
 
369
 
 
370
    def report_test_start(self, test):
 
371
        self.count += 1
 
372
        self.pb.update(
 
373
                self._progress_prefix_text()
 
374
                + ' ' 
 
375
                + self._shortened_test_description(test))
 
376
 
 
377
    def _test_description(self, test):
 
378
        return self._shortened_test_description(test)
 
379
 
 
380
    def report_error(self, test, err):
 
381
        self.pb.note('ERROR: %s\n    %s\n', 
 
382
            self._test_description(test),
 
383
            err[1],
 
384
            )
 
385
 
 
386
    def report_failure(self, test, err):
 
387
        self.pb.note('FAIL: %s\n    %s\n', 
 
388
            self._test_description(test),
 
389
            err[1],
 
390
            )
 
391
 
 
392
    def report_known_failure(self, test, err):
 
393
        self.pb.note('XFAIL: %s\n%s\n',
 
394
            self._test_description(test), err[1])
 
395
 
 
396
    def report_skip(self, test, skip_excinfo):
 
397
        self.skip_count += 1
 
398
        if False:
 
399
            # at the moment these are mostly not things we can fix
 
400
            # and so they just produce stipple; use the verbose reporter
 
401
            # to see them.
 
402
            if False:
 
403
                # show test and reason for skip
 
404
                self.pb.note('SKIP: %s\n    %s\n', 
 
405
                    self._shortened_test_description(test),
 
406
                    skip_excinfo[1])
 
407
            else:
 
408
                # since the class name was left behind in the still-visible
 
409
                # progress bar...
 
410
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
411
 
 
412
    def report_unsupported(self, test, feature):
 
413
        """test cannot be run because feature is missing."""
 
414
                  
 
415
    def report_cleaning_up(self):
 
416
        self.pb.update('cleaning up...')
 
417
 
 
418
    def finished(self):
 
419
        if not self._supplied_pb:
 
420
            self.pb.finished()
 
421
 
 
422
 
 
423
class VerboseTestResult(ExtendedTestResult):
 
424
    """Produce long output, with one line per test run plus times"""
 
425
 
 
426
    def _ellipsize_to_right(self, a_string, final_width):
 
427
        """Truncate and pad a string, keeping the right hand side"""
 
428
        if len(a_string) > final_width:
 
429
            result = '...' + a_string[3-final_width:]
 
430
        else:
 
431
            result = a_string
 
432
        return result.ljust(final_width)
 
433
 
 
434
    def report_starting(self):
 
435
        self.stream.write('running %d tests...\n' % self.num_tests)
 
436
 
 
437
    def report_test_start(self, test):
 
438
        self.count += 1
 
439
        name = self._shortened_test_description(test)
 
440
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
441
        # numbers, plus a trailing blank
 
442
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
443
        self.stream.write(self._ellipsize_to_right(name,
 
444
                          osutils.terminal_width()-30))
 
445
        self.stream.flush()
 
446
 
 
447
    def _error_summary(self, err):
 
448
        indent = ' ' * 4
 
449
        return '%s%s' % (indent, err[1])
 
450
 
 
451
    def report_error(self, test, err):
 
452
        self.stream.writeln('ERROR %s\n%s'
 
453
                % (self._testTimeString(),
 
454
                   self._error_summary(err)))
 
455
 
 
456
    def report_failure(self, test, err):
 
457
        self.stream.writeln(' FAIL %s\n%s'
 
458
                % (self._testTimeString(),
 
459
                   self._error_summary(err)))
 
460
 
 
461
    def report_known_failure(self, test, err):
 
462
        self.stream.writeln('XFAIL %s\n%s'
 
463
                % (self._testTimeString(),
 
464
                   self._error_summary(err)))
 
465
 
 
466
    def report_success(self, test):
 
467
        self.stream.writeln('   OK %s' % self._testTimeString())
 
468
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
469
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
470
            stats.pprint(file=self.stream)
 
471
        # flush the stream so that we get smooth output. This verbose mode is
 
472
        # used to show the output in PQM.
 
473
        self.stream.flush()
 
474
 
 
475
    def report_skip(self, test, skip_excinfo):
 
476
        self.skip_count += 1
 
477
        self.stream.writeln(' SKIP %s\n%s'
 
478
                % (self._testTimeString(),
 
479
                   self._error_summary(skip_excinfo)))
 
480
 
 
481
    def report_unsupported(self, test, feature):
 
482
        """test cannot be run because feature is missing."""
 
483
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
484
                %(self._testTimeString(), feature))
 
485
                  
 
486
 
 
487
 
 
488
class TextTestRunner(object):
175
489
    stop_on_failure = False
176
490
 
177
 
    def _makeResult(self):
178
 
        result = _MyResult(self.stream, self.descriptions, self.verbosity)
179
 
        if self.stop_on_failure:
180
 
            result = EarlyStoppingTestResultAdapter(result)
 
491
    def __init__(self,
 
492
                 stream=sys.stderr,
 
493
                 descriptions=0,
 
494
                 verbosity=1,
 
495
                 bench_history=None,
 
496
                 list_only=False
 
497
                 ):
 
498
        self.stream = unittest._WritelnDecorator(stream)
 
499
        self.descriptions = descriptions
 
500
        self.verbosity = verbosity
 
501
        self._bench_history = bench_history
 
502
        self.list_only = list_only
 
503
 
 
504
    def run(self, test):
 
505
        "Run the given test case or test suite."
 
506
        startTime = time.time()
 
507
        if self.verbosity == 1:
 
508
            result_class = TextTestResult
 
509
        elif self.verbosity >= 2:
 
510
            result_class = VerboseTestResult
 
511
        result = result_class(self.stream,
 
512
                              self.descriptions,
 
513
                              self.verbosity,
 
514
                              bench_history=self._bench_history,
 
515
                              num_tests=test.countTestCases(),
 
516
                              )
 
517
        result.stop_early = self.stop_on_failure
 
518
        result.report_starting()
 
519
        if self.list_only:
 
520
            if self.verbosity >= 2:
 
521
                self.stream.writeln("Listing tests only ...\n")
 
522
            run = 0
 
523
            for t in iter_suite_tests(test):
 
524
                self.stream.writeln("%s" % (t.id()))
 
525
                run += 1
 
526
            actionTaken = "Listed"
 
527
        else: 
 
528
            test.run(result)
 
529
            run = result.testsRun
 
530
            actionTaken = "Ran"
 
531
        stopTime = time.time()
 
532
        timeTaken = stopTime - startTime
 
533
        result.printErrors()
 
534
        self.stream.writeln(result.separator2)
 
535
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
536
                            run, run != 1 and "s" or "", timeTaken))
 
537
        self.stream.writeln()
 
538
        if not result.wasSuccessful():
 
539
            self.stream.write("FAILED (")
 
540
            failed, errored = map(len, (result.failures, result.errors))
 
541
            if failed:
 
542
                self.stream.write("failures=%d" % failed)
 
543
            if errored:
 
544
                if failed: self.stream.write(", ")
 
545
                self.stream.write("errors=%d" % errored)
 
546
            if result.known_failure_count:
 
547
                if failed or errored: self.stream.write(", ")
 
548
                self.stream.write("known_failure_count=%d" %
 
549
                    result.known_failure_count)
 
550
            self.stream.writeln(")")
 
551
        else:
 
552
            if result.known_failure_count:
 
553
                self.stream.writeln("OK (known_failures=%d)" %
 
554
                    result.known_failure_count)
 
555
            else:
 
556
                self.stream.writeln("OK")
 
557
        if result.skip_count > 0:
 
558
            skipped = result.skip_count
 
559
            self.stream.writeln('%d test%s skipped' %
 
560
                                (skipped, skipped != 1 and "s" or ""))
 
561
        if result.unsupported:
 
562
            for feature, count in sorted(result.unsupported.items()):
 
563
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
564
                    (feature, count))
 
565
        result.finished()
181
566
        return result
182
567
 
183
568
 
196
581
 
197
582
class TestSkipped(Exception):
198
583
    """Indicates that a test was intentionally skipped, rather than failing."""
199
 
    # XXX: Not used yet
 
584
 
 
585
 
 
586
class KnownFailure(AssertionError):
 
587
    """Indicates that a test failed in a precisely expected manner.
 
588
 
 
589
    Such failures dont block the whole test suite from passing because they are
 
590
    indicators of partially completed code or of future work. We have an
 
591
    explicit error for them so that we can ensure that they are always visible:
 
592
    KnownFailures are always shown in the output of bzr selftest.
 
593
    """
 
594
 
 
595
 
 
596
class UnavailableFeature(Exception):
 
597
    """A feature required for this test was not available.
 
598
 
 
599
    The feature should be used to construct the exception.
 
600
    """
200
601
 
201
602
 
202
603
class CommandFailed(Exception):
203
604
    pass
204
605
 
 
606
 
 
607
class StringIOWrapper(object):
 
608
    """A wrapper around cStringIO which just adds an encoding attribute.
 
609
    
 
610
    Internally we can check sys.stdout to see what the output encoding
 
611
    should be. However, cStringIO has no encoding attribute that we can
 
612
    set. So we wrap it instead.
 
613
    """
 
614
    encoding='ascii'
 
615
    _cstring = None
 
616
 
 
617
    def __init__(self, s=None):
 
618
        if s is not None:
 
619
            self.__dict__['_cstring'] = StringIO(s)
 
620
        else:
 
621
            self.__dict__['_cstring'] = StringIO()
 
622
 
 
623
    def __getattr__(self, name, getattr=getattr):
 
624
        return getattr(self.__dict__['_cstring'], name)
 
625
 
 
626
    def __setattr__(self, name, val):
 
627
        if name == 'encoding':
 
628
            self.__dict__['encoding'] = val
 
629
        else:
 
630
            return setattr(self._cstring, name, val)
 
631
 
 
632
 
 
633
class TestUIFactory(ui.CLIUIFactory):
 
634
    """A UI Factory for testing.
 
635
 
 
636
    Hide the progress bar but emit note()s.
 
637
    Redirect stdin.
 
638
    Allows get_password to be tested without real tty attached.
 
639
    """
 
640
 
 
641
    def __init__(self,
 
642
                 stdout=None,
 
643
                 stderr=None,
 
644
                 stdin=None):
 
645
        super(TestUIFactory, self).__init__()
 
646
        if stdin is not None:
 
647
            # We use a StringIOWrapper to be able to test various
 
648
            # encodings, but the user is still responsible to
 
649
            # encode the string and to set the encoding attribute
 
650
            # of StringIOWrapper.
 
651
            self.stdin = StringIOWrapper(stdin)
 
652
        if stdout is None:
 
653
            self.stdout = sys.stdout
 
654
        else:
 
655
            self.stdout = stdout
 
656
        if stderr is None:
 
657
            self.stderr = sys.stderr
 
658
        else:
 
659
            self.stderr = stderr
 
660
 
 
661
    def clear(self):
 
662
        """See progress.ProgressBar.clear()."""
 
663
 
 
664
    def clear_term(self):
 
665
        """See progress.ProgressBar.clear_term()."""
 
666
 
 
667
    def clear_term(self):
 
668
        """See progress.ProgressBar.clear_term()."""
 
669
 
 
670
    def finished(self):
 
671
        """See progress.ProgressBar.finished()."""
 
672
 
 
673
    def note(self, fmt_string, *args, **kwargs):
 
674
        """See progress.ProgressBar.note()."""
 
675
        self.stdout.write((fmt_string + "\n") % args)
 
676
 
 
677
    def progress_bar(self):
 
678
        return self
 
679
 
 
680
    def nested_progress_bar(self):
 
681
        return self
 
682
 
 
683
    def update(self, message, count=None, total=None):
 
684
        """See progress.ProgressBar.update()."""
 
685
 
 
686
    def get_non_echoed_password(self, prompt):
 
687
        """Get password from stdin without trying to handle the echo mode"""
 
688
        if prompt:
 
689
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
 
690
        password = self.stdin.readline()
 
691
        if not password:
 
692
            raise EOFError
 
693
        if password[-1] == '\n':
 
694
            password = password[:-1]
 
695
        return password
 
696
 
 
697
 
205
698
class TestCase(unittest.TestCase):
206
699
    """Base class for bzr unit tests.
207
700
    
223
716
    accidentally overlooked.
224
717
    """
225
718
 
226
 
    BZRPATH = 'bzr'
227
719
    _log_file_name = None
228
720
    _log_contents = ''
 
721
    _keep_log_file = False
 
722
    # record lsprof data when performing benchmark calls.
 
723
    _gather_lsprof_in_benchmarks = False
 
724
 
 
725
    def __init__(self, methodName='testMethod'):
 
726
        super(TestCase, self).__init__(methodName)
 
727
        self._cleanups = []
229
728
 
230
729
    def setUp(self):
231
730
        unittest.TestCase.setUp(self)
232
 
        self._cleanups = []
233
731
        self._cleanEnvironment()
234
732
        bzrlib.trace.disable_default_logging()
 
733
        self._silenceUI()
235
734
        self._startLogFile()
 
735
        self._benchcalls = []
 
736
        self._benchtime = None
 
737
        self._clear_hooks()
 
738
        self._clear_debug_flags()
 
739
 
 
740
    def _clear_debug_flags(self):
 
741
        """Prevent externally set debug flags affecting tests.
 
742
        
 
743
        Tests that want to use debug flags can just set them in the
 
744
        debug_flags set during setup/teardown.
 
745
        """
 
746
        self._preserved_debug_flags = set(debug.debug_flags)
 
747
        debug.debug_flags.clear()
 
748
        self.addCleanup(self._restore_debug_flags)
 
749
 
 
750
    def _clear_hooks(self):
 
751
        # prevent hooks affecting tests
 
752
        import bzrlib.branch
 
753
        import bzrlib.smart.server
 
754
        self._preserved_hooks = {
 
755
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
756
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
757
            }
 
758
        self.addCleanup(self._restoreHooks)
 
759
        # reset all hooks to an empty instance of the appropriate type
 
760
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
761
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
762
 
 
763
    def _silenceUI(self):
 
764
        """Turn off UI for duration of test"""
 
765
        # by default the UI is off; tests can turn it on if they want it.
 
766
        saved = ui.ui_factory
 
767
        def _restore():
 
768
            ui.ui_factory = saved
 
769
        ui.ui_factory = ui.SilentUIFactory()
 
770
        self.addCleanup(_restore)
236
771
 
237
772
    def _ndiff_strings(self, a, b):
238
773
        """Return ndiff between two strings containing lines.
249
784
                                  charjunk=lambda x: False)
250
785
        return ''.join(difflines)
251
786
 
252
 
    def assertEqualDiff(self, a, b):
 
787
    def assertEqual(self, a, b, message=''):
 
788
        try:
 
789
            if a == b:
 
790
                return
 
791
        except UnicodeError, e:
 
792
            # If we can't compare without getting a UnicodeError, then
 
793
            # obviously they are different
 
794
            mutter('UnicodeError: %s', e)
 
795
        if message:
 
796
            message += '\n'
 
797
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
798
            % (message,
 
799
               pformat(a), pformat(b)))
 
800
 
 
801
    assertEquals = assertEqual
 
802
 
 
803
    def assertEqualDiff(self, a, b, message=None):
253
804
        """Assert two texts are equal, if not raise an exception.
254
805
        
255
806
        This is intended for use with multi-line strings where it can 
258
809
        # TODO: perhaps override assertEquals to call this for strings?
259
810
        if a == b:
260
811
            return
261
 
        raise AssertionError("texts not equal:\n" + 
262
 
                             self._ndiff_strings(a, b))      
 
812
        if message is None:
 
813
            message = "texts not equal:\n"
 
814
        raise AssertionError(message +
 
815
                             self._ndiff_strings(a, b))
263
816
        
 
817
    def assertEqualMode(self, mode, mode_test):
 
818
        self.assertEqual(mode, mode_test,
 
819
                         'mode mismatch %o != %o' % (mode, mode_test))
 
820
 
 
821
    def assertPositive(self, val):
 
822
        """Assert that val is greater than 0."""
 
823
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
824
 
 
825
    def assertNegative(self, val):
 
826
        """Assert that val is less than 0."""
 
827
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
828
 
264
829
    def assertStartsWith(self, s, prefix):
265
830
        if not s.startswith(prefix):
266
831
            raise AssertionError('string %r does not start with %r' % (s, prefix))
267
832
 
268
833
    def assertEndsWith(self, s, suffix):
269
 
        if not s.endswith(prefix):
 
834
        """Asserts that s ends with suffix."""
 
835
        if not s.endswith(suffix):
270
836
            raise AssertionError('string %r does not end with %r' % (s, suffix))
271
837
 
272
838
    def assertContainsRe(self, haystack, needle_re):
273
839
        """Assert that a contains something matching a regular expression."""
274
840
        if not re.search(needle_re, haystack):
275
 
            raise AssertionError('pattern "%s" not found in "%s"'
 
841
            if '\n' in haystack or len(haystack) > 60:
 
842
                # a long string, format it in a more readable way
 
843
                raise AssertionError(
 
844
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
845
                        % (needle_re, haystack))
 
846
            else:
 
847
                raise AssertionError('pattern "%s" not found in "%s"'
 
848
                        % (needle_re, haystack))
 
849
 
 
850
    def assertNotContainsRe(self, haystack, needle_re):
 
851
        """Assert that a does not match a regular expression"""
 
852
        if re.search(needle_re, haystack):
 
853
            raise AssertionError('pattern "%s" found in "%s"'
276
854
                    % (needle_re, haystack))
277
855
 
278
 
    def AssertSubset(self, sublist, superlist):
 
856
    def assertSubset(self, sublist, superlist):
279
857
        """Assert that every entry in sublist is present in superlist."""
280
858
        missing = []
281
859
        for entry in sublist:
285
863
            raise AssertionError("value(s) %r not present in container %r" % 
286
864
                                 (missing, superlist))
287
865
 
288
 
    def assertIs(self, left, right):
 
866
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
867
        """Fail unless excClass is raised when the iterator from func is used.
 
868
        
 
869
        Many functions can return generators this makes sure
 
870
        to wrap them in a list() call to make sure the whole generator
 
871
        is run, and that the proper exception is raised.
 
872
        """
 
873
        try:
 
874
            list(func(*args, **kwargs))
 
875
        except excClass:
 
876
            return
 
877
        else:
 
878
            if getattr(excClass,'__name__', None) is not None:
 
879
                excName = excClass.__name__
 
880
            else:
 
881
                excName = str(excClass)
 
882
            raise self.failureException, "%s not raised" % excName
 
883
 
 
884
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
885
        """Assert that a callable raises a particular exception.
 
886
 
 
887
        :param excClass: As for the except statement, this may be either an
 
888
            exception class, or a tuple of classes.
 
889
        :param callableObj: A callable, will be passed ``*args`` and
 
890
            ``**kwargs``.
 
891
 
 
892
        Returns the exception so that you can examine it.
 
893
        """
 
894
        try:
 
895
            callableObj(*args, **kwargs)
 
896
        except excClass, e:
 
897
            return e
 
898
        else:
 
899
            if getattr(excClass,'__name__', None) is not None:
 
900
                excName = excClass.__name__
 
901
            else:
 
902
                # probably a tuple
 
903
                excName = str(excClass)
 
904
            raise self.failureException, "%s not raised" % excName
 
905
 
 
906
    def assertIs(self, left, right, message=None):
289
907
        if not (left is right):
290
 
            raise AssertionError("%r is not %r." % (left, right))
 
908
            if message is not None:
 
909
                raise AssertionError(message)
 
910
            else:
 
911
                raise AssertionError("%r is not %r." % (left, right))
 
912
 
 
913
    def assertIsNot(self, left, right, message=None):
 
914
        if (left is right):
 
915
            if message is not None:
 
916
                raise AssertionError(message)
 
917
            else:
 
918
                raise AssertionError("%r is %r." % (left, right))
 
919
 
 
920
    def assertTransportMode(self, transport, path, mode):
 
921
        """Fail if a path does not have mode mode.
 
922
        
 
923
        If modes are not supported on this transport, the assertion is ignored.
 
924
        """
 
925
        if not transport._can_roundtrip_unix_modebits():
 
926
            return
 
927
        path_stat = transport.stat(path)
 
928
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
929
        self.assertEqual(mode, actual_mode,
 
930
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
931
 
 
932
    def assertIsInstance(self, obj, kls):
 
933
        """Fail if obj is not an instance of kls"""
 
934
        if not isinstance(obj, kls):
 
935
            self.fail("%r is an instance of %s rather than %s" % (
 
936
                obj, obj.__class__, kls))
 
937
 
 
938
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
939
        """Invoke a test, expecting it to fail for the given reason.
 
940
 
 
941
        This is for assertions that ought to succeed, but currently fail.
 
942
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
943
        about the failure you're expecting.  If a new bug is introduced,
 
944
        AssertionError should be raised, not KnownFailure.
 
945
 
 
946
        Frequently, expectFailure should be followed by an opposite assertion.
 
947
        See example below.
 
948
 
 
949
        Intended to be used with a callable that raises AssertionError as the
 
950
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
951
 
 
952
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
953
        test succeeds.
 
954
 
 
955
        example usage::
 
956
 
 
957
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
958
                             dynamic_val)
 
959
          self.assertEqual(42, dynamic_val)
 
960
 
 
961
          This means that a dynamic_val of 54 will cause the test to raise
 
962
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
963
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
964
          than 54 or 42 will cause an AssertionError.
 
965
        """
 
966
        try:
 
967
            assertion(*args, **kwargs)
 
968
        except AssertionError:
 
969
            raise KnownFailure(reason)
 
970
        else:
 
971
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
972
 
 
973
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
974
        """A helper for callDeprecated and applyDeprecated.
 
975
 
 
976
        :param a_callable: A callable to call.
 
977
        :param args: The positional arguments for the callable
 
978
        :param kwargs: The keyword arguments for the callable
 
979
        :return: A tuple (warnings, result). result is the result of calling
 
980
            a_callable(``*args``, ``**kwargs``).
 
981
        """
 
982
        local_warnings = []
 
983
        def capture_warnings(msg, cls=None, stacklevel=None):
 
984
            # we've hooked into a deprecation specific callpath,
 
985
            # only deprecations should getting sent via it.
 
986
            self.assertEqual(cls, DeprecationWarning)
 
987
            local_warnings.append(msg)
 
988
        original_warning_method = symbol_versioning.warn
 
989
        symbol_versioning.set_warning_method(capture_warnings)
 
990
        try:
 
991
            result = a_callable(*args, **kwargs)
 
992
        finally:
 
993
            symbol_versioning.set_warning_method(original_warning_method)
 
994
        return (local_warnings, result)
 
995
 
 
996
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
997
        """Call a deprecated callable without warning the user.
 
998
 
 
999
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1000
        not other callers that go direct to the warning module.
 
1001
 
 
1002
        :param deprecation_format: The deprecation format that the callable
 
1003
            should have been deprecated with. This is the same type as the
 
1004
            parameter to deprecated_method/deprecated_function. If the
 
1005
            callable is not deprecated with this format, an assertion error
 
1006
            will be raised.
 
1007
        :param a_callable: A callable to call. This may be a bound method or
 
1008
            a regular function. It will be called with ``*args`` and
 
1009
            ``**kwargs``.
 
1010
        :param args: The positional arguments for the callable
 
1011
        :param kwargs: The keyword arguments for the callable
 
1012
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1013
        """
 
1014
        call_warnings, result = self._capture_warnings(a_callable,
 
1015
            *args, **kwargs)
 
1016
        expected_first_warning = symbol_versioning.deprecation_string(
 
1017
            a_callable, deprecation_format)
 
1018
        if len(call_warnings) == 0:
 
1019
            self.fail("No deprecation warning generated by call to %s" %
 
1020
                a_callable)
 
1021
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1022
        return result
 
1023
 
 
1024
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1025
        """Assert that a callable is deprecated in a particular way.
 
1026
 
 
1027
        This is a very precise test for unusual requirements. The 
 
1028
        applyDeprecated helper function is probably more suited for most tests
 
1029
        as it allows you to simply specify the deprecation format being used
 
1030
        and will ensure that that is issued for the function being called.
 
1031
 
 
1032
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1033
        not other callers that go direct to the warning module.
 
1034
 
 
1035
        :param expected: a list of the deprecation warnings expected, in order
 
1036
        :param callable: The callable to call
 
1037
        :param args: The positional arguments for the callable
 
1038
        :param kwargs: The keyword arguments for the callable
 
1039
        """
 
1040
        call_warnings, result = self._capture_warnings(callable,
 
1041
            *args, **kwargs)
 
1042
        self.assertEqual(expected, call_warnings)
 
1043
        return result
291
1044
 
292
1045
    def _startLogFile(self):
293
1046
        """Send bzr and test log messages to a temporary file.
295
1048
        The file is removed as the test is torn down.
296
1049
        """
297
1050
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
298
 
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
299
 
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
300
 
        bzrlib.trace.enable_test_log(self._log_file)
 
1051
        self._log_file = os.fdopen(fileno, 'w+')
 
1052
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
301
1053
        self._log_file_name = name
302
1054
        self.addCleanup(self._finishLogFile)
303
1055
 
304
1056
    def _finishLogFile(self):
305
1057
        """Finished with the log file.
306
1058
 
307
 
        Read contents into memory, close, and delete.
 
1059
        Close the file and delete it, unless setKeepLogfile was called.
308
1060
        """
309
 
        bzrlib.trace.disable_test_log()
310
 
        self._log_file.seek(0)
311
 
        self._log_contents = self._log_file.read()
 
1061
        if self._log_file is None:
 
1062
            return
 
1063
        bzrlib.trace.disable_test_log(self._log_nonce)
312
1064
        self._log_file.close()
313
 
        os.remove(self._log_file_name)
314
 
        self._log_file = self._log_file_name = None
 
1065
        self._log_file = None
 
1066
        if not self._keep_log_file:
 
1067
            os.remove(self._log_file_name)
 
1068
            self._log_file_name = None
 
1069
 
 
1070
    def setKeepLogfile(self):
 
1071
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1072
        self._keep_log_file = True
315
1073
 
316
1074
    def addCleanup(self, callable):
317
1075
        """Arrange to run a callable when this case is torn down.
326
1084
 
327
1085
    def _cleanEnvironment(self):
328
1086
        new_env = {
 
1087
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
329
1088
            'HOME': os.getcwd(),
330
 
            'APPDATA': os.getcwd(),
331
 
            'BZREMAIL': None,
 
1089
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1090
            'BZR_EMAIL': None,
 
1091
            'BZREMAIL': None, # may still be present in the environment
332
1092
            'EMAIL': None,
 
1093
            'BZR_PROGRESS_BAR': None,
 
1094
            # SSH Agent
 
1095
            'SSH_AUTH_SOCK': None,
 
1096
            # Proxies
 
1097
            'http_proxy': None,
 
1098
            'HTTP_PROXY': None,
 
1099
            'https_proxy': None,
 
1100
            'HTTPS_PROXY': None,
 
1101
            'no_proxy': None,
 
1102
            'NO_PROXY': None,
 
1103
            'all_proxy': None,
 
1104
            'ALL_PROXY': None,
 
1105
            # Nobody cares about these ones AFAIK. So far at
 
1106
            # least. If you do (care), please update this comment
 
1107
            # -- vila 20061212
 
1108
            'ftp_proxy': None,
 
1109
            'FTP_PROXY': None,
 
1110
            'BZR_REMOTE_PATH': None,
333
1111
        }
334
1112
        self.__old_env = {}
335
1113
        self.addCleanup(self._restoreEnvironment)
336
1114
        for name, value in new_env.iteritems():
337
1115
            self._captureVar(name, value)
338
1116
 
339
 
 
340
1117
    def _captureVar(self, name, newvalue):
341
 
        """Set an environment variable, preparing it to be reset when finished."""
342
 
        self.__old_env[name] = os.environ.get(name, None)
343
 
        if newvalue is None:
344
 
            if name in os.environ:
345
 
                del os.environ[name]
346
 
        else:
347
 
            os.environ[name] = newvalue
 
1118
        """Set an environment variable, and reset it when finished."""
 
1119
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
348
1120
 
349
 
    @staticmethod
350
 
    def _restoreVar(name, value):
351
 
        if value is None:
352
 
            if name in os.environ:
353
 
                del os.environ[name]
354
 
        else:
355
 
            os.environ[name] = value
 
1121
    def _restore_debug_flags(self):
 
1122
        debug.debug_flags.clear()
 
1123
        debug.debug_flags.update(self._preserved_debug_flags)
356
1124
 
357
1125
    def _restoreEnvironment(self):
358
1126
        for name, value in self.__old_env.iteritems():
359
 
            self._restoreVar(name, value)
 
1127
            osutils.set_or_unset_env(name, value)
 
1128
 
 
1129
    def _restoreHooks(self):
 
1130
        for klass, hooks in self._preserved_hooks.items():
 
1131
            setattr(klass, 'hooks', hooks)
 
1132
 
 
1133
    def knownFailure(self, reason):
 
1134
        """This test has failed for some known reason."""
 
1135
        raise KnownFailure(reason)
 
1136
 
 
1137
    def run(self, result=None):
 
1138
        if result is None: result = self.defaultTestResult()
 
1139
        for feature in getattr(self, '_test_needs_features', []):
 
1140
            if not feature.available():
 
1141
                result.startTest(self)
 
1142
                if getattr(result, 'addNotSupported', None):
 
1143
                    result.addNotSupported(self, feature)
 
1144
                else:
 
1145
                    result.addSuccess(self)
 
1146
                result.stopTest(self)
 
1147
                return
 
1148
        return unittest.TestCase.run(self, result)
360
1149
 
361
1150
    def tearDown(self):
362
1151
        self._runCleanups()
363
1152
        unittest.TestCase.tearDown(self)
364
1153
 
 
1154
    def time(self, callable, *args, **kwargs):
 
1155
        """Run callable and accrue the time it takes to the benchmark time.
 
1156
        
 
1157
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1158
        this will cause lsprofile statistics to be gathered and stored in
 
1159
        self._benchcalls.
 
1160
        """
 
1161
        if self._benchtime is None:
 
1162
            self._benchtime = 0
 
1163
        start = time.time()
 
1164
        try:
 
1165
            if not self._gather_lsprof_in_benchmarks:
 
1166
                return callable(*args, **kwargs)
 
1167
            else:
 
1168
                # record this benchmark
 
1169
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1170
                stats.sort()
 
1171
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1172
                return ret
 
1173
        finally:
 
1174
            self._benchtime += time.time() - start
 
1175
 
365
1176
    def _runCleanups(self):
366
1177
        """Run registered cleanup functions. 
367
1178
 
368
1179
        This should only be called from TestCase.tearDown.
369
1180
        """
370
 
        for cleanup_fn in reversed(self._cleanups):
371
 
            cleanup_fn()
 
1181
        # TODO: Perhaps this should keep running cleanups even if 
 
1182
        # one of them fails?
 
1183
 
 
1184
        # Actually pop the cleanups from the list so tearDown running
 
1185
        # twice is safe (this happens for skipped tests).
 
1186
        while self._cleanups:
 
1187
            self._cleanups.pop()()
372
1188
 
373
1189
    def log(self, *args):
374
1190
        mutter(*args)
375
1191
 
376
 
    def _get_log(self):
377
 
        """Return as a string the log for this test"""
378
 
        if self._log_file_name:
379
 
            return open(self._log_file_name).read()
380
 
        else:
 
1192
    def _get_log(self, keep_log_file=False):
 
1193
        """Return as a string the log for this test. If the file is still
 
1194
        on disk and keep_log_file=False, delete the log file and store the
 
1195
        content in self._log_contents."""
 
1196
        # flush the log file, to get all content
 
1197
        import bzrlib.trace
 
1198
        bzrlib.trace._trace_file.flush()
 
1199
        if self._log_contents:
381
1200
            return self._log_contents
382
 
        # TODO: Delete the log after it's been read in
 
1201
        if self._log_file_name is not None:
 
1202
            logfile = open(self._log_file_name)
 
1203
            try:
 
1204
                log_contents = logfile.read()
 
1205
            finally:
 
1206
                logfile.close()
 
1207
            if not keep_log_file:
 
1208
                self._log_contents = log_contents
 
1209
                try:
 
1210
                    os.remove(self._log_file_name)
 
1211
                except OSError, e:
 
1212
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1213
                        print >>sys.stderr, ('Unable to delete log file '
 
1214
                                             ' %r' % self._log_file_name)
 
1215
                    else:
 
1216
                        raise
 
1217
            return log_contents
 
1218
        else:
 
1219
            return "DELETED log file to reduce memory footprint"
383
1220
 
 
1221
    @deprecated_method(zero_eighteen)
384
1222
    def capture(self, cmd, retcode=0):
385
1223
        """Shortcut that splits cmd into words, runs, and returns stdout"""
386
1224
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
387
1225
 
388
 
    def run_bzr_captured(self, argv, retcode=0):
 
1226
    def requireFeature(self, feature):
 
1227
        """This test requires a specific feature is available.
 
1228
 
 
1229
        :raises UnavailableFeature: When feature is not available.
 
1230
        """
 
1231
        if not feature.available():
 
1232
            raise UnavailableFeature(feature)
 
1233
 
 
1234
    @deprecated_method(zero_eighteen)
 
1235
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
1236
                         working_dir=None):
389
1237
        """Invoke bzr and return (stdout, stderr).
390
1238
 
391
 
        Useful for code that wants to check the contents of the
392
 
        output, the way error messages are presented, etc.
393
 
 
394
 
        This should be the main method for tests that want to exercise the
395
 
        overall behavior of the bzr application (rather than a unit test
396
 
        or a functional test of the library.)
397
 
 
398
 
        Much of the old code runs bzr by forking a new copy of Python, but
399
 
        that is slower, harder to debug, and generally not necessary.
400
 
 
401
 
        This runs bzr through the interface that catches and reports
402
 
        errors, and with logging set to something approximating the
403
 
        default, so that error reporting can be checked.
404
 
 
405
 
        argv -- arguments to invoke bzr
406
 
        retcode -- expected return code, or None for don't-care.
 
1239
        Don't call this method, just use run_bzr() which is equivalent.
 
1240
 
 
1241
        :param argv: Arguments to invoke bzr.  This may be either a 
 
1242
            single string, in which case it is split by shlex into words, 
 
1243
            or a list of arguments.
 
1244
        :param retcode: Expected return code, or None for don't-care.
 
1245
        :param encoding: Encoding for sys.stdout and sys.stderr
 
1246
        :param stdin: A string to be used as stdin for the command.
 
1247
        :param working_dir: Change to this directory before running
407
1248
        """
408
 
        stdout = StringIO()
409
 
        stderr = StringIO()
410
 
        self.log('run bzr: %s', ' '.join(argv))
 
1249
        return self._run_bzr_autosplit(argv, retcode=retcode,
 
1250
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1251
                )
 
1252
 
 
1253
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1254
            working_dir):
 
1255
        """Run bazaar command line, splitting up a string command line."""
 
1256
        if isinstance(args, basestring):
 
1257
            args = list(shlex.split(args))
 
1258
        return self._run_bzr_core(args, retcode=retcode,
 
1259
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1260
                )
 
1261
 
 
1262
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1263
            working_dir):
 
1264
        if encoding is None:
 
1265
            encoding = bzrlib.user_encoding
 
1266
        stdout = StringIOWrapper()
 
1267
        stderr = StringIOWrapper()
 
1268
        stdout.encoding = encoding
 
1269
        stderr.encoding = encoding
 
1270
 
 
1271
        self.log('run bzr: %r', args)
411
1272
        # FIXME: don't call into logging here
412
1273
        handler = logging.StreamHandler(stderr)
413
 
        handler.setFormatter(bzrlib.trace.QuietFormatter())
414
1274
        handler.setLevel(logging.INFO)
415
1275
        logger = logging.getLogger('')
416
1276
        logger.addHandler(handler)
 
1277
        old_ui_factory = ui.ui_factory
 
1278
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1279
 
 
1280
        cwd = None
 
1281
        if working_dir is not None:
 
1282
            cwd = osutils.getcwd()
 
1283
            os.chdir(working_dir)
 
1284
 
417
1285
        try:
418
 
            result = self.apply_redirected(None, stdout, stderr,
419
 
                                           bzrlib.commands.run_bzr_catch_errors,
420
 
                                           argv)
 
1286
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1287
                stdout, stderr,
 
1288
                bzrlib.commands.run_bzr_catch_errors,
 
1289
                args)
421
1290
        finally:
422
1291
            logger.removeHandler(handler)
 
1292
            ui.ui_factory = old_ui_factory
 
1293
            if cwd is not None:
 
1294
                os.chdir(cwd)
 
1295
 
423
1296
        out = stdout.getvalue()
424
1297
        err = stderr.getvalue()
425
1298
        if out:
426
 
            self.log('output:\n%s', out)
 
1299
            self.log('output:\n%r', out)
427
1300
        if err:
428
 
            self.log('errors:\n%s', err)
 
1301
            self.log('errors:\n%r', err)
429
1302
        if retcode is not None:
430
 
            self.assertEquals(result, retcode)
 
1303
            self.assertEquals(retcode, result,
 
1304
                              message='Unexpected return code')
431
1305
        return out, err
432
1306
 
433
1307
    def run_bzr(self, *args, **kwargs):
434
1308
        """Invoke bzr, as if it were run from the command line.
435
1309
 
 
1310
        The argument list should not include the bzr program name - the
 
1311
        first argument is normally the bzr command.  Arguments may be
 
1312
        passed in three ways:
 
1313
 
 
1314
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1315
        when the command contains whitespace or metacharacters, or 
 
1316
        is built up at run time.
 
1317
 
 
1318
        2- A single string, eg "add a".  This is the most convenient 
 
1319
        for hardcoded commands.
 
1320
 
 
1321
        3- Several varargs parameters, eg run_bzr("add", "a").  
 
1322
        This is not recommended for new code.
 
1323
 
 
1324
        This runs bzr through the interface that catches and reports
 
1325
        errors, and with logging set to something approximating the
 
1326
        default, so that error reporting can be checked.
 
1327
 
436
1328
        This should be the main method for tests that want to exercise the
437
1329
        overall behavior of the bzr application (rather than a unit test
438
1330
        or a functional test of the library.)
439
1331
 
440
1332
        This sends the stdout/stderr results into the test's log,
441
1333
        where it may be useful for debugging.  See also run_captured.
 
1334
 
 
1335
        :keyword stdin: A string to be used as stdin for the command.
 
1336
        :keyword retcode: The status code the command should return;
 
1337
            default 0.
 
1338
        :keyword working_dir: The directory to run the command in
 
1339
        :keyword error_regexes: A list of expected error messages.  If
 
1340
            specified they must be seen in the error output of the command.
442
1341
        """
443
1342
        retcode = kwargs.pop('retcode', 0)
444
 
        return self.run_bzr_captured(args, retcode)
 
1343
        encoding = kwargs.pop('encoding', None)
 
1344
        stdin = kwargs.pop('stdin', None)
 
1345
        working_dir = kwargs.pop('working_dir', None)
 
1346
        error_regexes = kwargs.pop('error_regexes', [])
 
1347
 
 
1348
        if len(args) == 1:
 
1349
            if isinstance(args[0], (list, basestring)):
 
1350
                args = args[0]
 
1351
        else:
 
1352
            symbol_versioning.warn(zero_eighteen % "passing varargs to run_bzr",
 
1353
                                   DeprecationWarning, stacklevel=3)
 
1354
 
 
1355
        out, err = self._run_bzr_autosplit(args=args,
 
1356
            retcode=retcode,
 
1357
            encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1358
            )
 
1359
 
 
1360
        for regex in error_regexes:
 
1361
            self.assertContainsRe(err, regex)
 
1362
        return out, err
 
1363
 
 
1364
    def run_bzr_decode(self, *args, **kwargs):
 
1365
        if 'encoding' in kwargs:
 
1366
            encoding = kwargs['encoding']
 
1367
        else:
 
1368
            encoding = bzrlib.user_encoding
 
1369
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
1370
 
 
1371
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1372
        """Run bzr, and check that stderr contains the supplied regexes
 
1373
 
 
1374
        :param error_regexes: Sequence of regular expressions which
 
1375
            must each be found in the error output. The relative ordering
 
1376
            is not enforced.
 
1377
        :param args: command-line arguments for bzr
 
1378
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1379
            This function changes the default value of retcode to be 3,
 
1380
            since in most cases this is run when you expect bzr to fail.
 
1381
 
 
1382
        :return: (out, err) The actual output of running the command (in case
 
1383
            you want to do more inspection)
 
1384
 
 
1385
        Examples of use::
 
1386
 
 
1387
            # Make sure that commit is failing because there is nothing to do
 
1388
            self.run_bzr_error(['no changes to commit'],
 
1389
                               'commit', '-m', 'my commit comment')
 
1390
            # Make sure --strict is handling an unknown file, rather than
 
1391
            # giving us the 'nothing to do' error
 
1392
            self.build_tree(['unknown'])
 
1393
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1394
                               'commit', '--strict', '-m', 'my commit comment')
 
1395
        """
 
1396
        kwargs.setdefault('retcode', 3)
 
1397
        kwargs['error_regexes'] = error_regexes
 
1398
        out, err = self.run_bzr(*args, **kwargs)
 
1399
        return out, err
 
1400
 
 
1401
    def run_bzr_subprocess(self, *args, **kwargs):
 
1402
        """Run bzr in a subprocess for testing.
 
1403
 
 
1404
        This starts a new Python interpreter and runs bzr in there. 
 
1405
        This should only be used for tests that have a justifiable need for
 
1406
        this isolation: e.g. they are testing startup time, or signal
 
1407
        handling, or early startup code, etc.  Subprocess code can't be 
 
1408
        profiled or debugged so easily.
 
1409
 
 
1410
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1411
            None is supplied, the status code is not checked.
 
1412
        :keyword env_changes: A dictionary which lists changes to environment
 
1413
            variables. A value of None will unset the env variable.
 
1414
            The values must be strings. The change will only occur in the
 
1415
            child, so you don't need to fix the environment after running.
 
1416
        :keyword universal_newlines: Convert CRLF => LF
 
1417
        :keyword allow_plugins: By default the subprocess is run with
 
1418
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1419
            for system-wide plugins to create unexpected output on stderr,
 
1420
            which can cause unnecessary test failures.
 
1421
        """
 
1422
        env_changes = kwargs.get('env_changes', {})
 
1423
        working_dir = kwargs.get('working_dir', None)
 
1424
        allow_plugins = kwargs.get('allow_plugins', False)
 
1425
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1426
                                            working_dir=working_dir,
 
1427
                                            allow_plugins=allow_plugins)
 
1428
        # We distinguish between retcode=None and retcode not passed.
 
1429
        supplied_retcode = kwargs.get('retcode', 0)
 
1430
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1431
            universal_newlines=kwargs.get('universal_newlines', False),
 
1432
            process_args=args)
 
1433
 
 
1434
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1435
                             skip_if_plan_to_signal=False,
 
1436
                             working_dir=None,
 
1437
                             allow_plugins=False):
 
1438
        """Start bzr in a subprocess for testing.
 
1439
 
 
1440
        This starts a new Python interpreter and runs bzr in there.
 
1441
        This should only be used for tests that have a justifiable need for
 
1442
        this isolation: e.g. they are testing startup time, or signal
 
1443
        handling, or early startup code, etc.  Subprocess code can't be
 
1444
        profiled or debugged so easily.
 
1445
 
 
1446
        :param process_args: a list of arguments to pass to the bzr executable,
 
1447
            for example ``['--version']``.
 
1448
        :param env_changes: A dictionary which lists changes to environment
 
1449
            variables. A value of None will unset the env variable.
 
1450
            The values must be strings. The change will only occur in the
 
1451
            child, so you don't need to fix the environment after running.
 
1452
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1453
            is not available.
 
1454
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1455
 
 
1456
        :returns: Popen object for the started process.
 
1457
        """
 
1458
        if skip_if_plan_to_signal:
 
1459
            if not getattr(os, 'kill', None):
 
1460
                raise TestSkipped("os.kill not available.")
 
1461
 
 
1462
        if env_changes is None:
 
1463
            env_changes = {}
 
1464
        old_env = {}
 
1465
 
 
1466
        def cleanup_environment():
 
1467
            for env_var, value in env_changes.iteritems():
 
1468
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1469
 
 
1470
        def restore_environment():
 
1471
            for env_var, value in old_env.iteritems():
 
1472
                osutils.set_or_unset_env(env_var, value)
 
1473
 
 
1474
        bzr_path = self.get_bzr_path()
 
1475
 
 
1476
        cwd = None
 
1477
        if working_dir is not None:
 
1478
            cwd = osutils.getcwd()
 
1479
            os.chdir(working_dir)
 
1480
 
 
1481
        try:
 
1482
            # win32 subprocess doesn't support preexec_fn
 
1483
            # so we will avoid using it on all platforms, just to
 
1484
            # make sure the code path is used, and we don't break on win32
 
1485
            cleanup_environment()
 
1486
            command = [sys.executable, bzr_path]
 
1487
            if not allow_plugins:
 
1488
                command.append('--no-plugins')
 
1489
            command.extend(process_args)
 
1490
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1491
        finally:
 
1492
            restore_environment()
 
1493
            if cwd is not None:
 
1494
                os.chdir(cwd)
 
1495
 
 
1496
        return process
 
1497
 
 
1498
    def _popen(self, *args, **kwargs):
 
1499
        """Place a call to Popen.
 
1500
 
 
1501
        Allows tests to override this method to intercept the calls made to
 
1502
        Popen for introspection.
 
1503
        """
 
1504
        return Popen(*args, **kwargs)
 
1505
 
 
1506
    def get_bzr_path(self):
 
1507
        """Return the path of the 'bzr' executable for this test suite."""
 
1508
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1509
        if not os.path.isfile(bzr_path):
 
1510
            # We are probably installed. Assume sys.argv is the right file
 
1511
            bzr_path = sys.argv[0]
 
1512
        return bzr_path
 
1513
 
 
1514
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1515
                              universal_newlines=False, process_args=None):
 
1516
        """Finish the execution of process.
 
1517
 
 
1518
        :param process: the Popen object returned from start_bzr_subprocess.
 
1519
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1520
            None is supplied, the status code is not checked.
 
1521
        :param send_signal: an optional signal to send to the process.
 
1522
        :param universal_newlines: Convert CRLF => LF
 
1523
        :returns: (stdout, stderr)
 
1524
        """
 
1525
        if send_signal is not None:
 
1526
            os.kill(process.pid, send_signal)
 
1527
        out, err = process.communicate()
 
1528
 
 
1529
        if universal_newlines:
 
1530
            out = out.replace('\r\n', '\n')
 
1531
            err = err.replace('\r\n', '\n')
 
1532
 
 
1533
        if retcode is not None and retcode != process.returncode:
 
1534
            if process_args is None:
 
1535
                process_args = "(unknown args)"
 
1536
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1537
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1538
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1539
                      % (process_args, retcode, process.returncode))
 
1540
        return [out, err]
445
1541
 
446
1542
    def check_inventory_shape(self, inv, shape):
447
1543
        """Compare an inventory to a list of expected names.
452
1548
        shape = list(shape)             # copy
453
1549
        for path, ie in inv.entries():
454
1550
            name = path.replace('\\', '/')
455
 
            if ie.kind == 'dir':
 
1551
            if ie.kind == 'directory':
456
1552
                name = name + '/'
457
1553
            if name in shape:
458
1554
                shape.remove(name)
473
1569
        if stdin is None:
474
1570
            stdin = StringIO("")
475
1571
        if stdout is None:
476
 
            if hasattr(self, "_log_file"):
 
1572
            if getattr(self, "_log_file", None) is not None:
477
1573
                stdout = self._log_file
478
1574
            else:
479
1575
                stdout = StringIO()
480
1576
        if stderr is None:
481
 
            if hasattr(self, "_log_file"):
 
1577
            if getattr(self, "_log_file", None is not None):
482
1578
                stderr = self._log_file
483
1579
            else:
484
1580
                stderr = StringIO()
495
1591
            sys.stderr = real_stderr
496
1592
            sys.stdin = real_stdin
497
1593
 
498
 
 
499
 
BzrTestBase = TestCase
 
1594
    def reduceLockdirTimeout(self):
 
1595
        """Reduce the default lock timeout for the duration of the test, so that
 
1596
        if LockContention occurs during a test, it does so quickly.
 
1597
 
 
1598
        Tests that expect to provoke LockContention errors should call this.
 
1599
        """
 
1600
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1601
        def resetTimeout():
 
1602
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1603
        self.addCleanup(resetTimeout)
 
1604
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1605
 
 
1606
 
 
1607
class TestCaseWithMemoryTransport(TestCase):
 
1608
    """Common test class for tests that do not need disk resources.
 
1609
 
 
1610
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1611
 
 
1612
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1613
 
 
1614
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1615
    a directory which does not exist. This serves to help ensure test isolation
 
1616
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1617
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1618
    file defaults for the transport in tests, nor does it obey the command line
 
1619
    override, so tests that accidentally write to the common directory should
 
1620
    be rare.
 
1621
 
 
1622
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1623
    a .bzr directory that stops us ascending higher into the filesystem.
 
1624
    """
 
1625
 
 
1626
    TEST_ROOT = None
 
1627
    _TEST_NAME = 'test'
 
1628
 
 
1629
    def __init__(self, methodName='runTest'):
 
1630
        # allow test parameterisation after test construction and before test
 
1631
        # execution. Variables that the parameteriser sets need to be 
 
1632
        # ones that are not set by setUp, or setUp will trash them.
 
1633
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1634
        self.vfs_transport_factory = default_transport
 
1635
        self.transport_server = None
 
1636
        self.transport_readonly_server = None
 
1637
        self.__vfs_server = None
 
1638
 
 
1639
    def get_transport(self, relpath=None):
 
1640
        """Return a writeable transport.
 
1641
 
 
1642
        This transport is for the test scratch space relative to
 
1643
        "self._test_root"
 
1644
        
 
1645
        :param relpath: a path relative to the base url.
 
1646
        """
 
1647
        t = get_transport(self.get_url(relpath))
 
1648
        self.assertFalse(t.is_readonly())
 
1649
        return t
 
1650
 
 
1651
    def get_readonly_transport(self, relpath=None):
 
1652
        """Return a readonly transport for the test scratch space
 
1653
        
 
1654
        This can be used to test that operations which should only need
 
1655
        readonly access in fact do not try to write.
 
1656
 
 
1657
        :param relpath: a path relative to the base url.
 
1658
        """
 
1659
        t = get_transport(self.get_readonly_url(relpath))
 
1660
        self.assertTrue(t.is_readonly())
 
1661
        return t
 
1662
 
 
1663
    def create_transport_readonly_server(self):
 
1664
        """Create a transport server from class defined at init.
 
1665
 
 
1666
        This is mostly a hook for daughter classes.
 
1667
        """
 
1668
        return self.transport_readonly_server()
 
1669
 
 
1670
    def get_readonly_server(self):
 
1671
        """Get the server instance for the readonly transport
 
1672
 
 
1673
        This is useful for some tests with specific servers to do diagnostics.
 
1674
        """
 
1675
        if self.__readonly_server is None:
 
1676
            if self.transport_readonly_server is None:
 
1677
                # readonly decorator requested
 
1678
                # bring up the server
 
1679
                self.__readonly_server = ReadonlyServer()
 
1680
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1681
            else:
 
1682
                self.__readonly_server = self.create_transport_readonly_server()
 
1683
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1684
            self.addCleanup(self.__readonly_server.tearDown)
 
1685
        return self.__readonly_server
 
1686
 
 
1687
    def get_readonly_url(self, relpath=None):
 
1688
        """Get a URL for the readonly transport.
 
1689
 
 
1690
        This will either be backed by '.' or a decorator to the transport 
 
1691
        used by self.get_url()
 
1692
        relpath provides for clients to get a path relative to the base url.
 
1693
        These should only be downwards relative, not upwards.
 
1694
        """
 
1695
        base = self.get_readonly_server().get_url()
 
1696
        return self._adjust_url(base, relpath)
 
1697
 
 
1698
    def get_vfs_only_server(self):
 
1699
        """Get the vfs only read/write server instance.
 
1700
 
 
1701
        This is useful for some tests with specific servers that need
 
1702
        diagnostics.
 
1703
 
 
1704
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1705
        is no means to override it.
 
1706
        """
 
1707
        if self.__vfs_server is None:
 
1708
            self.__vfs_server = MemoryServer()
 
1709
            self.__vfs_server.setUp()
 
1710
            self.addCleanup(self.__vfs_server.tearDown)
 
1711
        return self.__vfs_server
 
1712
 
 
1713
    def get_server(self):
 
1714
        """Get the read/write server instance.
 
1715
 
 
1716
        This is useful for some tests with specific servers that need
 
1717
        diagnostics.
 
1718
 
 
1719
        This is built from the self.transport_server factory. If that is None,
 
1720
        then the self.get_vfs_server is returned.
 
1721
        """
 
1722
        if self.__server is None:
 
1723
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1724
                return self.get_vfs_only_server()
 
1725
            else:
 
1726
                # bring up a decorated means of access to the vfs only server.
 
1727
                self.__server = self.transport_server()
 
1728
                try:
 
1729
                    self.__server.setUp(self.get_vfs_only_server())
 
1730
                except TypeError, e:
 
1731
                    # This should never happen; the try:Except here is to assist
 
1732
                    # developers having to update code rather than seeing an
 
1733
                    # uninformative TypeError.
 
1734
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1735
            self.addCleanup(self.__server.tearDown)
 
1736
        return self.__server
 
1737
 
 
1738
    def _adjust_url(self, base, relpath):
 
1739
        """Get a URL (or maybe a path) for the readwrite transport.
 
1740
 
 
1741
        This will either be backed by '.' or to an equivalent non-file based
 
1742
        facility.
 
1743
        relpath provides for clients to get a path relative to the base url.
 
1744
        These should only be downwards relative, not upwards.
 
1745
        """
 
1746
        if relpath is not None and relpath != '.':
 
1747
            if not base.endswith('/'):
 
1748
                base = base + '/'
 
1749
            # XXX: Really base should be a url; we did after all call
 
1750
            # get_url()!  But sometimes it's just a path (from
 
1751
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1752
            # to a non-escaped local path.
 
1753
            if base.startswith('./') or base.startswith('/'):
 
1754
                base += relpath
 
1755
            else:
 
1756
                base += urlutils.escape(relpath)
 
1757
        return base
 
1758
 
 
1759
    def get_url(self, relpath=None):
 
1760
        """Get a URL (or maybe a path) for the readwrite transport.
 
1761
 
 
1762
        This will either be backed by '.' or to an equivalent non-file based
 
1763
        facility.
 
1764
        relpath provides for clients to get a path relative to the base url.
 
1765
        These should only be downwards relative, not upwards.
 
1766
        """
 
1767
        base = self.get_server().get_url()
 
1768
        return self._adjust_url(base, relpath)
 
1769
 
 
1770
    def get_vfs_only_url(self, relpath=None):
 
1771
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1772
 
 
1773
        This will never be a smart protocol.  It always has all the
 
1774
        capabilities of the local filesystem, but it might actually be a
 
1775
        MemoryTransport or some other similar virtual filesystem.
 
1776
 
 
1777
        This is the backing transport (if any) of the server returned by
 
1778
        get_url and get_readonly_url.
 
1779
 
 
1780
        :param relpath: provides for clients to get a path relative to the base
 
1781
            url.  These should only be downwards relative, not upwards.
 
1782
        :return: A URL
 
1783
        """
 
1784
        base = self.get_vfs_only_server().get_url()
 
1785
        return self._adjust_url(base, relpath)
 
1786
 
 
1787
    def _make_test_root(self):
 
1788
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1789
            return
 
1790
        root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
1791
        TestCaseWithMemoryTransport.TEST_ROOT = root
 
1792
        
 
1793
        # make a fake bzr directory there to prevent any tests propagating
 
1794
        # up onto the source directory's real branch
 
1795
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
1796
 
 
1797
        # The same directory is used by all tests, and we're not specifically
 
1798
        # told when all tests are finished.  This will do.
 
1799
        atexit.register(_rmtree_temp_dir, root)
 
1800
 
 
1801
    def makeAndChdirToTestDir(self):
 
1802
        """Create a temporary directories for this one test.
 
1803
        
 
1804
        This must set self.test_home_dir and self.test_dir and chdir to
 
1805
        self.test_dir.
 
1806
        
 
1807
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1808
        """
 
1809
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1810
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1811
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1812
        
 
1813
    def make_branch(self, relpath, format=None):
 
1814
        """Create a branch on the transport at relpath."""
 
1815
        repo = self.make_repository(relpath, format=format)
 
1816
        return repo.bzrdir.create_branch()
 
1817
 
 
1818
    def make_bzrdir(self, relpath, format=None):
 
1819
        try:
 
1820
            # might be a relative or absolute path
 
1821
            maybe_a_url = self.get_url(relpath)
 
1822
            segments = maybe_a_url.rsplit('/', 1)
 
1823
            t = get_transport(maybe_a_url)
 
1824
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1825
                t.ensure_base()
 
1826
            if format is None:
 
1827
                format = 'default'
 
1828
            if isinstance(format, basestring):
 
1829
                format = bzrdir.format_registry.make_bzrdir(format)
 
1830
            return format.initialize_on_transport(t)
 
1831
        except errors.UninitializableFormat:
 
1832
            raise TestSkipped("Format %s is not initializable." % format)
 
1833
 
 
1834
    def make_repository(self, relpath, shared=False, format=None):
 
1835
        """Create a repository on our default transport at relpath.
 
1836
        
 
1837
        Note that relpath must be a relative path, not a full url.
 
1838
        """
 
1839
        # FIXME: If you create a remoterepository this returns the underlying
 
1840
        # real format, which is incorrect.  Actually we should make sure that 
 
1841
        # RemoteBzrDir returns a RemoteRepository.
 
1842
        # maybe  mbp 20070410
 
1843
        made_control = self.make_bzrdir(relpath, format=format)
 
1844
        return made_control.create_repository(shared=shared)
 
1845
 
 
1846
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1847
        """Create a branch on the default transport and a MemoryTree for it."""
 
1848
        b = self.make_branch(relpath, format=format)
 
1849
        return memorytree.MemoryTree.create_on_branch(b)
 
1850
 
 
1851
    def overrideEnvironmentForTesting(self):
 
1852
        os.environ['HOME'] = self.test_home_dir
 
1853
        os.environ['BZR_HOME'] = self.test_home_dir
 
1854
        
 
1855
    def setUp(self):
 
1856
        super(TestCaseWithMemoryTransport, self).setUp()
 
1857
        self._make_test_root()
 
1858
        _currentdir = os.getcwdu()
 
1859
        def _leaveDirectory():
 
1860
            os.chdir(_currentdir)
 
1861
        self.addCleanup(_leaveDirectory)
 
1862
        self.makeAndChdirToTestDir()
 
1863
        self.overrideEnvironmentForTesting()
 
1864
        self.__readonly_server = None
 
1865
        self.__server = None
 
1866
        self.reduceLockdirTimeout()
500
1867
 
501
1868
     
502
 
class TestCaseInTempDir(TestCase):
 
1869
class TestCaseInTempDir(TestCaseWithMemoryTransport):
503
1870
    """Derived class that runs a test within a temporary directory.
504
1871
 
505
1872
    This is useful for tests that need to create a branch, etc.
509
1876
    All test cases create their own directory within that.  If the
510
1877
    tests complete successfully, the directory is removed.
511
1878
 
512
 
    InTempDir is an old alias for FunctionalTestCase.
 
1879
    :ivar test_base_dir: The path of the top-level directory for this 
 
1880
    test, which contains a home directory and a work directory.
 
1881
 
 
1882
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
1883
    which is used as $HOME for this test.
 
1884
 
 
1885
    :ivar test_dir: A directory under test_base_dir used as the current
 
1886
    directory when the test proper is run.
513
1887
    """
514
1888
 
515
 
    TEST_ROOT = None
516
 
    _TEST_NAME = 'test'
517
1889
    OVERRIDE_PYTHON = 'python'
518
1890
 
519
1891
    def check_file_contents(self, filename, expect):
524
1896
            self.log("actually: %r" % contents)
525
1897
            self.fail("contents of %s not as expected" % filename)
526
1898
 
527
 
    def _make_test_root(self):
528
 
        if TestCaseInTempDir.TEST_ROOT is not None:
529
 
            return
530
 
        i = 0
531
 
        while True:
532
 
            root = u'test%04d.tmp' % i
533
 
            try:
534
 
                os.mkdir(root)
535
 
            except OSError, e:
536
 
                if e.errno == errno.EEXIST:
537
 
                    i += 1
538
 
                    continue
539
 
                else:
540
 
                    raise
541
 
            # successfully created
542
 
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
543
 
            break
544
 
        # make a fake bzr directory there to prevent any tests propagating
545
 
        # up onto the source directory's real branch
546
 
        os.mkdir(osutils.pathjoin(TestCaseInTempDir.TEST_ROOT, '.bzr'))
547
 
 
548
 
    def setUp(self):
549
 
        super(TestCaseInTempDir, self).setUp()
550
 
        self._make_test_root()
551
 
        _currentdir = os.getcwdu()
552
 
        short_id = self.id().replace('bzrlib.tests.', '') \
553
 
                   .replace('__main__.', '')
554
 
        self.test_dir = osutils.pathjoin(self.TEST_ROOT, short_id)
 
1899
    def makeAndChdirToTestDir(self):
 
1900
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1901
        
 
1902
        For TestCaseInTempDir we create a temporary directory based on the test
 
1903
        name and then create two subdirs - test and home under it.
 
1904
        """
 
1905
        # create a directory within the top level test directory
 
1906
        candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
 
1907
        # now create test and home directories within this dir
 
1908
        self.test_base_dir = candidate_dir
 
1909
        self.test_home_dir = self.test_base_dir + '/home'
 
1910
        os.mkdir(self.test_home_dir)
 
1911
        self.test_dir = self.test_base_dir + '/work'
555
1912
        os.mkdir(self.test_dir)
556
1913
        os.chdir(self.test_dir)
557
 
        os.environ['HOME'] = self.test_dir
558
 
        os.environ['APPDATA'] = self.test_dir
559
 
        def _leaveDirectory():
560
 
            os.chdir(_currentdir)
561
 
        self.addCleanup(_leaveDirectory)
562
 
        
563
 
    def build_tree(self, shape, line_endings='native'):
 
1914
        # put name of test inside
 
1915
        f = file(self.test_base_dir + '/name', 'w')
 
1916
        try:
 
1917
            f.write(self.id())
 
1918
        finally:
 
1919
            f.close()
 
1920
        self.addCleanup(self.deleteTestDir)
 
1921
 
 
1922
    def deleteTestDir(self):
 
1923
        os.chdir(self.TEST_ROOT)
 
1924
        _rmtree_temp_dir(self.test_base_dir)
 
1925
 
 
1926
    def build_tree(self, shape, line_endings='binary', transport=None):
564
1927
        """Build a test tree according to a pattern.
565
1928
 
566
1929
        shape is a sequence of file specifications.  If the final
567
1930
        character is '/', a directory is created.
568
1931
 
 
1932
        This assumes that all the elements in the tree being built are new.
 
1933
 
569
1934
        This doesn't add anything to a branch.
 
1935
 
570
1936
        :param line_endings: Either 'binary' or 'native'
571
 
                             in binary mode, exact contents are written
572
 
                             in native mode, the line endings match the
573
 
                             default platform endings.
 
1937
            in binary mode, exact contents are written in native mode, the
 
1938
            line endings match the default platform endings.
 
1939
        :param transport: A transport to write to, for building trees on VFS's.
 
1940
            If the transport is readonly or None, "." is opened automatically.
 
1941
        :return: None
574
1942
        """
575
 
        # XXX: It's OK to just create them using forward slashes on windows?
 
1943
        # It's OK to just create them using forward slashes on windows.
 
1944
        if transport is None or transport.is_readonly():
 
1945
            transport = get_transport(".")
576
1946
        for name in shape:
577
1947
            self.assert_(isinstance(name, basestring))
578
1948
            if name[-1] == '/':
579
 
                os.mkdir(name[:-1])
 
1949
                transport.mkdir(urlutils.escape(name[:-1]))
580
1950
            else:
581
1951
                if line_endings == 'binary':
582
 
                    f = file(name, 'wb')
 
1952
                    end = '\n'
583
1953
                elif line_endings == 'native':
584
 
                    f = file(name, 'wt')
 
1954
                    end = os.linesep
585
1955
                else:
586
 
                    raise BzrError('Invalid line ending request %r' % (line_endings,))
587
 
                print >>f, "contents of", name
588
 
                f.close()
 
1956
                    raise errors.BzrError(
 
1957
                        'Invalid line ending request %r' % line_endings)
 
1958
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1959
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
589
1960
 
590
1961
    def build_tree_contents(self, shape):
591
1962
        build_tree_contents(shape)
592
1963
 
 
1964
    def assertFileEqual(self, content, path):
 
1965
        """Fail if path does not contain 'content'."""
 
1966
        self.failUnlessExists(path)
 
1967
        f = file(path, 'rb')
 
1968
        try:
 
1969
            s = f.read()
 
1970
        finally:
 
1971
            f.close()
 
1972
        self.assertEqualDiff(content, s)
 
1973
 
593
1974
    def failUnlessExists(self, path):
594
 
        """Fail unless path, which may be abs or relative, exists."""
595
 
        self.failUnless(osutils.lexists(path))
 
1975
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1976
        if not isinstance(path, basestring):
 
1977
            for p in path:
 
1978
                self.failUnlessExists(p)
 
1979
        else:
 
1980
            self.failUnless(osutils.lexists(path),path+" does not exist")
596
1981
 
597
1982
    def failIfExists(self, path):
598
 
        """Fail if path, which may be abs or relative, exists."""
599
 
        self.failIf(osutils.lexists(path))
600
 
        
601
 
    def assertFileEqual(self, content, path):
602
 
        """Fail if path does not contain 'content'."""
603
 
        self.failUnless(osutils.lexists(path))
604
 
        self.assertEqualDiff(content, open(path, 'r').read())
605
 
 
606
 
 
607
 
def filter_suite_by_re(suite, pattern):
608
 
    result = TestSuite()
 
1983
        """Fail if path or paths, which may be abs or relative, exist."""
 
1984
        if not isinstance(path, basestring):
 
1985
            for p in path:
 
1986
                self.failIfExists(p)
 
1987
        else:
 
1988
            self.failIf(osutils.lexists(path),path+" exists")
 
1989
 
 
1990
    def assertInWorkingTree(self,path,root_path='.',tree=None):
 
1991
        """Assert whether path or paths are in the WorkingTree"""
 
1992
        if tree is None:
 
1993
            tree = workingtree.WorkingTree.open(root_path)
 
1994
        if not isinstance(path, basestring):
 
1995
            for p in path:
 
1996
                self.assertInWorkingTree(p,tree=tree)
 
1997
        else:
 
1998
            self.assertIsNot(tree.path2id(path), None,
 
1999
                path+' not in working tree.')
 
2000
 
 
2001
    def assertNotInWorkingTree(self,path,root_path='.',tree=None):
 
2002
        """Assert whether path or paths are not in the WorkingTree"""
 
2003
        if tree is None:
 
2004
            tree = workingtree.WorkingTree.open(root_path)
 
2005
        if not isinstance(path, basestring):
 
2006
            for p in path:
 
2007
                self.assertNotInWorkingTree(p,tree=tree)
 
2008
        else:
 
2009
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2010
 
 
2011
 
 
2012
class TestCaseWithTransport(TestCaseInTempDir):
 
2013
    """A test case that provides get_url and get_readonly_url facilities.
 
2014
 
 
2015
    These back onto two transport servers, one for readonly access and one for
 
2016
    read write access.
 
2017
 
 
2018
    If no explicit class is provided for readonly access, a
 
2019
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2020
    based read write transports.
 
2021
 
 
2022
    If an explicit class is provided for readonly access, that server and the 
 
2023
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2024
    """
 
2025
 
 
2026
    def get_vfs_only_server(self):
 
2027
        """See TestCaseWithMemoryTransport.
 
2028
 
 
2029
        This is useful for some tests with specific servers that need
 
2030
        diagnostics.
 
2031
        """
 
2032
        if self.__vfs_server is None:
 
2033
            self.__vfs_server = self.vfs_transport_factory()
 
2034
            self.__vfs_server.setUp()
 
2035
            self.addCleanup(self.__vfs_server.tearDown)
 
2036
        return self.__vfs_server
 
2037
 
 
2038
    def make_branch_and_tree(self, relpath, format=None):
 
2039
        """Create a branch on the transport and a tree locally.
 
2040
 
 
2041
        If the transport is not a LocalTransport, the Tree can't be created on
 
2042
        the transport.  In that case if the vfs_transport_factory is
 
2043
        LocalURLServer the working tree is created in the local
 
2044
        directory backing the transport, and the returned tree's branch and
 
2045
        repository will also be accessed locally. Otherwise a lightweight
 
2046
        checkout is created and returned.
 
2047
 
 
2048
        :param format: The BzrDirFormat.
 
2049
        :returns: the WorkingTree.
 
2050
        """
 
2051
        # TODO: always use the local disk path for the working tree,
 
2052
        # this obviously requires a format that supports branch references
 
2053
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2054
        # RBC 20060208
 
2055
        b = self.make_branch(relpath, format=format)
 
2056
        try:
 
2057
            return b.bzrdir.create_workingtree()
 
2058
        except errors.NotLocalUrl:
 
2059
            # We can only make working trees locally at the moment.  If the
 
2060
            # transport can't support them, then we keep the non-disk-backed
 
2061
            # branch and create a local checkout.
 
2062
            if self.vfs_transport_factory is LocalURLServer:
 
2063
                # the branch is colocated on disk, we cannot create a checkout.
 
2064
                # hopefully callers will expect this.
 
2065
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2066
                return local_controldir.create_workingtree()
 
2067
            else:
 
2068
                return b.create_checkout(relpath, lightweight=True)
 
2069
 
 
2070
    def assertIsDirectory(self, relpath, transport):
 
2071
        """Assert that relpath within transport is a directory.
 
2072
 
 
2073
        This may not be possible on all transports; in that case it propagates
 
2074
        a TransportNotPossible.
 
2075
        """
 
2076
        try:
 
2077
            mode = transport.stat(relpath).st_mode
 
2078
        except errors.NoSuchFile:
 
2079
            self.fail("path %s is not a directory; no such file"
 
2080
                      % (relpath))
 
2081
        if not stat.S_ISDIR(mode):
 
2082
            self.fail("path %s is not a directory; has mode %#o"
 
2083
                      % (relpath, mode))
 
2084
 
 
2085
    def assertTreesEqual(self, left, right):
 
2086
        """Check that left and right have the same content and properties."""
 
2087
        # we use a tree delta to check for equality of the content, and we
 
2088
        # manually check for equality of other things such as the parents list.
 
2089
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2090
        differences = left.changes_from(right)
 
2091
        self.assertFalse(differences.has_changed(),
 
2092
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2093
 
 
2094
    def setUp(self):
 
2095
        super(TestCaseWithTransport, self).setUp()
 
2096
        self.__vfs_server = None
 
2097
 
 
2098
 
 
2099
class ChrootedTestCase(TestCaseWithTransport):
 
2100
    """A support class that provides readonly urls outside the local namespace.
 
2101
 
 
2102
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2103
    is then we are chrooted already, if it is not then an HttpServer is used
 
2104
    for readonly urls.
 
2105
 
 
2106
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2107
                       be used without needed to redo it when a different 
 
2108
                       subclass is in use ?
 
2109
    """
 
2110
 
 
2111
    def setUp(self):
 
2112
        super(ChrootedTestCase, self).setUp()
 
2113
        if not self.vfs_transport_factory == MemoryServer:
 
2114
            self.transport_readonly_server = HttpServer
 
2115
 
 
2116
 
 
2117
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
 
2118
                       random_order=False):
 
2119
    """Create a test suite by filtering another one.
 
2120
    
 
2121
    :param suite:           the source suite
 
2122
    :param pattern:         pattern that names must match
 
2123
    :param exclude_pattern: pattern that names must not match, if any
 
2124
    :param random_order:    if True, tests in the new suite will be put in
 
2125
                            random order
 
2126
    :returns: the newly created suite
 
2127
    """ 
 
2128
    return sort_suite_by_re(suite, pattern, exclude_pattern,
 
2129
        random_order, False)
 
2130
 
 
2131
 
 
2132
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
 
2133
                     random_order=False, append_rest=True):
 
2134
    """Create a test suite by sorting another one.
 
2135
    
 
2136
    :param suite:           the source suite
 
2137
    :param pattern:         pattern that names must match in order to go
 
2138
                            first in the new suite
 
2139
    :param exclude_pattern: pattern that names must not match, if any
 
2140
    :param random_order:    if True, tests in the new suite will be put in
 
2141
                            random order
 
2142
    :param append_rest:     if False, pattern is a strict filter and not
 
2143
                            just an ordering directive
 
2144
    :returns: the newly created suite
 
2145
    """ 
 
2146
    first = []
 
2147
    second = []
609
2148
    filter_re = re.compile(pattern)
 
2149
    if exclude_pattern is not None:
 
2150
        exclude_re = re.compile(exclude_pattern)
610
2151
    for test in iter_suite_tests(suite):
611
 
        if filter_re.search(test.id()):
612
 
            result.addTest(test)
613
 
    return result
 
2152
        test_id = test.id()
 
2153
        if exclude_pattern is None or not exclude_re.search(test_id):
 
2154
            if filter_re.search(test_id):
 
2155
                first.append(test)
 
2156
            elif append_rest:
 
2157
                second.append(test)
 
2158
    if random_order:
 
2159
        random.shuffle(first)
 
2160
        random.shuffle(second)
 
2161
    return TestUtil.TestSuite(first + second)
614
2162
 
615
2163
 
616
2164
def run_suite(suite, name='test', verbose=False, pattern=".*",
617
 
              stop_on_failure=False, keep_output=False):
618
 
    TestCaseInTempDir._TEST_NAME = name
 
2165
              stop_on_failure=False,
 
2166
              transport=None, lsprof_timed=None, bench_history=None,
 
2167
              matching_tests_first=None,
 
2168
              list_only=False,
 
2169
              random_seed=None,
 
2170
              exclude_pattern=None,
 
2171
              ):
 
2172
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
619
2173
    if verbose:
620
2174
        verbosity = 2
621
2175
    else:
622
2176
        verbosity = 1
623
2177
    runner = TextTestRunner(stream=sys.stdout,
624
2178
                            descriptions=0,
625
 
                            verbosity=verbosity)
 
2179
                            verbosity=verbosity,
 
2180
                            bench_history=bench_history,
 
2181
                            list_only=list_only,
 
2182
                            )
626
2183
    runner.stop_on_failure=stop_on_failure
627
 
    if pattern != '.*':
628
 
        suite = filter_suite_by_re(suite, pattern)
 
2184
    # Initialise the random number generator and display the seed used.
 
2185
    # We convert the seed to a long to make it reuseable across invocations.
 
2186
    random_order = False
 
2187
    if random_seed is not None:
 
2188
        random_order = True
 
2189
        if random_seed == "now":
 
2190
            random_seed = long(time.time())
 
2191
        else:
 
2192
            # Convert the seed to a long if we can
 
2193
            try:
 
2194
                random_seed = long(random_seed)
 
2195
            except:
 
2196
                pass
 
2197
        runner.stream.writeln("Randomizing test order using seed %s\n" %
 
2198
            (random_seed))
 
2199
        random.seed(random_seed)
 
2200
    # Customise the list of tests if requested
 
2201
    if pattern != '.*' or exclude_pattern is not None or random_order:
 
2202
        if matching_tests_first:
 
2203
            suite = sort_suite_by_re(suite, pattern, exclude_pattern,
 
2204
                random_order)
 
2205
        else:
 
2206
            suite = filter_suite_by_re(suite, pattern, exclude_pattern,
 
2207
                random_order)
629
2208
    result = runner.run(suite)
630
 
    # This is still a little bogus, 
631
 
    # but only a little. Folk not using our testrunner will
632
 
    # have to delete their temp directories themselves.
633
 
    if result.wasSuccessful() or not keep_output:
634
 
        if TestCaseInTempDir.TEST_ROOT is not None:
635
 
            shutil.rmtree(TestCaseInTempDir.TEST_ROOT) 
636
 
    else:
637
 
        print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
638
2209
    return result.wasSuccessful()
639
2210
 
640
2211
 
641
2212
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
642
 
             keep_output=False):
 
2213
             transport=None,
 
2214
             test_suite_factory=None,
 
2215
             lsprof_timed=None,
 
2216
             bench_history=None,
 
2217
             matching_tests_first=None,
 
2218
             list_only=False,
 
2219
             random_seed=None,
 
2220
             exclude_pattern=None):
643
2221
    """Run the whole test suite under the enhanced runner"""
644
 
    return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
645
 
                     stop_on_failure=stop_on_failure, keep_output=keep_output)
 
2222
    # XXX: Very ugly way to do this...
 
2223
    # Disable warning about old formats because we don't want it to disturb
 
2224
    # any blackbox tests.
 
2225
    from bzrlib import repository
 
2226
    repository._deprecation_warning_done = True
 
2227
 
 
2228
    global default_transport
 
2229
    if transport is None:
 
2230
        transport = default_transport
 
2231
    old_transport = default_transport
 
2232
    default_transport = transport
 
2233
    try:
 
2234
        if test_suite_factory is None:
 
2235
            suite = test_suite()
 
2236
        else:
 
2237
            suite = test_suite_factory()
 
2238
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2239
                     stop_on_failure=stop_on_failure,
 
2240
                     transport=transport,
 
2241
                     lsprof_timed=lsprof_timed,
 
2242
                     bench_history=bench_history,
 
2243
                     matching_tests_first=matching_tests_first,
 
2244
                     list_only=list_only,
 
2245
                     random_seed=random_seed,
 
2246
                     exclude_pattern=exclude_pattern)
 
2247
    finally:
 
2248
        default_transport = old_transport
646
2249
 
647
2250
 
648
2251
def test_suite():
649
 
    """Build and return TestSuite for the whole program."""
650
 
    from doctest import DocTestSuite
651
 
 
652
 
    global MODULES_TO_DOCTEST
653
 
 
654
 
    testmod_names = [ \
 
2252
    """Build and return TestSuite for the whole of bzrlib.
 
2253
    
 
2254
    This function can be replaced if you need to change the default test
 
2255
    suite on a global basis, but it is not encouraged.
 
2256
    """
 
2257
    testmod_names = [
 
2258
                   'bzrlib.util.tests.test_bencode',
 
2259
                   'bzrlib.tests.test__dirstate_helpers',
655
2260
                   'bzrlib.tests.test_ancestry',
656
2261
                   'bzrlib.tests.test_annotate',
657
2262
                   'bzrlib.tests.test_api',
 
2263
                   'bzrlib.tests.test_atomicfile',
658
2264
                   'bzrlib.tests.test_bad_files',
659
 
                   'bzrlib.tests.test_basis_inventory',
660
2265
                   'bzrlib.tests.test_branch',
661
 
                   'bzrlib.tests.test_command',
 
2266
                   'bzrlib.tests.test_branchbuilder',
 
2267
                   'bzrlib.tests.test_bugtracker',
 
2268
                   'bzrlib.tests.test_bundle',
 
2269
                   'bzrlib.tests.test_bzrdir',
 
2270
                   'bzrlib.tests.test_cache_utf8',
 
2271
                   'bzrlib.tests.test_commands',
662
2272
                   'bzrlib.tests.test_commit',
663
2273
                   'bzrlib.tests.test_commit_merge',
664
2274
                   'bzrlib.tests.test_config',
665
2275
                   'bzrlib.tests.test_conflicts',
 
2276
                   'bzrlib.tests.test_counted_lock',
 
2277
                   'bzrlib.tests.test_decorators',
 
2278
                   'bzrlib.tests.test_delta',
 
2279
                   'bzrlib.tests.test_deprecated_graph',
666
2280
                   'bzrlib.tests.test_diff',
 
2281
                   'bzrlib.tests.test_dirstate',
 
2282
                   'bzrlib.tests.test_email_message',
 
2283
                   'bzrlib.tests.test_errors',
 
2284
                   'bzrlib.tests.test_escaped_store',
 
2285
                   'bzrlib.tests.test_extract',
667
2286
                   'bzrlib.tests.test_fetch',
 
2287
                   'bzrlib.tests.test_file_names',
 
2288
                   'bzrlib.tests.test_ftp_transport',
 
2289
                   'bzrlib.tests.test_generate_docs',
 
2290
                   'bzrlib.tests.test_generate_ids',
 
2291
                   'bzrlib.tests.test_globbing',
668
2292
                   'bzrlib.tests.test_gpg',
669
2293
                   'bzrlib.tests.test_graph',
670
2294
                   'bzrlib.tests.test_hashcache',
 
2295
                   'bzrlib.tests.test_help',
 
2296
                   'bzrlib.tests.test_hooks',
671
2297
                   'bzrlib.tests.test_http',
 
2298
                   'bzrlib.tests.test_http_response',
 
2299
                   'bzrlib.tests.test_https_ca_bundle',
672
2300
                   'bzrlib.tests.test_identitymap',
 
2301
                   'bzrlib.tests.test_ignores',
 
2302
                   'bzrlib.tests.test_index',
 
2303
                   'bzrlib.tests.test_info',
673
2304
                   'bzrlib.tests.test_inv',
 
2305
                   'bzrlib.tests.test_knit',
 
2306
                   'bzrlib.tests.test_lazy_import',
 
2307
                   'bzrlib.tests.test_lazy_regex',
 
2308
                   'bzrlib.tests.test_lockdir',
674
2309
                   'bzrlib.tests.test_lockable_files',
675
2310
                   'bzrlib.tests.test_log',
 
2311
                   'bzrlib.tests.test_lsprof',
 
2312
                   'bzrlib.tests.test_memorytree',
676
2313
                   'bzrlib.tests.test_merge',
677
2314
                   'bzrlib.tests.test_merge3',
678
2315
                   'bzrlib.tests.test_merge_core',
 
2316
                   'bzrlib.tests.test_merge_directive',
679
2317
                   'bzrlib.tests.test_missing',
680
2318
                   'bzrlib.tests.test_msgeditor',
 
2319
                   'bzrlib.tests.test_multiparent',
681
2320
                   'bzrlib.tests.test_nonascii',
682
2321
                   'bzrlib.tests.test_options',
683
2322
                   'bzrlib.tests.test_osutils',
684
 
                   'bzrlib.tests.test_parent',
 
2323
                   'bzrlib.tests.test_osutils_encodings',
 
2324
                   'bzrlib.tests.test_pack',
 
2325
                   'bzrlib.tests.test_patch',
 
2326
                   'bzrlib.tests.test_patches',
685
2327
                   'bzrlib.tests.test_permissions',
686
2328
                   'bzrlib.tests.test_plugins',
687
 
                   'bzrlib.tests.test_remove',
 
2329
                   'bzrlib.tests.test_progress',
 
2330
                   'bzrlib.tests.test_reconcile',
 
2331
                   'bzrlib.tests.test_registry',
 
2332
                   'bzrlib.tests.test_remote',
 
2333
                   'bzrlib.tests.test_repository',
 
2334
                   'bzrlib.tests.test_revert',
688
2335
                   'bzrlib.tests.test_revision',
689
2336
                   'bzrlib.tests.test_revisionnamespaces',
690
 
                   'bzrlib.tests.test_revprops',
691
 
                   'bzrlib.tests.test_reweave',
 
2337
                   'bzrlib.tests.test_revisiontree',
692
2338
                   'bzrlib.tests.test_rio',
693
2339
                   'bzrlib.tests.test_sampler',
694
2340
                   'bzrlib.tests.test_selftest',
695
2341
                   'bzrlib.tests.test_setup',
696
2342
                   'bzrlib.tests.test_sftp_transport',
 
2343
                   'bzrlib.tests.test_smart',
697
2344
                   'bzrlib.tests.test_smart_add',
 
2345
                   'bzrlib.tests.test_smart_transport',
 
2346
                   'bzrlib.tests.test_smtp_connection',
698
2347
                   'bzrlib.tests.test_source',
 
2348
                   'bzrlib.tests.test_ssh_transport',
699
2349
                   'bzrlib.tests.test_status',
700
2350
                   'bzrlib.tests.test_store',
 
2351
                   'bzrlib.tests.test_strace',
 
2352
                   'bzrlib.tests.test_subsume',
 
2353
                   'bzrlib.tests.test_symbol_versioning',
 
2354
                   'bzrlib.tests.test_tag',
701
2355
                   'bzrlib.tests.test_testament',
 
2356
                   'bzrlib.tests.test_textfile',
 
2357
                   'bzrlib.tests.test_textmerge',
 
2358
                   'bzrlib.tests.test_timestamp',
702
2359
                   'bzrlib.tests.test_trace',
703
2360
                   'bzrlib.tests.test_transactions',
 
2361
                   'bzrlib.tests.test_transform',
704
2362
                   'bzrlib.tests.test_transport',
 
2363
                   'bzrlib.tests.test_tree',
 
2364
                   'bzrlib.tests.test_treebuilder',
705
2365
                   'bzrlib.tests.test_tsort',
 
2366
                   'bzrlib.tests.test_tuned_gzip',
706
2367
                   'bzrlib.tests.test_ui',
707
 
                   'bzrlib.tests.test_uncommit',
708
2368
                   'bzrlib.tests.test_upgrade',
 
2369
                   'bzrlib.tests.test_urlutils',
 
2370
                   'bzrlib.tests.test_versionedfile',
 
2371
                   'bzrlib.tests.test_version',
 
2372
                   'bzrlib.tests.test_version_info',
709
2373
                   'bzrlib.tests.test_weave',
710
2374
                   'bzrlib.tests.test_whitebox',
 
2375
                   'bzrlib.tests.test_win32utils',
711
2376
                   'bzrlib.tests.test_workingtree',
 
2377
                   'bzrlib.tests.test_workingtree_4',
 
2378
                   'bzrlib.tests.test_wsgi',
712
2379
                   'bzrlib.tests.test_xml',
713
2380
                   ]
714
 
 
715
 
    TestCase.BZRPATH = osutils.pathjoin(
716
 
            osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
717
 
    print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
718
 
    print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
719
 
    print
720
 
    suite = TestSuite()
721
 
    # python2.4's TestLoader.loadTestsFromNames gives very poor 
722
 
    # errors if it fails to load a named module - no indication of what's
723
 
    # actually wrong, just "no such module".  We should probably override that
724
 
    # class, but for the moment just load them ourselves. (mbp 20051202)
725
 
    loader = TestLoader()
726
 
    for mod_name in testmod_names:
727
 
        mod = _load_module_by_name(mod_name)
728
 
        suite.addTest(loader.loadTestsFromModule(mod))
 
2381
    test_transport_implementations = [
 
2382
        'bzrlib.tests.test_transport_implementations',
 
2383
        'bzrlib.tests.test_read_bundle',
 
2384
        ]
 
2385
    suite = TestUtil.TestSuite()
 
2386
    loader = TestUtil.TestLoader()
 
2387
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2388
    from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
 
2389
    adapter = TransportTestProviderAdapter()
 
2390
    adapt_modules(test_transport_implementations, adapter, loader, suite)
729
2391
    for package in packages_to_test():
730
2392
        suite.addTest(package.test_suite())
731
2393
    for m in MODULES_TO_TEST:
732
2394
        suite.addTest(loader.loadTestsFromModule(m))
733
 
    for m in (MODULES_TO_DOCTEST):
734
 
        suite.addTest(DocTestSuite(m))
 
2395
    for m in MODULES_TO_DOCTEST:
 
2396
        try:
 
2397
            suite.addTest(doctest.DocTestSuite(m))
 
2398
        except ValueError, e:
 
2399
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
2400
            raise
735
2401
    for name, plugin in bzrlib.plugin.all_plugins().items():
736
 
        if hasattr(plugin, 'test_suite'):
737
 
            suite.addTest(plugin.test_suite())
 
2402
        if getattr(plugin, 'test_suite', None) is not None:
 
2403
            default_encoding = sys.getdefaultencoding()
 
2404
            try:
 
2405
                plugin_suite = plugin.test_suite()
 
2406
            except ImportError, e:
 
2407
                bzrlib.trace.warning(
 
2408
                    'Unable to test plugin "%s": %s', name, e)
 
2409
            else:
 
2410
                suite.addTest(plugin_suite)
 
2411
            if default_encoding != sys.getdefaultencoding():
 
2412
                bzrlib.trace.warning(
 
2413
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
2414
                    sys.getdefaultencoding())
 
2415
                reload(sys)
 
2416
                sys.setdefaultencoding(default_encoding)
738
2417
    return suite
739
2418
 
740
2419
 
741
 
def _load_module_by_name(mod_name):
742
 
    parts = mod_name.split('.')
743
 
    module = __import__(mod_name)
744
 
    del parts[0]
745
 
    # for historical reasons python returns the top-level module even though
746
 
    # it loads the submodule; we need to walk down to get the one we want.
747
 
    while parts:
748
 
        module = getattr(module, parts.pop(0))
749
 
    return module
 
2420
def adapt_modules(mods_list, adapter, loader, suite):
 
2421
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2422
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2423
        suite.addTests(adapter.adapt(test))
 
2424
 
 
2425
 
 
2426
def _rmtree_temp_dir(dirname):
 
2427
    # If LANG=C we probably have created some bogus paths
 
2428
    # which rmtree(unicode) will fail to delete
 
2429
    # so make sure we are using rmtree(str) to delete everything
 
2430
    # except on win32, where rmtree(str) will fail
 
2431
    # since it doesn't have the property of byte-stream paths
 
2432
    # (they are either ascii or mbcs)
 
2433
    if sys.platform == 'win32':
 
2434
        # make sure we are using the unicode win32 api
 
2435
        dirname = unicode(dirname)
 
2436
    else:
 
2437
        dirname = dirname.encode(sys.getfilesystemencoding())
 
2438
    try:
 
2439
        osutils.rmtree(dirname)
 
2440
    except OSError, e:
 
2441
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
2442
            print >>sys.stderr, ('Permission denied: '
 
2443
                                 'unable to remove testing dir '
 
2444
                                 '%s' % os.path.basename(dirname))
 
2445
        else:
 
2446
            raise
 
2447
 
 
2448
 
 
2449
class Feature(object):
 
2450
    """An operating system Feature."""
 
2451
 
 
2452
    def __init__(self):
 
2453
        self._available = None
 
2454
 
 
2455
    def available(self):
 
2456
        """Is the feature available?
 
2457
 
 
2458
        :return: True if the feature is available.
 
2459
        """
 
2460
        if self._available is None:
 
2461
            self._available = self._probe()
 
2462
        return self._available
 
2463
 
 
2464
    def _probe(self):
 
2465
        """Implement this method in concrete features.
 
2466
 
 
2467
        :return: True if the feature is available.
 
2468
        """
 
2469
        raise NotImplementedError
 
2470
 
 
2471
    def __str__(self):
 
2472
        if getattr(self, 'feature_name', None):
 
2473
            return self.feature_name()
 
2474
        return self.__class__.__name__
 
2475
 
 
2476
 
 
2477
class TestScenarioApplier(object):
 
2478
    """A tool to apply scenarios to tests."""
 
2479
 
 
2480
    def adapt(self, test):
 
2481
        """Return a TestSuite containing a copy of test for each scenario."""
 
2482
        result = unittest.TestSuite()
 
2483
        for scenario in self.scenarios:
 
2484
            result.addTest(self.adapt_test_to_scenario(test, scenario))
 
2485
        return result
 
2486
 
 
2487
    def adapt_test_to_scenario(self, test, scenario):
 
2488
        """Copy test and apply scenario to it.
 
2489
 
 
2490
        :param test: A test to adapt.
 
2491
        :param scenario: A tuple describing the scenarion.
 
2492
            The first element of the tuple is the new test id.
 
2493
            The second element is a dict containing attributes to set on the
 
2494
            test.
 
2495
        :return: The adapted test.
 
2496
        """
 
2497
        from copy import deepcopy
 
2498
        new_test = deepcopy(test)
 
2499
        for name, value in scenario[1].items():
 
2500
            setattr(new_test, name, value)
 
2501
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
 
2502
        new_test.id = lambda: new_id
 
2503
        return new_test