1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
1
# Copyright (C) 2005 by Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under to test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
31
18
from cStringIO import StringIO
37
from pprint import pformat
42
from subprocess import Popen, PIPE
62
30
import bzrlib.commands
63
import bzrlib.timestamp
65
import bzrlib.inventory
66
import bzrlib.iterablefile
71
# lsprof not available
73
from bzrlib.merge import merge_inner
77
from bzrlib.revision import common_ancestor
79
from bzrlib import symbol_versioning
80
from bzrlib.symbol_versioning import (
84
31
import bzrlib.trace
85
from bzrlib.transport import get_transport
86
import bzrlib.transport
87
from bzrlib.transport.local import LocalURLServer
88
from bzrlib.transport.memory import MemoryServer
89
from bzrlib.transport.readonly import ReadonlyServer
90
from bzrlib.trace import mutter, note
91
from bzrlib.tests import TestUtil
92
from bzrlib.tests.HttpServer import HttpServer
93
from bzrlib.tests.TestUtil import (
97
from bzrlib.tests.treeshape import build_tree_contents
98
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
100
# Mark this python module as being part of the implementation
101
# of unittest: this gives us better tracebacks where the last
102
# shown frame is the test code, not our assertXYZ.
105
default_transport = LocalURLServer
33
import bzrlib.osutils as osutils
34
from bzrlib.selftest import TestUtil
35
from bzrlib.selftest.TestUtil import TestLoader, TestSuite
107
38
MODULES_TO_TEST = []
108
MODULES_TO_DOCTEST = [
121
def packages_to_test():
122
"""Return a list of packages to test.
124
The packages are not globally imported so that import failures are
125
triggered when running selftest, not when importing the command.
128
import bzrlib.tests.blackbox
129
import bzrlib.tests.branch_implementations
130
import bzrlib.tests.bzrdir_implementations
131
import bzrlib.tests.interrepository_implementations
132
import bzrlib.tests.interversionedfile_implementations
133
import bzrlib.tests.intertree_implementations
134
import bzrlib.tests.per_lock
135
import bzrlib.tests.repository_implementations
136
import bzrlib.tests.revisionstore_implementations
137
import bzrlib.tests.tree_implementations
138
import bzrlib.tests.workingtree_implementations
141
bzrlib.tests.blackbox,
142
bzrlib.tests.branch_implementations,
143
bzrlib.tests.bzrdir_implementations,
144
bzrlib.tests.interrepository_implementations,
145
bzrlib.tests.interversionedfile_implementations,
146
bzrlib.tests.intertree_implementations,
147
bzrlib.tests.per_lock,
148
bzrlib.tests.repository_implementations,
149
bzrlib.tests.revisionstore_implementations,
150
bzrlib.tests.tree_implementations,
151
bzrlib.tests.workingtree_implementations,
155
class ExtendedTestResult(unittest._TextTestResult):
156
"""Accepts, reports and accumulates the results of running tests.
158
Compared to this unittest version this class adds support for profiling,
159
benchmarking, stopping as soon as a test fails, and skipping tests.
160
There are further-specialized subclasses for different types of display.
165
def __init__(self, stream, descriptions, verbosity,
169
"""Construct new TestResult.
171
:param bench_history: Optionally, a writable file object to accumulate
174
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
175
if bench_history is not None:
176
from bzrlib.version import _get_bzr_source_tree
177
src_tree = _get_bzr_source_tree()
180
revision_id = src_tree.get_parent_ids()[0]
182
# XXX: if this is a brand new tree, do the same as if there
186
# XXX: If there's no branch, what should we do?
188
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
189
self._bench_history = bench_history
190
self.ui = ui.ui_factory
191
self.num_tests = num_tests
193
self.failure_count = 0
194
self.known_failure_count = 0
196
self.unsupported = {}
198
self._overall_start_time = time.time()
200
def extractBenchmarkTime(self, testCase):
201
"""Add a benchmark time for the current test case."""
202
self._benchmarkTime = getattr(testCase, "_benchtime", None)
204
def _elapsedTestTimeString(self):
205
"""Return a time string for the overall time the current test has taken."""
206
return self._formatTime(time.time() - self._start_time)
208
def _testTimeString(self):
209
if self._benchmarkTime is not None:
211
self._formatTime(self._benchmarkTime),
212
self._elapsedTestTimeString())
214
return " %s" % self._elapsedTestTimeString()
216
def _formatTime(self, seconds):
217
"""Format seconds as milliseconds with leading spaces."""
218
# some benchmarks can take thousands of seconds to run, so we need 8
220
return "%8dms" % (1000 * seconds)
222
def _shortened_test_description(self, test):
224
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
39
MODULES_TO_DOCTEST = []
41
from logging import debug, warning, error
45
class EarlyStoppingTestResultAdapter(object):
46
"""An adapter for TestResult to stop at the first first failure or error"""
48
def __init__(self, result):
51
def addError(self, test, err):
52
self._result.addError(test, err)
55
def addFailure(self, test, err):
56
self._result.addFailure(test, err)
59
def __getattr__(self, name):
60
return getattr(self._result, name)
62
def __setattr__(self, name, value):
64
object.__setattr__(self, name, value)
65
return setattr(self._result, name, value)
68
class _MyResult(unittest._TextTestResult):
72
No special behaviour for now.
75
def _elapsedTime(self):
76
return "(Took %.3fs)" % (time.time() - self._start_time)
227
78
def startTest(self, test):
228
79
unittest.TestResult.startTest(self, test)
229
self.report_test_start(test)
230
test.number = self.count
231
self._recordTestStartTime()
233
def _recordTestStartTime(self):
234
"""Record that a test has started."""
80
# TODO: Maybe show test.shortDescription somewhere?
81
what = test.shortDescription() or test.id()
83
self.stream.write('%-70.70s' % what)
235
85
self._start_time = time.time()
237
def _cleanupLogFile(self, test):
238
# We can only do this if we have one of our TestCases, not if
240
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
241
if setKeepLogfile is not None:
244
87
def addError(self, test, err):
245
self.extractBenchmarkTime(test)
246
self._cleanupLogFile(test)
247
if isinstance(err[1], TestSkipped):
248
return self.addSkipped(test, err)
249
elif isinstance(err[1], UnavailableFeature):
250
return self.addNotSupported(test, err[1].args[0])
251
88
unittest.TestResult.addError(self, test, err)
252
self.error_count += 1
253
self.report_error(test, err)
90
self.stream.writeln("ERROR %s" % self._elapsedTime())
92
self.stream.write('E')
257
95
def addFailure(self, test, err):
258
self._cleanupLogFile(test)
259
self.extractBenchmarkTime(test)
260
if isinstance(err[1], KnownFailure):
261
return self.addKnownFailure(test, err)
262
96
unittest.TestResult.addFailure(self, test, err)
263
self.failure_count += 1
264
self.report_failure(test, err)
268
def addKnownFailure(self, test, err):
269
self.known_failure_count += 1
270
self.report_known_failure(test, err)
272
def addNotSupported(self, test, feature):
273
self.unsupported.setdefault(str(feature), 0)
274
self.unsupported[str(feature)] += 1
275
self.report_unsupported(test, feature)
98
self.stream.writeln("FAIL %s" % self._elapsedTime())
100
self.stream.write('F')
277
103
def addSuccess(self, test):
278
self.extractBenchmarkTime(test)
279
if self._bench_history is not None:
280
if self._benchmarkTime is not None:
281
self._bench_history.write("%s %s\n" % (
282
self._formatTime(self._benchmarkTime),
284
self.report_success(test)
105
self.stream.writeln('OK %s' % self._elapsedTime())
107
self.stream.write('~')
285
109
unittest.TestResult.addSuccess(self, test)
287
def addSkipped(self, test, skip_excinfo):
288
self.report_skip(test, skip_excinfo)
289
# seems best to treat this as success from point-of-view of unittest
290
# -- it actually does nothing so it barely matters :)
293
except KeyboardInterrupt:
296
self.addError(test, test.__exc_info())
298
unittest.TestResult.addSuccess(self, test)
300
111
def printErrorList(self, flavour, errors):
301
112
for test, err in errors:
302
113
self.stream.writeln(self.separator1)
303
self.stream.write("%s: " % flavour)
304
self.stream.writeln(self.getDescription(test))
305
if getattr(test, '_get_log', None) is not None:
307
print >>self.stream, \
308
('vvvv[log from %s]' % test.id()).ljust(78,'-')
114
self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
115
if hasattr(test, '_get_log'):
116
self.stream.writeln()
117
self.stream.writeln('log from this test:')
309
118
print >>self.stream, test._get_log()
310
print >>self.stream, \
311
('^^^^[log from %s]' % test.id()).ljust(78,'-')
312
119
self.stream.writeln(self.separator2)
313
120
self.stream.writeln("%s" % err)
318
def report_cleaning_up(self):
321
def report_success(self, test):
325
class TextTestResult(ExtendedTestResult):
326
"""Displays progress and results of tests in text form"""
328
def __init__(self, stream, descriptions, verbosity,
333
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
334
bench_history, num_tests)
336
self.pb = self.ui.nested_progress_bar()
337
self._supplied_pb = False
340
self._supplied_pb = True
341
self.pb.show_pct = False
342
self.pb.show_spinner = False
343
self.pb.show_eta = False,
344
self.pb.show_count = False
345
self.pb.show_bar = False
347
def report_starting(self):
348
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
350
def _progress_prefix_text(self):
351
a = '[%d' % self.count
352
if self.num_tests is not None:
353
a +='/%d' % self.num_tests
354
a += ' in %ds' % (time.time() - self._overall_start_time)
356
a += ', %d errors' % self.error_count
357
if self.failure_count:
358
a += ', %d failed' % self.failure_count
359
if self.known_failure_count:
360
a += ', %d known failures' % self.known_failure_count
362
a += ', %d skipped' % self.skip_count
364
a += ', %d missing features' % len(self.unsupported)
368
def report_test_start(self, test):
371
self._progress_prefix_text()
373
+ self._shortened_test_description(test))
375
def _test_description(self, test):
376
return self._shortened_test_description(test)
378
def report_error(self, test, err):
379
self.pb.note('ERROR: %s\n %s\n',
380
self._test_description(test),
384
def report_failure(self, test, err):
385
self.pb.note('FAIL: %s\n %s\n',
386
self._test_description(test),
390
def report_known_failure(self, test, err):
391
self.pb.note('XFAIL: %s\n%s\n',
392
self._test_description(test), err[1])
394
def report_skip(self, test, skip_excinfo):
397
# at the moment these are mostly not things we can fix
398
# and so they just produce stipple; use the verbose reporter
401
# show test and reason for skip
402
self.pb.note('SKIP: %s\n %s\n',
403
self._shortened_test_description(test),
406
# since the class name was left behind in the still-visible
408
self.pb.note('SKIP: %s', skip_excinfo[1])
410
def report_unsupported(self, test, feature):
411
"""test cannot be run because feature is missing."""
413
def report_cleaning_up(self):
414
self.pb.update('cleaning up...')
417
if not self._supplied_pb:
421
class VerboseTestResult(ExtendedTestResult):
422
"""Produce long output, with one line per test run plus times"""
424
def _ellipsize_to_right(self, a_string, final_width):
425
"""Truncate and pad a string, keeping the right hand side"""
426
if len(a_string) > final_width:
427
result = '...' + a_string[3-final_width:]
430
return result.ljust(final_width)
432
def report_starting(self):
433
self.stream.write('running %d tests...\n' % self.num_tests)
435
def report_test_start(self, test):
437
name = self._shortened_test_description(test)
438
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
439
# numbers, plus a trailing blank
440
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
441
self.stream.write(self._ellipsize_to_right(name,
442
osutils.terminal_width()-30))
445
def _error_summary(self, err):
447
return '%s%s' % (indent, err[1])
449
def report_error(self, test, err):
450
self.stream.writeln('ERROR %s\n%s'
451
% (self._testTimeString(),
452
self._error_summary(err)))
454
def report_failure(self, test, err):
455
self.stream.writeln(' FAIL %s\n%s'
456
% (self._testTimeString(),
457
self._error_summary(err)))
459
def report_known_failure(self, test, err):
460
self.stream.writeln('XFAIL %s\n%s'
461
% (self._testTimeString(),
462
self._error_summary(err)))
464
def report_success(self, test):
465
self.stream.writeln(' OK %s' % self._testTimeString())
466
for bench_called, stats in getattr(test, '_benchcalls', []):
467
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
468
stats.pprint(file=self.stream)
469
# flush the stream so that we get smooth output. This verbose mode is
470
# used to show the output in PQM.
473
def report_skip(self, test, skip_excinfo):
475
self.stream.writeln(' SKIP %s\n%s'
476
% (self._testTimeString(),
477
self._error_summary(skip_excinfo)))
479
def report_unsupported(self, test, feature):
480
"""test cannot be run because feature is missing."""
481
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
482
%(self._testTimeString(), feature))
486
class TextTestRunner(object):
487
stop_on_failure = False
496
self.stream = unittest._WritelnDecorator(stream)
497
self.descriptions = descriptions
498
self.verbosity = verbosity
499
self._bench_history = bench_history
500
self.list_only = list_only
503
"Run the given test case or test suite."
504
startTime = time.time()
505
if self.verbosity == 1:
506
result_class = TextTestResult
507
elif self.verbosity >= 2:
508
result_class = VerboseTestResult
509
result = result_class(self.stream,
512
bench_history=self._bench_history,
513
num_tests=test.countTestCases(),
515
result.stop_early = self.stop_on_failure
516
result.report_starting()
518
if self.verbosity >= 2:
519
self.stream.writeln("Listing tests only ...\n")
521
for t in iter_suite_tests(test):
522
self.stream.writeln("%s" % (t.id()))
524
actionTaken = "Listed"
527
run = result.testsRun
529
stopTime = time.time()
530
timeTaken = stopTime - startTime
532
self.stream.writeln(result.separator2)
533
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
534
run, run != 1 and "s" or "", timeTaken))
535
self.stream.writeln()
536
if not result.wasSuccessful():
537
self.stream.write("FAILED (")
538
failed, errored = map(len, (result.failures, result.errors))
540
self.stream.write("failures=%d" % failed)
542
if failed: self.stream.write(", ")
543
self.stream.write("errors=%d" % errored)
544
if result.known_failure_count:
545
if failed or errored: self.stream.write(", ")
546
self.stream.write("known_failure_count=%d" %
547
result.known_failure_count)
548
self.stream.writeln(")")
550
if result.known_failure_count:
551
self.stream.writeln("OK (known_failures=%d)" %
552
result.known_failure_count)
554
self.stream.writeln("OK")
555
if result.skip_count > 0:
556
skipped = result.skip_count
557
self.stream.writeln('%d test%s skipped' %
558
(skipped, skipped != 1 and "s" or ""))
559
if result.unsupported:
560
for feature, count in sorted(result.unsupported.items()):
561
self.stream.writeln("Missing feature '%s' skipped %d tests." %
123
class TextTestRunner(unittest.TextTestRunner):
125
def _makeResult(self):
126
result = _MyResult(self.stream, self.descriptions, self.verbosity)
127
return EarlyStoppingTestResultAdapter(result)
567
130
def iter_suite_tests(suite):
702
157
Error and debug log messages are redirected from their usual
703
158
location into a temporary file, the contents of which can be
704
retrieved by _get_log(). We use a real OS file, not an in-memory object,
705
so that it can also capture file IO. When the test completes this file
706
is read into memory and removed from disk.
159
retrieved by _get_log().
708
161
There are also convenience functions to invoke bzr's command-line
709
routine, and to build and check bzr trees.
711
In addition to the usual method of overriding tearDown(), this class also
712
allows subclasses to register functions into the _cleanups list, which is
713
run in order as the object is torn down. It's less likely this will be
714
accidentally overlooked.
717
_log_file_name = None
719
_keep_log_file = False
720
# record lsprof data when performing benchmark calls.
721
_gather_lsprof_in_benchmarks = False
723
def __init__(self, methodName='testMethod'):
724
super(TestCase, self).__init__(methodName)
162
routine, and to build and check bzr trees."""
728
167
unittest.TestCase.setUp(self)
729
self._cleanEnvironment()
730
168
bzrlib.trace.disable_default_logging()
733
self._benchcalls = []
734
self._benchtime = None
736
self._clear_debug_flags()
738
def _clear_debug_flags(self):
739
"""Prevent externally set debug flags affecting tests.
741
Tests that want to use debug flags can just set them in the
742
debug_flags set during setup/teardown.
744
self._preserved_debug_flags = set(debug.debug_flags)
745
debug.debug_flags.clear()
746
self.addCleanup(self._restore_debug_flags)
748
def _clear_hooks(self):
749
# prevent hooks affecting tests
751
import bzrlib.smart.server
752
self._preserved_hooks = {
753
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
754
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
756
self.addCleanup(self._restoreHooks)
757
# reset all hooks to an empty instance of the appropriate type
758
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
759
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
761
def _silenceUI(self):
762
"""Turn off UI for duration of test"""
763
# by default the UI is off; tests can turn it on if they want it.
764
saved = ui.ui_factory
766
ui.ui_factory = saved
767
ui.ui_factory = ui.SilentUIFactory()
768
self.addCleanup(_restore)
770
def _ndiff_strings(self, a, b):
771
"""Return ndiff between two strings containing lines.
773
A trailing newline is added if missing to make the strings
775
if b and b[-1] != '\n':
777
if a and a[-1] != '\n':
779
difflines = difflib.ndiff(a.splitlines(True),
781
linejunk=lambda x: False,
782
charjunk=lambda x: False)
783
return ''.join(difflines)
785
def assertEqual(self, a, b, message=''):
789
except UnicodeError, e:
790
# If we can't compare without getting a UnicodeError, then
791
# obviously they are different
792
mutter('UnicodeError: %s', e)
795
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
797
pformat(a), pformat(b)))
799
assertEquals = assertEqual
801
def assertEqualDiff(self, a, b, message=None):
802
"""Assert two texts are equal, if not raise an exception.
804
This is intended for use with multi-line strings where it can
805
be hard to find the differences by eye.
807
# TODO: perhaps override assertEquals to call this for strings?
811
message = "texts not equal:\n"
812
raise AssertionError(message +
813
self._ndiff_strings(a, b))
815
def assertEqualMode(self, mode, mode_test):
816
self.assertEqual(mode, mode_test,
817
'mode mismatch %o != %o' % (mode, mode_test))
819
def assertStartsWith(self, s, prefix):
820
if not s.startswith(prefix):
821
raise AssertionError('string %r does not start with %r' % (s, prefix))
823
def assertEndsWith(self, s, suffix):
824
"""Asserts that s ends with suffix."""
825
if not s.endswith(suffix):
826
raise AssertionError('string %r does not end with %r' % (s, suffix))
828
def assertContainsRe(self, haystack, needle_re):
829
"""Assert that a contains something matching a regular expression."""
830
if not re.search(needle_re, haystack):
831
if '\n' in haystack or len(haystack) > 60:
832
# a long string, format it in a more readable way
833
raise AssertionError(
834
'pattern "%s" not found in\n"""\\\n%s"""\n'
835
% (needle_re, haystack))
837
raise AssertionError('pattern "%s" not found in "%s"'
838
% (needle_re, haystack))
840
def assertNotContainsRe(self, haystack, needle_re):
841
"""Assert that a does not match a regular expression"""
842
if re.search(needle_re, haystack):
843
raise AssertionError('pattern "%s" found in "%s"'
844
% (needle_re, haystack))
846
def assertSubset(self, sublist, superlist):
847
"""Assert that every entry in sublist is present in superlist."""
849
for entry in sublist:
850
if entry not in superlist:
851
missing.append(entry)
853
raise AssertionError("value(s) %r not present in container %r" %
854
(missing, superlist))
856
def assertListRaises(self, excClass, func, *args, **kwargs):
857
"""Fail unless excClass is raised when the iterator from func is used.
859
Many functions can return generators this makes sure
860
to wrap them in a list() call to make sure the whole generator
861
is run, and that the proper exception is raised.
864
list(func(*args, **kwargs))
868
if getattr(excClass,'__name__', None) is not None:
869
excName = excClass.__name__
871
excName = str(excClass)
872
raise self.failureException, "%s not raised" % excName
874
def assertRaises(self, excClass, callableObj, *args, **kwargs):
875
"""Assert that a callable raises a particular exception.
877
:param excClass: As for the except statement, this may be either an
878
exception class, or a tuple of classes.
879
:param callableObj: A callable, will be passed ``*args`` and
882
Returns the exception so that you can examine it.
885
callableObj(*args, **kwargs)
889
if getattr(excClass,'__name__', None) is not None:
890
excName = excClass.__name__
893
excName = str(excClass)
894
raise self.failureException, "%s not raised" % excName
896
def assertIs(self, left, right, message=None):
897
if not (left is right):
898
if message is not None:
899
raise AssertionError(message)
901
raise AssertionError("%r is not %r." % (left, right))
903
def assertIsNot(self, left, right, message=None):
905
if message is not None:
906
raise AssertionError(message)
908
raise AssertionError("%r is %r." % (left, right))
910
def assertTransportMode(self, transport, path, mode):
911
"""Fail if a path does not have mode mode.
913
If modes are not supported on this transport, the assertion is ignored.
915
if not transport._can_roundtrip_unix_modebits():
917
path_stat = transport.stat(path)
918
actual_mode = stat.S_IMODE(path_stat.st_mode)
919
self.assertEqual(mode, actual_mode,
920
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
922
def assertIsInstance(self, obj, kls):
923
"""Fail if obj is not an instance of kls"""
924
if not isinstance(obj, kls):
925
self.fail("%r is an instance of %s rather than %s" % (
926
obj, obj.__class__, kls))
928
def expectFailure(self, reason, assertion, *args, **kwargs):
929
"""Invoke a test, expecting it to fail for the given reason.
931
This is for assertions that ought to succeed, but currently fail.
932
(The failure is *expected* but not *wanted*.) Please be very precise
933
about the failure you're expecting. If a new bug is introduced,
934
AssertionError should be raised, not KnownFailure.
936
Frequently, expectFailure should be followed by an opposite assertion.
939
Intended to be used with a callable that raises AssertionError as the
940
'assertion' parameter. args and kwargs are passed to the 'assertion'.
942
Raises KnownFailure if the test fails. Raises AssertionError if the
947
self.expectFailure('Math is broken', self.assertNotEqual, 54,
949
self.assertEqual(42, dynamic_val)
951
This means that a dynamic_val of 54 will cause the test to raise
952
a KnownFailure. Once math is fixed and the expectFailure is removed,
953
only a dynamic_val of 42 will allow the test to pass. Anything other
954
than 54 or 42 will cause an AssertionError.
957
assertion(*args, **kwargs)
958
except AssertionError:
959
raise KnownFailure(reason)
961
self.fail('Unexpected success. Should have failed: %s' % reason)
963
def _capture_warnings(self, a_callable, *args, **kwargs):
964
"""A helper for callDeprecated and applyDeprecated.
966
:param a_callable: A callable to call.
967
:param args: The positional arguments for the callable
968
:param kwargs: The keyword arguments for the callable
969
:return: A tuple (warnings, result). result is the result of calling
970
a_callable(``*args``, ``**kwargs``).
973
def capture_warnings(msg, cls=None, stacklevel=None):
974
# we've hooked into a deprecation specific callpath,
975
# only deprecations should getting sent via it.
976
self.assertEqual(cls, DeprecationWarning)
977
local_warnings.append(msg)
978
original_warning_method = symbol_versioning.warn
979
symbol_versioning.set_warning_method(capture_warnings)
981
result = a_callable(*args, **kwargs)
983
symbol_versioning.set_warning_method(original_warning_method)
984
return (local_warnings, result)
986
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
987
"""Call a deprecated callable without warning the user.
989
Note that this only captures warnings raised by symbol_versioning.warn,
990
not other callers that go direct to the warning module.
992
:param deprecation_format: The deprecation format that the callable
993
should have been deprecated with. This is the same type as the
994
parameter to deprecated_method/deprecated_function. If the
995
callable is not deprecated with this format, an assertion error
997
:param a_callable: A callable to call. This may be a bound method or
998
a regular function. It will be called with ``*args`` and
1000
:param args: The positional arguments for the callable
1001
:param kwargs: The keyword arguments for the callable
1002
:return: The result of a_callable(``*args``, ``**kwargs``)
1004
call_warnings, result = self._capture_warnings(a_callable,
1006
expected_first_warning = symbol_versioning.deprecation_string(
1007
a_callable, deprecation_format)
1008
if len(call_warnings) == 0:
1009
self.fail("No deprecation warning generated by call to %s" %
1011
self.assertEqual(expected_first_warning, call_warnings[0])
1014
def callDeprecated(self, expected, callable, *args, **kwargs):
1015
"""Assert that a callable is deprecated in a particular way.
1017
This is a very precise test for unusual requirements. The
1018
applyDeprecated helper function is probably more suited for most tests
1019
as it allows you to simply specify the deprecation format being used
1020
and will ensure that that is issued for the function being called.
1022
Note that this only captures warnings raised by symbol_versioning.warn,
1023
not other callers that go direct to the warning module.
1025
:param expected: a list of the deprecation warnings expected, in order
1026
:param callable: The callable to call
1027
:param args: The positional arguments for the callable
1028
:param kwargs: The keyword arguments for the callable
1030
call_warnings, result = self._capture_warnings(callable,
1032
self.assertEqual(expected, call_warnings)
1035
def _startLogFile(self):
1036
"""Send bzr and test log messages to a temporary file.
1038
The file is removed as the test is torn down.
169
self._enable_file_logging()
172
def _enable_file_logging(self):
1040
173
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1041
175
self._log_file = os.fdopen(fileno, 'w+')
1042
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
177
hdlr = logging.StreamHandler(self._log_file)
178
hdlr.setLevel(logging.DEBUG)
179
hdlr.setFormatter(logging.Formatter('%(levelname)8s %(message)s'))
180
logging.getLogger('').addHandler(hdlr)
181
logging.getLogger('').setLevel(logging.DEBUG)
182
self._log_hdlr = hdlr
183
debug('opened log file %s', name)
1043
185
self._log_file_name = name
1044
self.addCleanup(self._finishLogFile)
1046
def _finishLogFile(self):
1047
"""Finished with the log file.
1049
Close the file and delete it, unless setKeepLogfile was called.
1051
if self._log_file is None:
1053
bzrlib.trace.disable_test_log(self._log_nonce)
188
logging.getLogger('').removeHandler(self._log_hdlr)
189
bzrlib.trace.enable_default_logging()
190
logging.debug('%s teardown', self.id())
1054
191
self._log_file.close()
1055
self._log_file = None
1056
if not self._keep_log_file:
1057
os.remove(self._log_file_name)
1058
self._log_file_name = None
1060
def setKeepLogfile(self):
1061
"""Make the logfile not be deleted when _finishLogFile is called."""
1062
self._keep_log_file = True
1064
def addCleanup(self, callable):
1065
"""Arrange to run a callable when this case is torn down.
1067
Callables are run in the reverse of the order they are registered,
1068
ie last-in first-out.
1070
if callable in self._cleanups:
1071
raise ValueError("cleanup function %r already registered on %s"
1073
self._cleanups.append(callable)
1075
def _cleanEnvironment(self):
1077
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1078
'HOME': os.getcwd(),
1079
'APPDATA': None, # bzr now use Win32 API and don't rely on APPDATA
1081
'BZREMAIL': None, # may still be present in the environment
1083
'BZR_PROGRESS_BAR': None,
1087
'https_proxy': None,
1088
'HTTPS_PROXY': None,
1093
# Nobody cares about these ones AFAIK. So far at
1094
# least. If you do (care), please update this comment
1098
'BZR_REMOTE_PATH': None,
1101
self.addCleanup(self._restoreEnvironment)
1102
for name, value in new_env.iteritems():
1103
self._captureVar(name, value)
1105
def _captureVar(self, name, newvalue):
1106
"""Set an environment variable, and reset it when finished."""
1107
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1109
def _restore_debug_flags(self):
1110
debug.debug_flags.clear()
1111
debug.debug_flags.update(self._preserved_debug_flags)
1113
def _restoreEnvironment(self):
1114
for name, value in self.__old_env.iteritems():
1115
osutils.set_or_unset_env(name, value)
1117
def _restoreHooks(self):
1118
for klass, hooks in self._preserved_hooks.items():
1119
setattr(klass, 'hooks', hooks)
1121
def knownFailure(self, reason):
1122
"""This test has failed for some known reason."""
1123
raise KnownFailure(reason)
1125
def run(self, result=None):
1126
if result is None: result = self.defaultTestResult()
1127
for feature in getattr(self, '_test_needs_features', []):
1128
if not feature.available():
1129
result.startTest(self)
1130
if getattr(result, 'addNotSupported', None):
1131
result.addNotSupported(self, feature)
1133
result.addSuccess(self)
1134
result.stopTest(self)
1136
return unittest.TestCase.run(self, result)
1140
192
unittest.TestCase.tearDown(self)
1142
def time(self, callable, *args, **kwargs):
1143
"""Run callable and accrue the time it takes to the benchmark time.
1145
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1146
this will cause lsprofile statistics to be gathered and stored in
1149
if self._benchtime is None:
1153
if not self._gather_lsprof_in_benchmarks:
1154
return callable(*args, **kwargs)
1156
# record this benchmark
1157
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1159
self._benchcalls.append(((callable, args, kwargs), stats))
1162
self._benchtime += time.time() - start
1164
def _runCleanups(self):
1165
"""Run registered cleanup functions.
1167
This should only be called from TestCase.tearDown.
1169
# TODO: Perhaps this should keep running cleanups even if
1170
# one of them fails?
1172
# Actually pop the cleanups from the list so tearDown running
1173
# twice is safe (this happens for skipped tests).
1174
while self._cleanups:
1175
self._cleanups.pop()()
1177
194
def log(self, *args):
1180
def _get_log(self, keep_log_file=False):
1181
"""Return as a string the log for this test. If the file is still
1182
on disk and keep_log_file=False, delete the log file and store the
1183
content in self._log_contents."""
1184
# flush the log file, to get all content
1186
bzrlib.trace._trace_file.flush()
1187
if self._log_contents:
1188
return self._log_contents
1189
if self._log_file_name is not None:
1190
logfile = open(self._log_file_name)
1192
log_contents = logfile.read()
1195
if not keep_log_file:
1196
self._log_contents = log_contents
1198
os.remove(self._log_file_name)
1200
if sys.platform == 'win32' and e.errno == errno.EACCES:
1201
print >>sys.stderr, ('Unable to delete log file '
1202
' %r' % self._log_file_name)
1207
return "DELETED log file to reduce memory footprint"
1209
@deprecated_method(zero_eighteen)
1210
def capture(self, cmd, retcode=0):
198
"""Return as a string the log for this test"""
199
return open(self._log_file_name).read()
202
def capture(self, cmd):
1211
203
"""Shortcut that splits cmd into words, runs, and returns stdout"""
1212
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
1214
def requireFeature(self, feature):
1215
"""This test requires a specific feature is available.
1217
:raises UnavailableFeature: When feature is not available.
1219
if not feature.available():
1220
raise UnavailableFeature(feature)
1222
@deprecated_method(zero_eighteen)
1223
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
1225
"""Invoke bzr and return (stdout, stderr).
1227
Don't call this method, just use run_bzr() which is equivalent.
1229
:param argv: Arguments to invoke bzr. This may be either a
1230
single string, in which case it is split by shlex into words,
1231
or a list of arguments.
1232
:param retcode: Expected return code, or None for don't-care.
1233
:param encoding: Encoding for sys.stdout and sys.stderr
1234
:param stdin: A string to be used as stdin for the command.
1235
:param working_dir: Change to this directory before running
1237
return self._run_bzr_autosplit(argv, retcode=retcode,
1238
encoding=encoding, stdin=stdin, working_dir=working_dir,
1241
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1243
"""Run bazaar command line, splitting up a string command line."""
1244
if isinstance(args, basestring):
1245
args = list(shlex.split(args))
1246
return self._run_bzr_core(args, retcode=retcode,
1247
encoding=encoding, stdin=stdin, working_dir=working_dir,
1250
def _run_bzr_core(self, args, retcode, encoding, stdin,
1252
if encoding is None:
1253
encoding = bzrlib.user_encoding
1254
stdout = StringIOWrapper()
1255
stderr = StringIOWrapper()
1256
stdout.encoding = encoding
1257
stderr.encoding = encoding
1259
self.log('run bzr: %r', args)
1260
# FIXME: don't call into logging here
204
return self.run_bzr_captured(cmd.split())[0]
206
def run_bzr_captured(self, argv, retcode=0):
207
"""Invoke bzr and return (result, stdout, stderr).
209
Useful for code that wants to check the contents of the
210
output, the way error messages are presented, etc.
212
This should be the main method for tests that want to exercise the
213
overall behavior of the bzr application (rather than a unit test
214
or a functional test of the library.)
216
Much of the old code runs bzr by forking a new copy of Python, but
217
that is slower, harder to debug, and generally not necessary.
219
This runs bzr through the interface that catches and reports
220
errors, and with logging set to something approximating the
221
default, so that error reporting can be checked.
223
argv -- arguments to invoke bzr
224
retcode -- expected return code, or None for don't-care.
228
self.log('run bzr: %s', ' '.join(argv))
1261
229
handler = logging.StreamHandler(stderr)
230
handler.setFormatter(bzrlib.trace.QuietFormatter())
1262
231
handler.setLevel(logging.INFO)
1263
232
logger = logging.getLogger('')
1264
233
logger.addHandler(handler)
1265
old_ui_factory = ui.ui_factory
1266
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1269
if working_dir is not None:
1270
cwd = osutils.getcwd()
1271
os.chdir(working_dir)
1274
result = self.apply_redirected(ui.ui_factory.stdin,
1276
bzrlib.commands.run_bzr_catch_errors,
235
result = self.apply_redirected(None, stdout, stderr,
236
bzrlib.commands.run_bzr_catch_errors,
1279
239
logger.removeHandler(handler)
1280
ui.ui_factory = old_ui_factory
1284
240
out = stdout.getvalue()
1285
241
err = stderr.getvalue()
1287
self.log('output:\n%r', out)
243
self.log('output:\n%s', out)
1289
self.log('errors:\n%r', err)
245
self.log('errors:\n%s', err)
1290
246
if retcode is not None:
1291
self.assertEquals(retcode, result,
1292
message='Unexpected return code')
247
self.assertEquals(result, retcode)
1295
250
def run_bzr(self, *args, **kwargs):
1296
251
"""Invoke bzr, as if it were run from the command line.
1298
The argument list should not include the bzr program name - the
1299
first argument is normally the bzr command. Arguments may be
1300
passed in three ways:
1302
1- A list of strings, eg ["commit", "a"]. This is recommended
1303
when the command contains whitespace or metacharacters, or
1304
is built up at run time.
1306
2- A single string, eg "add a". This is the most convenient
1307
for hardcoded commands.
1309
3- Several varargs parameters, eg run_bzr("add", "a").
1310
This is not recommended for new code.
1312
This runs bzr through the interface that catches and reports
1313
errors, and with logging set to something approximating the
1314
default, so that error reporting can be checked.
1316
253
This should be the main method for tests that want to exercise the
1317
254
overall behavior of the bzr application (rather than a unit test
1318
255
or a functional test of the library.)
1320
257
This sends the stdout/stderr results into the test's log,
1321
258
where it may be useful for debugging. See also run_captured.
1323
:keyword stdin: A string to be used as stdin for the command.
1324
:keyword retcode: The status code the command should return;
1326
:keyword working_dir: The directory to run the command in
1327
:keyword error_regexes: A list of expected error messages. If
1328
specified they must be seen in the error output of the command.
1330
260
retcode = kwargs.pop('retcode', 0)
1331
encoding = kwargs.pop('encoding', None)
1332
stdin = kwargs.pop('stdin', None)
1333
working_dir = kwargs.pop('working_dir', None)
1334
error_regexes = kwargs.pop('error_regexes', [])
1337
if isinstance(args[0], (list, basestring)):
1340
symbol_versioning.warn(zero_eighteen % "passing varargs to run_bzr",
1341
DeprecationWarning, stacklevel=3)
1343
out, err = self._run_bzr_autosplit(args=args,
1345
encoding=encoding, stdin=stdin, working_dir=working_dir,
1348
for regex in error_regexes:
1349
self.assertContainsRe(err, regex)
1352
def run_bzr_decode(self, *args, **kwargs):
1353
if 'encoding' in kwargs:
1354
encoding = kwargs['encoding']
1356
encoding = bzrlib.user_encoding
1357
return self.run_bzr(*args, **kwargs)[0].decode(encoding)
1359
def run_bzr_error(self, error_regexes, *args, **kwargs):
1360
"""Run bzr, and check that stderr contains the supplied regexes
1362
:param error_regexes: Sequence of regular expressions which
1363
must each be found in the error output. The relative ordering
1365
:param args: command-line arguments for bzr
1366
:param kwargs: Keyword arguments which are interpreted by run_bzr
1367
This function changes the default value of retcode to be 3,
1368
since in most cases this is run when you expect bzr to fail.
1370
:return: (out, err) The actual output of running the command (in case
1371
you want to do more inspection)
1375
# Make sure that commit is failing because there is nothing to do
1376
self.run_bzr_error(['no changes to commit'],
1377
'commit', '-m', 'my commit comment')
1378
# Make sure --strict is handling an unknown file, rather than
1379
# giving us the 'nothing to do' error
1380
self.build_tree(['unknown'])
1381
self.run_bzr_error(['Commit refused because there are unknown files'],
1382
'commit', '--strict', '-m', 'my commit comment')
1384
kwargs.setdefault('retcode', 3)
1385
kwargs['error_regexes'] = error_regexes
1386
out, err = self.run_bzr(*args, **kwargs)
1389
def run_bzr_subprocess(self, *args, **kwargs):
1390
"""Run bzr in a subprocess for testing.
1392
This starts a new Python interpreter and runs bzr in there.
1393
This should only be used for tests that have a justifiable need for
1394
this isolation: e.g. they are testing startup time, or signal
1395
handling, or early startup code, etc. Subprocess code can't be
1396
profiled or debugged so easily.
1398
:keyword retcode: The status code that is expected. Defaults to 0. If
1399
None is supplied, the status code is not checked.
1400
:keyword env_changes: A dictionary which lists changes to environment
1401
variables. A value of None will unset the env variable.
1402
The values must be strings. The change will only occur in the
1403
child, so you don't need to fix the environment after running.
1404
:keyword universal_newlines: Convert CRLF => LF
1405
:keyword allow_plugins: By default the subprocess is run with
1406
--no-plugins to ensure test reproducibility. Also, it is possible
1407
for system-wide plugins to create unexpected output on stderr,
1408
which can cause unnecessary test failures.
1410
env_changes = kwargs.get('env_changes', {})
1411
working_dir = kwargs.get('working_dir', None)
1412
allow_plugins = kwargs.get('allow_plugins', False)
1413
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1414
working_dir=working_dir,
1415
allow_plugins=allow_plugins)
1416
# We distinguish between retcode=None and retcode not passed.
1417
supplied_retcode = kwargs.get('retcode', 0)
1418
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1419
universal_newlines=kwargs.get('universal_newlines', False),
1422
def start_bzr_subprocess(self, process_args, env_changes=None,
1423
skip_if_plan_to_signal=False,
1425
allow_plugins=False):
1426
"""Start bzr in a subprocess for testing.
1428
This starts a new Python interpreter and runs bzr in there.
1429
This should only be used for tests that have a justifiable need for
1430
this isolation: e.g. they are testing startup time, or signal
1431
handling, or early startup code, etc. Subprocess code can't be
1432
profiled or debugged so easily.
1434
:param process_args: a list of arguments to pass to the bzr executable,
1435
for example ``['--version']``.
1436
:param env_changes: A dictionary which lists changes to environment
1437
variables. A value of None will unset the env variable.
1438
The values must be strings. The change will only occur in the
1439
child, so you don't need to fix the environment after running.
1440
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1442
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1444
:returns: Popen object for the started process.
1446
if skip_if_plan_to_signal:
1447
if not getattr(os, 'kill', None):
1448
raise TestSkipped("os.kill not available.")
1450
if env_changes is None:
1454
def cleanup_environment():
1455
for env_var, value in env_changes.iteritems():
1456
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1458
def restore_environment():
1459
for env_var, value in old_env.iteritems():
1460
osutils.set_or_unset_env(env_var, value)
1462
bzr_path = self.get_bzr_path()
1465
if working_dir is not None:
1466
cwd = osutils.getcwd()
1467
os.chdir(working_dir)
1470
# win32 subprocess doesn't support preexec_fn
1471
# so we will avoid using it on all platforms, just to
1472
# make sure the code path is used, and we don't break on win32
1473
cleanup_environment()
1474
command = [sys.executable, bzr_path]
1475
if not allow_plugins:
1476
command.append('--no-plugins')
1477
command.extend(process_args)
1478
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1480
restore_environment()
1486
def _popen(self, *args, **kwargs):
1487
"""Place a call to Popen.
1489
Allows tests to override this method to intercept the calls made to
1490
Popen for introspection.
1492
return Popen(*args, **kwargs)
1494
def get_bzr_path(self):
1495
"""Return the path of the 'bzr' executable for this test suite."""
1496
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1497
if not os.path.isfile(bzr_path):
1498
# We are probably installed. Assume sys.argv is the right file
1499
bzr_path = sys.argv[0]
1502
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1503
universal_newlines=False, process_args=None):
1504
"""Finish the execution of process.
1506
:param process: the Popen object returned from start_bzr_subprocess.
1507
:param retcode: The status code that is expected. Defaults to 0. If
1508
None is supplied, the status code is not checked.
1509
:param send_signal: an optional signal to send to the process.
1510
:param universal_newlines: Convert CRLF => LF
1511
:returns: (stdout, stderr)
1513
if send_signal is not None:
1514
os.kill(process.pid, send_signal)
1515
out, err = process.communicate()
1517
if universal_newlines:
1518
out = out.replace('\r\n', '\n')
1519
err = err.replace('\r\n', '\n')
1521
if retcode is not None and retcode != process.returncode:
1522
if process_args is None:
1523
process_args = "(unknown args)"
1524
mutter('Output of bzr %s:\n%s', process_args, out)
1525
mutter('Error for bzr %s:\n%s', process_args, err)
1526
self.fail('Command bzr %s failed with retcode %s != %s'
1527
% (process_args, retcode, process.returncode))
261
return self.run_bzr_captured(args, retcode)
1530
263
def check_inventory_shape(self, inv, shape):
1531
264
"""Compare an inventory to a list of expected names.
1579
312
sys.stderr = real_stderr
1580
313
sys.stdin = real_stdin
1582
def reduceLockdirTimeout(self):
1583
"""Reduce the default lock timeout for the duration of the test, so that
1584
if LockContention occurs during a test, it does so quickly.
1586
Tests that expect to provoke LockContention errors should call this.
1588
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1590
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1591
self.addCleanup(resetTimeout)
1592
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1595
class TestCaseWithMemoryTransport(TestCase):
1596
"""Common test class for tests that do not need disk resources.
1598
Tests that need disk resources should derive from TestCaseWithTransport.
1600
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1602
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1603
a directory which does not exist. This serves to help ensure test isolation
1604
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1605
must exist. However, TestCaseWithMemoryTransport does not offer local
1606
file defaults for the transport in tests, nor does it obey the command line
1607
override, so tests that accidentally write to the common directory should
1610
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1611
a .bzr directory that stops us ascending higher into the filesystem.
1617
def __init__(self, methodName='runTest'):
1618
# allow test parameterisation after test construction and before test
1619
# execution. Variables that the parameteriser sets need to be
1620
# ones that are not set by setUp, or setUp will trash them.
1621
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1622
self.vfs_transport_factory = default_transport
1623
self.transport_server = None
1624
self.transport_readonly_server = None
1625
self.__vfs_server = None
1627
def get_transport(self, relpath=None):
1628
"""Return a writeable transport.
1630
This transport is for the test scratch space relative to
1633
:param relpath: a path relative to the base url.
1635
t = get_transport(self.get_url(relpath))
1636
self.assertFalse(t.is_readonly())
1639
def get_readonly_transport(self, relpath=None):
1640
"""Return a readonly transport for the test scratch space
1642
This can be used to test that operations which should only need
1643
readonly access in fact do not try to write.
1645
:param relpath: a path relative to the base url.
1647
t = get_transport(self.get_readonly_url(relpath))
1648
self.assertTrue(t.is_readonly())
1651
def create_transport_readonly_server(self):
1652
"""Create a transport server from class defined at init.
1654
This is mostly a hook for daughter classes.
1656
return self.transport_readonly_server()
1658
def get_readonly_server(self):
1659
"""Get the server instance for the readonly transport
1661
This is useful for some tests with specific servers to do diagnostics.
1663
if self.__readonly_server is None:
1664
if self.transport_readonly_server is None:
1665
# readonly decorator requested
1666
# bring up the server
1667
self.__readonly_server = ReadonlyServer()
1668
self.__readonly_server.setUp(self.get_vfs_only_server())
1670
self.__readonly_server = self.create_transport_readonly_server()
1671
self.__readonly_server.setUp(self.get_vfs_only_server())
1672
self.addCleanup(self.__readonly_server.tearDown)
1673
return self.__readonly_server
1675
def get_readonly_url(self, relpath=None):
1676
"""Get a URL for the readonly transport.
1678
This will either be backed by '.' or a decorator to the transport
1679
used by self.get_url()
1680
relpath provides for clients to get a path relative to the base url.
1681
These should only be downwards relative, not upwards.
1683
base = self.get_readonly_server().get_url()
1684
return self._adjust_url(base, relpath)
1686
def get_vfs_only_server(self):
1687
"""Get the vfs only read/write server instance.
1689
This is useful for some tests with specific servers that need
1692
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1693
is no means to override it.
1695
if self.__vfs_server is None:
1696
self.__vfs_server = MemoryServer()
1697
self.__vfs_server.setUp()
1698
self.addCleanup(self.__vfs_server.tearDown)
1699
return self.__vfs_server
1701
def get_server(self):
1702
"""Get the read/write server instance.
1704
This is useful for some tests with specific servers that need
1707
This is built from the self.transport_server factory. If that is None,
1708
then the self.get_vfs_server is returned.
1710
if self.__server is None:
1711
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1712
return self.get_vfs_only_server()
1714
# bring up a decorated means of access to the vfs only server.
1715
self.__server = self.transport_server()
1717
self.__server.setUp(self.get_vfs_only_server())
1718
except TypeError, e:
1719
# This should never happen; the try:Except here is to assist
1720
# developers having to update code rather than seeing an
1721
# uninformative TypeError.
1722
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1723
self.addCleanup(self.__server.tearDown)
1724
return self.__server
1726
def _adjust_url(self, base, relpath):
1727
"""Get a URL (or maybe a path) for the readwrite transport.
1729
This will either be backed by '.' or to an equivalent non-file based
1731
relpath provides for clients to get a path relative to the base url.
1732
These should only be downwards relative, not upwards.
1734
if relpath is not None and relpath != '.':
1735
if not base.endswith('/'):
1737
# XXX: Really base should be a url; we did after all call
1738
# get_url()! But sometimes it's just a path (from
1739
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1740
# to a non-escaped local path.
1741
if base.startswith('./') or base.startswith('/'):
1744
base += urlutils.escape(relpath)
1747
def get_url(self, relpath=None):
1748
"""Get a URL (or maybe a path) for the readwrite transport.
1750
This will either be backed by '.' or to an equivalent non-file based
1752
relpath provides for clients to get a path relative to the base url.
1753
These should only be downwards relative, not upwards.
1755
base = self.get_server().get_url()
1756
return self._adjust_url(base, relpath)
1758
def get_vfs_only_url(self, relpath=None):
1759
"""Get a URL (or maybe a path for the plain old vfs transport.
1761
This will never be a smart protocol. It always has all the
1762
capabilities of the local filesystem, but it might actually be a
1763
MemoryTransport or some other similar virtual filesystem.
1765
This is the backing transport (if any) of the server returned by
1766
get_url and get_readonly_url.
1768
:param relpath: provides for clients to get a path relative to the base
1769
url. These should only be downwards relative, not upwards.
1772
base = self.get_vfs_only_server().get_url()
1773
return self._adjust_url(base, relpath)
1775
def _make_test_root(self):
1776
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1778
root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
1779
TestCaseWithMemoryTransport.TEST_ROOT = root
1781
# make a fake bzr directory there to prevent any tests propagating
1782
# up onto the source directory's real branch
1783
bzrdir.BzrDir.create_standalone_workingtree(root)
1785
# The same directory is used by all tests, and we're not specifically
1786
# told when all tests are finished. This will do.
1787
atexit.register(_rmtree_temp_dir, root)
1789
def makeAndChdirToTestDir(self):
1790
"""Create a temporary directories for this one test.
1792
This must set self.test_home_dir and self.test_dir and chdir to
1795
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1797
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1798
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1799
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1801
def make_branch(self, relpath, format=None):
1802
"""Create a branch on the transport at relpath."""
1803
repo = self.make_repository(relpath, format=format)
1804
return repo.bzrdir.create_branch()
1806
def make_bzrdir(self, relpath, format=None):
1808
# might be a relative or absolute path
1809
maybe_a_url = self.get_url(relpath)
1810
segments = maybe_a_url.rsplit('/', 1)
1811
t = get_transport(maybe_a_url)
1812
if len(segments) > 1 and segments[-1] not in ('', '.'):
1816
if isinstance(format, basestring):
1817
format = bzrdir.format_registry.make_bzrdir(format)
1818
return format.initialize_on_transport(t)
1819
except errors.UninitializableFormat:
1820
raise TestSkipped("Format %s is not initializable." % format)
1822
def make_repository(self, relpath, shared=False, format=None):
1823
"""Create a repository on our default transport at relpath.
1825
Note that relpath must be a relative path, not a full url.
1827
# FIXME: If you create a remoterepository this returns the underlying
1828
# real format, which is incorrect. Actually we should make sure that
1829
# RemoteBzrDir returns a RemoteRepository.
1830
# maybe mbp 20070410
1831
made_control = self.make_bzrdir(relpath, format=format)
1832
return made_control.create_repository(shared=shared)
1834
def make_branch_and_memory_tree(self, relpath, format=None):
1835
"""Create a branch on the default transport and a MemoryTree for it."""
1836
b = self.make_branch(relpath, format=format)
1837
return memorytree.MemoryTree.create_on_branch(b)
1839
def overrideEnvironmentForTesting(self):
1840
os.environ['HOME'] = self.test_home_dir
1841
os.environ['BZR_HOME'] = self.test_home_dir
1844
super(TestCaseWithMemoryTransport, self).setUp()
1845
self._make_test_root()
1846
_currentdir = os.getcwdu()
1847
def _leaveDirectory():
1848
os.chdir(_currentdir)
1849
self.addCleanup(_leaveDirectory)
1850
self.makeAndChdirToTestDir()
1851
self.overrideEnvironmentForTesting()
1852
self.__readonly_server = None
1853
self.__server = None
1854
self.reduceLockdirTimeout()
316
BzrTestBase = TestCase
1857
class TestCaseInTempDir(TestCaseWithMemoryTransport):
319
class TestCaseInTempDir(TestCase):
1858
320
"""Derived class that runs a test within a temporary directory.
1860
322
This is useful for tests that need to create a branch, etc.
1884
341
self.log("actually: %r" % contents)
1885
342
self.fail("contents of %s not as expected" % filename)
1887
def makeAndChdirToTestDir(self):
1888
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
1890
For TestCaseInTempDir we create a temporary directory based on the test
1891
name and then create two subdirs - test and home under it.
1893
# create a directory within the top level test directory
1894
candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
1895
# now create test and home directories within this dir
1896
self.test_base_dir = candidate_dir
1897
self.test_home_dir = self.test_base_dir + '/home'
1898
os.mkdir(self.test_home_dir)
1899
self.test_dir = self.test_base_dir + '/work'
344
def _make_test_root(self):
345
if TestCaseInTempDir.TEST_ROOT is not None:
349
root = 'test%04d.tmp' % i
353
if e.errno == errno.EEXIST:
358
# successfully created
359
TestCaseInTempDir.TEST_ROOT = os.path.abspath(root)
361
# make a fake bzr directory there to prevent any tests propagating
362
# up onto the source directory's real branch
363
os.mkdir(os.path.join(TestCaseInTempDir.TEST_ROOT, '.bzr'))
366
super(TestCaseInTempDir, self).setUp()
367
self._make_test_root()
368
self._currentdir = os.getcwdu()
369
short_id = self.id().replace('bzrlib.selftest.', '') \
370
.replace('__main__.', '')
371
self.test_dir = os.path.join(self.TEST_ROOT, short_id)
1900
372
os.mkdir(self.test_dir)
1901
373
os.chdir(self.test_dir)
1902
# put name of test inside
1903
f = file(self.test_base_dir + '/name', 'w')
1908
self.addCleanup(self.deleteTestDir)
1910
def deleteTestDir(self):
1911
os.chdir(self.TEST_ROOT)
1912
_rmtree_temp_dir(self.test_base_dir)
1914
def build_tree(self, shape, line_endings='binary', transport=None):
376
os.chdir(self._currentdir)
377
super(TestCaseInTempDir, self).tearDown()
379
def build_tree(self, shape):
1915
380
"""Build a test tree according to a pattern.
1917
382
shape is a sequence of file specifications. If the final
1918
383
character is '/', a directory is created.
1920
This assumes that all the elements in the tree being built are new.
1922
385
This doesn't add anything to a branch.
1924
:param line_endings: Either 'binary' or 'native'
1925
in binary mode, exact contents are written in native mode, the
1926
line endings match the default platform endings.
1927
:param transport: A transport to write to, for building trees on VFS's.
1928
If the transport is readonly or None, "." is opened automatically.
1931
# It's OK to just create them using forward slashes on windows.
1932
if transport is None or transport.is_readonly():
1933
transport = get_transport(".")
387
# XXX: It's OK to just create them using forward slashes on windows?
1934
388
for name in shape:
1935
self.assert_(isinstance(name, basestring))
389
assert isinstance(name, basestring)
1936
390
if name[-1] == '/':
1937
transport.mkdir(urlutils.escape(name[:-1]))
1939
if line_endings == 'binary':
1941
elif line_endings == 'native':
1944
raise errors.BzrError(
1945
'Invalid line ending request %r' % line_endings)
1946
content = "contents of %s%s" % (name.encode('utf-8'), end)
1947
transport.put_bytes_non_atomic(urlutils.escape(name), content)
1949
def build_tree_contents(self, shape):
1950
build_tree_contents(shape)
1952
def assertFileEqual(self, content, path):
1953
"""Fail if path does not contain 'content'."""
1954
self.failUnlessExists(path)
1955
f = file(path, 'rb')
1960
self.assertEqualDiff(content, s)
394
print >>f, "contents of", name
1962
397
def failUnlessExists(self, path):
1963
"""Fail unless path or paths, which may be abs or relative, exist."""
1964
if not isinstance(path, basestring):
1966
self.failUnlessExists(p)
1968
self.failUnless(osutils.lexists(path),path+" does not exist")
1970
def failIfExists(self, path):
1971
"""Fail if path or paths, which may be abs or relative, exist."""
1972
if not isinstance(path, basestring):
1974
self.failIfExists(p)
1976
self.failIf(osutils.lexists(path),path+" exists")
1978
def assertInWorkingTree(self,path,root_path='.',tree=None):
1979
"""Assert whether path or paths are in the WorkingTree"""
1981
tree = workingtree.WorkingTree.open(root_path)
1982
if not isinstance(path, basestring):
1984
self.assertInWorkingTree(p,tree=tree)
1986
self.assertIsNot(tree.path2id(path), None,
1987
path+' not in working tree.')
1989
def assertNotInWorkingTree(self,path,root_path='.',tree=None):
1990
"""Assert whether path or paths are not in the WorkingTree"""
1992
tree = workingtree.WorkingTree.open(root_path)
1993
if not isinstance(path, basestring):
1995
self.assertNotInWorkingTree(p,tree=tree)
1997
self.assertIs(tree.path2id(path), None, path+' in working tree.')
2000
class TestCaseWithTransport(TestCaseInTempDir):
2001
"""A test case that provides get_url and get_readonly_url facilities.
2003
These back onto two transport servers, one for readonly access and one for
2006
If no explicit class is provided for readonly access, a
2007
ReadonlyTransportDecorator is used instead which allows the use of non disk
2008
based read write transports.
2010
If an explicit class is provided for readonly access, that server and the
2011
readwrite one must both define get_url() as resolving to os.getcwd().
2014
def get_vfs_only_server(self):
2015
"""See TestCaseWithMemoryTransport.
2017
This is useful for some tests with specific servers that need
2020
if self.__vfs_server is None:
2021
self.__vfs_server = self.vfs_transport_factory()
2022
self.__vfs_server.setUp()
2023
self.addCleanup(self.__vfs_server.tearDown)
2024
return self.__vfs_server
2026
def make_branch_and_tree(self, relpath, format=None):
2027
"""Create a branch on the transport and a tree locally.
2029
If the transport is not a LocalTransport, the Tree can't be created on
2030
the transport. In that case if the vfs_transport_factory is
2031
LocalURLServer the working tree is created in the local
2032
directory backing the transport, and the returned tree's branch and
2033
repository will also be accessed locally. Otherwise a lightweight
2034
checkout is created and returned.
2036
:param format: The BzrDirFormat.
2037
:returns: the WorkingTree.
2039
# TODO: always use the local disk path for the working tree,
2040
# this obviously requires a format that supports branch references
2041
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2043
b = self.make_branch(relpath, format=format)
2045
return b.bzrdir.create_workingtree()
2046
except errors.NotLocalUrl:
2047
# We can only make working trees locally at the moment. If the
2048
# transport can't support them, then we keep the non-disk-backed
2049
# branch and create a local checkout.
2050
if self.vfs_transport_factory is LocalURLServer:
2051
# the branch is colocated on disk, we cannot create a checkout.
2052
# hopefully callers will expect this.
2053
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2054
return local_controldir.create_workingtree()
2056
return b.create_checkout(relpath, lightweight=True)
2058
def assertIsDirectory(self, relpath, transport):
2059
"""Assert that relpath within transport is a directory.
2061
This may not be possible on all transports; in that case it propagates
2062
a TransportNotPossible.
2065
mode = transport.stat(relpath).st_mode
2066
except errors.NoSuchFile:
2067
self.fail("path %s is not a directory; no such file"
2069
if not stat.S_ISDIR(mode):
2070
self.fail("path %s is not a directory; has mode %#o"
2073
def assertTreesEqual(self, left, right):
2074
"""Check that left and right have the same content and properties."""
2075
# we use a tree delta to check for equality of the content, and we
2076
# manually check for equality of other things such as the parents list.
2077
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2078
differences = left.changes_from(right)
2079
self.assertFalse(differences.has_changed(),
2080
"Trees %r and %r are different: %r" % (left, right, differences))
2083
super(TestCaseWithTransport, self).setUp()
2084
self.__vfs_server = None
2087
class ChrootedTestCase(TestCaseWithTransport):
2088
"""A support class that provides readonly urls outside the local namespace.
2090
This is done by checking if self.transport_server is a MemoryServer. if it
2091
is then we are chrooted already, if it is not then an HttpServer is used
2094
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2095
be used without needed to redo it when a different
2096
subclass is in use ?
2100
super(ChrootedTestCase, self).setUp()
2101
if not self.vfs_transport_factory == MemoryServer:
2102
self.transport_readonly_server = HttpServer
2105
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2106
random_order=False):
2107
"""Create a test suite by filtering another one.
2109
:param suite: the source suite
2110
:param pattern: pattern that names must match
2111
:param exclude_pattern: pattern that names must not match, if any
2112
:param random_order: if True, tests in the new suite will be put in
2114
:returns: the newly created suite
2116
return sort_suite_by_re(suite, pattern, exclude_pattern,
2117
random_order, False)
2120
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2121
random_order=False, append_rest=True):
2122
"""Create a test suite by sorting another one.
2124
:param suite: the source suite
2125
:param pattern: pattern that names must match in order to go
2126
first in the new suite
2127
:param exclude_pattern: pattern that names must not match, if any
2128
:param random_order: if True, tests in the new suite will be put in
2130
:param append_rest: if False, pattern is a strict filter and not
2131
just an ordering directive
2132
:returns: the newly created suite
398
"""Fail unless path, which may be abs or relative, exists."""
399
self.failUnless(osutils.lexists(path))
402
class MetaTestLog(TestCase):
403
def test_logging(self):
404
"""Test logs are captured when a test fails."""
405
logging.info('an info message')
406
warning('something looks dodgy...')
407
logging.debug('hello, test is running')
411
def filter_suite_by_re(suite, pattern):
412
result = TestUtil.TestSuite()
2136
413
filter_re = re.compile(pattern)
2137
if exclude_pattern is not None:
2138
exclude_re = re.compile(exclude_pattern)
2139
414
for test in iter_suite_tests(suite):
2141
if exclude_pattern is None or not exclude_re.search(test_id):
2142
if filter_re.search(test_id):
2147
random.shuffle(first)
2148
random.shuffle(second)
2149
return TestUtil.TestSuite(first + second)
2152
def run_suite(suite, name='test', verbose=False, pattern=".*",
2153
stop_on_failure=False,
2154
transport=None, lsprof_timed=None, bench_history=None,
2155
matching_tests_first=None,
2158
exclude_pattern=None,
2160
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
415
if filter_re.search(test.id()):
420
def run_suite(suite, name='test', verbose=False, pattern=".*"):
421
TestCaseInTempDir._TEST_NAME = name
2165
426
runner = TextTestRunner(stream=sys.stdout,
2167
verbosity=verbosity,
2168
bench_history=bench_history,
2169
list_only=list_only,
2171
runner.stop_on_failure=stop_on_failure
2172
# Initialise the random number generator and display the seed used.
2173
# We convert the seed to a long to make it reuseable across invocations.
2174
random_order = False
2175
if random_seed is not None:
2177
if random_seed == "now":
2178
random_seed = long(time.time())
2180
# Convert the seed to a long if we can
2182
random_seed = long(random_seed)
2185
runner.stream.writeln("Randomizing test order using seed %s\n" %
2187
random.seed(random_seed)
2188
# Customise the list of tests if requested
2189
if pattern != '.*' or exclude_pattern is not None or random_order:
2190
if matching_tests_first:
2191
suite = sort_suite_by_re(suite, pattern, exclude_pattern,
2194
suite = filter_suite_by_re(suite, pattern, exclude_pattern,
430
suite = filter_suite_by_re(suite, pattern)
2196
431
result = runner.run(suite)
432
# This is still a little bogus,
433
# but only a little. Folk not using our testrunner will
434
# have to delete their temp directories themselves.
435
if result.wasSuccessful():
436
if TestCaseInTempDir.TEST_ROOT is not None:
437
shutil.rmtree(TestCaseInTempDir.TEST_ROOT)
439
print "Failed tests working directories are in '%s'\n" % TestCaseInTempDir.TEST_ROOT
2197
440
return result.wasSuccessful()
2200
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2202
test_suite_factory=None,
2205
matching_tests_first=None,
2208
exclude_pattern=None):
443
def selftest(verbose=False, pattern=".*"):
2209
444
"""Run the whole test suite under the enhanced runner"""
2210
# XXX: Very ugly way to do this...
2211
# Disable warning about old formats because we don't want it to disturb
2212
# any blackbox tests.
2213
from bzrlib import repository
2214
repository._deprecation_warning_done = True
2216
global default_transport
2217
if transport is None:
2218
transport = default_transport
2219
old_transport = default_transport
2220
default_transport = transport
2222
if test_suite_factory is None:
2223
suite = test_suite()
2225
suite = test_suite_factory()
2226
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2227
stop_on_failure=stop_on_failure,
2228
transport=transport,
2229
lsprof_timed=lsprof_timed,
2230
bench_history=bench_history,
2231
matching_tests_first=matching_tests_first,
2232
list_only=list_only,
2233
random_seed=random_seed,
2234
exclude_pattern=exclude_pattern)
2236
default_transport = old_transport
445
return run_suite(test_suite(), 'testbzr', verbose=verbose, pattern=pattern)
2239
448
def test_suite():
2240
"""Build and return TestSuite for the whole of bzrlib.
2242
This function can be replaced if you need to change the default test
2243
suite on a global basis, but it is not encouraged.
2246
'bzrlib.util.tests.test_bencode',
2247
'bzrlib.tests.test_ancestry',
2248
'bzrlib.tests.test_annotate',
2249
'bzrlib.tests.test_api',
2250
'bzrlib.tests.test_atomicfile',
2251
'bzrlib.tests.test_bad_files',
2252
'bzrlib.tests.test_branch',
2253
'bzrlib.tests.test_branchbuilder',
2254
'bzrlib.tests.test_bugtracker',
2255
'bzrlib.tests.test_bundle',
2256
'bzrlib.tests.test_bzrdir',
2257
'bzrlib.tests.test_cache_utf8',
2258
'bzrlib.tests.test_commands',
2259
'bzrlib.tests.test_commit',
2260
'bzrlib.tests.test_commit_merge',
2261
'bzrlib.tests.test_config',
2262
'bzrlib.tests.test_conflicts',
2263
'bzrlib.tests.test_pack',
2264
'bzrlib.tests.test_counted_lock',
2265
'bzrlib.tests.test_decorators',
2266
'bzrlib.tests.test_delta',
2267
'bzrlib.tests.test_deprecated_graph',
2268
'bzrlib.tests.test_diff',
2269
'bzrlib.tests.test_dirstate',
2270
'bzrlib.tests.test_errors',
2271
'bzrlib.tests.test_escaped_store',
2272
'bzrlib.tests.test_extract',
2273
'bzrlib.tests.test_fetch',
2274
'bzrlib.tests.test_ftp_transport',
2275
'bzrlib.tests.test_generate_docs',
2276
'bzrlib.tests.test_generate_ids',
2277
'bzrlib.tests.test_globbing',
2278
'bzrlib.tests.test_gpg',
2279
'bzrlib.tests.test_graph',
2280
'bzrlib.tests.test_hashcache',
2281
'bzrlib.tests.test_help',
2282
'bzrlib.tests.test_hooks',
2283
'bzrlib.tests.test_http',
2284
'bzrlib.tests.test_http_response',
2285
'bzrlib.tests.test_https_ca_bundle',
2286
'bzrlib.tests.test_identitymap',
2287
'bzrlib.tests.test_ignores',
2288
'bzrlib.tests.test_info',
2289
'bzrlib.tests.test_inv',
2290
'bzrlib.tests.test_knit',
2291
'bzrlib.tests.test_lazy_import',
2292
'bzrlib.tests.test_lazy_regex',
2293
'bzrlib.tests.test_lockdir',
2294
'bzrlib.tests.test_lockable_files',
2295
'bzrlib.tests.test_log',
2296
'bzrlib.tests.test_lsprof',
2297
'bzrlib.tests.test_memorytree',
2298
'bzrlib.tests.test_merge',
2299
'bzrlib.tests.test_merge3',
2300
'bzrlib.tests.test_merge_core',
2301
'bzrlib.tests.test_merge_directive',
2302
'bzrlib.tests.test_missing',
2303
'bzrlib.tests.test_msgeditor',
2304
'bzrlib.tests.test_nonascii',
2305
'bzrlib.tests.test_options',
2306
'bzrlib.tests.test_osutils',
2307
'bzrlib.tests.test_osutils_encodings',
2308
'bzrlib.tests.test_patch',
2309
'bzrlib.tests.test_patches',
2310
'bzrlib.tests.test_permissions',
2311
'bzrlib.tests.test_plugins',
2312
'bzrlib.tests.test_progress',
2313
'bzrlib.tests.test_reconcile',
2314
'bzrlib.tests.test_registry',
2315
'bzrlib.tests.test_remote',
2316
'bzrlib.tests.test_repository',
2317
'bzrlib.tests.test_revert',
2318
'bzrlib.tests.test_revision',
2319
'bzrlib.tests.test_revisionnamespaces',
2320
'bzrlib.tests.test_revisiontree',
2321
'bzrlib.tests.test_rio',
2322
'bzrlib.tests.test_sampler',
2323
'bzrlib.tests.test_selftest',
2324
'bzrlib.tests.test_setup',
2325
'bzrlib.tests.test_sftp_transport',
2326
'bzrlib.tests.test_smart',
2327
'bzrlib.tests.test_smart_add',
2328
'bzrlib.tests.test_smart_transport',
2329
'bzrlib.tests.test_smtp_connection',
2330
'bzrlib.tests.test_source',
2331
'bzrlib.tests.test_ssh_transport',
2332
'bzrlib.tests.test_status',
2333
'bzrlib.tests.test_store',
2334
'bzrlib.tests.test_strace',
2335
'bzrlib.tests.test_subsume',
2336
'bzrlib.tests.test_symbol_versioning',
2337
'bzrlib.tests.test_tag',
2338
'bzrlib.tests.test_testament',
2339
'bzrlib.tests.test_textfile',
2340
'bzrlib.tests.test_textmerge',
2341
'bzrlib.tests.test_timestamp',
2342
'bzrlib.tests.test_trace',
2343
'bzrlib.tests.test_transactions',
2344
'bzrlib.tests.test_transform',
2345
'bzrlib.tests.test_transport',
2346
'bzrlib.tests.test_tree',
2347
'bzrlib.tests.test_treebuilder',
2348
'bzrlib.tests.test_tsort',
2349
'bzrlib.tests.test_tuned_gzip',
2350
'bzrlib.tests.test_ui',
2351
'bzrlib.tests.test_upgrade',
2352
'bzrlib.tests.test_urlutils',
2353
'bzrlib.tests.test_versionedfile',
2354
'bzrlib.tests.test_version',
2355
'bzrlib.tests.test_version_info',
2356
'bzrlib.tests.test_weave',
2357
'bzrlib.tests.test_whitebox',
2358
'bzrlib.tests.test_workingtree',
2359
'bzrlib.tests.test_workingtree_4',
2360
'bzrlib.tests.test_wsgi',
2361
'bzrlib.tests.test_xml',
449
"""Build and return TestSuite for the whole program."""
450
import bzrlib.store, bzrlib.inventory, bzrlib.branch
451
import bzrlib.osutils, bzrlib.merge3, bzrlib.plugin
452
from doctest import DocTestSuite
454
global MODULES_TO_TEST, MODULES_TO_DOCTEST
457
['bzrlib.selftest.MetaTestLog',
458
'bzrlib.selftest.testidentitymap',
459
'bzrlib.selftest.testinv',
460
'bzrlib.selftest.test_ancestry',
461
'bzrlib.selftest.test_commit',
462
'bzrlib.selftest.test_commit_merge',
463
'bzrlib.selftest.testconfig',
464
'bzrlib.selftest.versioning',
465
'bzrlib.selftest.testmerge3',
466
'bzrlib.selftest.testmerge',
467
'bzrlib.selftest.testhashcache',
468
'bzrlib.selftest.teststatus',
469
'bzrlib.selftest.testlog',
470
'bzrlib.selftest.testrevisionnamespaces',
471
'bzrlib.selftest.testbranch',
472
'bzrlib.selftest.testrevision',
473
'bzrlib.selftest.test_revision_info',
474
'bzrlib.selftest.test_merge_core',
475
'bzrlib.selftest.test_smart_add',
476
'bzrlib.selftest.test_bad_files',
477
'bzrlib.selftest.testdiff',
478
'bzrlib.selftest.test_parent',
479
'bzrlib.selftest.test_xml',
480
'bzrlib.selftest.test_weave',
481
'bzrlib.selftest.testfetch',
482
'bzrlib.selftest.whitebox',
483
'bzrlib.selftest.teststore',
484
'bzrlib.selftest.blackbox',
485
'bzrlib.selftest.testsampler',
486
'bzrlib.selftest.testtransactions',
487
'bzrlib.selftest.testtransport',
488
'bzrlib.selftest.testgraph',
489
'bzrlib.selftest.testworkingtree',
490
'bzrlib.selftest.test_upgrade',
491
'bzrlib.selftest.test_conflicts',
2363
test_transport_implementations = [
2364
'bzrlib.tests.test_transport_implementations',
2365
'bzrlib.tests.test_read_bundle',
2367
suite = TestUtil.TestSuite()
2368
loader = TestUtil.TestLoader()
2369
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2370
from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
2371
adapter = TransportTestProviderAdapter()
2372
adapt_modules(test_transport_implementations, adapter, loader, suite)
2373
for package in packages_to_test():
2374
suite.addTest(package.test_suite())
494
for m in (bzrlib.store, bzrlib.inventory, bzrlib.branch,
495
bzrlib.osutils, bzrlib.commands, bzrlib.merge3):
496
if m not in MODULES_TO_DOCTEST:
497
MODULES_TO_DOCTEST.append(m)
499
TestCase.BZRPATH = os.path.join(os.path.realpath(os.path.dirname(bzrlib.__path__[0])), 'bzr')
500
print '%-30s %s' % ('bzr binary', TestCase.BZRPATH)
503
suite.addTest(TestLoader().loadTestsFromNames(testmod_names))
2375
504
for m in MODULES_TO_TEST:
2376
suite.addTest(loader.loadTestsFromModule(m))
2377
for m in MODULES_TO_DOCTEST:
2379
suite.addTest(doctest.DocTestSuite(m))
2380
except ValueError, e:
2381
print '**failed to get doctest for: %s\n%s' %(m,e)
2383
for name, plugin in bzrlib.plugin.all_plugins().items():
2384
if getattr(plugin, 'test_suite', None) is not None:
2385
default_encoding = sys.getdefaultencoding()
2387
plugin_suite = plugin.test_suite()
2388
except ImportError, e:
2389
bzrlib.trace.warning(
2390
'Unable to test plugin "%s": %s', name, e)
2392
suite.addTest(plugin_suite)
2393
if default_encoding != sys.getdefaultencoding():
2394
bzrlib.trace.warning(
2395
'Plugin "%s" tried to reset default encoding to: %s', name,
2396
sys.getdefaultencoding())
2398
sys.setdefaultencoding(default_encoding)
505
suite.addTest(TestLoader().loadTestsFromModule(m))
506
for m in (MODULES_TO_DOCTEST):
507
suite.addTest(DocTestSuite(m))
508
for p in bzrlib.plugin.all_plugins:
509
if hasattr(p, 'test_suite'):
510
suite.addTest(p.test_suite())
2402
def adapt_modules(mods_list, adapter, loader, suite):
2403
"""Adapt the modules in mods_list using adapter and add to suite."""
2404
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
2405
suite.addTests(adapter.adapt(test))
2408
def _rmtree_temp_dir(dirname):
2409
# If LANG=C we probably have created some bogus paths
2410
# which rmtree(unicode) will fail to delete
2411
# so make sure we are using rmtree(str) to delete everything
2412
# except on win32, where rmtree(str) will fail
2413
# since it doesn't have the property of byte-stream paths
2414
# (they are either ascii or mbcs)
2415
if sys.platform == 'win32':
2416
# make sure we are using the unicode win32 api
2417
dirname = unicode(dirname)
2419
dirname = dirname.encode(sys.getfilesystemencoding())
2421
osutils.rmtree(dirname)
2423
if sys.platform == 'win32' and e.errno == errno.EACCES:
2424
print >>sys.stderr, ('Permission denied: '
2425
'unable to remove testing dir '
2426
'%s' % os.path.basename(dirname))
2431
class Feature(object):
2432
"""An operating system Feature."""
2435
self._available = None
2437
def available(self):
2438
"""Is the feature available?
2440
:return: True if the feature is available.
2442
if self._available is None:
2443
self._available = self._probe()
2444
return self._available
2447
"""Implement this method in concrete features.
2449
:return: True if the feature is available.
2451
raise NotImplementedError
2454
if getattr(self, 'feature_name', None):
2455
return self.feature_name()
2456
return self.__class__.__name__
2459
class TestScenarioApplier(object):
2460
"""A tool to apply scenarios to tests."""
2462
def adapt(self, test):
2463
"""Return a TestSuite containing a copy of test for each scenario."""
2464
result = unittest.TestSuite()
2465
for scenario in self.scenarios:
2466
result.addTest(self.adapt_test_to_scenario(test, scenario))
2469
def adapt_test_to_scenario(self, test, scenario):
2470
"""Copy test and apply scenario to it.
2472
:param test: A test to adapt.
2473
:param scenario: A tuple describing the scenarion.
2474
The first element of the tuple is the new test id.
2475
The second element is a dict containing attributes to set on the
2477
:return: The adapted test.
2479
from copy import deepcopy
2480
new_test = deepcopy(test)
2481
for name, value in scenario[1].items():
2482
setattr(new_test, name, value)
2483
new_id = "%s(%s)" % (new_test.id(), scenario[0])
2484
new_test.id = lambda: new_id