1
# Copyright (C) 2005, 2006 Canonical Ltd
1
# Copyright (C) 2005 by Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under to test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
30
from cStringIO import StringIO
39
from subprocess import Popen, PIPE
46
from bzrlib import memorytree
48
import bzrlib.bzrdir as bzrdir
25
from testsweet import run_suite
49
26
import bzrlib.commands
50
import bzrlib.bundle.serializer
51
import bzrlib.errors as errors
53
import bzrlib.inventory
54
import bzrlib.iterablefile
59
# lsprof not available
61
from bzrlib.merge import merge_inner
64
import bzrlib.osutils as osutils
66
import bzrlib.progress as progress
67
from bzrlib.revision import common_ancestor
69
from bzrlib import symbol_versioning
70
28
import bzrlib.trace
71
from bzrlib.transport import get_transport
72
import bzrlib.transport
73
from bzrlib.transport.local import LocalURLServer
74
from bzrlib.transport.memory import MemoryServer
75
from bzrlib.transport.readonly import ReadonlyServer
76
from bzrlib.trace import mutter, note
77
from bzrlib.tests import TestUtil
78
from bzrlib.tests.TestUtil import (
82
from bzrlib.tests.treeshape import build_tree_contents
83
import bzrlib.urlutils as urlutils
84
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
86
default_transport = LocalURLServer
88
31
MODULES_TO_TEST = []
89
MODULES_TO_DOCTEST = [
90
bzrlib.bundle.serializer,
102
def packages_to_test():
103
"""Return a list of packages to test.
105
The packages are not globally imported so that import failures are
106
triggered when running selftest, not when importing the command.
109
import bzrlib.tests.blackbox
110
import bzrlib.tests.branch_implementations
111
import bzrlib.tests.bzrdir_implementations
112
import bzrlib.tests.interrepository_implementations
113
import bzrlib.tests.interversionedfile_implementations
114
import bzrlib.tests.intertree_implementations
115
import bzrlib.tests.repository_implementations
116
import bzrlib.tests.revisionstore_implementations
117
import bzrlib.tests.tree_implementations
118
import bzrlib.tests.workingtree_implementations
121
bzrlib.tests.blackbox,
122
bzrlib.tests.branch_implementations,
123
bzrlib.tests.bzrdir_implementations,
124
bzrlib.tests.interrepository_implementations,
125
bzrlib.tests.interversionedfile_implementations,
126
bzrlib.tests.intertree_implementations,
127
bzrlib.tests.repository_implementations,
128
bzrlib.tests.revisionstore_implementations,
129
bzrlib.tests.tree_implementations,
130
bzrlib.tests.workingtree_implementations,
134
class ExtendedTestResult(unittest._TextTestResult):
135
"""Accepts, reports and accumulates the results of running tests.
137
Compared to this unittest version this class adds support for profiling,
138
benchmarking, stopping as soon as a test fails, and skipping tests.
139
There are further-specialized subclasses for different types of display.
144
def __init__(self, stream, descriptions, verbosity,
148
"""Construct new TestResult.
150
:param bench_history: Optionally, a writable file object to accumulate
153
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
154
if bench_history is not None:
155
from bzrlib.version import _get_bzr_source_tree
156
src_tree = _get_bzr_source_tree()
159
revision_id = src_tree.get_parent_ids()[0]
161
# XXX: if this is a brand new tree, do the same as if there
165
# XXX: If there's no branch, what should we do?
167
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
168
self._bench_history = bench_history
169
self.ui = bzrlib.ui.ui_factory
170
self.num_tests = num_tests
172
self.failure_count = 0
175
self._overall_start_time = time.time()
177
def extractBenchmarkTime(self, testCase):
178
"""Add a benchmark time for the current test case."""
179
self._benchmarkTime = getattr(testCase, "_benchtime", None)
181
def _elapsedTestTimeString(self):
182
"""Return a time string for the overall time the current test has taken."""
183
return self._formatTime(time.time() - self._start_time)
185
def _testTimeString(self):
186
if self._benchmarkTime is not None:
188
self._formatTime(self._benchmarkTime),
189
self._elapsedTestTimeString())
191
return " %s" % self._elapsedTestTimeString()
193
def _formatTime(self, seconds):
194
"""Format seconds as milliseconds with leading spaces."""
195
return "%5dms" % (1000 * seconds)
197
def _shortened_test_description(self, test):
199
what = re.sub(r'^bzrlib\.(tests|benchmark)\.', '', what)
202
def startTest(self, test):
203
unittest.TestResult.startTest(self, test)
204
self.report_test_start(test)
205
self._recordTestStartTime()
207
def _recordTestStartTime(self):
208
"""Record that a test has started."""
209
self._start_time = time.time()
211
def addError(self, test, err):
212
if isinstance(err[1], TestSkipped):
213
return self.addSkipped(test, err)
214
unittest.TestResult.addError(self, test, err)
215
# We can only do this if we have one of our TestCases, not if
217
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
218
if setKeepLogfile is not None:
220
self.extractBenchmarkTime(test)
221
self.report_error(test, err)
225
def addFailure(self, test, err):
226
unittest.TestResult.addFailure(self, test, err)
227
# We can only do this if we have one of our TestCases, not if
229
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
230
if setKeepLogfile is not None:
232
self.extractBenchmarkTime(test)
233
self.report_failure(test, err)
237
def addSuccess(self, test):
238
self.extractBenchmarkTime(test)
239
if self._bench_history is not None:
240
if self._benchmarkTime is not None:
241
self._bench_history.write("%s %s\n" % (
242
self._formatTime(self._benchmarkTime),
244
self.report_success(test)
245
unittest.TestResult.addSuccess(self, test)
247
def addSkipped(self, test, skip_excinfo):
248
self.extractBenchmarkTime(test)
249
self.report_skip(test, skip_excinfo)
250
# seems best to treat this as success from point-of-view of unittest
251
# -- it actually does nothing so it barely matters :)
254
except KeyboardInterrupt:
257
self.addError(test, test.__exc_info())
259
unittest.TestResult.addSuccess(self, test)
261
def printErrorList(self, flavour, errors):
262
for test, err in errors:
263
self.stream.writeln(self.separator1)
264
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
265
if getattr(test, '_get_log', None) is not None:
267
print >>self.stream, \
268
('vvvv[log from %s]' % test.id()).ljust(78,'-')
269
print >>self.stream, test._get_log()
270
print >>self.stream, \
271
('^^^^[log from %s]' % test.id()).ljust(78,'-')
272
self.stream.writeln(self.separator2)
273
self.stream.writeln("%s" % err)
278
def report_cleaning_up(self):
281
def report_success(self, test):
285
class TextTestResult(ExtendedTestResult):
286
"""Displays progress and results of tests in text form"""
288
def __init__(self, *args, **kw):
289
ExtendedTestResult.__init__(self, *args, **kw)
290
self.pb = self.ui.nested_progress_bar()
291
self.pb.show_pct = False
292
self.pb.show_spinner = False
293
self.pb.show_eta = False,
294
self.pb.show_count = False
295
self.pb.show_bar = False
297
def report_starting(self):
298
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
300
def _progress_prefix_text(self):
301
a = '[%d' % self.count
302
if self.num_tests is not None:
303
a +='/%d' % self.num_tests
304
a += ' in %ds' % (time.time() - self._overall_start_time)
306
a += ', %d errors' % self.error_count
307
if self.failure_count:
308
a += ', %d failed' % self.failure_count
310
a += ', %d skipped' % self.skip_count
314
def report_test_start(self, test):
317
self._progress_prefix_text()
319
+ self._shortened_test_description(test))
321
def report_error(self, test, err):
322
self.error_count += 1
323
self.pb.note('ERROR: %s\n %s\n' % (
324
self._shortened_test_description(test),
328
def report_failure(self, test, err):
329
self.failure_count += 1
330
self.pb.note('FAIL: %s\n %s\n' % (
331
self._shortened_test_description(test),
335
def report_skip(self, test, skip_excinfo):
338
# at the moment these are mostly not things we can fix
339
# and so they just produce stipple; use the verbose reporter
342
# show test and reason for skip
343
self.pb.note('SKIP: %s\n %s\n' % (
344
self._shortened_test_description(test),
347
# since the class name was left behind in the still-visible
349
self.pb.note('SKIP: %s' % (skip_excinfo[1]))
351
def report_cleaning_up(self):
352
self.pb.update('cleaning up...')
358
class VerboseTestResult(ExtendedTestResult):
359
"""Produce long output, with one line per test run plus times"""
361
def _ellipsize_to_right(self, a_string, final_width):
362
"""Truncate and pad a string, keeping the right hand side"""
363
if len(a_string) > final_width:
364
result = '...' + a_string[3-final_width:]
367
return result.ljust(final_width)
369
def report_starting(self):
370
self.stream.write('running %d tests...\n' % self.num_tests)
372
def report_test_start(self, test):
374
name = self._shortened_test_description(test)
375
self.stream.write(self._ellipsize_to_right(name, 60))
378
def report_error(self, test, err):
379
self.error_count += 1
380
self.stream.writeln('ERROR %s\n %s'
381
% (self._testTimeString(), err[1]))
383
def report_failure(self, test, err):
384
self.failure_count += 1
385
self.stream.writeln('FAIL %s\n %s'
386
% (self._testTimeString(), err[1]))
388
def report_success(self, test):
389
self.stream.writeln(' OK %s' % self._testTimeString())
390
for bench_called, stats in getattr(test, '_benchcalls', []):
391
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
392
stats.pprint(file=self.stream)
395
def report_skip(self, test, skip_excinfo):
396
print >>self.stream, ' SKIP %s' % self._testTimeString()
397
print >>self.stream, ' %s' % skip_excinfo[1]
400
class TextTestRunner(object):
401
stop_on_failure = False
409
self.stream = unittest._WritelnDecorator(stream)
410
self.descriptions = descriptions
411
self.verbosity = verbosity
412
self.keep_output = keep_output
413
self._bench_history = bench_history
416
"Run the given test case or test suite."
417
startTime = time.time()
418
if self.verbosity == 1:
419
result_class = TextTestResult
420
elif self.verbosity >= 2:
421
result_class = VerboseTestResult
422
result = result_class(self.stream,
425
bench_history=self._bench_history,
426
num_tests=test.countTestCases(),
428
result.stop_early = self.stop_on_failure
429
result.report_starting()
431
stopTime = time.time()
432
timeTaken = stopTime - startTime
434
self.stream.writeln(result.separator2)
435
run = result.testsRun
436
self.stream.writeln("Ran %d test%s in %.3fs" %
437
(run, run != 1 and "s" or "", timeTaken))
438
self.stream.writeln()
439
if not result.wasSuccessful():
440
self.stream.write("FAILED (")
441
failed, errored = map(len, (result.failures, result.errors))
443
self.stream.write("failures=%d" % failed)
445
if failed: self.stream.write(", ")
446
self.stream.write("errors=%d" % errored)
447
self.stream.writeln(")")
449
self.stream.writeln("OK")
450
result.report_cleaning_up()
451
# This is still a little bogus,
452
# but only a little. Folk not using our testrunner will
453
# have to delete their temp directories themselves.
454
test_root = TestCaseWithMemoryTransport.TEST_ROOT
455
if result.wasSuccessful() or not self.keep_output:
456
if test_root is not None:
457
# If LANG=C we probably have created some bogus paths
458
# which rmtree(unicode) will fail to delete
459
# so make sure we are using rmtree(str) to delete everything
460
# except on win32, where rmtree(str) will fail
461
# since it doesn't have the property of byte-stream paths
462
# (they are either ascii or mbcs)
463
if sys.platform == 'win32':
464
# make sure we are using the unicode win32 api
465
test_root = unicode(test_root)
467
test_root = test_root.encode(
468
sys.getfilesystemencoding())
469
osutils.rmtree(test_root)
471
note("Failed tests working directories are in '%s'\n", test_root)
472
TestCaseWithMemoryTransport.TEST_ROOT = None
477
def iter_suite_tests(suite):
478
"""Return all tests in a suite, recursing through nested suites"""
479
for item in suite._tests:
480
if isinstance(item, unittest.TestCase):
482
elif isinstance(item, unittest.TestSuite):
483
for r in iter_suite_tests(item):
486
raise Exception('unknown object %r inside test suite %r'
490
class TestSkipped(Exception):
491
"""Indicates that a test was intentionally skipped, rather than failing."""
494
class CommandFailed(Exception):
498
class StringIOWrapper(object):
499
"""A wrapper around cStringIO which just adds an encoding attribute.
501
Internally we can check sys.stdout to see what the output encoding
502
should be. However, cStringIO has no encoding attribute that we can
503
set. So we wrap it instead.
508
def __init__(self, s=None):
510
self.__dict__['_cstring'] = StringIO(s)
512
self.__dict__['_cstring'] = StringIO()
514
def __getattr__(self, name, getattr=getattr):
515
return getattr(self.__dict__['_cstring'], name)
517
def __setattr__(self, name, val):
518
if name == 'encoding':
519
self.__dict__['encoding'] = val
521
return setattr(self._cstring, name, val)
32
MODULES_TO_DOCTEST = []
34
from logging import debug, warning, error
524
37
class TestCase(unittest.TestCase):
530
43
Error and debug log messages are redirected from their usual
531
44
location into a temporary file, the contents of which can be
532
retrieved by _get_log(). We use a real OS file, not an in-memory object,
533
so that it can also capture file IO. When the test completes this file
534
is read into memory and removed from disk.
45
retrieved by _get_log().
536
47
There are also convenience functions to invoke bzr's command-line
537
routine, and to build and check bzr trees.
539
In addition to the usual method of overriding tearDown(), this class also
540
allows subclasses to register functions into the _cleanups list, which is
541
run in order as the object is torn down. It's less likely this will be
542
accidentally overlooked.
545
_log_file_name = None
547
_keep_log_file = False
548
# record lsprof data when performing benchmark calls.
549
_gather_lsprof_in_benchmarks = False
551
def __init__(self, methodName='testMethod'):
552
super(TestCase, self).__init__(methodName)
48
routine, and to build and check bzr trees."""
53
# this replaces the default testsweet.TestCase; we don't want logging changed
556
54
unittest.TestCase.setUp(self)
557
self._cleanEnvironment()
558
55
bzrlib.trace.disable_default_logging()
561
self._benchcalls = []
562
self._benchtime = None
564
def _silenceUI(self):
565
"""Turn off UI for duration of test"""
566
# by default the UI is off; tests can turn it on if they want it.
567
saved = bzrlib.ui.ui_factory
569
bzrlib.ui.ui_factory = saved
570
bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
571
self.addCleanup(_restore)
573
def _ndiff_strings(self, a, b):
574
"""Return ndiff between two strings containing lines.
576
A trailing newline is added if missing to make the strings
578
if b and b[-1] != '\n':
580
if a and a[-1] != '\n':
582
difflines = difflib.ndiff(a.splitlines(True),
584
linejunk=lambda x: False,
585
charjunk=lambda x: False)
586
return ''.join(difflines)
588
def assertEqualDiff(self, a, b, message=None):
589
"""Assert two texts are equal, if not raise an exception.
591
This is intended for use with multi-line strings where it can
592
be hard to find the differences by eye.
594
# TODO: perhaps override assertEquals to call this for strings?
598
message = "texts not equal:\n"
599
raise AssertionError(message +
600
self._ndiff_strings(a, b))
602
def assertEqualMode(self, mode, mode_test):
603
self.assertEqual(mode, mode_test,
604
'mode mismatch %o != %o' % (mode, mode_test))
606
def assertStartsWith(self, s, prefix):
607
if not s.startswith(prefix):
608
raise AssertionError('string %r does not start with %r' % (s, prefix))
610
def assertEndsWith(self, s, suffix):
611
"""Asserts that s ends with suffix."""
612
if not s.endswith(suffix):
613
raise AssertionError('string %r does not end with %r' % (s, suffix))
615
def assertContainsRe(self, haystack, needle_re):
616
"""Assert that a contains something matching a regular expression."""
617
if not re.search(needle_re, haystack):
618
raise AssertionError('pattern "%s" not found in "%s"'
619
% (needle_re, haystack))
621
def assertNotContainsRe(self, haystack, needle_re):
622
"""Assert that a does not match a regular expression"""
623
if re.search(needle_re, haystack):
624
raise AssertionError('pattern "%s" found in "%s"'
625
% (needle_re, haystack))
627
def assertSubset(self, sublist, superlist):
628
"""Assert that every entry in sublist is present in superlist."""
630
for entry in sublist:
631
if entry not in superlist:
632
missing.append(entry)
634
raise AssertionError("value(s) %r not present in container %r" %
635
(missing, superlist))
637
def assertIs(self, left, right):
638
if not (left is right):
639
raise AssertionError("%r is not %r." % (left, right))
641
def assertTransportMode(self, transport, path, mode):
642
"""Fail if a path does not have mode mode.
644
If modes are not supported on this transport, the assertion is ignored.
646
if not transport._can_roundtrip_unix_modebits():
648
path_stat = transport.stat(path)
649
actual_mode = stat.S_IMODE(path_stat.st_mode)
650
self.assertEqual(mode, actual_mode,
651
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
653
def assertIsInstance(self, obj, kls):
654
"""Fail if obj is not an instance of kls"""
655
if not isinstance(obj, kls):
656
self.fail("%r is an instance of %s rather than %s" % (
657
obj, obj.__class__, kls))
659
def _capture_warnings(self, a_callable, *args, **kwargs):
660
"""A helper for callDeprecated and applyDeprecated.
662
:param a_callable: A callable to call.
663
:param args: The positional arguments for the callable
664
:param kwargs: The keyword arguments for the callable
665
:return: A tuple (warnings, result). result is the result of calling
666
a_callable(*args, **kwargs).
669
def capture_warnings(msg, cls=None, stacklevel=None):
670
# we've hooked into a deprecation specific callpath,
671
# only deprecations should getting sent via it.
672
self.assertEqual(cls, DeprecationWarning)
673
local_warnings.append(msg)
674
original_warning_method = symbol_versioning.warn
675
symbol_versioning.set_warning_method(capture_warnings)
677
result = a_callable(*args, **kwargs)
679
symbol_versioning.set_warning_method(original_warning_method)
680
return (local_warnings, result)
682
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
683
"""Call a deprecated callable without warning the user.
685
:param deprecation_format: The deprecation format that the callable
686
should have been deprecated with. This is the same type as the
687
parameter to deprecated_method/deprecated_function. If the
688
callable is not deprecated with this format, an assertion error
690
:param a_callable: A callable to call. This may be a bound method or
691
a regular function. It will be called with *args and **kwargs.
692
:param args: The positional arguments for the callable
693
:param kwargs: The keyword arguments for the callable
694
:return: The result of a_callable(*args, **kwargs)
696
call_warnings, result = self._capture_warnings(a_callable,
698
expected_first_warning = symbol_versioning.deprecation_string(
699
a_callable, deprecation_format)
700
if len(call_warnings) == 0:
701
self.fail("No assertion generated by call to %s" %
703
self.assertEqual(expected_first_warning, call_warnings[0])
706
def callDeprecated(self, expected, callable, *args, **kwargs):
707
"""Assert that a callable is deprecated in a particular way.
709
This is a very precise test for unusual requirements. The
710
applyDeprecated helper function is probably more suited for most tests
711
as it allows you to simply specify the deprecation format being used
712
and will ensure that that is issued for the function being called.
714
:param expected: a list of the deprecation warnings expected, in order
715
:param callable: The callable to call
716
:param args: The positional arguments for the callable
717
:param kwargs: The keyword arguments for the callable
719
call_warnings, result = self._capture_warnings(callable,
721
self.assertEqual(expected, call_warnings)
724
def _startLogFile(self):
725
"""Send bzr and test log messages to a temporary file.
727
The file is removed as the test is torn down.
56
self._enable_file_logging()
59
def _enable_file_logging(self):
729
60
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
730
62
self._log_file = os.fdopen(fileno, 'w+')
731
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
64
hdlr = logging.StreamHandler(self._log_file)
65
hdlr.setLevel(logging.DEBUG)
66
hdlr.setFormatter(logging.Formatter('%(levelname)4.4s %(message)s'))
67
logging.getLogger('').addHandler(hdlr)
68
logging.getLogger('').setLevel(logging.DEBUG)
70
debug('opened log file %s', name)
732
72
self._log_file_name = name
733
self.addCleanup(self._finishLogFile)
735
def _finishLogFile(self):
736
"""Finished with the log file.
738
Close the file and delete it, unless setKeepLogfile was called.
740
if self._log_file is None:
742
bzrlib.trace.disable_test_log(self._log_nonce)
76
logging.getLogger('').removeHandler(self._log_hdlr)
77
bzrlib.trace.enable_default_logging()
78
logging.debug('%s teardown', self.id())
743
79
self._log_file.close()
744
self._log_file = None
745
if not self._keep_log_file:
746
os.remove(self._log_file_name)
747
self._log_file_name = None
749
def setKeepLogfile(self):
750
"""Make the logfile not be deleted when _finishLogFile is called."""
751
self._keep_log_file = True
753
def addCleanup(self, callable):
754
"""Arrange to run a callable when this case is torn down.
756
Callables are run in the reverse of the order they are registered,
757
ie last-in first-out.
759
if callable in self._cleanups:
760
raise ValueError("cleanup function %r already registered on %s"
762
self._cleanups.append(callable)
764
def _cleanEnvironment(self):
766
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
768
'APPDATA': os.getcwd(),
770
'BZREMAIL': None, # may still be present in the environment
772
'BZR_PROGRESS_BAR': None,
775
self.addCleanup(self._restoreEnvironment)
776
for name, value in new_env.iteritems():
777
self._captureVar(name, value)
779
def _captureVar(self, name, newvalue):
780
"""Set an environment variable, and reset it when finished."""
781
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
783
def _restoreEnvironment(self):
784
for name, value in self.__old_env.iteritems():
785
osutils.set_or_unset_env(name, value)
789
80
unittest.TestCase.tearDown(self)
791
def time(self, callable, *args, **kwargs):
792
"""Run callable and accrue the time it takes to the benchmark time.
794
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
795
this will cause lsprofile statistics to be gathered and stored in
798
if self._benchtime is None:
802
if not self._gather_lsprof_in_benchmarks:
803
return callable(*args, **kwargs)
805
# record this benchmark
806
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
808
self._benchcalls.append(((callable, args, kwargs), stats))
811
self._benchtime += time.time() - start
813
def _runCleanups(self):
814
"""Run registered cleanup functions.
816
This should only be called from TestCase.tearDown.
818
# TODO: Perhaps this should keep running cleanups even if
820
for cleanup_fn in reversed(self._cleanups):
823
83
def log(self, *args):
826
def _get_log(self, keep_log_file=False):
827
"""Return as a string the log for this test. If the file is still
828
on disk and keep_log_file=False, delete the log file and store the
829
content in self._log_contents."""
830
# flush the log file, to get all content
832
bzrlib.trace._trace_file.flush()
833
if self._log_contents:
834
return self._log_contents
835
if self._log_file_name is not None:
836
logfile = open(self._log_file_name)
838
log_contents = logfile.read()
841
if not keep_log_file:
842
self._log_contents = log_contents
843
os.remove(self._log_file_name)
846
return "DELETED log file to reduce memory footprint"
848
def capture(self, cmd, retcode=0):
849
"""Shortcut that splits cmd into words, runs, and returns stdout"""
850
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
852
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
854
"""Invoke bzr and return (stdout, stderr).
856
Useful for code that wants to check the contents of the
857
output, the way error messages are presented, etc.
87
"""Return as a string the log for this test"""
88
return open(self._log_file_name).read()
90
def run_bzr(self, *args, **kwargs):
91
"""Invoke bzr, as if it were run from the command line.
859
93
This should be the main method for tests that want to exercise the
860
94
overall behavior of the bzr application (rather than a unit test
863
97
Much of the old code runs bzr by forking a new copy of Python, but
864
98
that is slower, harder to debug, and generally not necessary.
866
This runs bzr through the interface that catches and reports
867
errors, and with logging set to something approximating the
868
default, so that error reporting can be checked.
870
:param argv: arguments to invoke bzr
871
:param retcode: expected return code, or None for don't-care.
872
:param encoding: encoding for sys.stdout and sys.stderr
873
:param stdin: A string to be used as stdin for the command.
874
:param working_dir: Change to this directory before running
877
encoding = bzrlib.user_encoding
878
if stdin is not None:
879
stdin = StringIO(stdin)
880
stdout = StringIOWrapper()
881
stderr = StringIOWrapper()
882
stdout.encoding = encoding
883
stderr.encoding = encoding
885
self.log('run bzr: %r', argv)
886
# FIXME: don't call into logging here
887
handler = logging.StreamHandler(stderr)
888
handler.setLevel(logging.INFO)
889
logger = logging.getLogger('')
890
logger.addHandler(handler)
891
old_ui_factory = bzrlib.ui.ui_factory
892
bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
895
bzrlib.ui.ui_factory.stdin = stdin
898
if working_dir is not None:
899
cwd = osutils.getcwd()
900
os.chdir(working_dir)
903
result = self.apply_redirected(stdin, stdout, stderr,
904
bzrlib.commands.run_bzr_catch_errors,
907
logger.removeHandler(handler)
908
bzrlib.ui.ui_factory = old_ui_factory
912
out = stdout.getvalue()
913
err = stderr.getvalue()
915
self.log('output:\n%r', out)
917
self.log('errors:\n%r', err)
918
if retcode is not None:
919
self.assertEquals(retcode, result)
922
def run_bzr(self, *args, **kwargs):
923
"""Invoke bzr, as if it were run from the command line.
925
This should be the main method for tests that want to exercise the
926
overall behavior of the bzr application (rather than a unit test
927
or a functional test of the library.)
929
This sends the stdout/stderr results into the test's log,
930
where it may be useful for debugging. See also run_captured.
932
:param stdin: A string to be used as stdin for the command.
934
retcode = kwargs.pop('retcode', 0)
935
encoding = kwargs.pop('encoding', None)
936
stdin = kwargs.pop('stdin', None)
937
working_dir = kwargs.pop('working_dir', None)
938
return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
939
stdin=stdin, working_dir=working_dir)
941
def run_bzr_decode(self, *args, **kwargs):
942
if 'encoding' in kwargs:
943
encoding = kwargs['encoding']
945
encoding = bzrlib.user_encoding
946
return self.run_bzr(*args, **kwargs)[0].decode(encoding)
948
def run_bzr_error(self, error_regexes, *args, **kwargs):
949
"""Run bzr, and check that stderr contains the supplied regexes
100
retcode = kwargs.get('retcode', 0)
101
result = self.apply_redirected(None, None, None,
102
bzrlib.commands.run_bzr, args)
103
self.assertEquals(result, retcode)
951
:param error_regexes: Sequence of regular expressions which
952
must each be found in the error output. The relative ordering
954
:param args: command-line arguments for bzr
955
:param kwargs: Keyword arguments which are interpreted by run_bzr
956
This function changes the default value of retcode to be 3,
957
since in most cases this is run when you expect bzr to fail.
958
:return: (out, err) The actual output of running the command (in case you
959
want to do more inspection)
962
# Make sure that commit is failing because there is nothing to do
963
self.run_bzr_error(['no changes to commit'],
964
'commit', '-m', 'my commit comment')
965
# Make sure --strict is handling an unknown file, rather than
966
# giving us the 'nothing to do' error
967
self.build_tree(['unknown'])
968
self.run_bzr_error(['Commit refused because there are unknown files'],
969
'commit', '--strict', '-m', 'my commit comment')
971
kwargs.setdefault('retcode', 3)
972
out, err = self.run_bzr(*args, **kwargs)
973
for regex in error_regexes:
974
self.assertContainsRe(err, regex)
977
def run_bzr_subprocess(self, *args, **kwargs):
978
"""Run bzr in a subprocess for testing.
980
This starts a new Python interpreter and runs bzr in there.
981
This should only be used for tests that have a justifiable need for
982
this isolation: e.g. they are testing startup time, or signal
983
handling, or early startup code, etc. Subprocess code can't be
984
profiled or debugged so easily.
986
:param retcode: The status code that is expected. Defaults to 0. If
987
None is supplied, the status code is not checked.
988
:param env_changes: A dictionary which lists changes to environment
989
variables. A value of None will unset the env variable.
990
The values must be strings. The change will only occur in the
991
child, so you don't need to fix the environment after running.
992
:param universal_newlines: Convert CRLF => LF
993
:param allow_plugins: By default the subprocess is run with
994
--no-plugins to ensure test reproducibility. Also, it is possible
995
for system-wide plugins to create unexpected output on stderr,
996
which can cause unnecessary test failures.
998
env_changes = kwargs.get('env_changes', {})
999
working_dir = kwargs.get('working_dir', None)
1000
allow_plugins = kwargs.get('allow_plugins', False)
1001
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1002
working_dir=working_dir,
1003
allow_plugins=allow_plugins)
1004
# We distinguish between retcode=None and retcode not passed.
1005
supplied_retcode = kwargs.get('retcode', 0)
1006
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1007
universal_newlines=kwargs.get('universal_newlines', False),
1010
def start_bzr_subprocess(self, process_args, env_changes=None,
1011
skip_if_plan_to_signal=False,
1013
allow_plugins=False):
1014
"""Start bzr in a subprocess for testing.
1016
This starts a new Python interpreter and runs bzr in there.
1017
This should only be used for tests that have a justifiable need for
1018
this isolation: e.g. they are testing startup time, or signal
1019
handling, or early startup code, etc. Subprocess code can't be
1020
profiled or debugged so easily.
1022
:param process_args: a list of arguments to pass to the bzr executable,
1023
for example `['--version']`.
1024
:param env_changes: A dictionary which lists changes to environment
1025
variables. A value of None will unset the env variable.
1026
The values must be strings. The change will only occur in the
1027
child, so you don't need to fix the environment after running.
1028
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1030
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1032
:returns: Popen object for the started process.
1034
if skip_if_plan_to_signal:
1035
if not getattr(os, 'kill', None):
1036
raise TestSkipped("os.kill not available.")
1038
if env_changes is None:
1042
def cleanup_environment():
1043
for env_var, value in env_changes.iteritems():
1044
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1046
def restore_environment():
1047
for env_var, value in old_env.iteritems():
1048
osutils.set_or_unset_env(env_var, value)
1050
bzr_path = self.get_bzr_path()
1053
if working_dir is not None:
1054
cwd = osutils.getcwd()
1055
os.chdir(working_dir)
1058
# win32 subprocess doesn't support preexec_fn
1059
# so we will avoid using it on all platforms, just to
1060
# make sure the code path is used, and we don't break on win32
1061
cleanup_environment()
1062
command = [sys.executable, bzr_path]
1063
if not allow_plugins:
1064
command.append('--no-plugins')
1065
command.extend(process_args)
1066
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1068
restore_environment()
1074
def _popen(self, *args, **kwargs):
1075
"""Place a call to Popen.
1077
Allows tests to override this method to intercept the calls made to
1078
Popen for introspection.
1080
return Popen(*args, **kwargs)
1082
def get_bzr_path(self):
1083
"""Return the path of the 'bzr' executable for this test suite."""
1084
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1085
if not os.path.isfile(bzr_path):
1086
# We are probably installed. Assume sys.argv is the right file
1087
bzr_path = sys.argv[0]
1090
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1091
universal_newlines=False, process_args=None):
1092
"""Finish the execution of process.
1094
:param process: the Popen object returned from start_bzr_subprocess.
1095
:param retcode: The status code that is expected. Defaults to 0. If
1096
None is supplied, the status code is not checked.
1097
:param send_signal: an optional signal to send to the process.
1098
:param universal_newlines: Convert CRLF => LF
1099
:returns: (stdout, stderr)
1101
if send_signal is not None:
1102
os.kill(process.pid, send_signal)
1103
out, err = process.communicate()
1105
if universal_newlines:
1106
out = out.replace('\r\n', '\n')
1107
err = err.replace('\r\n', '\n')
1109
if retcode is not None and retcode != process.returncode:
1110
if process_args is None:
1111
process_args = "(unknown args)"
1112
mutter('Output of bzr %s:\n%s', process_args, out)
1113
mutter('Error for bzr %s:\n%s', process_args, err)
1114
self.fail('Command bzr %s failed with retcode %s != %s'
1115
% (process_args, retcode, process.returncode))
1118
105
def check_inventory_shape(self, inv, shape):
1119
"""Compare an inventory to a list of expected names.
107
Compare an inventory to a list of expected names.
1121
109
Fail if they are not precisely equal.
1140
128
"""Call callable with redirected std io pipes.
1142
130
Returns the return code."""
131
from StringIO import StringIO
1143
132
if not callable(a_callable):
1144
133
raise ValueError("a_callable must be callable.")
1145
134
if stdin is None:
1146
135
stdin = StringIO("")
1147
136
if stdout is None:
1148
if getattr(self, "_log_file", None) is not None:
1149
stdout = self._log_file
137
stdout = self._log_file
1152
138
if stderr is None:
1153
if getattr(self, "_log_file", None is not None):
1154
stderr = self._log_file
139
stderr = self._log_file
1157
140
real_stdin = sys.stdin
1158
141
real_stdout = sys.stdout
1159
142
real_stderr = sys.stderr
1161
145
sys.stdout = stdout
1162
146
sys.stderr = stderr
1163
147
sys.stdin = stdin
1164
return a_callable(*args, **kwargs)
148
result = a_callable(*args, **kwargs)
1166
150
sys.stdout = real_stdout
1167
151
sys.stderr = real_stderr
1168
152
sys.stdin = real_stdin
1170
@symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
1171
def merge(self, branch_from, wt_to):
1172
"""A helper for tests to do a ui-less merge.
1174
This should move to the main library when someone has time to integrate
1177
# minimal ui-less merge.
1178
wt_to.branch.fetch(branch_from)
1179
base_rev = common_ancestor(branch_from.last_revision(),
1180
wt_to.branch.last_revision(),
1181
wt_to.branch.repository)
1182
merge_inner(wt_to.branch, branch_from.basis_tree(),
1183
wt_to.branch.repository.revision_tree(base_rev),
1185
wt_to.add_parent_tree_id(branch_from.last_revision())
1188
156
BzrTestBase = TestCase
1191
class TestCaseWithMemoryTransport(TestCase):
1192
"""Common test class for tests that do not need disk resources.
1194
Tests that need disk resources should derive from TestCaseWithTransport.
1196
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1198
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1199
a directory which does not exist. This serves to help ensure test isolation
1200
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1201
must exist. However, TestCaseWithMemoryTransport does not offer local
1202
file defaults for the transport in tests, nor does it obey the command line
1203
override, so tests that accidentally write to the common directory should
1211
def __init__(self, methodName='runTest'):
1212
# allow test parameterisation after test construction and before test
1213
# execution. Variables that the parameteriser sets need to be
1214
# ones that are not set by setUp, or setUp will trash them.
1215
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1216
self.transport_server = default_transport
1217
self.transport_readonly_server = None
1219
def failUnlessExists(self, path):
1220
"""Fail unless path, which may be abs or relative, exists."""
1221
self.failUnless(osutils.lexists(path))
1223
def failIfExists(self, path):
1224
"""Fail if path, which may be abs or relative, exists."""
1225
self.failIf(osutils.lexists(path))
1227
def get_transport(self):
1228
"""Return a writeable transport for the test scratch space"""
1229
t = get_transport(self.get_url())
1230
self.assertFalse(t.is_readonly())
1233
def get_readonly_transport(self):
1234
"""Return a readonly transport for the test scratch space
1236
This can be used to test that operations which should only need
1237
readonly access in fact do not try to write.
1239
t = get_transport(self.get_readonly_url())
1240
self.assertTrue(t.is_readonly())
1243
def get_readonly_server(self):
1244
"""Get the server instance for the readonly transport
1246
This is useful for some tests with specific servers to do diagnostics.
1248
if self.__readonly_server is None:
1249
if self.transport_readonly_server is None:
1250
# readonly decorator requested
1251
# bring up the server
1253
self.__readonly_server = ReadonlyServer()
1254
self.__readonly_server.setUp(self.__server)
1256
self.__readonly_server = self.transport_readonly_server()
1257
self.__readonly_server.setUp()
1258
self.addCleanup(self.__readonly_server.tearDown)
1259
return self.__readonly_server
1261
def get_readonly_url(self, relpath=None):
1262
"""Get a URL for the readonly transport.
1264
This will either be backed by '.' or a decorator to the transport
1265
used by self.get_url()
1266
relpath provides for clients to get a path relative to the base url.
1267
These should only be downwards relative, not upwards.
1269
base = self.get_readonly_server().get_url()
1270
if relpath is not None:
1271
if not base.endswith('/'):
1273
base = base + relpath
1276
def get_server(self):
1277
"""Get the read/write server instance.
1279
This is useful for some tests with specific servers that need
1282
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1283
is no means to override it.
1285
if self.__server is None:
1286
self.__server = MemoryServer()
1287
self.__server.setUp()
1288
self.addCleanup(self.__server.tearDown)
1289
return self.__server
1291
def get_url(self, relpath=None):
1292
"""Get a URL (or maybe a path) for the readwrite transport.
1294
This will either be backed by '.' or to an equivalent non-file based
1296
relpath provides for clients to get a path relative to the base url.
1297
These should only be downwards relative, not upwards.
1299
base = self.get_server().get_url()
1300
if relpath is not None and relpath != '.':
1301
if not base.endswith('/'):
1303
# XXX: Really base should be a url; we did after all call
1304
# get_url()! But sometimes it's just a path (from
1305
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1306
# to a non-escaped local path.
1307
if base.startswith('./') or base.startswith('/'):
1310
base += urlutils.escape(relpath)
1313
def _make_test_root(self):
1314
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1318
root = u'test%04d.tmp' % i
1322
if e.errno == errno.EEXIST:
1327
# successfully created
1328
TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
1330
# make a fake bzr directory there to prevent any tests propagating
1331
# up onto the source directory's real branch
1332
bzrdir.BzrDir.create_standalone_workingtree(
1333
TestCaseWithMemoryTransport.TEST_ROOT)
1335
def makeAndChdirToTestDir(self):
1336
"""Create a temporary directories for this one test.
1338
This must set self.test_home_dir and self.test_dir and chdir to
1341
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1343
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1344
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1345
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1347
def make_branch(self, relpath, format=None):
1348
"""Create a branch on the transport at relpath."""
1349
repo = self.make_repository(relpath, format=format)
1350
return repo.bzrdir.create_branch()
1352
def make_bzrdir(self, relpath, format=None):
1354
# might be a relative or absolute path
1355
maybe_a_url = self.get_url(relpath)
1356
segments = maybe_a_url.rsplit('/', 1)
1357
t = get_transport(maybe_a_url)
1358
if len(segments) > 1 and segments[-1] not in ('', '.'):
1361
except errors.FileExists:
1364
format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
1365
return format.initialize_on_transport(t)
1366
except errors.UninitializableFormat:
1367
raise TestSkipped("Format %s is not initializable." % format)
1369
def make_repository(self, relpath, shared=False, format=None):
1370
"""Create a repository on our default transport at relpath."""
1371
made_control = self.make_bzrdir(relpath, format=format)
1372
return made_control.create_repository(shared=shared)
1374
def make_branch_and_memory_tree(self, relpath, format=None):
1375
"""Create a branch on the default transport and a MemoryTree for it."""
1376
b = self.make_branch(relpath, format=format)
1377
return memorytree.MemoryTree.create_on_branch(b)
1379
def overrideEnvironmentForTesting(self):
1380
os.environ['HOME'] = self.test_home_dir
1381
os.environ['APPDATA'] = self.test_home_dir
1384
super(TestCaseWithMemoryTransport, self).setUp()
1385
self._make_test_root()
1386
_currentdir = os.getcwdu()
1387
def _leaveDirectory():
1388
os.chdir(_currentdir)
1389
self.addCleanup(_leaveDirectory)
1390
self.makeAndChdirToTestDir()
1391
self.overrideEnvironmentForTesting()
1392
self.__readonly_server = None
1393
self.__server = None
1396
class TestCaseInTempDir(TestCaseWithMemoryTransport):
159
class TestCaseInTempDir(TestCase):
1397
160
"""Derived class that runs a test within a temporary directory.
1399
162
This is useful for tests that need to create a branch, etc.
1414
179
if contents != expect:
1415
180
self.log("expected: %r" % expect)
1416
181
self.log("actually: %r" % contents)
1417
self.fail("contents of %s not as expected" % filename)
1419
def makeAndChdirToTestDir(self):
1420
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
1422
For TestCaseInTempDir we create a temporary directory based on the test
1423
name and then create two subdirs - test and home under it.
1425
# shorten the name, to avoid test failures due to path length
1426
short_id = self.id().replace('bzrlib.tests.', '') \
1427
.replace('__main__.', '')[-100:]
1428
# it's possible the same test class is run several times for
1429
# parameterized tests, so make sure the names don't collide.
1433
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
1435
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
1436
if os.path.exists(candidate_dir):
1440
os.mkdir(candidate_dir)
1441
self.test_home_dir = candidate_dir + '/home'
1442
os.mkdir(self.test_home_dir)
1443
self.test_dir = candidate_dir + '/work'
1444
os.mkdir(self.test_dir)
1445
os.chdir(self.test_dir)
1448
def build_tree(self, shape, line_endings='native', transport=None):
182
self.fail("contents of %s not as expected")
184
def _make_test_root(self):
189
if TestCaseInTempDir.TEST_ROOT is not None:
191
TestCaseInTempDir.TEST_ROOT = os.path.abspath(
192
tempfile.mkdtemp(suffix='.tmp',
193
prefix=self._TEST_NAME + '-',
196
# make a fake bzr directory there to prevent any tests propagating
197
# up onto the source directory's real branch
198
os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
201
super(TestCaseInTempDir, self).setUp()
203
self._make_test_root()
204
self._currentdir = os.getcwdu()
205
self.test_dir = os.path.join(self.TEST_ROOT, self.id())
206
os.mkdir(self.test_dir)
207
os.chdir(self.test_dir)
211
os.chdir(self._currentdir)
212
super(TestCaseInTempDir, self).tearDown()
214
def _formcmd(self, cmd):
215
if isinstance(cmd, basestring):
218
cmd[0] = self.BZRPATH
219
if self.OVERRIDE_PYTHON:
220
cmd.insert(0, self.OVERRIDE_PYTHON)
221
self.log('$ %r' % cmd)
224
def runcmd(self, cmd, retcode=0):
225
"""Run one command and check the return code.
227
Returns a tuple of (stdout,stderr) strings.
229
If a single string is based, it is split into words.
230
For commands that are not simple space-separated words, please
231
pass a list instead."""
232
cmd = self._formcmd(cmd)
233
self.log('$ ' + ' '.join(cmd))
234
actual_retcode = subprocess.call(cmd, stdout=self._log_file,
235
stderr=self._log_file)
236
if retcode != actual_retcode:
237
raise CommandFailed("test failed: %r returned %d, expected %d"
238
% (cmd, actual_retcode, retcode))
240
def backtick(self, cmd, retcode=0):
241
"""Run a command and return its output"""
242
cmd = self._formcmd(cmd)
243
child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=self._log_file)
244
outd, errd = child.communicate()
246
actual_retcode = child.wait()
248
outd = outd.replace('\r', '')
250
if retcode != actual_retcode:
251
raise CommandFailed("test failed: %r returned %d, expected %d"
252
% (cmd, actual_retcode, retcode))
258
def build_tree(self, shape):
1449
259
"""Build a test tree according to a pattern.
1451
261
shape is a sequence of file specifications. If the final
1452
262
character is '/', a directory is created.
1454
This assumes that all the elements in the tree being built are new.
1456
264
This doesn't add anything to a branch.
1457
:param line_endings: Either 'binary' or 'native'
1458
in binary mode, exact contents are written
1459
in native mode, the line endings match the
1460
default platform endings.
1462
:param transport: A transport to write to, for building trees on
1463
VFS's. If the transport is readonly or None,
1464
"." is opened automatically.
1466
# It's OK to just create them using forward slashes on windows.
1467
if transport is None or transport.is_readonly():
1468
transport = get_transport(".")
266
# XXX: It's OK to just create them using forward slashes on windows?
1469
268
for name in shape:
1470
self.assert_(isinstance(name, basestring))
269
assert isinstance(name, basestring)
1471
270
if name[-1] == '/':
1472
transport.mkdir(urlutils.escape(name[:-1]))
1474
if line_endings == 'binary':
1476
elif line_endings == 'native':
1479
raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
1480
content = "contents of %s%s" % (name.encode('utf-8'), end)
1481
# Technically 'put()' is the right command. However, put
1482
# uses an AtomicFile, which requires an extra rename into place
1483
# As long as the files didn't exist in the past, append() will
1484
# do the same thing as put()
1485
# On jam's machine, make_kernel_like_tree is:
1486
# put: 4.5-7.5s (averaging 6s)
1488
# put_non_atomic: 2.9-4.5s
1489
transport.put_bytes_non_atomic(urlutils.escape(name), content)
1491
def build_tree_contents(self, shape):
1492
build_tree_contents(shape)
1494
def assertFileEqual(self, content, path):
1495
"""Fail if path does not contain 'content'."""
1496
self.failUnless(osutils.lexists(path))
1497
# TODO: jam 20060427 Shouldn't this be 'rb'?
1498
self.assertEqualDiff(content, open(path, 'r').read())
1501
class TestCaseWithTransport(TestCaseInTempDir):
1502
"""A test case that provides get_url and get_readonly_url facilities.
1504
These back onto two transport servers, one for readonly access and one for
1507
If no explicit class is provided for readonly access, a
1508
ReadonlyTransportDecorator is used instead which allows the use of non disk
1509
based read write transports.
1511
If an explicit class is provided for readonly access, that server and the
1512
readwrite one must both define get_url() as resolving to os.getcwd().
1515
def get_server(self):
1516
"""See TestCaseWithMemoryTransport.
1518
This is useful for some tests with specific servers that need
1521
if self.__server is None:
1522
self.__server = self.transport_server()
1523
self.__server.setUp()
1524
self.addCleanup(self.__server.tearDown)
1525
return self.__server
1527
def make_branch_and_tree(self, relpath, format=None):
1528
"""Create a branch on the transport and a tree locally.
1530
If the transport is not a LocalTransport, the Tree can't be created on
1531
the transport. In that case the working tree is created in the local
1532
directory, and the returned tree's branch and repository will also be
1535
This will fail if the original default transport for this test
1536
case wasn't backed by the working directory, as the branch won't
1537
be on disk for us to open it.
1539
:param format: The BzrDirFormat.
1540
:returns: the WorkingTree.
1542
# TODO: always use the local disk path for the working tree,
1543
# this obviously requires a format that supports branch references
1544
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
1546
b = self.make_branch(relpath, format=format)
1548
return b.bzrdir.create_workingtree()
1549
except errors.NotLocalUrl:
1550
# We can only make working trees locally at the moment. If the
1551
# transport can't support them, then reopen the branch on a local
1552
# transport, and create the working tree there.
1554
# Possibly we should instead keep
1555
# the non-disk-backed branch and create a local checkout?
1556
bd = bzrdir.BzrDir.open(relpath)
1557
return bd.create_workingtree()
1559
def assertIsDirectory(self, relpath, transport):
1560
"""Assert that relpath within transport is a directory.
1562
This may not be possible on all transports; in that case it propagates
1563
a TransportNotPossible.
1566
mode = transport.stat(relpath).st_mode
1567
except errors.NoSuchFile:
1568
self.fail("path %s is not a directory; no such file"
1570
if not stat.S_ISDIR(mode):
1571
self.fail("path %s is not a directory; has mode %#o"
1575
super(TestCaseWithTransport, self).setUp()
1576
self.__server = None
1579
class ChrootedTestCase(TestCaseWithTransport):
1580
"""A support class that provides readonly urls outside the local namespace.
1582
This is done by checking if self.transport_server is a MemoryServer. if it
1583
is then we are chrooted already, if it is not then an HttpServer is used
1586
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1587
be used without needed to redo it when a different
1588
subclass is in use ?
1592
super(ChrootedTestCase, self).setUp()
1593
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1594
self.transport_readonly_server = bzrlib.transport.http.HttpServer
1597
def filter_suite_by_re(suite, pattern):
1598
result = TestUtil.TestSuite()
1599
filter_re = re.compile(pattern)
1600
for test in iter_suite_tests(suite):
1601
if filter_re.search(test.id()):
1602
result.addTest(test)
1606
def run_suite(suite, name='test', verbose=False, pattern=".*",
1607
stop_on_failure=False, keep_output=False,
1608
transport=None, lsprof_timed=None, bench_history=None):
1609
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1614
runner = TextTestRunner(stream=sys.stdout,
1616
verbosity=verbosity,
1617
keep_output=keep_output,
1618
bench_history=bench_history)
1619
runner.stop_on_failure=stop_on_failure
1621
suite = filter_suite_by_re(suite, pattern)
1622
result = runner.run(suite)
1623
return result.wasSuccessful()
1626
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1629
test_suite_factory=None,
1631
bench_history=None):
1632
"""Run the whole test suite under the enhanced runner"""
1633
# XXX: Very ugly way to do this...
1634
# Disable warning about old formats because we don't want it to disturb
1635
# any blackbox tests.
1636
from bzrlib import repository
1637
repository._deprecation_warning_done = True
1639
global default_transport
1640
if transport is None:
1641
transport = default_transport
1642
old_transport = default_transport
1643
default_transport = transport
1645
if test_suite_factory is None:
1646
suite = test_suite()
1648
suite = test_suite_factory()
1649
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1650
stop_on_failure=stop_on_failure, keep_output=keep_output,
1651
transport=transport,
1652
lsprof_timed=lsprof_timed,
1653
bench_history=bench_history)
1655
default_transport = old_transport
274
print >>f, "contents of", name
279
class MetaTestLog(TestCase):
280
def test_logging(self):
281
"""Test logs are captured when a test fails."""
282
logging.info('an info message')
283
warning('something looks dodgy...')
284
logging.debug('hello, test is running')
288
def selftest(verbose=False, pattern=".*"):
289
return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern)
1658
292
def test_suite():
1659
"""Build and return TestSuite for the whole of bzrlib.
1661
This function can be replaced if you need to change the default test
1662
suite on a global basis, but it is not encouraged.
1665
'bzrlib.tests.test_ancestry',
1666
'bzrlib.tests.test_api',
1667
'bzrlib.tests.test_atomicfile',
1668
'bzrlib.tests.test_bad_files',
1669
'bzrlib.tests.test_branch',
1670
'bzrlib.tests.test_bundle',
1671
'bzrlib.tests.test_bzrdir',
1672
'bzrlib.tests.test_cache_utf8',
1673
'bzrlib.tests.test_command',
1674
'bzrlib.tests.test_commit',
1675
'bzrlib.tests.test_commit_merge',
1676
'bzrlib.tests.test_config',
1677
'bzrlib.tests.test_conflicts',
1678
'bzrlib.tests.test_decorators',
1679
'bzrlib.tests.test_diff',
1680
'bzrlib.tests.test_doc_generate',
1681
'bzrlib.tests.test_errors',
1682
'bzrlib.tests.test_escaped_store',
1683
'bzrlib.tests.test_fetch',
1684
'bzrlib.tests.test_ftp_transport',
1685
'bzrlib.tests.test_gpg',
1686
'bzrlib.tests.test_graph',
1687
'bzrlib.tests.test_hashcache',
1688
'bzrlib.tests.test_http',
1689
'bzrlib.tests.test_http_response',
1690
'bzrlib.tests.test_identitymap',
1691
'bzrlib.tests.test_ignores',
1692
'bzrlib.tests.test_inv',
1693
'bzrlib.tests.test_knit',
1694
'bzrlib.tests.test_lazy_import',
1695
'bzrlib.tests.test_lazy_regex',
1696
'bzrlib.tests.test_lockdir',
1697
'bzrlib.tests.test_lockable_files',
1698
'bzrlib.tests.test_log',
1699
'bzrlib.tests.test_memorytree',
1700
'bzrlib.tests.test_merge',
1701
'bzrlib.tests.test_merge3',
1702
'bzrlib.tests.test_merge_core',
1703
'bzrlib.tests.test_missing',
1704
'bzrlib.tests.test_msgeditor',
1705
'bzrlib.tests.test_nonascii',
1706
'bzrlib.tests.test_options',
1707
'bzrlib.tests.test_osutils',
1708
'bzrlib.tests.test_patch',
1709
'bzrlib.tests.test_patches',
1710
'bzrlib.tests.test_permissions',
1711
'bzrlib.tests.test_plugins',
1712
'bzrlib.tests.test_progress',
1713
'bzrlib.tests.test_reconcile',
1714
'bzrlib.tests.test_registry',
1715
'bzrlib.tests.test_repository',
1716
'bzrlib.tests.test_revert',
1717
'bzrlib.tests.test_revision',
1718
'bzrlib.tests.test_revisionnamespaces',
1719
'bzrlib.tests.test_revisiontree',
1720
'bzrlib.tests.test_rio',
1721
'bzrlib.tests.test_sampler',
1722
'bzrlib.tests.test_selftest',
1723
'bzrlib.tests.test_setup',
1724
'bzrlib.tests.test_sftp_transport',
1725
'bzrlib.tests.test_smart_add',
1726
'bzrlib.tests.test_smart_transport',
1727
'bzrlib.tests.test_source',
1728
'bzrlib.tests.test_status',
1729
'bzrlib.tests.test_store',
1730
'bzrlib.tests.test_symbol_versioning',
1731
'bzrlib.tests.test_testament',
1732
'bzrlib.tests.test_textfile',
1733
'bzrlib.tests.test_textmerge',
1734
'bzrlib.tests.test_trace',
1735
'bzrlib.tests.test_transactions',
1736
'bzrlib.tests.test_transform',
1737
'bzrlib.tests.test_transport',
1738
'bzrlib.tests.test_tree',
1739
'bzrlib.tests.test_treebuilder',
1740
'bzrlib.tests.test_tsort',
1741
'bzrlib.tests.test_tuned_gzip',
1742
'bzrlib.tests.test_ui',
1743
'bzrlib.tests.test_upgrade',
1744
'bzrlib.tests.test_urlutils',
1745
'bzrlib.tests.test_versionedfile',
1746
'bzrlib.tests.test_version',
1747
'bzrlib.tests.test_version_info',
1748
'bzrlib.tests.test_weave',
1749
'bzrlib.tests.test_whitebox',
1750
'bzrlib.tests.test_workingtree',
1751
'bzrlib.tests.test_xml',
293
from bzrlib.selftest.TestUtil import TestLoader, TestSuite
294
import bzrlib, bzrlib.store, bzrlib.inventory, bzrlib.branch
295
import bzrlib.osutils, bzrlib.commands, bzrlib.merge3, bzrlib.plugin
296
from doctest import DocTestSuite
302
global MODULES_TO_TEST, MODULES_TO_DOCTEST
305
['bzrlib.selftest.MetaTestLog',
306
'bzrlib.selftest.testinv',
307
'bzrlib.selftest.testfetch',
308
'bzrlib.selftest.versioning',
309
'bzrlib.selftest.whitebox',
310
'bzrlib.selftest.testmerge3',
311
'bzrlib.selftest.testhashcache',
312
'bzrlib.selftest.teststatus',
313
'bzrlib.selftest.testlog',
314
'bzrlib.selftest.blackbox',
315
'bzrlib.selftest.testrevisionnamespaces',
316
'bzrlib.selftest.testbranch',
317
'bzrlib.selftest.testrevision',
318
'bzrlib.selftest.test_merge_core',
319
'bzrlib.selftest.test_smart_add',
320
'bzrlib.selftest.testdiff',
1753
test_transport_implementations = [
1754
'bzrlib.tests.test_transport_implementations',
1755
'bzrlib.tests.test_read_bundle',
1757
suite = TestUtil.TestSuite()
1758
loader = TestUtil.TestLoader()
1759
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1760
from bzrlib.transport import TransportTestProviderAdapter
1761
adapter = TransportTestProviderAdapter()
1762
adapt_modules(test_transport_implementations, adapter, loader, suite)
1763
for package in packages_to_test():
1764
suite.addTest(package.test_suite())
324
for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
325
bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
326
if m not in MODULES_TO_DOCTEST:
327
MODULES_TO_DOCTEST.append(m)
329
TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
330
print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
333
suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
1765
334
for m in MODULES_TO_TEST:
1766
suite.addTest(loader.loadTestsFromModule(m))
1767
for m in MODULES_TO_DOCTEST:
1769
suite.addTest(doctest.DocTestSuite(m))
1770
except ValueError, e:
1771
print '**failed to get doctest for: %s\n%s' %(m,e)
1773
for name, plugin in bzrlib.plugin.all_plugins().items():
1774
if getattr(plugin, 'test_suite', None) is not None:
1775
suite.addTest(plugin.test_suite())
335
suite.addTest(TestLoader().loadTestsFromModule(m))
336
for m in (MODULES_TO_DOCTEST):
337
suite.addTest(DocTestSuite(m))
338
for p in bzrlib.plugin.all_plugins:
339
if hasattr(p, 'test_suite'):
340
suite.addTest(p.test_suite())
1779
def adapt_modules(mods_list, adapter, loader, suite):
1780
"""Adapt the modules in mods_list using adapter and add to suite."""
1781
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
1782
suite.addTests(adapter.adapt(test))