~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-08-01 17:14:51 UTC
  • mfrom: (2662.1.1 bzr.easy_install)
  • Revision ID: pqm@pqm.ubuntu.com-20070801171451-en3tds1hzlru2j83
allow ``easy_install bzr`` runs without fatal errors. (#125521, bialix,
 r=mbp)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
from testsweet import TestBase, run_suite, InTempDir
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import atexit
 
30
import codecs
 
31
from cStringIO import StringIO
 
32
import difflib
 
33
import doctest
 
34
import errno
 
35
import logging
 
36
import os
 
37
from pprint import pformat
 
38
import random
 
39
import re
 
40
import shlex
 
41
import stat
 
42
from subprocess import Popen, PIPE
 
43
import sys
 
44
import tempfile
 
45
import unittest
 
46
import time
 
47
import warnings
 
48
 
 
49
 
 
50
from bzrlib import (
 
51
    bzrdir,
 
52
    debug,
 
53
    errors,
 
54
    memorytree,
 
55
    osutils,
 
56
    progress,
 
57
    ui,
 
58
    urlutils,
 
59
    workingtree,
 
60
    )
 
61
import bzrlib.branch
 
62
import bzrlib.commands
 
63
import bzrlib.timestamp
 
64
import bzrlib.export
 
65
import bzrlib.inventory
 
66
import bzrlib.iterablefile
 
67
import bzrlib.lockdir
 
68
try:
 
69
    import bzrlib.lsprof
 
70
except ImportError:
 
71
    # lsprof not available
 
72
    pass
 
73
from bzrlib.merge import merge_inner
 
74
import bzrlib.merge3
 
75
import bzrlib.osutils
 
76
import bzrlib.plugin
 
77
from bzrlib.revision import common_ancestor
 
78
import bzrlib.store
 
79
from bzrlib import symbol_versioning
 
80
from bzrlib.symbol_versioning import (
 
81
    deprecated_method,
 
82
    zero_eighteen,
 
83
    )
 
84
import bzrlib.trace
 
85
from bzrlib.transport import get_transport
 
86
import bzrlib.transport
 
87
from bzrlib.transport.local import LocalURLServer
 
88
from bzrlib.transport.memory import MemoryServer
 
89
from bzrlib.transport.readonly import ReadonlyServer
 
90
from bzrlib.trace import mutter, note
 
91
from bzrlib.tests import TestUtil
 
92
from bzrlib.tests.HttpServer import HttpServer
 
93
from bzrlib.tests.TestUtil import (
 
94
                          TestSuite,
 
95
                          TestLoader,
 
96
                          )
 
97
from bzrlib.tests.treeshape import build_tree_contents
 
98
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
99
 
 
100
# Mark this python module as being part of the implementation
 
101
# of unittest: this gives us better tracebacks where the last
 
102
# shown frame is the test code, not our assertXYZ.
 
103
__unittest = 1
 
104
 
 
105
default_transport = LocalURLServer
19
106
 
20
107
MODULES_TO_TEST = []
21
 
MODULES_TO_DOCTEST = []
22
 
 
23
 
def selftest():
24
 
    from unittest import TestLoader, TestSuite
25
 
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
26
 
    import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
27
 
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
28
 
 
29
 
    import bzrlib.selftest.whitebox
30
 
    import bzrlib.selftest.blackbox
31
 
    import bzrlib.selftest.versioning
32
 
    import bzrlib.selftest.testmerge3
33
 
    import bzrlib.selftest.testhashcache
34
 
    import bzrlib.selftest.testrevisionnamespaces
35
 
    import bzrlib.selftest.testbranch
36
 
    import bzrlib.selftest.teststatus
37
 
    import bzrlib.selftest.testinv
38
 
    import bzrlib.merge_core
39
 
    from doctest import DocTestSuite
40
 
    import os
41
 
    import shutil
42
 
    import time
43
 
    import sys
44
 
    import unittest
45
 
 
46
 
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
47
 
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
48
 
        if m not in MODULES_TO_DOCTEST:
49
 
            MODULES_TO_DOCTEST.append(m)
50
 
            
51
 
    for m in (bzrlib.selftest.whitebox,
52
 
              bzrlib.selftest.versioning,
53
 
              bzrlib.selftest.testinv,
54
 
              bzrlib.selftest.testmerge3,
55
 
              bzrlib.selftest.testhashcache,
56
 
              bzrlib.selftest.teststatus,
57
 
              bzrlib.selftest.blackbox,
58
 
              bzrlib.selftest.testhashcache,
59
 
              bzrlib.selftest.testrevisionnamespaces,
60
 
              bzrlib.selftest.testbranch,
 
108
MODULES_TO_DOCTEST = [
 
109
                      bzrlib.timestamp,
 
110
                      bzrlib.errors,
 
111
                      bzrlib.export,
 
112
                      bzrlib.inventory,
 
113
                      bzrlib.iterablefile,
 
114
                      bzrlib.lockdir,
 
115
                      bzrlib.merge3,
 
116
                      bzrlib.option,
 
117
                      bzrlib.store,
 
118
                      ]
 
119
 
 
120
 
 
121
def packages_to_test():
 
122
    """Return a list of packages to test.
 
123
 
 
124
    The packages are not globally imported so that import failures are
 
125
    triggered when running selftest, not when importing the command.
 
126
    """
 
127
    import bzrlib.doc
 
128
    import bzrlib.tests.blackbox
 
129
    import bzrlib.tests.branch_implementations
 
130
    import bzrlib.tests.bzrdir_implementations
 
131
    import bzrlib.tests.commands
 
132
    import bzrlib.tests.interrepository_implementations
 
133
    import bzrlib.tests.interversionedfile_implementations
 
134
    import bzrlib.tests.intertree_implementations
 
135
    import bzrlib.tests.per_lock
 
136
    import bzrlib.tests.repository_implementations
 
137
    import bzrlib.tests.revisionstore_implementations
 
138
    import bzrlib.tests.tree_implementations
 
139
    import bzrlib.tests.workingtree_implementations
 
140
    return [
 
141
            bzrlib.doc,
 
142
            bzrlib.tests.blackbox,
 
143
            bzrlib.tests.branch_implementations,
 
144
            bzrlib.tests.bzrdir_implementations,
 
145
            bzrlib.tests.commands,
 
146
            bzrlib.tests.interrepository_implementations,
 
147
            bzrlib.tests.interversionedfile_implementations,
 
148
            bzrlib.tests.intertree_implementations,
 
149
            bzrlib.tests.per_lock,
 
150
            bzrlib.tests.repository_implementations,
 
151
            bzrlib.tests.revisionstore_implementations,
 
152
            bzrlib.tests.tree_implementations,
 
153
            bzrlib.tests.workingtree_implementations,
 
154
            ]
 
155
 
 
156
 
 
157
class ExtendedTestResult(unittest._TextTestResult):
 
158
    """Accepts, reports and accumulates the results of running tests.
 
159
 
 
160
    Compared to this unittest version this class adds support for profiling,
 
161
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
162
    There are further-specialized subclasses for different types of display.
 
163
    """
 
164
 
 
165
    stop_early = False
 
166
    
 
167
    def __init__(self, stream, descriptions, verbosity,
 
168
                 bench_history=None,
 
169
                 num_tests=None,
 
170
                 ):
 
171
        """Construct new TestResult.
 
172
 
 
173
        :param bench_history: Optionally, a writable file object to accumulate
 
174
            benchmark results.
 
175
        """
 
176
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
177
        if bench_history is not None:
 
178
            from bzrlib.version import _get_bzr_source_tree
 
179
            src_tree = _get_bzr_source_tree()
 
180
            if src_tree:
 
181
                try:
 
182
                    revision_id = src_tree.get_parent_ids()[0]
 
183
                except IndexError:
 
184
                    # XXX: if this is a brand new tree, do the same as if there
 
185
                    # is no branch.
 
186
                    revision_id = ''
 
187
            else:
 
188
                # XXX: If there's no branch, what should we do?
 
189
                revision_id = ''
 
190
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
191
        self._bench_history = bench_history
 
192
        self.ui = ui.ui_factory
 
193
        self.num_tests = num_tests
 
194
        self.error_count = 0
 
195
        self.failure_count = 0
 
196
        self.known_failure_count = 0
 
197
        self.skip_count = 0
 
198
        self.unsupported = {}
 
199
        self.count = 0
 
200
        self._overall_start_time = time.time()
 
201
    
 
202
    def extractBenchmarkTime(self, testCase):
 
203
        """Add a benchmark time for the current test case."""
 
204
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
205
    
 
206
    def _elapsedTestTimeString(self):
 
207
        """Return a time string for the overall time the current test has taken."""
 
208
        return self._formatTime(time.time() - self._start_time)
 
209
 
 
210
    def _testTimeString(self):
 
211
        if self._benchmarkTime is not None:
 
212
            return "%s/%s" % (
 
213
                self._formatTime(self._benchmarkTime),
 
214
                self._elapsedTestTimeString())
 
215
        else:
 
216
            return "           %s" % self._elapsedTestTimeString()
 
217
 
 
218
    def _formatTime(self, seconds):
 
219
        """Format seconds as milliseconds with leading spaces."""
 
220
        # some benchmarks can take thousands of seconds to run, so we need 8
 
221
        # places
 
222
        return "%8dms" % (1000 * seconds)
 
223
 
 
224
    def _shortened_test_description(self, test):
 
225
        what = test.id()
 
226
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
 
227
        return what
 
228
 
 
229
    def startTest(self, test):
 
230
        unittest.TestResult.startTest(self, test)
 
231
        self.report_test_start(test)
 
232
        test.number = self.count
 
233
        self._recordTestStartTime()
 
234
 
 
235
    def _recordTestStartTime(self):
 
236
        """Record that a test has started."""
 
237
        self._start_time = time.time()
 
238
 
 
239
    def _cleanupLogFile(self, test):
 
240
        # We can only do this if we have one of our TestCases, not if
 
241
        # we have a doctest.
 
242
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
243
        if setKeepLogfile is not None:
 
244
            setKeepLogfile()
 
245
 
 
246
    def addError(self, test, err):
 
247
        self.extractBenchmarkTime(test)
 
248
        self._cleanupLogFile(test)
 
249
        if isinstance(err[1], TestSkipped):
 
250
            return self.addSkipped(test, err)
 
251
        elif isinstance(err[1], UnavailableFeature):
 
252
            return self.addNotSupported(test, err[1].args[0])
 
253
        unittest.TestResult.addError(self, test, err)
 
254
        self.error_count += 1
 
255
        self.report_error(test, err)
 
256
        if self.stop_early:
 
257
            self.stop()
 
258
 
 
259
    def addFailure(self, test, err):
 
260
        self._cleanupLogFile(test)
 
261
        self.extractBenchmarkTime(test)
 
262
        if isinstance(err[1], KnownFailure):
 
263
            return self.addKnownFailure(test, err)
 
264
        unittest.TestResult.addFailure(self, test, err)
 
265
        self.failure_count += 1
 
266
        self.report_failure(test, err)
 
267
        if self.stop_early:
 
268
            self.stop()
 
269
 
 
270
    def addKnownFailure(self, test, err):
 
271
        self.known_failure_count += 1
 
272
        self.report_known_failure(test, err)
 
273
 
 
274
    def addNotSupported(self, test, feature):
 
275
        self.unsupported.setdefault(str(feature), 0)
 
276
        self.unsupported[str(feature)] += 1
 
277
        self.report_unsupported(test, feature)
 
278
 
 
279
    def addSuccess(self, test):
 
280
        self.extractBenchmarkTime(test)
 
281
        if self._bench_history is not None:
 
282
            if self._benchmarkTime is not None:
 
283
                self._bench_history.write("%s %s\n" % (
 
284
                    self._formatTime(self._benchmarkTime),
 
285
                    test.id()))
 
286
        self.report_success(test)
 
287
        unittest.TestResult.addSuccess(self, test)
 
288
 
 
289
    def addSkipped(self, test, skip_excinfo):
 
290
        self.report_skip(test, skip_excinfo)
 
291
        # seems best to treat this as success from point-of-view of unittest
 
292
        # -- it actually does nothing so it barely matters :)
 
293
        try:
 
294
            test.tearDown()
 
295
        except KeyboardInterrupt:
 
296
            raise
 
297
        except:
 
298
            self.addError(test, test.__exc_info())
 
299
        else:
 
300
            unittest.TestResult.addSuccess(self, test)
 
301
 
 
302
    def printErrorList(self, flavour, errors):
 
303
        for test, err in errors:
 
304
            self.stream.writeln(self.separator1)
 
305
            self.stream.write("%s: " % flavour)
 
306
            self.stream.writeln(self.getDescription(test))
 
307
            if getattr(test, '_get_log', None) is not None:
 
308
                print >>self.stream
 
309
                print >>self.stream, \
 
310
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
311
                print >>self.stream, test._get_log()
 
312
                print >>self.stream, \
 
313
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
314
            self.stream.writeln(self.separator2)
 
315
            self.stream.writeln("%s" % err)
 
316
 
 
317
    def finished(self):
 
318
        pass
 
319
 
 
320
    def report_cleaning_up(self):
 
321
        pass
 
322
 
 
323
    def report_success(self, test):
 
324
        pass
 
325
 
 
326
 
 
327
class TextTestResult(ExtendedTestResult):
 
328
    """Displays progress and results of tests in text form"""
 
329
 
 
330
    def __init__(self, stream, descriptions, verbosity,
 
331
                 bench_history=None,
 
332
                 num_tests=None,
 
333
                 pb=None,
 
334
                 ):
 
335
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
 
336
            bench_history, num_tests)
 
337
        if pb is None:
 
338
            self.pb = self.ui.nested_progress_bar()
 
339
            self._supplied_pb = False
 
340
        else:
 
341
            self.pb = pb
 
342
            self._supplied_pb = True
 
343
        self.pb.show_pct = False
 
344
        self.pb.show_spinner = False
 
345
        self.pb.show_eta = False,
 
346
        self.pb.show_count = False
 
347
        self.pb.show_bar = False
 
348
 
 
349
    def report_starting(self):
 
350
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
351
 
 
352
    def _progress_prefix_text(self):
 
353
        a = '[%d' % self.count
 
354
        if self.num_tests is not None:
 
355
            a +='/%d' % self.num_tests
 
356
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
357
        if self.error_count:
 
358
            a += ', %d errors' % self.error_count
 
359
        if self.failure_count:
 
360
            a += ', %d failed' % self.failure_count
 
361
        if self.known_failure_count:
 
362
            a += ', %d known failures' % self.known_failure_count
 
363
        if self.skip_count:
 
364
            a += ', %d skipped' % self.skip_count
 
365
        if self.unsupported:
 
366
            a += ', %d missing features' % len(self.unsupported)
 
367
        a += ']'
 
368
        return a
 
369
 
 
370
    def report_test_start(self, test):
 
371
        self.count += 1
 
372
        self.pb.update(
 
373
                self._progress_prefix_text()
 
374
                + ' ' 
 
375
                + self._shortened_test_description(test))
 
376
 
 
377
    def _test_description(self, test):
 
378
        return self._shortened_test_description(test)
 
379
 
 
380
    def report_error(self, test, err):
 
381
        self.pb.note('ERROR: %s\n    %s\n', 
 
382
            self._test_description(test),
 
383
            err[1],
 
384
            )
 
385
 
 
386
    def report_failure(self, test, err):
 
387
        self.pb.note('FAIL: %s\n    %s\n', 
 
388
            self._test_description(test),
 
389
            err[1],
 
390
            )
 
391
 
 
392
    def report_known_failure(self, test, err):
 
393
        self.pb.note('XFAIL: %s\n%s\n',
 
394
            self._test_description(test), err[1])
 
395
 
 
396
    def report_skip(self, test, skip_excinfo):
 
397
        self.skip_count += 1
 
398
        if False:
 
399
            # at the moment these are mostly not things we can fix
 
400
            # and so they just produce stipple; use the verbose reporter
 
401
            # to see them.
 
402
            if False:
 
403
                # show test and reason for skip
 
404
                self.pb.note('SKIP: %s\n    %s\n', 
 
405
                    self._shortened_test_description(test),
 
406
                    skip_excinfo[1])
 
407
            else:
 
408
                # since the class name was left behind in the still-visible
 
409
                # progress bar...
 
410
                self.pb.note('SKIP: %s', skip_excinfo[1])
 
411
 
 
412
    def report_unsupported(self, test, feature):
 
413
        """test cannot be run because feature is missing."""
 
414
                  
 
415
    def report_cleaning_up(self):
 
416
        self.pb.update('cleaning up...')
 
417
 
 
418
    def finished(self):
 
419
        if not self._supplied_pb:
 
420
            self.pb.finished()
 
421
 
 
422
 
 
423
class VerboseTestResult(ExtendedTestResult):
 
424
    """Produce long output, with one line per test run plus times"""
 
425
 
 
426
    def _ellipsize_to_right(self, a_string, final_width):
 
427
        """Truncate and pad a string, keeping the right hand side"""
 
428
        if len(a_string) > final_width:
 
429
            result = '...' + a_string[3-final_width:]
 
430
        else:
 
431
            result = a_string
 
432
        return result.ljust(final_width)
 
433
 
 
434
    def report_starting(self):
 
435
        self.stream.write('running %d tests...\n' % self.num_tests)
 
436
 
 
437
    def report_test_start(self, test):
 
438
        self.count += 1
 
439
        name = self._shortened_test_description(test)
 
440
        # width needs space for 6 char status, plus 1 for slash, plus 2 10-char
 
441
        # numbers, plus a trailing blank
 
442
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
443
        self.stream.write(self._ellipsize_to_right(name,
 
444
                          osutils.terminal_width()-30))
 
445
        self.stream.flush()
 
446
 
 
447
    def _error_summary(self, err):
 
448
        indent = ' ' * 4
 
449
        return '%s%s' % (indent, err[1])
 
450
 
 
451
    def report_error(self, test, err):
 
452
        self.stream.writeln('ERROR %s\n%s'
 
453
                % (self._testTimeString(),
 
454
                   self._error_summary(err)))
 
455
 
 
456
    def report_failure(self, test, err):
 
457
        self.stream.writeln(' FAIL %s\n%s'
 
458
                % (self._testTimeString(),
 
459
                   self._error_summary(err)))
 
460
 
 
461
    def report_known_failure(self, test, err):
 
462
        self.stream.writeln('XFAIL %s\n%s'
 
463
                % (self._testTimeString(),
 
464
                   self._error_summary(err)))
 
465
 
 
466
    def report_success(self, test):
 
467
        self.stream.writeln('   OK %s' % self._testTimeString())
 
468
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
469
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
470
            stats.pprint(file=self.stream)
 
471
        # flush the stream so that we get smooth output. This verbose mode is
 
472
        # used to show the output in PQM.
 
473
        self.stream.flush()
 
474
 
 
475
    def report_skip(self, test, skip_excinfo):
 
476
        self.skip_count += 1
 
477
        self.stream.writeln(' SKIP %s\n%s'
 
478
                % (self._testTimeString(),
 
479
                   self._error_summary(skip_excinfo)))
 
480
 
 
481
    def report_unsupported(self, test, feature):
 
482
        """test cannot be run because feature is missing."""
 
483
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
 
484
                %(self._testTimeString(), feature))
 
485
                  
 
486
 
 
487
 
 
488
class TextTestRunner(object):
 
489
    stop_on_failure = False
 
490
 
 
491
    def __init__(self,
 
492
                 stream=sys.stderr,
 
493
                 descriptions=0,
 
494
                 verbosity=1,
 
495
                 bench_history=None,
 
496
                 list_only=False
 
497
                 ):
 
498
        self.stream = unittest._WritelnDecorator(stream)
 
499
        self.descriptions = descriptions
 
500
        self.verbosity = verbosity
 
501
        self._bench_history = bench_history
 
502
        self.list_only = list_only
 
503
 
 
504
    def run(self, test):
 
505
        "Run the given test case or test suite."
 
506
        startTime = time.time()
 
507
        if self.verbosity == 1:
 
508
            result_class = TextTestResult
 
509
        elif self.verbosity >= 2:
 
510
            result_class = VerboseTestResult
 
511
        result = result_class(self.stream,
 
512
                              self.descriptions,
 
513
                              self.verbosity,
 
514
                              bench_history=self._bench_history,
 
515
                              num_tests=test.countTestCases(),
 
516
                              )
 
517
        result.stop_early = self.stop_on_failure
 
518
        result.report_starting()
 
519
        if self.list_only:
 
520
            if self.verbosity >= 2:
 
521
                self.stream.writeln("Listing tests only ...\n")
 
522
            run = 0
 
523
            for t in iter_suite_tests(test):
 
524
                self.stream.writeln("%s" % (t.id()))
 
525
                run += 1
 
526
            actionTaken = "Listed"
 
527
        else: 
 
528
            test.run(result)
 
529
            run = result.testsRun
 
530
            actionTaken = "Ran"
 
531
        stopTime = time.time()
 
532
        timeTaken = stopTime - startTime
 
533
        result.printErrors()
 
534
        self.stream.writeln(result.separator2)
 
535
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
 
536
                            run, run != 1 and "s" or "", timeTaken))
 
537
        self.stream.writeln()
 
538
        if not result.wasSuccessful():
 
539
            self.stream.write("FAILED (")
 
540
            failed, errored = map(len, (result.failures, result.errors))
 
541
            if failed:
 
542
                self.stream.write("failures=%d" % failed)
 
543
            if errored:
 
544
                if failed: self.stream.write(", ")
 
545
                self.stream.write("errors=%d" % errored)
 
546
            if result.known_failure_count:
 
547
                if failed or errored: self.stream.write(", ")
 
548
                self.stream.write("known_failure_count=%d" %
 
549
                    result.known_failure_count)
 
550
            self.stream.writeln(")")
 
551
        else:
 
552
            if result.known_failure_count:
 
553
                self.stream.writeln("OK (known_failures=%d)" %
 
554
                    result.known_failure_count)
 
555
            else:
 
556
                self.stream.writeln("OK")
 
557
        if result.skip_count > 0:
 
558
            skipped = result.skip_count
 
559
            self.stream.writeln('%d test%s skipped' %
 
560
                                (skipped, skipped != 1 and "s" or ""))
 
561
        if result.unsupported:
 
562
            for feature, count in sorted(result.unsupported.items()):
 
563
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
 
564
                    (feature, count))
 
565
        result.finished()
 
566
        return result
 
567
 
 
568
 
 
569
def iter_suite_tests(suite):
 
570
    """Return all tests in a suite, recursing through nested suites"""
 
571
    for item in suite._tests:
 
572
        if isinstance(item, unittest.TestCase):
 
573
            yield item
 
574
        elif isinstance(item, unittest.TestSuite):
 
575
            for r in iter_suite_tests(item):
 
576
                yield r
 
577
        else:
 
578
            raise Exception('unknown object %r inside test suite %r'
 
579
                            % (item, suite))
 
580
 
 
581
 
 
582
class TestSkipped(Exception):
 
583
    """Indicates that a test was intentionally skipped, rather than failing."""
 
584
 
 
585
 
 
586
class KnownFailure(AssertionError):
 
587
    """Indicates that a test failed in a precisely expected manner.
 
588
 
 
589
    Such failures dont block the whole test suite from passing because they are
 
590
    indicators of partially completed code or of future work. We have an
 
591
    explicit error for them so that we can ensure that they are always visible:
 
592
    KnownFailures are always shown in the output of bzr selftest.
 
593
    """
 
594
 
 
595
 
 
596
class UnavailableFeature(Exception):
 
597
    """A feature required for this test was not available.
 
598
 
 
599
    The feature should be used to construct the exception.
 
600
    """
 
601
 
 
602
 
 
603
class CommandFailed(Exception):
 
604
    pass
 
605
 
 
606
 
 
607
class StringIOWrapper(object):
 
608
    """A wrapper around cStringIO which just adds an encoding attribute.
 
609
    
 
610
    Internally we can check sys.stdout to see what the output encoding
 
611
    should be. However, cStringIO has no encoding attribute that we can
 
612
    set. So we wrap it instead.
 
613
    """
 
614
    encoding='ascii'
 
615
    _cstring = None
 
616
 
 
617
    def __init__(self, s=None):
 
618
        if s is not None:
 
619
            self.__dict__['_cstring'] = StringIO(s)
 
620
        else:
 
621
            self.__dict__['_cstring'] = StringIO()
 
622
 
 
623
    def __getattr__(self, name, getattr=getattr):
 
624
        return getattr(self.__dict__['_cstring'], name)
 
625
 
 
626
    def __setattr__(self, name, val):
 
627
        if name == 'encoding':
 
628
            self.__dict__['encoding'] = val
 
629
        else:
 
630
            return setattr(self._cstring, name, val)
 
631
 
 
632
 
 
633
class TestUIFactory(ui.CLIUIFactory):
 
634
    """A UI Factory for testing.
 
635
 
 
636
    Hide the progress bar but emit note()s.
 
637
    Redirect stdin.
 
638
    Allows get_password to be tested without real tty attached.
 
639
    """
 
640
 
 
641
    def __init__(self,
 
642
                 stdout=None,
 
643
                 stderr=None,
 
644
                 stdin=None):
 
645
        super(TestUIFactory, self).__init__()
 
646
        if stdin is not None:
 
647
            # We use a StringIOWrapper to be able to test various
 
648
            # encodings, but the user is still responsible to
 
649
            # encode the string and to set the encoding attribute
 
650
            # of StringIOWrapper.
 
651
            self.stdin = StringIOWrapper(stdin)
 
652
        if stdout is None:
 
653
            self.stdout = sys.stdout
 
654
        else:
 
655
            self.stdout = stdout
 
656
        if stderr is None:
 
657
            self.stderr = sys.stderr
 
658
        else:
 
659
            self.stderr = stderr
 
660
 
 
661
    def clear(self):
 
662
        """See progress.ProgressBar.clear()."""
 
663
 
 
664
    def clear_term(self):
 
665
        """See progress.ProgressBar.clear_term()."""
 
666
 
 
667
    def clear_term(self):
 
668
        """See progress.ProgressBar.clear_term()."""
 
669
 
 
670
    def finished(self):
 
671
        """See progress.ProgressBar.finished()."""
 
672
 
 
673
    def note(self, fmt_string, *args, **kwargs):
 
674
        """See progress.ProgressBar.note()."""
 
675
        self.stdout.write((fmt_string + "\n") % args)
 
676
 
 
677
    def progress_bar(self):
 
678
        return self
 
679
 
 
680
    def nested_progress_bar(self):
 
681
        return self
 
682
 
 
683
    def update(self, message, count=None, total=None):
 
684
        """See progress.ProgressBar.update()."""
 
685
 
 
686
    def get_non_echoed_password(self, prompt):
 
687
        """Get password from stdin without trying to handle the echo mode"""
 
688
        if prompt:
 
689
            self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
 
690
        password = self.stdin.readline()
 
691
        if not password:
 
692
            raise EOFError
 
693
        if password[-1] == '\n':
 
694
            password = password[:-1]
 
695
        return password
 
696
 
 
697
 
 
698
class TestCase(unittest.TestCase):
 
699
    """Base class for bzr unit tests.
 
700
    
 
701
    Tests that need access to disk resources should subclass 
 
702
    TestCaseInTempDir not TestCase.
 
703
 
 
704
    Error and debug log messages are redirected from their usual
 
705
    location into a temporary file, the contents of which can be
 
706
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
707
    so that it can also capture file IO.  When the test completes this file
 
708
    is read into memory and removed from disk.
 
709
       
 
710
    There are also convenience functions to invoke bzr's command-line
 
711
    routine, and to build and check bzr trees.
 
712
   
 
713
    In addition to the usual method of overriding tearDown(), this class also
 
714
    allows subclasses to register functions into the _cleanups list, which is
 
715
    run in order as the object is torn down.  It's less likely this will be
 
716
    accidentally overlooked.
 
717
    """
 
718
 
 
719
    _log_file_name = None
 
720
    _log_contents = ''
 
721
    _keep_log_file = False
 
722
    # record lsprof data when performing benchmark calls.
 
723
    _gather_lsprof_in_benchmarks = False
 
724
 
 
725
    def __init__(self, methodName='testMethod'):
 
726
        super(TestCase, self).__init__(methodName)
 
727
        self._cleanups = []
 
728
 
 
729
    def setUp(self):
 
730
        unittest.TestCase.setUp(self)
 
731
        self._cleanEnvironment()
 
732
        bzrlib.trace.disable_default_logging()
 
733
        self._silenceUI()
 
734
        self._startLogFile()
 
735
        self._benchcalls = []
 
736
        self._benchtime = None
 
737
        self._clear_hooks()
 
738
        self._clear_debug_flags()
 
739
 
 
740
    def _clear_debug_flags(self):
 
741
        """Prevent externally set debug flags affecting tests.
 
742
        
 
743
        Tests that want to use debug flags can just set them in the
 
744
        debug_flags set during setup/teardown.
 
745
        """
 
746
        self._preserved_debug_flags = set(debug.debug_flags)
 
747
        debug.debug_flags.clear()
 
748
        self.addCleanup(self._restore_debug_flags)
 
749
 
 
750
    def _clear_hooks(self):
 
751
        # prevent hooks affecting tests
 
752
        import bzrlib.branch
 
753
        import bzrlib.smart.server
 
754
        self._preserved_hooks = {
 
755
            bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
 
756
            bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
 
757
            }
 
758
        self.addCleanup(self._restoreHooks)
 
759
        # reset all hooks to an empty instance of the appropriate type
 
760
        bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
 
761
        bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
 
762
 
 
763
    def _silenceUI(self):
 
764
        """Turn off UI for duration of test"""
 
765
        # by default the UI is off; tests can turn it on if they want it.
 
766
        saved = ui.ui_factory
 
767
        def _restore():
 
768
            ui.ui_factory = saved
 
769
        ui.ui_factory = ui.SilentUIFactory()
 
770
        self.addCleanup(_restore)
 
771
 
 
772
    def _ndiff_strings(self, a, b):
 
773
        """Return ndiff between two strings containing lines.
 
774
        
 
775
        A trailing newline is added if missing to make the strings
 
776
        print properly."""
 
777
        if b and b[-1] != '\n':
 
778
            b += '\n'
 
779
        if a and a[-1] != '\n':
 
780
            a += '\n'
 
781
        difflines = difflib.ndiff(a.splitlines(True),
 
782
                                  b.splitlines(True),
 
783
                                  linejunk=lambda x: False,
 
784
                                  charjunk=lambda x: False)
 
785
        return ''.join(difflines)
 
786
 
 
787
    def assertEqual(self, a, b, message=''):
 
788
        try:
 
789
            if a == b:
 
790
                return
 
791
        except UnicodeError, e:
 
792
            # If we can't compare without getting a UnicodeError, then
 
793
            # obviously they are different
 
794
            mutter('UnicodeError: %s', e)
 
795
        if message:
 
796
            message += '\n'
 
797
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
 
798
            % (message,
 
799
               pformat(a), pformat(b)))
 
800
 
 
801
    assertEquals = assertEqual
 
802
 
 
803
    def assertEqualDiff(self, a, b, message=None):
 
804
        """Assert two texts are equal, if not raise an exception.
 
805
        
 
806
        This is intended for use with multi-line strings where it can 
 
807
        be hard to find the differences by eye.
 
808
        """
 
809
        # TODO: perhaps override assertEquals to call this for strings?
 
810
        if a == b:
 
811
            return
 
812
        if message is None:
 
813
            message = "texts not equal:\n"
 
814
        raise AssertionError(message +
 
815
                             self._ndiff_strings(a, b))
 
816
        
 
817
    def assertEqualMode(self, mode, mode_test):
 
818
        self.assertEqual(mode, mode_test,
 
819
                         'mode mismatch %o != %o' % (mode, mode_test))
 
820
 
 
821
    def assertPositive(self, val):
 
822
        """Assert that val is greater than 0."""
 
823
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
 
824
 
 
825
    def assertNegative(self, val):
 
826
        """Assert that val is less than 0."""
 
827
        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
 
828
 
 
829
    def assertStartsWith(self, s, prefix):
 
830
        if not s.startswith(prefix):
 
831
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
832
 
 
833
    def assertEndsWith(self, s, suffix):
 
834
        """Asserts that s ends with suffix."""
 
835
        if not s.endswith(suffix):
 
836
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
837
 
 
838
    def assertContainsRe(self, haystack, needle_re):
 
839
        """Assert that a contains something matching a regular expression."""
 
840
        if not re.search(needle_re, haystack):
 
841
            if '\n' in haystack or len(haystack) > 60:
 
842
                # a long string, format it in a more readable way
 
843
                raise AssertionError(
 
844
                        'pattern "%s" not found in\n"""\\\n%s"""\n'
 
845
                        % (needle_re, haystack))
 
846
            else:
 
847
                raise AssertionError('pattern "%s" not found in "%s"'
 
848
                        % (needle_re, haystack))
 
849
 
 
850
    def assertNotContainsRe(self, haystack, needle_re):
 
851
        """Assert that a does not match a regular expression"""
 
852
        if re.search(needle_re, haystack):
 
853
            raise AssertionError('pattern "%s" found in "%s"'
 
854
                    % (needle_re, haystack))
 
855
 
 
856
    def assertSubset(self, sublist, superlist):
 
857
        """Assert that every entry in sublist is present in superlist."""
 
858
        missing = []
 
859
        for entry in sublist:
 
860
            if entry not in superlist:
 
861
                missing.append(entry)
 
862
        if len(missing) > 0:
 
863
            raise AssertionError("value(s) %r not present in container %r" % 
 
864
                                 (missing, superlist))
 
865
 
 
866
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
867
        """Fail unless excClass is raised when the iterator from func is used.
 
868
        
 
869
        Many functions can return generators this makes sure
 
870
        to wrap them in a list() call to make sure the whole generator
 
871
        is run, and that the proper exception is raised.
 
872
        """
 
873
        try:
 
874
            list(func(*args, **kwargs))
 
875
        except excClass:
 
876
            return
 
877
        else:
 
878
            if getattr(excClass,'__name__', None) is not None:
 
879
                excName = excClass.__name__
 
880
            else:
 
881
                excName = str(excClass)
 
882
            raise self.failureException, "%s not raised" % excName
 
883
 
 
884
    def assertRaises(self, excClass, callableObj, *args, **kwargs):
 
885
        """Assert that a callable raises a particular exception.
 
886
 
 
887
        :param excClass: As for the except statement, this may be either an
 
888
            exception class, or a tuple of classes.
 
889
        :param callableObj: A callable, will be passed ``*args`` and
 
890
            ``**kwargs``.
 
891
 
 
892
        Returns the exception so that you can examine it.
 
893
        """
 
894
        try:
 
895
            callableObj(*args, **kwargs)
 
896
        except excClass, e:
 
897
            return e
 
898
        else:
 
899
            if getattr(excClass,'__name__', None) is not None:
 
900
                excName = excClass.__name__
 
901
            else:
 
902
                # probably a tuple
 
903
                excName = str(excClass)
 
904
            raise self.failureException, "%s not raised" % excName
 
905
 
 
906
    def assertIs(self, left, right, message=None):
 
907
        if not (left is right):
 
908
            if message is not None:
 
909
                raise AssertionError(message)
 
910
            else:
 
911
                raise AssertionError("%r is not %r." % (left, right))
 
912
 
 
913
    def assertIsNot(self, left, right, message=None):
 
914
        if (left is right):
 
915
            if message is not None:
 
916
                raise AssertionError(message)
 
917
            else:
 
918
                raise AssertionError("%r is %r." % (left, right))
 
919
 
 
920
    def assertTransportMode(self, transport, path, mode):
 
921
        """Fail if a path does not have mode mode.
 
922
        
 
923
        If modes are not supported on this transport, the assertion is ignored.
 
924
        """
 
925
        if not transport._can_roundtrip_unix_modebits():
 
926
            return
 
927
        path_stat = transport.stat(path)
 
928
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
929
        self.assertEqual(mode, actual_mode,
 
930
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
931
 
 
932
    def assertIsInstance(self, obj, kls):
 
933
        """Fail if obj is not an instance of kls"""
 
934
        if not isinstance(obj, kls):
 
935
            self.fail("%r is an instance of %s rather than %s" % (
 
936
                obj, obj.__class__, kls))
 
937
 
 
938
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
939
        """Invoke a test, expecting it to fail for the given reason.
 
940
 
 
941
        This is for assertions that ought to succeed, but currently fail.
 
942
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
943
        about the failure you're expecting.  If a new bug is introduced,
 
944
        AssertionError should be raised, not KnownFailure.
 
945
 
 
946
        Frequently, expectFailure should be followed by an opposite assertion.
 
947
        See example below.
 
948
 
 
949
        Intended to be used with a callable that raises AssertionError as the
 
950
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
951
 
 
952
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
953
        test succeeds.
 
954
 
 
955
        example usage::
 
956
 
 
957
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
958
                             dynamic_val)
 
959
          self.assertEqual(42, dynamic_val)
 
960
 
 
961
          This means that a dynamic_val of 54 will cause the test to raise
 
962
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
963
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
964
          than 54 or 42 will cause an AssertionError.
 
965
        """
 
966
        try:
 
967
            assertion(*args, **kwargs)
 
968
        except AssertionError:
 
969
            raise KnownFailure(reason)
 
970
        else:
 
971
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
972
 
 
973
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
974
        """A helper for callDeprecated and applyDeprecated.
 
975
 
 
976
        :param a_callable: A callable to call.
 
977
        :param args: The positional arguments for the callable
 
978
        :param kwargs: The keyword arguments for the callable
 
979
        :return: A tuple (warnings, result). result is the result of calling
 
980
            a_callable(``*args``, ``**kwargs``).
 
981
        """
 
982
        local_warnings = []
 
983
        def capture_warnings(msg, cls=None, stacklevel=None):
 
984
            # we've hooked into a deprecation specific callpath,
 
985
            # only deprecations should getting sent via it.
 
986
            self.assertEqual(cls, DeprecationWarning)
 
987
            local_warnings.append(msg)
 
988
        original_warning_method = symbol_versioning.warn
 
989
        symbol_versioning.set_warning_method(capture_warnings)
 
990
        try:
 
991
            result = a_callable(*args, **kwargs)
 
992
        finally:
 
993
            symbol_versioning.set_warning_method(original_warning_method)
 
994
        return (local_warnings, result)
 
995
 
 
996
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
997
        """Call a deprecated callable without warning the user.
 
998
 
 
999
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1000
        not other callers that go direct to the warning module.
 
1001
 
 
1002
        :param deprecation_format: The deprecation format that the callable
 
1003
            should have been deprecated with. This is the same type as the
 
1004
            parameter to deprecated_method/deprecated_function. If the
 
1005
            callable is not deprecated with this format, an assertion error
 
1006
            will be raised.
 
1007
        :param a_callable: A callable to call. This may be a bound method or
 
1008
            a regular function. It will be called with ``*args`` and
 
1009
            ``**kwargs``.
 
1010
        :param args: The positional arguments for the callable
 
1011
        :param kwargs: The keyword arguments for the callable
 
1012
        :return: The result of a_callable(``*args``, ``**kwargs``)
 
1013
        """
 
1014
        call_warnings, result = self._capture_warnings(a_callable,
 
1015
            *args, **kwargs)
 
1016
        expected_first_warning = symbol_versioning.deprecation_string(
 
1017
            a_callable, deprecation_format)
 
1018
        if len(call_warnings) == 0:
 
1019
            self.fail("No deprecation warning generated by call to %s" %
 
1020
                a_callable)
 
1021
        self.assertEqual(expected_first_warning, call_warnings[0])
 
1022
        return result
 
1023
 
 
1024
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
1025
        """Assert that a callable is deprecated in a particular way.
 
1026
 
 
1027
        This is a very precise test for unusual requirements. The 
 
1028
        applyDeprecated helper function is probably more suited for most tests
 
1029
        as it allows you to simply specify the deprecation format being used
 
1030
        and will ensure that that is issued for the function being called.
 
1031
 
 
1032
        Note that this only captures warnings raised by symbol_versioning.warn,
 
1033
        not other callers that go direct to the warning module.
 
1034
 
 
1035
        :param expected: a list of the deprecation warnings expected, in order
 
1036
        :param callable: The callable to call
 
1037
        :param args: The positional arguments for the callable
 
1038
        :param kwargs: The keyword arguments for the callable
 
1039
        """
 
1040
        call_warnings, result = self._capture_warnings(callable,
 
1041
            *args, **kwargs)
 
1042
        self.assertEqual(expected, call_warnings)
 
1043
        return result
 
1044
 
 
1045
    def _startLogFile(self):
 
1046
        """Send bzr and test log messages to a temporary file.
 
1047
 
 
1048
        The file is removed as the test is torn down.
 
1049
        """
 
1050
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1051
        self._log_file = os.fdopen(fileno, 'w+')
 
1052
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
 
1053
        self._log_file_name = name
 
1054
        self.addCleanup(self._finishLogFile)
 
1055
 
 
1056
    def _finishLogFile(self):
 
1057
        """Finished with the log file.
 
1058
 
 
1059
        Close the file and delete it, unless setKeepLogfile was called.
 
1060
        """
 
1061
        if self._log_file is None:
 
1062
            return
 
1063
        bzrlib.trace.disable_test_log(self._log_nonce)
 
1064
        self._log_file.close()
 
1065
        self._log_file = None
 
1066
        if not self._keep_log_file:
 
1067
            os.remove(self._log_file_name)
 
1068
            self._log_file_name = None
 
1069
 
 
1070
    def setKeepLogfile(self):
 
1071
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1072
        self._keep_log_file = True
 
1073
 
 
1074
    def addCleanup(self, callable):
 
1075
        """Arrange to run a callable when this case is torn down.
 
1076
 
 
1077
        Callables are run in the reverse of the order they are registered, 
 
1078
        ie last-in first-out.
 
1079
        """
 
1080
        if callable in self._cleanups:
 
1081
            raise ValueError("cleanup function %r already registered on %s" 
 
1082
                    % (callable, self))
 
1083
        self._cleanups.append(callable)
 
1084
 
 
1085
    def _cleanEnvironment(self):
 
1086
        new_env = {
 
1087
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
1088
            'HOME': os.getcwd(),
 
1089
            'APPDATA': None,  # bzr now use Win32 API and don't rely on APPDATA
 
1090
            'BZR_EMAIL': None,
 
1091
            'BZREMAIL': None, # may still be present in the environment
 
1092
            'EMAIL': None,
 
1093
            'BZR_PROGRESS_BAR': None,
 
1094
            # SSH Agent
 
1095
            'SSH_AUTH_SOCK': None,
 
1096
            # Proxies
 
1097
            'http_proxy': None,
 
1098
            'HTTP_PROXY': None,
 
1099
            'https_proxy': None,
 
1100
            'HTTPS_PROXY': None,
 
1101
            'no_proxy': None,
 
1102
            'NO_PROXY': None,
 
1103
            'all_proxy': None,
 
1104
            'ALL_PROXY': None,
 
1105
            # Nobody cares about these ones AFAIK. So far at
 
1106
            # least. If you do (care), please update this comment
 
1107
            # -- vila 20061212
 
1108
            'ftp_proxy': None,
 
1109
            'FTP_PROXY': None,
 
1110
            'BZR_REMOTE_PATH': None,
 
1111
        }
 
1112
        self.__old_env = {}
 
1113
        self.addCleanup(self._restoreEnvironment)
 
1114
        for name, value in new_env.iteritems():
 
1115
            self._captureVar(name, value)
 
1116
 
 
1117
    def _captureVar(self, name, newvalue):
 
1118
        """Set an environment variable, and reset it when finished."""
 
1119
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
1120
 
 
1121
    def _restore_debug_flags(self):
 
1122
        debug.debug_flags.clear()
 
1123
        debug.debug_flags.update(self._preserved_debug_flags)
 
1124
 
 
1125
    def _restoreEnvironment(self):
 
1126
        for name, value in self.__old_env.iteritems():
 
1127
            osutils.set_or_unset_env(name, value)
 
1128
 
 
1129
    def _restoreHooks(self):
 
1130
        for klass, hooks in self._preserved_hooks.items():
 
1131
            setattr(klass, 'hooks', hooks)
 
1132
 
 
1133
    def knownFailure(self, reason):
 
1134
        """This test has failed for some known reason."""
 
1135
        raise KnownFailure(reason)
 
1136
 
 
1137
    def run(self, result=None):
 
1138
        if result is None: result = self.defaultTestResult()
 
1139
        for feature in getattr(self, '_test_needs_features', []):
 
1140
            if not feature.available():
 
1141
                result.startTest(self)
 
1142
                if getattr(result, 'addNotSupported', None):
 
1143
                    result.addNotSupported(self, feature)
 
1144
                else:
 
1145
                    result.addSuccess(self)
 
1146
                result.stopTest(self)
 
1147
                return
 
1148
        return unittest.TestCase.run(self, result)
 
1149
 
 
1150
    def tearDown(self):
 
1151
        self._runCleanups()
 
1152
        unittest.TestCase.tearDown(self)
 
1153
 
 
1154
    def time(self, callable, *args, **kwargs):
 
1155
        """Run callable and accrue the time it takes to the benchmark time.
 
1156
        
 
1157
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
1158
        this will cause lsprofile statistics to be gathered and stored in
 
1159
        self._benchcalls.
 
1160
        """
 
1161
        if self._benchtime is None:
 
1162
            self._benchtime = 0
 
1163
        start = time.time()
 
1164
        try:
 
1165
            if not self._gather_lsprof_in_benchmarks:
 
1166
                return callable(*args, **kwargs)
 
1167
            else:
 
1168
                # record this benchmark
 
1169
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
1170
                stats.sort()
 
1171
                self._benchcalls.append(((callable, args, kwargs), stats))
 
1172
                return ret
 
1173
        finally:
 
1174
            self._benchtime += time.time() - start
 
1175
 
 
1176
    def _runCleanups(self):
 
1177
        """Run registered cleanup functions. 
 
1178
 
 
1179
        This should only be called from TestCase.tearDown.
 
1180
        """
 
1181
        # TODO: Perhaps this should keep running cleanups even if 
 
1182
        # one of them fails?
 
1183
 
 
1184
        # Actually pop the cleanups from the list so tearDown running
 
1185
        # twice is safe (this happens for skipped tests).
 
1186
        while self._cleanups:
 
1187
            self._cleanups.pop()()
 
1188
 
 
1189
    def log(self, *args):
 
1190
        mutter(*args)
 
1191
 
 
1192
    def _get_log(self, keep_log_file=False):
 
1193
        """Return as a string the log for this test. If the file is still
 
1194
        on disk and keep_log_file=False, delete the log file and store the
 
1195
        content in self._log_contents."""
 
1196
        # flush the log file, to get all content
 
1197
        import bzrlib.trace
 
1198
        bzrlib.trace._trace_file.flush()
 
1199
        if self._log_contents:
 
1200
            return self._log_contents
 
1201
        if self._log_file_name is not None:
 
1202
            logfile = open(self._log_file_name)
 
1203
            try:
 
1204
                log_contents = logfile.read()
 
1205
            finally:
 
1206
                logfile.close()
 
1207
            if not keep_log_file:
 
1208
                self._log_contents = log_contents
 
1209
                try:
 
1210
                    os.remove(self._log_file_name)
 
1211
                except OSError, e:
 
1212
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1213
                        print >>sys.stderr, ('Unable to delete log file '
 
1214
                                             ' %r' % self._log_file_name)
 
1215
                    else:
 
1216
                        raise
 
1217
            return log_contents
 
1218
        else:
 
1219
            return "DELETED log file to reduce memory footprint"
 
1220
 
 
1221
    @deprecated_method(zero_eighteen)
 
1222
    def capture(self, cmd, retcode=0):
 
1223
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
1224
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
1225
 
 
1226
    def requireFeature(self, feature):
 
1227
        """This test requires a specific feature is available.
 
1228
 
 
1229
        :raises UnavailableFeature: When feature is not available.
 
1230
        """
 
1231
        if not feature.available():
 
1232
            raise UnavailableFeature(feature)
 
1233
 
 
1234
    @deprecated_method(zero_eighteen)
 
1235
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
1236
                         working_dir=None):
 
1237
        """Invoke bzr and return (stdout, stderr).
 
1238
 
 
1239
        Don't call this method, just use run_bzr() which is equivalent.
 
1240
 
 
1241
        :param argv: Arguments to invoke bzr.  This may be either a 
 
1242
            single string, in which case it is split by shlex into words, 
 
1243
            or a list of arguments.
 
1244
        :param retcode: Expected return code, or None for don't-care.
 
1245
        :param encoding: Encoding for sys.stdout and sys.stderr
 
1246
        :param stdin: A string to be used as stdin for the command.
 
1247
        :param working_dir: Change to this directory before running
 
1248
        """
 
1249
        return self._run_bzr_autosplit(argv, retcode=retcode,
 
1250
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1251
                )
 
1252
 
 
1253
    def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
 
1254
            working_dir):
 
1255
        """Run bazaar command line, splitting up a string command line."""
 
1256
        if isinstance(args, basestring):
 
1257
            args = list(shlex.split(args))
 
1258
        return self._run_bzr_core(args, retcode=retcode,
 
1259
                encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1260
                )
 
1261
 
 
1262
    def _run_bzr_core(self, args, retcode, encoding, stdin,
 
1263
            working_dir):
 
1264
        if encoding is None:
 
1265
            encoding = bzrlib.user_encoding
 
1266
        stdout = StringIOWrapper()
 
1267
        stderr = StringIOWrapper()
 
1268
        stdout.encoding = encoding
 
1269
        stderr.encoding = encoding
 
1270
 
 
1271
        self.log('run bzr: %r', args)
 
1272
        # FIXME: don't call into logging here
 
1273
        handler = logging.StreamHandler(stderr)
 
1274
        handler.setLevel(logging.INFO)
 
1275
        logger = logging.getLogger('')
 
1276
        logger.addHandler(handler)
 
1277
        old_ui_factory = ui.ui_factory
 
1278
        ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
 
1279
 
 
1280
        cwd = None
 
1281
        if working_dir is not None:
 
1282
            cwd = osutils.getcwd()
 
1283
            os.chdir(working_dir)
 
1284
 
 
1285
        try:
 
1286
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1287
                stdout, stderr,
 
1288
                bzrlib.commands.run_bzr_catch_errors,
 
1289
                args)
 
1290
        finally:
 
1291
            logger.removeHandler(handler)
 
1292
            ui.ui_factory = old_ui_factory
 
1293
            if cwd is not None:
 
1294
                os.chdir(cwd)
 
1295
 
 
1296
        out = stdout.getvalue()
 
1297
        err = stderr.getvalue()
 
1298
        if out:
 
1299
            self.log('output:\n%r', out)
 
1300
        if err:
 
1301
            self.log('errors:\n%r', err)
 
1302
        if retcode is not None:
 
1303
            self.assertEquals(retcode, result,
 
1304
                              message='Unexpected return code')
 
1305
        return out, err
 
1306
 
 
1307
    def run_bzr(self, *args, **kwargs):
 
1308
        """Invoke bzr, as if it were run from the command line.
 
1309
 
 
1310
        The argument list should not include the bzr program name - the
 
1311
        first argument is normally the bzr command.  Arguments may be
 
1312
        passed in three ways:
 
1313
 
 
1314
        1- A list of strings, eg ["commit", "a"].  This is recommended
 
1315
        when the command contains whitespace or metacharacters, or 
 
1316
        is built up at run time.
 
1317
 
 
1318
        2- A single string, eg "add a".  This is the most convenient 
 
1319
        for hardcoded commands.
 
1320
 
 
1321
        3- Several varargs parameters, eg run_bzr("add", "a").  
 
1322
        This is not recommended for new code.
 
1323
 
 
1324
        This runs bzr through the interface that catches and reports
 
1325
        errors, and with logging set to something approximating the
 
1326
        default, so that error reporting can be checked.
 
1327
 
 
1328
        This should be the main method for tests that want to exercise the
 
1329
        overall behavior of the bzr application (rather than a unit test
 
1330
        or a functional test of the library.)
 
1331
 
 
1332
        This sends the stdout/stderr results into the test's log,
 
1333
        where it may be useful for debugging.  See also run_captured.
 
1334
 
 
1335
        :keyword stdin: A string to be used as stdin for the command.
 
1336
        :keyword retcode: The status code the command should return;
 
1337
            default 0.
 
1338
        :keyword working_dir: The directory to run the command in
 
1339
        :keyword error_regexes: A list of expected error messages.  If
 
1340
            specified they must be seen in the error output of the command.
 
1341
        """
 
1342
        retcode = kwargs.pop('retcode', 0)
 
1343
        encoding = kwargs.pop('encoding', None)
 
1344
        stdin = kwargs.pop('stdin', None)
 
1345
        working_dir = kwargs.pop('working_dir', None)
 
1346
        error_regexes = kwargs.pop('error_regexes', [])
 
1347
 
 
1348
        if len(args) == 1:
 
1349
            if isinstance(args[0], (list, basestring)):
 
1350
                args = args[0]
 
1351
        else:
 
1352
            symbol_versioning.warn(zero_eighteen % "passing varargs to run_bzr",
 
1353
                                   DeprecationWarning, stacklevel=3)
 
1354
 
 
1355
        out, err = self._run_bzr_autosplit(args=args,
 
1356
            retcode=retcode,
 
1357
            encoding=encoding, stdin=stdin, working_dir=working_dir,
 
1358
            )
 
1359
 
 
1360
        for regex in error_regexes:
 
1361
            self.assertContainsRe(err, regex)
 
1362
        return out, err
 
1363
 
 
1364
    def run_bzr_decode(self, *args, **kwargs):
 
1365
        if 'encoding' in kwargs:
 
1366
            encoding = kwargs['encoding']
 
1367
        else:
 
1368
            encoding = bzrlib.user_encoding
 
1369
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
1370
 
 
1371
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
1372
        """Run bzr, and check that stderr contains the supplied regexes
 
1373
 
 
1374
        :param error_regexes: Sequence of regular expressions which
 
1375
            must each be found in the error output. The relative ordering
 
1376
            is not enforced.
 
1377
        :param args: command-line arguments for bzr
 
1378
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
1379
            This function changes the default value of retcode to be 3,
 
1380
            since in most cases this is run when you expect bzr to fail.
 
1381
 
 
1382
        :return: (out, err) The actual output of running the command (in case
 
1383
            you want to do more inspection)
 
1384
 
 
1385
        Examples of use::
 
1386
 
 
1387
            # Make sure that commit is failing because there is nothing to do
 
1388
            self.run_bzr_error(['no changes to commit'],
 
1389
                               'commit', '-m', 'my commit comment')
 
1390
            # Make sure --strict is handling an unknown file, rather than
 
1391
            # giving us the 'nothing to do' error
 
1392
            self.build_tree(['unknown'])
 
1393
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
1394
                               'commit', '--strict', '-m', 'my commit comment')
 
1395
        """
 
1396
        kwargs.setdefault('retcode', 3)
 
1397
        kwargs['error_regexes'] = error_regexes
 
1398
        out, err = self.run_bzr(*args, **kwargs)
 
1399
        return out, err
 
1400
 
 
1401
    def run_bzr_subprocess(self, *args, **kwargs):
 
1402
        """Run bzr in a subprocess for testing.
 
1403
 
 
1404
        This starts a new Python interpreter and runs bzr in there. 
 
1405
        This should only be used for tests that have a justifiable need for
 
1406
        this isolation: e.g. they are testing startup time, or signal
 
1407
        handling, or early startup code, etc.  Subprocess code can't be 
 
1408
        profiled or debugged so easily.
 
1409
 
 
1410
        :keyword retcode: The status code that is expected.  Defaults to 0.  If
 
1411
            None is supplied, the status code is not checked.
 
1412
        :keyword env_changes: A dictionary which lists changes to environment
 
1413
            variables. A value of None will unset the env variable.
 
1414
            The values must be strings. The change will only occur in the
 
1415
            child, so you don't need to fix the environment after running.
 
1416
        :keyword universal_newlines: Convert CRLF => LF
 
1417
        :keyword allow_plugins: By default the subprocess is run with
 
1418
            --no-plugins to ensure test reproducibility. Also, it is possible
 
1419
            for system-wide plugins to create unexpected output on stderr,
 
1420
            which can cause unnecessary test failures.
 
1421
        """
 
1422
        env_changes = kwargs.get('env_changes', {})
 
1423
        working_dir = kwargs.get('working_dir', None)
 
1424
        allow_plugins = kwargs.get('allow_plugins', False)
 
1425
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1426
                                            working_dir=working_dir,
 
1427
                                            allow_plugins=allow_plugins)
 
1428
        # We distinguish between retcode=None and retcode not passed.
 
1429
        supplied_retcode = kwargs.get('retcode', 0)
 
1430
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1431
            universal_newlines=kwargs.get('universal_newlines', False),
 
1432
            process_args=args)
 
1433
 
 
1434
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1435
                             skip_if_plan_to_signal=False,
 
1436
                             working_dir=None,
 
1437
                             allow_plugins=False):
 
1438
        """Start bzr in a subprocess for testing.
 
1439
 
 
1440
        This starts a new Python interpreter and runs bzr in there.
 
1441
        This should only be used for tests that have a justifiable need for
 
1442
        this isolation: e.g. they are testing startup time, or signal
 
1443
        handling, or early startup code, etc.  Subprocess code can't be
 
1444
        profiled or debugged so easily.
 
1445
 
 
1446
        :param process_args: a list of arguments to pass to the bzr executable,
 
1447
            for example ``['--version']``.
 
1448
        :param env_changes: A dictionary which lists changes to environment
 
1449
            variables. A value of None will unset the env variable.
 
1450
            The values must be strings. The change will only occur in the
 
1451
            child, so you don't need to fix the environment after running.
 
1452
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1453
            is not available.
 
1454
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1455
 
 
1456
        :returns: Popen object for the started process.
 
1457
        """
 
1458
        if skip_if_plan_to_signal:
 
1459
            if not getattr(os, 'kill', None):
 
1460
                raise TestSkipped("os.kill not available.")
 
1461
 
 
1462
        if env_changes is None:
 
1463
            env_changes = {}
 
1464
        old_env = {}
 
1465
 
 
1466
        def cleanup_environment():
 
1467
            for env_var, value in env_changes.iteritems():
 
1468
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1469
 
 
1470
        def restore_environment():
 
1471
            for env_var, value in old_env.iteritems():
 
1472
                osutils.set_or_unset_env(env_var, value)
 
1473
 
 
1474
        bzr_path = self.get_bzr_path()
 
1475
 
 
1476
        cwd = None
 
1477
        if working_dir is not None:
 
1478
            cwd = osutils.getcwd()
 
1479
            os.chdir(working_dir)
 
1480
 
 
1481
        try:
 
1482
            # win32 subprocess doesn't support preexec_fn
 
1483
            # so we will avoid using it on all platforms, just to
 
1484
            # make sure the code path is used, and we don't break on win32
 
1485
            cleanup_environment()
 
1486
            command = [sys.executable, bzr_path]
 
1487
            if not allow_plugins:
 
1488
                command.append('--no-plugins')
 
1489
            command.extend(process_args)
 
1490
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1491
        finally:
 
1492
            restore_environment()
 
1493
            if cwd is not None:
 
1494
                os.chdir(cwd)
 
1495
 
 
1496
        return process
 
1497
 
 
1498
    def _popen(self, *args, **kwargs):
 
1499
        """Place a call to Popen.
 
1500
 
 
1501
        Allows tests to override this method to intercept the calls made to
 
1502
        Popen for introspection.
 
1503
        """
 
1504
        return Popen(*args, **kwargs)
 
1505
 
 
1506
    def get_bzr_path(self):
 
1507
        """Return the path of the 'bzr' executable for this test suite."""
 
1508
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1509
        if not os.path.isfile(bzr_path):
 
1510
            # We are probably installed. Assume sys.argv is the right file
 
1511
            bzr_path = sys.argv[0]
 
1512
        return bzr_path
 
1513
 
 
1514
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1515
                              universal_newlines=False, process_args=None):
 
1516
        """Finish the execution of process.
 
1517
 
 
1518
        :param process: the Popen object returned from start_bzr_subprocess.
 
1519
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1520
            None is supplied, the status code is not checked.
 
1521
        :param send_signal: an optional signal to send to the process.
 
1522
        :param universal_newlines: Convert CRLF => LF
 
1523
        :returns: (stdout, stderr)
 
1524
        """
 
1525
        if send_signal is not None:
 
1526
            os.kill(process.pid, send_signal)
 
1527
        out, err = process.communicate()
 
1528
 
 
1529
        if universal_newlines:
 
1530
            out = out.replace('\r\n', '\n')
 
1531
            err = err.replace('\r\n', '\n')
 
1532
 
 
1533
        if retcode is not None and retcode != process.returncode:
 
1534
            if process_args is None:
 
1535
                process_args = "(unknown args)"
 
1536
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1537
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1538
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1539
                      % (process_args, retcode, process.returncode))
 
1540
        return [out, err]
 
1541
 
 
1542
    def check_inventory_shape(self, inv, shape):
 
1543
        """Compare an inventory to a list of expected names.
 
1544
 
 
1545
        Fail if they are not precisely equal.
 
1546
        """
 
1547
        extras = []
 
1548
        shape = list(shape)             # copy
 
1549
        for path, ie in inv.entries():
 
1550
            name = path.replace('\\', '/')
 
1551
            if ie.kind == 'directory':
 
1552
                name = name + '/'
 
1553
            if name in shape:
 
1554
                shape.remove(name)
 
1555
            else:
 
1556
                extras.append(name)
 
1557
        if shape:
 
1558
            self.fail("expected paths not found in inventory: %r" % shape)
 
1559
        if extras:
 
1560
            self.fail("unexpected paths found in inventory: %r" % extras)
 
1561
 
 
1562
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
1563
                         a_callable=None, *args, **kwargs):
 
1564
        """Call callable with redirected std io pipes.
 
1565
 
 
1566
        Returns the return code."""
 
1567
        if not callable(a_callable):
 
1568
            raise ValueError("a_callable must be callable.")
 
1569
        if stdin is None:
 
1570
            stdin = StringIO("")
 
1571
        if stdout is None:
 
1572
            if getattr(self, "_log_file", None) is not None:
 
1573
                stdout = self._log_file
 
1574
            else:
 
1575
                stdout = StringIO()
 
1576
        if stderr is None:
 
1577
            if getattr(self, "_log_file", None is not None):
 
1578
                stderr = self._log_file
 
1579
            else:
 
1580
                stderr = StringIO()
 
1581
        real_stdin = sys.stdin
 
1582
        real_stdout = sys.stdout
 
1583
        real_stderr = sys.stderr
 
1584
        try:
 
1585
            sys.stdout = stdout
 
1586
            sys.stderr = stderr
 
1587
            sys.stdin = stdin
 
1588
            return a_callable(*args, **kwargs)
 
1589
        finally:
 
1590
            sys.stdout = real_stdout
 
1591
            sys.stderr = real_stderr
 
1592
            sys.stdin = real_stdin
 
1593
 
 
1594
    def reduceLockdirTimeout(self):
 
1595
        """Reduce the default lock timeout for the duration of the test, so that
 
1596
        if LockContention occurs during a test, it does so quickly.
 
1597
 
 
1598
        Tests that expect to provoke LockContention errors should call this.
 
1599
        """
 
1600
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
 
1601
        def resetTimeout():
 
1602
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
1603
        self.addCleanup(resetTimeout)
 
1604
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
1605
 
 
1606
 
 
1607
class TestCaseWithMemoryTransport(TestCase):
 
1608
    """Common test class for tests that do not need disk resources.
 
1609
 
 
1610
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1611
 
 
1612
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1613
 
 
1614
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1615
    a directory which does not exist. This serves to help ensure test isolation
 
1616
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1617
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1618
    file defaults for the transport in tests, nor does it obey the command line
 
1619
    override, so tests that accidentally write to the common directory should
 
1620
    be rare.
 
1621
 
 
1622
    :cvar TEST_ROOT: Directory containing all temporary directories, plus
 
1623
    a .bzr directory that stops us ascending higher into the filesystem.
 
1624
    """
 
1625
 
 
1626
    TEST_ROOT = None
 
1627
    _TEST_NAME = 'test'
 
1628
 
 
1629
    def __init__(self, methodName='runTest'):
 
1630
        # allow test parameterisation after test construction and before test
 
1631
        # execution. Variables that the parameteriser sets need to be 
 
1632
        # ones that are not set by setUp, or setUp will trash them.
 
1633
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1634
        self.vfs_transport_factory = default_transport
 
1635
        self.transport_server = None
 
1636
        self.transport_readonly_server = None
 
1637
        self.__vfs_server = None
 
1638
 
 
1639
    def get_transport(self, relpath=None):
 
1640
        """Return a writeable transport.
 
1641
 
 
1642
        This transport is for the test scratch space relative to
 
1643
        "self._test_root"
 
1644
        
 
1645
        :param relpath: a path relative to the base url.
 
1646
        """
 
1647
        t = get_transport(self.get_url(relpath))
 
1648
        self.assertFalse(t.is_readonly())
 
1649
        return t
 
1650
 
 
1651
    def get_readonly_transport(self, relpath=None):
 
1652
        """Return a readonly transport for the test scratch space
 
1653
        
 
1654
        This can be used to test that operations which should only need
 
1655
        readonly access in fact do not try to write.
 
1656
 
 
1657
        :param relpath: a path relative to the base url.
 
1658
        """
 
1659
        t = get_transport(self.get_readonly_url(relpath))
 
1660
        self.assertTrue(t.is_readonly())
 
1661
        return t
 
1662
 
 
1663
    def create_transport_readonly_server(self):
 
1664
        """Create a transport server from class defined at init.
 
1665
 
 
1666
        This is mostly a hook for daughter classes.
 
1667
        """
 
1668
        return self.transport_readonly_server()
 
1669
 
 
1670
    def get_readonly_server(self):
 
1671
        """Get the server instance for the readonly transport
 
1672
 
 
1673
        This is useful for some tests with specific servers to do diagnostics.
 
1674
        """
 
1675
        if self.__readonly_server is None:
 
1676
            if self.transport_readonly_server is None:
 
1677
                # readonly decorator requested
 
1678
                # bring up the server
 
1679
                self.__readonly_server = ReadonlyServer()
 
1680
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1681
            else:
 
1682
                self.__readonly_server = self.create_transport_readonly_server()
 
1683
                self.__readonly_server.setUp(self.get_vfs_only_server())
 
1684
            self.addCleanup(self.__readonly_server.tearDown)
 
1685
        return self.__readonly_server
 
1686
 
 
1687
    def get_readonly_url(self, relpath=None):
 
1688
        """Get a URL for the readonly transport.
 
1689
 
 
1690
        This will either be backed by '.' or a decorator to the transport 
 
1691
        used by self.get_url()
 
1692
        relpath provides for clients to get a path relative to the base url.
 
1693
        These should only be downwards relative, not upwards.
 
1694
        """
 
1695
        base = self.get_readonly_server().get_url()
 
1696
        return self._adjust_url(base, relpath)
 
1697
 
 
1698
    def get_vfs_only_server(self):
 
1699
        """Get the vfs only read/write server instance.
 
1700
 
 
1701
        This is useful for some tests with specific servers that need
 
1702
        diagnostics.
 
1703
 
 
1704
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1705
        is no means to override it.
 
1706
        """
 
1707
        if self.__vfs_server is None:
 
1708
            self.__vfs_server = MemoryServer()
 
1709
            self.__vfs_server.setUp()
 
1710
            self.addCleanup(self.__vfs_server.tearDown)
 
1711
        return self.__vfs_server
 
1712
 
 
1713
    def get_server(self):
 
1714
        """Get the read/write server instance.
 
1715
 
 
1716
        This is useful for some tests with specific servers that need
 
1717
        diagnostics.
 
1718
 
 
1719
        This is built from the self.transport_server factory. If that is None,
 
1720
        then the self.get_vfs_server is returned.
 
1721
        """
 
1722
        if self.__server is None:
 
1723
            if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
 
1724
                return self.get_vfs_only_server()
 
1725
            else:
 
1726
                # bring up a decorated means of access to the vfs only server.
 
1727
                self.__server = self.transport_server()
 
1728
                try:
 
1729
                    self.__server.setUp(self.get_vfs_only_server())
 
1730
                except TypeError, e:
 
1731
                    # This should never happen; the try:Except here is to assist
 
1732
                    # developers having to update code rather than seeing an
 
1733
                    # uninformative TypeError.
 
1734
                    raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
 
1735
            self.addCleanup(self.__server.tearDown)
 
1736
        return self.__server
 
1737
 
 
1738
    def _adjust_url(self, base, relpath):
 
1739
        """Get a URL (or maybe a path) for the readwrite transport.
 
1740
 
 
1741
        This will either be backed by '.' or to an equivalent non-file based
 
1742
        facility.
 
1743
        relpath provides for clients to get a path relative to the base url.
 
1744
        These should only be downwards relative, not upwards.
 
1745
        """
 
1746
        if relpath is not None and relpath != '.':
 
1747
            if not base.endswith('/'):
 
1748
                base = base + '/'
 
1749
            # XXX: Really base should be a url; we did after all call
 
1750
            # get_url()!  But sometimes it's just a path (from
 
1751
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1752
            # to a non-escaped local path.
 
1753
            if base.startswith('./') or base.startswith('/'):
 
1754
                base += relpath
 
1755
            else:
 
1756
                base += urlutils.escape(relpath)
 
1757
        return base
 
1758
 
 
1759
    def get_url(self, relpath=None):
 
1760
        """Get a URL (or maybe a path) for the readwrite transport.
 
1761
 
 
1762
        This will either be backed by '.' or to an equivalent non-file based
 
1763
        facility.
 
1764
        relpath provides for clients to get a path relative to the base url.
 
1765
        These should only be downwards relative, not upwards.
 
1766
        """
 
1767
        base = self.get_server().get_url()
 
1768
        return self._adjust_url(base, relpath)
 
1769
 
 
1770
    def get_vfs_only_url(self, relpath=None):
 
1771
        """Get a URL (or maybe a path for the plain old vfs transport.
 
1772
 
 
1773
        This will never be a smart protocol.  It always has all the
 
1774
        capabilities of the local filesystem, but it might actually be a
 
1775
        MemoryTransport or some other similar virtual filesystem.
 
1776
 
 
1777
        This is the backing transport (if any) of the server returned by
 
1778
        get_url and get_readonly_url.
 
1779
 
 
1780
        :param relpath: provides for clients to get a path relative to the base
 
1781
            url.  These should only be downwards relative, not upwards.
 
1782
        :return: A URL
 
1783
        """
 
1784
        base = self.get_vfs_only_server().get_url()
 
1785
        return self._adjust_url(base, relpath)
 
1786
 
 
1787
    def _make_test_root(self):
 
1788
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1789
            return
 
1790
        root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
1791
        TestCaseWithMemoryTransport.TEST_ROOT = root
 
1792
        
 
1793
        # make a fake bzr directory there to prevent any tests propagating
 
1794
        # up onto the source directory's real branch
 
1795
        bzrdir.BzrDir.create_standalone_workingtree(root)
 
1796
 
 
1797
        # The same directory is used by all tests, and we're not specifically
 
1798
        # told when all tests are finished.  This will do.
 
1799
        atexit.register(_rmtree_temp_dir, root)
 
1800
 
 
1801
    def makeAndChdirToTestDir(self):
 
1802
        """Create a temporary directories for this one test.
 
1803
        
 
1804
        This must set self.test_home_dir and self.test_dir and chdir to
 
1805
        self.test_dir.
 
1806
        
 
1807
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1808
        """
 
1809
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1810
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1811
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1812
        
 
1813
    def make_branch(self, relpath, format=None):
 
1814
        """Create a branch on the transport at relpath."""
 
1815
        repo = self.make_repository(relpath, format=format)
 
1816
        return repo.bzrdir.create_branch()
 
1817
 
 
1818
    def make_bzrdir(self, relpath, format=None):
 
1819
        try:
 
1820
            # might be a relative or absolute path
 
1821
            maybe_a_url = self.get_url(relpath)
 
1822
            segments = maybe_a_url.rsplit('/', 1)
 
1823
            t = get_transport(maybe_a_url)
 
1824
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1825
                t.ensure_base()
 
1826
            if format is None:
 
1827
                format = 'default'
 
1828
            if isinstance(format, basestring):
 
1829
                format = bzrdir.format_registry.make_bzrdir(format)
 
1830
            return format.initialize_on_transport(t)
 
1831
        except errors.UninitializableFormat:
 
1832
            raise TestSkipped("Format %s is not initializable." % format)
 
1833
 
 
1834
    def make_repository(self, relpath, shared=False, format=None):
 
1835
        """Create a repository on our default transport at relpath.
 
1836
        
 
1837
        Note that relpath must be a relative path, not a full url.
 
1838
        """
 
1839
        # FIXME: If you create a remoterepository this returns the underlying
 
1840
        # real format, which is incorrect.  Actually we should make sure that 
 
1841
        # RemoteBzrDir returns a RemoteRepository.
 
1842
        # maybe  mbp 20070410
 
1843
        made_control = self.make_bzrdir(relpath, format=format)
 
1844
        return made_control.create_repository(shared=shared)
 
1845
 
 
1846
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1847
        """Create a branch on the default transport and a MemoryTree for it."""
 
1848
        b = self.make_branch(relpath, format=format)
 
1849
        return memorytree.MemoryTree.create_on_branch(b)
 
1850
 
 
1851
    def overrideEnvironmentForTesting(self):
 
1852
        os.environ['HOME'] = self.test_home_dir
 
1853
        os.environ['BZR_HOME'] = self.test_home_dir
 
1854
        
 
1855
    def setUp(self):
 
1856
        super(TestCaseWithMemoryTransport, self).setUp()
 
1857
        self._make_test_root()
 
1858
        _currentdir = os.getcwdu()
 
1859
        def _leaveDirectory():
 
1860
            os.chdir(_currentdir)
 
1861
        self.addCleanup(_leaveDirectory)
 
1862
        self.makeAndChdirToTestDir()
 
1863
        self.overrideEnvironmentForTesting()
 
1864
        self.__readonly_server = None
 
1865
        self.__server = None
 
1866
        self.reduceLockdirTimeout()
 
1867
 
 
1868
     
 
1869
class TestCaseInTempDir(TestCaseWithMemoryTransport):
 
1870
    """Derived class that runs a test within a temporary directory.
 
1871
 
 
1872
    This is useful for tests that need to create a branch, etc.
 
1873
 
 
1874
    The directory is created in a slightly complex way: for each
 
1875
    Python invocation, a new temporary top-level directory is created.
 
1876
    All test cases create their own directory within that.  If the
 
1877
    tests complete successfully, the directory is removed.
 
1878
 
 
1879
    :ivar test_base_dir: The path of the top-level directory for this 
 
1880
    test, which contains a home directory and a work directory.
 
1881
 
 
1882
    :ivar test_home_dir: An initially empty directory under test_base_dir
 
1883
    which is used as $HOME for this test.
 
1884
 
 
1885
    :ivar test_dir: A directory under test_base_dir used as the current
 
1886
    directory when the test proper is run.
 
1887
    """
 
1888
 
 
1889
    OVERRIDE_PYTHON = 'python'
 
1890
 
 
1891
    def check_file_contents(self, filename, expect):
 
1892
        self.log("check contents of file %s" % filename)
 
1893
        contents = file(filename, 'r').read()
 
1894
        if contents != expect:
 
1895
            self.log("expected: %r" % expect)
 
1896
            self.log("actually: %r" % contents)
 
1897
            self.fail("contents of %s not as expected" % filename)
 
1898
 
 
1899
    def makeAndChdirToTestDir(self):
 
1900
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1901
        
 
1902
        For TestCaseInTempDir we create a temporary directory based on the test
 
1903
        name and then create two subdirs - test and home under it.
 
1904
        """
 
1905
        # create a directory within the top level test directory
 
1906
        candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
 
1907
        # now create test and home directories within this dir
 
1908
        self.test_base_dir = candidate_dir
 
1909
        self.test_home_dir = self.test_base_dir + '/home'
 
1910
        os.mkdir(self.test_home_dir)
 
1911
        self.test_dir = self.test_base_dir + '/work'
 
1912
        os.mkdir(self.test_dir)
 
1913
        os.chdir(self.test_dir)
 
1914
        # put name of test inside
 
1915
        f = file(self.test_base_dir + '/name', 'w')
 
1916
        try:
 
1917
            f.write(self.id())
 
1918
        finally:
 
1919
            f.close()
 
1920
        self.addCleanup(self.deleteTestDir)
 
1921
 
 
1922
    def deleteTestDir(self):
 
1923
        os.chdir(self.TEST_ROOT)
 
1924
        _rmtree_temp_dir(self.test_base_dir)
 
1925
 
 
1926
    def build_tree(self, shape, line_endings='binary', transport=None):
 
1927
        """Build a test tree according to a pattern.
 
1928
 
 
1929
        shape is a sequence of file specifications.  If the final
 
1930
        character is '/', a directory is created.
 
1931
 
 
1932
        This assumes that all the elements in the tree being built are new.
 
1933
 
 
1934
        This doesn't add anything to a branch.
 
1935
 
 
1936
        :param line_endings: Either 'binary' or 'native'
 
1937
            in binary mode, exact contents are written in native mode, the
 
1938
            line endings match the default platform endings.
 
1939
        :param transport: A transport to write to, for building trees on VFS's.
 
1940
            If the transport is readonly or None, "." is opened automatically.
 
1941
        :return: None
 
1942
        """
 
1943
        # It's OK to just create them using forward slashes on windows.
 
1944
        if transport is None or transport.is_readonly():
 
1945
            transport = get_transport(".")
 
1946
        for name in shape:
 
1947
            self.assert_(isinstance(name, basestring))
 
1948
            if name[-1] == '/':
 
1949
                transport.mkdir(urlutils.escape(name[:-1]))
 
1950
            else:
 
1951
                if line_endings == 'binary':
 
1952
                    end = '\n'
 
1953
                elif line_endings == 'native':
 
1954
                    end = os.linesep
 
1955
                else:
 
1956
                    raise errors.BzrError(
 
1957
                        'Invalid line ending request %r' % line_endings)
 
1958
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1959
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
1960
 
 
1961
    def build_tree_contents(self, shape):
 
1962
        build_tree_contents(shape)
 
1963
 
 
1964
    def assertFileEqual(self, content, path):
 
1965
        """Fail if path does not contain 'content'."""
 
1966
        self.failUnlessExists(path)
 
1967
        f = file(path, 'rb')
 
1968
        try:
 
1969
            s = f.read()
 
1970
        finally:
 
1971
            f.close()
 
1972
        self.assertEqualDiff(content, s)
 
1973
 
 
1974
    def failUnlessExists(self, path):
 
1975
        """Fail unless path or paths, which may be abs or relative, exist."""
 
1976
        if not isinstance(path, basestring):
 
1977
            for p in path:
 
1978
                self.failUnlessExists(p)
 
1979
        else:
 
1980
            self.failUnless(osutils.lexists(path),path+" does not exist")
 
1981
 
 
1982
    def failIfExists(self, path):
 
1983
        """Fail if path or paths, which may be abs or relative, exist."""
 
1984
        if not isinstance(path, basestring):
 
1985
            for p in path:
 
1986
                self.failIfExists(p)
 
1987
        else:
 
1988
            self.failIf(osutils.lexists(path),path+" exists")
 
1989
 
 
1990
    def assertInWorkingTree(self,path,root_path='.',tree=None):
 
1991
        """Assert whether path or paths are in the WorkingTree"""
 
1992
        if tree is None:
 
1993
            tree = workingtree.WorkingTree.open(root_path)
 
1994
        if not isinstance(path, basestring):
 
1995
            for p in path:
 
1996
                self.assertInWorkingTree(p,tree=tree)
 
1997
        else:
 
1998
            self.assertIsNot(tree.path2id(path), None,
 
1999
                path+' not in working tree.')
 
2000
 
 
2001
    def assertNotInWorkingTree(self,path,root_path='.',tree=None):
 
2002
        """Assert whether path or paths are not in the WorkingTree"""
 
2003
        if tree is None:
 
2004
            tree = workingtree.WorkingTree.open(root_path)
 
2005
        if not isinstance(path, basestring):
 
2006
            for p in path:
 
2007
                self.assertNotInWorkingTree(p,tree=tree)
 
2008
        else:
 
2009
            self.assertIs(tree.path2id(path), None, path+' in working tree.')
 
2010
 
 
2011
 
 
2012
class TestCaseWithTransport(TestCaseInTempDir):
 
2013
    """A test case that provides get_url and get_readonly_url facilities.
 
2014
 
 
2015
    These back onto two transport servers, one for readonly access and one for
 
2016
    read write access.
 
2017
 
 
2018
    If no explicit class is provided for readonly access, a
 
2019
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
2020
    based read write transports.
 
2021
 
 
2022
    If an explicit class is provided for readonly access, that server and the 
 
2023
    readwrite one must both define get_url() as resolving to os.getcwd().
 
2024
    """
 
2025
 
 
2026
    def get_vfs_only_server(self):
 
2027
        """See TestCaseWithMemoryTransport.
 
2028
 
 
2029
        This is useful for some tests with specific servers that need
 
2030
        diagnostics.
 
2031
        """
 
2032
        if self.__vfs_server is None:
 
2033
            self.__vfs_server = self.vfs_transport_factory()
 
2034
            self.__vfs_server.setUp()
 
2035
            self.addCleanup(self.__vfs_server.tearDown)
 
2036
        return self.__vfs_server
 
2037
 
 
2038
    def make_branch_and_tree(self, relpath, format=None):
 
2039
        """Create a branch on the transport and a tree locally.
 
2040
 
 
2041
        If the transport is not a LocalTransport, the Tree can't be created on
 
2042
        the transport.  In that case if the vfs_transport_factory is
 
2043
        LocalURLServer the working tree is created in the local
 
2044
        directory backing the transport, and the returned tree's branch and
 
2045
        repository will also be accessed locally. Otherwise a lightweight
 
2046
        checkout is created and returned.
 
2047
 
 
2048
        :param format: The BzrDirFormat.
 
2049
        :returns: the WorkingTree.
 
2050
        """
 
2051
        # TODO: always use the local disk path for the working tree,
 
2052
        # this obviously requires a format that supports branch references
 
2053
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
2054
        # RBC 20060208
 
2055
        b = self.make_branch(relpath, format=format)
 
2056
        try:
 
2057
            return b.bzrdir.create_workingtree()
 
2058
        except errors.NotLocalUrl:
 
2059
            # We can only make working trees locally at the moment.  If the
 
2060
            # transport can't support them, then we keep the non-disk-backed
 
2061
            # branch and create a local checkout.
 
2062
            if self.vfs_transport_factory is LocalURLServer:
 
2063
                # the branch is colocated on disk, we cannot create a checkout.
 
2064
                # hopefully callers will expect this.
 
2065
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
 
2066
                return local_controldir.create_workingtree()
 
2067
            else:
 
2068
                return b.create_checkout(relpath, lightweight=True)
 
2069
 
 
2070
    def assertIsDirectory(self, relpath, transport):
 
2071
        """Assert that relpath within transport is a directory.
 
2072
 
 
2073
        This may not be possible on all transports; in that case it propagates
 
2074
        a TransportNotPossible.
 
2075
        """
 
2076
        try:
 
2077
            mode = transport.stat(relpath).st_mode
 
2078
        except errors.NoSuchFile:
 
2079
            self.fail("path %s is not a directory; no such file"
 
2080
                      % (relpath))
 
2081
        if not stat.S_ISDIR(mode):
 
2082
            self.fail("path %s is not a directory; has mode %#o"
 
2083
                      % (relpath, mode))
 
2084
 
 
2085
    def assertTreesEqual(self, left, right):
 
2086
        """Check that left and right have the same content and properties."""
 
2087
        # we use a tree delta to check for equality of the content, and we
 
2088
        # manually check for equality of other things such as the parents list.
 
2089
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
 
2090
        differences = left.changes_from(right)
 
2091
        self.assertFalse(differences.has_changed(),
 
2092
            "Trees %r and %r are different: %r" % (left, right, differences))
 
2093
 
 
2094
    def setUp(self):
 
2095
        super(TestCaseWithTransport, self).setUp()
 
2096
        self.__vfs_server = None
 
2097
 
 
2098
 
 
2099
class ChrootedTestCase(TestCaseWithTransport):
 
2100
    """A support class that provides readonly urls outside the local namespace.
 
2101
 
 
2102
    This is done by checking if self.transport_server is a MemoryServer. if it
 
2103
    is then we are chrooted already, if it is not then an HttpServer is used
 
2104
    for readonly urls.
 
2105
 
 
2106
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
2107
                       be used without needed to redo it when a different 
 
2108
                       subclass is in use ?
 
2109
    """
 
2110
 
 
2111
    def setUp(self):
 
2112
        super(ChrootedTestCase, self).setUp()
 
2113
        if not self.vfs_transport_factory == MemoryServer:
 
2114
            self.transport_readonly_server = HttpServer
 
2115
 
 
2116
 
 
2117
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
 
2118
                       random_order=False):
 
2119
    """Create a test suite by filtering another one.
 
2120
    
 
2121
    :param suite:           the source suite
 
2122
    :param pattern:         pattern that names must match
 
2123
    :param exclude_pattern: pattern that names must not match, if any
 
2124
    :param random_order:    if True, tests in the new suite will be put in
 
2125
                            random order
 
2126
    :returns: the newly created suite
 
2127
    """ 
 
2128
    return sort_suite_by_re(suite, pattern, exclude_pattern,
 
2129
        random_order, False)
 
2130
 
 
2131
 
 
2132
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
 
2133
                     random_order=False, append_rest=True):
 
2134
    """Create a test suite by sorting another one.
 
2135
    
 
2136
    :param suite:           the source suite
 
2137
    :param pattern:         pattern that names must match in order to go
 
2138
                            first in the new suite
 
2139
    :param exclude_pattern: pattern that names must not match, if any
 
2140
    :param random_order:    if True, tests in the new suite will be put in
 
2141
                            random order
 
2142
    :param append_rest:     if False, pattern is a strict filter and not
 
2143
                            just an ordering directive
 
2144
    :returns: the newly created suite
 
2145
    """ 
 
2146
    first = []
 
2147
    second = []
 
2148
    filter_re = re.compile(pattern)
 
2149
    if exclude_pattern is not None:
 
2150
        exclude_re = re.compile(exclude_pattern)
 
2151
    for test in iter_suite_tests(suite):
 
2152
        test_id = test.id()
 
2153
        if exclude_pattern is None or not exclude_re.search(test_id):
 
2154
            if filter_re.search(test_id):
 
2155
                first.append(test)
 
2156
            elif append_rest:
 
2157
                second.append(test)
 
2158
    if random_order:
 
2159
        random.shuffle(first)
 
2160
        random.shuffle(second)
 
2161
    return TestUtil.TestSuite(first + second)
 
2162
 
 
2163
 
 
2164
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
2165
              stop_on_failure=False,
 
2166
              transport=None, lsprof_timed=None, bench_history=None,
 
2167
              matching_tests_first=None,
 
2168
              list_only=False,
 
2169
              random_seed=None,
 
2170
              exclude_pattern=None,
61
2171
              ):
62
 
        if m not in MODULES_TO_TEST:
63
 
            MODULES_TO_TEST.append(m)
64
 
 
65
 
 
66
 
    TestBase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
67
 
    print '%-30s %s' % ('bzr binary', TestBase.BZRPATH)
68
 
 
69
 
    print
70
 
 
71
 
    suite = TestSuite()
72
 
 
73
 
    # should also test bzrlib.merge_core, but they seem to be out of date with
74
 
    # the code.
75
 
 
76
 
 
77
 
    # XXX: python2.3's TestLoader() doesn't seem to find all the
78
 
    # tests; don't know why
 
2172
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
2173
    if verbose:
 
2174
        verbosity = 2
 
2175
    else:
 
2176
        verbosity = 1
 
2177
    runner = TextTestRunner(stream=sys.stdout,
 
2178
                            descriptions=0,
 
2179
                            verbosity=verbosity,
 
2180
                            bench_history=bench_history,
 
2181
                            list_only=list_only,
 
2182
                            )
 
2183
    runner.stop_on_failure=stop_on_failure
 
2184
    # Initialise the random number generator and display the seed used.
 
2185
    # We convert the seed to a long to make it reuseable across invocations.
 
2186
    random_order = False
 
2187
    if random_seed is not None:
 
2188
        random_order = True
 
2189
        if random_seed == "now":
 
2190
            random_seed = long(time.time())
 
2191
        else:
 
2192
            # Convert the seed to a long if we can
 
2193
            try:
 
2194
                random_seed = long(random_seed)
 
2195
            except:
 
2196
                pass
 
2197
        runner.stream.writeln("Randomizing test order using seed %s\n" %
 
2198
            (random_seed))
 
2199
        random.seed(random_seed)
 
2200
    # Customise the list of tests if requested
 
2201
    if pattern != '.*' or exclude_pattern is not None or random_order:
 
2202
        if matching_tests_first:
 
2203
            suite = sort_suite_by_re(suite, pattern, exclude_pattern,
 
2204
                random_order)
 
2205
        else:
 
2206
            suite = filter_suite_by_re(suite, pattern, exclude_pattern,
 
2207
                random_order)
 
2208
    result = runner.run(suite)
 
2209
    return result.wasSuccessful()
 
2210
 
 
2211
 
 
2212
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
2213
             transport=None,
 
2214
             test_suite_factory=None,
 
2215
             lsprof_timed=None,
 
2216
             bench_history=None,
 
2217
             matching_tests_first=None,
 
2218
             list_only=False,
 
2219
             random_seed=None,
 
2220
             exclude_pattern=None):
 
2221
    """Run the whole test suite under the enhanced runner"""
 
2222
    # XXX: Very ugly way to do this...
 
2223
    # Disable warning about old formats because we don't want it to disturb
 
2224
    # any blackbox tests.
 
2225
    from bzrlib import repository
 
2226
    repository._deprecation_warning_done = True
 
2227
 
 
2228
    global default_transport
 
2229
    if transport is None:
 
2230
        transport = default_transport
 
2231
    old_transport = default_transport
 
2232
    default_transport = transport
 
2233
    try:
 
2234
        if test_suite_factory is None:
 
2235
            suite = test_suite()
 
2236
        else:
 
2237
            suite = test_suite_factory()
 
2238
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
2239
                     stop_on_failure=stop_on_failure,
 
2240
                     transport=transport,
 
2241
                     lsprof_timed=lsprof_timed,
 
2242
                     bench_history=bench_history,
 
2243
                     matching_tests_first=matching_tests_first,
 
2244
                     list_only=list_only,
 
2245
                     random_seed=random_seed,
 
2246
                     exclude_pattern=exclude_pattern)
 
2247
    finally:
 
2248
        default_transport = old_transport
 
2249
 
 
2250
 
 
2251
def test_suite():
 
2252
    """Build and return TestSuite for the whole of bzrlib.
 
2253
    
 
2254
    This function can be replaced if you need to change the default test
 
2255
    suite on a global basis, but it is not encouraged.
 
2256
    """
 
2257
    testmod_names = [
 
2258
                   'bzrlib.util.tests.test_bencode',
 
2259
                   'bzrlib.tests.test__dirstate_helpers',
 
2260
                   'bzrlib.tests.test_ancestry',
 
2261
                   'bzrlib.tests.test_annotate',
 
2262
                   'bzrlib.tests.test_api',
 
2263
                   'bzrlib.tests.test_atomicfile',
 
2264
                   'bzrlib.tests.test_bad_files',
 
2265
                   'bzrlib.tests.test_branch',
 
2266
                   'bzrlib.tests.test_branchbuilder',
 
2267
                   'bzrlib.tests.test_bugtracker',
 
2268
                   'bzrlib.tests.test_bundle',
 
2269
                   'bzrlib.tests.test_bzrdir',
 
2270
                   'bzrlib.tests.test_cache_utf8',
 
2271
                   'bzrlib.tests.test_commands',
 
2272
                   'bzrlib.tests.test_commit',
 
2273
                   'bzrlib.tests.test_commit_merge',
 
2274
                   'bzrlib.tests.test_config',
 
2275
                   'bzrlib.tests.test_conflicts',
 
2276
                   'bzrlib.tests.test_counted_lock',
 
2277
                   'bzrlib.tests.test_decorators',
 
2278
                   'bzrlib.tests.test_delta',
 
2279
                   'bzrlib.tests.test_deprecated_graph',
 
2280
                   'bzrlib.tests.test_diff',
 
2281
                   'bzrlib.tests.test_dirstate',
 
2282
                   'bzrlib.tests.test_email_message',
 
2283
                   'bzrlib.tests.test_errors',
 
2284
                   'bzrlib.tests.test_escaped_store',
 
2285
                   'bzrlib.tests.test_extract',
 
2286
                   'bzrlib.tests.test_fetch',
 
2287
                   'bzrlib.tests.test_file_names',
 
2288
                   'bzrlib.tests.test_ftp_transport',
 
2289
                   'bzrlib.tests.test_generate_docs',
 
2290
                   'bzrlib.tests.test_generate_ids',
 
2291
                   'bzrlib.tests.test_globbing',
 
2292
                   'bzrlib.tests.test_gpg',
 
2293
                   'bzrlib.tests.test_graph',
 
2294
                   'bzrlib.tests.test_hashcache',
 
2295
                   'bzrlib.tests.test_help',
 
2296
                   'bzrlib.tests.test_hooks',
 
2297
                   'bzrlib.tests.test_http',
 
2298
                   'bzrlib.tests.test_http_response',
 
2299
                   'bzrlib.tests.test_https_ca_bundle',
 
2300
                   'bzrlib.tests.test_identitymap',
 
2301
                   'bzrlib.tests.test_ignores',
 
2302
                   'bzrlib.tests.test_index',
 
2303
                   'bzrlib.tests.test_info',
 
2304
                   'bzrlib.tests.test_inv',
 
2305
                   'bzrlib.tests.test_knit',
 
2306
                   'bzrlib.tests.test_lazy_import',
 
2307
                   'bzrlib.tests.test_lazy_regex',
 
2308
                   'bzrlib.tests.test_lockdir',
 
2309
                   'bzrlib.tests.test_lockable_files',
 
2310
                   'bzrlib.tests.test_log',
 
2311
                   'bzrlib.tests.test_lsprof',
 
2312
                   'bzrlib.tests.test_memorytree',
 
2313
                   'bzrlib.tests.test_merge',
 
2314
                   'bzrlib.tests.test_merge3',
 
2315
                   'bzrlib.tests.test_merge_core',
 
2316
                   'bzrlib.tests.test_merge_directive',
 
2317
                   'bzrlib.tests.test_missing',
 
2318
                   'bzrlib.tests.test_msgeditor',
 
2319
                   'bzrlib.tests.test_multiparent',
 
2320
                   'bzrlib.tests.test_nonascii',
 
2321
                   'bzrlib.tests.test_options',
 
2322
                   'bzrlib.tests.test_osutils',
 
2323
                   'bzrlib.tests.test_osutils_encodings',
 
2324
                   'bzrlib.tests.test_pack',
 
2325
                   'bzrlib.tests.test_patch',
 
2326
                   'bzrlib.tests.test_patches',
 
2327
                   'bzrlib.tests.test_permissions',
 
2328
                   'bzrlib.tests.test_plugins',
 
2329
                   'bzrlib.tests.test_progress',
 
2330
                   'bzrlib.tests.test_reconcile',
 
2331
                   'bzrlib.tests.test_registry',
 
2332
                   'bzrlib.tests.test_remote',
 
2333
                   'bzrlib.tests.test_repository',
 
2334
                   'bzrlib.tests.test_revert',
 
2335
                   'bzrlib.tests.test_revision',
 
2336
                   'bzrlib.tests.test_revisionnamespaces',
 
2337
                   'bzrlib.tests.test_revisiontree',
 
2338
                   'bzrlib.tests.test_rio',
 
2339
                   'bzrlib.tests.test_sampler',
 
2340
                   'bzrlib.tests.test_selftest',
 
2341
                   'bzrlib.tests.test_setup',
 
2342
                   'bzrlib.tests.test_sftp_transport',
 
2343
                   'bzrlib.tests.test_smart',
 
2344
                   'bzrlib.tests.test_smart_add',
 
2345
                   'bzrlib.tests.test_smart_transport',
 
2346
                   'bzrlib.tests.test_smtp_connection',
 
2347
                   'bzrlib.tests.test_source',
 
2348
                   'bzrlib.tests.test_ssh_transport',
 
2349
                   'bzrlib.tests.test_status',
 
2350
                   'bzrlib.tests.test_store',
 
2351
                   'bzrlib.tests.test_strace',
 
2352
                   'bzrlib.tests.test_subsume',
 
2353
                   'bzrlib.tests.test_symbol_versioning',
 
2354
                   'bzrlib.tests.test_tag',
 
2355
                   'bzrlib.tests.test_testament',
 
2356
                   'bzrlib.tests.test_textfile',
 
2357
                   'bzrlib.tests.test_textmerge',
 
2358
                   'bzrlib.tests.test_timestamp',
 
2359
                   'bzrlib.tests.test_trace',
 
2360
                   'bzrlib.tests.test_transactions',
 
2361
                   'bzrlib.tests.test_transform',
 
2362
                   'bzrlib.tests.test_transport',
 
2363
                   'bzrlib.tests.test_tree',
 
2364
                   'bzrlib.tests.test_treebuilder',
 
2365
                   'bzrlib.tests.test_tsort',
 
2366
                   'bzrlib.tests.test_tuned_gzip',
 
2367
                   'bzrlib.tests.test_ui',
 
2368
                   'bzrlib.tests.test_upgrade',
 
2369
                   'bzrlib.tests.test_urlutils',
 
2370
                   'bzrlib.tests.test_versionedfile',
 
2371
                   'bzrlib.tests.test_version',
 
2372
                   'bzrlib.tests.test_version_info',
 
2373
                   'bzrlib.tests.test_weave',
 
2374
                   'bzrlib.tests.test_whitebox',
 
2375
                   'bzrlib.tests.test_win32utils',
 
2376
                   'bzrlib.tests.test_workingtree',
 
2377
                   'bzrlib.tests.test_workingtree_4',
 
2378
                   'bzrlib.tests.test_wsgi',
 
2379
                   'bzrlib.tests.test_xml',
 
2380
                   ]
 
2381
    test_transport_implementations = [
 
2382
        'bzrlib.tests.test_transport_implementations',
 
2383
        'bzrlib.tests.test_read_bundle',
 
2384
        ]
 
2385
    suite = TestUtil.TestSuite()
 
2386
    loader = TestUtil.TestLoader()
 
2387
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
2388
    from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
 
2389
    adapter = TransportTestProviderAdapter()
 
2390
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
2391
    for package in packages_to_test():
 
2392
        suite.addTest(package.test_suite())
79
2393
    for m in MODULES_TO_TEST:
80
 
         suite.addTest(TestLoader().loadTestsFromModule(m))
81
 
 
82
 
    for m in (MODULES_TO_DOCTEST):
83
 
        suite.addTest(DocTestSuite(m))
84
 
 
85
 
    for p in bzrlib.plugin.all_plugins:
86
 
        if hasattr(p, 'test_suite'):
87
 
            suite.addTest(p.test_suite())
88
 
 
89
 
    suite.addTest(unittest.makeSuite(bzrlib.merge_core.MergeTest, 'test_'))
90
 
 
91
 
    return run_suite(suite, 'testbzr')
92
 
 
93
 
 
94
 
 
 
2394
        suite.addTest(loader.loadTestsFromModule(m))
 
2395
    for m in MODULES_TO_DOCTEST:
 
2396
        try:
 
2397
            suite.addTest(doctest.DocTestSuite(m))
 
2398
        except ValueError, e:
 
2399
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
2400
            raise
 
2401
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
2402
        if getattr(plugin, 'test_suite', None) is not None:
 
2403
            default_encoding = sys.getdefaultencoding()
 
2404
            try:
 
2405
                plugin_suite = plugin.test_suite()
 
2406
            except ImportError, e:
 
2407
                bzrlib.trace.warning(
 
2408
                    'Unable to test plugin "%s": %s', name, e)
 
2409
            else:
 
2410
                suite.addTest(plugin_suite)
 
2411
            if default_encoding != sys.getdefaultencoding():
 
2412
                bzrlib.trace.warning(
 
2413
                    'Plugin "%s" tried to reset default encoding to: %s', name,
 
2414
                    sys.getdefaultencoding())
 
2415
                reload(sys)
 
2416
                sys.setdefaultencoding(default_encoding)
 
2417
    return suite
 
2418
 
 
2419
 
 
2420
def adapt_modules(mods_list, adapter, loader, suite):
 
2421
    """Adapt the modules in mods_list using adapter and add to suite."""
 
2422
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
2423
        suite.addTests(adapter.adapt(test))
 
2424
 
 
2425
 
 
2426
def _rmtree_temp_dir(dirname):
 
2427
    # If LANG=C we probably have created some bogus paths
 
2428
    # which rmtree(unicode) will fail to delete
 
2429
    # so make sure we are using rmtree(str) to delete everything
 
2430
    # except on win32, where rmtree(str) will fail
 
2431
    # since it doesn't have the property of byte-stream paths
 
2432
    # (they are either ascii or mbcs)
 
2433
    if sys.platform == 'win32':
 
2434
        # make sure we are using the unicode win32 api
 
2435
        dirname = unicode(dirname)
 
2436
    else:
 
2437
        dirname = dirname.encode(sys.getfilesystemencoding())
 
2438
    try:
 
2439
        osutils.rmtree(dirname)
 
2440
    except OSError, e:
 
2441
        if sys.platform == 'win32' and e.errno == errno.EACCES:
 
2442
            print >>sys.stderr, ('Permission denied: '
 
2443
                                 'unable to remove testing dir '
 
2444
                                 '%s' % os.path.basename(dirname))
 
2445
        else:
 
2446
            raise
 
2447
 
 
2448
 
 
2449
class Feature(object):
 
2450
    """An operating system Feature."""
 
2451
 
 
2452
    def __init__(self):
 
2453
        self._available = None
 
2454
 
 
2455
    def available(self):
 
2456
        """Is the feature available?
 
2457
 
 
2458
        :return: True if the feature is available.
 
2459
        """
 
2460
        if self._available is None:
 
2461
            self._available = self._probe()
 
2462
        return self._available
 
2463
 
 
2464
    def _probe(self):
 
2465
        """Implement this method in concrete features.
 
2466
 
 
2467
        :return: True if the feature is available.
 
2468
        """
 
2469
        raise NotImplementedError
 
2470
 
 
2471
    def __str__(self):
 
2472
        if getattr(self, 'feature_name', None):
 
2473
            return self.feature_name()
 
2474
        return self.__class__.__name__
 
2475
 
 
2476
 
 
2477
class TestScenarioApplier(object):
 
2478
    """A tool to apply scenarios to tests."""
 
2479
 
 
2480
    def adapt(self, test):
 
2481
        """Return a TestSuite containing a copy of test for each scenario."""
 
2482
        result = unittest.TestSuite()
 
2483
        for scenario in self.scenarios:
 
2484
            result.addTest(self.adapt_test_to_scenario(test, scenario))
 
2485
        return result
 
2486
 
 
2487
    def adapt_test_to_scenario(self, test, scenario):
 
2488
        """Copy test and apply scenario to it.
 
2489
 
 
2490
        :param test: A test to adapt.
 
2491
        :param scenario: A tuple describing the scenarion.
 
2492
            The first element of the tuple is the new test id.
 
2493
            The second element is a dict containing attributes to set on the
 
2494
            test.
 
2495
        :return: The adapted test.
 
2496
        """
 
2497
        from copy import deepcopy
 
2498
        new_test = deepcopy(test)
 
2499
        for name, value in scenario[1].items():
 
2500
            setattr(new_test, name, value)
 
2501
        new_id = "%s(%s)" % (new_test.id(), scenario[0])
 
2502
        new_test.id = lambda: new_id
 
2503
        return new_test