1
# Copyright (C) 2005, 2006 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under to test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
30
from cStringIO import StringIO
44
import bzrlib.bzrdir as bzrdir
45
import bzrlib.commands
46
import bzrlib.errors as errors
47
import bzrlib.inventory
48
import bzrlib.iterablefile
53
# lsprof not available
55
from bzrlib.merge import merge_inner
58
import bzrlib.osutils as osutils
60
import bzrlib.progress as progress
61
from bzrlib.revision import common_ancestor
64
from bzrlib.transport import urlescape, get_transport
65
import bzrlib.transport
66
from bzrlib.transport.local import LocalRelpathServer
67
from bzrlib.transport.readonly import ReadonlyServer
68
from bzrlib.trace import mutter
69
from bzrlib.tests.TestUtil import TestLoader, TestSuite
70
from bzrlib.tests.treeshape import build_tree_contents
71
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
73
default_transport = LocalRelpathServer
76
MODULES_TO_DOCTEST = [
88
def packages_to_test():
89
"""Return a list of packages to test.
91
The packages are not globally imported so that import failures are
92
triggered when running selftest, not when importing the command.
95
import bzrlib.tests.blackbox
96
import bzrlib.tests.branch_implementations
97
import bzrlib.tests.bzrdir_implementations
98
import bzrlib.tests.interrepository_implementations
99
import bzrlib.tests.interversionedfile_implementations
100
import bzrlib.tests.repository_implementations
101
import bzrlib.tests.revisionstore_implementations
102
import bzrlib.tests.workingtree_implementations
105
bzrlib.tests.blackbox,
106
bzrlib.tests.branch_implementations,
107
bzrlib.tests.bzrdir_implementations,
108
bzrlib.tests.interrepository_implementations,
109
bzrlib.tests.interversionedfile_implementations,
110
bzrlib.tests.repository_implementations,
111
bzrlib.tests.revisionstore_implementations,
112
bzrlib.tests.workingtree_implementations,
116
class _MyResult(unittest._TextTestResult):
117
"""Custom TestResult.
119
Shows output in a different format, including displaying runtime for tests.
123
def __init__(self, stream, descriptions, verbosity, pb=None):
124
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
127
def extractBenchmarkTime(self, testCase):
128
"""Add a benchmark time for the current test case."""
129
self._benchmarkTime = getattr(testCase, "_benchtime", None)
131
def _elapsedTestTimeString(self):
132
"""Return a time string for the overall time the current test has taken."""
133
return self._formatTime(time.time() - self._start_time)
135
def _testTimeString(self):
136
if self._benchmarkTime is not None:
138
self._formatTime(self._benchmarkTime),
139
self._elapsedTestTimeString())
141
return " %s" % self._elapsedTestTimeString()
143
def _formatTime(self, seconds):
144
"""Format seconds as milliseconds with leading spaces."""
145
return "%5dms" % (1000 * seconds)
147
def _ellipsise_unimportant_words(self, a_string, final_width,
149
"""Add ellipses (sp?) for overly long strings.
151
:param keep_start: If true preserve the start of a_string rather
155
if len(a_string) > final_width:
156
result = a_string[:final_width-3] + '...'
160
if len(a_string) > final_width:
161
result = '...' + a_string[3-final_width:]
164
return result.ljust(final_width)
166
def startTest(self, test):
167
unittest.TestResult.startTest(self, test)
168
# In a short description, the important words are in
169
# the beginning, but in an id, the important words are
171
SHOW_DESCRIPTIONS = False
173
if not self.showAll and self.dots and self.pb is not None:
176
final_width = osutils.terminal_width()
177
final_width = final_width - 15 - 8
179
if SHOW_DESCRIPTIONS:
180
what = test.shortDescription()
182
what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
185
if what.startswith('bzrlib.tests.'):
187
what = self._ellipsise_unimportant_words(what, final_width)
189
self.stream.write(what)
190
elif self.dots and self.pb is not None:
191
self.pb.update(what, self.testsRun - 1, None)
193
self._recordTestStartTime()
195
def _recordTestStartTime(self):
196
"""Record that a test has started."""
197
self._start_time = time.time()
199
def addError(self, test, err):
200
if isinstance(err[1], TestSkipped):
201
return self.addSkipped(test, err)
202
unittest.TestResult.addError(self, test, err)
203
self.extractBenchmarkTime(test)
205
self.stream.writeln("ERROR %s" % self._testTimeString())
206
elif self.dots and self.pb is None:
207
self.stream.write('E')
209
self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
214
def addFailure(self, test, err):
215
unittest.TestResult.addFailure(self, test, err)
216
self.extractBenchmarkTime(test)
218
self.stream.writeln(" FAIL %s" % self._testTimeString())
219
elif self.dots and self.pb is None:
220
self.stream.write('F')
222
self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
227
def addSuccess(self, test):
228
self.extractBenchmarkTime(test)
230
self.stream.writeln(' OK %s' % self._testTimeString())
231
for bench_called, stats in getattr(test, '_benchcalls', []):
232
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
233
stats.pprint(file=self.stream)
234
elif self.dots and self.pb is None:
235
self.stream.write('~')
237
self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
239
unittest.TestResult.addSuccess(self, test)
241
def addSkipped(self, test, skip_excinfo):
242
self.extractBenchmarkTime(test)
244
print >>self.stream, ' SKIP %s' % self._testTimeString()
245
print >>self.stream, ' %s' % skip_excinfo[1]
246
elif self.dots and self.pb is None:
247
self.stream.write('S')
249
self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
251
# seems best to treat this as success from point-of-view of unittest
252
# -- it actually does nothing so it barely matters :)
253
unittest.TestResult.addSuccess(self, test)
255
def printErrorList(self, flavour, errors):
256
for test, err in errors:
257
self.stream.writeln(self.separator1)
258
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
259
if getattr(test, '_get_log', None) is not None:
261
print >>self.stream, \
262
('vvvv[log from %s]' % test.id()).ljust(78,'-')
263
print >>self.stream, test._get_log()
264
print >>self.stream, \
265
('^^^^[log from %s]' % test.id()).ljust(78,'-')
266
self.stream.writeln(self.separator2)
267
self.stream.writeln("%s" % err)
270
class TextTestRunner(object):
271
stop_on_failure = False
279
self.stream = unittest._WritelnDecorator(stream)
280
self.descriptions = descriptions
281
self.verbosity = verbosity
282
self.keep_output = keep_output
285
def _makeResult(self):
286
result = _MyResult(self.stream,
290
result.stop_early = self.stop_on_failure
294
"Run the given test case or test suite."
295
result = self._makeResult()
296
startTime = time.time()
297
if self.pb is not None:
298
self.pb.update('Running tests', 0, test.countTestCases())
300
stopTime = time.time()
301
timeTaken = stopTime - startTime
303
self.stream.writeln(result.separator2)
304
run = result.testsRun
305
self.stream.writeln("Ran %d test%s in %.3fs" %
306
(run, run != 1 and "s" or "", timeTaken))
307
self.stream.writeln()
308
if not result.wasSuccessful():
309
self.stream.write("FAILED (")
310
failed, errored = map(len, (result.failures, result.errors))
312
self.stream.write("failures=%d" % failed)
314
if failed: self.stream.write(", ")
315
self.stream.write("errors=%d" % errored)
316
self.stream.writeln(")")
318
self.stream.writeln("OK")
319
if self.pb is not None:
320
self.pb.update('Cleaning up', 0, 1)
321
# This is still a little bogus,
322
# but only a little. Folk not using our testrunner will
323
# have to delete their temp directories themselves.
324
test_root = TestCaseInTempDir.TEST_ROOT
325
if result.wasSuccessful() or not self.keep_output:
326
if test_root is not None:
327
osutils.rmtree(test_root)
329
if self.pb is not None:
330
self.pb.note("Failed tests working directories are in '%s'\n",
334
"Failed tests working directories are in '%s'\n" %
336
TestCaseInTempDir.TEST_ROOT = None
337
if self.pb is not None:
342
def iter_suite_tests(suite):
343
"""Return all tests in a suite, recursing through nested suites"""
344
for item in suite._tests:
345
if isinstance(item, unittest.TestCase):
347
elif isinstance(item, unittest.TestSuite):
348
for r in iter_suite_tests(item):
351
raise Exception('unknown object %r inside test suite %r'
355
class TestSkipped(Exception):
356
"""Indicates that a test was intentionally skipped, rather than failing."""
360
class CommandFailed(Exception):
363
class TestCase(unittest.TestCase):
364
"""Base class for bzr unit tests.
366
Tests that need access to disk resources should subclass
367
TestCaseInTempDir not TestCase.
369
Error and debug log messages are redirected from their usual
370
location into a temporary file, the contents of which can be
371
retrieved by _get_log(). We use a real OS file, not an in-memory object,
372
so that it can also capture file IO. When the test completes this file
373
is read into memory and removed from disk.
375
There are also convenience functions to invoke bzr's command-line
376
routine, and to build and check bzr trees.
378
In addition to the usual method of overriding tearDown(), this class also
379
allows subclasses to register functions into the _cleanups list, which is
380
run in order as the object is torn down. It's less likely this will be
381
accidentally overlooked.
384
_log_file_name = None
386
# record lsprof data when performing benchmark calls.
387
_gather_lsprof_in_benchmarks = False
389
def __init__(self, methodName='testMethod'):
390
super(TestCase, self).__init__(methodName)
394
unittest.TestCase.setUp(self)
395
self._cleanEnvironment()
396
bzrlib.trace.disable_default_logging()
398
self._benchcalls = []
399
self._benchtime = None
401
def _ndiff_strings(self, a, b):
402
"""Return ndiff between two strings containing lines.
404
A trailing newline is added if missing to make the strings
406
if b and b[-1] != '\n':
408
if a and a[-1] != '\n':
410
difflines = difflib.ndiff(a.splitlines(True),
412
linejunk=lambda x: False,
413
charjunk=lambda x: False)
414
return ''.join(difflines)
416
def assertEqualDiff(self, a, b, message=None):
417
"""Assert two texts are equal, if not raise an exception.
419
This is intended for use with multi-line strings where it can
420
be hard to find the differences by eye.
422
# TODO: perhaps override assertEquals to call this for strings?
426
message = "texts not equal:\n"
427
raise AssertionError(message +
428
self._ndiff_strings(a, b))
430
def assertEqualMode(self, mode, mode_test):
431
self.assertEqual(mode, mode_test,
432
'mode mismatch %o != %o' % (mode, mode_test))
434
def assertStartsWith(self, s, prefix):
435
if not s.startswith(prefix):
436
raise AssertionError('string %r does not start with %r' % (s, prefix))
438
def assertEndsWith(self, s, suffix):
439
"""Asserts that s ends with suffix."""
440
if not s.endswith(suffix):
441
raise AssertionError('string %r does not end with %r' % (s, suffix))
443
def assertContainsRe(self, haystack, needle_re):
444
"""Assert that a contains something matching a regular expression."""
445
if not re.search(needle_re, haystack):
446
raise AssertionError('pattern "%s" not found in "%s"'
447
% (needle_re, haystack))
449
def assertSubset(self, sublist, superlist):
450
"""Assert that every entry in sublist is present in superlist."""
452
for entry in sublist:
453
if entry not in superlist:
454
missing.append(entry)
456
raise AssertionError("value(s) %r not present in container %r" %
457
(missing, superlist))
459
def assertIs(self, left, right):
460
if not (left is right):
461
raise AssertionError("%r is not %r." % (left, right))
463
def assertTransportMode(self, transport, path, mode):
464
"""Fail if a path does not have mode mode.
466
If modes are not supported on this transport, the assertion is ignored.
468
if not transport._can_roundtrip_unix_modebits():
470
path_stat = transport.stat(path)
471
actual_mode = stat.S_IMODE(path_stat.st_mode)
472
self.assertEqual(mode, actual_mode,
473
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
475
def assertIsInstance(self, obj, kls):
476
"""Fail if obj is not an instance of kls"""
477
if not isinstance(obj, kls):
478
self.fail("%r is an instance of %s rather than %s" % (
479
obj, obj.__class__, kls))
481
def _startLogFile(self):
482
"""Send bzr and test log messages to a temporary file.
484
The file is removed as the test is torn down.
486
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
487
encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
488
self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
489
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
490
self._log_file_name = name
491
self.addCleanup(self._finishLogFile)
493
def _finishLogFile(self):
494
"""Finished with the log file.
496
Read contents into memory, close, and delete.
498
bzrlib.trace.disable_test_log(self._log_nonce)
499
self._log_file.seek(0)
500
self._log_contents = self._log_file.read()
501
self._log_file.close()
502
os.remove(self._log_file_name)
503
self._log_file = self._log_file_name = None
505
def addCleanup(self, callable):
506
"""Arrange to run a callable when this case is torn down.
508
Callables are run in the reverse of the order they are registered,
509
ie last-in first-out.
511
if callable in self._cleanups:
512
raise ValueError("cleanup function %r already registered on %s"
514
self._cleanups.append(callable)
516
def _cleanEnvironment(self):
519
'APPDATA': os.getcwd(),
524
self.addCleanup(self._restoreEnvironment)
525
for name, value in new_env.iteritems():
526
self._captureVar(name, value)
529
def _captureVar(self, name, newvalue):
530
"""Set an environment variable, preparing it to be reset when finished."""
531
self.__old_env[name] = os.environ.get(name, None)
533
if name in os.environ:
536
os.environ[name] = newvalue
539
def _restoreVar(name, value):
541
if name in os.environ:
544
os.environ[name] = value
546
def _restoreEnvironment(self):
547
for name, value in self.__old_env.iteritems():
548
self._restoreVar(name, value)
552
unittest.TestCase.tearDown(self)
554
def time(self, callable, *args, **kwargs):
555
"""Run callable and accrue the time it takes to the benchmark time.
557
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
558
this will cause lsprofile statistics to be gathered and stored in
561
if self._benchtime is None:
565
if not self._gather_lsprof_in_benchmarks:
566
return callable(*args, **kwargs)
568
# record this benchmark
569
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
571
self._benchcalls.append(((callable, args, kwargs), stats))
574
self._benchtime += time.time() - start
576
def _runCleanups(self):
577
"""Run registered cleanup functions.
579
This should only be called from TestCase.tearDown.
581
# TODO: Perhaps this should keep running cleanups even if
583
for cleanup_fn in reversed(self._cleanups):
586
def log(self, *args):
590
"""Return as a string the log for this test"""
591
if self._log_file_name:
592
return open(self._log_file_name).read()
594
return self._log_contents
595
# TODO: Delete the log after it's been read in
597
def capture(self, cmd, retcode=0):
598
"""Shortcut that splits cmd into words, runs, and returns stdout"""
599
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
601
def run_bzr_captured(self, argv, retcode=0, stdin=None):
602
"""Invoke bzr and return (stdout, stderr).
604
Useful for code that wants to check the contents of the
605
output, the way error messages are presented, etc.
607
This should be the main method for tests that want to exercise the
608
overall behavior of the bzr application (rather than a unit test
609
or a functional test of the library.)
611
Much of the old code runs bzr by forking a new copy of Python, but
612
that is slower, harder to debug, and generally not necessary.
614
This runs bzr through the interface that catches and reports
615
errors, and with logging set to something approximating the
616
default, so that error reporting can be checked.
618
argv -- arguments to invoke bzr
619
retcode -- expected return code, or None for don't-care.
620
:param stdin: A string to be used as stdin for the command.
622
if stdin is not None:
623
stdin = StringIO(stdin)
626
self.log('run bzr: %s', ' '.join(argv))
627
# FIXME: don't call into logging here
628
handler = logging.StreamHandler(stderr)
629
handler.setFormatter(bzrlib.trace.QuietFormatter())
630
handler.setLevel(logging.INFO)
631
logger = logging.getLogger('')
632
logger.addHandler(handler)
633
old_ui_factory = bzrlib.ui.ui_factory
634
bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
637
bzrlib.ui.ui_factory.stdin = stdin
639
result = self.apply_redirected(stdin, stdout, stderr,
640
bzrlib.commands.run_bzr_catch_errors,
643
logger.removeHandler(handler)
644
bzrlib.ui.ui_factory = old_ui_factory
645
out = stdout.getvalue()
646
err = stderr.getvalue()
648
self.log('output:\n%s', out)
650
self.log('errors:\n%s', err)
651
if retcode is not None:
652
self.assertEquals(result, retcode)
655
def run_bzr(self, *args, **kwargs):
656
"""Invoke bzr, as if it were run from the command line.
658
This should be the main method for tests that want to exercise the
659
overall behavior of the bzr application (rather than a unit test
660
or a functional test of the library.)
662
This sends the stdout/stderr results into the test's log,
663
where it may be useful for debugging. See also run_captured.
665
:param stdin: A string to be used as stdin for the command.
667
retcode = kwargs.pop('retcode', 0)
668
stdin = kwargs.pop('stdin', None)
669
return self.run_bzr_captured(args, retcode, stdin)
671
def check_inventory_shape(self, inv, shape):
672
"""Compare an inventory to a list of expected names.
674
Fail if they are not precisely equal.
677
shape = list(shape) # copy
678
for path, ie in inv.entries():
679
name = path.replace('\\', '/')
687
self.fail("expected paths not found in inventory: %r" % shape)
689
self.fail("unexpected paths found in inventory: %r" % extras)
691
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
692
a_callable=None, *args, **kwargs):
693
"""Call callable with redirected std io pipes.
695
Returns the return code."""
696
if not callable(a_callable):
697
raise ValueError("a_callable must be callable.")
701
if getattr(self, "_log_file", None) is not None:
702
stdout = self._log_file
706
if getattr(self, "_log_file", None is not None):
707
stderr = self._log_file
710
real_stdin = sys.stdin
711
real_stdout = sys.stdout
712
real_stderr = sys.stderr
717
return a_callable(*args, **kwargs)
719
sys.stdout = real_stdout
720
sys.stderr = real_stderr
721
sys.stdin = real_stdin
723
def merge(self, branch_from, wt_to):
724
"""A helper for tests to do a ui-less merge.
726
This should move to the main library when someone has time to integrate
729
# minimal ui-less merge.
730
wt_to.branch.fetch(branch_from)
731
base_rev = common_ancestor(branch_from.last_revision(),
732
wt_to.branch.last_revision(),
733
wt_to.branch.repository)
734
merge_inner(wt_to.branch, branch_from.basis_tree(),
735
wt_to.branch.repository.revision_tree(base_rev),
737
wt_to.add_pending_merge(branch_from.last_revision())
740
BzrTestBase = TestCase
743
class TestCaseInTempDir(TestCase):
744
"""Derived class that runs a test within a temporary directory.
746
This is useful for tests that need to create a branch, etc.
748
The directory is created in a slightly complex way: for each
749
Python invocation, a new temporary top-level directory is created.
750
All test cases create their own directory within that. If the
751
tests complete successfully, the directory is removed.
753
InTempDir is an old alias for FunctionalTestCase.
758
OVERRIDE_PYTHON = 'python'
760
def check_file_contents(self, filename, expect):
761
self.log("check contents of file %s" % filename)
762
contents = file(filename, 'r').read()
763
if contents != expect:
764
self.log("expected: %r" % expect)
765
self.log("actually: %r" % contents)
766
self.fail("contents of %s not as expected" % filename)
768
def _make_test_root(self):
769
if TestCaseInTempDir.TEST_ROOT is not None:
773
root = u'test%04d.tmp' % i
777
if e.errno == errno.EEXIST:
782
# successfully created
783
TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
785
# make a fake bzr directory there to prevent any tests propagating
786
# up onto the source directory's real branch
787
bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
790
super(TestCaseInTempDir, self).setUp()
791
self._make_test_root()
792
_currentdir = os.getcwdu()
793
# shorten the name, to avoid test failures due to path length
794
short_id = self.id().replace('bzrlib.tests.', '') \
795
.replace('__main__.', '')[-100:]
796
# it's possible the same test class is run several times for
797
# parameterized tests, so make sure the names don't collide.
801
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
803
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
804
if os.path.exists(candidate_dir):
808
self.test_dir = candidate_dir
809
os.mkdir(self.test_dir)
810
os.chdir(self.test_dir)
812
os.environ['HOME'] = self.test_dir
813
os.environ['APPDATA'] = self.test_dir
814
def _leaveDirectory():
815
os.chdir(_currentdir)
816
self.addCleanup(_leaveDirectory)
818
def build_tree(self, shape, line_endings='native', transport=None):
819
"""Build a test tree according to a pattern.
821
shape is a sequence of file specifications. If the final
822
character is '/', a directory is created.
824
This doesn't add anything to a branch.
825
:param line_endings: Either 'binary' or 'native'
826
in binary mode, exact contents are written
827
in native mode, the line endings match the
828
default platform endings.
830
:param transport: A transport to write to, for building trees on
831
VFS's. If the transport is readonly or None,
832
"." is opened automatically.
834
# XXX: It's OK to just create them using forward slashes on windows?
835
if transport is None or transport.is_readonly():
836
transport = get_transport(".")
838
self.assert_(isinstance(name, basestring))
840
transport.mkdir(urlescape(name[:-1]))
842
if line_endings == 'binary':
844
elif line_endings == 'native':
847
raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
848
content = "contents of %s%s" % (name, end)
849
transport.put(urlescape(name), StringIO(content))
851
def build_tree_contents(self, shape):
852
build_tree_contents(shape)
854
def failUnlessExists(self, path):
855
"""Fail unless path, which may be abs or relative, exists."""
856
self.failUnless(osutils.lexists(path))
858
def failIfExists(self, path):
859
"""Fail if path, which may be abs or relative, exists."""
860
self.failIf(osutils.lexists(path))
862
def assertFileEqual(self, content, path):
863
"""Fail if path does not contain 'content'."""
864
self.failUnless(osutils.lexists(path))
865
self.assertEqualDiff(content, open(path, 'r').read())
868
class TestCaseWithTransport(TestCaseInTempDir):
869
"""A test case that provides get_url and get_readonly_url facilities.
871
These back onto two transport servers, one for readonly access and one for
874
If no explicit class is provided for readonly access, a
875
ReadonlyTransportDecorator is used instead which allows the use of non disk
876
based read write transports.
878
If an explicit class is provided for readonly access, that server and the
879
readwrite one must both define get_url() as resolving to os.getcwd().
882
def __init__(self, methodName='testMethod'):
883
super(TestCaseWithTransport, self).__init__(methodName)
884
self.__readonly_server = None
886
self.transport_server = default_transport
887
self.transport_readonly_server = None
889
def get_readonly_url(self, relpath=None):
890
"""Get a URL for the readonly transport.
892
This will either be backed by '.' or a decorator to the transport
893
used by self.get_url()
894
relpath provides for clients to get a path relative to the base url.
895
These should only be downwards relative, not upwards.
897
base = self.get_readonly_server().get_url()
898
if relpath is not None:
899
if not base.endswith('/'):
901
base = base + relpath
904
def get_readonly_server(self):
905
"""Get the server instance for the readonly transport
907
This is useful for some tests with specific servers to do diagnostics.
909
if self.__readonly_server is None:
910
if self.transport_readonly_server is None:
911
# readonly decorator requested
912
# bring up the server
914
self.__readonly_server = ReadonlyServer()
915
self.__readonly_server.setUp(self.__server)
917
self.__readonly_server = self.transport_readonly_server()
918
self.__readonly_server.setUp()
919
self.addCleanup(self.__readonly_server.tearDown)
920
return self.__readonly_server
922
def get_server(self):
923
"""Get the read/write server instance.
925
This is useful for some tests with specific servers that need
928
if self.__server is None:
929
self.__server = self.transport_server()
930
self.__server.setUp()
931
self.addCleanup(self.__server.tearDown)
934
def get_url(self, relpath=None):
935
"""Get a URL for the readwrite transport.
937
This will either be backed by '.' or to an equivalent non-file based
939
relpath provides for clients to get a path relative to the base url.
940
These should only be downwards relative, not upwards.
942
base = self.get_server().get_url()
943
if relpath is not None and relpath != '.':
944
if not base.endswith('/'):
946
base = base + relpath
949
def get_transport(self):
950
"""Return a writeable transport for the test scratch space"""
951
t = get_transport(self.get_url())
952
self.assertFalse(t.is_readonly())
955
def get_readonly_transport(self):
956
"""Return a readonly transport for the test scratch space
958
This can be used to test that operations which should only need
959
readonly access in fact do not try to write.
961
t = get_transport(self.get_readonly_url())
962
self.assertTrue(t.is_readonly())
965
def make_branch(self, relpath, format=None):
966
"""Create a branch on the transport at relpath."""
967
repo = self.make_repository(relpath, format=format)
968
return repo.bzrdir.create_branch()
970
def make_bzrdir(self, relpath, format=None):
972
url = self.get_url(relpath)
973
segments = relpath.split('/')
974
if segments and segments[-1] not in ('', '.'):
975
parent = self.get_url('/'.join(segments[:-1]))
976
t = get_transport(parent)
978
t.mkdir(segments[-1])
979
except errors.FileExists:
982
format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
983
# FIXME: make this use a single transport someday. RBC 20060418
984
return format.initialize_on_transport(get_transport(relpath))
985
except errors.UninitializableFormat:
986
raise TestSkipped("Format %s is not initializable." % format)
988
def make_repository(self, relpath, shared=False, format=None):
989
"""Create a repository on our default transport at relpath."""
990
made_control = self.make_bzrdir(relpath, format=format)
991
return made_control.create_repository(shared=shared)
993
def make_branch_and_tree(self, relpath, format=None):
994
"""Create a branch on the transport and a tree locally.
998
# TODO: always use the local disk path for the working tree,
999
# this obviously requires a format that supports branch references
1000
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
1002
b = self.make_branch(relpath, format=format)
1004
return b.bzrdir.create_workingtree()
1005
except errors.NotLocalUrl:
1006
# new formats - catch No tree error and create
1007
# a branch reference and a checkout.
1008
# old formats at that point - raise TestSkipped.
1009
# TODO: rbc 20060208
1010
return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
1012
def assertIsDirectory(self, relpath, transport):
1013
"""Assert that relpath within transport is a directory.
1015
This may not be possible on all transports; in that case it propagates
1016
a TransportNotPossible.
1019
mode = transport.stat(relpath).st_mode
1020
except errors.NoSuchFile:
1021
self.fail("path %s is not a directory; no such file"
1023
if not stat.S_ISDIR(mode):
1024
self.fail("path %s is not a directory; has mode %#o"
1028
class ChrootedTestCase(TestCaseWithTransport):
1029
"""A support class that provides readonly urls outside the local namespace.
1031
This is done by checking if self.transport_server is a MemoryServer. if it
1032
is then we are chrooted already, if it is not then an HttpServer is used
1035
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1036
be used without needed to redo it when a different
1037
subclass is in use ?
1041
super(ChrootedTestCase, self).setUp()
1042
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1043
self.transport_readonly_server = bzrlib.transport.http.HttpServer
1046
def filter_suite_by_re(suite, pattern):
1047
result = TestSuite()
1048
filter_re = re.compile(pattern)
1049
for test in iter_suite_tests(suite):
1050
if filter_re.search(test.id()):
1051
result.addTest(test)
1055
def run_suite(suite, name='test', verbose=False, pattern=".*",
1056
stop_on_failure=False, keep_output=False,
1057
transport=None, lsprof_timed=None):
1058
TestCaseInTempDir._TEST_NAME = name
1059
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1065
pb = progress.ProgressBar()
1066
runner = TextTestRunner(stream=sys.stdout,
1068
verbosity=verbosity,
1069
keep_output=keep_output,
1071
runner.stop_on_failure=stop_on_failure
1073
suite = filter_suite_by_re(suite, pattern)
1074
result = runner.run(suite)
1075
return result.wasSuccessful()
1078
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1081
test_suite_factory=None,
1083
"""Run the whole test suite under the enhanced runner"""
1084
global default_transport
1085
if transport is None:
1086
transport = default_transport
1087
old_transport = default_transport
1088
default_transport = transport
1090
if test_suite_factory is None:
1091
suite = test_suite()
1093
suite = test_suite_factory()
1094
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1095
stop_on_failure=stop_on_failure, keep_output=keep_output,
1096
transport=transport,
1097
lsprof_timed=lsprof_timed)
1099
default_transport = old_transport
1103
"""Build and return TestSuite for the whole of bzrlib.
1105
This function can be replaced if you need to change the default test
1106
suite on a global basis, but it is not encouraged.
1108
from doctest import DocTestSuite
1110
global MODULES_TO_DOCTEST
1113
'bzrlib.tests.test_ancestry',
1114
'bzrlib.tests.test_api',
1115
'bzrlib.tests.test_bad_files',
1116
'bzrlib.tests.test_branch',
1117
'bzrlib.tests.test_bzrdir',
1118
'bzrlib.tests.test_command',
1119
'bzrlib.tests.test_commit',
1120
'bzrlib.tests.test_commit_merge',
1121
'bzrlib.tests.test_config',
1122
'bzrlib.tests.test_conflicts',
1123
'bzrlib.tests.test_decorators',
1124
'bzrlib.tests.test_diff',
1125
'bzrlib.tests.test_doc_generate',
1126
'bzrlib.tests.test_errors',
1127
'bzrlib.tests.test_escaped_store',
1128
'bzrlib.tests.test_fetch',
1129
'bzrlib.tests.test_gpg',
1130
'bzrlib.tests.test_graph',
1131
'bzrlib.tests.test_hashcache',
1132
'bzrlib.tests.test_http',
1133
'bzrlib.tests.test_identitymap',
1134
'bzrlib.tests.test_inv',
1135
'bzrlib.tests.test_knit',
1136
'bzrlib.tests.test_lockdir',
1137
'bzrlib.tests.test_lockable_files',
1138
'bzrlib.tests.test_log',
1139
'bzrlib.tests.test_merge',
1140
'bzrlib.tests.test_merge3',
1141
'bzrlib.tests.test_merge_core',
1142
'bzrlib.tests.test_missing',
1143
'bzrlib.tests.test_msgeditor',
1144
'bzrlib.tests.test_nonascii',
1145
'bzrlib.tests.test_options',
1146
'bzrlib.tests.test_osutils',
1147
'bzrlib.tests.test_patch',
1148
'bzrlib.tests.test_permissions',
1149
'bzrlib.tests.test_plugins',
1150
'bzrlib.tests.test_progress',
1151
'bzrlib.tests.test_reconcile',
1152
'bzrlib.tests.test_repository',
1153
'bzrlib.tests.test_revision',
1154
'bzrlib.tests.test_revisionnamespaces',
1155
'bzrlib.tests.test_revprops',
1156
'bzrlib.tests.test_rio',
1157
'bzrlib.tests.test_sampler',
1158
'bzrlib.tests.test_selftest',
1159
'bzrlib.tests.test_setup',
1160
'bzrlib.tests.test_sftp_transport',
1161
'bzrlib.tests.test_smart_add',
1162
'bzrlib.tests.test_source',
1163
'bzrlib.tests.test_status',
1164
'bzrlib.tests.test_store',
1165
'bzrlib.tests.test_symbol_versioning',
1166
'bzrlib.tests.test_testament',
1167
'bzrlib.tests.test_textfile',
1168
'bzrlib.tests.test_textmerge',
1169
'bzrlib.tests.test_trace',
1170
'bzrlib.tests.test_transactions',
1171
'bzrlib.tests.test_transform',
1172
'bzrlib.tests.test_transport',
1173
'bzrlib.tests.test_tsort',
1174
'bzrlib.tests.test_tuned_gzip',
1175
'bzrlib.tests.test_ui',
1176
'bzrlib.tests.test_upgrade',
1177
'bzrlib.tests.test_versionedfile',
1178
'bzrlib.tests.test_weave',
1179
'bzrlib.tests.test_whitebox',
1180
'bzrlib.tests.test_workingtree',
1181
'bzrlib.tests.test_xml',
1183
test_transport_implementations = [
1184
'bzrlib.tests.test_transport_implementations']
1187
loader = TestUtil.TestLoader()
1188
from bzrlib.transport import TransportTestProviderAdapter
1189
adapter = TransportTestProviderAdapter()
1190
adapt_modules(test_transport_implementations, adapter, loader, suite)
1191
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1192
for package in packages_to_test():
1193
suite.addTest(package.test_suite())
1194
for m in MODULES_TO_TEST:
1195
suite.addTest(loader.loadTestsFromModule(m))
1196
for m in (MODULES_TO_DOCTEST):
1197
suite.addTest(DocTestSuite(m))
1198
for name, plugin in bzrlib.plugin.all_plugins().items():
1199
if getattr(plugin, 'test_suite', None) is not None:
1200
suite.addTest(plugin.test_suite())
1204
def adapt_modules(mods_list, adapter, loader, suite):
1205
"""Adapt the modules in mods_list using adapter and add to suite."""
1206
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
1207
suite.addTests(adapter.adapt(test))