~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Robert Collins
  • Date: 2007-07-13 16:35:54 UTC
  • mto: (2592.3.3 repository)
  • mto: This revision was merged to the branch mainline in revision 2624.
  • Revision ID: robertc@robertcollins.net-20070713163554-ok2qtnzv6rcbpt3z
Change the missing key interface in index operations to not raise, allowing callers to set policy.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
from testsweet import TestBase, run_suite, InTempDir
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import os
 
37
from pprint import pformat
 
38
import random
 
39
import re
 
40
import shlex
 
41
import stat
 
42
from subprocess import Popen, PIPE
 
43
import sys
 
44
import tempfile
 
45
import unittest
 
46
import time
 
47
import warnings
 
48
 
 
49
 
 
50
from bzrlib import (
 
51
    bzrdir,
 
52
    debug,
 
53
    errors,
 
54
    memorytree,
 
55
    osutils,
 
56
    progress,
 
57
    ui,
 
58
    urlutils,
 
59
    workingtree,
 
60
    )
 
61
import bzrlib.branch
 
62
import bzrlib.commands
 
63
import bzrlib.timestamp
 
64
import bzrlib.export
 
65
import bzrlib.inventory
 
66
import bzrlib.iterablefile
 
67
import bzrlib.lockdir
 
68
try:
 
69
    import bzrlib.lsprof
 
70
except ImportError:
 
71
    # lsprof not available
 
72
    pass
 
73
from bzrlib.merge import merge_inner
 
74
import bzrlib.merge3
 
75
import bzrlib.osutils
 
76
import bzrlib.plugin
 
77
from bzrlib.revision import common_ancestor
 
78
import bzrlib.store
 
79
from bzrlib import symbol_versioning
 
80
from bzrlib.symbol_versioning import (
 
81
    deprecated_method,
 
82
    zero_eighteen,
 
83
    )
 
84
import bzrlib.trace
 
85
from bzrlib.transport import get_transport
 
86
import bzrlib.transport
 
87
from bzrlib.transport.local import LocalURLServer
 
88
from bzrlib.transport.memory import MemoryServer
 
89
from bzrlib.transport.readonly import ReadonlyServer
 
90
from bzrlib.trace import mutter, note
 
91
from bzrlib.tests import TestUtil
 
92
from bzrlib.tests.HttpServer import HttpServer
 
93
from bzrlib.tests.TestUtil import (
 
94
                          TestSuite,
 
95
                          TestLoader,
 
96
                          )
 
97
from bzrlib.tests.treeshape import build_tree_contents
 
98
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
99
 
 
100
# Mark this python module as being part of the implementation
 
101
# of unittest: this gives us better tracebacks where the last
 
102
# shown frame is the test code, not our assertXYZ.
 
103
__unittest = 1
 
104
 
 
105
default_transport = LocalURLServer
19
106
 
20
107
MODULES_TO_TEST = []
21
 
MODULES_TO_DOCTEST = []
22
 
 
23
 
def selftest():
24
 
    from unittest import TestLoader, TestSuite
25
 
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
26
 
    import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
27
 
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
28
 
 
29
 
    import bzrlib.selftest.whitebox
30
 
    import bzrlib.selftest.blackbox
31
 
    import bzrlib.selftest.versioning
32
 
    import bzrlib.selftest.testmerge3
33
 
    import bzrlib.selftest.testhashcache
34
 
    import bzrlib.selftest.testrevisionnamespaces
35
 
    import bzrlib.selftest.testbranch
36
 
    import bzrlib.selftest.teststatus
37
 
    import bzrlib.selftest.testinv
38
 
    import bzrlib.merge_core
39
 
    from doctest import DocTestSuite
40
 
    import os
41
 
    import shutil
42
 
    import time
43
 
    import sys
44
 
    import unittest
45
 
 
46
 
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
47
 
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
48
 
        if m not in MODULES_TO_DOCTEST:
49
 
            MODULES_TO_DOCTEST.append(m)
50
 
            
51
 
    for m in (bzrlib.selftest.whitebox,
52
 
              bzrlib.selftest.versioning,
53
 
              bzrlib.selftest.testinv,
54
 
              bzrlib.selftest.testmerge3,
55
 
              bzrlib.selftest.testhashcache,
56
 
              bzrlib.selftest.teststatus,
57
 
              bzrlib.selftest.blackbox,
58
 
              bzrlib.selftest.testhashcache,
59
 
              bzrlib.selftest.testrevisionnamespaces,
60
 
              bzrlib.selftest.testbranch,
 
108
MODULES_TO_DOCTEST = [
 
109
                      bzrlib.timestamp,
 
110
                      bzrlib.errors,
 
111
                      bzrlib.export,
 
112
                      bzrlib.inventory,
 
113
                      bzrlib.iterablefile,
 
114
                      bzrlib.lockdir,
 
115
                      bzrlib.merge3,
 
116
                      bzrlib.option,
 
117
                      bzrlib.store,
 
118
                      ]
 
119
 
 
120
 
 
121
def packages_to_test():
 
122
    """Return a list of packages to test.
 
123
 
 
124
    The packages are not globally imported so that import failures are
 
125
    triggered when running selftest, not when importing the command.
 
126
    """
 
127
    import bzrlib.doc
 
128
    import bzrlib.tests.blackbox
 
129
    import bzrlib.tests.branch_implementations
 
130
    import bzrlib.tests.bzrdir_implementations
 
131
    import bzrlib.tests.interrepository_implementations
 
132
    import bzrlib.tests.interversionedfile_implementations
 
133
    import bzrlib.tests.intertree_implementations
 
134
    import bzrlib.tests.per_lock
 
135
    import bzrlib.tests.repository_implementations
 
136
    import bzrlib.tests.revisionstore_implementations
 
137
    import bzrlib.tests.tree_implementations
 
138
    import bzrlib.tests.workingtree_implementations
 
139
    return [
 
140
            bzrlib.doc,
 
141
            bzrlib.tests.blackbox,
 
142
            bzrlib.tests.branch_implementations,
 
143
            bzrlib.tests.bzrdir_implementations,
 
144
            bzrlib.tests.interrepository_implementations,
 
145
            bzrlib.tests.interversionedfile_implementations,
 
146
            bzrlib.tests.intertree_implementations,
 
147
            bzrlib.tests.per_lock,
 
148
            bzrlib.tests.repository_implementations,
 
149
            bzrlib.tests.revisionstore_implementations,
 
150
            bzrlib.tests.tree_implementations,
 
151
            bzrlib.tests.workingtree_implementations,
 
152
            ]
 
153
 
 
154
 
 
155
class ExtendedTestResult(unittest._TextTestResult):
 
156
    """Accepts, reports and accumulates the results of running tests.
 
157
 
 
158
    Compared to this unittest version this class adds support for profiling,
 
159
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
160
    There are further-specialized subclasses for different types of display.
 
161
    """
 
162
 
 
163
    stop_early = False
 
164
    
 
165
    def __init__(self, stream, descriptions, verbosity,
 
166
                 bench_history=None,
 
167
                 num_tests=None,
 
168
                 ):
 
169
        """Construct new TestResult.
 
170
 
 
171
        :param bench_history: Optionally, a writable file object to accumulate
 
172
            benchmark results.
 
173
        """
 
174
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
175
        if bench_history is not None:
 
176
            from bzrlib.version import _get_bzr_source_tree
 
177
            src_tree = _get_bzr_source_tree()
 
178
            if src_tree:
 
179
                try:
 
180
                    revision_id = src_tree.get_parent_ids()[0]
 
181
                except IndexError:
 
182
                    # XXX: if this is a brand new tree, do the same as if there
 
183
                    # is no branch.
 
184
                    revision_id = ''
 
185
            else:
 
186
                # XXX: If there's no branch, what should we do?
 
187
                revision_id = ''
 
188
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
189
        self._bench_history = bench_history
 
190
        self.ui = ui.ui_factory
 
191
        self.num_tests = num_tests
 
192
        self.error_count = 0
 
193
        self.failure_count = 0
 
194
        self.known_failure_count = 0
 
195
        self.skip_count = 0
 
196
        self.unsupported = {}
 
197
        self.count = 0
 
198
        self._overall_start_time = time.time()
 
199
    
 
200
    def extractBenchmarkTime(self, testCase):
 
201
        """Add a benchmark time for the current test case."""
 
202
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
203
    
 
204
    def _elapsedTestTimeString(self):
 
205
        """Return a time string for the overall time the current test has taken."""
 
206
        return self._formatTime(time.time() - self._start_time)
 
207
 
 
208
    def _testTimeString(self):
 
209
        if self._benchmarkTime is not None:
 
210
            return "%s/%s" % (
 
211
                self._formatTime(self._benchmarkTime),
 
212
                self._elapsedTestTimeString())
 
213
        else:
 
214
            return "           %s" % self._elapsedTestTimeString()
 
215
 
 
216
    def _formatTime(self, seconds):
 
217
        """Format seconds as milliseconds with leading spaces."""
 
218
        # some benchmarks can take thousands of seconds to run, so we need 8
 
219
        # places
 
220
        return "%8dms" % (1000 * seconds)
 
221
 
 
222
    def _shortened_test_description(self, test):
 
223
        what = test.id()
 
224
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
225
        return what
 
226
 
 
227
    def startTest(self, test):
 
228
        unittest.TestResult.startTest(self, test)
 
229
        self.report_test_start(test)
 
230
        test.number = self.count
 
231
        self._recordTestStartTime()
 
232
 
 
233
    def _recordTestStartTime(self):
 
234
        """Record that a test has started."""
 
235
        self._start_time = time.time()
 
236
 
 
237
    def _cleanupLogFile(self, test):
 
238
        # We can only do this if we have one of our TestCases, not if
 
239
        # we have a doctest.
 
240
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
241
        if setKeepLogfile is not None:
 
242
            setKeepLogfile()
 
243
 
 
244
    def addError(self, test, err):
 
245
        self.extractBenchmarkTime(test)
 
246
        self._cleanupLogFile(test)
 
247
        if isinstance(err[1], TestSkipped):
 
248
            return self.addSkipped(test, err)
 
249
        elif isinstance(err[1], UnavailableFeature):
 
250
            return self.addNotSupported(test, err[1].args[0])
 
251
        unittest.TestResult.addError(self, test, err)
 
252
        self.error_count += 1
 
253
        self.report_error(test, err)
 
254
        if self.stop_early:
 
255
            self.stop()
 
256
 
 
257
    def addFailure(self, test, err):
 
258
        self._cleanupLogFile(test)
 
259
        self.extractBenchmarkTime(test)
 
260
        if isinstance(err[1], KnownFailure):
 
261
            return self.addKnownFailure(test, err)
 
262
        unittest.TestResult.addFailure(self, test, err)
 
263
        self.failure_count += 1
 
264
        self.report_failure(test, err)
 
265
        if self.stop_early:
 
266
            self.stop()
 
267
 
 
268
    def addKnownFailure(self, test, err):
 
269
        self.known_failure_count += 1
 
270
        self.report_known_failure(test, err)
 
271
 
 
272
    def addNotSupported(self, test, feature):
 
273
        self.unsupported.setdefault(str(feature), 0)
 
274
        self.unsupported[str(feature)] += 1
 
275
        self.report_unsupported(test, feature)
 
276
 
 
277
    def addSuccess(self, test):
 
278
        self.extractBenchmarkTime(test)
 
279
        if self._bench_history is not None:
 
280
            if self._benchmarkTime is not None:
 
281
                self._bench_history.write("%s %s\n" % (
 
282
                    self._formatTime(self._benchmarkTime),
 
283
                    test.id()))
 
284
        self.report_success(test)
 
285
        unittest.TestResult.addSuccess(self, test)
 
286
 
 
287
    def addSkipped(self, test, skip_excinfo):
 
288
        self.report_skip(test, skip_excinfo)
 
289
        # seems best to treat this as success from point-of-view of unittest
 
290
        # -- it actually does nothing so it barely matters :)
 
291
        try:
 
292
            test.tearDown()
 
293
        except KeyboardInterrupt:
 
294
            raise
 
295
        except:
 
296
            self.addError(test, test.__exc_info())
 
297
        else:
 
298
            unittest.TestResult.addSuccess(self, test)
 
299
 
 
300
    def printErrorList(self, flavour, errors):
 
301
        for test, err in errors:
 
302
            self.stream.writeln(self.separator1)
 
303
            self.stream.write("%s: " % flavour)
 
304
            self.stream.writeln(self.getDescription(test))
 
305
            if getattr(test, '_get_log', None) is not None:
 
306
                print >>self.stream
 
307
                print >>self.stream, \
 
308
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
309
                print >>self.stream, test._get_log()
 
310
                print >>self.stream, \
 
311
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
312
            self.stream.writeln(self.separator2)
 
313
            self.stream.writeln("%s" % err)
 
314
 
 
315
    def finished(self):
 
316
        pass
 
317
 
 
318
    def report_cleaning_up(self):
 
319
        pass
 
320
 
 
321
    def report_success(self, test):
 
322
        pass
 
323
 
 
324
 
 
325
class TextTestResult(ExtendedTestResult):
 
326
    """Displays progress and results of tests in text form"""
 
327
 
 
328
    def __init__(self, stream, descriptions, verbosity,
 
329
                 bench_history=None,
 
330
                 num_tests=None,
 
331
                 pb=None,
 
332
                 ):
 
333
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
334
            bench_history, num_tests)
 
335
        if pb is None:
 
336
            self.pb = self.ui.nested_progress_bar()
 
337
            self._supplied_pb = False
 
338
        else:
 
339
            self.pb = pb
 
340
            self._supplied_pb = True
 
341
        self.pb.show_pct = False
 
342
        self.pb.show_spinner = False
 
343
        self.pb.show_eta = False,
 
344
        self.pb.show_count = False
 
345
        self.pb.show_bar = False
 
346
 
 
347
    def report_starting(self):
 
348
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
349
 
 
350
    def _progress_prefix_text(self):
 
351
        a = '[%d' % self.count
 
352
        if self.num_tests is not None:
 
353
            a +='/%d' % self.num_tests
 
354
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
355
        if self.error_count:
 
356
            a += ', %d errors' % self.error_count
 
357
        if self.failure_count:
 
358
            a += ', %d failed' % self.failure_count
 
359
        if self.known_failure_count:
 
360
            a += ', %d known failures' % self.known_failure_count
 
361
        if self.skip_count:
 
362
            a += ', %d skipped' % self.skip_count
 
363
        if self.unsupported:
 
364
            a += ', %d missing features' % len(self.unsupported)
 
365
        a += ']'
 
366
        return a
 
367
 
 
368
    def report_test_start(self, test):
 
369
        self.count += 1
 
370
        self.pb.update(
 
371
                self._progress_prefix_text()
 
372
                + ' ' 
 
373
                + self._shortened_test_description(test))
 
374
 
 
375
    def _test_description(self, test):
 
376
        return self._shortened_test_description(test)
 
377
 
 
378
    def report_error(self, test, err):
 
379
        self.pb.note('ERROR: %s\n    %s\n', 
 
380
            self._test_description(test),
 
381
            err[1],
 
382
            )
 
383
 
 
384
    def report_failure(self, test, err):
 
385
        self.pb.note('FAIL: %s\n    %s\n', 
 
386
            self._test_description(test),
 
387
            err[1],
 
388
            )
 
389
 
 
390
    def report_known_failure(self, test, err):
 
391
        self.pb.note('XFAIL: %s\n%s\n',
 
392
            self._test_description(test), err[1])
 
393
 
 
394
    def report_skip(self, test, skip_excinfo):
 
395
        self.skip_count += 1
 
396
        if False:
 
397
            # at the moment these are mostly not things we can fix
 
398
            # and so they just produce stipple; use the verbose reporter
 
399
            # to see them.
 
400
            if False:
 
401
                # show test and reason for skip
 
402
                self.pb.note('SKIP: %s\n    %s\n', 
 
403
                    self._shortened_test_description(test),
 
404
                    skip_excinfo[1])
 
405
            else:
 
406
                # since the class name was left behind in the still-visible
 
407
                # progress bar...
 
408
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
409
 
 
410
    def report_unsupported(self, test, feature):
 
411
        """test cannot be run because feature is missing."""
 
412
                  
 
413
    def report_cleaning_up(self):
 
414
        self.pb.update('cleaning up...')
 
415
 
 
416
    def finished(self):
 
417
        if not self._supplied_pb:
 
418
            self.pb.finished()
 
419
 
 
420
 
 
421
class VerboseTestResult(ExtendedTestResult):
 
422
    """Produce long output, with one line per test run plus times"""
 
423
 
 
424
    def _ellipsize_to_right(self, a_string, final_width):
 
425
        """Truncate and pad a string, keeping the right hand side"""
 
426
        if len(a_string) > final_width:
 
427
            result = '...' + a_string[3-final_width:]
 
428
        else:
 
429
            result = a_string
 
430
        return result.ljust(final_width)
 
431
 
 
432
    def report_starting(self):
 
433
        self.stream.write('running %d tests...\n' % self.num_tests)
 
434
 
 
435
    def report_test_start(self, test):
 
436
        self.count += 1
 
437
        name = self._shortened_test_description(test)
 
438
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
439
        # numbers, plus a trailing blank
 
440
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
441
        self.stream.write(self._ellipsize_to_right(name,
 
442
                          osutils.terminal_width()-30))
 
443
        self.stream.flush()
 
444
 
 
445
    def _error_summary(self, err):
 
446
        indent = ' ' * 4
 
447
        return '%s%s' % (indent, err[1])
 
448
 
 
449
    def report_error(self, test, err):
 
450
        self.stream.writeln('ERROR %s\n%s'
 
451
                % (self._testTimeString(),
 
452
                   self._error_summary(err)))
 
453
 
 
454
    def report_failure(self, test, err):
 
455
        self.stream.writeln(' FAIL %s\n%s'
 
456
                % (self._testTimeString(),
 
457
                   self._error_summary(err)))
 
458
 
 
459
    def report_known_failure(self, test, err):
 
460
        self.stream.writeln('XFAIL %s\n%s'
 
461
                % (self._testTimeString(),
 
462
                   self._error_summary(err)))
 
463
 
 
464
    def report_success(self, test):
 
465
        self.stream.writeln('   OK %s' % self._testTimeString())
 
466
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
467
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
468
            stats.pprint(file=self.stream)
 
469
        # flush the stream so that we get smooth output. This verbose mode is
 
470
        # used to show the output in PQM.
 
471
        self.stream.flush()
 
472
 
 
473
    def report_skip(self, test, skip_excinfo):
 
474
        self.skip_count += 1
 
475
        self.stream.writeln(' SKIP %s\n%s'
 
476
                % (self._testTimeString(),
 
477
                   self._error_summary(skip_excinfo)))
 
478
 
 
479
    def report_unsupported(self, test, feature):
 
480
        """test cannot be run because feature is missing."""
 
481
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
482
                %(self._testTimeString(), feature))
 
483
                  
 
484
 
 
485
 
 
486
class TextTestRunner(object):
 
487
    stop_on_failure = False
 
488
 
 
489
    def __init__(self,
 
490
                 stream=sys.stderr,
 
491
                 descriptions=0,
 
492
                 verbosity=1,
 
493
                 bench_history=None,
 
494
                 list_only=False
 
495
                 ):
 
496
        self.stream = unittest._WritelnDecorator(stream)
 
497
        self.descriptions = descriptions
 
498
        self.verbosity = verbosity
 
499
        self._bench_history = bench_history
 
500
        self.list_only = list_only
 
501
 
 
502
    def run(self, test):
 
503
        "Run the given test case or test suite."
 
504
        startTime = time.time()
 
505
        if self.verbosity == 1:
 
506
            result_class = TextTestResult
 
507
        elif self.verbosity >= 2:
 
508
            result_class = VerboseTestResult
 
509
        result = result_class(self.stream,
 
510
                              self.descriptions,
 
511
                              self.verbosity,
 
512
                              bench_history=self._bench_history,
 
513
                              num_tests=test.countTestCases(),
 
514
                              )
 
515
        result.stop_early = self.stop_on_failure
 
516
        result.report_starting()
 
517
        if self.list_only:
 
518
            if self.verbosity >= 2:
 
519
                self.stream.writeln("Listing tests only ...\n")
 
520
            run = 0
 
521
            for t in iter_suite_tests(test):
 
522
                self.stream.writeln("%s" % (t.id()))
 
523
                run += 1
 
524
            actionTaken = "Listed"
 
525
        else: 
 
526
            test.run(result)
 
527
            run = result.testsRun
 
528
            actionTaken = "Ran"
 
529
        stopTime = time.time()
 
530
        timeTaken = stopTime - startTime
 
531
        result.printErrors()
 
532
        self.stream.writeln(result.separator2)
 
533
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
534
                            run, run != 1 and "s" or "", timeTaken))
 
535
        self.stream.writeln()
 
536
        if not result.wasSuccessful():
 
537
            self.stream.write("FAILED (")
 
538
            failed, errored = map(len, (result.failures, result.errors))
 
539
            if failed:
 
540
                self.stream.write("failures=%d" % failed)
 
541
            if errored:
 
542
                if failed: self.stream.write(", ")
 
543
                self.stream.write("errors=%d" % errored)
 
544
            if result.known_failure_count:
 
545
                if failed or errored: self.stream.write(", ")
 
546
                self.stream.write("known_failure_count=%d" %
 
547
                    result.known_failure_count)
 
548
            self.stream.writeln(")")
 
549
        else:
 
550
            if result.known_failure_count:
 
551
                self.stream.writeln("OK (known_failures=%d)" %
 
552
                    result.known_failure_count)
 
553
            else:
 
554
                self.stream.writeln("OK")
 
555
        if result.skip_count > 0:
 
556
            skipped = result.skip_count
 
557
            self.stream.writeln('%d test%s skipped' %
 
558
                                (skipped, skipped != 1 and "s" or ""))
 
559
        if result.unsupported:
 
560
            for feature, count in sorted(result.unsupported.items()):
 
561
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
562
                    (feature, count))
 
563
        result.finished()
 
564
        return result
 
565
 
 
566
 
 
567
def iter_suite_tests(suite):
 
568
    """Return all tests in a suite, recursing through nested suites"""
 
569
    for item in suite._tests:
 
570
        if isinstance(item, unittest.TestCase):
 
571
            yield item
 
572
        elif isinstance(item, unittest.TestSuite):
 
573
            for r in iter_suite_tests(item):
 
574
                yield r
 
575
        else:
 
576
            raise Exception('unknown object %r inside test suite %r'
 
577
                            % (item, suite))
 
578
 
 
579
 
 
580
class TestSkipped(Exception):
 
581
    """Indicates that a test was intentionally skipped, rather than failing."""
 
582
 
 
583
 
 
584
class KnownFailure(AssertionError):
 
585
    """Indicates that a test failed in a precisely expected manner.
 
586
 
 
587
    Such failures dont block the whole test suite from passing because they are
 
588
    indicators of partially completed code or of future work. We have an
 
589
    explicit error for them so that we can ensure that they are always visible:
 
590
    KnownFailures are always shown in the output of bzr selftest.
 
591
    """
 
592
 
 
593
 
 
594
class UnavailableFeature(Exception):
 
595
    """A feature required for this test was not available.
 
596
 
 
597
    The feature should be used to construct the exception.
 
598
    """
 
599
 
 
600
 
 
601
class CommandFailed(Exception):
 
602
    pass
 
603
 
 
604
 
 
605
class StringIOWrapper(object):
 
606
    """A wrapper around cStringIO which just adds an encoding attribute.
 
607
    
 
608
    Internally we can check sys.stdout to see what the output encoding
 
609
    should be. However, cStringIO has no encoding attribute that we can
 
610
    set. So we wrap it instead.
 
611
    """
 
612
    encoding='ascii'
 
613
    _cstring = None
 
614
 
 
615
    def __init__(self, s=None):
 
616
        if s is not None:
 
617
            self.__dict__['_cstring'] = StringIO(s)
 
618
        else:
 
619
            self.__dict__['_cstring'] = StringIO()
 
620
 
 
621
    def __getattr__(self, name, getattr=getattr):
 
622
        return getattr(self.__dict__['_cstring'], name)
 
623
 
 
624
    def __setattr__(self, name, val):
 
625
        if name == 'encoding':
 
626
            self.__dict__['encoding'] = val
 
627
        else:
 
628
            return setattr(self._cstring, name, val)
 
629
 
 
630
 
 
631
class TestUIFactory(ui.CLIUIFactory):
 
632
    """A UI Factory for testing.
 
633
 
 
634
    Hide the progress bar but emit note()s.
 
635
    Redirect stdin.
 
636
    Allows get_password to be tested without real tty attached.
 
637
    """
 
638
 
 
639
    def __init__(self,
 
640
                 stdout=None,
 
641
                 stderr=None,
 
642
                 stdin=None):
 
643
        super(TestUIFactory, self).__init__()
 
644
        if stdin is not None:
 
645
            # We use a StringIOWrapper to be able to test various
 
646
            # encodings, but the user is still responsible to
 
647
            # encode the string and to set the encoding attribute
 
648
            # of StringIOWrapper.
 
649
            self.stdin = StringIOWrapper(stdin)
 
650
        if stdout is None:
 
651
            self.stdout = sys.stdout
 
652
        else:
 
653
            self.stdout = stdout
 
654
        if stderr is None:
 
655
            self.stderr = sys.stderr
 
656
        else:
 
657
            self.stderr = stderr
 
658
 
 
659
    def clear(self):
 
660
        """See progress.ProgressBar.clear()."""
 
661
 
 
662
    def clear_term(self):
 
663
        """See progress.ProgressBar.clear_term()."""
 
664
 
 
665
    def clear_term(self):
 
666
        """See progress.ProgressBar.clear_term()."""
 
667
 
 
668
    def finished(self):
 
669
        """See progress.ProgressBar.finished()."""
 
670
 
 
671
    def note(self, fmt_string, *args, **kwargs):
 
672
        """See progress.ProgressBar.note()."""
 
673
        self.stdout.write((fmt_string + "\n") % args)
 
674
 
 
675
    def progress_bar(self):
 
676
        return self
 
677
 
 
678
    def nested_progress_bar(self):
 
679
        return self
 
680
 
 
681
    def update(self, message, count=None, total=None):
 
682
        """See progress.ProgressBar.update()."""
 
683
 
 
684
    def get_non_echoed_password(self, prompt):
 
685
        """Get password from stdin without trying to handle the echo mode"""
 
686
        if prompt:
 
687
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
 
688
        password = self.stdin.readline()
 
689
        if not password:
 
690
            raise EOFError
 
691
        if password[-1] == '\n':
 
692
            password = password[:-1]
 
693
        return password
 
694
 
 
695
 
 
696
class TestCase(unittest.TestCase):
 
697
    """Base class for bzr unit tests.
 
698
    
 
699
    Tests that need access to disk resources should subclass 
 
700
    TestCaseInTempDir not TestCase.
 
701
 
 
702
    Error and debug log messages are redirected from their usual
 
703
    location into a temporary file, the contents of which can be
 
704
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
705
    so that it can also capture file IO.  When the test completes this file
 
706
    is read into memory and removed from disk.
 
707
       
 
708
    There are also convenience functions to invoke bzr's command-line
 
709
    routine, and to build and check bzr trees.
 
710
   
 
711
    In addition to the usual method of overriding tearDown(), this class also
 
712
    allows subclasses to register functions into the _cleanups list, which is
 
713
    run in order as the object is torn down.  It's less likely this will be
 
714
    accidentally overlooked.
 
715
    """
 
716
 
 
717
    _log_file_name = None
 
718
    _log_contents = ''
 
719
    _keep_log_file = False
 
720
    # record lsprof data when performing benchmark calls.
 
721
    _gather_lsprof_in_benchmarks = False
 
722
 
 
723
    def __init__(self, methodName='testMethod'):
 
724
        super(TestCase, self).__init__(methodName)
 
725
        self._cleanups = []
 
726
 
 
727
    def setUp(self):
 
728
        unittest.TestCase.setUp(self)
 
729
        self._cleanEnvironment()
 
730
        bzrlib.trace.disable_default_logging()
 
731
        self._silenceUI()
 
732
        self._startLogFile()
 
733
        self._benchcalls = []
 
734
        self._benchtime = None
 
735
        self._clear_hooks()
 
736
        self._clear_debug_flags()
 
737
 
 
738
    def _clear_debug_flags(self):
 
739
        """Prevent externally set debug flags affecting tests.
 
740
        
 
741
        Tests that want to use debug flags can just set them in the
 
742
        debug_flags set during setup/teardown.
 
743
        """
 
744
        self._preserved_debug_flags = set(debug.debug_flags)
 
745
        debug.debug_flags.clear()
 
746
        self.addCleanup(self._restore_debug_flags)
 
747
 
 
748
    def _clear_hooks(self):
 
749
        # prevent hooks affecting tests
 
750
        import bzrlib.branch
 
751
        import bzrlib.smart.server
 
752
        self._preserved_hooks = {
 
753
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
754
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
755
            }
 
756
        self.addCleanup(self._restoreHooks)
 
757
        # reset all hooks to an empty instance of the appropriate type
 
758
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
759
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
760
 
 
761
    def _silenceUI(self):
 
762
        """Turn off UI for duration of test"""
 
763
        # by default the UI is off; tests can turn it on if they want it.
 
764
        saved = ui.ui_factory
 
765
        def _restore():
 
766
            ui.ui_factory = saved
 
767
        ui.ui_factory = ui.SilentUIFactory()
 
768
        self.addCleanup(_restore)
 
769
 
 
770
    def _ndiff_strings(self, a, b):
 
771
        """Return ndiff between two strings containing lines.
 
772
        
 
773
        A trailing newline is added if missing to make the strings
 
774
        print properly."""
 
775
        if b and b[-1] != '\n':
 
776
            b += '\n'
 
777
        if a and a[-1] != '\n':
 
778
            a += '\n'
 
779
        difflines = difflib.ndiff(a.splitlines(True),
 
780
                                  b.splitlines(True),
 
781
                                  linejunk=lambda x: False,
 
782
                                  charjunk=lambda x: False)
 
783
        return ''.join(difflines)
 
784
 
 
785
    def assertEqual(self, a, b, message=''):
 
786
        try:
 
787
            if a == b:
 
788
                return
 
789
        except UnicodeError, e:
 
790
            # If we can't compare without getting a UnicodeError, then
 
791
            # obviously they are different
 
792
            mutter('UnicodeError: %s', e)
 
793
        if message:
 
794
            message += '\n'
 
795
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
796
            % (message,
 
797
               pformat(a), pformat(b)))
 
798
 
 
799
    assertEquals = assertEqual
 
800
 
 
801
    def assertEqualDiff(self, a, b, message=None):
 
802
        """Assert two texts are equal, if not raise an exception.
 
803
        
 
804
        This is intended for use with multi-line strings where it can 
 
805
        be hard to find the differences by eye.
 
806
        """
 
807
        # TODO: perhaps override assertEquals to call this for strings?
 
808
        if a == b:
 
809
            return
 
810
        if message is None:
 
811
            message = "texts not equal:\n"
 
812
        raise AssertionError(message +
 
813
                             self._ndiff_strings(a, b))
 
814
        
 
815
    def assertEqualMode(self, mode, mode_test):
 
816
        self.assertEqual(mode, mode_test,
 
817
                         'mode mismatch %o != %o' % (mode, mode_test))
 
818
 
 
819
    def assertStartsWith(self, s, prefix):
 
820
        if not s.startswith(prefix):
 
821
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
822
 
 
823
    def assertEndsWith(self, s, suffix):
 
824
        """Asserts that s ends with suffix."""
 
825
        if not s.endswith(suffix):
 
826
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
827
 
 
828
    def assertContainsRe(self, haystack, needle_re):
 
829
        """Assert that a contains something matching a regular expression."""
 
830
        if not re.search(needle_re, haystack):
 
831
            if '\n' in haystack or len(haystack) > 60:
 
832
                # a long string, format it in a more readable way
 
833
                raise AssertionError(
 
834
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
835
                        % (needle_re, haystack))
 
836
            else:
 
837
                raise AssertionError('pattern "%s" not found in "%s"'
 
838
                        % (needle_re, haystack))
 
839
 
 
840
    def assertNotContainsRe(self, haystack, needle_re):
 
841
        """Assert that a does not match a regular expression"""
 
842
        if re.search(needle_re, haystack):
 
843
            raise AssertionError('pattern "%s" found in "%s"'
 
844
                    % (needle_re, haystack))
 
845
 
 
846
    def assertSubset(self, sublist, superlist):
 
847
        """Assert that every entry in sublist is present in superlist."""
 
848
        missing = []
 
849
        for entry in sublist:
 
850
            if entry not in superlist:
 
851
                missing.append(entry)
 
852
        if len(missing) > 0:
 
853
            raise AssertionError("value(s) %r not present in container %r" % 
 
854
                                 (missing, superlist))
 
855
 
 
856
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
857
        """Fail unless excClass is raised when the iterator from func is used.
 
858
        
 
859
        Many functions can return generators this makes sure
 
860
        to wrap them in a list() call to make sure the whole generator
 
861
        is run, and that the proper exception is raised.
 
862
        """
 
863
        try:
 
864
            list(func(*args, **kwargs))
 
865
        except excClass:
 
866
            return
 
867
        else:
 
868
            if getattr(excClass,'__name__', None) is not None:
 
869
                excName = excClass.__name__
 
870
            else:
 
871
                excName = str(excClass)
 
872
            raise self.failureException, "%s not raised" % excName
 
873
 
 
874
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
875
        """Assert that a callable raises a particular exception.
 
876
 
 
877
        :param excClass: As for the except statement, this may be either an
 
878
            exception class, or a tuple of classes.
 
879
        :param callableObj: A callable, will be passed ``*args`` and
 
880
            ``**kwargs``.
 
881
 
 
882
        Returns the exception so that you can examine it.
 
883
        """
 
884
        try:
 
885
            callableObj(*args, **kwargs)
 
886
        except excClass, e:
 
887
            return e
 
888
        else:
 
889
            if getattr(excClass,'__name__', None) is not None:
 
890
                excName = excClass.__name__
 
891
            else:
 
892
                # probably a tuple
 
893
                excName = str(excClass)
 
894
            raise self.failureException, "%s not raised" % excName
 
895
 
 
896
    def assertIs(self, left, right, message=None):
 
897
        if not (left is right):
 
898
            if message is not None:
 
899
                raise AssertionError(message)
 
900
            else:
 
901
                raise AssertionError("%r is not %r." % (left, right))
 
902
 
 
903
    def assertIsNot(self, left, right, message=None):
 
904
        if (left is right):
 
905
            if message is not None:
 
906
                raise AssertionError(message)
 
907
            else:
 
908
                raise AssertionError("%r is %r." % (left, right))
 
909
 
 
910
    def assertTransportMode(self, transport, path, mode):
 
911
        """Fail if a path does not have mode mode.
 
912
        
 
913
        If modes are not supported on this transport, the assertion is ignored.
 
914
        """
 
915
        if not transport._can_roundtrip_unix_modebits():
 
916
            return
 
917
        path_stat = transport.stat(path)
 
918
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
919
        self.assertEqual(mode, actual_mode,
 
920
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
921
 
 
922
    def assertIsInstance(self, obj, kls):
 
923
        """Fail if obj is not an instance of kls"""
 
924
        if not isinstance(obj, kls):
 
925
            self.fail("%r is an instance of %s rather than %s" % (
 
926
                obj, obj.__class__, kls))
 
927
 
 
928
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
929
        """Invoke a test, expecting it to fail for the given reason.
 
930
 
 
931
        This is for assertions that ought to succeed, but currently fail.
 
932
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
933
        about the failure you're expecting.  If a new bug is introduced,
 
934
        AssertionError should be raised, not KnownFailure.
 
935
 
 
936
        Frequently, expectFailure should be followed by an opposite assertion.
 
937
        See example below.
 
938
 
 
939
        Intended to be used with a callable that raises AssertionError as the
 
940
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
941
 
 
942
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
943
        test succeeds.
 
944
 
 
945
        example usage::
 
946
 
 
947
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
948
                             dynamic_val)
 
949
          self.assertEqual(42, dynamic_val)
 
950
 
 
951
          This means that a dynamic_val of 54 will cause the test to raise
 
952
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
953
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
954
          than 54 or 42 will cause an AssertionError.
 
955
        """
 
956
        try:
 
957
            assertion(*args, **kwargs)
 
958
        except AssertionError:
 
959
            raise KnownFailure(reason)
 
960
        else:
 
961
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
962
 
 
963
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
964
        """A helper for callDeprecated and applyDeprecated.
 
965
 
 
966
        :param a_callable: A callable to call.
 
967
        :param args: The positional arguments for the callable
 
968
        :param kwargs: The keyword arguments for the callable
 
969
        :return: A tuple (warnings, result). result is the result of calling
 
970
            a_callable(``*args``, ``**kwargs``).
 
971
        """
 
972
        local_warnings = []
 
973
        def capture_warnings(msg, cls=None, stacklevel=None):
 
974
            # we've hooked into a deprecation specific callpath,
 
975
            # only deprecations should getting sent via it.
 
976
            self.assertEqual(cls, DeprecationWarning)
 
977
            local_warnings.append(msg)
 
978
        original_warning_method = symbol_versioning.warn
 
979
        symbol_versioning.set_warning_method(capture_warnings)
 
980
        try:
 
981
            result = a_callable(*args, **kwargs)
 
982
        finally:
 
983
            symbol_versioning.set_warning_method(original_warning_method)
 
984
        return (local_warnings, result)
 
985
 
 
986
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
987
        """Call a deprecated callable without warning the user.
 
988
 
 
989
        Note that this only captures warnings raised by symbol_versioning.warn,
 
990
        not other callers that go direct to the warning module.
 
991
 
 
992
        :param deprecation_format: The deprecation format that the callable
 
993
            should have been deprecated with. This is the same type as the
 
994
            parameter to deprecated_method/deprecated_function. If the
 
995
            callable is not deprecated with this format, an assertion error
 
996
            will be raised.
 
997
        :param a_callable: A callable to call. This may be a bound method or
 
998
            a regular function. It will be called with ``*args`` and
 
999
            ``**kwargs``.
 
1000
        :param args: The positional arguments for the callable
 
1001
        :param kwargs: The keyword arguments for the callable
 
1002
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1003
        """
 
1004
        call_warnings, result = self._capture_warnings(a_callable,
 
1005
            *args, **kwargs)
 
1006
        expected_first_warning = symbol_versioning.deprecation_string(
 
1007
            a_callable, deprecation_format)
 
1008
        if len(call_warnings) == 0:
 
1009
            self.fail("No deprecation warning generated by call to %s" %
 
1010
                a_callable)
 
1011
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1012
        return result
 
1013
 
 
1014
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1015
        """Assert that a callable is deprecated in a particular way.
 
1016
 
 
1017
        This is a very precise test for unusual requirements. The 
 
1018
        applyDeprecated helper function is probably more suited for most tests
 
1019
        as it allows you to simply specify the deprecation format being used
 
1020
        and will ensure that that is issued for the function being called.
 
1021
 
 
1022
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1023
        not other callers that go direct to the warning module.
 
1024
 
 
1025
        :param expected: a list of the deprecation warnings expected, in order
 
1026
        :param callable: The callable to call
 
1027
        :param args: The positional arguments for the callable
 
1028
        :param kwargs: The keyword arguments for the callable
 
1029
        """
 
1030
        call_warnings, result = self._capture_warnings(callable,
 
1031
            *args, **kwargs)
 
1032
        self.assertEqual(expected, call_warnings)
 
1033
        return result
 
1034
 
 
1035
    def _startLogFile(self):
 
1036
        """Send bzr and test log messages to a temporary file.
 
1037
 
 
1038
        The file is removed as the test is torn down.
 
1039
        """
 
1040
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1041
        self._log_file = os.fdopen(fileno, 'w+')
 
1042
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
1043
        self._log_file_name = name
 
1044
        self.addCleanup(self._finishLogFile)
 
1045
 
 
1046
    def _finishLogFile(self):
 
1047
        """Finished with the log file.
 
1048
 
 
1049
        Close the file and delete it, unless setKeepLogfile was called.
 
1050
        """
 
1051
        if self._log_file is None:
 
1052
            return
 
1053
        bzrlib.trace.disable_test_log(self._log_nonce)
 
1054
        self._log_file.close()
 
1055
        self._log_file = None
 
1056
        if not self._keep_log_file:
 
1057
            os.remove(self._log_file_name)
 
1058
            self._log_file_name = None
 
1059
 
 
1060
    def setKeepLogfile(self):
 
1061
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1062
        self._keep_log_file = True
 
1063
 
 
1064
    def addCleanup(self, callable):
 
1065
        """Arrange to run a callable when this case is torn down.
 
1066
 
 
1067
        Callables are run in the reverse of the order they are registered, 
 
1068
        ie last-in first-out.
 
1069
        """
 
1070
        if callable in self._cleanups:
 
1071
            raise ValueError("cleanup function %r already registered on %s" 
 
1072
                    % (callable, self))
 
1073
        self._cleanups.append(callable)
 
1074
 
 
1075
    def _cleanEnvironment(self):
 
1076
        new_env = {
 
1077
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1078
            'HOME': os.getcwd(),
 
1079
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1080
            'BZR_EMAIL': None,
 
1081
            'BZREMAIL': None, # may still be present in the environment
 
1082
            'EMAIL': None,
 
1083
            'BZR_PROGRESS_BAR': None,
 
1084
            # Proxies
 
1085
            'http_proxy': None,
 
1086
            'HTTP_PROXY': None,
 
1087
            'https_proxy': None,
 
1088
            'HTTPS_PROXY': None,
 
1089
            'no_proxy': None,
 
1090
            'NO_PROXY': None,
 
1091
            'all_proxy': None,
 
1092
            'ALL_PROXY': None,
 
1093
            # Nobody cares about these ones AFAIK. So far at
 
1094
            # least. If you do (care), please update this comment
 
1095
            # -- vila 20061212
 
1096
            'ftp_proxy': None,
 
1097
            'FTP_PROXY': None,
 
1098
            'BZR_REMOTE_PATH': None,
 
1099
        }
 
1100
        self.__old_env = {}
 
1101
        self.addCleanup(self._restoreEnvironment)
 
1102
        for name, value in new_env.iteritems():
 
1103
            self._captureVar(name, value)
 
1104
 
 
1105
    def _captureVar(self, name, newvalue):
 
1106
        """Set an environment variable, and reset it when finished."""
 
1107
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1108
 
 
1109
    def _restore_debug_flags(self):
 
1110
        debug.debug_flags.clear()
 
1111
        debug.debug_flags.update(self._preserved_debug_flags)
 
1112
 
 
1113
    def _restoreEnvironment(self):
 
1114
        for name, value in self.__old_env.iteritems():
 
1115
            osutils.set_or_unset_env(name, value)
 
1116
 
 
1117
    def _restoreHooks(self):
 
1118
        for klass, hooks in self._preserved_hooks.items():
 
1119
            setattr(klass, 'hooks', hooks)
 
1120
 
 
1121
    def knownFailure(self, reason):
 
1122
        """This test has failed for some known reason."""
 
1123
        raise KnownFailure(reason)
 
1124
 
 
1125
    def run(self, result=None):
 
1126
        if result is None: result = self.defaultTestResult()
 
1127
        for feature in getattr(self, '_test_needs_features', []):
 
1128
            if not feature.available():
 
1129
                result.startTest(self)
 
1130
                if getattr(result, 'addNotSupported', None):
 
1131
                    result.addNotSupported(self, feature)
 
1132
                else:
 
1133
                    result.addSuccess(self)
 
1134
                result.stopTest(self)
 
1135
                return
 
1136
        return unittest.TestCase.run(self, result)
 
1137
 
 
1138
    def tearDown(self):
 
1139
        self._runCleanups()
 
1140
        unittest.TestCase.tearDown(self)
 
1141
 
 
1142
    def time(self, callable, *args, **kwargs):
 
1143
        """Run callable and accrue the time it takes to the benchmark time.
 
1144
        
 
1145
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1146
        this will cause lsprofile statistics to be gathered and stored in
 
1147
        self._benchcalls.
 
1148
        """
 
1149
        if self._benchtime is None:
 
1150
            self._benchtime = 0
 
1151
        start = time.time()
 
1152
        try:
 
1153
            if not self._gather_lsprof_in_benchmarks:
 
1154
                return callable(*args, **kwargs)
 
1155
            else:
 
1156
                # record this benchmark
 
1157
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1158
                stats.sort()
 
1159
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1160
                return ret
 
1161
        finally:
 
1162
            self._benchtime += time.time() - start
 
1163
 
 
1164
    def _runCleanups(self):
 
1165
        """Run registered cleanup functions. 
 
1166
 
 
1167
        This should only be called from TestCase.tearDown.
 
1168
        """
 
1169
        # TODO: Perhaps this should keep running cleanups even if 
 
1170
        # one of them fails?
 
1171
 
 
1172
        # Actually pop the cleanups from the list so tearDown running
 
1173
        # twice is safe (this happens for skipped tests).
 
1174
        while self._cleanups:
 
1175
            self._cleanups.pop()()
 
1176
 
 
1177
    def log(self, *args):
 
1178
        mutter(*args)
 
1179
 
 
1180
    def _get_log(self, keep_log_file=False):
 
1181
        """Return as a string the log for this test. If the file is still
 
1182
        on disk and keep_log_file=False, delete the log file and store the
 
1183
        content in self._log_contents."""
 
1184
        # flush the log file, to get all content
 
1185
        import bzrlib.trace
 
1186
        bzrlib.trace._trace_file.flush()
 
1187
        if self._log_contents:
 
1188
            return self._log_contents
 
1189
        if self._log_file_name is not None:
 
1190
            logfile = open(self._log_file_name)
 
1191
            try:
 
1192
                log_contents = logfile.read()
 
1193
            finally:
 
1194
                logfile.close()
 
1195
            if not keep_log_file:
 
1196
                self._log_contents = log_contents
 
1197
                try:
 
1198
                    os.remove(self._log_file_name)
 
1199
                except OSError, e:
 
1200
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1201
                        print >>sys.stderr, ('Unable to delete log file '
 
1202
                                             ' %r' % self._log_file_name)
 
1203
                    else:
 
1204
                        raise
 
1205
            return log_contents
 
1206
        else:
 
1207
            return "DELETED log file to reduce memory footprint"
 
1208
 
 
1209
    @deprecated_method(zero_eighteen)
 
1210
    def capture(self, cmd, retcode=0):
 
1211
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
1212
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
1213
 
 
1214
    def requireFeature(self, feature):
 
1215
        """This test requires a specific feature is available.
 
1216
 
 
1217
        :raises UnavailableFeature: When feature is not available.
 
1218
        """
 
1219
        if not feature.available():
 
1220
            raise UnavailableFeature(feature)
 
1221
 
 
1222
    @deprecated_method(zero_eighteen)
 
1223
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
1224
                         working_dir=None):
 
1225
        """Invoke bzr and return (stdout, stderr).
 
1226
 
 
1227
        Don't call this method, just use run_bzr() which is equivalent.
 
1228
 
 
1229
        :param argv: Arguments to invoke bzr.  This may be either a 
 
1230
            single string, in which case it is split by shlex into words, 
 
1231
            or a list of arguments.
 
1232
        :param retcode: Expected return code, or None for don't-care.
 
1233
        :param encoding: Encoding for sys.stdout and sys.stderr
 
1234
        :param stdin: A string to be used as stdin for the command.
 
1235
        :param working_dir: Change to this directory before running
 
1236
        """
 
1237
        return self._run_bzr_autosplit(argv, retcode=retcode,
 
1238
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1239
                )
 
1240
 
 
1241
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1242
            working_dir):
 
1243
        """Run bazaar command line, splitting up a string command line."""
 
1244
        if isinstance(args, basestring):
 
1245
            args = list(shlex.split(args))
 
1246
        return self._run_bzr_core(args, retcode=retcode,
 
1247
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1248
                )
 
1249
 
 
1250
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1251
            working_dir):
 
1252
        if encoding is None:
 
1253
            encoding = bzrlib.user_encoding
 
1254
        stdout = StringIOWrapper()
 
1255
        stderr = StringIOWrapper()
 
1256
        stdout.encoding = encoding
 
1257
        stderr.encoding = encoding
 
1258
 
 
1259
        self.log('run bzr: %r', args)
 
1260
        # FIXME: don't call into logging here
 
1261
        handler = logging.StreamHandler(stderr)
 
1262
        handler.setLevel(logging.INFO)
 
1263
        logger = logging.getLogger('')
 
1264
        logger.addHandler(handler)
 
1265
        old_ui_factory = ui.ui_factory
 
1266
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1267
 
 
1268
        cwd = None
 
1269
        if working_dir is not None:
 
1270
            cwd = osutils.getcwd()
 
1271
            os.chdir(working_dir)
 
1272
 
 
1273
        try:
 
1274
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1275
                stdout, stderr,
 
1276
                bzrlib.commands.run_bzr_catch_errors,
 
1277
                args)
 
1278
        finally:
 
1279
            logger.removeHandler(handler)
 
1280
            ui.ui_factory = old_ui_factory
 
1281
            if cwd is not None:
 
1282
                os.chdir(cwd)
 
1283
 
 
1284
        out = stdout.getvalue()
 
1285
        err = stderr.getvalue()
 
1286
        if out:
 
1287
            self.log('output:\n%r', out)
 
1288
        if err:
 
1289
            self.log('errors:\n%r', err)
 
1290
        if retcode is not None:
 
1291
            self.assertEquals(retcode, result,
 
1292
                              message='Unexpected return code')
 
1293
        return out, err
 
1294
 
 
1295
    def run_bzr(self, *args, **kwargs):
 
1296
        """Invoke bzr, as if it were run from the command line.
 
1297
 
 
1298
        The argument list should not include the bzr program name - the
 
1299
        first argument is normally the bzr command.  Arguments may be
 
1300
        passed in three ways:
 
1301
 
 
1302
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1303
        when the command contains whitespace or metacharacters, or 
 
1304
        is built up at run time.
 
1305
 
 
1306
        2- A single string, eg "add a".  This is the most convenient 
 
1307
        for hardcoded commands.
 
1308
 
 
1309
        3- Several varargs parameters, eg run_bzr("add", "a").  
 
1310
        This is not recommended for new code.
 
1311
 
 
1312
        This runs bzr through the interface that catches and reports
 
1313
        errors, and with logging set to something approximating the
 
1314
        default, so that error reporting can be checked.
 
1315
 
 
1316
        This should be the main method for tests that want to exercise the
 
1317
        overall behavior of the bzr application (rather than a unit test
 
1318
        or a functional test of the library.)
 
1319
 
 
1320
        This sends the stdout/stderr results into the test's log,
 
1321
        where it may be useful for debugging.  See also run_captured.
 
1322
 
 
1323
        :keyword stdin: A string to be used as stdin for the command.
 
1324
        :keyword retcode: The status code the command should return;
 
1325
            default 0.
 
1326
        :keyword working_dir: The directory to run the command in
 
1327
        :keyword error_regexes: A list of expected error messages.  If
 
1328
            specified they must be seen in the error output of the command.
 
1329
        """
 
1330
        retcode = kwargs.pop('retcode', 0)
 
1331
        encoding = kwargs.pop('encoding', None)
 
1332
        stdin = kwargs.pop('stdin', None)
 
1333
        working_dir = kwargs.pop('working_dir', None)
 
1334
        error_regexes = kwargs.pop('error_regexes', [])
 
1335
 
 
1336
        if len(args) == 1:
 
1337
            if isinstance(args[0], (list, basestring)):
 
1338
                args = args[0]
 
1339
        else:
 
1340
            symbol_versioning.warn(zero_eighteen % "passing varargs to run_bzr",
 
1341
                                   DeprecationWarning, stacklevel=3)
 
1342
 
 
1343
        out, err = self._run_bzr_autosplit(args=args,
 
1344
            retcode=retcode,
 
1345
            encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1346
            )
 
1347
 
 
1348
        for regex in error_regexes:
 
1349
            self.assertContainsRe(err, regex)
 
1350
        return out, err
 
1351
 
 
1352
    def run_bzr_decode(self, *args, **kwargs):
 
1353
        if 'encoding' in kwargs:
 
1354
            encoding = kwargs['encoding']
 
1355
        else:
 
1356
            encoding = bzrlib.user_encoding
 
1357
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
1358
 
 
1359
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1360
        """Run bzr, and check that stderr contains the supplied regexes
 
1361
 
 
1362
        :param error_regexes: Sequence of regular expressions which
 
1363
            must each be found in the error output. The relative ordering
 
1364
            is not enforced.
 
1365
        :param args: command-line arguments for bzr
 
1366
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1367
            This function changes the default value of retcode to be 3,
 
1368
            since in most cases this is run when you expect bzr to fail.
 
1369
 
 
1370
        :return: (out, err) The actual output of running the command (in case
 
1371
            you want to do more inspection)
 
1372
 
 
1373
        Examples of use::
 
1374
 
 
1375
            # Make sure that commit is failing because there is nothing to do
 
1376
            self.run_bzr_error(['no changes to commit'],
 
1377
                               'commit', '-m', 'my commit comment')
 
1378
            # Make sure --strict is handling an unknown file, rather than
 
1379
            # giving us the 'nothing to do' error
 
1380
            self.build_tree(['unknown'])
 
1381
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1382
                               'commit', '--strict', '-m', 'my commit comment')
 
1383
        """
 
1384
        kwargs.setdefault('retcode', 3)
 
1385
        kwargs['error_regexes'] = error_regexes
 
1386
        out, err = self.run_bzr(*args, **kwargs)
 
1387
        return out, err
 
1388
 
 
1389
    def run_bzr_subprocess(self, *args, **kwargs):
 
1390
        """Run bzr in a subprocess for testing.
 
1391
 
 
1392
        This starts a new Python interpreter and runs bzr in there. 
 
1393
        This should only be used for tests that have a justifiable need for
 
1394
        this isolation: e.g. they are testing startup time, or signal
 
1395
        handling, or early startup code, etc.  Subprocess code can't be 
 
1396
        profiled or debugged so easily.
 
1397
 
 
1398
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1399
            None is supplied, the status code is not checked.
 
1400
        :keyword env_changes: A dictionary which lists changes to environment
 
1401
            variables. A value of None will unset the env variable.
 
1402
            The values must be strings. The change will only occur in the
 
1403
            child, so you don't need to fix the environment after running.
 
1404
        :keyword universal_newlines: Convert CRLF => LF
 
1405
        :keyword allow_plugins: By default the subprocess is run with
 
1406
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1407
            for system-wide plugins to create unexpected output on stderr,
 
1408
            which can cause unnecessary test failures.
 
1409
        """
 
1410
        env_changes = kwargs.get('env_changes', {})
 
1411
        working_dir = kwargs.get('working_dir', None)
 
1412
        allow_plugins = kwargs.get('allow_plugins', False)
 
1413
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1414
                                            working_dir=working_dir,
 
1415
                                            allow_plugins=allow_plugins)
 
1416
        # We distinguish between retcode=None and retcode not passed.
 
1417
        supplied_retcode = kwargs.get('retcode', 0)
 
1418
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1419
            universal_newlines=kwargs.get('universal_newlines', False),
 
1420
            process_args=args)
 
1421
 
 
1422
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1423
                             skip_if_plan_to_signal=False,
 
1424
                             working_dir=None,
 
1425
                             allow_plugins=False):
 
1426
        """Start bzr in a subprocess for testing.
 
1427
 
 
1428
        This starts a new Python interpreter and runs bzr in there.
 
1429
        This should only be used for tests that have a justifiable need for
 
1430
        this isolation: e.g. they are testing startup time, or signal
 
1431
        handling, or early startup code, etc.  Subprocess code can't be
 
1432
        profiled or debugged so easily.
 
1433
 
 
1434
        :param process_args: a list of arguments to pass to the bzr executable,
 
1435
            for example ``['--version']``.
 
1436
        :param env_changes: A dictionary which lists changes to environment
 
1437
            variables. A value of None will unset the env variable.
 
1438
            The values must be strings. The change will only occur in the
 
1439
            child, so you don't need to fix the environment after running.
 
1440
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1441
            is not available.
 
1442
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1443
 
 
1444
        :returns: Popen object for the started process.
 
1445
        """
 
1446
        if skip_if_plan_to_signal:
 
1447
            if not getattr(os, 'kill', None):
 
1448
                raise TestSkipped("os.kill not available.")
 
1449
 
 
1450
        if env_changes is None:
 
1451
            env_changes = {}
 
1452
        old_env = {}
 
1453
 
 
1454
        def cleanup_environment():
 
1455
            for env_var, value in env_changes.iteritems():
 
1456
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1457
 
 
1458
        def restore_environment():
 
1459
            for env_var, value in old_env.iteritems():
 
1460
                osutils.set_or_unset_env(env_var, value)
 
1461
 
 
1462
        bzr_path = self.get_bzr_path()
 
1463
 
 
1464
        cwd = None
 
1465
        if working_dir is not None:
 
1466
            cwd = osutils.getcwd()
 
1467
            os.chdir(working_dir)
 
1468
 
 
1469
        try:
 
1470
            # win32 subprocess doesn't support preexec_fn
 
1471
            # so we will avoid using it on all platforms, just to
 
1472
            # make sure the code path is used, and we don't break on win32
 
1473
            cleanup_environment()
 
1474
            command = [sys.executable, bzr_path]
 
1475
            if not allow_plugins:
 
1476
                command.append('--no-plugins')
 
1477
            command.extend(process_args)
 
1478
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1479
        finally:
 
1480
            restore_environment()
 
1481
            if cwd is not None:
 
1482
                os.chdir(cwd)
 
1483
 
 
1484
        return process
 
1485
 
 
1486
    def _popen(self, *args, **kwargs):
 
1487
        """Place a call to Popen.
 
1488
 
 
1489
        Allows tests to override this method to intercept the calls made to
 
1490
        Popen for introspection.
 
1491
        """
 
1492
        return Popen(*args, **kwargs)
 
1493
 
 
1494
    def get_bzr_path(self):
 
1495
        """Return the path of the 'bzr' executable for this test suite."""
 
1496
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1497
        if not os.path.isfile(bzr_path):
 
1498
            # We are probably installed. Assume sys.argv is the right file
 
1499
            bzr_path = sys.argv[0]
 
1500
        return bzr_path
 
1501
 
 
1502
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1503
                              universal_newlines=False, process_args=None):
 
1504
        """Finish the execution of process.
 
1505
 
 
1506
        :param process: the Popen object returned from start_bzr_subprocess.
 
1507
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1508
            None is supplied, the status code is not checked.
 
1509
        :param send_signal: an optional signal to send to the process.
 
1510
        :param universal_newlines: Convert CRLF => LF
 
1511
        :returns: (stdout, stderr)
 
1512
        """
 
1513
        if send_signal is not None:
 
1514
            os.kill(process.pid, send_signal)
 
1515
        out, err = process.communicate()
 
1516
 
 
1517
        if universal_newlines:
 
1518
            out = out.replace('\r\n', '\n')
 
1519
            err = err.replace('\r\n', '\n')
 
1520
 
 
1521
        if retcode is not None and retcode != process.returncode:
 
1522
            if process_args is None:
 
1523
                process_args = "(unknown args)"
 
1524
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1525
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1526
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1527
                      % (process_args, retcode, process.returncode))
 
1528
        return [out, err]
 
1529
 
 
1530
    def check_inventory_shape(self, inv, shape):
 
1531
        """Compare an inventory to a list of expected names.
 
1532
 
 
1533
        Fail if they are not precisely equal.
 
1534
        """
 
1535
        extras = []
 
1536
        shape = list(shape)             # copy
 
1537
        for path, ie in inv.entries():
 
1538
            name = path.replace('\\', '/')
 
1539
            if ie.kind == 'directory':
 
1540
                name = name + '/'
 
1541
            if name in shape:
 
1542
                shape.remove(name)
 
1543
            else:
 
1544
                extras.append(name)
 
1545
        if shape:
 
1546
            self.fail("expected paths not found in inventory: %r" % shape)
 
1547
        if extras:
 
1548
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1549
 
 
1550
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1551
                         a_callable=None, *args, **kwargs):
 
1552
        """Call callable with redirected std io pipes.
 
1553
 
 
1554
        Returns the return code."""
 
1555
        if not callable(a_callable):
 
1556
            raise ValueError("a_callable must be callable.")
 
1557
        if stdin is None:
 
1558
            stdin = StringIO("")
 
1559
        if stdout is None:
 
1560
            if getattr(self, "_log_file", None) is not None:
 
1561
                stdout = self._log_file
 
1562
            else:
 
1563
                stdout = StringIO()
 
1564
        if stderr is None:
 
1565
            if getattr(self, "_log_file", None is not None):
 
1566
                stderr = self._log_file
 
1567
            else:
 
1568
                stderr = StringIO()
 
1569
        real_stdin = sys.stdin
 
1570
        real_stdout = sys.stdout
 
1571
        real_stderr = sys.stderr
 
1572
        try:
 
1573
            sys.stdout = stdout
 
1574
            sys.stderr = stderr
 
1575
            sys.stdin = stdin
 
1576
            return a_callable(*args, **kwargs)
 
1577
        finally:
 
1578
            sys.stdout = real_stdout
 
1579
            sys.stderr = real_stderr
 
1580
            sys.stdin = real_stdin
 
1581
 
 
1582
    def reduceLockdirTimeout(self):
 
1583
        """Reduce the default lock timeout for the duration of the test, so that
 
1584
        if LockContention occurs during a test, it does so quickly.
 
1585
 
 
1586
        Tests that expect to provoke LockContention errors should call this.
 
1587
        """
 
1588
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1589
        def resetTimeout():
 
1590
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1591
        self.addCleanup(resetTimeout)
 
1592
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1593
 
 
1594
 
 
1595
class TestCaseWithMemoryTransport(TestCase):
 
1596
    """Common test class for tests that do not need disk resources.
 
1597
 
 
1598
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1599
 
 
1600
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1601
 
 
1602
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1603
    a directory which does not exist. This serves to help ensure test isolation
 
1604
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1605
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1606
    file defaults for the transport in tests, nor does it obey the command line
 
1607
    override, so tests that accidentally write to the common directory should
 
1608
    be rare.
 
1609
 
 
1610
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1611
    a .bzr directory that stops us ascending higher into the filesystem.
 
1612
    """
 
1613
 
 
1614
    TEST_ROOT = None
 
1615
    _TEST_NAME = 'test'
 
1616
 
 
1617
    def __init__(self, methodName='runTest'):
 
1618
        # allow test parameterisation after test construction and before test
 
1619
        # execution. Variables that the parameteriser sets need to be 
 
1620
        # ones that are not set by setUp, or setUp will trash them.
 
1621
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1622
        self.vfs_transport_factory = default_transport
 
1623
        self.transport_server = None
 
1624
        self.transport_readonly_server = None
 
1625
        self.__vfs_server = None
 
1626
 
 
1627
    def get_transport(self, relpath=None):
 
1628
        """Return a writeable transport.
 
1629
 
 
1630
        This transport is for the test scratch space relative to
 
1631
        "self._test_root""
 
1632
        
 
1633
        :param relpath: a path relative to the base url.
 
1634
        """
 
1635
        t = get_transport(self.get_url(relpath))
 
1636
        self.assertFalse(t.is_readonly())
 
1637
        return t
 
1638
 
 
1639
    def get_readonly_transport(self, relpath=None):
 
1640
        """Return a readonly transport for the test scratch space
 
1641
        
 
1642
        This can be used to test that operations which should only need
 
1643
        readonly access in fact do not try to write.
 
1644
 
 
1645
        :param relpath: a path relative to the base url.
 
1646
        """
 
1647
        t = get_transport(self.get_readonly_url(relpath))
 
1648
        self.assertTrue(t.is_readonly())
 
1649
        return t
 
1650
 
 
1651
    def create_transport_readonly_server(self):
 
1652
        """Create a transport server from class defined at init.
 
1653
 
 
1654
        This is mostly a hook for daughter classes.
 
1655
        """
 
1656
        return self.transport_readonly_server()
 
1657
 
 
1658
    def get_readonly_server(self):
 
1659
        """Get the server instance for the readonly transport
 
1660
 
 
1661
        This is useful for some tests with specific servers to do diagnostics.
 
1662
        """
 
1663
        if self.__readonly_server is None:
 
1664
            if self.transport_readonly_server is None:
 
1665
                # readonly decorator requested
 
1666
                # bring up the server
 
1667
                self.__readonly_server = ReadonlyServer()
 
1668
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1669
            else:
 
1670
                self.__readonly_server = self.create_transport_readonly_server()
 
1671
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1672
            self.addCleanup(self.__readonly_server.tearDown)
 
1673
        return self.__readonly_server
 
1674
 
 
1675
    def get_readonly_url(self, relpath=None):
 
1676
        """Get a URL for the readonly transport.
 
1677
 
 
1678
        This will either be backed by '.' or a decorator to the transport 
 
1679
        used by self.get_url()
 
1680
        relpath provides for clients to get a path relative to the base url.
 
1681
        These should only be downwards relative, not upwards.
 
1682
        """
 
1683
        base = self.get_readonly_server().get_url()
 
1684
        return self._adjust_url(base, relpath)
 
1685
 
 
1686
    def get_vfs_only_server(self):
 
1687
        """Get the vfs only read/write server instance.
 
1688
 
 
1689
        This is useful for some tests with specific servers that need
 
1690
        diagnostics.
 
1691
 
 
1692
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1693
        is no means to override it.
 
1694
        """
 
1695
        if self.__vfs_server is None:
 
1696
            self.__vfs_server = MemoryServer()
 
1697
            self.__vfs_server.setUp()
 
1698
            self.addCleanup(self.__vfs_server.tearDown)
 
1699
        return self.__vfs_server
 
1700
 
 
1701
    def get_server(self):
 
1702
        """Get the read/write server instance.
 
1703
 
 
1704
        This is useful for some tests with specific servers that need
 
1705
        diagnostics.
 
1706
 
 
1707
        This is built from the self.transport_server factory. If that is None,
 
1708
        then the self.get_vfs_server is returned.
 
1709
        """
 
1710
        if self.__server is None:
 
1711
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1712
                return self.get_vfs_only_server()
 
1713
            else:
 
1714
                # bring up a decorated means of access to the vfs only server.
 
1715
                self.__server = self.transport_server()
 
1716
                try:
 
1717
                    self.__server.setUp(self.get_vfs_only_server())
 
1718
                except TypeError, e:
 
1719
                    # This should never happen; the try:Except here is to assist
 
1720
                    # developers having to update code rather than seeing an
 
1721
                    # uninformative TypeError.
 
1722
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1723
            self.addCleanup(self.__server.tearDown)
 
1724
        return self.__server
 
1725
 
 
1726
    def _adjust_url(self, base, relpath):
 
1727
        """Get a URL (or maybe a path) for the readwrite transport.
 
1728
 
 
1729
        This will either be backed by '.' or to an equivalent non-file based
 
1730
        facility.
 
1731
        relpath provides for clients to get a path relative to the base url.
 
1732
        These should only be downwards relative, not upwards.
 
1733
        """
 
1734
        if relpath is not None and relpath != '.':
 
1735
            if not base.endswith('/'):
 
1736
                base = base + '/'
 
1737
            # XXX: Really base should be a url; we did after all call
 
1738
            # get_url()!  But sometimes it's just a path (from
 
1739
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1740
            # to a non-escaped local path.
 
1741
            if base.startswith('./') or base.startswith('/'):
 
1742
                base += relpath
 
1743
            else:
 
1744
                base += urlutils.escape(relpath)
 
1745
        return base
 
1746
 
 
1747
    def get_url(self, relpath=None):
 
1748
        """Get a URL (or maybe a path) for the readwrite transport.
 
1749
 
 
1750
        This will either be backed by '.' or to an equivalent non-file based
 
1751
        facility.
 
1752
        relpath provides for clients to get a path relative to the base url.
 
1753
        These should only be downwards relative, not upwards.
 
1754
        """
 
1755
        base = self.get_server().get_url()
 
1756
        return self._adjust_url(base, relpath)
 
1757
 
 
1758
    def get_vfs_only_url(self, relpath=None):
 
1759
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1760
 
 
1761
        This will never be a smart protocol.  It always has all the
 
1762
        capabilities of the local filesystem, but it might actually be a
 
1763
        MemoryTransport or some other similar virtual filesystem.
 
1764
 
 
1765
        This is the backing transport (if any) of the server returned by
 
1766
        get_url and get_readonly_url.
 
1767
 
 
1768
        :param relpath: provides for clients to get a path relative to the base
 
1769
            url.  These should only be downwards relative, not upwards.
 
1770
        :return: A URL
 
1771
        """
 
1772
        base = self.get_vfs_only_server().get_url()
 
1773
        return self._adjust_url(base, relpath)
 
1774
 
 
1775
    def _make_test_root(self):
 
1776
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1777
            return
 
1778
        root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
1779
        TestCaseWithMemoryTransport.TEST_ROOT = root
 
1780
        
 
1781
        # make a fake bzr directory there to prevent any tests propagating
 
1782
        # up onto the source directory's real branch
 
1783
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
1784
 
 
1785
        # The same directory is used by all tests, and we're not specifically
 
1786
        # told when all tests are finished.  This will do.
 
1787
        atexit.register(_rmtree_temp_dir, root)
 
1788
 
 
1789
    def makeAndChdirToTestDir(self):
 
1790
        """Create a temporary directories for this one test.
 
1791
        
 
1792
        This must set self.test_home_dir and self.test_dir and chdir to
 
1793
        self.test_dir.
 
1794
        
 
1795
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1796
        """
 
1797
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1798
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1799
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1800
        
 
1801
    def make_branch(self, relpath, format=None):
 
1802
        """Create a branch on the transport at relpath."""
 
1803
        repo = self.make_repository(relpath, format=format)
 
1804
        return repo.bzrdir.create_branch()
 
1805
 
 
1806
    def make_bzrdir(self, relpath, format=None):
 
1807
        try:
 
1808
            # might be a relative or absolute path
 
1809
            maybe_a_url = self.get_url(relpath)
 
1810
            segments = maybe_a_url.rsplit('/', 1)
 
1811
            t = get_transport(maybe_a_url)
 
1812
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1813
                t.ensure_base()
 
1814
            if format is None:
 
1815
                format = 'default'
 
1816
            if isinstance(format, basestring):
 
1817
                format = bzrdir.format_registry.make_bzrdir(format)
 
1818
            return format.initialize_on_transport(t)
 
1819
        except errors.UninitializableFormat:
 
1820
            raise TestSkipped("Format %s is not initializable." % format)
 
1821
 
 
1822
    def make_repository(self, relpath, shared=False, format=None):
 
1823
        """Create a repository on our default transport at relpath.
 
1824
        
 
1825
        Note that relpath must be a relative path, not a full url.
 
1826
        """
 
1827
        # FIXME: If you create a remoterepository this returns the underlying
 
1828
        # real format, which is incorrect.  Actually we should make sure that 
 
1829
        # RemoteBzrDir returns a RemoteRepository.
 
1830
        # maybe  mbp 20070410
 
1831
        made_control = self.make_bzrdir(relpath, format=format)
 
1832
        return made_control.create_repository(shared=shared)
 
1833
 
 
1834
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1835
        """Create a branch on the default transport and a MemoryTree for it."""
 
1836
        b = self.make_branch(relpath, format=format)
 
1837
        return memorytree.MemoryTree.create_on_branch(b)
 
1838
 
 
1839
    def overrideEnvironmentForTesting(self):
 
1840
        os.environ['HOME'] = self.test_home_dir
 
1841
        os.environ['BZR_HOME'] = self.test_home_dir
 
1842
        
 
1843
    def setUp(self):
 
1844
        super(TestCaseWithMemoryTransport, self).setUp()
 
1845
        self._make_test_root()
 
1846
        _currentdir = os.getcwdu()
 
1847
        def _leaveDirectory():
 
1848
            os.chdir(_currentdir)
 
1849
        self.addCleanup(_leaveDirectory)
 
1850
        self.makeAndChdirToTestDir()
 
1851
        self.overrideEnvironmentForTesting()
 
1852
        self.__readonly_server = None
 
1853
        self.__server = None
 
1854
        self.reduceLockdirTimeout()
 
1855
 
 
1856
     
 
1857
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
1858
    """Derived class that runs a test within a temporary directory.
 
1859
 
 
1860
    This is useful for tests that need to create a branch, etc.
 
1861
 
 
1862
    The directory is created in a slightly complex way: for each
 
1863
    Python invocation, a new temporary top-level directory is created.
 
1864
    All test cases create their own directory within that.  If the
 
1865
    tests complete successfully, the directory is removed.
 
1866
 
 
1867
    :ivar test_base_dir: The path of the top-level directory for this 
 
1868
    test, which contains a home directory and a work directory.
 
1869
 
 
1870
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
1871
    which is used as $HOME for this test.
 
1872
 
 
1873
    :ivar test_dir: A directory under test_base_dir used as the current
 
1874
    directory when the test proper is run.
 
1875
    """
 
1876
 
 
1877
    OVERRIDE_PYTHON = 'python'
 
1878
 
 
1879
    def check_file_contents(self, filename, expect):
 
1880
        self.log("check contents of file %s" % filename)
 
1881
        contents = file(filename, 'r').read()
 
1882
        if contents != expect:
 
1883
            self.log("expected: %r" % expect)
 
1884
            self.log("actually: %r" % contents)
 
1885
            self.fail("contents of %s not as expected" % filename)
 
1886
 
 
1887
    def makeAndChdirToTestDir(self):
 
1888
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1889
        
 
1890
        For TestCaseInTempDir we create a temporary directory based on the test
 
1891
        name and then create two subdirs - test and home under it.
 
1892
        """
 
1893
        # create a directory within the top level test directory
 
1894
        candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
 
1895
        # now create test and home directories within this dir
 
1896
        self.test_base_dir = candidate_dir
 
1897
        self.test_home_dir = self.test_base_dir + '/home'
 
1898
        os.mkdir(self.test_home_dir)
 
1899
        self.test_dir = self.test_base_dir + '/work'
 
1900
        os.mkdir(self.test_dir)
 
1901
        os.chdir(self.test_dir)
 
1902
        # put name of test inside
 
1903
        f = file(self.test_base_dir + '/name', 'w')
 
1904
        try:
 
1905
            f.write(self.id())
 
1906
        finally:
 
1907
            f.close()
 
1908
        self.addCleanup(self.deleteTestDir)
 
1909
 
 
1910
    def deleteTestDir(self):
 
1911
        os.chdir(self.TEST_ROOT)
 
1912
        _rmtree_temp_dir(self.test_base_dir)
 
1913
 
 
1914
    def build_tree(self, shape, line_endings='binary', transport=None):
 
1915
        """Build a test tree according to a pattern.
 
1916
 
 
1917
        shape is a sequence of file specifications.  If the final
 
1918
        character is '/', a directory is created.
 
1919
 
 
1920
        This assumes that all the elements in the tree being built are new.
 
1921
 
 
1922
        This doesn't add anything to a branch.
 
1923
 
 
1924
        :param line_endings: Either 'binary' or 'native'
 
1925
            in binary mode, exact contents are written in native mode, the
 
1926
            line endings match the default platform endings.
 
1927
        :param transport: A transport to write to, for building trees on VFS's.
 
1928
            If the transport is readonly or None, "." is opened automatically.
 
1929
        :return: None
 
1930
        """
 
1931
        # It's OK to just create them using forward slashes on windows.
 
1932
        if transport is None or transport.is_readonly():
 
1933
            transport = get_transport(".")
 
1934
        for name in shape:
 
1935
            self.assert_(isinstance(name, basestring))
 
1936
            if name[-1] == '/':
 
1937
                transport.mkdir(urlutils.escape(name[:-1]))
 
1938
            else:
 
1939
                if line_endings == 'binary':
 
1940
                    end = '\n'
 
1941
                elif line_endings == 'native':
 
1942
                    end = os.linesep
 
1943
                else:
 
1944
                    raise errors.BzrError(
 
1945
                        'Invalid line ending request %r' % line_endings)
 
1946
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1947
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
1948
 
 
1949
    def build_tree_contents(self, shape):
 
1950
        build_tree_contents(shape)
 
1951
 
 
1952
    def assertFileEqual(self, content, path):
 
1953
        """Fail if path does not contain 'content'."""
 
1954
        self.failUnlessExists(path)
 
1955
        f = file(path, 'rb')
 
1956
        try:
 
1957
            s = f.read()
 
1958
        finally:
 
1959
            f.close()
 
1960
        self.assertEqualDiff(content, s)
 
1961
 
 
1962
    def failUnlessExists(self, path):
 
1963
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1964
        if not isinstance(path, basestring):
 
1965
            for p in path:
 
1966
                self.failUnlessExists(p)
 
1967
        else:
 
1968
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1969
 
 
1970
    def failIfExists(self, path):
 
1971
        """Fail if path or paths, which may be abs or relative, exist."""
 
1972
        if not isinstance(path, basestring):
 
1973
            for p in path:
 
1974
                self.failIfExists(p)
 
1975
        else:
 
1976
            self.failIf(osutils.lexists(path),path+" exists")
 
1977
 
 
1978
    def assertInWorkingTree(self,path,root_path='.',tree=None):
 
1979
        """Assert whether path or paths are in the WorkingTree"""
 
1980
        if tree is None:
 
1981
            tree = workingtree.WorkingTree.open(root_path)
 
1982
        if not isinstance(path, basestring):
 
1983
            for p in path:
 
1984
                self.assertInWorkingTree(p,tree=tree)
 
1985
        else:
 
1986
            self.assertIsNot(tree.path2id(path), None,
 
1987
                path+' not in working tree.')
 
1988
 
 
1989
    def assertNotInWorkingTree(self,path,root_path='.',tree=None):
 
1990
        """Assert whether path or paths are not in the WorkingTree"""
 
1991
        if tree is None:
 
1992
            tree = workingtree.WorkingTree.open(root_path)
 
1993
        if not isinstance(path, basestring):
 
1994
            for p in path:
 
1995
                self.assertNotInWorkingTree(p,tree=tree)
 
1996
        else:
 
1997
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
1998
 
 
1999
 
 
2000
class TestCaseWithTransport(TestCaseInTempDir):
 
2001
    """A test case that provides get_url and get_readonly_url facilities.
 
2002
 
 
2003
    These back onto two transport servers, one for readonly access and one for
 
2004
    read write access.
 
2005
 
 
2006
    If no explicit class is provided for readonly access, a
 
2007
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2008
    based read write transports.
 
2009
 
 
2010
    If an explicit class is provided for readonly access, that server and the 
 
2011
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2012
    """
 
2013
 
 
2014
    def get_vfs_only_server(self):
 
2015
        """See TestCaseWithMemoryTransport.
 
2016
 
 
2017
        This is useful for some tests with specific servers that need
 
2018
        diagnostics.
 
2019
        """
 
2020
        if self.__vfs_server is None:
 
2021
            self.__vfs_server = self.vfs_transport_factory()
 
2022
            self.__vfs_server.setUp()
 
2023
            self.addCleanup(self.__vfs_server.tearDown)
 
2024
        return self.__vfs_server
 
2025
 
 
2026
    def make_branch_and_tree(self, relpath, format=None):
 
2027
        """Create a branch on the transport and a tree locally.
 
2028
 
 
2029
        If the transport is not a LocalTransport, the Tree can't be created on
 
2030
        the transport.  In that case if the vfs_transport_factory is
 
2031
        LocalURLServer the working tree is created in the local
 
2032
        directory backing the transport, and the returned tree's branch and
 
2033
        repository will also be accessed locally. Otherwise a lightweight
 
2034
        checkout is created and returned.
 
2035
 
 
2036
        :param format: The BzrDirFormat.
 
2037
        :returns: the WorkingTree.
 
2038
        """
 
2039
        # TODO: always use the local disk path for the working tree,
 
2040
        # this obviously requires a format that supports branch references
 
2041
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2042
        # RBC 20060208
 
2043
        b = self.make_branch(relpath, format=format)
 
2044
        try:
 
2045
            return b.bzrdir.create_workingtree()
 
2046
        except errors.NotLocalUrl:
 
2047
            # We can only make working trees locally at the moment.  If the
 
2048
            # transport can't support them, then we keep the non-disk-backed
 
2049
            # branch and create a local checkout.
 
2050
            if self.vfs_transport_factory is LocalURLServer:
 
2051
                # the branch is colocated on disk, we cannot create a checkout.
 
2052
                # hopefully callers will expect this.
 
2053
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2054
                return local_controldir.create_workingtree()
 
2055
            else:
 
2056
                return b.create_checkout(relpath, lightweight=True)
 
2057
 
 
2058
    def assertIsDirectory(self, relpath, transport):
 
2059
        """Assert that relpath within transport is a directory.
 
2060
 
 
2061
        This may not be possible on all transports; in that case it propagates
 
2062
        a TransportNotPossible.
 
2063
        """
 
2064
        try:
 
2065
            mode = transport.stat(relpath).st_mode
 
2066
        except errors.NoSuchFile:
 
2067
            self.fail("path %s is not a directory; no such file"
 
2068
                      % (relpath))
 
2069
        if not stat.S_ISDIR(mode):
 
2070
            self.fail("path %s is not a directory; has mode %#o"
 
2071
                      % (relpath, mode))
 
2072
 
 
2073
    def assertTreesEqual(self, left, right):
 
2074
        """Check that left and right have the same content and properties."""
 
2075
        # we use a tree delta to check for equality of the content, and we
 
2076
        # manually check for equality of other things such as the parents list.
 
2077
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2078
        differences = left.changes_from(right)
 
2079
        self.assertFalse(differences.has_changed(),
 
2080
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2081
 
 
2082
    def setUp(self):
 
2083
        super(TestCaseWithTransport, self).setUp()
 
2084
        self.__vfs_server = None
 
2085
 
 
2086
 
 
2087
class ChrootedTestCase(TestCaseWithTransport):
 
2088
    """A support class that provides readonly urls outside the local namespace.
 
2089
 
 
2090
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2091
    is then we are chrooted already, if it is not then an HttpServer is used
 
2092
    for readonly urls.
 
2093
 
 
2094
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2095
                       be used without needed to redo it when a different 
 
2096
                       subclass is in use ?
 
2097
    """
 
2098
 
 
2099
    def setUp(self):
 
2100
        super(ChrootedTestCase, self).setUp()
 
2101
        if not self.vfs_transport_factory == MemoryServer:
 
2102
            self.transport_readonly_server = HttpServer
 
2103
 
 
2104
 
 
2105
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
 
2106
                       random_order=False):
 
2107
    """Create a test suite by filtering another one.
 
2108
    
 
2109
    :param suite:           the source suite
 
2110
    :param pattern:         pattern that names must match
 
2111
    :param exclude_pattern: pattern that names must not match, if any
 
2112
    :param random_order:    if True, tests in the new suite will be put in
 
2113
                            random order
 
2114
    :returns: the newly created suite
 
2115
    """ 
 
2116
    return sort_suite_by_re(suite, pattern, exclude_pattern,
 
2117
        random_order, False)
 
2118
 
 
2119
 
 
2120
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
 
2121
                     random_order=False, append_rest=True):
 
2122
    """Create a test suite by sorting another one.
 
2123
    
 
2124
    :param suite:           the source suite
 
2125
    :param pattern:         pattern that names must match in order to go
 
2126
                            first in the new suite
 
2127
    :param exclude_pattern: pattern that names must not match, if any
 
2128
    :param random_order:    if True, tests in the new suite will be put in
 
2129
                            random order
 
2130
    :param append_rest:     if False, pattern is a strict filter and not
 
2131
                            just an ordering directive
 
2132
    :returns: the newly created suite
 
2133
    """ 
 
2134
    first = []
 
2135
    second = []
 
2136
    filter_re = re.compile(pattern)
 
2137
    if exclude_pattern is not None:
 
2138
        exclude_re = re.compile(exclude_pattern)
 
2139
    for test in iter_suite_tests(suite):
 
2140
        test_id = test.id()
 
2141
        if exclude_pattern is None or not exclude_re.search(test_id):
 
2142
            if filter_re.search(test_id):
 
2143
                first.append(test)
 
2144
            elif append_rest:
 
2145
                second.append(test)
 
2146
    if random_order:
 
2147
        random.shuffle(first)
 
2148
        random.shuffle(second)
 
2149
    return TestUtil.TestSuite(first + second)
 
2150
 
 
2151
 
 
2152
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2153
              stop_on_failure=False,
 
2154
              transport=None, lsprof_timed=None, bench_history=None,
 
2155
              matching_tests_first=None,
 
2156
              list_only=False,
 
2157
              random_seed=None,
 
2158
              exclude_pattern=None,
61
2159
              ):
62
 
        if m not in MODULES_TO_TEST:
63
 
            MODULES_TO_TEST.append(m)
64
 
 
65
 
 
66
 
    TestBase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
67
 
    print '%-30s %s' % ('bzr binary', TestBase.BZRPATH)
68
 
 
69
 
    print
70
 
 
71
 
    suite = TestSuite()
72
 
 
73
 
    # should also test bzrlib.merge_core, but they seem to be out of date with
74
 
    # the code.
75
 
 
76
 
 
77
 
    # XXX: python2.3's TestLoader() doesn't seem to find all the
78
 
    # tests; don't know why
 
2160
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2161
    if verbose:
 
2162
        verbosity = 2
 
2163
    else:
 
2164
        verbosity = 1
 
2165
    runner = TextTestRunner(stream=sys.stdout,
 
2166
                            descriptions=0,
 
2167
                            verbosity=verbosity,
 
2168
                            bench_history=bench_history,
 
2169
                            list_only=list_only,
 
2170
                            )
 
2171
    runner.stop_on_failure=stop_on_failure
 
2172
    # Initialise the random number generator and display the seed used.
 
2173
    # We convert the seed to a long to make it reuseable across invocations.
 
2174
    random_order = False
 
2175
    if random_seed is not None:
 
2176
        random_order = True
 
2177
        if random_seed == "now":
 
2178
            random_seed = long(time.time())
 
2179
        else:
 
2180
            # Convert the seed to a long if we can
 
2181
            try:
 
2182
                random_seed = long(random_seed)
 
2183
            except:
 
2184
                pass
 
2185
        runner.stream.writeln("Randomizing test order using seed %s\n" %
 
2186
            (random_seed))
 
2187
        random.seed(random_seed)
 
2188
    # Customise the list of tests if requested
 
2189
    if pattern != '.*' or exclude_pattern is not None or random_order:
 
2190
        if matching_tests_first:
 
2191
            suite = sort_suite_by_re(suite, pattern, exclude_pattern,
 
2192
                random_order)
 
2193
        else:
 
2194
            suite = filter_suite_by_re(suite, pattern, exclude_pattern,
 
2195
                random_order)
 
2196
    result = runner.run(suite)
 
2197
    return result.wasSuccessful()
 
2198
 
 
2199
 
 
2200
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2201
             transport=None,
 
2202
             test_suite_factory=None,
 
2203
             lsprof_timed=None,
 
2204
             bench_history=None,
 
2205
             matching_tests_first=None,
 
2206
             list_only=False,
 
2207
             random_seed=None,
 
2208
             exclude_pattern=None):
 
2209
    """Run the whole test suite under the enhanced runner"""
 
2210
    # XXX: Very ugly way to do this...
 
2211
    # Disable warning about old formats because we don't want it to disturb
 
2212
    # any blackbox tests.
 
2213
    from bzrlib import repository
 
2214
    repository._deprecation_warning_done = True
 
2215
 
 
2216
    global default_transport
 
2217
    if transport is None:
 
2218
        transport = default_transport
 
2219
    old_transport = default_transport
 
2220
    default_transport = transport
 
2221
    try:
 
2222
        if test_suite_factory is None:
 
2223
            suite = test_suite()
 
2224
        else:
 
2225
            suite = test_suite_factory()
 
2226
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2227
                     stop_on_failure=stop_on_failure,
 
2228
                     transport=transport,
 
2229
                     lsprof_timed=lsprof_timed,
 
2230
                     bench_history=bench_history,
 
2231
                     matching_tests_first=matching_tests_first,
 
2232
                     list_only=list_only,
 
2233
                     random_seed=random_seed,
 
2234
                     exclude_pattern=exclude_pattern)
 
2235
    finally:
 
2236
        default_transport = old_transport
 
2237
 
 
2238
 
 
2239
def test_suite():
 
2240
    """Build and return TestSuite for the whole of bzrlib.
 
2241
    
 
2242
    This function can be replaced if you need to change the default test
 
2243
    suite on a global basis, but it is not encouraged.
 
2244
    """
 
2245
    testmod_names = [
 
2246
                   'bzrlib.tests.test_ancestry',
 
2247
                   'bzrlib.tests.test_annotate',
 
2248
                   'bzrlib.tests.test_api',
 
2249
                   'bzrlib.tests.test_atomicfile',
 
2250
                   'bzrlib.tests.test_bad_files',
 
2251
                   'bzrlib.tests.test_branch',
 
2252
                   'bzrlib.tests.test_branchbuilder',
 
2253
                   'bzrlib.tests.test_bugtracker',
 
2254
                   'bzrlib.tests.test_bundle',
 
2255
                   'bzrlib.tests.test_bzrdir',
 
2256
                   'bzrlib.tests.test_cache_utf8',
 
2257
                   'bzrlib.tests.test_commands',
 
2258
                   'bzrlib.tests.test_commit',
 
2259
                   'bzrlib.tests.test_commit_merge',
 
2260
                   'bzrlib.tests.test_config',
 
2261
                   'bzrlib.tests.test_conflicts',
 
2262
                   'bzrlib.tests.test_pack',
 
2263
                   'bzrlib.tests.test_counted_lock',
 
2264
                   'bzrlib.tests.test_decorators',
 
2265
                   'bzrlib.tests.test_delta',
 
2266
                   'bzrlib.tests.test_deprecated_graph',
 
2267
                   'bzrlib.tests.test_diff',
 
2268
                   'bzrlib.tests.test_dirstate',
 
2269
                   'bzrlib.tests.test_errors',
 
2270
                   'bzrlib.tests.test_escaped_store',
 
2271
                   'bzrlib.tests.test_extract',
 
2272
                   'bzrlib.tests.test_fetch',
 
2273
                   'bzrlib.tests.test_ftp_transport',
 
2274
                   'bzrlib.tests.test_generate_docs',
 
2275
                   'bzrlib.tests.test_generate_ids',
 
2276
                   'bzrlib.tests.test_globbing',
 
2277
                   'bzrlib.tests.test_gpg',
 
2278
                   'bzrlib.tests.test_graph',
 
2279
                   'bzrlib.tests.test_hashcache',
 
2280
                   'bzrlib.tests.test_help',
 
2281
                   'bzrlib.tests.test_hooks',
 
2282
                   'bzrlib.tests.test_http',
 
2283
                   'bzrlib.tests.test_http_response',
 
2284
                   'bzrlib.tests.test_https_ca_bundle',
 
2285
                   'bzrlib.tests.test_identitymap',
 
2286
                   'bzrlib.tests.test_ignores',
 
2287
                   'bzrlib.tests.test_index',
 
2288
                   'bzrlib.tests.test_info',
 
2289
                   'bzrlib.tests.test_inv',
 
2290
                   'bzrlib.tests.test_knit',
 
2291
                   'bzrlib.tests.test_lazy_import',
 
2292
                   'bzrlib.tests.test_lazy_regex',
 
2293
                   'bzrlib.tests.test_lockdir',
 
2294
                   'bzrlib.tests.test_lockable_files',
 
2295
                   'bzrlib.tests.test_log',
 
2296
                   'bzrlib.tests.test_lsprof',
 
2297
                   'bzrlib.tests.test_memorytree',
 
2298
                   'bzrlib.tests.test_merge',
 
2299
                   'bzrlib.tests.test_merge3',
 
2300
                   'bzrlib.tests.test_merge_core',
 
2301
                   'bzrlib.tests.test_merge_directive',
 
2302
                   'bzrlib.tests.test_missing',
 
2303
                   'bzrlib.tests.test_msgeditor',
 
2304
                   'bzrlib.tests.test_nonascii',
 
2305
                   'bzrlib.tests.test_options',
 
2306
                   'bzrlib.tests.test_osutils',
 
2307
                   'bzrlib.tests.test_osutils_encodings',
 
2308
                   'bzrlib.tests.test_patch',
 
2309
                   'bzrlib.tests.test_patches',
 
2310
                   'bzrlib.tests.test_permissions',
 
2311
                   'bzrlib.tests.test_plugins',
 
2312
                   'bzrlib.tests.test_progress',
 
2313
                   'bzrlib.tests.test_reconcile',
 
2314
                   'bzrlib.tests.test_registry',
 
2315
                   'bzrlib.tests.test_remote',
 
2316
                   'bzrlib.tests.test_repository',
 
2317
                   'bzrlib.tests.test_revert',
 
2318
                   'bzrlib.tests.test_revision',
 
2319
                   'bzrlib.tests.test_revisionnamespaces',
 
2320
                   'bzrlib.tests.test_revisiontree',
 
2321
                   'bzrlib.tests.test_rio',
 
2322
                   'bzrlib.tests.test_sampler',
 
2323
                   'bzrlib.tests.test_selftest',
 
2324
                   'bzrlib.tests.test_setup',
 
2325
                   'bzrlib.tests.test_sftp_transport',
 
2326
                   'bzrlib.tests.test_smart',
 
2327
                   'bzrlib.tests.test_smart_add',
 
2328
                   'bzrlib.tests.test_smart_transport',
 
2329
                   'bzrlib.tests.test_smtp_connection',
 
2330
                   'bzrlib.tests.test_source',
 
2331
                   'bzrlib.tests.test_ssh_transport',
 
2332
                   'bzrlib.tests.test_status',
 
2333
                   'bzrlib.tests.test_store',
 
2334
                   'bzrlib.tests.test_strace',
 
2335
                   'bzrlib.tests.test_subsume',
 
2336
                   'bzrlib.tests.test_symbol_versioning',
 
2337
                   'bzrlib.tests.test_tag',
 
2338
                   'bzrlib.tests.test_testament',
 
2339
                   'bzrlib.tests.test_textfile',
 
2340
                   'bzrlib.tests.test_textmerge',
 
2341
                   'bzrlib.tests.test_timestamp',
 
2342
                   'bzrlib.tests.test_trace',
 
2343
                   'bzrlib.tests.test_transactions',
 
2344
                   'bzrlib.tests.test_transform',
 
2345
                   'bzrlib.tests.test_transport',
 
2346
                   'bzrlib.tests.test_tree',
 
2347
                   'bzrlib.tests.test_treebuilder',
 
2348
                   'bzrlib.tests.test_tsort',
 
2349
                   'bzrlib.tests.test_tuned_gzip',
 
2350
                   'bzrlib.tests.test_ui',
 
2351
                   'bzrlib.tests.test_upgrade',
 
2352
                   'bzrlib.tests.test_urlutils',
 
2353
                   'bzrlib.tests.test_versionedfile',
 
2354
                   'bzrlib.tests.test_version',
 
2355
                   'bzrlib.tests.test_version_info',
 
2356
                   'bzrlib.tests.test_weave',
 
2357
                   'bzrlib.tests.test_whitebox',
 
2358
                   'bzrlib.tests.test_workingtree',
 
2359
                   'bzrlib.tests.test_workingtree_4',
 
2360
                   'bzrlib.tests.test_wsgi',
 
2361
                   'bzrlib.tests.test_xml',
 
2362
                   ]
 
2363
    test_transport_implementations = [
 
2364
        'bzrlib.tests.test_transport_implementations',
 
2365
        'bzrlib.tests.test_read_bundle',
 
2366
        ]
 
2367
    suite = TestUtil.TestSuite()
 
2368
    loader = TestUtil.TestLoader()
 
2369
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2370
    from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
 
2371
    adapter = TransportTestProviderAdapter()
 
2372
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
2373
    for package in packages_to_test():
 
2374
        suite.addTest(package.test_suite())
79
2375
    for m in MODULES_TO_TEST:
80
 
         suite.addTest(TestLoader().loadTestsFromModule(m))
81
 
 
82
 
    for m in (MODULES_TO_DOCTEST):
83
 
        suite.addTest(DocTestSuite(m))
84
 
 
85
 
    for p in bzrlib.plugin.all_plugins:
86
 
        if hasattr(p, 'test_suite'):
87
 
            suite.addTest(p.test_suite())
88
 
 
89
 
    suite.addTest(unittest.makeSuite(bzrlib.merge_core.MergeTest, 'test_'))
90
 
 
91
 
    return run_suite(suite, 'testbzr')
92
 
 
93
 
 
94
 
 
 
2376
        suite.addTest(loader.loadTestsFromModule(m))
 
2377
    for m in MODULES_TO_DOCTEST:
 
2378
        try:
 
2379
            suite.addTest(doctest.DocTestSuite(m))
 
2380
        except ValueError, e:
 
2381
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
2382
            raise
 
2383
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
2384
        if getattr(plugin, 'test_suite', None) is not None:
 
2385
            default_encoding = sys.getdefaultencoding()
 
2386
            try:
 
2387
                plugin_suite = plugin.test_suite()
 
2388
            except ImportError, e:
 
2389
                bzrlib.trace.warning(
 
2390
                    'Unable to test plugin "%s": %s', name, e)
 
2391
            else:
 
2392
                suite.addTest(plugin_suite)
 
2393
            if default_encoding != sys.getdefaultencoding():
 
2394
                bzrlib.trace.warning(
 
2395
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
2396
                    sys.getdefaultencoding())
 
2397
                reload(sys)
 
2398
                sys.setdefaultencoding(default_encoding)
 
2399
    return suite
 
2400
 
 
2401
 
 
2402
def adapt_modules(mods_list, adapter, loader, suite):
 
2403
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2404
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2405
        suite.addTests(adapter.adapt(test))
 
2406
 
 
2407
 
 
2408
def _rmtree_temp_dir(dirname):
 
2409
    # If LANG=C we probably have created some bogus paths
 
2410
    # which rmtree(unicode) will fail to delete
 
2411
    # so make sure we are using rmtree(str) to delete everything
 
2412
    # except on win32, where rmtree(str) will fail
 
2413
    # since it doesn't have the property of byte-stream paths
 
2414
    # (they are either ascii or mbcs)
 
2415
    if sys.platform == 'win32':
 
2416
        # make sure we are using the unicode win32 api
 
2417
        dirname = unicode(dirname)
 
2418
    else:
 
2419
        dirname = dirname.encode(sys.getfilesystemencoding())
 
2420
    try:
 
2421
        osutils.rmtree(dirname)
 
2422
    except OSError, e:
 
2423
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
2424
            print >>sys.stderr, ('Permission denied: '
 
2425
                                 'unable to remove testing dir '
 
2426
                                 '%s' % os.path.basename(dirname))
 
2427
        else:
 
2428
            raise
 
2429
 
 
2430
 
 
2431
class Feature(object):
 
2432
    """An operating system Feature."""
 
2433
 
 
2434
    def __init__(self):
 
2435
        self._available = None
 
2436
 
 
2437
    def available(self):
 
2438
        """Is the feature available?
 
2439
 
 
2440
        :return: True if the feature is available.
 
2441
        """
 
2442
        if self._available is None:
 
2443
            self._available = self._probe()
 
2444
        return self._available
 
2445
 
 
2446
    def _probe(self):
 
2447
        """Implement this method in concrete features.
 
2448
 
 
2449
        :return: True if the feature is available.
 
2450
        """
 
2451
        raise NotImplementedError
 
2452
 
 
2453
    def __str__(self):
 
2454
        if getattr(self, 'feature_name', None):
 
2455
            return self.feature_name()
 
2456
        return self.__class__.__name__
 
2457
 
 
2458
 
 
2459
class TestScenarioApplier(object):
 
2460
    """A tool to apply scenarios to tests."""
 
2461
 
 
2462
    def adapt(self, test):
 
2463
        """Return a TestSuite containing a copy of test for each scenario."""
 
2464
        result = unittest.TestSuite()
 
2465
        for scenario in self.scenarios:
 
2466
            result.addTest(self.adapt_test_to_scenario(test, scenario))
 
2467
        return result
 
2468
 
 
2469
    def adapt_test_to_scenario(self, test, scenario):
 
2470
        """Copy test and apply scenario to it.
 
2471
 
 
2472
        :param test: A test to adapt.
 
2473
        :param scenario: A tuple describing the scenarion.
 
2474
            The first element of the tuple is the new test id.
 
2475
            The second element is a dict containing attributes to set on the
 
2476
            test.
 
2477
        :return: The adapted test.
 
2478
        """
 
2479
        from copy import deepcopy
 
2480
        new_test = deepcopy(test)
 
2481
        for name, value in scenario[1].items():
 
2482
            setattr(new_test, name, value)
 
2483
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
 
2484
        new_test.id = lambda: new_id
 
2485
        return new_test