~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Martin Pool
  • Date: 2006-11-03 01:52:12 UTC
  • mto: This revision was merged to the branch mainline in revision 2119.
  • Revision ID: mbp@sourcefrog.net-20061103015212-1e5f881c2152d79f
Review comments

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
 
18
# TODO: Perhaps there should be an API to find out if bzr running under the
 
19
# test suite -- some plugins might want to avoid making intrusive changes if
 
20
# this is the case.  However, we want behaviour under to test to diverge as
 
21
# little as possible, so this should be used rarely if it's added at all.
 
22
# (Suggestion from j-a-meinel, 2005-11-24)
 
23
 
 
24
# NOTE: Some classes in here use camelCaseNaming() rather than
 
25
# underscore_naming().  That's for consistency with unittest; it's not the
 
26
# general style of bzrlib.  Please continue that consistency when adding e.g.
 
27
# new assertFoo() methods.
 
28
 
 
29
import codecs
 
30
from cStringIO import StringIO
 
31
import difflib
 
32
import doctest
 
33
import errno
18
34
import logging
19
 
import unittest
20
 
import tempfile
21
35
import os
 
36
import re
 
37
import shlex
 
38
import stat
 
39
from subprocess import Popen, PIPE
22
40
import sys
23
 
import subprocess
24
 
 
25
 
from testsweet import run_suite
 
41
import tempfile
 
42
import unittest
 
43
import time
 
44
 
 
45
 
 
46
from bzrlib import memorytree
 
47
import bzrlib.branch
 
48
import bzrlib.bzrdir as bzrdir
26
49
import bzrlib.commands
27
 
 
 
50
import bzrlib.bundle.serializer
 
51
import bzrlib.errors as errors
 
52
import bzrlib.export
 
53
import bzrlib.inventory
 
54
import bzrlib.iterablefile
 
55
import bzrlib.lockdir
 
56
try:
 
57
    import bzrlib.lsprof
 
58
except ImportError:
 
59
    # lsprof not available
 
60
    pass
 
61
from bzrlib.merge import merge_inner
 
62
import bzrlib.merge3
 
63
import bzrlib.osutils
 
64
import bzrlib.osutils as osutils
 
65
import bzrlib.plugin
 
66
import bzrlib.progress as progress
 
67
from bzrlib.revision import common_ancestor
 
68
import bzrlib.store
 
69
from bzrlib import symbol_versioning
28
70
import bzrlib.trace
29
 
import bzrlib.fetch
 
71
from bzrlib.transport import get_transport
 
72
import bzrlib.transport
 
73
from bzrlib.transport.local import LocalURLServer
 
74
from bzrlib.transport.memory import MemoryServer
 
75
from bzrlib.transport.readonly import ReadonlyServer
 
76
from bzrlib.trace import mutter, note
 
77
from bzrlib.tests import TestUtil
 
78
from bzrlib.tests.TestUtil import (
 
79
                          TestSuite,
 
80
                          TestLoader,
 
81
                          )
 
82
from bzrlib.tests.treeshape import build_tree_contents
 
83
import bzrlib.urlutils as urlutils
 
84
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
 
85
 
 
86
default_transport = LocalURLServer
30
87
 
31
88
MODULES_TO_TEST = []
32
 
MODULES_TO_DOCTEST = []
33
 
 
34
 
from logging import debug, warning, error
 
89
MODULES_TO_DOCTEST = [
 
90
                      bzrlib.bundle.serializer,
 
91
                      bzrlib.errors,
 
92
                      bzrlib.export,
 
93
                      bzrlib.inventory,
 
94
                      bzrlib.iterablefile,
 
95
                      bzrlib.lockdir,
 
96
                      bzrlib.merge3,
 
97
                      bzrlib.option,
 
98
                      bzrlib.store,
 
99
                      ]
 
100
 
 
101
 
 
102
def packages_to_test():
 
103
    """Return a list of packages to test.
 
104
 
 
105
    The packages are not globally imported so that import failures are
 
106
    triggered when running selftest, not when importing the command.
 
107
    """
 
108
    import bzrlib.doc
 
109
    import bzrlib.tests.blackbox
 
110
    import bzrlib.tests.branch_implementations
 
111
    import bzrlib.tests.bzrdir_implementations
 
112
    import bzrlib.tests.interrepository_implementations
 
113
    import bzrlib.tests.interversionedfile_implementations
 
114
    import bzrlib.tests.intertree_implementations
 
115
    import bzrlib.tests.repository_implementations
 
116
    import bzrlib.tests.revisionstore_implementations
 
117
    import bzrlib.tests.tree_implementations
 
118
    import bzrlib.tests.workingtree_implementations
 
119
    return [
 
120
            bzrlib.doc,
 
121
            bzrlib.tests.blackbox,
 
122
            bzrlib.tests.branch_implementations,
 
123
            bzrlib.tests.bzrdir_implementations,
 
124
            bzrlib.tests.interrepository_implementations,
 
125
            bzrlib.tests.interversionedfile_implementations,
 
126
            bzrlib.tests.intertree_implementations,
 
127
            bzrlib.tests.repository_implementations,
 
128
            bzrlib.tests.revisionstore_implementations,
 
129
            bzrlib.tests.tree_implementations,
 
130
            bzrlib.tests.workingtree_implementations,
 
131
            ]
 
132
 
 
133
 
 
134
class ExtendedTestResult(unittest._TextTestResult):
 
135
    """Accepts, reports and accumulates the results of running tests.
 
136
 
 
137
    Compared to this unittest version this class adds support for profiling,
 
138
    benchmarking, stopping as soon as a test fails,  and skipping tests.
 
139
    There are further-specialized subclasses for different types of display.
 
140
    """
 
141
 
 
142
    stop_early = False
 
143
    
 
144
    def __init__(self, stream, descriptions, verbosity,
 
145
                 bench_history=None,
 
146
                 num_tests=None,
 
147
                 ):
 
148
        """Construct new TestResult.
 
149
 
 
150
        :param bench_history: Optionally, a writable file object to accumulate
 
151
            benchmark results.
 
152
        """
 
153
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
 
154
        if bench_history is not None:
 
155
            from bzrlib.version import _get_bzr_source_tree
 
156
            src_tree = _get_bzr_source_tree()
 
157
            if src_tree:
 
158
                try:
 
159
                    revision_id = src_tree.get_parent_ids()[0]
 
160
                except IndexError:
 
161
                    # XXX: if this is a brand new tree, do the same as if there
 
162
                    # is no branch.
 
163
                    revision_id = ''
 
164
            else:
 
165
                # XXX: If there's no branch, what should we do?
 
166
                revision_id = ''
 
167
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
 
168
        self._bench_history = bench_history
 
169
        self.ui = bzrlib.ui.ui_factory
 
170
        self.num_tests = num_tests
 
171
        self.error_count = 0
 
172
        self.failure_count = 0
 
173
        self.skip_count = 0
 
174
        self.count = 0
 
175
        self._overall_start_time = time.time()
 
176
    
 
177
    def extractBenchmarkTime(self, testCase):
 
178
        """Add a benchmark time for the current test case."""
 
179
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
 
180
    
 
181
    def _elapsedTestTimeString(self):
 
182
        """Return a time string for the overall time the current test has taken."""
 
183
        return self._formatTime(time.time() - self._start_time)
 
184
 
 
185
    def _testTimeString(self):
 
186
        if self._benchmarkTime is not None:
 
187
            return "%s/%s" % (
 
188
                self._formatTime(self._benchmarkTime),
 
189
                self._elapsedTestTimeString())
 
190
        else:
 
191
            return "      %s" % self._elapsedTestTimeString()
 
192
 
 
193
    def _formatTime(self, seconds):
 
194
        """Format seconds as milliseconds with leading spaces."""
 
195
        return "%5dms" % (1000 * seconds)
 
196
 
 
197
    def _shortened_test_description(self, test):
 
198
        what = test.id()
 
199
        what = re.sub(r'^bzrlib\.(tests|benchmark)\.', '', what)
 
200
        return what
 
201
 
 
202
    def startTest(self, test):
 
203
        unittest.TestResult.startTest(self, test)
 
204
        self.report_test_start(test)
 
205
        self._recordTestStartTime()
 
206
 
 
207
    def _recordTestStartTime(self):
 
208
        """Record that a test has started."""
 
209
        self._start_time = time.time()
 
210
 
 
211
    def addError(self, test, err):
 
212
        if isinstance(err[1], TestSkipped):
 
213
            return self.addSkipped(test, err)    
 
214
        unittest.TestResult.addError(self, test, err)
 
215
        # We can only do this if we have one of our TestCases, not if
 
216
        # we have a doctest.
 
217
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
218
        if setKeepLogfile is not None:
 
219
            setKeepLogfile()
 
220
        self.extractBenchmarkTime(test)
 
221
        self.report_error(test, err)
 
222
        if self.stop_early:
 
223
            self.stop()
 
224
 
 
225
    def addFailure(self, test, err):
 
226
        unittest.TestResult.addFailure(self, test, err)
 
227
        # We can only do this if we have one of our TestCases, not if
 
228
        # we have a doctest.
 
229
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
230
        if setKeepLogfile is not None:
 
231
            setKeepLogfile()
 
232
        self.extractBenchmarkTime(test)
 
233
        self.report_failure(test, err)
 
234
        if self.stop_early:
 
235
            self.stop()
 
236
 
 
237
    def addSuccess(self, test):
 
238
        self.extractBenchmarkTime(test)
 
239
        if self._bench_history is not None:
 
240
            if self._benchmarkTime is not None:
 
241
                self._bench_history.write("%s %s\n" % (
 
242
                    self._formatTime(self._benchmarkTime),
 
243
                    test.id()))
 
244
        self.report_success(test)
 
245
        unittest.TestResult.addSuccess(self, test)
 
246
 
 
247
    def addSkipped(self, test, skip_excinfo):
 
248
        self.extractBenchmarkTime(test)
 
249
        self.report_skip(test, skip_excinfo)
 
250
        # seems best to treat this as success from point-of-view of unittest
 
251
        # -- it actually does nothing so it barely matters :)
 
252
        try:
 
253
            test.tearDown()
 
254
        except KeyboardInterrupt:
 
255
            raise
 
256
        except:
 
257
            self.addError(test, test.__exc_info())
 
258
        else:
 
259
            unittest.TestResult.addSuccess(self, test)
 
260
 
 
261
    def printErrorList(self, flavour, errors):
 
262
        for test, err in errors:
 
263
            self.stream.writeln(self.separator1)
 
264
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
 
265
            if getattr(test, '_get_log', None) is not None:
 
266
                print >>self.stream
 
267
                print >>self.stream, \
 
268
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
 
269
                print >>self.stream, test._get_log()
 
270
                print >>self.stream, \
 
271
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
 
272
            self.stream.writeln(self.separator2)
 
273
            self.stream.writeln("%s" % err)
 
274
 
 
275
    def finished(self):
 
276
        pass
 
277
 
 
278
    def report_cleaning_up(self):
 
279
        pass
 
280
 
 
281
    def report_success(self, test):
 
282
        pass
 
283
 
 
284
 
 
285
class TextTestResult(ExtendedTestResult):
 
286
    """Displays progress and results of tests in text form"""
 
287
 
 
288
    def __init__(self, *args, **kw):
 
289
        ExtendedTestResult.__init__(self, *args, **kw)
 
290
        self.pb = self.ui.nested_progress_bar()
 
291
        self.pb.show_pct = False
 
292
        self.pb.show_spinner = False
 
293
        self.pb.show_eta = False, 
 
294
        self.pb.show_count = False
 
295
        self.pb.show_bar = False
 
296
 
 
297
    def report_starting(self):
 
298
        self.pb.update('[test 0/%d] starting...' % (self.num_tests))
 
299
 
 
300
    def _progress_prefix_text(self):
 
301
        a = '[%d' % self.count
 
302
        if self.num_tests is not None:
 
303
            a +='/%d' % self.num_tests
 
304
        a += ' in %ds' % (time.time() - self._overall_start_time)
 
305
        if self.error_count:
 
306
            a += ', %d errors' % self.error_count
 
307
        if self.failure_count:
 
308
            a += ', %d failed' % self.failure_count
 
309
        if self.skip_count:
 
310
            a += ', %d skipped' % self.skip_count
 
311
        a += ']'
 
312
        return a
 
313
 
 
314
    def report_test_start(self, test):
 
315
        self.count += 1
 
316
        self.pb.update(
 
317
                self._progress_prefix_text()
 
318
                + ' ' 
 
319
                + self._shortened_test_description(test))
 
320
 
 
321
    def report_error(self, test, err):
 
322
        self.error_count += 1
 
323
        self.pb.note('ERROR: %s\n    %s\n' % (
 
324
            self._shortened_test_description(test),
 
325
            err[1],
 
326
            ))
 
327
 
 
328
    def report_failure(self, test, err):
 
329
        self.failure_count += 1
 
330
        self.pb.note('FAIL: %s\n    %s\n' % (
 
331
            self._shortened_test_description(test),
 
332
            err[1],
 
333
            ))
 
334
 
 
335
    def report_skip(self, test, skip_excinfo):
 
336
        self.skip_count += 1
 
337
        if False:
 
338
            # at the moment these are mostly not things we can fix
 
339
            # and so they just produce stipple; use the verbose reporter
 
340
            # to see them.
 
341
            if False:
 
342
                # show test and reason for skip
 
343
                self.pb.note('SKIP: %s\n    %s\n' % (
 
344
                    self._shortened_test_description(test),
 
345
                    skip_excinfo[1]))
 
346
            else:
 
347
                # since the class name was left behind in the still-visible
 
348
                # progress bar...
 
349
                self.pb.note('SKIP: %s' % (skip_excinfo[1]))
 
350
 
 
351
    def report_cleaning_up(self):
 
352
        self.pb.update('cleaning up...')
 
353
 
 
354
    def finished(self):
 
355
        self.pb.finished()
 
356
 
 
357
 
 
358
class VerboseTestResult(ExtendedTestResult):
 
359
    """Produce long output, with one line per test run plus times"""
 
360
 
 
361
    def _ellipsize_to_right(self, a_string, final_width):
 
362
        """Truncate and pad a string, keeping the right hand side"""
 
363
        if len(a_string) > final_width:
 
364
            result = '...' + a_string[3-final_width:]
 
365
        else:
 
366
            result = a_string
 
367
        return result.ljust(final_width)
 
368
 
 
369
    def report_starting(self):
 
370
        self.stream.write('running %d tests...\n' % self.num_tests)
 
371
 
 
372
    def report_test_start(self, test):
 
373
        self.count += 1
 
374
        name = self._shortened_test_description(test)
 
375
        self.stream.write(self._ellipsize_to_right(name, 60))
 
376
        self.stream.flush()
 
377
 
 
378
    def report_error(self, test, err):
 
379
        self.error_count += 1
 
380
        self.stream.writeln('ERROR %s\n    %s' 
 
381
                % (self._testTimeString(), err[1]))
 
382
 
 
383
    def report_failure(self, test, err):
 
384
        self.failure_count += 1
 
385
        self.stream.writeln('FAIL %s\n    %s'
 
386
                % (self._testTimeString(), err[1]))
 
387
 
 
388
    def report_success(self, test):
 
389
        self.stream.writeln('   OK %s' % self._testTimeString())
 
390
        for bench_called, stats in getattr(test, '_benchcalls', []):
 
391
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
 
392
            stats.pprint(file=self.stream)
 
393
        self.stream.flush()
 
394
 
 
395
    def report_skip(self, test, skip_excinfo):
 
396
        print >>self.stream, ' SKIP %s' % self._testTimeString()
 
397
        print >>self.stream, '     %s' % skip_excinfo[1]
 
398
 
 
399
 
 
400
class TextTestRunner(object):
 
401
    stop_on_failure = False
 
402
 
 
403
    def __init__(self,
 
404
                 stream=sys.stderr,
 
405
                 descriptions=0,
 
406
                 verbosity=1,
 
407
                 keep_output=False,
 
408
                 bench_history=None):
 
409
        self.stream = unittest._WritelnDecorator(stream)
 
410
        self.descriptions = descriptions
 
411
        self.verbosity = verbosity
 
412
        self.keep_output = keep_output
 
413
        self._bench_history = bench_history
 
414
 
 
415
    def run(self, test):
 
416
        "Run the given test case or test suite."
 
417
        startTime = time.time()
 
418
        if self.verbosity == 1:
 
419
            result_class = TextTestResult
 
420
        elif self.verbosity >= 2:
 
421
            result_class = VerboseTestResult
 
422
        result = result_class(self.stream,
 
423
                              self.descriptions,
 
424
                              self.verbosity,
 
425
                              bench_history=self._bench_history,
 
426
                              num_tests=test.countTestCases(),
 
427
                              )
 
428
        result.stop_early = self.stop_on_failure
 
429
        result.report_starting()
 
430
        test.run(result)
 
431
        stopTime = time.time()
 
432
        timeTaken = stopTime - startTime
 
433
        result.printErrors()
 
434
        self.stream.writeln(result.separator2)
 
435
        run = result.testsRun
 
436
        self.stream.writeln("Ran %d test%s in %.3fs" %
 
437
                            (run, run != 1 and "s" or "", timeTaken))
 
438
        self.stream.writeln()
 
439
        if not result.wasSuccessful():
 
440
            self.stream.write("FAILED (")
 
441
            failed, errored = map(len, (result.failures, result.errors))
 
442
            if failed:
 
443
                self.stream.write("failures=%d" % failed)
 
444
            if errored:
 
445
                if failed: self.stream.write(", ")
 
446
                self.stream.write("errors=%d" % errored)
 
447
            self.stream.writeln(")")
 
448
        else:
 
449
            self.stream.writeln("OK")
 
450
        result.report_cleaning_up()
 
451
        # This is still a little bogus, 
 
452
        # but only a little. Folk not using our testrunner will
 
453
        # have to delete their temp directories themselves.
 
454
        test_root = TestCaseWithMemoryTransport.TEST_ROOT
 
455
        if result.wasSuccessful() or not self.keep_output:
 
456
            if test_root is not None:
 
457
                # If LANG=C we probably have created some bogus paths
 
458
                # which rmtree(unicode) will fail to delete
 
459
                # so make sure we are using rmtree(str) to delete everything
 
460
                # except on win32, where rmtree(str) will fail
 
461
                # since it doesn't have the property of byte-stream paths
 
462
                # (they are either ascii or mbcs)
 
463
                if sys.platform == 'win32':
 
464
                    # make sure we are using the unicode win32 api
 
465
                    test_root = unicode(test_root)
 
466
                else:
 
467
                    test_root = test_root.encode(
 
468
                        sys.getfilesystemencoding())
 
469
                osutils.rmtree(test_root)
 
470
        else:
 
471
            note("Failed tests working directories are in '%s'\n", test_root)
 
472
        TestCaseWithMemoryTransport.TEST_ROOT = None
 
473
        result.finished()
 
474
        return result
 
475
 
 
476
 
 
477
def iter_suite_tests(suite):
 
478
    """Return all tests in a suite, recursing through nested suites"""
 
479
    for item in suite._tests:
 
480
        if isinstance(item, unittest.TestCase):
 
481
            yield item
 
482
        elif isinstance(item, unittest.TestSuite):
 
483
            for r in iter_suite_tests(item):
 
484
                yield r
 
485
        else:
 
486
            raise Exception('unknown object %r inside test suite %r'
 
487
                            % (item, suite))
 
488
 
 
489
 
 
490
class TestSkipped(Exception):
 
491
    """Indicates that a test was intentionally skipped, rather than failing."""
 
492
 
 
493
 
 
494
class CommandFailed(Exception):
 
495
    pass
 
496
 
 
497
 
 
498
class StringIOWrapper(object):
 
499
    """A wrapper around cStringIO which just adds an encoding attribute.
 
500
    
 
501
    Internally we can check sys.stdout to see what the output encoding
 
502
    should be. However, cStringIO has no encoding attribute that we can
 
503
    set. So we wrap it instead.
 
504
    """
 
505
    encoding='ascii'
 
506
    _cstring = None
 
507
 
 
508
    def __init__(self, s=None):
 
509
        if s is not None:
 
510
            self.__dict__['_cstring'] = StringIO(s)
 
511
        else:
 
512
            self.__dict__['_cstring'] = StringIO()
 
513
 
 
514
    def __getattr__(self, name, getattr=getattr):
 
515
        return getattr(self.__dict__['_cstring'], name)
 
516
 
 
517
    def __setattr__(self, name, val):
 
518
        if name == 'encoding':
 
519
            self.__dict__['encoding'] = val
 
520
        else:
 
521
            return setattr(self._cstring, name, val)
35
522
 
36
523
 
37
524
class TestCase(unittest.TestCase):
42
529
 
43
530
    Error and debug log messages are redirected from their usual
44
531
    location into a temporary file, the contents of which can be
45
 
    retrieved by _get_log().
 
532
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
 
533
    so that it can also capture file IO.  When the test completes this file
 
534
    is read into memory and removed from disk.
46
535
       
47
536
    There are also convenience functions to invoke bzr's command-line
48
 
    routine, and to build and check bzr trees."""
49
 
 
50
 
    BZRPATH = 'bzr'
 
537
    routine, and to build and check bzr trees.
 
538
   
 
539
    In addition to the usual method of overriding tearDown(), this class also
 
540
    allows subclasses to register functions into the _cleanups list, which is
 
541
    run in order as the object is torn down.  It's less likely this will be
 
542
    accidentally overlooked.
 
543
    """
 
544
 
 
545
    _log_file_name = None
 
546
    _log_contents = ''
 
547
    _keep_log_file = False
 
548
    # record lsprof data when performing benchmark calls.
 
549
    _gather_lsprof_in_benchmarks = False
 
550
 
 
551
    def __init__(self, methodName='testMethod'):
 
552
        super(TestCase, self).__init__(methodName)
 
553
        self._cleanups = []
51
554
 
52
555
    def setUp(self):
53
 
        # this replaces the default testsweet.TestCase; we don't want logging changed
54
556
        unittest.TestCase.setUp(self)
 
557
        self._cleanEnvironment()
55
558
        bzrlib.trace.disable_default_logging()
56
 
        self._enable_file_logging()
57
 
 
58
 
 
59
 
    def _enable_file_logging(self):
 
559
        self._silenceUI()
 
560
        self._startLogFile()
 
561
        self._benchcalls = []
 
562
        self._benchtime = None
 
563
 
 
564
    def _silenceUI(self):
 
565
        """Turn off UI for duration of test"""
 
566
        # by default the UI is off; tests can turn it on if they want it.
 
567
        saved = bzrlib.ui.ui_factory
 
568
        def _restore():
 
569
            bzrlib.ui.ui_factory = saved
 
570
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
571
        self.addCleanup(_restore)
 
572
 
 
573
    def _ndiff_strings(self, a, b):
 
574
        """Return ndiff between two strings containing lines.
 
575
        
 
576
        A trailing newline is added if missing to make the strings
 
577
        print properly."""
 
578
        if b and b[-1] != '\n':
 
579
            b += '\n'
 
580
        if a and a[-1] != '\n':
 
581
            a += '\n'
 
582
        difflines = difflib.ndiff(a.splitlines(True),
 
583
                                  b.splitlines(True),
 
584
                                  linejunk=lambda x: False,
 
585
                                  charjunk=lambda x: False)
 
586
        return ''.join(difflines)
 
587
 
 
588
    def assertEqualDiff(self, a, b, message=None):
 
589
        """Assert two texts are equal, if not raise an exception.
 
590
        
 
591
        This is intended for use with multi-line strings where it can 
 
592
        be hard to find the differences by eye.
 
593
        """
 
594
        # TODO: perhaps override assertEquals to call this for strings?
 
595
        if a == b:
 
596
            return
 
597
        if message is None:
 
598
            message = "texts not equal:\n"
 
599
        raise AssertionError(message + 
 
600
                             self._ndiff_strings(a, b))      
 
601
        
 
602
    def assertEqualMode(self, mode, mode_test):
 
603
        self.assertEqual(mode, mode_test,
 
604
                         'mode mismatch %o != %o' % (mode, mode_test))
 
605
 
 
606
    def assertStartsWith(self, s, prefix):
 
607
        if not s.startswith(prefix):
 
608
            raise AssertionError('string %r does not start with %r' % (s, prefix))
 
609
 
 
610
    def assertEndsWith(self, s, suffix):
 
611
        """Asserts that s ends with suffix."""
 
612
        if not s.endswith(suffix):
 
613
            raise AssertionError('string %r does not end with %r' % (s, suffix))
 
614
 
 
615
    def assertContainsRe(self, haystack, needle_re):
 
616
        """Assert that a contains something matching a regular expression."""
 
617
        if not re.search(needle_re, haystack):
 
618
            raise AssertionError('pattern "%s" not found in "%s"'
 
619
                    % (needle_re, haystack))
 
620
 
 
621
    def assertNotContainsRe(self, haystack, needle_re):
 
622
        """Assert that a does not match a regular expression"""
 
623
        if re.search(needle_re, haystack):
 
624
            raise AssertionError('pattern "%s" found in "%s"'
 
625
                    % (needle_re, haystack))
 
626
 
 
627
    def assertSubset(self, sublist, superlist):
 
628
        """Assert that every entry in sublist is present in superlist."""
 
629
        missing = []
 
630
        for entry in sublist:
 
631
            if entry not in superlist:
 
632
                missing.append(entry)
 
633
        if len(missing) > 0:
 
634
            raise AssertionError("value(s) %r not present in container %r" % 
 
635
                                 (missing, superlist))
 
636
 
 
637
    def assertIs(self, left, right):
 
638
        if not (left is right):
 
639
            raise AssertionError("%r is not %r." % (left, right))
 
640
 
 
641
    def assertTransportMode(self, transport, path, mode):
 
642
        """Fail if a path does not have mode mode.
 
643
        
 
644
        If modes are not supported on this transport, the assertion is ignored.
 
645
        """
 
646
        if not transport._can_roundtrip_unix_modebits():
 
647
            return
 
648
        path_stat = transport.stat(path)
 
649
        actual_mode = stat.S_IMODE(path_stat.st_mode)
 
650
        self.assertEqual(mode, actual_mode,
 
651
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
652
 
 
653
    def assertIsInstance(self, obj, kls):
 
654
        """Fail if obj is not an instance of kls"""
 
655
        if not isinstance(obj, kls):
 
656
            self.fail("%r is an instance of %s rather than %s" % (
 
657
                obj, obj.__class__, kls))
 
658
 
 
659
    def _capture_warnings(self, a_callable, *args, **kwargs):
 
660
        """A helper for callDeprecated and applyDeprecated.
 
661
 
 
662
        :param a_callable: A callable to call.
 
663
        :param args: The positional arguments for the callable
 
664
        :param kwargs: The keyword arguments for the callable
 
665
        :return: A tuple (warnings, result). result is the result of calling
 
666
            a_callable(*args, **kwargs).
 
667
        """
 
668
        local_warnings = []
 
669
        def capture_warnings(msg, cls=None, stacklevel=None):
 
670
            # we've hooked into a deprecation specific callpath,
 
671
            # only deprecations should getting sent via it.
 
672
            self.assertEqual(cls, DeprecationWarning)
 
673
            local_warnings.append(msg)
 
674
        original_warning_method = symbol_versioning.warn
 
675
        symbol_versioning.set_warning_method(capture_warnings)
 
676
        try:
 
677
            result = a_callable(*args, **kwargs)
 
678
        finally:
 
679
            symbol_versioning.set_warning_method(original_warning_method)
 
680
        return (local_warnings, result)
 
681
 
 
682
    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
 
683
        """Call a deprecated callable without warning the user.
 
684
 
 
685
        :param deprecation_format: The deprecation format that the callable
 
686
            should have been deprecated with. This is the same type as the 
 
687
            parameter to deprecated_method/deprecated_function. If the 
 
688
            callable is not deprecated with this format, an assertion error
 
689
            will be raised.
 
690
        :param a_callable: A callable to call. This may be a bound method or
 
691
            a regular function. It will be called with *args and **kwargs.
 
692
        :param args: The positional arguments for the callable
 
693
        :param kwargs: The keyword arguments for the callable
 
694
        :return: The result of a_callable(*args, **kwargs)
 
695
        """
 
696
        call_warnings, result = self._capture_warnings(a_callable,
 
697
            *args, **kwargs)
 
698
        expected_first_warning = symbol_versioning.deprecation_string(
 
699
            a_callable, deprecation_format)
 
700
        if len(call_warnings) == 0:
 
701
            self.fail("No assertion generated by call to %s" %
 
702
                a_callable)
 
703
        self.assertEqual(expected_first_warning, call_warnings[0])
 
704
        return result
 
705
 
 
706
    def callDeprecated(self, expected, callable, *args, **kwargs):
 
707
        """Assert that a callable is deprecated in a particular way.
 
708
 
 
709
        This is a very precise test for unusual requirements. The 
 
710
        applyDeprecated helper function is probably more suited for most tests
 
711
        as it allows you to simply specify the deprecation format being used
 
712
        and will ensure that that is issued for the function being called.
 
713
 
 
714
        :param expected: a list of the deprecation warnings expected, in order
 
715
        :param callable: The callable to call
 
716
        :param args: The positional arguments for the callable
 
717
        :param kwargs: The keyword arguments for the callable
 
718
        """
 
719
        call_warnings, result = self._capture_warnings(callable,
 
720
            *args, **kwargs)
 
721
        self.assertEqual(expected, call_warnings)
 
722
        return result
 
723
 
 
724
    def _startLogFile(self):
 
725
        """Send bzr and test log messages to a temporary file.
 
726
 
 
727
        The file is removed as the test is torn down.
 
728
        """
60
729
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
61
 
 
62
730
        self._log_file = os.fdopen(fileno, 'w+')
63
 
 
64
 
        hdlr = logging.StreamHandler(self._log_file)
65
 
        hdlr.setLevel(logging.DEBUG)
66
 
        hdlr.setFormatter(logging.Formatter('%(levelname)4.4s  %(message)s'))
67
 
        logging.getLogger('').addHandler(hdlr)
68
 
        logging.getLogger('').setLevel(logging.DEBUG)
69
 
        self._log_hdlr = hdlr
70
 
        debug('opened log file %s', name)
71
 
        
 
731
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
72
732
        self._log_file_name = name
73
 
 
74
 
        
75
 
    def tearDown(self):
76
 
        logging.getLogger('').removeHandler(self._log_hdlr)
77
 
        bzrlib.trace.enable_default_logging()
78
 
        logging.debug('%s teardown', self.id())
 
733
        self.addCleanup(self._finishLogFile)
 
734
 
 
735
    def _finishLogFile(self):
 
736
        """Finished with the log file.
 
737
 
 
738
        Close the file and delete it, unless setKeepLogfile was called.
 
739
        """
 
740
        if self._log_file is None:
 
741
            return
 
742
        bzrlib.trace.disable_test_log(self._log_nonce)
79
743
        self._log_file.close()
 
744
        self._log_file = None
 
745
        if not self._keep_log_file:
 
746
            os.remove(self._log_file_name)
 
747
            self._log_file_name = None
 
748
 
 
749
    def setKeepLogfile(self):
 
750
        """Make the logfile not be deleted when _finishLogFile is called."""
 
751
        self._keep_log_file = True
 
752
 
 
753
    def addCleanup(self, callable):
 
754
        """Arrange to run a callable when this case is torn down.
 
755
 
 
756
        Callables are run in the reverse of the order they are registered, 
 
757
        ie last-in first-out.
 
758
        """
 
759
        if callable in self._cleanups:
 
760
            raise ValueError("cleanup function %r already registered on %s" 
 
761
                    % (callable, self))
 
762
        self._cleanups.append(callable)
 
763
 
 
764
    def _cleanEnvironment(self):
 
765
        new_env = {
 
766
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
 
767
            'HOME': os.getcwd(),
 
768
            'APPDATA': os.getcwd(),
 
769
            'BZR_EMAIL': None,
 
770
            'BZREMAIL': None, # may still be present in the environment
 
771
            'EMAIL': None,
 
772
            'BZR_PROGRESS_BAR': None,
 
773
        }
 
774
        self.__old_env = {}
 
775
        self.addCleanup(self._restoreEnvironment)
 
776
        for name, value in new_env.iteritems():
 
777
            self._captureVar(name, value)
 
778
 
 
779
    def _captureVar(self, name, newvalue):
 
780
        """Set an environment variable, and reset it when finished."""
 
781
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
 
782
 
 
783
    def _restoreEnvironment(self):
 
784
        for name, value in self.__old_env.iteritems():
 
785
            osutils.set_or_unset_env(name, value)
 
786
 
 
787
    def tearDown(self):
 
788
        self._runCleanups()
80
789
        unittest.TestCase.tearDown(self)
81
790
 
 
791
    def time(self, callable, *args, **kwargs):
 
792
        """Run callable and accrue the time it takes to the benchmark time.
 
793
        
 
794
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
 
795
        this will cause lsprofile statistics to be gathered and stored in
 
796
        self._benchcalls.
 
797
        """
 
798
        if self._benchtime is None:
 
799
            self._benchtime = 0
 
800
        start = time.time()
 
801
        try:
 
802
            if not self._gather_lsprof_in_benchmarks:
 
803
                return callable(*args, **kwargs)
 
804
            else:
 
805
                # record this benchmark
 
806
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
 
807
                stats.sort()
 
808
                self._benchcalls.append(((callable, args, kwargs), stats))
 
809
                return ret
 
810
        finally:
 
811
            self._benchtime += time.time() - start
 
812
 
 
813
    def _runCleanups(self):
 
814
        """Run registered cleanup functions. 
 
815
 
 
816
        This should only be called from TestCase.tearDown.
 
817
        """
 
818
        # TODO: Perhaps this should keep running cleanups even if 
 
819
        # one of them fails?
 
820
        for cleanup_fn in reversed(self._cleanups):
 
821
            cleanup_fn()
82
822
 
83
823
    def log(self, *args):
84
 
        logging.debug(*args)
85
 
 
86
 
    def _get_log(self):
87
 
        """Return as a string the log for this test"""
88
 
        return open(self._log_file_name).read()
 
824
        mutter(*args)
 
825
 
 
826
    def _get_log(self, keep_log_file=False):
 
827
        """Return as a string the log for this test. If the file is still
 
828
        on disk and keep_log_file=False, delete the log file and store the
 
829
        content in self._log_contents."""
 
830
        # flush the log file, to get all content
 
831
        import bzrlib.trace
 
832
        bzrlib.trace._trace_file.flush()
 
833
        if self._log_contents:
 
834
            return self._log_contents
 
835
        if self._log_file_name is not None:
 
836
            logfile = open(self._log_file_name)
 
837
            try:
 
838
                log_contents = logfile.read()
 
839
            finally:
 
840
                logfile.close()
 
841
            if not keep_log_file:
 
842
                self._log_contents = log_contents
 
843
                os.remove(self._log_file_name)
 
844
            return log_contents
 
845
        else:
 
846
            return "DELETED log file to reduce memory footprint"
 
847
 
 
848
    def capture(self, cmd, retcode=0):
 
849
        """Shortcut that splits cmd into words, runs, and returns stdout"""
 
850
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
 
851
 
 
852
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
853
                         working_dir=None):
 
854
        """Invoke bzr and return (stdout, stderr).
 
855
 
 
856
        Useful for code that wants to check the contents of the
 
857
        output, the way error messages are presented, etc.
 
858
 
 
859
        This should be the main method for tests that want to exercise the
 
860
        overall behavior of the bzr application (rather than a unit test
 
861
        or a functional test of the library.)
 
862
 
 
863
        Much of the old code runs bzr by forking a new copy of Python, but
 
864
        that is slower, harder to debug, and generally not necessary.
 
865
 
 
866
        This runs bzr through the interface that catches and reports
 
867
        errors, and with logging set to something approximating the
 
868
        default, so that error reporting can be checked.
 
869
 
 
870
        :param argv: arguments to invoke bzr
 
871
        :param retcode: expected return code, or None for don't-care.
 
872
        :param encoding: encoding for sys.stdout and sys.stderr
 
873
        :param stdin: A string to be used as stdin for the command.
 
874
        :param working_dir: Change to this directory before running
 
875
        """
 
876
        if encoding is None:
 
877
            encoding = bzrlib.user_encoding
 
878
        if stdin is not None:
 
879
            stdin = StringIO(stdin)
 
880
        stdout = StringIOWrapper()
 
881
        stderr = StringIOWrapper()
 
882
        stdout.encoding = encoding
 
883
        stderr.encoding = encoding
 
884
 
 
885
        self.log('run bzr: %r', argv)
 
886
        # FIXME: don't call into logging here
 
887
        handler = logging.StreamHandler(stderr)
 
888
        handler.setLevel(logging.INFO)
 
889
        logger = logging.getLogger('')
 
890
        logger.addHandler(handler)
 
891
        old_ui_factory = bzrlib.ui.ui_factory
 
892
        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
 
893
            stdout=stdout,
 
894
            stderr=stderr)
 
895
        bzrlib.ui.ui_factory.stdin = stdin
 
896
 
 
897
        cwd = None
 
898
        if working_dir is not None:
 
899
            cwd = osutils.getcwd()
 
900
            os.chdir(working_dir)
 
901
 
 
902
        try:
 
903
            result = self.apply_redirected(stdin, stdout, stderr,
 
904
                                           bzrlib.commands.run_bzr_catch_errors,
 
905
                                           argv)
 
906
        finally:
 
907
            logger.removeHandler(handler)
 
908
            bzrlib.ui.ui_factory = old_ui_factory
 
909
            if cwd is not None:
 
910
                os.chdir(cwd)
 
911
 
 
912
        out = stdout.getvalue()
 
913
        err = stderr.getvalue()
 
914
        if out:
 
915
            self.log('output:\n%r', out)
 
916
        if err:
 
917
            self.log('errors:\n%r', err)
 
918
        if retcode is not None:
 
919
            self.assertEquals(retcode, result)
 
920
        return out, err
89
921
 
90
922
    def run_bzr(self, *args, **kwargs):
91
923
        """Invoke bzr, as if it were run from the command line.
94
926
        overall behavior of the bzr application (rather than a unit test
95
927
        or a functional test of the library.)
96
928
 
97
 
        Much of the old code runs bzr by forking a new copy of Python, but
98
 
        that is slower, harder to debug, and generally not necessary.
 
929
        This sends the stdout/stderr results into the test's log,
 
930
        where it may be useful for debugging.  See also run_captured.
 
931
 
 
932
        :param stdin: A string to be used as stdin for the command.
99
933
        """
100
 
        retcode = kwargs.get('retcode', 0)
101
 
        result = self.apply_redirected(None, None, None,
102
 
                                       bzrlib.commands.run_bzr, args)
103
 
        self.assertEquals(result, retcode)
 
934
        retcode = kwargs.pop('retcode', 0)
 
935
        encoding = kwargs.pop('encoding', None)
 
936
        stdin = kwargs.pop('stdin', None)
 
937
        working_dir = kwargs.pop('working_dir', None)
 
938
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
 
939
                                     stdin=stdin, working_dir=working_dir)
 
940
 
 
941
    def run_bzr_decode(self, *args, **kwargs):
 
942
        if 'encoding' in kwargs:
 
943
            encoding = kwargs['encoding']
 
944
        else:
 
945
            encoding = bzrlib.user_encoding
 
946
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
 
947
 
 
948
    def run_bzr_error(self, error_regexes, *args, **kwargs):
 
949
        """Run bzr, and check that stderr contains the supplied regexes
104
950
        
 
951
        :param error_regexes: Sequence of regular expressions which 
 
952
            must each be found in the error output. The relative ordering
 
953
            is not enforced.
 
954
        :param args: command-line arguments for bzr
 
955
        :param kwargs: Keyword arguments which are interpreted by run_bzr
 
956
            This function changes the default value of retcode to be 3,
 
957
            since in most cases this is run when you expect bzr to fail.
 
958
        :return: (out, err) The actual output of running the command (in case you
 
959
                 want to do more inspection)
 
960
 
 
961
        Examples of use:
 
962
            # Make sure that commit is failing because there is nothing to do
 
963
            self.run_bzr_error(['no changes to commit'],
 
964
                               'commit', '-m', 'my commit comment')
 
965
            # Make sure --strict is handling an unknown file, rather than
 
966
            # giving us the 'nothing to do' error
 
967
            self.build_tree(['unknown'])
 
968
            self.run_bzr_error(['Commit refused because there are unknown files'],
 
969
                               'commit', '--strict', '-m', 'my commit comment')
 
970
        """
 
971
        kwargs.setdefault('retcode', 3)
 
972
        out, err = self.run_bzr(*args, **kwargs)
 
973
        for regex in error_regexes:
 
974
            self.assertContainsRe(err, regex)
 
975
        return out, err
 
976
 
 
977
    def run_bzr_subprocess(self, *args, **kwargs):
 
978
        """Run bzr in a subprocess for testing.
 
979
 
 
980
        This starts a new Python interpreter and runs bzr in there. 
 
981
        This should only be used for tests that have a justifiable need for
 
982
        this isolation: e.g. they are testing startup time, or signal
 
983
        handling, or early startup code, etc.  Subprocess code can't be 
 
984
        profiled or debugged so easily.
 
985
 
 
986
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
987
            None is supplied, the status code is not checked.
 
988
        :param env_changes: A dictionary which lists changes to environment
 
989
            variables. A value of None will unset the env variable.
 
990
            The values must be strings. The change will only occur in the
 
991
            child, so you don't need to fix the environment after running.
 
992
        :param universal_newlines: Convert CRLF => LF
 
993
        :param allow_plugins: By default the subprocess is run with
 
994
            --no-plugins to ensure test reproducibility. Also, it is possible
 
995
            for system-wide plugins to create unexpected output on stderr,
 
996
            which can cause unnecessary test failures.
 
997
        """
 
998
        env_changes = kwargs.get('env_changes', {})
 
999
        working_dir = kwargs.get('working_dir', None)
 
1000
        allow_plugins = kwargs.get('allow_plugins', False)
 
1001
        process = self.start_bzr_subprocess(args, env_changes=env_changes,
 
1002
                                            working_dir=working_dir,
 
1003
                                            allow_plugins=allow_plugins)
 
1004
        # We distinguish between retcode=None and retcode not passed.
 
1005
        supplied_retcode = kwargs.get('retcode', 0)
 
1006
        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
 
1007
            universal_newlines=kwargs.get('universal_newlines', False),
 
1008
            process_args=args)
 
1009
 
 
1010
    def start_bzr_subprocess(self, process_args, env_changes=None,
 
1011
                             skip_if_plan_to_signal=False,
 
1012
                             working_dir=None,
 
1013
                             allow_plugins=False):
 
1014
        """Start bzr in a subprocess for testing.
 
1015
 
 
1016
        This starts a new Python interpreter and runs bzr in there.
 
1017
        This should only be used for tests that have a justifiable need for
 
1018
        this isolation: e.g. they are testing startup time, or signal
 
1019
        handling, or early startup code, etc.  Subprocess code can't be
 
1020
        profiled or debugged so easily.
 
1021
 
 
1022
        :param process_args: a list of arguments to pass to the bzr executable,
 
1023
            for example `['--version']`.
 
1024
        :param env_changes: A dictionary which lists changes to environment
 
1025
            variables. A value of None will unset the env variable.
 
1026
            The values must be strings. The change will only occur in the
 
1027
            child, so you don't need to fix the environment after running.
 
1028
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1029
            is not available.
 
1030
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
 
1031
 
 
1032
        :returns: Popen object for the started process.
 
1033
        """
 
1034
        if skip_if_plan_to_signal:
 
1035
            if not getattr(os, 'kill', None):
 
1036
                raise TestSkipped("os.kill not available.")
 
1037
 
 
1038
        if env_changes is None:
 
1039
            env_changes = {}
 
1040
        old_env = {}
 
1041
 
 
1042
        def cleanup_environment():
 
1043
            for env_var, value in env_changes.iteritems():
 
1044
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
 
1045
 
 
1046
        def restore_environment():
 
1047
            for env_var, value in old_env.iteritems():
 
1048
                osutils.set_or_unset_env(env_var, value)
 
1049
 
 
1050
        bzr_path = self.get_bzr_path()
 
1051
 
 
1052
        cwd = None
 
1053
        if working_dir is not None:
 
1054
            cwd = osutils.getcwd()
 
1055
            os.chdir(working_dir)
 
1056
 
 
1057
        try:
 
1058
            # win32 subprocess doesn't support preexec_fn
 
1059
            # so we will avoid using it on all platforms, just to
 
1060
            # make sure the code path is used, and we don't break on win32
 
1061
            cleanup_environment()
 
1062
            command = [sys.executable, bzr_path]
 
1063
            if not allow_plugins:
 
1064
                command.append('--no-plugins')
 
1065
            command.extend(process_args)
 
1066
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
1067
        finally:
 
1068
            restore_environment()
 
1069
            if cwd is not None:
 
1070
                os.chdir(cwd)
 
1071
 
 
1072
        return process
 
1073
 
 
1074
    def _popen(self, *args, **kwargs):
 
1075
        """Place a call to Popen.
 
1076
 
 
1077
        Allows tests to override this method to intercept the calls made to
 
1078
        Popen for introspection.
 
1079
        """
 
1080
        return Popen(*args, **kwargs)
 
1081
 
 
1082
    def get_bzr_path(self):
 
1083
        """Return the path of the 'bzr' executable for this test suite."""
 
1084
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
1085
        if not os.path.isfile(bzr_path):
 
1086
            # We are probably installed. Assume sys.argv is the right file
 
1087
            bzr_path = sys.argv[0]
 
1088
        return bzr_path
 
1089
 
 
1090
    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
 
1091
                              universal_newlines=False, process_args=None):
 
1092
        """Finish the execution of process.
 
1093
 
 
1094
        :param process: the Popen object returned from start_bzr_subprocess.
 
1095
        :param retcode: The status code that is expected.  Defaults to 0.  If
 
1096
            None is supplied, the status code is not checked.
 
1097
        :param send_signal: an optional signal to send to the process.
 
1098
        :param universal_newlines: Convert CRLF => LF
 
1099
        :returns: (stdout, stderr)
 
1100
        """
 
1101
        if send_signal is not None:
 
1102
            os.kill(process.pid, send_signal)
 
1103
        out, err = process.communicate()
 
1104
 
 
1105
        if universal_newlines:
 
1106
            out = out.replace('\r\n', '\n')
 
1107
            err = err.replace('\r\n', '\n')
 
1108
 
 
1109
        if retcode is not None and retcode != process.returncode:
 
1110
            if process_args is None:
 
1111
                process_args = "(unknown args)"
 
1112
            mutter('Output of bzr %s:\n%s', process_args, out)
 
1113
            mutter('Error for bzr %s:\n%s', process_args, err)
 
1114
            self.fail('Command bzr %s failed with retcode %s != %s'
 
1115
                      % (process_args, retcode, process.returncode))
 
1116
        return [out, err]
 
1117
 
105
1118
    def check_inventory_shape(self, inv, shape):
106
 
        """
107
 
        Compare an inventory to a list of expected names.
 
1119
        """Compare an inventory to a list of expected names.
108
1120
 
109
1121
        Fail if they are not precisely equal.
110
1122
        """
128
1140
        """Call callable with redirected std io pipes.
129
1141
 
130
1142
        Returns the return code."""
131
 
        from StringIO import StringIO
132
1143
        if not callable(a_callable):
133
1144
            raise ValueError("a_callable must be callable.")
134
1145
        if stdin is None:
135
1146
            stdin = StringIO("")
136
1147
        if stdout is None:
137
 
            stdout = self._log_file
 
1148
            if getattr(self, "_log_file", None) is not None:
 
1149
                stdout = self._log_file
 
1150
            else:
 
1151
                stdout = StringIO()
138
1152
        if stderr is None:
139
 
            stderr = self._log_file
 
1153
            if getattr(self, "_log_file", None is not None):
 
1154
                stderr = self._log_file
 
1155
            else:
 
1156
                stderr = StringIO()
140
1157
        real_stdin = sys.stdin
141
1158
        real_stdout = sys.stdout
142
1159
        real_stderr = sys.stderr
143
 
        result = None
144
1160
        try:
145
1161
            sys.stdout = stdout
146
1162
            sys.stderr = stderr
147
1163
            sys.stdin = stdin
148
 
            result = a_callable(*args, **kwargs)
 
1164
            return a_callable(*args, **kwargs)
149
1165
        finally:
150
1166
            sys.stdout = real_stdout
151
1167
            sys.stderr = real_stderr
152
1168
            sys.stdin = real_stdin
153
 
        return result
 
1169
 
 
1170
    @symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
 
1171
    def merge(self, branch_from, wt_to):
 
1172
        """A helper for tests to do a ui-less merge.
 
1173
 
 
1174
        This should move to the main library when someone has time to integrate
 
1175
        it in.
 
1176
        """
 
1177
        # minimal ui-less merge.
 
1178
        wt_to.branch.fetch(branch_from)
 
1179
        base_rev = common_ancestor(branch_from.last_revision(),
 
1180
                                   wt_to.branch.last_revision(),
 
1181
                                   wt_to.branch.repository)
 
1182
        merge_inner(wt_to.branch, branch_from.basis_tree(),
 
1183
                    wt_to.branch.repository.revision_tree(base_rev),
 
1184
                    this_tree=wt_to)
 
1185
        wt_to.add_parent_tree_id(branch_from.last_revision())
154
1186
 
155
1187
 
156
1188
BzrTestBase = TestCase
157
1189
 
 
1190
 
 
1191
class TestCaseWithMemoryTransport(TestCase):
 
1192
    """Common test class for tests that do not need disk resources.
 
1193
 
 
1194
    Tests that need disk resources should derive from TestCaseWithTransport.
 
1195
 
 
1196
    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
 
1197
 
 
1198
    For TestCaseWithMemoryTransport the test_home_dir is set to the name of
 
1199
    a directory which does not exist. This serves to help ensure test isolation
 
1200
    is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
 
1201
    must exist. However, TestCaseWithMemoryTransport does not offer local
 
1202
    file defaults for the transport in tests, nor does it obey the command line
 
1203
    override, so tests that accidentally write to the common directory should
 
1204
    be rare.
 
1205
    """
 
1206
 
 
1207
    TEST_ROOT = None
 
1208
    _TEST_NAME = 'test'
 
1209
 
 
1210
 
 
1211
    def __init__(self, methodName='runTest'):
 
1212
        # allow test parameterisation after test construction and before test
 
1213
        # execution. Variables that the parameteriser sets need to be 
 
1214
        # ones that are not set by setUp, or setUp will trash them.
 
1215
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
 
1216
        self.transport_server = default_transport
 
1217
        self.transport_readonly_server = None
 
1218
 
 
1219
    def failUnlessExists(self, path):
 
1220
        """Fail unless path, which may be abs or relative, exists."""
 
1221
        self.failUnless(osutils.lexists(path))
 
1222
 
 
1223
    def failIfExists(self, path):
 
1224
        """Fail if path, which may be abs or relative, exists."""
 
1225
        self.failIf(osutils.lexists(path))
 
1226
        
 
1227
    def get_transport(self):
 
1228
        """Return a writeable transport for the test scratch space"""
 
1229
        t = get_transport(self.get_url())
 
1230
        self.assertFalse(t.is_readonly())
 
1231
        return t
 
1232
 
 
1233
    def get_readonly_transport(self):
 
1234
        """Return a readonly transport for the test scratch space
 
1235
        
 
1236
        This can be used to test that operations which should only need
 
1237
        readonly access in fact do not try to write.
 
1238
        """
 
1239
        t = get_transport(self.get_readonly_url())
 
1240
        self.assertTrue(t.is_readonly())
 
1241
        return t
 
1242
 
 
1243
    def get_readonly_server(self):
 
1244
        """Get the server instance for the readonly transport
 
1245
 
 
1246
        This is useful for some tests with specific servers to do diagnostics.
 
1247
        """
 
1248
        if self.__readonly_server is None:
 
1249
            if self.transport_readonly_server is None:
 
1250
                # readonly decorator requested
 
1251
                # bring up the server
 
1252
                self.get_url()
 
1253
                self.__readonly_server = ReadonlyServer()
 
1254
                self.__readonly_server.setUp(self.__server)
 
1255
            else:
 
1256
                self.__readonly_server = self.transport_readonly_server()
 
1257
                self.__readonly_server.setUp()
 
1258
            self.addCleanup(self.__readonly_server.tearDown)
 
1259
        return self.__readonly_server
 
1260
 
 
1261
    def get_readonly_url(self, relpath=None):
 
1262
        """Get a URL for the readonly transport.
 
1263
 
 
1264
        This will either be backed by '.' or a decorator to the transport 
 
1265
        used by self.get_url()
 
1266
        relpath provides for clients to get a path relative to the base url.
 
1267
        These should only be downwards relative, not upwards.
 
1268
        """
 
1269
        base = self.get_readonly_server().get_url()
 
1270
        if relpath is not None:
 
1271
            if not base.endswith('/'):
 
1272
                base = base + '/'
 
1273
            base = base + relpath
 
1274
        return base
 
1275
 
 
1276
    def get_server(self):
 
1277
        """Get the read/write server instance.
 
1278
 
 
1279
        This is useful for some tests with specific servers that need
 
1280
        diagnostics.
 
1281
 
 
1282
        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
 
1283
        is no means to override it.
 
1284
        """
 
1285
        if self.__server is None:
 
1286
            self.__server = MemoryServer()
 
1287
            self.__server.setUp()
 
1288
            self.addCleanup(self.__server.tearDown)
 
1289
        return self.__server
 
1290
 
 
1291
    def get_url(self, relpath=None):
 
1292
        """Get a URL (or maybe a path) for the readwrite transport.
 
1293
 
 
1294
        This will either be backed by '.' or to an equivalent non-file based
 
1295
        facility.
 
1296
        relpath provides for clients to get a path relative to the base url.
 
1297
        These should only be downwards relative, not upwards.
 
1298
        """
 
1299
        base = self.get_server().get_url()
 
1300
        if relpath is not None and relpath != '.':
 
1301
            if not base.endswith('/'):
 
1302
                base = base + '/'
 
1303
            # XXX: Really base should be a url; we did after all call
 
1304
            # get_url()!  But sometimes it's just a path (from
 
1305
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
 
1306
            # to a non-escaped local path.
 
1307
            if base.startswith('./') or base.startswith('/'):
 
1308
                base += relpath
 
1309
            else:
 
1310
                base += urlutils.escape(relpath)
 
1311
        return base
 
1312
 
 
1313
    def _make_test_root(self):
 
1314
        if TestCaseWithMemoryTransport.TEST_ROOT is not None:
 
1315
            return
 
1316
        i = 0
 
1317
        while True:
 
1318
            root = u'test%04d.tmp' % i
 
1319
            try:
 
1320
                os.mkdir(root)
 
1321
            except OSError, e:
 
1322
                if e.errno == errno.EEXIST:
 
1323
                    i += 1
 
1324
                    continue
 
1325
                else:
 
1326
                    raise
 
1327
            # successfully created
 
1328
            TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
 
1329
            break
 
1330
        # make a fake bzr directory there to prevent any tests propagating
 
1331
        # up onto the source directory's real branch
 
1332
        bzrdir.BzrDir.create_standalone_workingtree(
 
1333
            TestCaseWithMemoryTransport.TEST_ROOT)
 
1334
 
 
1335
    def makeAndChdirToTestDir(self):
 
1336
        """Create a temporary directories for this one test.
 
1337
        
 
1338
        This must set self.test_home_dir and self.test_dir and chdir to
 
1339
        self.test_dir.
 
1340
        
 
1341
        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
 
1342
        """
 
1343
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
 
1344
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
 
1345
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
1346
        
 
1347
    def make_branch(self, relpath, format=None):
 
1348
        """Create a branch on the transport at relpath."""
 
1349
        repo = self.make_repository(relpath, format=format)
 
1350
        return repo.bzrdir.create_branch()
 
1351
 
 
1352
    def make_bzrdir(self, relpath, format=None):
 
1353
        try:
 
1354
            # might be a relative or absolute path
 
1355
            maybe_a_url = self.get_url(relpath)
 
1356
            segments = maybe_a_url.rsplit('/', 1)
 
1357
            t = get_transport(maybe_a_url)
 
1358
            if len(segments) > 1 and segments[-1] not in ('', '.'):
 
1359
                try:
 
1360
                    t.mkdir('.')
 
1361
                except errors.FileExists:
 
1362
                    pass
 
1363
            if format is None:
 
1364
                format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
 
1365
            return format.initialize_on_transport(t)
 
1366
        except errors.UninitializableFormat:
 
1367
            raise TestSkipped("Format %s is not initializable." % format)
 
1368
 
 
1369
    def make_repository(self, relpath, shared=False, format=None):
 
1370
        """Create a repository on our default transport at relpath."""
 
1371
        made_control = self.make_bzrdir(relpath, format=format)
 
1372
        return made_control.create_repository(shared=shared)
 
1373
 
 
1374
    def make_branch_and_memory_tree(self, relpath, format=None):
 
1375
        """Create a branch on the default transport and a MemoryTree for it."""
 
1376
        b = self.make_branch(relpath, format=format)
 
1377
        return memorytree.MemoryTree.create_on_branch(b)
 
1378
 
 
1379
    def overrideEnvironmentForTesting(self):
 
1380
        os.environ['HOME'] = self.test_home_dir
 
1381
        os.environ['APPDATA'] = self.test_home_dir
 
1382
        
 
1383
    def setUp(self):
 
1384
        super(TestCaseWithMemoryTransport, self).setUp()
 
1385
        self._make_test_root()
 
1386
        _currentdir = os.getcwdu()
 
1387
        def _leaveDirectory():
 
1388
            os.chdir(_currentdir)
 
1389
        self.addCleanup(_leaveDirectory)
 
1390
        self.makeAndChdirToTestDir()
 
1391
        self.overrideEnvironmentForTesting()
 
1392
        self.__readonly_server = None
 
1393
        self.__server = None
 
1394
 
158
1395
     
159
 
class TestCaseInTempDir(TestCase):
 
1396
class TestCaseInTempDir(TestCaseWithMemoryTransport):
160
1397
    """Derived class that runs a test within a temporary directory.
161
1398
 
162
1399
    This is useful for tests that need to create a branch, etc.
169
1406
    InTempDir is an old alias for FunctionalTestCase.
170
1407
    """
171
1408
 
172
 
    TEST_ROOT = None
173
 
    _TEST_NAME = 'test'
174
1409
    OVERRIDE_PYTHON = 'python'
175
1410
 
176
1411
    def check_file_contents(self, filename, expect):
179
1414
        if contents != expect:
180
1415
            self.log("expected: %r" % expect)
181
1416
            self.log("actually: %r" % contents)
182
 
            self.fail("contents of %s not as expected")
183
 
 
184
 
    def _make_test_root(self):
185
 
        import os
186
 
        import shutil
187
 
        import tempfile
188
 
        
189
 
        if TestCaseInTempDir.TEST_ROOT is not None:
190
 
            return
191
 
        TestCaseInTempDir.TEST_ROOT = os.path.abspath(
192
 
                                 tempfile.mkdtemp(suffix='.tmp',
193
 
                                                  prefix=self._TEST_NAME + '-',
194
 
                                                  dir=os.curdir))
195
 
    
196
 
        # make a fake bzr directory there to prevent any tests propagating
197
 
        # up onto the source directory's real branch
198
 
        os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
199
 
 
200
 
    def setUp(self):
201
 
        super(TestCaseInTempDir, self).setUp()
202
 
        import os
203
 
        self._make_test_root()
204
 
        self._currentdir = os.getcwdu()
205
 
        self.test_dir = os.path.join(self.TEST_ROOT, self.id())
206
 
        os.mkdir(self.test_dir)
207
 
        os.chdir(self.test_dir)
208
 
        
209
 
    def tearDown(self):
210
 
        import os
211
 
        os.chdir(self._currentdir)
212
 
        super(TestCaseInTempDir, self).tearDown()
213
 
 
214
 
    def _formcmd(self, cmd):
215
 
        if isinstance(cmd, basestring):
216
 
            cmd = cmd.split()
217
 
        if cmd[0] == 'bzr':
218
 
            cmd[0] = self.BZRPATH
219
 
            if self.OVERRIDE_PYTHON:
220
 
                cmd.insert(0, self.OVERRIDE_PYTHON)
221
 
        self.log('$ %r' % cmd)
222
 
        return cmd
223
 
 
224
 
    def runcmd(self, cmd, retcode=0):
225
 
        """Run one command and check the return code.
226
 
 
227
 
        Returns a tuple of (stdout,stderr) strings.
228
 
 
229
 
        If a single string is based, it is split into words.
230
 
        For commands that are not simple space-separated words, please
231
 
        pass a list instead."""
232
 
        cmd = self._formcmd(cmd)
233
 
        self.log('$ ' + ' '.join(cmd))
234
 
        actual_retcode = subprocess.call(cmd, stdout=self._log_file,
235
 
                                         stderr=self._log_file)
236
 
        if retcode != actual_retcode:
237
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
238
 
                                % (cmd, actual_retcode, retcode))
239
 
 
240
 
    def backtick(self, cmd, retcode=0):
241
 
        """Run a command and return its output"""
242
 
        cmd = self._formcmd(cmd)
243
 
        child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=self._log_file)
244
 
        outd, errd = child.communicate()
245
 
        self.log(outd)
246
 
        actual_retcode = child.wait()
247
 
 
248
 
        outd = outd.replace('\r', '')
249
 
 
250
 
        if retcode != actual_retcode:
251
 
            raise CommandFailed("test failed: %r returned %d, expected %d"
252
 
                                % (cmd, actual_retcode, retcode))
253
 
 
254
 
        return outd
255
 
 
256
 
 
257
 
 
258
 
    def build_tree(self, shape):
 
1417
            self.fail("contents of %s not as expected" % filename)
 
1418
 
 
1419
    def makeAndChdirToTestDir(self):
 
1420
        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
 
1421
        
 
1422
        For TestCaseInTempDir we create a temporary directory based on the test
 
1423
        name and then create two subdirs - test and home under it.
 
1424
        """
 
1425
        # shorten the name, to avoid test failures due to path length
 
1426
        short_id = self.id().replace('bzrlib.tests.', '') \
 
1427
                   .replace('__main__.', '')[-100:]
 
1428
        # it's possible the same test class is run several times for
 
1429
        # parameterized tests, so make sure the names don't collide.  
 
1430
        i = 0
 
1431
        while True:
 
1432
            if i > 0:
 
1433
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
 
1434
            else:
 
1435
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
 
1436
            if os.path.exists(candidate_dir):
 
1437
                i = i + 1
 
1438
                continue
 
1439
            else:
 
1440
                os.mkdir(candidate_dir)
 
1441
                self.test_home_dir = candidate_dir + '/home'
 
1442
                os.mkdir(self.test_home_dir)
 
1443
                self.test_dir = candidate_dir + '/work'
 
1444
                os.mkdir(self.test_dir)
 
1445
                os.chdir(self.test_dir)
 
1446
                break
 
1447
 
 
1448
    def build_tree(self, shape, line_endings='native', transport=None):
259
1449
        """Build a test tree according to a pattern.
260
1450
 
261
1451
        shape is a sequence of file specifications.  If the final
262
1452
        character is '/', a directory is created.
263
1453
 
 
1454
        This assumes that all the elements in the tree being built are new.
 
1455
 
264
1456
        This doesn't add anything to a branch.
 
1457
        :param line_endings: Either 'binary' or 'native'
 
1458
                             in binary mode, exact contents are written
 
1459
                             in native mode, the line endings match the
 
1460
                             default platform endings.
 
1461
 
 
1462
        :param transport: A transport to write to, for building trees on 
 
1463
                          VFS's. If the transport is readonly or None,
 
1464
                          "." is opened automatically.
265
1465
        """
266
 
        # XXX: It's OK to just create them using forward slashes on windows?
267
 
        import os
 
1466
        # It's OK to just create them using forward slashes on windows.
 
1467
        if transport is None or transport.is_readonly():
 
1468
            transport = get_transport(".")
268
1469
        for name in shape:
269
 
            assert isinstance(name, basestring)
 
1470
            self.assert_(isinstance(name, basestring))
270
1471
            if name[-1] == '/':
271
 
                os.mkdir(name[:-1])
 
1472
                transport.mkdir(urlutils.escape(name[:-1]))
272
1473
            else:
273
 
                f = file(name, 'wt')
274
 
                print >>f, "contents of", name
275
 
                f.close()
276
 
                
277
 
 
278
 
 
279
 
class MetaTestLog(TestCase):
280
 
    def test_logging(self):
281
 
        """Test logs are captured when a test fails."""
282
 
        logging.info('an info message')
283
 
        warning('something looks dodgy...')
284
 
        logging.debug('hello, test is running')
285
 
        ##assert 0
286
 
 
287
 
 
288
 
def selftest(verbose=False, pattern=".*"):
289
 
    return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern)
 
1474
                if line_endings == 'binary':
 
1475
                    end = '\n'
 
1476
                elif line_endings == 'native':
 
1477
                    end = os.linesep
 
1478
                else:
 
1479
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
 
1480
                content = "contents of %s%s" % (name.encode('utf-8'), end)
 
1481
                # Technically 'put()' is the right command. However, put
 
1482
                # uses an AtomicFile, which requires an extra rename into place
 
1483
                # As long as the files didn't exist in the past, append() will
 
1484
                # do the same thing as put()
 
1485
                # On jam's machine, make_kernel_like_tree is:
 
1486
                #   put:    4.5-7.5s (averaging 6s)
 
1487
                #   append: 2.9-4.5s
 
1488
                #   put_non_atomic: 2.9-4.5s
 
1489
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
 
1490
 
 
1491
    def build_tree_contents(self, shape):
 
1492
        build_tree_contents(shape)
 
1493
 
 
1494
    def assertFileEqual(self, content, path):
 
1495
        """Fail if path does not contain 'content'."""
 
1496
        self.failUnless(osutils.lexists(path))
 
1497
        # TODO: jam 20060427 Shouldn't this be 'rb'?
 
1498
        self.assertEqualDiff(content, open(path, 'r').read())
 
1499
 
 
1500
 
 
1501
class TestCaseWithTransport(TestCaseInTempDir):
 
1502
    """A test case that provides get_url and get_readonly_url facilities.
 
1503
 
 
1504
    These back onto two transport servers, one for readonly access and one for
 
1505
    read write access.
 
1506
 
 
1507
    If no explicit class is provided for readonly access, a
 
1508
    ReadonlyTransportDecorator is used instead which allows the use of non disk
 
1509
    based read write transports.
 
1510
 
 
1511
    If an explicit class is provided for readonly access, that server and the 
 
1512
    readwrite one must both define get_url() as resolving to os.getcwd().
 
1513
    """
 
1514
 
 
1515
    def get_server(self):
 
1516
        """See TestCaseWithMemoryTransport.
 
1517
 
 
1518
        This is useful for some tests with specific servers that need
 
1519
        diagnostics.
 
1520
        """
 
1521
        if self.__server is None:
 
1522
            self.__server = self.transport_server()
 
1523
            self.__server.setUp()
 
1524
            self.addCleanup(self.__server.tearDown)
 
1525
        return self.__server
 
1526
 
 
1527
    def make_branch_and_tree(self, relpath, format=None):
 
1528
        """Create a branch on the transport and a tree locally.
 
1529
 
 
1530
        If the transport is not a LocalTransport, the Tree can't be created on
 
1531
        the transport.  In that case the working tree is created in the local
 
1532
        directory, and the returned tree's branch and repository will also be
 
1533
        accessed locally.
 
1534
 
 
1535
        This will fail if the original default transport for this test
 
1536
        case wasn't backed by the working directory, as the branch won't
 
1537
        be on disk for us to open it.  
 
1538
 
 
1539
        :param format: The BzrDirFormat.
 
1540
        :returns: the WorkingTree.
 
1541
        """
 
1542
        # TODO: always use the local disk path for the working tree,
 
1543
        # this obviously requires a format that supports branch references
 
1544
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
 
1545
        # RBC 20060208
 
1546
        b = self.make_branch(relpath, format=format)
 
1547
        try:
 
1548
            return b.bzrdir.create_workingtree()
 
1549
        except errors.NotLocalUrl:
 
1550
            # We can only make working trees locally at the moment.  If the
 
1551
            # transport can't support them, then reopen the branch on a local
 
1552
            # transport, and create the working tree there.  
 
1553
            #
 
1554
            # Possibly we should instead keep
 
1555
            # the non-disk-backed branch and create a local checkout?
 
1556
            bd = bzrdir.BzrDir.open(relpath)
 
1557
            return bd.create_workingtree()
 
1558
 
 
1559
    def assertIsDirectory(self, relpath, transport):
 
1560
        """Assert that relpath within transport is a directory.
 
1561
 
 
1562
        This may not be possible on all transports; in that case it propagates
 
1563
        a TransportNotPossible.
 
1564
        """
 
1565
        try:
 
1566
            mode = transport.stat(relpath).st_mode
 
1567
        except errors.NoSuchFile:
 
1568
            self.fail("path %s is not a directory; no such file"
 
1569
                      % (relpath))
 
1570
        if not stat.S_ISDIR(mode):
 
1571
            self.fail("path %s is not a directory; has mode %#o"
 
1572
                      % (relpath, mode))
 
1573
 
 
1574
    def setUp(self):
 
1575
        super(TestCaseWithTransport, self).setUp()
 
1576
        self.__server = None
 
1577
 
 
1578
 
 
1579
class ChrootedTestCase(TestCaseWithTransport):
 
1580
    """A support class that provides readonly urls outside the local namespace.
 
1581
 
 
1582
    This is done by checking if self.transport_server is a MemoryServer. if it
 
1583
    is then we are chrooted already, if it is not then an HttpServer is used
 
1584
    for readonly urls.
 
1585
 
 
1586
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
 
1587
                       be used without needed to redo it when a different 
 
1588
                       subclass is in use ?
 
1589
    """
 
1590
 
 
1591
    def setUp(self):
 
1592
        super(ChrootedTestCase, self).setUp()
 
1593
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
 
1594
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
 
1595
 
 
1596
 
 
1597
def filter_suite_by_re(suite, pattern):
 
1598
    result = TestUtil.TestSuite()
 
1599
    filter_re = re.compile(pattern)
 
1600
    for test in iter_suite_tests(suite):
 
1601
        if filter_re.search(test.id()):
 
1602
            result.addTest(test)
 
1603
    return result
 
1604
 
 
1605
 
 
1606
def run_suite(suite, name='test', verbose=False, pattern=".*",
 
1607
              stop_on_failure=False, keep_output=False,
 
1608
              transport=None, lsprof_timed=None, bench_history=None):
 
1609
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
 
1610
    if verbose:
 
1611
        verbosity = 2
 
1612
    else:
 
1613
        verbosity = 1
 
1614
    runner = TextTestRunner(stream=sys.stdout,
 
1615
                            descriptions=0,
 
1616
                            verbosity=verbosity,
 
1617
                            keep_output=keep_output,
 
1618
                            bench_history=bench_history)
 
1619
    runner.stop_on_failure=stop_on_failure
 
1620
    if pattern != '.*':
 
1621
        suite = filter_suite_by_re(suite, pattern)
 
1622
    result = runner.run(suite)
 
1623
    return result.wasSuccessful()
 
1624
 
 
1625
 
 
1626
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
 
1627
             keep_output=False,
 
1628
             transport=None,
 
1629
             test_suite_factory=None,
 
1630
             lsprof_timed=None,
 
1631
             bench_history=None):
 
1632
    """Run the whole test suite under the enhanced runner"""
 
1633
    # XXX: Very ugly way to do this...
 
1634
    # Disable warning about old formats because we don't want it to disturb
 
1635
    # any blackbox tests.
 
1636
    from bzrlib import repository
 
1637
    repository._deprecation_warning_done = True
 
1638
 
 
1639
    global default_transport
 
1640
    if transport is None:
 
1641
        transport = default_transport
 
1642
    old_transport = default_transport
 
1643
    default_transport = transport
 
1644
    try:
 
1645
        if test_suite_factory is None:
 
1646
            suite = test_suite()
 
1647
        else:
 
1648
            suite = test_suite_factory()
 
1649
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
 
1650
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
 
1651
                     transport=transport,
 
1652
                     lsprof_timed=lsprof_timed,
 
1653
                     bench_history=bench_history)
 
1654
    finally:
 
1655
        default_transport = old_transport
290
1656
 
291
1657
 
292
1658
def test_suite():
293
 
    from bzrlib.selftest.TestUtil import TestLoader, TestSuite
294
 
    import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
295
 
    import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
296
 
    from doctest import DocTestSuite
297
 
    import os
298
 
    import shutil
299
 
    import time
300
 
    import sys
301
 
 
302
 
    global MODULES_TO_TEST, MODULES_TO_DOCTEST
303
 
 
304
 
    testmod_names = \
305
 
                  ['bzrlib.selftest.MetaTestLog',
306
 
                   'bzrlib.selftest.testinv',
307
 
                   'bzrlib.selftest.testfetch',
308
 
                   'bzrlib.selftest.versioning',
309
 
                   'bzrlib.selftest.whitebox',
310
 
                   'bzrlib.selftest.testmerge3',
311
 
                   'bzrlib.selftest.testhashcache',
312
 
                   'bzrlib.selftest.teststatus',
313
 
                   'bzrlib.selftest.testlog',
314
 
                   'bzrlib.selftest.blackbox',
315
 
                   'bzrlib.selftest.testrevisionnamespaces',
316
 
                   'bzrlib.selftest.testbranch',
317
 
                   'bzrlib.selftest.testrevision',
318
 
                   'bzrlib.selftest.test_merge_core',
319
 
                   'bzrlib.selftest.test_smart_add',
320
 
                   'bzrlib.selftest.testdiff',
321
 
                   'bzrlib.fetch'
 
1659
    """Build and return TestSuite for the whole of bzrlib.
 
1660
    
 
1661
    This function can be replaced if you need to change the default test
 
1662
    suite on a global basis, but it is not encouraged.
 
1663
    """
 
1664
    testmod_names = [
 
1665
                   'bzrlib.tests.test_ancestry',
 
1666
                   'bzrlib.tests.test_api',
 
1667
                   'bzrlib.tests.test_atomicfile',
 
1668
                   'bzrlib.tests.test_bad_files',
 
1669
                   'bzrlib.tests.test_branch',
 
1670
                   'bzrlib.tests.test_bundle',
 
1671
                   'bzrlib.tests.test_bzrdir',
 
1672
                   'bzrlib.tests.test_cache_utf8',
 
1673
                   'bzrlib.tests.test_command',
 
1674
                   'bzrlib.tests.test_commit',
 
1675
                   'bzrlib.tests.test_commit_merge',
 
1676
                   'bzrlib.tests.test_config',
 
1677
                   'bzrlib.tests.test_conflicts',
 
1678
                   'bzrlib.tests.test_decorators',
 
1679
                   'bzrlib.tests.test_diff',
 
1680
                   'bzrlib.tests.test_doc_generate',
 
1681
                   'bzrlib.tests.test_errors',
 
1682
                   'bzrlib.tests.test_escaped_store',
 
1683
                   'bzrlib.tests.test_fetch',
 
1684
                   'bzrlib.tests.test_ftp_transport',
 
1685
                   'bzrlib.tests.test_gpg',
 
1686
                   'bzrlib.tests.test_graph',
 
1687
                   'bzrlib.tests.test_hashcache',
 
1688
                   'bzrlib.tests.test_http',
 
1689
                   'bzrlib.tests.test_http_response',
 
1690
                   'bzrlib.tests.test_identitymap',
 
1691
                   'bzrlib.tests.test_ignores',
 
1692
                   'bzrlib.tests.test_inv',
 
1693
                   'bzrlib.tests.test_knit',
 
1694
                   'bzrlib.tests.test_lazy_import',
 
1695
                   'bzrlib.tests.test_lazy_regex',
 
1696
                   'bzrlib.tests.test_lockdir',
 
1697
                   'bzrlib.tests.test_lockable_files',
 
1698
                   'bzrlib.tests.test_log',
 
1699
                   'bzrlib.tests.test_memorytree',
 
1700
                   'bzrlib.tests.test_merge',
 
1701
                   'bzrlib.tests.test_merge3',
 
1702
                   'bzrlib.tests.test_merge_core',
 
1703
                   'bzrlib.tests.test_missing',
 
1704
                   'bzrlib.tests.test_msgeditor',
 
1705
                   'bzrlib.tests.test_nonascii',
 
1706
                   'bzrlib.tests.test_options',
 
1707
                   'bzrlib.tests.test_osutils',
 
1708
                   'bzrlib.tests.test_patch',
 
1709
                   'bzrlib.tests.test_patches',
 
1710
                   'bzrlib.tests.test_permissions',
 
1711
                   'bzrlib.tests.test_plugins',
 
1712
                   'bzrlib.tests.test_progress',
 
1713
                   'bzrlib.tests.test_reconcile',
 
1714
                   'bzrlib.tests.test_registry',
 
1715
                   'bzrlib.tests.test_repository',
 
1716
                   'bzrlib.tests.test_revert',
 
1717
                   'bzrlib.tests.test_revision',
 
1718
                   'bzrlib.tests.test_revisionnamespaces',
 
1719
                   'bzrlib.tests.test_revisiontree',
 
1720
                   'bzrlib.tests.test_rio',
 
1721
                   'bzrlib.tests.test_sampler',
 
1722
                   'bzrlib.tests.test_selftest',
 
1723
                   'bzrlib.tests.test_setup',
 
1724
                   'bzrlib.tests.test_sftp_transport',
 
1725
                   'bzrlib.tests.test_smart_add',
 
1726
                   'bzrlib.tests.test_smart_transport',
 
1727
                   'bzrlib.tests.test_source',
 
1728
                   'bzrlib.tests.test_status',
 
1729
                   'bzrlib.tests.test_store',
 
1730
                   'bzrlib.tests.test_symbol_versioning',
 
1731
                   'bzrlib.tests.test_testament',
 
1732
                   'bzrlib.tests.test_textfile',
 
1733
                   'bzrlib.tests.test_textmerge',
 
1734
                   'bzrlib.tests.test_trace',
 
1735
                   'bzrlib.tests.test_transactions',
 
1736
                   'bzrlib.tests.test_transform',
 
1737
                   'bzrlib.tests.test_transport',
 
1738
                   'bzrlib.tests.test_tree',
 
1739
                   'bzrlib.tests.test_treebuilder',
 
1740
                   'bzrlib.tests.test_tsort',
 
1741
                   'bzrlib.tests.test_tuned_gzip',
 
1742
                   'bzrlib.tests.test_ui',
 
1743
                   'bzrlib.tests.test_upgrade',
 
1744
                   'bzrlib.tests.test_urlutils',
 
1745
                   'bzrlib.tests.test_versionedfile',
 
1746
                   'bzrlib.tests.test_version',
 
1747
                   'bzrlib.tests.test_version_info',
 
1748
                   'bzrlib.tests.test_weave',
 
1749
                   'bzrlib.tests.test_whitebox',
 
1750
                   'bzrlib.tests.test_workingtree',
 
1751
                   'bzrlib.tests.test_xml',
322
1752
                   ]
323
 
 
324
 
    for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
325
 
              bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
326
 
        if m not in MODULES_TO_DOCTEST:
327
 
            MODULES_TO_DOCTEST.append(m)
328
 
 
329
 
    TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
330
 
    print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
331
 
    print
332
 
    suite = TestSuite()
333
 
    suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
 
1753
    test_transport_implementations = [
 
1754
        'bzrlib.tests.test_transport_implementations',
 
1755
        'bzrlib.tests.test_read_bundle',
 
1756
        ]
 
1757
    suite = TestUtil.TestSuite()
 
1758
    loader = TestUtil.TestLoader()
 
1759
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
 
1760
    from bzrlib.transport import TransportTestProviderAdapter
 
1761
    adapter = TransportTestProviderAdapter()
 
1762
    adapt_modules(test_transport_implementations, adapter, loader, suite)
 
1763
    for package in packages_to_test():
 
1764
        suite.addTest(package.test_suite())
334
1765
    for m in MODULES_TO_TEST:
335
 
         suite.addTest(TestLoader().loadTestsFromModule(m))
336
 
    for m in (MODULES_TO_DOCTEST):
337
 
        suite.addTest(DocTestSuite(m))
338
 
    for p in bzrlib.plugin.all_plugins:
339
 
        if hasattr(p, 'test_suite'):
340
 
            suite.addTest(p.test_suite())
 
1766
        suite.addTest(loader.loadTestsFromModule(m))
 
1767
    for m in MODULES_TO_DOCTEST:
 
1768
        try:
 
1769
            suite.addTest(doctest.DocTestSuite(m))
 
1770
        except ValueError, e:
 
1771
            print '**failed to get doctest for: %s\n%s' %(m,e)
 
1772
            raise
 
1773
    for name, plugin in bzrlib.plugin.all_plugins().items():
 
1774
        if getattr(plugin, 'test_suite', None) is not None:
 
1775
            suite.addTest(plugin.test_suite())
341
1776
    return suite
342
1777
 
 
1778
 
 
1779
def adapt_modules(mods_list, adapter, loader, suite):
 
1780
    """Adapt the modules in mods_list using adapter and add to suite."""
 
1781
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
 
1782
        suite.addTests(adapter.adapt(test))