21
21
# little as possible, so this should be used rarely if it's added at all.
22
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
24
30
from cStringIO import StringIO
39
from subprocess import Popen, PIPE
37
46
import bzrlib.branch
47
import bzrlib.bzrdir as bzrdir
38
48
import bzrlib.commands
39
from bzrlib.errors import BzrError
49
import bzrlib.bundle.serializer
50
import bzrlib.errors as errors
40
51
import bzrlib.inventory
52
import bzrlib.iterablefile
57
# lsprof not available
59
from bzrlib.merge import merge_inner
41
60
import bzrlib.merge3
42
61
import bzrlib.osutils
43
62
import bzrlib.osutils as osutils
44
63
import bzrlib.plugin
64
import bzrlib.progress as progress
65
from bzrlib.revision import common_ancestor
45
66
import bzrlib.store
67
from bzrlib import symbol_versioning
46
68
import bzrlib.trace
69
from bzrlib.transport import get_transport
70
import bzrlib.transport
71
from bzrlib.transport.local import LocalRelpathServer
72
from bzrlib.transport.readonly import ReadonlyServer
47
73
from bzrlib.trace import mutter
48
from bzrlib.tests.TestUtil import TestLoader, TestSuite
74
from bzrlib.tests import TestUtil
75
from bzrlib.tests.TestUtil import (
49
79
from bzrlib.tests.treeshape import build_tree_contents
80
import bzrlib.urlutils as urlutils
81
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
83
default_transport = LocalRelpathServer
51
85
MODULES_TO_TEST = []
52
86
MODULES_TO_DOCTEST = [
88
bzrlib.bundle.serializer,
61
101
def packages_to_test():
102
"""Return a list of packages to test.
104
The packages are not globally imported so that import failures are
105
triggered when running selftest, not when importing the command.
62
108
import bzrlib.tests.blackbox
109
import bzrlib.tests.branch_implementations
110
import bzrlib.tests.bzrdir_implementations
111
import bzrlib.tests.interrepository_implementations
112
import bzrlib.tests.interversionedfile_implementations
113
import bzrlib.tests.intertree_implementations
114
import bzrlib.tests.repository_implementations
115
import bzrlib.tests.revisionstore_implementations
116
import bzrlib.tests.tree_implementations
117
import bzrlib.tests.workingtree_implementations
120
bzrlib.tests.blackbox,
121
bzrlib.tests.branch_implementations,
122
bzrlib.tests.bzrdir_implementations,
123
bzrlib.tests.interrepository_implementations,
124
bzrlib.tests.interversionedfile_implementations,
125
bzrlib.tests.intertree_implementations,
126
bzrlib.tests.repository_implementations,
127
bzrlib.tests.revisionstore_implementations,
128
bzrlib.tests.tree_implementations,
129
bzrlib.tests.workingtree_implementations,
68
class EarlyStoppingTestResultAdapter(object):
69
"""An adapter for TestResult to stop at the first failure or error"""
71
def __init__(self, result):
74
def addError(self, test, err):
75
self._result.addError(test, err)
78
def addFailure(self, test, err):
79
self._result.addFailure(test, err)
82
def __getattr__(self, name):
83
return getattr(self._result, name)
85
def __setattr__(self, name, value):
87
object.__setattr__(self, name, value)
88
return setattr(self._result, name, value)
91
133
class _MyResult(unittest._TextTestResult):
92
134
"""Custom TestResult.
94
136
Shows output in a different format, including displaying runtime for tests.
97
def _elapsedTime(self):
98
return "%5dms" % (1000 * (time.time() - self._start_time))
140
def __init__(self, stream, descriptions, verbosity, pb=None,
142
"""Construct new TestResult.
144
:param bench_history: Optionally, a writable file object to accumulate
147
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
149
if bench_history is not None:
150
from bzrlib.version import _get_bzr_source_tree
151
src_tree = _get_bzr_source_tree()
153
revision_id = src_tree.last_revision()
155
# XXX: If there's no branch, what should we do?
157
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
158
self._bench_history = bench_history
160
def extractBenchmarkTime(self, testCase):
    """Add a benchmark time for the current test case.

    Copies testCase._benchtime onto self._benchmarkTime. When the test
    case carries no _benchtime attribute, self._benchmarkTime is set to
    None.
    """
    self._benchmarkTime = getattr(testCase, "_benchtime", None)
164
def _elapsedTestTimeString(self):
    """Return a time string for the overall time the current test has taken.

    The elapsed time is the difference between now and self._start_time,
    rendered through self._formatTime.
    """
    return self._formatTime(time.time() - self._start_time)
168
def _testTimeString(self):
169
if self._benchmarkTime is not None:
171
self._formatTime(self._benchmarkTime),
172
self._elapsedTestTimeString())
174
return " %s" % self._elapsedTestTimeString()
176
def _formatTime(self, seconds):
    """Format seconds as milliseconds with leading spaces.

    The value is multiplied by 1000 and rendered with "%5d", so the
    fractional part of a millisecond is dropped and short values are
    left-padded to five characters before the "ms" suffix.
    """
    return "%5dms" % (1000 * seconds)
180
def _ellipsise_unimportant_words(self, a_string, final_width,
182
"""Add an ellipsis to overly long strings.
184
:param keep_start: If true preserve the start of a_string rather
188
if len(a_string) > final_width:
189
result = a_string[:final_width-3] + '...'
193
if len(a_string) > final_width:
194
result = '...' + a_string[3-final_width:]
197
return result.ljust(final_width)
100
199
def startTest(self, test):
101
200
unittest.TestResult.startTest(self, test)
103
202
# the beginning, but in an id, the important words are
105
204
SHOW_DESCRIPTIONS = False
206
if not self.showAll and self.dots and self.pb is not None:
209
final_width = osutils.terminal_width()
210
final_width = final_width - 15 - 8
212
if SHOW_DESCRIPTIONS:
213
what = test.shortDescription()
215
what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
218
if what.startswith('bzrlib.tests.'):
220
what = self._ellipsise_unimportant_words(what, final_width)
107
width = osutils.terminal_width()
108
name_width = width - 15
110
if SHOW_DESCRIPTIONS:
111
what = test.shortDescription()
113
if len(what) > name_width:
114
what = what[:name_width-3] + '...'
117
if what.startswith('bzrlib.tests.'):
119
if len(what) > name_width:
120
what = '...' + what[3-name_width:]
121
what = what.ljust(name_width)
122
222
self.stream.write(what)
223
elif self.dots and self.pb is not None:
224
self.pb.update(what, self.testsRun - 1, None)
123
225
self.stream.flush()
226
self._recordTestStartTime()
228
def _recordTestStartTime(self):
    """Record that a test has started.

    Stores the current wall-clock time in self._start_time.
    """
    self._start_time = time.time()
126
232
def addError(self, test, err):
127
233
if isinstance(err[1], TestSkipped):
128
234
return self.addSkipped(test, err)
129
235
unittest.TestResult.addError(self, test, err)
236
self.extractBenchmarkTime(test)
131
self.stream.writeln("ERROR %s" % self._elapsedTime())
238
self.stream.writeln("ERROR %s" % self._testTimeString())
239
elif self.dots and self.pb is None:
133
240
self.stream.write('E')
242
self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
243
self.pb.note(self._ellipsise_unimportant_words(
244
test.id() + ': ERROR',
245
osutils.terminal_width()))
134
246
self.stream.flush()
136
250
def addFailure(self, test, err):
137
251
unittest.TestResult.addFailure(self, test, err)
252
self.extractBenchmarkTime(test)
139
self.stream.writeln(" FAIL %s" % self._elapsedTime())
254
self.stream.writeln(" FAIL %s" % self._testTimeString())
255
elif self.dots and self.pb is None:
141
256
self.stream.write('F')
258
self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
259
self.pb.note(self._ellipsise_unimportant_words(
260
test.id() + ': FAIL',
261
osutils.terminal_width()))
142
262
self.stream.flush()
144
266
def addSuccess(self, test):
267
self.extractBenchmarkTime(test)
268
if self._bench_history is not None:
269
if self._benchmarkTime is not None:
270
self._bench_history.write("%s %s\n" % (
271
self._formatTime(self._benchmarkTime),
146
self.stream.writeln(' OK %s' % self._elapsedTime())
274
self.stream.writeln(' OK %s' % self._testTimeString())
275
for bench_called, stats in getattr(test, '_benchcalls', []):
276
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
277
stats.pprint(file=self.stream)
278
elif self.dots and self.pb is None:
148
279
self.stream.write('~')
281
self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
149
282
self.stream.flush()
150
283
unittest.TestResult.addSuccess(self, test)
152
285
def addSkipped(self, test, skip_excinfo):
286
self.extractBenchmarkTime(test)
154
print >>self.stream, ' SKIP %s' % self._elapsedTime()
288
print >>self.stream, ' SKIP %s' % self._testTimeString()
155
289
print >>self.stream, ' %s' % skip_excinfo[1]
290
elif self.dots and self.pb is None:
157
291
self.stream.write('S')
293
self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
158
294
self.stream.flush()
159
295
# seems best to treat this as success from point-of-view of unittest
160
296
# -- it actually does nothing so it barely matters :)
161
unittest.TestResult.addSuccess(self, test)
299
except KeyboardInterrupt:
302
self.addError(test, test.__exc_info())
304
unittest.TestResult.addSuccess(self, test)
163
306
def printErrorList(self, flavour, errors):
164
307
for test, err in errors:
165
308
self.stream.writeln(self.separator1)
166
self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
167
if hasattr(test, '_get_log'):
309
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
310
if getattr(test, '_get_log', None) is not None:
168
311
print >>self.stream
169
312
print >>self.stream, \
170
('vvvv[log from %s]' % test).ljust(78,'-')
313
('vvvv[log from %s]' % test.id()).ljust(78,'-')
171
314
print >>self.stream, test._get_log()
172
315
print >>self.stream, \
173
('^^^^[log from %s]' % test).ljust(78,'-')
316
('^^^^[log from %s]' % test.id()).ljust(78,'-')
174
317
self.stream.writeln(self.separator2)
175
318
self.stream.writeln("%s" % err)
178
class TextTestRunner(unittest.TextTestRunner):
321
class TextTestRunner(object):
179
322
stop_on_failure = False
331
self.stream = unittest._WritelnDecorator(stream)
332
self.descriptions = descriptions
333
self.verbosity = verbosity
334
self.keep_output = keep_output
336
self._bench_history = bench_history
181
338
def _makeResult(self):
182
result = _MyResult(self.stream, self.descriptions, self.verbosity)
183
if self.stop_on_failure:
184
result = EarlyStoppingTestResultAdapter(result)
339
result = _MyResult(self.stream,
343
bench_history=self._bench_history)
344
result.stop_early = self.stop_on_failure
348
"Run the given test case or test suite."
349
result = self._makeResult()
350
startTime = time.time()
351
if self.pb is not None:
352
self.pb.update('Running tests', 0, test.countTestCases())
354
stopTime = time.time()
355
timeTaken = stopTime - startTime
357
self.stream.writeln(result.separator2)
358
run = result.testsRun
359
self.stream.writeln("Ran %d test%s in %.3fs" %
360
(run, run != 1 and "s" or "", timeTaken))
361
self.stream.writeln()
362
if not result.wasSuccessful():
363
self.stream.write("FAILED (")
364
failed, errored = map(len, (result.failures, result.errors))
366
self.stream.write("failures=%d" % failed)
368
if failed: self.stream.write(", ")
369
self.stream.write("errors=%d" % errored)
370
self.stream.writeln(")")
372
self.stream.writeln("OK")
373
if self.pb is not None:
374
self.pb.update('Cleaning up', 0, 1)
375
# This is still a little bogus,
376
# but only a little. Folk not using our testrunner will
377
# have to delete their temp directories themselves.
378
test_root = TestCaseInTempDir.TEST_ROOT
379
if result.wasSuccessful() or not self.keep_output:
380
if test_root is not None:
381
# If LANG=C we probably have created some bogus paths
382
# which rmtree(unicode) will fail to delete
383
# so make sure we are using rmtree(str) to delete everything
384
# except on win32, where rmtree(str) will fail
385
# since it doesn't have the property of byte-stream paths
386
# (they are either ascii or mbcs)
387
if sys.platform == 'win32':
388
# make sure we are using the unicode win32 api
389
test_root = unicode(test_root)
391
test_root = test_root.encode(
392
sys.getfilesystemencoding())
393
osutils.rmtree(test_root)
395
if self.pb is not None:
396
self.pb.note("Failed tests working directories are in '%s'\n",
400
"Failed tests working directories are in '%s'\n" %
402
TestCaseInTempDir.TEST_ROOT = None
403
if self.pb is not None:
289
554
raise AssertionError("value(s) %r not present in container %r" %
290
555
(missing, superlist))
557
def assertIs(self, left, right):
    """Fail unless left and right are the very same object.

    :param left: first object to compare by identity
    :param right: second object to compare by identity
    :raises AssertionError: if ``left is right`` is false; the message
        contains the repr of both operands.
    """
    if left is not right:
        raise AssertionError("%r is not %r." % (left, right))
561
def assertTransportMode(self, transport, path, mode):
562
"""Fail if a path does not have mode mode.
564
If modes are not supported on this transport, the assertion is ignored.
566
if not transport._can_roundtrip_unix_modebits():
568
path_stat = transport.stat(path)
569
actual_mode = stat.S_IMODE(path_stat.st_mode)
570
self.assertEqual(mode, actual_mode,
571
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
573
def assertIsInstance(self, obj, kls):
    """Fail if obj is not an instance of kls.

    :param obj: the object to check
    :param kls: the class (or tuple of classes) obj must be an instance of

    On mismatch self.fail() is called with a message naming obj, its
    actual class and the expected class.
    """
    if not isinstance(obj, kls):
        self.fail("%r is an instance of %s rather than %s" % (
            obj, obj.__class__, kls))
579
def callDeprecated(self, expected, callable, *args, **kwargs):
580
"""Assert that a callable is deprecated in a particular way.
582
:param expected: a list of the deprecation warnings expected, in order
583
:param callable: The callable to call
584
:param args: The positional arguments for the callable
585
:param kwargs: The keyword arguments for the callable
588
def capture_warnings(msg, cls, stacklevel=None):
589
self.assertEqual(cls, DeprecationWarning)
590
local_warnings.append(msg)
591
method = symbol_versioning.warn
592
symbol_versioning.set_warning_method(capture_warnings)
594
result = callable(*args, **kwargs)
596
symbol_versioning.set_warning_method(method)
597
self.assertEqual(expected, local_warnings)
292
600
def _startLogFile(self):
293
601
"""Send bzr and test log messages to a temporary file.
295
603
The file is removed as the test is torn down.
297
605
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
298
encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
299
self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
300
bzrlib.trace.enable_test_log(self._log_file)
606
self._log_file = os.fdopen(fileno, 'w+')
607
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
301
608
self._log_file_name = name
302
609
self.addCleanup(self._finishLogFile)
402
736
errors, and with logging set to something approximating the
403
737
default, so that error reporting can be checked.
405
argv -- arguments to invoke bzr
406
retcode -- expected return code, or None for don't-care.
739
:param argv: arguments to invoke bzr
740
:param retcode: expected return code, or None for don't-care.
741
:param encoding: encoding for sys.stdout and sys.stderr
742
:param stdin: A string to be used as stdin for the command.
410
self.log('run bzr: %s', ' '.join(argv))
745
encoding = bzrlib.user_encoding
746
if stdin is not None:
747
stdin = StringIO(stdin)
748
stdout = StringIOWrapper()
749
stderr = StringIOWrapper()
750
stdout.encoding = encoding
751
stderr.encoding = encoding
753
self.log('run bzr: %r', argv)
411
754
# FIXME: don't call into logging here
412
755
handler = logging.StreamHandler(stderr)
413
handler.setFormatter(bzrlib.trace.QuietFormatter())
414
756
handler.setLevel(logging.INFO)
415
757
logger = logging.getLogger('')
416
758
logger.addHandler(handler)
759
old_ui_factory = bzrlib.ui.ui_factory
760
bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
763
bzrlib.ui.ui_factory.stdin = stdin
418
result = self.apply_redirected(None, stdout, stderr,
765
result = self.apply_redirected(stdin, stdout, stderr,
419
766
bzrlib.commands.run_bzr_catch_errors,
422
769
logger.removeHandler(handler)
770
bzrlib.ui.ui_factory = old_ui_factory
423
772
out = stdout.getvalue()
424
773
err = stderr.getvalue()
426
self.log('output:\n%s', out)
775
self.log('output:\n%r', out)
428
self.log('errors:\n%s', err)
777
self.log('errors:\n%r', err)
429
778
if retcode is not None:
430
self.assertEquals(result, retcode)
779
self.assertEquals(retcode, result)
433
782
def run_bzr(self, *args, **kwargs):
440
789
This sends the stdout/stderr results into the test's log,
441
790
where it may be useful for debugging. See also run_captured.
792
:param stdin: A string to be used as stdin for the command.
443
794
retcode = kwargs.pop('retcode', 0)
444
return self.run_bzr_captured(args, retcode)
795
encoding = kwargs.pop('encoding', None)
796
stdin = kwargs.pop('stdin', None)
797
return self.run_bzr_captured(args, retcode=retcode, encoding=encoding, stdin=stdin)
799
def run_bzr_decode(self, *args, **kwargs):
800
if 'encoding' in kwargs:
801
encoding = kwargs['encoding']
803
encoding = bzrlib.user_encoding
804
return self.run_bzr(*args, **kwargs)[0].decode(encoding)
806
def run_bzr_error(self, error_regexes, *args, **kwargs):
807
"""Run bzr, and check that stderr contains the supplied regexes
809
:param error_regexes: Sequence of regular expressions which
810
must each be found in the error output. The relative ordering
812
:param args: command-line arguments for bzr
813
:param kwargs: Keyword arguments which are interpreted by run_bzr
814
This function changes the default value of retcode to be 3,
815
since in most cases this is run when you expect bzr to fail.
816
:return: (out, err) The actual output of running the command (in case you
817
want to do more inspection)
820
# Make sure that commit is failing because there is nothing to do
821
self.run_bzr_error(['no changes to commit'],
822
'commit', '-m', 'my commit comment')
823
# Make sure --strict is handling an unknown file, rather than
824
# giving us the 'nothing to do' error
825
self.build_tree(['unknown'])
826
self.run_bzr_error(['Commit refused because there are unknown files'],
827
'commit', '--strict', '-m', 'my commit comment')
829
kwargs.setdefault('retcode', 3)
830
out, err = self.run_bzr(*args, **kwargs)
831
for regex in error_regexes:
832
self.assertContainsRe(err, regex)
835
def run_bzr_subprocess(self, *args, **kwargs):
836
"""Run bzr in a subprocess for testing.
838
This starts a new Python interpreter and runs bzr in there.
839
This should only be used for tests that have a justifiable need for
840
this isolation: e.g. they are testing startup time, or signal
841
handling, or early startup code, etc. Subprocess code can't be
842
profiled or debugged so easily.
844
:param retcode: The status code that is expected. Defaults to 0. If
845
None is supplied, the status code is not checked.
846
:param env_changes: A dictionary which lists changes to environment
847
variables. A value of None will unset the env variable.
848
The values must be strings. The change will only occur in the
849
child, so you don't need to fix the environment after running.
851
env_changes = kwargs.get('env_changes', {})
852
def cleanup_environment():
853
for env_var, value in env_changes.iteritems():
855
del os.environ[env_var]
857
os.environ[env_var] = value
859
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
861
process = Popen([sys.executable, bzr_path]+args,
862
stdout=PIPE, stderr=PIPE,
863
preexec_fn=cleanup_environment)
864
out = process.stdout.read()
865
err = process.stderr.read()
866
retcode = process.wait()
867
supplied_retcode = kwargs.get('retcode', 0)
868
if supplied_retcode is not None:
869
assert supplied_retcode == retcode
446
872
def check_inventory_shape(self, inv, shape):
447
873
"""Compare an inventory to a list of expected names.
544
986
# make a fake bzr directory there to prevent any tests propagating
545
987
# up onto the source directory's real branch
546
os.mkdir(osutils.pathjoin(TestCaseInTempDir.TEST_ROOT, '.bzr'))
988
bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
549
991
super(TestCaseInTempDir, self).setUp()
550
992
self._make_test_root()
551
993
_currentdir = os.getcwdu()
994
# shorten the name, to avoid test failures due to path length
552
995
short_id = self.id().replace('bzrlib.tests.', '') \
553
.replace('__main__.', '')
554
self.test_dir = osutils.pathjoin(self.TEST_ROOT, short_id)
555
os.mkdir(self.test_dir)
556
os.chdir(self.test_dir)
996
.replace('__main__.', '')[-100:]
997
# it's possible the same test class is run several times for
998
# parameterized tests, so make sure the names don't collide.
1002
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
1004
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
1005
if os.path.exists(candidate_dir):
1009
self.test_dir = candidate_dir
1010
os.mkdir(self.test_dir)
1011
os.chdir(self.test_dir)
557
1013
os.environ['HOME'] = self.test_dir
558
1014
os.environ['APPDATA'] = self.test_dir
559
1015
def _leaveDirectory():
560
1016
os.chdir(_currentdir)
561
1017
self.addCleanup(_leaveDirectory)
563
def build_tree(self, shape, line_endings='native'):
1019
def build_tree(self, shape, line_endings='native', transport=None):
564
1020
"""Build a test tree according to a pattern.
566
1022
shape is a sequence of file specifications. If the final
567
1023
character is '/', a directory is created.
1025
This assumes that all the elements in the tree being built are new.
569
1027
This doesn't add anything to a branch.
570
1028
:param line_endings: Either 'binary' or 'native'
571
1029
in binary mode, exact contents are written
572
1030
in native mode, the line endings match the
573
1031
default platform endings.
1033
:param transport: A transport to write to, for building trees on
1034
VFS's. If the transport is readonly or None,
1035
"." is opened automatically.
575
# XXX: It's OK to just create them using forward slashes on windows?
1037
# It's OK to just create them using forward slashes on windows.
1038
if transport is None or transport.is_readonly():
1039
transport = get_transport(".")
576
1040
for name in shape:
577
1041
self.assert_(isinstance(name, basestring))
578
1042
if name[-1] == '/':
1043
transport.mkdir(urlutils.escape(name[:-1]))
581
1045
if line_endings == 'binary':
583
1047
elif line_endings == 'native':
586
raise BzrError('Invalid line ending request %r' % (line_endings,))
587
print >>f, "contents of", name
1050
raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
1051
content = "contents of %s%s" % (name.encode('utf-8'), end)
1052
# Technically 'put()' is the right command. However, put
1053
# uses an AtomicFile, which requires an extra rename into place
1054
# As long as the files didn't exist in the past, append() will
1055
# do the same thing as put()
1056
# On jam's machine, make_kernel_like_tree is:
1057
# put: 4.5-7.5s (averaging 6s)
1059
transport.append(urlutils.escape(name), StringIO(content))
590
1061
def build_tree_contents(self, shape):
591
1062
build_tree_contents(shape)
601
1072
def assertFileEqual(self, content, path):
602
1073
"""Fail if path does not contain 'content'."""
603
1074
self.failUnless(osutils.lexists(path))
1075
# TODO: jam 20060427 Shouldn't this be 'rb'?
604
1076
self.assertEqualDiff(content, open(path, 'r').read())
1079
class TestCaseWithTransport(TestCaseInTempDir):
1080
"""A test case that provides get_url and get_readonly_url facilities.
1082
These back onto two transport servers, one for readonly access and one for
1085
If no explicit class is provided for readonly access, a
1086
ReadonlyTransportDecorator is used instead which allows the use of non disk
1087
based read write transports.
1089
If an explicit class is provided for readonly access, that server and the
1090
readwrite one must both define get_url() as resolving to os.getcwd().
1093
def __init__(self, methodName='testMethod'):
1094
super(TestCaseWithTransport, self).__init__(methodName)
1095
self.__readonly_server = None
1096
self.__server = None
1097
self.transport_server = default_transport
1098
self.transport_readonly_server = None
1100
def get_readonly_url(self, relpath=None):
1101
"""Get a URL for the readonly transport.
1103
This will either be backed by '.' or a decorator to the transport
1104
used by self.get_url()
1105
relpath provides for clients to get a path relative to the base url.
1106
These should only be downwards relative, not upwards.
1108
base = self.get_readonly_server().get_url()
1109
if relpath is not None:
1110
if not base.endswith('/'):
1112
base = base + relpath
1115
def get_readonly_server(self):
1116
"""Get the server instance for the readonly transport
1118
This is useful for some tests with specific servers to do diagnostics.
1120
if self.__readonly_server is None:
1121
if self.transport_readonly_server is None:
1122
# readonly decorator requested
1123
# bring up the server
1125
self.__readonly_server = ReadonlyServer()
1126
self.__readonly_server.setUp(self.__server)
1128
self.__readonly_server = self.transport_readonly_server()
1129
self.__readonly_server.setUp()
1130
self.addCleanup(self.__readonly_server.tearDown)
1131
return self.__readonly_server
1133
def get_server(self):
1134
"""Get the read/write server instance.
1136
This is useful for some tests with specific servers that need
1139
if self.__server is None:
1140
self.__server = self.transport_server()
1141
self.__server.setUp()
1142
self.addCleanup(self.__server.tearDown)
1143
return self.__server
1145
def get_url(self, relpath=None):
1146
"""Get a URL for the readwrite transport.
1148
This will either be backed by '.' or to an equivalent non-file based
1150
relpath provides for clients to get a path relative to the base url.
1151
These should only be downwards relative, not upwards.
1153
base = self.get_server().get_url()
1154
if relpath is not None and relpath != '.':
1155
if not base.endswith('/'):
1157
base = base + urlutils.escape(relpath)
1160
def get_transport(self):
1161
"""Return a writeable transport for the test scratch space"""
1162
t = get_transport(self.get_url())
1163
self.assertFalse(t.is_readonly())
1166
def get_readonly_transport(self):
1167
"""Return a readonly transport for the test scratch space
1169
This can be used to test that operations which should only need
1170
readonly access in fact do not try to write.
1172
t = get_transport(self.get_readonly_url())
1173
self.assertTrue(t.is_readonly())
1176
def make_branch(self, relpath, format=None):
    """Create a branch on the transport at relpath.

    A repository is first made via self.make_repository(relpath,
    format=format); the branch is then created through that repository's
    bzrdir.

    :return: whatever repo.bzrdir.create_branch() returns
    """
    repo = self.make_repository(relpath, format=format)
    return repo.bzrdir.create_branch()
1181
def make_bzrdir(self, relpath, format=None):
1183
url = self.get_url(relpath)
1184
mutter('relpath %r => url %r', relpath, url)
1185
segments = url.split('/')
1186
if segments and segments[-1] not in ('', '.'):
1187
parent = '/'.join(segments[:-1])
1188
t = get_transport(parent)
1190
t.mkdir(segments[-1])
1191
except errors.FileExists:
1194
format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
1195
# FIXME: make this use a single transport someday. RBC 20060418
1196
return format.initialize_on_transport(get_transport(relpath))
1197
except errors.UninitializableFormat:
1198
raise TestSkipped("Format %s is not initializable." % format)
1200
def make_repository(self, relpath, shared=False, format=None):
    """Create a repository on our default transport at relpath.

    The control directory comes from self.make_bzrdir(relpath,
    format=format); the repository is created inside it.

    :param shared: passed through to create_repository()
    :return: whatever made_control.create_repository() returns
    """
    made_control = self.make_bzrdir(relpath, format=format)
    return made_control.create_repository(shared=shared)
1205
def make_branch_and_tree(self, relpath, format=None):
1206
"""Create a branch on the transport and a tree locally.
1210
# TODO: always use the local disk path for the working tree,
1211
# this obviously requires a format that supports branch references
1212
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
1214
b = self.make_branch(relpath, format=format)
1216
return b.bzrdir.create_workingtree()
1217
except errors.NotLocalUrl:
1218
# new formats - catch No tree error and create
1219
# a branch reference and a checkout.
1220
# old formats at that point - raise TestSkipped.
1221
# TODO: rbc 20060208
1222
return WorkingTreeFormat2().initialize(bzrdir.BzrDir.open(relpath))
1224
def assertIsDirectory(self, relpath, transport):
1225
"""Assert that relpath within transport is a directory.
1227
This may not be possible on all transports; in that case it propagates
1228
a TransportNotPossible.
1231
mode = transport.stat(relpath).st_mode
1232
except errors.NoSuchFile:
1233
self.fail("path %s is not a directory; no such file"
1235
if not stat.S_ISDIR(mode):
1236
self.fail("path %s is not a directory; has mode %#o"
1240
class ChrootedTestCase(TestCaseWithTransport):
1241
"""A support class that provides readonly urls outside the local namespace.
1243
This is done by checking if self.transport_server is a MemoryServer. If it
1244
is then we are chrooted already, if it is not then an HttpServer is used
1247
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1248
be used without needing to redo it when a different
1249
subclass is in use ?
1253
super(ChrootedTestCase, self).setUp()
1254
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1255
self.transport_readonly_server = bzrlib.transport.http.HttpServer
607
1258
def filter_suite_by_re(suite, pattern):
1259
result = TestUtil.TestSuite()
609
1260
filter_re = re.compile(pattern)
610
1261
for test in iter_suite_tests(suite):
611
1262
if filter_re.search(test.id()):
616
1267
def run_suite(suite, name='test', verbose=False, pattern=".*",
617
stop_on_failure=False, keep_output=False):
1268
stop_on_failure=False, keep_output=False,
1269
transport=None, lsprof_timed=None, bench_history=None):
618
1270
TestCaseInTempDir._TEST_NAME = name
1271
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1277
pb = progress.ProgressBar()
623
1278
runner = TextTestRunner(stream=sys.stdout,
1280
verbosity=verbosity,
1281
keep_output=keep_output,
1283
bench_history=bench_history)
626
1284
runner.stop_on_failure=stop_on_failure
627
1285
if pattern != '.*':
628
1286
suite = filter_suite_by_re(suite, pattern)
629
1287
result = runner.run(suite)
630
# This is still a little bogus,
631
# but only a little. Folk not using our testrunner will
632
# have to delete their temp directories themselves.
633
if result.wasSuccessful() or not keep_output:
634
if TestCaseInTempDir.TEST_ROOT is not None:
635
shutil.rmtree(TestCaseInTempDir.TEST_ROOT)
637
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
638
1288
return result.wasSuccessful()
641
1291
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1294
test_suite_factory=None,
1296
bench_history=None):
643
1297
"""Run the whole test suite under the enhanced runner"""
644
return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern,
645
stop_on_failure=stop_on_failure, keep_output=keep_output)
1298
# XXX: Very ugly way to do this...
1299
# Disable warning about old formats because we don't want it to disturb
1300
# any blackbox tests.
1301
from bzrlib import repository
1302
repository._deprecation_warning_done = True
1304
global default_transport
1305
if transport is None:
1306
transport = default_transport
1307
old_transport = default_transport
1308
default_transport = transport
1310
if test_suite_factory is None:
1311
suite = test_suite()
1313
suite = test_suite_factory()
1314
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1315
stop_on_failure=stop_on_failure, keep_output=keep_output,
1316
transport=transport,
1317
lsprof_timed=lsprof_timed,
1318
bench_history=bench_history)
1320
default_transport = old_transport
648
1323
def test_suite():
649
"""Build and return TestSuite for the whole program."""
650
from doctest import DocTestSuite
652
global MODULES_TO_DOCTEST
1324
"""Build and return TestSuite for the whole of bzrlib.
1326
This function can be replaced if you need to change the default test
1327
suite on a global basis, but it is not encouraged.
655
1330
'bzrlib.tests.test_ancestry',
656
'bzrlib.tests.test_annotate',
657
1331
'bzrlib.tests.test_api',
1332
'bzrlib.tests.test_atomicfile',
658
1333
'bzrlib.tests.test_bad_files',
659
'bzrlib.tests.test_basis_inventory',
660
1334
'bzrlib.tests.test_branch',
1335
'bzrlib.tests.test_bundle',
1336
'bzrlib.tests.test_bzrdir',
1337
'bzrlib.tests.test_cache_utf8',
661
1338
'bzrlib.tests.test_command',
662
1339
'bzrlib.tests.test_commit',
663
1340
'bzrlib.tests.test_commit_merge',
664
1341
'bzrlib.tests.test_config',
665
1342
'bzrlib.tests.test_conflicts',
1343
'bzrlib.tests.test_decorators',
666
1344
'bzrlib.tests.test_diff',
1345
'bzrlib.tests.test_doc_generate',
1346
'bzrlib.tests.test_errors',
1347
'bzrlib.tests.test_escaped_store',
667
1348
'bzrlib.tests.test_fetch',
668
1349
'bzrlib.tests.test_gpg',
669
1350
'bzrlib.tests.test_graph',
670
1351
'bzrlib.tests.test_hashcache',
671
1352
'bzrlib.tests.test_http',
1353
'bzrlib.tests.test_http_response',
672
1354
'bzrlib.tests.test_identitymap',
1355
'bzrlib.tests.test_ignores',
673
1356
'bzrlib.tests.test_inv',
1357
'bzrlib.tests.test_knit',
1358
'bzrlib.tests.test_lockdir',
1359
'bzrlib.tests.test_lockable_files',
674
1360
'bzrlib.tests.test_log',
675
1361
'bzrlib.tests.test_merge',
676
1362
'bzrlib.tests.test_merge3',
680
1366
'bzrlib.tests.test_nonascii',
681
1367
'bzrlib.tests.test_options',
682
1368
'bzrlib.tests.test_osutils',
683
'bzrlib.tests.test_parent',
1369
'bzrlib.tests.test_patch',
1370
'bzrlib.tests.test_patches',
684
1371
'bzrlib.tests.test_permissions',
685
1372
'bzrlib.tests.test_plugins',
686
'bzrlib.tests.test_remove',
1373
'bzrlib.tests.test_progress',
1374
'bzrlib.tests.test_reconcile',
1375
'bzrlib.tests.test_repository',
1376
'bzrlib.tests.test_revert',
687
1377
'bzrlib.tests.test_revision',
688
1378
'bzrlib.tests.test_revisionnamespaces',
689
'bzrlib.tests.test_revprops',
690
'bzrlib.tests.test_reweave',
1379
'bzrlib.tests.test_revisiontree',
691
1380
'bzrlib.tests.test_rio',
692
1381
'bzrlib.tests.test_sampler',
693
1382
'bzrlib.tests.test_selftest',
694
1383
'bzrlib.tests.test_setup',
695
1384
'bzrlib.tests.test_sftp_transport',
1385
'bzrlib.tests.test_ftp_transport',
696
1386
'bzrlib.tests.test_smart_add',
697
1387
'bzrlib.tests.test_source',
698
1388
'bzrlib.tests.test_status',
699
1389
'bzrlib.tests.test_store',
700
1390
'bzrlib.tests.test_symbol_versioning',
701
1391
'bzrlib.tests.test_testament',
1392
'bzrlib.tests.test_textfile',
1393
'bzrlib.tests.test_textmerge',
702
1394
'bzrlib.tests.test_trace',
703
1395
'bzrlib.tests.test_transactions',
1396
'bzrlib.tests.test_transform',
704
1397
'bzrlib.tests.test_transport',
1398
'bzrlib.tests.test_tree',
705
1399
'bzrlib.tests.test_tsort',
1400
'bzrlib.tests.test_tuned_gzip',
706
1401
'bzrlib.tests.test_ui',
707
'bzrlib.tests.test_uncommit',
708
1402
'bzrlib.tests.test_upgrade',
1403
'bzrlib.tests.test_urlutils',
1404
'bzrlib.tests.test_versionedfile',
1405
'bzrlib.tests.test_version',
709
1406
'bzrlib.tests.test_weave',
710
1407
'bzrlib.tests.test_whitebox',
711
1408
'bzrlib.tests.test_workingtree',
712
1409
'bzrlib.tests.test_xml',
715
TestCase.BZRPATH = osutils.pathjoin(
716
osutils.realpath(osutils.dirname(bzrlib.__path__[0])), 'bzr')
717
print '%10s: %s' % ('bzr', osutils.realpath(sys.argv[0]))
718
print '%10s: %s' % ('bzrlib', bzrlib.__path__[0])
721
# python2.4's TestLoader.loadTestsFromNames gives very poor
722
# errors if it fails to load a named module - no indication of what's
723
# actually wrong, just "no such module". We should probably override that
724
# class, but for the moment just load them ourselves. (mbp 20051202)
725
loader = TestLoader()
726
for mod_name in testmod_names:
727
mod = _load_module_by_name(mod_name)
728
suite.addTest(loader.loadTestsFromModule(mod))
1411
test_transport_implementations = [
1412
'bzrlib.tests.test_transport_implementations',
1413
'bzrlib.tests.test_read_bundle',
1415
suite = TestUtil.TestSuite()
1416
loader = TestUtil.TestLoader()
1417
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1418
from bzrlib.transport import TransportTestProviderAdapter
1419
adapter = TransportTestProviderAdapter()
1420
adapt_modules(test_transport_implementations, adapter, loader, suite)
729
1421
for package in packages_to_test():
730
1422
suite.addTest(package.test_suite())
731
1423
for m in MODULES_TO_TEST:
732
1424
suite.addTest(loader.loadTestsFromModule(m))
733
for m in (MODULES_TO_DOCTEST):
734
suite.addTest(DocTestSuite(m))
1425
for m in MODULES_TO_DOCTEST:
1426
suite.addTest(doctest.DocTestSuite(m))
735
1427
for name, plugin in bzrlib.plugin.all_plugins().items():
736
if hasattr(plugin, 'test_suite'):
1428
if getattr(plugin, 'test_suite', None) is not None:
737
1429
suite.addTest(plugin.test_suite())
741
def _load_module_by_name(mod_name):
    """Import the module named by a dotted path and return that module.

    :param mod_name: dotted module name, e.g. 'package.sub.module'.
    :return: the module object for the last component of mod_name.
    :raises ImportError: if the module cannot be imported.
    """
    parts = mod_name.split('.')
    module = __import__(mod_name)
    # __import__ already handed back the module for the first component.
    del parts[0]
    # for historical reasons python returns the top-level module even though
    # it loads the submodule; we need to walk down to get the one we want.
    while parts:
        module = getattr(module, parts.pop(0))
    return module
1433
def adapt_modules(mods_list, adapter, loader, suite):
    """Adapt the modules in mods_list using adapter and add to suite.

    :param mods_list: module names handed to
        ``loader.loadTestsFromModuleNames``.
    :param adapter: object whose ``adapt(test)`` result is passed to
        ``suite.addTests``.
    :param loader: test loader providing ``loadTestsFromModuleNames``.
    :param suite: suite that receives the adapted tests; modified in place.
    """
    # Walk the individual tests of the loaded suite so each one is adapted
    # separately before being added.
    for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
        suite.addTests(adapter.adapt(test))