~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/__init__.py

  • Committer: Martin Pool
  • Date: 2005-06-22 09:35:24 UTC
  • Revision ID: mbp@sourcefrog.net-20050622093524-b15e2d374c2ae6ea
- move standard plugins from contrib/plugins to just ./plugins

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
 
1
# Copyright (C) 2005 by Canonical Ltd
2
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
# TODO: Perhaps there should be an API to find out if bzr running under the
19
 
# test suite -- some plugins might want to avoid making intrusive changes if
20
 
# this is the case.  However, we want behaviour under to test to diverge as
21
 
# little as possible, so this should be used rarely if it's added at all.
22
 
# (Suggestion from j-a-meinel, 2005-11-24)
23
 
 
24
 
# NOTE: Some classes in here use camelCaseNaming() rather than
25
 
# underscore_naming().  That's for consistency with unittest; it's not the
26
 
# general style of bzrlib.  Please continue that consistency when adding e.g.
27
 
# new assertFoo() methods.
28
 
 
29
 
import codecs
30
 
from cStringIO import StringIO
31
 
import difflib
32
 
import doctest
33
 
import errno
34
 
import logging
35
 
import os
36
 
import re
37
 
import shlex
38
 
import stat
39
 
from subprocess import Popen, PIPE
40
 
import sys
41
 
import tempfile
42
 
import unittest
43
 
import time
44
 
 
45
 
 
46
 
import bzrlib.branch
47
 
import bzrlib.bzrdir as bzrdir
48
 
import bzrlib.commands
49
 
import bzrlib.bundle.serializer
50
 
import bzrlib.errors as errors
51
 
import bzrlib.inventory
52
 
import bzrlib.iterablefile
53
 
import bzrlib.lockdir
 
18
from unittest import TestResult, TestCase
 
19
 
54
20
try:
55
 
    import bzrlib.lsprof
56
 
except ImportError:
57
 
    # lsprof not available
58
 
    pass
59
 
from bzrlib.merge import merge_inner
60
 
import bzrlib.merge3
61
 
import bzrlib.osutils
62
 
import bzrlib.osutils as osutils
63
 
import bzrlib.plugin
64
 
import bzrlib.progress as progress
65
 
from bzrlib.revision import common_ancestor
66
 
import bzrlib.store
67
 
import bzrlib.trace
68
 
from bzrlib.transport import get_transport
69
 
import bzrlib.transport
70
 
from bzrlib.transport.local import LocalRelpathServer
71
 
from bzrlib.transport.readonly import ReadonlyServer
72
 
from bzrlib.trace import mutter
73
 
from bzrlib.tests import TestUtil
74
 
from bzrlib.tests.TestUtil import (
75
 
                          TestSuite,
76
 
                          TestLoader,
77
 
                          )
78
 
from bzrlib.tests.treeshape import build_tree_contents
79
 
import bzrlib.urlutils as urlutils
80
 
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
81
 
 
82
 
default_transport = LocalRelpathServer
83
 
 
84
 
MODULES_TO_TEST = []
85
 
MODULES_TO_DOCTEST = [
86
 
                      bzrlib.branch,
87
 
                      bzrlib.bundle.serializer,
88
 
                      bzrlib.commands,
89
 
                      bzrlib.errors,
90
 
                      bzrlib.inventory,
91
 
                      bzrlib.iterablefile,
92
 
                      bzrlib.lockdir,
93
 
                      bzrlib.merge3,
94
 
                      bzrlib.option,
95
 
                      bzrlib.osutils,
96
 
                      bzrlib.store
97
 
                      ]
98
 
 
99
 
 
100
 
def packages_to_test():
101
 
    """Return a list of packages to test.
102
 
 
103
 
    The packages are not globally imported so that import failures are
104
 
    triggered when running selftest, not when importing the command.
105
 
    """
106
 
    import bzrlib.doc
107
 
    import bzrlib.tests.blackbox
108
 
    import bzrlib.tests.branch_implementations
109
 
    import bzrlib.tests.bzrdir_implementations
110
 
    import bzrlib.tests.interrepository_implementations
111
 
    import bzrlib.tests.interversionedfile_implementations
112
 
    import bzrlib.tests.repository_implementations
113
 
    import bzrlib.tests.revisionstore_implementations
114
 
    import bzrlib.tests.workingtree_implementations
115
 
    return [
116
 
            bzrlib.doc,
117
 
            bzrlib.tests.blackbox,
118
 
            bzrlib.tests.branch_implementations,
119
 
            bzrlib.tests.bzrdir_implementations,
120
 
            bzrlib.tests.interrepository_implementations,
121
 
            bzrlib.tests.interversionedfile_implementations,
122
 
            bzrlib.tests.repository_implementations,
123
 
            bzrlib.tests.revisionstore_implementations,
124
 
            bzrlib.tests.workingtree_implementations,
125
 
            ]
126
 
 
127
 
 
128
 
class _MyResult(unittest._TextTestResult):
129
 
    """Custom TestResult.
130
 
 
131
 
    Shows output in a different format, including displaying runtime for tests.
132
 
    """
133
 
    stop_early = False
134
 
    
135
 
    def __init__(self, stream, descriptions, verbosity, pb=None):
136
 
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
137
 
        self.pb = pb
138
 
    
139
 
    def extractBenchmarkTime(self, testCase):
140
 
        """Add a benchmark time for the current test case."""
141
 
        self._benchmarkTime = getattr(testCase, "_benchtime", None)
142
 
    
143
 
    def _elapsedTestTimeString(self):
144
 
        """Return a time string for the overall time the current test has taken."""
145
 
        return self._formatTime(time.time() - self._start_time)
146
 
 
147
 
    def _testTimeString(self):
148
 
        if self._benchmarkTime is not None:
149
 
            return "%s/%s" % (
150
 
                self._formatTime(self._benchmarkTime),
151
 
                self._elapsedTestTimeString())
152
 
        else:
153
 
            return "      %s" % self._elapsedTestTimeString()
154
 
 
155
 
    def _formatTime(self, seconds):
156
 
        """Format seconds as milliseconds with leading spaces."""
157
 
        return "%5dms" % (1000 * seconds)
158
 
 
159
 
    def _ellipsise_unimportant_words(self, a_string, final_width,
160
 
                                   keep_start=False):
161
 
        """Add ellipses (sp?) for overly long strings.
162
 
        
163
 
        :param keep_start: If true preserve the start of a_string rather
164
 
                           than the end of it.
165
 
        """
166
 
        if keep_start:
167
 
            if len(a_string) > final_width:
168
 
                result = a_string[:final_width-3] + '...'
169
 
            else:
170
 
                result = a_string
171
 
        else:
172
 
            if len(a_string) > final_width:
173
 
                result = '...' + a_string[3-final_width:]
174
 
            else:
175
 
                result = a_string
176
 
        return result.ljust(final_width)
177
 
 
178
 
    def startTest(self, test):
179
 
        unittest.TestResult.startTest(self, test)
180
 
        # In a short description, the important words are in
181
 
        # the beginning, but in an id, the important words are
182
 
        # at the end
183
 
        SHOW_DESCRIPTIONS = False
184
 
 
185
 
        if not self.showAll and self.dots and self.pb is not None:
186
 
            final_width = 13
187
 
        else:
188
 
            final_width = osutils.terminal_width()
189
 
            final_width = final_width - 15 - 8
190
 
        what = None
191
 
        if SHOW_DESCRIPTIONS:
192
 
            what = test.shortDescription()
193
 
            if what:
194
 
                what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
195
 
        if what is None:
196
 
            what = test.id()
197
 
            if what.startswith('bzrlib.tests.'):
198
 
                what = what[13:]
199
 
            what = self._ellipsise_unimportant_words(what, final_width)
200
 
        if self.showAll:
201
 
            self.stream.write(what)
202
 
        elif self.dots and self.pb is not None:
203
 
            self.pb.update(what, self.testsRun - 1, None)
204
 
        self.stream.flush()
205
 
        self._recordTestStartTime()
206
 
 
207
 
    def _recordTestStartTime(self):
208
 
        """Record that a test has started."""
209
 
        self._start_time = time.time()
210
 
 
211
 
    def addError(self, test, err):
212
 
        if isinstance(err[1], TestSkipped):
213
 
            return self.addSkipped(test, err)    
214
 
        unittest.TestResult.addError(self, test, err)
215
 
        self.extractBenchmarkTime(test)
216
 
        if self.showAll:
217
 
            self.stream.writeln("ERROR %s" % self._testTimeString())
218
 
        elif self.dots and self.pb is None:
219
 
            self.stream.write('E')
220
 
        elif self.dots:
221
 
            self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
222
 
        self.stream.flush()
223
 
        if self.stop_early:
224
 
            self.stop()
225
 
 
226
 
    def addFailure(self, test, err):
227
 
        unittest.TestResult.addFailure(self, test, err)
228
 
        self.extractBenchmarkTime(test)
229
 
        if self.showAll:
230
 
            self.stream.writeln(" FAIL %s" % self._testTimeString())
231
 
        elif self.dots and self.pb is None:
232
 
            self.stream.write('F')
233
 
        elif self.dots:
234
 
            self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
235
 
        self.stream.flush()
236
 
        if self.stop_early:
237
 
            self.stop()
238
 
 
239
 
    def addSuccess(self, test):
240
 
        self.extractBenchmarkTime(test)
241
 
        if self.showAll:
242
 
            self.stream.writeln('   OK %s' % self._testTimeString())
243
 
            for bench_called, stats in getattr(test, '_benchcalls', []):
244
 
                self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
245
 
                stats.pprint(file=self.stream)
246
 
        elif self.dots and self.pb is None:
247
 
            self.stream.write('~')
248
 
        elif self.dots:
249
 
            self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
250
 
        self.stream.flush()
251
 
        unittest.TestResult.addSuccess(self, test)
252
 
 
253
 
    def addSkipped(self, test, skip_excinfo):
254
 
        self.extractBenchmarkTime(test)
255
 
        if self.showAll:
256
 
            print >>self.stream, ' SKIP %s' % self._testTimeString()
257
 
            print >>self.stream, '     %s' % skip_excinfo[1]
258
 
        elif self.dots and self.pb is None:
259
 
            self.stream.write('S')
260
 
        elif self.dots:
261
 
            self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
262
 
        self.stream.flush()
263
 
        # seems best to treat this as success from point-of-view of unittest
264
 
        # -- it actually does nothing so it barely matters :)
265
 
        unittest.TestResult.addSuccess(self, test)
266
 
 
267
 
    def printErrorList(self, flavour, errors):
268
 
        for test, err in errors:
269
 
            self.stream.writeln(self.separator1)
270
 
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
271
 
            if getattr(test, '_get_log', None) is not None:
272
 
                print >>self.stream
273
 
                print >>self.stream, \
274
 
                        ('vvvv[log from %s]' % test.id()).ljust(78,'-')
275
 
                print >>self.stream, test._get_log()
276
 
                print >>self.stream, \
277
 
                        ('^^^^[log from %s]' % test.id()).ljust(78,'-')
278
 
            self.stream.writeln(self.separator2)
279
 
            self.stream.writeln("%s" % err)
280
 
 
281
 
 
282
 
class TextTestRunner(object):
283
 
    stop_on_failure = False
284
 
 
285
 
    def __init__(self,
286
 
                 stream=sys.stderr,
287
 
                 descriptions=0,
288
 
                 verbosity=1,
289
 
                 keep_output=False,
290
 
                 pb=None):
291
 
        self.stream = unittest._WritelnDecorator(stream)
292
 
        self.descriptions = descriptions
293
 
        self.verbosity = verbosity
294
 
        self.keep_output = keep_output
295
 
        self.pb = pb
296
 
 
297
 
    def _makeResult(self):
298
 
        result = _MyResult(self.stream,
299
 
                           self.descriptions,
300
 
                           self.verbosity,
301
 
                           pb=self.pb)
302
 
        result.stop_early = self.stop_on_failure
303
 
        return result
304
 
 
305
 
    def run(self, test):
306
 
        "Run the given test case or test suite."
307
 
        result = self._makeResult()
308
 
        startTime = time.time()
309
 
        if self.pb is not None:
310
 
            self.pb.update('Running tests', 0, test.countTestCases())
311
 
        test.run(result)
312
 
        stopTime = time.time()
313
 
        timeTaken = stopTime - startTime
314
 
        result.printErrors()
315
 
        self.stream.writeln(result.separator2)
316
 
        run = result.testsRun
317
 
        self.stream.writeln("Ran %d test%s in %.3fs" %
318
 
                            (run, run != 1 and "s" or "", timeTaken))
319
 
        self.stream.writeln()
320
 
        if not result.wasSuccessful():
321
 
            self.stream.write("FAILED (")
322
 
            failed, errored = map(len, (result.failures, result.errors))
323
 
            if failed:
324
 
                self.stream.write("failures=%d" % failed)
325
 
            if errored:
326
 
                if failed: self.stream.write(", ")
327
 
                self.stream.write("errors=%d" % errored)
328
 
            self.stream.writeln(")")
329
 
        else:
330
 
            self.stream.writeln("OK")
331
 
        if self.pb is not None:
332
 
            self.pb.update('Cleaning up', 0, 1)
333
 
        # This is still a little bogus, 
334
 
        # but only a little. Folk not using our testrunner will
335
 
        # have to delete their temp directories themselves.
336
 
        test_root = TestCaseInTempDir.TEST_ROOT
337
 
        if result.wasSuccessful() or not self.keep_output:
338
 
            if test_root is not None:
339
 
                # If LANG=C we probably have created some bogus paths
340
 
                # which rmtree(unicode) will fail to delete
341
 
                # so make sure we are using rmtree(str) to delete everything
342
 
                # except on win32, where rmtree(str) will fail
343
 
                # since it doesn't have the property of byte-stream paths
344
 
                # (they are either ascii or mbcs)
345
 
                if sys.platform == 'win32':
346
 
                    # make sure we are using the unicode win32 api
347
 
                    test_root = unicode(test_root)
348
 
                else:
349
 
                    test_root = test_root.encode(
350
 
                        sys.getfilesystemencoding())
351
 
                osutils.rmtree(test_root)
352
 
        else:
353
 
            if self.pb is not None:
354
 
                self.pb.note("Failed tests working directories are in '%s'\n",
355
 
                             test_root)
356
 
            else:
357
 
                self.stream.writeln(
358
 
                    "Failed tests working directories are in '%s'\n" %
359
 
                    test_root)
360
 
        TestCaseInTempDir.TEST_ROOT = None
361
 
        if self.pb is not None:
362
 
            self.pb.clear()
363
 
        return result
364
 
 
365
 
 
366
 
def iter_suite_tests(suite):
367
 
    """Return all tests in a suite, recursing through nested suites"""
368
 
    for item in suite._tests:
369
 
        if isinstance(item, unittest.TestCase):
370
 
            yield item
371
 
        elif isinstance(item, unittest.TestSuite):
372
 
            for r in iter_suite_tests(item):
373
 
                yield r
374
 
        else:
375
 
            raise Exception('unknown object %r inside test suite %r'
376
 
                            % (item, suite))
377
 
 
378
 
 
379
 
class TestSkipped(Exception):
380
 
    """Indicates that a test was intentionally skipped, rather than failing."""
381
 
    # XXX: Not used yet
 
21
    import shutil
 
22
    from subprocess import call, Popen, PIPE
 
23
except ImportError, e:
 
24
    sys.stderr.write("testbzr: sorry, this test suite requires the subprocess module\n"
 
25
                     "this is shipped with python2.4 and available separately for 2.3\n")
 
26
    raise
382
27
 
383
28
 
384
29
class CommandFailed(Exception):
385
30
    pass
386
31
 
387
32
 
388
 
class StringIOWrapper(object):
389
 
    """A wrapper around cStringIO which just adds an encoding attribute.
390
 
    
391
 
    Internally we can check sys.stdout to see what the output encoding
392
 
    should be. However, cStringIO has no encoding attribute that we can
393
 
    set. So we wrap it instead.
394
 
    """
395
 
    encoding='ascii'
396
 
    _cstring = None
397
 
 
398
 
    def __init__(self, s=None):
399
 
        if s is not None:
400
 
            self.__dict__['_cstring'] = StringIO(s)
401
 
        else:
402
 
            self.__dict__['_cstring'] = StringIO()
403
 
 
404
 
    def __getattr__(self, name, getattr=getattr):
405
 
        return getattr(self.__dict__['_cstring'], name)
406
 
 
407
 
    def __setattr__(self, name, val):
408
 
        if name == 'encoding':
409
 
            self.__dict__['encoding'] = val
410
 
        else:
411
 
            return setattr(self._cstring, name, val)
412
 
 
413
 
 
414
 
class TestCase(unittest.TestCase):
415
 
    """Base class for bzr unit tests.
416
 
    
417
 
    Tests that need access to disk resources should subclass 
418
 
    TestCaseInTempDir not TestCase.
419
 
 
420
 
    Error and debug log messages are redirected from their usual
421
 
    location into a temporary file, the contents of which can be
422
 
    retrieved by _get_log().  We use a real OS file, not an in-memory object,
423
 
    so that it can also capture file IO.  When the test completes this file
424
 
    is read into memory and removed from disk.
425
 
       
426
 
    There are also convenience functions to invoke bzr's command-line
427
 
    routine, and to build and check bzr trees.
428
 
   
429
 
    In addition to the usual method of overriding tearDown(), this class also
430
 
    allows subclasses to register functions into the _cleanups list, which is
431
 
    run in order as the object is torn down.  It's less likely this will be
432
 
    accidentally overlooked.
433
 
    """
434
 
 
435
 
    _log_file_name = None
436
 
    _log_contents = ''
437
 
    # record lsprof data when performing benchmark calls.
438
 
    _gather_lsprof_in_benchmarks = False
439
 
 
440
 
    def __init__(self, methodName='testMethod'):
441
 
        super(TestCase, self).__init__(methodName)
442
 
        self._cleanups = []
443
 
 
444
 
    def setUp(self):
445
 
        unittest.TestCase.setUp(self)
446
 
        self._cleanEnvironment()
447
 
        bzrlib.trace.disable_default_logging()
448
 
        self._startLogFile()
449
 
        self._benchcalls = []
450
 
        self._benchtime = None
451
 
 
452
 
    def _ndiff_strings(self, a, b):
453
 
        """Return ndiff between two strings containing lines.
454
 
        
455
 
        A trailing newline is added if missing to make the strings
456
 
        print properly."""
457
 
        if b and b[-1] != '\n':
458
 
            b += '\n'
459
 
        if a and a[-1] != '\n':
460
 
            a += '\n'
461
 
        difflines = difflib.ndiff(a.splitlines(True),
462
 
                                  b.splitlines(True),
463
 
                                  linejunk=lambda x: False,
464
 
                                  charjunk=lambda x: False)
465
 
        return ''.join(difflines)
466
 
 
467
 
    def assertEqualDiff(self, a, b, message=None):
468
 
        """Assert two texts are equal, if not raise an exception.
469
 
        
470
 
        This is intended for use with multi-line strings where it can 
471
 
        be hard to find the differences by eye.
472
 
        """
473
 
        # TODO: perhaps override assertEquals to call this for strings?
474
 
        if a == b:
475
 
            return
476
 
        if message is None:
477
 
            message = "texts not equal:\n"
478
 
        raise AssertionError(message + 
479
 
                             self._ndiff_strings(a, b))      
480
 
        
481
 
    def assertEqualMode(self, mode, mode_test):
482
 
        self.assertEqual(mode, mode_test,
483
 
                         'mode mismatch %o != %o' % (mode, mode_test))
484
 
 
485
 
    def assertStartsWith(self, s, prefix):
486
 
        if not s.startswith(prefix):
487
 
            raise AssertionError('string %r does not start with %r' % (s, prefix))
488
 
 
489
 
    def assertEndsWith(self, s, suffix):
490
 
        """Asserts that s ends with suffix."""
491
 
        if not s.endswith(suffix):
492
 
            raise AssertionError('string %r does not end with %r' % (s, suffix))
493
 
 
494
 
    def assertContainsRe(self, haystack, needle_re):
495
 
        """Assert that a contains something matching a regular expression."""
496
 
        if not re.search(needle_re, haystack):
497
 
            raise AssertionError('pattern "%s" not found in "%s"'
498
 
                    % (needle_re, haystack))
499
 
 
500
 
    def assertNotContainsRe(self, haystack, needle_re):
501
 
        """Assert that a does not match a regular expression"""
502
 
        if re.search(needle_re, haystack):
503
 
            raise AssertionError('pattern "%s" found in "%s"'
504
 
                    % (needle_re, haystack))
505
 
 
506
 
    def assertSubset(self, sublist, superlist):
507
 
        """Assert that every entry in sublist is present in superlist."""
508
 
        missing = []
509
 
        for entry in sublist:
510
 
            if entry not in superlist:
511
 
                missing.append(entry)
512
 
        if len(missing) > 0:
513
 
            raise AssertionError("value(s) %r not present in container %r" % 
514
 
                                 (missing, superlist))
515
 
 
516
 
    def assertIs(self, left, right):
517
 
        if not (left is right):
518
 
            raise AssertionError("%r is not %r." % (left, right))
519
 
 
520
 
    def assertTransportMode(self, transport, path, mode):
521
 
        """Fail if a path does not have mode mode.
522
 
        
523
 
        If modes are not supported on this transport, the assertion is ignored.
524
 
        """
525
 
        if not transport._can_roundtrip_unix_modebits():
526
 
            return
527
 
        path_stat = transport.stat(path)
528
 
        actual_mode = stat.S_IMODE(path_stat.st_mode)
529
 
        self.assertEqual(mode, actual_mode,
530
 
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
531
 
 
532
 
    def assertIsInstance(self, obj, kls):
533
 
        """Fail if obj is not an instance of kls"""
534
 
        if not isinstance(obj, kls):
535
 
            self.fail("%r is an instance of %s rather than %s" % (
536
 
                obj, obj.__class__, kls))
537
 
 
538
 
    def _startLogFile(self):
539
 
        """Send bzr and test log messages to a temporary file.
540
 
 
541
 
        The file is removed as the test is torn down.
542
 
        """
543
 
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
544
 
        encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
545
 
        self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
546
 
        self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
547
 
        self._log_file_name = name
548
 
        self.addCleanup(self._finishLogFile)
549
 
 
550
 
    def _finishLogFile(self):
551
 
        """Finished with the log file.
552
 
 
553
 
        Read contents into memory, close, and delete.
554
 
        """
555
 
        bzrlib.trace.disable_test_log(self._log_nonce)
556
 
        self._log_file.seek(0)
557
 
        self._log_contents = self._log_file.read()
558
 
        self._log_file.close()
559
 
        os.remove(self._log_file_name)
560
 
        self._log_file = self._log_file_name = None
561
 
 
562
 
    def addCleanup(self, callable):
563
 
        """Arrange to run a callable when this case is torn down.
564
 
 
565
 
        Callables are run in the reverse of the order they are registered, 
566
 
        ie last-in first-out.
567
 
        """
568
 
        if callable in self._cleanups:
569
 
            raise ValueError("cleanup function %r already registered on %s" 
570
 
                    % (callable, self))
571
 
        self._cleanups.append(callable)
572
 
 
573
 
    def _cleanEnvironment(self):
574
 
        new_env = {
575
 
            'HOME': os.getcwd(),
576
 
            'APPDATA': os.getcwd(),
577
 
            'BZREMAIL': None,
578
 
            'EMAIL': None,
579
 
        }
580
 
        self.__old_env = {}
581
 
        self.addCleanup(self._restoreEnvironment)
582
 
        for name, value in new_env.iteritems():
583
 
            self._captureVar(name, value)
584
 
 
585
 
 
586
 
    def _captureVar(self, name, newvalue):
587
 
        """Set an environment variable, preparing it to be reset when finished."""
588
 
        self.__old_env[name] = os.environ.get(name, None)
589
 
        if newvalue is None:
590
 
            if name in os.environ:
591
 
                del os.environ[name]
592
 
        else:
593
 
            os.environ[name] = newvalue
594
 
 
595
 
    @staticmethod
596
 
    def _restoreVar(name, value):
597
 
        if value is None:
598
 
            if name in os.environ:
599
 
                del os.environ[name]
600
 
        else:
601
 
            os.environ[name] = value
602
 
 
603
 
    def _restoreEnvironment(self):
604
 
        for name, value in self.__old_env.iteritems():
605
 
            self._restoreVar(name, value)
606
 
 
607
 
    def tearDown(self):
608
 
        self._runCleanups()
609
 
        unittest.TestCase.tearDown(self)
610
 
 
611
 
    def time(self, callable, *args, **kwargs):
612
 
        """Run callable and accrue the time it takes to the benchmark time.
613
 
        
614
 
        If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
615
 
        this will cause lsprofile statistics to be gathered and stored in
616
 
        self._benchcalls.
617
 
        """
618
 
        if self._benchtime is None:
619
 
            self._benchtime = 0
620
 
        start = time.time()
621
 
        try:
622
 
            if not self._gather_lsprof_in_benchmarks:
623
 
                return callable(*args, **kwargs)
624
 
            else:
625
 
                # record this benchmark
626
 
                ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
627
 
                stats.sort()
628
 
                self._benchcalls.append(((callable, args, kwargs), stats))
629
 
                return ret
630
 
        finally:
631
 
            self._benchtime += time.time() - start
632
 
 
633
 
    def _runCleanups(self):
634
 
        """Run registered cleanup functions. 
635
 
 
636
 
        This should only be called from TestCase.tearDown.
637
 
        """
638
 
        # TODO: Perhaps this should keep running cleanups even if 
639
 
        # one of them fails?
640
 
        for cleanup_fn in reversed(self._cleanups):
641
 
            cleanup_fn()
642
 
 
643
 
    def log(self, *args):
644
 
        mutter(*args)
645
 
 
646
 
    def _get_log(self):
647
 
        """Return as a string the log for this test"""
648
 
        if self._log_file_name:
649
 
            return open(self._log_file_name).read()
650
 
        else:
651
 
            return self._log_contents
652
 
        # TODO: Delete the log after it's been read in
653
 
 
654
 
    def capture(self, cmd, retcode=0):
655
 
        """Shortcut that splits cmd into words, runs, and returns stdout"""
656
 
        return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
657
 
 
658
 
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
659
 
        """Invoke bzr and return (stdout, stderr).
660
 
 
661
 
        Useful for code that wants to check the contents of the
662
 
        output, the way error messages are presented, etc.
663
 
 
664
 
        This should be the main method for tests that want to exercise the
665
 
        overall behavior of the bzr application (rather than a unit test
666
 
        or a functional test of the library.)
667
 
 
668
 
        Much of the old code runs bzr by forking a new copy of Python, but
669
 
        that is slower, harder to debug, and generally not necessary.
670
 
 
671
 
        This runs bzr through the interface that catches and reports
672
 
        errors, and with logging set to something approximating the
673
 
        default, so that error reporting can be checked.
674
 
 
675
 
        :param argv: arguments to invoke bzr
676
 
        :param retcode: expected return code, or None for don't-care.
677
 
        :param encoding: encoding for sys.stdout and sys.stderr
678
 
        :param stdin: A string to be used as stdin for the command.
679
 
        """
680
 
        if encoding is None:
681
 
            encoding = bzrlib.user_encoding
682
 
        if stdin is not None:
683
 
            stdin = StringIO(stdin)
684
 
        stdout = StringIOWrapper()
685
 
        stderr = StringIOWrapper()
686
 
        stdout.encoding = encoding
687
 
        stderr.encoding = encoding
688
 
 
689
 
        self.log('run bzr: %r', argv)
690
 
        # FIXME: don't call into logging here
691
 
        handler = logging.StreamHandler(stderr)
692
 
        handler.setLevel(logging.INFO)
693
 
        logger = logging.getLogger('')
694
 
        logger.addHandler(handler)
695
 
        old_ui_factory = bzrlib.ui.ui_factory
696
 
        bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
697
 
            stdout=stdout,
698
 
            stderr=stderr)
699
 
        bzrlib.ui.ui_factory.stdin = stdin
700
 
        try:
701
 
            result = self.apply_redirected(stdin, stdout, stderr,
702
 
                                           bzrlib.commands.run_bzr_catch_errors,
703
 
                                           argv)
704
 
        finally:
705
 
            logger.removeHandler(handler)
706
 
            bzrlib.ui.ui_factory = old_ui_factory
707
 
 
708
 
        out = stdout.getvalue()
709
 
        err = stderr.getvalue()
710
 
        if out:
711
 
            self.log('output:\n%r', out)
712
 
        if err:
713
 
            self.log('errors:\n%r', err)
714
 
        if retcode is not None:
715
 
            self.assertEquals(retcode, result)
716
 
        return out, err
717
 
 
718
 
    def run_bzr(self, *args, **kwargs):
719
 
        """Invoke bzr, as if it were run from the command line.
720
 
 
721
 
        This should be the main method for tests that want to exercise the
722
 
        overall behavior of the bzr application (rather than a unit test
723
 
        or a functional test of the library.)
724
 
 
725
 
        This sends the stdout/stderr results into the test's log,
726
 
        where it may be useful for debugging.  See also run_captured.
727
 
 
728
 
        :param stdin: A string to be used as stdin for the command.
729
 
        """
730
 
        retcode = kwargs.pop('retcode', 0)
731
 
        encoding = kwargs.pop('encoding', None)
732
 
        stdin = kwargs.pop('stdin', None)
733
 
        return self.run_bzr_captured(args, retcode=retcode, encoding=encoding, stdin=stdin)
734
 
 
735
 
    def run_bzr_decode(self, *args, **kwargs):
736
 
        if kwargs.has_key('encoding'):
737
 
            encoding = kwargs['encoding']
738
 
        else:
739
 
            encoding = bzrlib.user_encoding
740
 
        return self.run_bzr(*args, **kwargs)[0].decode(encoding)
741
 
 
742
 
    def run_bzr_subprocess(self, *args, **kwargs):
743
 
        """Run bzr in a subprocess for testing.
744
 
 
745
 
        This starts a new Python interpreter and runs bzr in there. 
746
 
        This should only be used for tests that have a justifiable need for
747
 
        this isolation: e.g. they are testing startup time, or signal
748
 
        handling, or early startup code, etc.  Subprocess code can't be 
749
 
        profiled or debugged so easily.
750
 
 
751
 
        :param retcode: The status code that is expected.  Defaults to 0.  If
752
 
        None is supplied, the status code is not checked.
753
 
        """
754
 
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
755
 
        args = list(args)
756
 
        process = Popen([sys.executable, bzr_path]+args, stdout=PIPE, 
757
 
                         stderr=PIPE)
758
 
        out = process.stdout.read()
759
 
        err = process.stderr.read()
760
 
        retcode = process.wait()
761
 
        supplied_retcode = kwargs.get('retcode', 0)
762
 
        if supplied_retcode is not None:
763
 
            assert supplied_retcode == retcode
764
 
        return [out, err]
765
 
 
766
 
    def check_inventory_shape(self, inv, shape):
767
 
        """Compare an inventory to a list of expected names.
768
 
 
769
 
        Fail if they are not precisely equal.
770
 
        """
771
 
        extras = []
772
 
        shape = list(shape)             # copy
773
 
        for path, ie in inv.entries():
774
 
            name = path.replace('\\', '/')
775
 
            if ie.kind == 'dir':
776
 
                name = name + '/'
777
 
            if name in shape:
778
 
                shape.remove(name)
779
 
            else:
780
 
                extras.append(name)
781
 
        if shape:
782
 
            self.fail("expected paths not found in inventory: %r" % shape)
783
 
        if extras:
784
 
            self.fail("unexpected paths found in inventory: %r" % extras)
785
 
 
786
 
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
787
 
                         a_callable=None, *args, **kwargs):
788
 
        """Call callable with redirected std io pipes.
789
 
 
790
 
        Returns the return code."""
791
 
        if not callable(a_callable):
792
 
            raise ValueError("a_callable must be callable.")
793
 
        if stdin is None:
794
 
            stdin = StringIO("")
795
 
        if stdout is None:
796
 
            if getattr(self, "_log_file", None) is not None:
797
 
                stdout = self._log_file
798
 
            else:
799
 
                stdout = StringIO()
800
 
        if stderr is None:
801
 
            if getattr(self, "_log_file", None is not None):
802
 
                stderr = self._log_file
803
 
            else:
804
 
                stderr = StringIO()
805
 
        real_stdin = sys.stdin
806
 
        real_stdout = sys.stdout
807
 
        real_stderr = sys.stderr
808
 
        try:
809
 
            sys.stdout = stdout
810
 
            sys.stderr = stderr
811
 
            sys.stdin = stdin
812
 
            return a_callable(*args, **kwargs)
813
 
        finally:
814
 
            sys.stdout = real_stdout
815
 
            sys.stderr = real_stderr
816
 
            sys.stdin = real_stdin
817
 
 
818
 
    def merge(self, branch_from, wt_to):
819
 
        """A helper for tests to do a ui-less merge.
820
 
 
821
 
        This should move to the main library when someone has time to integrate
822
 
        it in.
823
 
        """
824
 
        # minimal ui-less merge.
825
 
        wt_to.branch.fetch(branch_from)
826
 
        base_rev = common_ancestor(branch_from.last_revision(),
827
 
                                   wt_to.branch.last_revision(),
828
 
                                   wt_to.branch.repository)
829
 
        merge_inner(wt_to.branch, branch_from.basis_tree(), 
830
 
                    wt_to.branch.repository.revision_tree(base_rev),
831
 
                    this_tree=wt_to)
832
 
        wt_to.add_pending_merge(branch_from.last_revision())
833
 
 
834
 
 
835
 
BzrTestBase = TestCase
836
 
 
837
 
     
838
 
class TestCaseInTempDir(TestCase):
839
 
    """Derived class that runs a test within a temporary directory.
840
 
 
841
 
    This is useful for tests that need to create a branch, etc.
842
 
 
843
 
    The directory is created in a slightly complex way: for each
844
 
    Python invocation, a new temporary top-level directory is created.
845
 
    All test cases create their own directory within that.  If the
846
 
    tests complete successfully, the directory is removed.
847
 
 
848
 
    InTempDir is an old alias for FunctionalTestCase.
849
 
    """
850
 
 
851
 
    TEST_ROOT = None
852
 
    _TEST_NAME = 'test'
853
 
    OVERRIDE_PYTHON = 'python'
854
 
 
855
 
    def check_file_contents(self, filename, expect):
856
 
        self.log("check contents of file %s" % filename)
857
 
        contents = file(filename, 'r').read()
858
 
        if contents != expect:
859
 
            self.log("expected: %r" % expect)
860
 
            self.log("actually: %r" % contents)
861
 
            self.fail("contents of %s not as expected" % filename)
862
 
 
863
 
    def _make_test_root(self):
864
 
        if TestCaseInTempDir.TEST_ROOT is not None:
865
 
            return
866
 
        i = 0
867
 
        while True:
868
 
            root = u'test%04d.tmp' % i
869
 
            try:
870
 
                os.mkdir(root)
871
 
            except OSError, e:
872
 
                if e.errno == errno.EEXIST:
873
 
                    i += 1
874
 
                    continue
875
 
                else:
876
 
                    raise
877
 
            # successfully created
878
 
            TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
879
 
            break
880
 
        # make a fake bzr directory there to prevent any tests propagating
881
 
        # up onto the source directory's real branch
882
 
        bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
883
 
 
884
 
    def setUp(self):
885
 
        super(TestCaseInTempDir, self).setUp()
886
 
        self._make_test_root()
887
 
        _currentdir = os.getcwdu()
888
 
        # shorten the name, to avoid test failures due to path length
889
 
        short_id = self.id().replace('bzrlib.tests.', '') \
890
 
                   .replace('__main__.', '')[-100:]
891
 
        # it's possible the same test class is run several times for
892
 
        # parameterized tests, so make sure the names don't collide.  
893
 
        i = 0
894
 
        while True:
895
 
            if i > 0:
896
 
                candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
897
 
            else:
898
 
                candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
899
 
            if os.path.exists(candidate_dir):
900
 
                i = i + 1
901
 
                continue
902
 
            else:
903
 
                self.test_dir = candidate_dir
904
 
                os.mkdir(self.test_dir)
905
 
                os.chdir(self.test_dir)
906
 
                break
907
 
        os.environ['HOME'] = self.test_dir
908
 
        os.environ['APPDATA'] = self.test_dir
909
 
        def _leaveDirectory():
910
 
            os.chdir(_currentdir)
911
 
        self.addCleanup(_leaveDirectory)
912
 
        
913
 
    def build_tree(self, shape, line_endings='native', transport=None):
 
33
class TestBase(TestCase):
 
34
    """Base class for bzr test cases.
 
35
 
 
36
    Just defines some useful helper functions; doesn't actually test
 
37
    anything.
 
38
    """
 
39
    
 
40
    # TODO: Special methods to invoke bzr, so that we can run it
 
41
    # through a specified Python intepreter
 
42
 
 
43
    OVERRIDE_PYTHON = None # to run with alternative python 'python'
 
44
    BZRPATH = 'bzr'
 
45
    
 
46
 
 
47
    def formcmd(self, cmd):
 
48
        if isinstance(cmd, basestring):
 
49
            cmd = cmd.split()
 
50
 
 
51
        if cmd[0] == 'bzr':
 
52
            cmd[0] = self.BZRPATH
 
53
            if self.OVERRIDE_PYTHON:
 
54
                cmd.insert(0, self.OVERRIDE_PYTHON)
 
55
 
 
56
        self.log('$ %r' % cmd)
 
57
 
 
58
        return cmd
 
59
 
 
60
 
 
61
    def runcmd(self, cmd, retcode=0):
 
62
        """Run one command and check the return code.
 
63
 
 
64
        Returns a tuple of (stdout,stderr) strings.
 
65
 
 
66
        If a single string is based, it is split into words.
 
67
        For commands that are not simple space-separated words, please
 
68
        pass a list instead."""
 
69
        cmd = self.formcmd(cmd)
 
70
 
 
71
        self.log('$ ' + ' '.join(cmd))
 
72
        actual_retcode = call(cmd, stdout=self.TEST_LOG, stderr=self.TEST_LOG)
 
73
 
 
74
        if retcode != actual_retcode:
 
75
            raise CommandFailed("test failed: %r returned %d, expected %d"
 
76
                                % (cmd, actual_retcode, retcode))
 
77
 
 
78
 
 
79
    def backtick(self, cmd, retcode=0):
 
80
        """Run a command and return its output"""
 
81
        cmd = self.formcmd(cmd)
 
82
        child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
 
83
        outd, errd = child.communicate()
 
84
        self.log(outd)
 
85
        actual_retcode = child.wait()
 
86
 
 
87
        outd = outd.replace('\r', '')
 
88
 
 
89
        if retcode != actual_retcode:
 
90
            raise CommandFailed("test failed: %r returned %d, expected %d"
 
91
                                % (cmd, actual_retcode, retcode))
 
92
 
 
93
        return outd
 
94
 
 
95
 
 
96
 
 
97
    def build_tree(self, shape):
914
98
        """Build a test tree according to a pattern.
915
99
 
916
100
        shape is a sequence of file specifications.  If the final
917
101
        character is '/', a directory is created.
918
102
 
919
103
        This doesn't add anything to a branch.
920
 
        :param line_endings: Either 'binary' or 'native'
921
 
                             in binary mode, exact contents are written
922
 
                             in native mode, the line endings match the
923
 
                             default platform endings.
924
 
 
925
 
        :param transport: A transport to write to, for building trees on 
926
 
                          VFS's. If the transport is readonly or None,
927
 
                          "." is opened automatically.
928
104
        """
929
105
        # XXX: It's OK to just create them using forward slashes on windows?
930
 
        if transport is None or transport.is_readonly():
931
 
            transport = get_transport(".")
 
106
        import os
932
107
        for name in shape:
933
 
            self.assert_(isinstance(name, basestring))
 
108
            assert isinstance(name, basestring)
934
109
            if name[-1] == '/':
935
 
                transport.mkdir(urlutils.escape(name[:-1]))
936
 
            else:
937
 
                if line_endings == 'binary':
938
 
                    end = '\n'
939
 
                elif line_endings == 'native':
940
 
                    end = os.linesep
941
 
                else:
942
 
                    raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
943
 
                content = "contents of %s%s" % (name.encode('utf-8'), end)
944
 
                transport.put(urlutils.escape(name), StringIO(content))
945
 
 
946
 
    def build_tree_contents(self, shape):
947
 
        build_tree_contents(shape)
948
 
 
949
 
    def failUnlessExists(self, path):
950
 
        """Fail unless path, which may be abs or relative, exists."""
951
 
        self.failUnless(osutils.lexists(path))
952
 
 
953
 
    def failIfExists(self, path):
954
 
        """Fail if path, which may be abs or relative, exists."""
955
 
        self.failIf(osutils.lexists(path))
956
 
        
957
 
    def assertFileEqual(self, content, path):
958
 
        """Fail if path does not contain 'content'."""
959
 
        self.failUnless(osutils.lexists(path))
960
 
        # TODO: jam 20060427 Shouldn't this be 'rb'?
961
 
        self.assertEqualDiff(content, open(path, 'r').read())
962
 
 
963
 
 
964
 
class TestCaseWithTransport(TestCaseInTempDir):
965
 
    """A test case that provides get_url and get_readonly_url facilities.
966
 
 
967
 
    These back onto two transport servers, one for readonly access and one for
968
 
    read write access.
969
 
 
970
 
    If no explicit class is provided for readonly access, a
971
 
    ReadonlyTransportDecorator is used instead which allows the use of non disk
972
 
    based read write transports.
973
 
 
974
 
    If an explicit class is provided for readonly access, that server and the 
975
 
    readwrite one must both define get_url() as resolving to os.getcwd().
976
 
    """
977
 
 
978
 
    def __init__(self, methodName='testMethod'):
979
 
        super(TestCaseWithTransport, self).__init__(methodName)
980
 
        self.__readonly_server = None
981
 
        self.__server = None
982
 
        self.transport_server = default_transport
983
 
        self.transport_readonly_server = None
984
 
 
985
 
    def get_readonly_url(self, relpath=None):
986
 
        """Get a URL for the readonly transport.
987
 
 
988
 
        This will either be backed by '.' or a decorator to the transport 
989
 
        used by self.get_url()
990
 
        relpath provides for clients to get a path relative to the base url.
991
 
        These should only be downwards relative, not upwards.
992
 
        """
993
 
        base = self.get_readonly_server().get_url()
994
 
        if relpath is not None:
995
 
            if not base.endswith('/'):
996
 
                base = base + '/'
997
 
            base = base + relpath
998
 
        return base
999
 
 
1000
 
    def get_readonly_server(self):
1001
 
        """Get the server instance for the readonly transport
1002
 
 
1003
 
        This is useful for some tests with specific servers to do diagnostics.
1004
 
        """
1005
 
        if self.__readonly_server is None:
1006
 
            if self.transport_readonly_server is None:
1007
 
                # readonly decorator requested
1008
 
                # bring up the server
1009
 
                self.get_url()
1010
 
                self.__readonly_server = ReadonlyServer()
1011
 
                self.__readonly_server.setUp(self.__server)
1012
 
            else:
1013
 
                self.__readonly_server = self.transport_readonly_server()
1014
 
                self.__readonly_server.setUp()
1015
 
            self.addCleanup(self.__readonly_server.tearDown)
1016
 
        return self.__readonly_server
1017
 
 
1018
 
    def get_server(self):
1019
 
        """Get the read/write server instance.
1020
 
 
1021
 
        This is useful for some tests with specific servers that need
1022
 
        diagnostics.
1023
 
        """
1024
 
        if self.__server is None:
1025
 
            self.__server = self.transport_server()
1026
 
            self.__server.setUp()
1027
 
            self.addCleanup(self.__server.tearDown)
1028
 
        return self.__server
1029
 
 
1030
 
    def get_url(self, relpath=None):
1031
 
        """Get a URL for the readwrite transport.
1032
 
 
1033
 
        This will either be backed by '.' or to an equivalent non-file based
1034
 
        facility.
1035
 
        relpath provides for clients to get a path relative to the base url.
1036
 
        These should only be downwards relative, not upwards.
1037
 
        """
1038
 
        base = self.get_server().get_url()
1039
 
        if relpath is not None and relpath != '.':
1040
 
            if not base.endswith('/'):
1041
 
                base = base + '/'
1042
 
            base = base + urlutils.escape(relpath)
1043
 
        return base
1044
 
 
1045
 
    def get_transport(self):
1046
 
        """Return a writeable transport for the test scratch space"""
1047
 
        t = get_transport(self.get_url())
1048
 
        self.assertFalse(t.is_readonly())
1049
 
        return t
1050
 
 
1051
 
    def get_readonly_transport(self):
1052
 
        """Return a readonly transport for the test scratch space
1053
 
        
1054
 
        This can be used to test that operations which should only need
1055
 
        readonly access in fact do not try to write.
1056
 
        """
1057
 
        t = get_transport(self.get_readonly_url())
1058
 
        self.assertTrue(t.is_readonly())
1059
 
        return t
1060
 
 
1061
 
    def make_branch(self, relpath, format=None):
1062
 
        """Create a branch on the transport at relpath."""
1063
 
        repo = self.make_repository(relpath, format=format)
1064
 
        return repo.bzrdir.create_branch()
1065
 
 
1066
 
    def make_bzrdir(self, relpath, format=None):
1067
 
        try:
1068
 
            url = self.get_url(relpath)
1069
 
            mutter('relpath %r => url %r', relpath, url)
1070
 
            segments = url.split('/')
1071
 
            if segments and segments[-1] not in ('', '.'):
1072
 
                parent = '/'.join(segments[:-1])
1073
 
                t = get_transport(parent)
1074
 
                try:
1075
 
                    t.mkdir(segments[-1])
1076
 
                except errors.FileExists:
1077
 
                    pass
1078
 
            if format is None:
1079
 
                format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
1080
 
            # FIXME: make this use a single transport someday. RBC 20060418
1081
 
            return format.initialize_on_transport(get_transport(relpath))
1082
 
        except errors.UninitializableFormat:
1083
 
            raise TestSkipped("Format %s is not initializable." % format)
1084
 
 
1085
 
    def make_repository(self, relpath, shared=False, format=None):
1086
 
        """Create a repository on our default transport at relpath."""
1087
 
        made_control = self.make_bzrdir(relpath, format=format)
1088
 
        return made_control.create_repository(shared=shared)
1089
 
 
1090
 
    def make_branch_and_tree(self, relpath, format=None):
1091
 
        """Create a branch on the transport and a tree locally.
1092
 
 
1093
 
        Returns the tree.
1094
 
        """
1095
 
        # TODO: always use the local disk path for the working tree,
1096
 
        # this obviously requires a format that supports branch references
1097
 
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
1098
 
        # RBC 20060208
1099
 
        b = self.make_branch(relpath, format=format)
1100
 
        try:
1101
 
            return b.bzrdir.create_workingtree()
1102
 
        except errors.NotLocalUrl:
1103
 
            # new formats - catch No tree error and create
1104
 
            # a branch reference and a checkout.
1105
 
            # old formats at that point - raise TestSkipped.
1106
 
            # TODO: rbc 20060208
1107
 
            return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
1108
 
 
1109
 
    def assertIsDirectory(self, relpath, transport):
1110
 
        """Assert that relpath within transport is a directory.
1111
 
 
1112
 
        This may not be possible on all transports; in that case it propagates
1113
 
        a TransportNotPossible.
1114
 
        """
1115
 
        try:
1116
 
            mode = transport.stat(relpath).st_mode
1117
 
        except errors.NoSuchFile:
1118
 
            self.fail("path %s is not a directory; no such file"
1119
 
                      % (relpath))
1120
 
        if not stat.S_ISDIR(mode):
1121
 
            self.fail("path %s is not a directory; has mode %#o"
1122
 
                      % (relpath, mode))
1123
 
 
1124
 
 
1125
 
class ChrootedTestCase(TestCaseWithTransport):
1126
 
    """A support class that provides readonly urls outside the local namespace.
1127
 
 
1128
 
    This is done by checking if self.transport_server is a MemoryServer. if it
1129
 
    is then we are chrooted already, if it is not then an HttpServer is used
1130
 
    for readonly urls.
1131
 
 
1132
 
    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1133
 
                       be used without needed to redo it when a different 
1134
 
                       subclass is in use ?
1135
 
    """
1136
 
 
 
110
                os.mkdir(name[:-1])
 
111
            else:
 
112
                f = file(name, 'wt')
 
113
                print >>f, "contents of", name
 
114
                f.close()
 
115
 
 
116
 
 
117
    def log(self, msg):
 
118
        """Log a message to a progress file"""
 
119
        print >>self.TEST_LOG, msg
 
120
               
 
121
 
 
122
class InTempDir(TestBase):
 
123
    """Base class for tests run in a temporary branch."""
1137
124
    def setUp(self):
1138
 
        super(ChrootedTestCase, self).setUp()
1139
 
        if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1140
 
            self.transport_readonly_server = bzrlib.transport.http.HttpServer
1141
 
 
1142
 
 
1143
 
def filter_suite_by_re(suite, pattern):
1144
 
    result = TestUtil.TestSuite()
1145
 
    filter_re = re.compile(pattern)
1146
 
    for test in iter_suite_tests(suite):
1147
 
        if filter_re.search(test.id()):
1148
 
            result.addTest(test)
1149
 
    return result
1150
 
 
1151
 
 
1152
 
def run_suite(suite, name='test', verbose=False, pattern=".*",
1153
 
              stop_on_failure=False, keep_output=False,
1154
 
              transport=None, lsprof_timed=None):
1155
 
    TestCaseInTempDir._TEST_NAME = name
1156
 
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1157
 
    if verbose:
1158
 
        verbosity = 2
1159
 
        pb = None
1160
 
    else:
1161
 
        verbosity = 1
1162
 
        pb = progress.ProgressBar()
1163
 
    runner = TextTestRunner(stream=sys.stdout,
1164
 
                            descriptions=0,
1165
 
                            verbosity=verbosity,
1166
 
                            keep_output=keep_output,
1167
 
                            pb=pb)
1168
 
    runner.stop_on_failure=stop_on_failure
1169
 
    if pattern != '.*':
1170
 
        suite = filter_suite_by_re(suite, pattern)
1171
 
    result = runner.run(suite)
1172
 
    return result.wasSuccessful()
1173
 
 
1174
 
 
1175
 
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1176
 
             keep_output=False,
1177
 
             transport=None,
1178
 
             test_suite_factory=None,
1179
 
             lsprof_timed=None):
1180
 
    """Run the whole test suite under the enhanced runner"""
1181
 
    global default_transport
1182
 
    if transport is None:
1183
 
        transport = default_transport
1184
 
    old_transport = default_transport
1185
 
    default_transport = transport
 
125
        import os
 
126
        self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
 
127
        os.mkdir(self.test_dir)
 
128
        os.chdir(self.test_dir)
 
129
        
 
130
    def tearDown(self):
 
131
        import os
 
132
        os.chdir(self.TEST_ROOT)
 
133
 
 
134
 
 
135
 
 
136
 
 
137
 
 
138
class _MyResult(TestResult):
 
139
    """
 
140
    Custom TestResult.
 
141
 
 
142
    No special behaviour for now.
 
143
    """
 
144
    def __init__(self, out):
 
145
        self.out = out
 
146
        TestResult.__init__(self)
 
147
 
 
148
    def startTest(self, test):
 
149
        # TODO: Maybe show test.shortDescription somewhere?
 
150
        print >>self.out, '%-60.60s' % test.id(),
 
151
        TestResult.startTest(self, test)
 
152
 
 
153
    def stopTest(self, test):
 
154
        # print
 
155
        TestResult.stopTest(self, test)
 
156
 
 
157
 
 
158
    def addError(self, test, err):
 
159
        print >>self.out, 'ERROR'
 
160
        TestResult.addError(self, test, err)
 
161
 
 
162
    def addFailure(self, test, err):
 
163
        print >>self.out, 'FAILURE'
 
164
        TestResult.addFailure(self, test, err)
 
165
 
 
166
    def addSuccess(self, test):
 
167
        print >>self.out, 'OK'
 
168
        TestResult.addSuccess(self, test)
 
169
 
 
170
 
 
171
 
 
172
def selftest():
 
173
    from unittest import TestLoader, TestSuite
 
174
    import bzrlib
 
175
    import bzrlib.selftest.whitebox
 
176
    import bzrlib.selftest.blackbox
 
177
    import bzrlib.selftest.versioning
 
178
    from doctest import DocTestSuite
 
179
    import os
 
180
    import shutil
 
181
    import time
 
182
    import sys
 
183
 
 
184
    _setup_test_log()
 
185
    _setup_test_dir()
 
186
    print
 
187
 
 
188
    suite = TestSuite()
 
189
    tl = TestLoader()
 
190
 
 
191
    for m in bzrlib.selftest.whitebox, \
 
192
            bzrlib.selftest.versioning:
 
193
        suite.addTest(tl.loadTestsFromModule(m))
 
194
 
 
195
    suite.addTest(bzrlib.selftest.blackbox.suite())
 
196
 
 
197
    for m in bzrlib.store, bzrlib.inventory, bzrlib.branch, bzrlib.osutils, \
 
198
            bzrlib.commands:
 
199
        suite.addTest(DocTestSuite(m))
 
200
 
 
201
    # save stdout & stderr so there's no leakage from code-under-test
 
202
    real_stdout = sys.stdout
 
203
    real_stderr = sys.stderr
 
204
    sys.stdout = sys.stderr = TestBase.TEST_LOG
1186
205
    try:
1187
 
        if test_suite_factory is None:
1188
 
            suite = test_suite()
1189
 
        else:
1190
 
            suite = test_suite_factory()
1191
 
        return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1192
 
                     stop_on_failure=stop_on_failure, keep_output=keep_output,
1193
 
                     transport=transport,
1194
 
                     lsprof_timed=lsprof_timed)
 
206
        result = _MyResult(real_stdout)
 
207
        suite.run(result)
1195
208
    finally:
1196
 
        default_transport = old_transport
1197
 
 
1198
 
 
1199
 
def test_suite():
1200
 
    """Build and return TestSuite for the whole of bzrlib.
1201
 
    
1202
 
    This function can be replaced if you need to change the default test
1203
 
    suite on a global basis, but it is not encouraged.
1204
 
    """
1205
 
    testmod_names = [
1206
 
                   'bzrlib.tests.test_ancestry',
1207
 
                   'bzrlib.tests.test_api',
1208
 
                   'bzrlib.tests.test_bad_files',
1209
 
                   'bzrlib.tests.test_branch',
1210
 
                   'bzrlib.tests.test_bundle',
1211
 
                   'bzrlib.tests.test_bzrdir',
1212
 
                   'bzrlib.tests.test_command',
1213
 
                   'bzrlib.tests.test_commit',
1214
 
                   'bzrlib.tests.test_commit_merge',
1215
 
                   'bzrlib.tests.test_config',
1216
 
                   'bzrlib.tests.test_conflicts',
1217
 
                   'bzrlib.tests.test_decorators',
1218
 
                   'bzrlib.tests.test_diff',
1219
 
                   'bzrlib.tests.test_doc_generate',
1220
 
                   'bzrlib.tests.test_emptytree',
1221
 
                   'bzrlib.tests.test_errors',
1222
 
                   'bzrlib.tests.test_escaped_store',
1223
 
                   'bzrlib.tests.test_fetch',
1224
 
                   'bzrlib.tests.test_gpg',
1225
 
                   'bzrlib.tests.test_graph',
1226
 
                   'bzrlib.tests.test_hashcache',
1227
 
                   'bzrlib.tests.test_http',
1228
 
                   'bzrlib.tests.test_identitymap',
1229
 
                   'bzrlib.tests.test_inv',
1230
 
                   'bzrlib.tests.test_knit',
1231
 
                   'bzrlib.tests.test_lockdir',
1232
 
                   'bzrlib.tests.test_lockable_files',
1233
 
                   'bzrlib.tests.test_log',
1234
 
                   'bzrlib.tests.test_merge',
1235
 
                   'bzrlib.tests.test_merge3',
1236
 
                   'bzrlib.tests.test_merge_core',
1237
 
                   'bzrlib.tests.test_missing',
1238
 
                   'bzrlib.tests.test_msgeditor',
1239
 
                   'bzrlib.tests.test_nonascii',
1240
 
                   'bzrlib.tests.test_options',
1241
 
                   'bzrlib.tests.test_osutils',
1242
 
                   'bzrlib.tests.test_patch',
1243
 
                   'bzrlib.tests.test_patches',
1244
 
                   'bzrlib.tests.test_permissions',
1245
 
                   'bzrlib.tests.test_plugins',
1246
 
                   'bzrlib.tests.test_progress',
1247
 
                   'bzrlib.tests.test_reconcile',
1248
 
                   'bzrlib.tests.test_repository',
1249
 
                   'bzrlib.tests.test_revision',
1250
 
                   'bzrlib.tests.test_revisionnamespaces',
1251
 
                   'bzrlib.tests.test_revprops',
1252
 
                   'bzrlib.tests.test_revisiontree',
1253
 
                   'bzrlib.tests.test_rio',
1254
 
                   'bzrlib.tests.test_sampler',
1255
 
                   'bzrlib.tests.test_selftest',
1256
 
                   'bzrlib.tests.test_setup',
1257
 
                   'bzrlib.tests.test_sftp_transport',
1258
 
                   'bzrlib.tests.test_smart_add',
1259
 
                   'bzrlib.tests.test_source',
1260
 
                   'bzrlib.tests.test_status',
1261
 
                   'bzrlib.tests.test_store',
1262
 
                   'bzrlib.tests.test_symbol_versioning',
1263
 
                   'bzrlib.tests.test_testament',
1264
 
                   'bzrlib.tests.test_textfile',
1265
 
                   'bzrlib.tests.test_textmerge',
1266
 
                   'bzrlib.tests.test_trace',
1267
 
                   'bzrlib.tests.test_transactions',
1268
 
                   'bzrlib.tests.test_transform',
1269
 
                   'bzrlib.tests.test_transport',
1270
 
                   'bzrlib.tests.test_tsort',
1271
 
                   'bzrlib.tests.test_tuned_gzip',
1272
 
                   'bzrlib.tests.test_ui',
1273
 
                   'bzrlib.tests.test_upgrade',
1274
 
                   'bzrlib.tests.test_urlutils',
1275
 
                   'bzrlib.tests.test_versionedfile',
1276
 
                   'bzrlib.tests.test_weave',
1277
 
                   'bzrlib.tests.test_whitebox',
1278
 
                   'bzrlib.tests.test_workingtree',
1279
 
                   'bzrlib.tests.test_xml',
1280
 
                   ]
1281
 
    test_transport_implementations = [
1282
 
        'bzrlib.tests.test_transport_implementations',
1283
 
        'bzrlib.tests.test_read_bundle',
1284
 
        ]
1285
 
    suite = TestUtil.TestSuite()
1286
 
    loader = TestUtil.TestLoader()
1287
 
    from bzrlib.transport import TransportTestProviderAdapter
1288
 
    adapter = TransportTestProviderAdapter()
1289
 
    adapt_modules(test_transport_implementations, adapter, loader, suite)
1290
 
    suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1291
 
    for package in packages_to_test():
1292
 
        suite.addTest(package.test_suite())
1293
 
    for m in MODULES_TO_TEST:
1294
 
        suite.addTest(loader.loadTestsFromModule(m))
1295
 
    for m in MODULES_TO_DOCTEST:
1296
 
        suite.addTest(doctest.DocTestSuite(m))
1297
 
    for name, plugin in bzrlib.plugin.all_plugins().items():
1298
 
        if getattr(plugin, 'test_suite', None) is not None:
1299
 
            suite.addTest(plugin.test_suite())
1300
 
    return suite
1301
 
 
1302
 
 
1303
 
def adapt_modules(mods_list, adapter, loader, suite):
1304
 
    """Adapt the modules in mods_list using adapter and add to suite."""
1305
 
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
1306
 
        suite.addTests(adapter.adapt(test))
 
209
        sys.stdout = real_stdout
 
210
        sys.stderr = real_stderr
 
211
 
 
212
    _show_results(result)
 
213
 
 
214
    return result.wasSuccessful()
 
215
 
 
216
 
 
217
 
 
218
 
 
219
def _setup_test_log():
 
220
    import time
 
221
    import os
 
222
    
 
223
    log_filename = os.path.abspath('testbzr.log')
 
224
    TestBase.TEST_LOG = open(log_filename, 'wt', buffering=1) # line buffered
 
225
 
 
226
    print >>TestBase.TEST_LOG, "bzr tests run at " + time.ctime()
 
227
    print '%-30s %s' % ('test log', log_filename)
 
228
 
 
229
 
 
230
def _setup_test_dir():
 
231
    import os
 
232
    import shutil
 
233
    
 
234
    TestBase.ORIG_DIR = os.getcwdu()
 
235
    TestBase.TEST_ROOT = os.path.abspath("testbzr.tmp")
 
236
 
 
237
    print '%-30s %s' % ('running tests in', TestBase.TEST_ROOT)
 
238
 
 
239
    if os.path.exists(TestBase.TEST_ROOT):
 
240
        shutil.rmtree(TestBase.TEST_ROOT)
 
241
    os.mkdir(TestBase.TEST_ROOT)
 
242
    os.chdir(TestBase.TEST_ROOT)
 
243
 
 
244
    # make a fake bzr directory there to prevent any tests propagating
 
245
    # up onto the source directory's real branch
 
246
    os.mkdir(os.path.join(TestBase.TEST_ROOT, '.bzr'))
 
247
 
 
248
    
 
249
 
 
250
def _show_results(result):
 
251
     for case, tb in result.errors:
 
252
         _show_test_failure('ERROR', case, tb)
 
253
 
 
254
     for case, tb in result.failures:
 
255
         _show_test_failure('FAILURE', case, tb)
 
256
         
 
257
     print
 
258
     print '%4d tests run' % result.testsRun
 
259
     print '%4d errors' % len(result.errors)
 
260
     print '%4d failures' % len(result.failures)
 
261
 
 
262
 
 
263
 
 
264
def _show_test_failure(kind, case, tb):
 
265
     print (kind + '! ').ljust(60, '-')
 
266
     print case
 
267
     desc = case.shortDescription()
 
268
     if desc:
 
269
         print '   (%s)' % desc
 
270
     print tb
 
271
     print ''.ljust(60, '-')
 
272