1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
# TODO: Perhaps there should be an API to find out if bzr running under the
19
# test suite -- some plugins might want to avoid making intrusive changes if
20
# this is the case. However, we want behaviour under to test to diverge as
21
# little as possible, so this should be used rarely if it's added at all.
22
# (Suggestion from j-a-meinel, 2005-11-24)
24
# NOTE: Some classes in here use camelCaseNaming() rather than
25
# underscore_naming(). That's for consistency with unittest; it's not the
26
# general style of bzrlib. Please continue that consistency when adding e.g.
27
# new assertFoo() methods.
31
from cStringIO import StringIO
37
from pprint import pformat
41
from subprocess import Popen, PIPE
61
import bzrlib.commands
62
import bzrlib.timestamp
64
import bzrlib.inventory
65
import bzrlib.iterablefile
70
# lsprof not available
72
from bzrlib.merge import merge_inner
76
from bzrlib.revision import common_ancestor
78
from bzrlib import symbol_versioning
80
from bzrlib.transport import get_transport
81
import bzrlib.transport
82
from bzrlib.transport.local import LocalURLServer
83
from bzrlib.transport.memory import MemoryServer
84
from bzrlib.transport.readonly import ReadonlyServer
85
from bzrlib.trace import mutter, note
86
from bzrlib.tests import TestUtil
87
from bzrlib.tests.HttpServer import HttpServer
88
from bzrlib.tests.TestUtil import (
92
from bzrlib.tests.treeshape import build_tree_contents
93
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat2
95
# Mark this python module as being part of the implementation
96
# of unittest: this gives us better tracebacks where the last
97
# shown frame is the test code, not our assertXYZ.
100
default_transport = LocalURLServer
103
MODULES_TO_DOCTEST = [
116
def packages_to_test():
117
"""Return a list of packages to test.
119
The packages are not globally imported so that import failures are
120
triggered when running selftest, not when importing the command.
123
import bzrlib.tests.blackbox
124
import bzrlib.tests.branch_implementations
125
import bzrlib.tests.bzrdir_implementations
126
import bzrlib.tests.commands
127
import bzrlib.tests.interrepository_implementations
128
import bzrlib.tests.interversionedfile_implementations
129
import bzrlib.tests.intertree_implementations
130
import bzrlib.tests.per_lock
131
import bzrlib.tests.repository_implementations
132
import bzrlib.tests.revisionstore_implementations
133
import bzrlib.tests.tree_implementations
134
import bzrlib.tests.workingtree_implementations
137
bzrlib.tests.blackbox,
138
bzrlib.tests.branch_implementations,
139
bzrlib.tests.bzrdir_implementations,
140
bzrlib.tests.commands,
141
bzrlib.tests.interrepository_implementations,
142
bzrlib.tests.interversionedfile_implementations,
143
bzrlib.tests.intertree_implementations,
144
bzrlib.tests.per_lock,
145
bzrlib.tests.repository_implementations,
146
bzrlib.tests.revisionstore_implementations,
147
bzrlib.tests.tree_implementations,
148
bzrlib.tests.workingtree_implementations,
152
class ExtendedTestResult(unittest._TextTestResult):
153
"""Accepts, reports and accumulates the results of running tests.
155
Compared to this unittest version this class adds support for profiling,
156
benchmarking, stopping as soon as a test fails, and skipping tests.
157
There are further-specialized subclasses for different types of display.
162
def __init__(self, stream, descriptions, verbosity,
165
use_numbered_dirs=False,
167
"""Construct new TestResult.
169
:param bench_history: Optionally, a writable file object to accumulate
172
unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
173
if bench_history is not None:
174
from bzrlib.version import _get_bzr_source_tree
175
src_tree = _get_bzr_source_tree()
178
revision_id = src_tree.get_parent_ids()[0]
180
# XXX: if this is a brand new tree, do the same as if there
184
# XXX: If there's no branch, what should we do?
186
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
187
self._bench_history = bench_history
188
self.ui = ui.ui_factory
189
self.num_tests = num_tests
191
self.failure_count = 0
192
self.known_failure_count = 0
194
self.unsupported = {}
196
self.use_numbered_dirs = use_numbered_dirs
197
self._overall_start_time = time.time()
199
def extractBenchmarkTime(self, testCase):
200
"""Add a benchmark time for the current test case."""
201
self._benchmarkTime = getattr(testCase, "_benchtime", None)
203
def _elapsedTestTimeString(self):
204
"""Return a time string for the overall time the current test has taken."""
205
return self._formatTime(time.time() - self._start_time)
207
def _testTimeString(self):
208
if self._benchmarkTime is not None:
210
self._formatTime(self._benchmarkTime),
211
self._elapsedTestTimeString())
213
return " %s" % self._elapsedTestTimeString()
215
def _formatTime(self, seconds):
216
"""Format seconds as milliseconds with leading spaces."""
217
# some benchmarks can take thousands of seconds to run, so we need 8
219
return "%8dms" % (1000 * seconds)
221
def _shortened_test_description(self, test):
223
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
226
def startTest(self, test):
227
unittest.TestResult.startTest(self, test)
228
self.report_test_start(test)
229
test.number = self.count
230
self._recordTestStartTime()
232
def _recordTestStartTime(self):
233
"""Record that a test has started."""
234
self._start_time = time.time()
236
def _cleanupLogFile(self, test):
237
# We can only do this if we have one of our TestCases, not if
239
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
240
if setKeepLogfile is not None:
243
def addError(self, test, err):
244
self.extractBenchmarkTime(test)
245
self._cleanupLogFile(test)
246
if isinstance(err[1], TestSkipped):
247
return self.addSkipped(test, err)
248
elif isinstance(err[1], UnavailableFeature):
249
return self.addNotSupported(test, err[1].args[0])
250
unittest.TestResult.addError(self, test, err)
251
self.error_count += 1
252
self.report_error(test, err)
256
def addFailure(self, test, err):
257
self._cleanupLogFile(test)
258
self.extractBenchmarkTime(test)
259
if isinstance(err[1], KnownFailure):
260
return self.addKnownFailure(test, err)
261
unittest.TestResult.addFailure(self, test, err)
262
self.failure_count += 1
263
self.report_failure(test, err)
267
def addKnownFailure(self, test, err):
268
self.known_failure_count += 1
269
self.report_known_failure(test, err)
271
def addNotSupported(self, test, feature):
272
self.unsupported.setdefault(str(feature), 0)
273
self.unsupported[str(feature)] += 1
274
self.report_unsupported(test, feature)
276
def addSuccess(self, test):
277
self.extractBenchmarkTime(test)
278
if self._bench_history is not None:
279
if self._benchmarkTime is not None:
280
self._bench_history.write("%s %s\n" % (
281
self._formatTime(self._benchmarkTime),
283
self.report_success(test)
284
unittest.TestResult.addSuccess(self, test)
286
def addSkipped(self, test, skip_excinfo):
287
self.report_skip(test, skip_excinfo)
288
# seems best to treat this as success from point-of-view of unittest
289
# -- it actually does nothing so it barely matters :)
292
except KeyboardInterrupt:
295
self.addError(test, test.__exc_info())
297
unittest.TestResult.addSuccess(self, test)
299
def printErrorList(self, flavour, errors):
300
for test, err in errors:
301
self.stream.writeln(self.separator1)
302
self.stream.write("%s: " % flavour)
303
if self.use_numbered_dirs:
304
self.stream.write('#%d ' % test.number)
305
self.stream.writeln(self.getDescription(test))
306
if getattr(test, '_get_log', None) is not None:
308
print >>self.stream, \
309
('vvvv[log from %s]' % test.id()).ljust(78,'-')
310
print >>self.stream, test._get_log()
311
print >>self.stream, \
312
('^^^^[log from %s]' % test.id()).ljust(78,'-')
313
self.stream.writeln(self.separator2)
314
self.stream.writeln("%s" % err)
319
def report_cleaning_up(self):
322
def report_success(self, test):
326
class TextTestResult(ExtendedTestResult):
327
"""Displays progress and results of tests in text form"""
329
def __init__(self, stream, descriptions, verbosity,
333
use_numbered_dirs=False,
335
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
336
bench_history, num_tests, use_numbered_dirs)
338
self.pb = self.ui.nested_progress_bar()
339
self._supplied_pb = False
342
self._supplied_pb = True
343
self.pb.show_pct = False
344
self.pb.show_spinner = False
345
self.pb.show_eta = False,
346
self.pb.show_count = False
347
self.pb.show_bar = False
349
def report_starting(self):
350
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
352
def _progress_prefix_text(self):
353
a = '[%d' % self.count
354
if self.num_tests is not None:
355
a +='/%d' % self.num_tests
356
a += ' in %ds' % (time.time() - self._overall_start_time)
358
a += ', %d errors' % self.error_count
359
if self.failure_count:
360
a += ', %d failed' % self.failure_count
361
if self.known_failure_count:
362
a += ', %d known failures' % self.known_failure_count
364
a += ', %d skipped' % self.skip_count
366
a += ', %d missing features' % len(self.unsupported)
370
def report_test_start(self, test):
373
self._progress_prefix_text()
375
+ self._shortened_test_description(test))
377
def _test_description(self, test):
378
if self.use_numbered_dirs:
379
return '#%d %s' % (self.count,
380
self._shortened_test_description(test))
382
return self._shortened_test_description(test)
384
def report_error(self, test, err):
385
self.pb.note('ERROR: %s\n %s\n',
386
self._test_description(test),
390
def report_failure(self, test, err):
391
self.pb.note('FAIL: %s\n %s\n',
392
self._test_description(test),
396
def report_known_failure(self, test, err):
397
self.pb.note('XFAIL: %s\n%s\n',
398
self._test_description(test), err[1])
400
def report_skip(self, test, skip_excinfo):
403
# at the moment these are mostly not things we can fix
404
# and so they just produce stipple; use the verbose reporter
407
# show test and reason for skip
408
self.pb.note('SKIP: %s\n %s\n',
409
self._shortened_test_description(test),
412
# since the class name was left behind in the still-visible
414
self.pb.note('SKIP: %s', skip_excinfo[1])
416
def report_unsupported(self, test, feature):
417
"""test cannot be run because feature is missing."""
419
def report_cleaning_up(self):
420
self.pb.update('cleaning up...')
423
if not self._supplied_pb:
427
class VerboseTestResult(ExtendedTestResult):
428
"""Produce long output, with one line per test run plus times"""
430
def _ellipsize_to_right(self, a_string, final_width):
431
"""Truncate and pad a string, keeping the right hand side"""
432
if len(a_string) > final_width:
433
result = '...' + a_string[3-final_width:]
436
return result.ljust(final_width)
438
def report_starting(self):
439
self.stream.write('running %d tests...\n' % self.num_tests)
441
def report_test_start(self, test):
443
name = self._shortened_test_description(test)
444
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
445
# numbers, plus a trailing blank
446
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
447
if self.use_numbered_dirs:
448
self.stream.write('%5d ' % self.count)
449
self.stream.write(self._ellipsize_to_right(name,
450
osutils.terminal_width()-36))
452
self.stream.write(self._ellipsize_to_right(name,
453
osutils.terminal_width()-30))
456
def _error_summary(self, err):
458
if self.use_numbered_dirs:
460
return '%s%s' % (indent, err[1])
462
def report_error(self, test, err):
463
self.stream.writeln('ERROR %s\n%s'
464
% (self._testTimeString(),
465
self._error_summary(err)))
467
def report_failure(self, test, err):
468
self.stream.writeln(' FAIL %s\n%s'
469
% (self._testTimeString(),
470
self._error_summary(err)))
472
def report_known_failure(self, test, err):
473
self.stream.writeln('XFAIL %s\n%s'
474
% (self._testTimeString(),
475
self._error_summary(err)))
477
def report_success(self, test):
478
self.stream.writeln(' OK %s' % self._testTimeString())
479
for bench_called, stats in getattr(test, '_benchcalls', []):
480
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
481
stats.pprint(file=self.stream)
482
# flush the stream so that we get smooth output. This verbose mode is
483
# used to show the output in PQM.
486
def report_skip(self, test, skip_excinfo):
488
self.stream.writeln(' SKIP %s\n%s'
489
% (self._testTimeString(),
490
self._error_summary(skip_excinfo)))
492
def report_unsupported(self, test, feature):
493
"""test cannot be run because feature is missing."""
494
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
495
%(self._testTimeString(), feature))
499
class TextTestRunner(object):
500
stop_on_failure = False
507
use_numbered_dirs=False,
510
self.stream = unittest._WritelnDecorator(stream)
511
self.descriptions = descriptions
512
self.verbosity = verbosity
513
self._bench_history = bench_history
514
self.use_numbered_dirs = use_numbered_dirs
515
self.list_only = list_only
518
"Run the given test case or test suite."
519
startTime = time.time()
520
if self.verbosity == 1:
521
result_class = TextTestResult
522
elif self.verbosity >= 2:
523
result_class = VerboseTestResult
524
result = result_class(self.stream,
527
bench_history=self._bench_history,
528
num_tests=test.countTestCases(),
529
use_numbered_dirs=self.use_numbered_dirs,
531
result.stop_early = self.stop_on_failure
532
result.report_starting()
534
if self.verbosity >= 2:
535
self.stream.writeln("Listing tests only ...\n")
537
for t in iter_suite_tests(test):
538
self.stream.writeln("%s" % (t.id()))
540
actionTaken = "Listed"
543
run = result.testsRun
545
stopTime = time.time()
546
timeTaken = stopTime - startTime
548
self.stream.writeln(result.separator2)
549
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
550
run, run != 1 and "s" or "", timeTaken))
551
self.stream.writeln()
552
if not result.wasSuccessful():
553
self.stream.write("FAILED (")
554
failed, errored = map(len, (result.failures, result.errors))
556
self.stream.write("failures=%d" % failed)
558
if failed: self.stream.write(", ")
559
self.stream.write("errors=%d" % errored)
560
if result.known_failure_count:
561
if failed or errored: self.stream.write(", ")
562
self.stream.write("known_failure_count=%d" %
563
result.known_failure_count)
564
self.stream.writeln(")")
566
if result.known_failure_count:
567
self.stream.writeln("OK (known_failures=%d)" %
568
result.known_failure_count)
570
self.stream.writeln("OK")
571
if result.skip_count > 0:
572
skipped = result.skip_count
573
self.stream.writeln('%d test%s skipped' %
574
(skipped, skipped != 1 and "s" or ""))
575
if result.unsupported:
576
for feature, count in sorted(result.unsupported.items()):
577
self.stream.writeln("Missing feature '%s' skipped %d tests." %
583
def iter_suite_tests(suite):
584
"""Return all tests in a suite, recursing through nested suites"""
585
for item in suite._tests:
586
if isinstance(item, unittest.TestCase):
588
elif isinstance(item, unittest.TestSuite):
589
for r in iter_suite_tests(item):
592
raise Exception('unknown object %r inside test suite %r'
596
class TestSkipped(Exception):
597
"""Indicates that a test was intentionally skipped, rather than failing."""
600
class KnownFailure(AssertionError):
601
"""Indicates that a test failed in a precisely expected manner.
603
Such failures dont block the whole test suite from passing because they are
604
indicators of partially completed code or of future work. We have an
605
explicit error for them so that we can ensure that they are always visible:
606
KnownFailures are always shown in the output of bzr selftest.
610
class UnavailableFeature(Exception):
611
"""A feature required for this test was not available.
613
The feature should be used to construct the exception.
617
class CommandFailed(Exception):
621
class StringIOWrapper(object):
622
"""A wrapper around cStringIO which just adds an encoding attribute.
624
Internally we can check sys.stdout to see what the output encoding
625
should be. However, cStringIO has no encoding attribute that we can
626
set. So we wrap it instead.
631
def __init__(self, s=None):
633
self.__dict__['_cstring'] = StringIO(s)
635
self.__dict__['_cstring'] = StringIO()
637
def __getattr__(self, name, getattr=getattr):
638
return getattr(self.__dict__['_cstring'], name)
640
def __setattr__(self, name, val):
641
if name == 'encoding':
642
self.__dict__['encoding'] = val
644
return setattr(self._cstring, name, val)
647
class TestUIFactory(ui.CLIUIFactory):
648
"""A UI Factory for testing.
650
Hide the progress bar but emit note()s.
652
Allows get_password to be tested without real tty attached.
659
super(TestUIFactory, self).__init__()
660
if stdin is not None:
661
# We use a StringIOWrapper to be able to test various
662
# encodings, but the user is still responsible to
663
# encode the string and to set the encoding attribute
664
# of StringIOWrapper.
665
self.stdin = StringIOWrapper(stdin)
667
self.stdout = sys.stdout
671
self.stderr = sys.stderr
676
"""See progress.ProgressBar.clear()."""
678
def clear_term(self):
679
"""See progress.ProgressBar.clear_term()."""
681
def clear_term(self):
682
"""See progress.ProgressBar.clear_term()."""
685
"""See progress.ProgressBar.finished()."""
687
def note(self, fmt_string, *args, **kwargs):
688
"""See progress.ProgressBar.note()."""
689
self.stdout.write((fmt_string + "\n") % args)
691
def progress_bar(self):
694
def nested_progress_bar(self):
697
def update(self, message, count=None, total=None):
698
"""See progress.ProgressBar.update()."""
700
def get_non_echoed_password(self, prompt):
701
"""Get password from stdin without trying to handle the echo mode"""
703
self.stdout.write(prompt.encode(self.stdout.encoding, 'replace'))
704
password = self.stdin.readline()
707
if password[-1] == '\n':
708
password = password[:-1]
712
class TestCase(unittest.TestCase):
713
"""Base class for bzr unit tests.
715
Tests that need access to disk resources should subclass
716
TestCaseInTempDir not TestCase.
718
Error and debug log messages are redirected from their usual
719
location into a temporary file, the contents of which can be
720
retrieved by _get_log(). We use a real OS file, not an in-memory object,
721
so that it can also capture file IO. When the test completes this file
722
is read into memory and removed from disk.
724
There are also convenience functions to invoke bzr's command-line
725
routine, and to build and check bzr trees.
727
In addition to the usual method of overriding tearDown(), this class also
728
allows subclasses to register functions into the _cleanups list, which is
729
run in order as the object is torn down. It's less likely this will be
730
accidentally overlooked.
733
_log_file_name = None
735
_keep_log_file = False
736
# record lsprof data when performing benchmark calls.
737
_gather_lsprof_in_benchmarks = False
739
def __init__(self, methodName='testMethod'):
740
super(TestCase, self).__init__(methodName)
744
unittest.TestCase.setUp(self)
745
self._cleanEnvironment()
746
bzrlib.trace.disable_default_logging()
749
self._benchcalls = []
750
self._benchtime = None
753
def _clear_hooks(self):
754
# prevent hooks affecting tests
756
import bzrlib.smart.server
757
self._preserved_hooks = {
758
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
759
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
761
self.addCleanup(self._restoreHooks)
762
# reset all hooks to an empty instance of the appropriate type
763
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
764
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
765
# FIXME: Rather than constructing new objects like this, how about
766
# having save() and clear() methods on the base Hook class? mbp
769
def _silenceUI(self):
770
"""Turn off UI for duration of test"""
771
# by default the UI is off; tests can turn it on if they want it.
772
saved = ui.ui_factory
774
ui.ui_factory = saved
775
ui.ui_factory = ui.SilentUIFactory()
776
self.addCleanup(_restore)
778
def _ndiff_strings(self, a, b):
779
"""Return ndiff between two strings containing lines.
781
A trailing newline is added if missing to make the strings
783
if b and b[-1] != '\n':
785
if a and a[-1] != '\n':
787
difflines = difflib.ndiff(a.splitlines(True),
789
linejunk=lambda x: False,
790
charjunk=lambda x: False)
791
return ''.join(difflines)
793
def assertEqual(self, a, b, message=''):
797
except UnicodeError, e:
798
# If we can't compare without getting a UnicodeError, then
799
# obviously they are different
800
mutter('UnicodeError: %s', e)
803
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
805
pformat(a), pformat(b)))
807
assertEquals = assertEqual
809
def assertEqualDiff(self, a, b, message=None):
810
"""Assert two texts are equal, if not raise an exception.
812
This is intended for use with multi-line strings where it can
813
be hard to find the differences by eye.
815
# TODO: perhaps override assertEquals to call this for strings?
819
message = "texts not equal:\n"
820
raise AssertionError(message +
821
self._ndiff_strings(a, b))
823
def assertEqualMode(self, mode, mode_test):
824
self.assertEqual(mode, mode_test,
825
'mode mismatch %o != %o' % (mode, mode_test))
827
def assertStartsWith(self, s, prefix):
828
if not s.startswith(prefix):
829
raise AssertionError('string %r does not start with %r' % (s, prefix))
831
def assertEndsWith(self, s, suffix):
832
"""Asserts that s ends with suffix."""
833
if not s.endswith(suffix):
834
raise AssertionError('string %r does not end with %r' % (s, suffix))
836
def assertContainsRe(self, haystack, needle_re):
837
"""Assert that a contains something matching a regular expression."""
838
if not re.search(needle_re, haystack):
839
raise AssertionError('pattern "%r" not found in "%r"'
840
% (needle_re, haystack))
842
def assertNotContainsRe(self, haystack, needle_re):
843
"""Assert that a does not match a regular expression"""
844
if re.search(needle_re, haystack):
845
raise AssertionError('pattern "%s" found in "%s"'
846
% (needle_re, haystack))
848
def assertSubset(self, sublist, superlist):
849
"""Assert that every entry in sublist is present in superlist."""
851
for entry in sublist:
852
if entry not in superlist:
853
missing.append(entry)
855
raise AssertionError("value(s) %r not present in container %r" %
856
(missing, superlist))
858
def assertListRaises(self, excClass, func, *args, **kwargs):
859
"""Fail unless excClass is raised when the iterator from func is used.
861
Many functions can return generators this makes sure
862
to wrap them in a list() call to make sure the whole generator
863
is run, and that the proper exception is raised.
866
list(func(*args, **kwargs))
870
if getattr(excClass,'__name__', None) is not None:
871
excName = excClass.__name__
873
excName = str(excClass)
874
raise self.failureException, "%s not raised" % excName
876
def assertRaises(self, excClass, func, *args, **kwargs):
877
"""Assert that a callable raises a particular exception.
879
:param excClass: As for the except statement, this may be either an
880
exception class, or a tuple of classes.
882
Returns the exception so that you can examine it.
885
func(*args, **kwargs)
889
if getattr(excClass,'__name__', None) is not None:
890
excName = excClass.__name__
893
excName = str(excClass)
894
raise self.failureException, "%s not raised" % excName
896
def assertIs(self, left, right, message=None):
897
if not (left is right):
898
if message is not None:
899
raise AssertionError(message)
901
raise AssertionError("%r is not %r." % (left, right))
903
def assertIsNot(self, left, right, message=None):
905
if message is not None:
906
raise AssertionError(message)
908
raise AssertionError("%r is %r." % (left, right))
910
def assertTransportMode(self, transport, path, mode):
911
"""Fail if a path does not have mode mode.
913
If modes are not supported on this transport, the assertion is ignored.
915
if not transport._can_roundtrip_unix_modebits():
917
path_stat = transport.stat(path)
918
actual_mode = stat.S_IMODE(path_stat.st_mode)
919
self.assertEqual(mode, actual_mode,
920
'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
922
def assertIsInstance(self, obj, kls):
923
"""Fail if obj is not an instance of kls"""
924
if not isinstance(obj, kls):
925
self.fail("%r is an instance of %s rather than %s" % (
926
obj, obj.__class__, kls))
928
def expectFailure(self, reason, assertion, *args, **kwargs):
929
"""Invoke a test, expecting it to fail for the given reason.
931
This is for assertions that ought to succeed, but currently fail.
932
(The failure is *expected* but not *wanted*.) Please be very precise
933
about the failure you're expecting. If a new bug is introduced,
934
AssertionError should be raised, not KnownFailure.
936
Frequently, expectFailure should be followed by an opposite assertion.
939
Intended to be used with a callable that raises AssertionError as the
940
'assertion' parameter. args and kwargs are passed to the 'assertion'.
942
Raises KnownFailure if the test fails. Raises AssertionError if the
947
self.expectFailure('Math is broken', self.assertNotEqual, 54,
949
self.assertEqual(42, dynamic_val)
951
This means that a dynamic_val of 54 will cause the test to raise
952
a KnownFailure. Once math is fixed and the expectFailure is removed,
953
only a dynamic_val of 42 will allow the test to pass. Anything other
954
than 54 or 42 will cause an AssertionError.
957
assertion(*args, **kwargs)
958
except AssertionError:
959
raise KnownFailure(reason)
961
self.fail('Unexpected success. Should have failed: %s' % reason)
963
def _capture_warnings(self, a_callable, *args, **kwargs):
964
"""A helper for callDeprecated and applyDeprecated.
966
:param a_callable: A callable to call.
967
:param args: The positional arguments for the callable
968
:param kwargs: The keyword arguments for the callable
969
:return: A tuple (warnings, result). result is the result of calling
970
a_callable(*args, **kwargs).
973
def capture_warnings(msg, cls=None, stacklevel=None):
974
# we've hooked into a deprecation specific callpath,
975
# only deprecations should getting sent via it.
976
self.assertEqual(cls, DeprecationWarning)
977
local_warnings.append(msg)
978
original_warning_method = symbol_versioning.warn
979
symbol_versioning.set_warning_method(capture_warnings)
981
result = a_callable(*args, **kwargs)
983
symbol_versioning.set_warning_method(original_warning_method)
984
return (local_warnings, result)
986
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
987
"""Call a deprecated callable without warning the user.
989
:param deprecation_format: The deprecation format that the callable
990
should have been deprecated with. This is the same type as the
991
parameter to deprecated_method/deprecated_function. If the
992
callable is not deprecated with this format, an assertion error
994
:param a_callable: A callable to call. This may be a bound method or
995
a regular function. It will be called with *args and **kwargs.
996
:param args: The positional arguments for the callable
997
:param kwargs: The keyword arguments for the callable
998
:return: The result of a_callable(*args, **kwargs)
1000
call_warnings, result = self._capture_warnings(a_callable,
1002
expected_first_warning = symbol_versioning.deprecation_string(
1003
a_callable, deprecation_format)
1004
if len(call_warnings) == 0:
1005
self.fail("No deprecation warning generated by call to %s" %
1007
self.assertEqual(expected_first_warning, call_warnings[0])
1010
def callDeprecated(self, expected, callable, *args, **kwargs):
1011
"""Assert that a callable is deprecated in a particular way.
1013
This is a very precise test for unusual requirements. The
1014
applyDeprecated helper function is probably more suited for most tests
1015
as it allows you to simply specify the deprecation format being used
1016
and will ensure that that is issued for the function being called.
1018
:param expected: a list of the deprecation warnings expected, in order
1019
:param callable: The callable to call
1020
:param args: The positional arguments for the callable
1021
:param kwargs: The keyword arguments for the callable
1023
call_warnings, result = self._capture_warnings(callable,
1025
self.assertEqual(expected, call_warnings)
1028
def _startLogFile(self):
1029
"""Send bzr and test log messages to a temporary file.
1031
The file is removed as the test is torn down.
1033
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
1034
self._log_file = os.fdopen(fileno, 'w+')
1035
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
1036
self._log_file_name = name
1037
self.addCleanup(self._finishLogFile)
1039
def _finishLogFile(self):
1040
"""Finished with the log file.
1042
Close the file and delete it, unless setKeepLogfile was called.
1044
if self._log_file is None:
1046
bzrlib.trace.disable_test_log(self._log_nonce)
1047
self._log_file.close()
1048
self._log_file = None
1049
if not self._keep_log_file:
1050
os.remove(self._log_file_name)
1051
self._log_file_name = None
1053
def setKeepLogfile(self):
1054
"""Make the logfile not be deleted when _finishLogFile is called."""
1055
self._keep_log_file = True
1057
def addCleanup(self, callable):
1058
"""Arrange to run a callable when this case is torn down.
1060
Callables are run in the reverse of the order they are registered,
1061
ie last-in first-out.
1063
if callable in self._cleanups:
1064
raise ValueError("cleanup function %r already registered on %s"
1066
self._cleanups.append(callable)
1068
def _cleanEnvironment(self):
1070
'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1071
'HOME': os.getcwd(),
1072
'APPDATA': None, # bzr now use Win32 API and don't rely on APPDATA
1074
'BZREMAIL': None, # may still be present in the environment
1076
'BZR_PROGRESS_BAR': None,
1080
'https_proxy': None,
1081
'HTTPS_PROXY': None,
1086
# Nobody cares about these ones AFAIK. So far at
1087
# least. If you do (care), please update this comment
1091
'BZR_REMOTE_PATH': None,
1094
self.addCleanup(self._restoreEnvironment)
1095
for name, value in new_env.iteritems():
1096
self._captureVar(name, value)
1098
def _captureVar(self, name, newvalue):
1099
"""Set an environment variable, and reset it when finished."""
1100
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1102
def _restoreEnvironment(self):
1103
for name, value in self.__old_env.iteritems():
1104
osutils.set_or_unset_env(name, value)
1106
def _restoreHooks(self):
1107
for klass, hooks in self._preserved_hooks.items():
1108
setattr(klass, 'hooks', hooks)
1110
def knownFailure(self, reason):
1111
"""This test has failed for some known reason."""
1112
raise KnownFailure(reason)
1114
def run(self, result=None):
1115
if result is None: result = self.defaultTestResult()
1116
for feature in getattr(self, '_test_needs_features', []):
1117
if not feature.available():
1118
result.startTest(self)
1119
if getattr(result, 'addNotSupported', None):
1120
result.addNotSupported(self, feature)
1122
result.addSuccess(self)
1123
result.stopTest(self)
1125
return unittest.TestCase.run(self, result)
1129
unittest.TestCase.tearDown(self)
1131
def time(self, callable, *args, **kwargs):
1132
"""Run callable and accrue the time it takes to the benchmark time.
1134
If lsprofiling is enabled (i.e. by --lsprof-time to bzr selftest) then
1135
this will cause lsprofile statistics to be gathered and stored in
1138
if self._benchtime is None:
1142
if not self._gather_lsprof_in_benchmarks:
1143
return callable(*args, **kwargs)
1145
# record this benchmark
1146
ret, stats = bzrlib.lsprof.profile(callable, *args, **kwargs)
1148
self._benchcalls.append(((callable, args, kwargs), stats))
1151
self._benchtime += time.time() - start
1153
def _runCleanups(self):
1154
"""Run registered cleanup functions.
1156
This should only be called from TestCase.tearDown.
1158
# TODO: Perhaps this should keep running cleanups even if
1159
# one of them fails?
1161
# Actually pop the cleanups from the list so tearDown running
1162
# twice is safe (this happens for skipped tests).
1163
while self._cleanups:
1164
self._cleanups.pop()()
1166
def log(self, *args):
1169
def _get_log(self, keep_log_file=False):
1170
"""Return as a string the log for this test. If the file is still
1171
on disk and keep_log_file=False, delete the log file and store the
1172
content in self._log_contents."""
1173
# flush the log file, to get all content
1175
bzrlib.trace._trace_file.flush()
1176
if self._log_contents:
1177
return self._log_contents
1178
if self._log_file_name is not None:
1179
logfile = open(self._log_file_name)
1181
log_contents = logfile.read()
1184
if not keep_log_file:
1185
self._log_contents = log_contents
1187
os.remove(self._log_file_name)
1189
if sys.platform == 'win32' and e.errno == errno.EACCES:
1190
print >>sys.stderr, ('Unable to delete log file '
1191
' %r' % self._log_file_name)
1196
return "DELETED log file to reduce memory footprint"
1198
def capture(self, cmd, retcode=0):
1199
"""Shortcut that splits cmd into words, runs, and returns stdout"""
1200
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
1202
def requireFeature(self, feature):
1203
"""This test requires a specific feature is available.
1205
:raises UnavailableFeature: When feature is not available.
1207
if not feature.available():
1208
raise UnavailableFeature(feature)
1210
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
1212
"""Invoke bzr and return (stdout, stderr).
1214
Useful for code that wants to check the contents of the
1215
output, the way error messages are presented, etc.
1217
This should be the main method for tests that want to exercise the
1218
overall behavior of the bzr application (rather than a unit test
1219
or a functional test of the library.)
1221
Much of the old code runs bzr by forking a new copy of Python, but
1222
that is slower, harder to debug, and generally not necessary.
1224
This runs bzr through the interface that catches and reports
1225
errors, and with logging set to something approximating the
1226
default, so that error reporting can be checked.
1228
:param argv: arguments to invoke bzr
1229
:param retcode: expected return code, or None for don't-care.
1230
:param encoding: encoding for sys.stdout and sys.stderr
1231
:param stdin: A string to be used as stdin for the command.
1232
:param working_dir: Change to this directory before running
1234
if encoding is None:
1235
encoding = bzrlib.user_encoding
1236
stdout = StringIOWrapper()
1237
stderr = StringIOWrapper()
1238
stdout.encoding = encoding
1239
stderr.encoding = encoding
1241
self.log('run bzr: %r', argv)
1242
# FIXME: don't call into logging here
1243
handler = logging.StreamHandler(stderr)
1244
handler.setLevel(logging.INFO)
1245
logger = logging.getLogger('')
1246
logger.addHandler(handler)
1247
old_ui_factory = ui.ui_factory
1248
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
1251
if working_dir is not None:
1252
cwd = osutils.getcwd()
1253
os.chdir(working_dir)
1256
saved_debug_flags = frozenset(debug.debug_flags)
1257
debug.debug_flags.clear()
1259
result = self.apply_redirected(ui.ui_factory.stdin,
1261
bzrlib.commands.run_bzr_catch_errors,
1264
debug.debug_flags.update(saved_debug_flags)
1266
logger.removeHandler(handler)
1267
ui.ui_factory = old_ui_factory
1271
out = stdout.getvalue()
1272
err = stderr.getvalue()
1274
self.log('output:\n%r', out)
1276
self.log('errors:\n%r', err)
1277
if retcode is not None:
1278
self.assertEquals(retcode, result,
1279
message='Unexpected return code')
1282
def run_bzr(self, *args, **kwargs):
1283
"""Invoke bzr, as if it were run from the command line.
1285
This should be the main method for tests that want to exercise the
1286
overall behavior of the bzr application (rather than a unit test
1287
or a functional test of the library.)
1289
This sends the stdout/stderr results into the test's log,
1290
where it may be useful for debugging. See also run_captured.
1292
:param stdin: A string to be used as stdin for the command.
1293
:param retcode: The status code the command should return
1294
:param working_dir: The directory to run the command in
1296
retcode = kwargs.pop('retcode', 0)
1297
encoding = kwargs.pop('encoding', None)
1298
stdin = kwargs.pop('stdin', None)
1299
working_dir = kwargs.pop('working_dir', None)
1300
error_regexes = kwargs.pop('error_regexes', [])
1302
out, err = self.run_bzr_captured(args, retcode=retcode,
1303
encoding=encoding, stdin=stdin, working_dir=working_dir)
1305
for regex in error_regexes:
1306
self.assertContainsRe(err, regex)
1310
def run_bzr_decode(self, *args, **kwargs):
1311
if 'encoding' in kwargs:
1312
encoding = kwargs['encoding']
1314
encoding = bzrlib.user_encoding
1315
return self.run_bzr(*args, **kwargs)[0].decode(encoding)
1317
def run_bzr_error(self, error_regexes, *args, **kwargs):
1318
"""Run bzr, and check that stderr contains the supplied regexes
1320
:param error_regexes: Sequence of regular expressions which
1321
must each be found in the error output. The relative ordering
1323
:param args: command-line arguments for bzr
1324
:param kwargs: Keyword arguments which are interpreted by run_bzr
1325
This function changes the default value of retcode to be 3,
1326
since in most cases this is run when you expect bzr to fail.
1327
:return: (out, err) The actual output of running the command (in case you
1328
want to do more inspection)
1331
# Make sure that commit is failing because there is nothing to do
1332
self.run_bzr_error(['no changes to commit'],
1333
'commit', '-m', 'my commit comment')
1334
# Make sure --strict is handling an unknown file, rather than
1335
# giving us the 'nothing to do' error
1336
self.build_tree(['unknown'])
1337
self.run_bzr_error(['Commit refused because there are unknown files'],
1338
'commit', '--strict', '-m', 'my commit comment')
1340
kwargs.setdefault('retcode', 3)
1341
out, err = self.run_bzr(error_regexes=error_regexes, *args, **kwargs)
1344
def run_bzr_subprocess(self, *args, **kwargs):
1345
"""Run bzr in a subprocess for testing.
1347
This starts a new Python interpreter and runs bzr in there.
1348
This should only be used for tests that have a justifiable need for
1349
this isolation: e.g. they are testing startup time, or signal
1350
handling, or early startup code, etc. Subprocess code can't be
1351
profiled or debugged so easily.
1353
:param retcode: The status code that is expected. Defaults to 0. If
1354
None is supplied, the status code is not checked.
1355
:param env_changes: A dictionary which lists changes to environment
1356
variables. A value of None will unset the env variable.
1357
The values must be strings. The change will only occur in the
1358
child, so you don't need to fix the environment after running.
1359
:param universal_newlines: Convert CRLF => LF
1360
:param allow_plugins: By default the subprocess is run with
1361
--no-plugins to ensure test reproducibility. Also, it is possible
1362
for system-wide plugins to create unexpected output on stderr,
1363
which can cause unnecessary test failures.
1365
env_changes = kwargs.get('env_changes', {})
1366
working_dir = kwargs.get('working_dir', None)
1367
allow_plugins = kwargs.get('allow_plugins', False)
1368
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1369
working_dir=working_dir,
1370
allow_plugins=allow_plugins)
1371
# We distinguish between retcode=None and retcode not passed.
1372
supplied_retcode = kwargs.get('retcode', 0)
1373
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1374
universal_newlines=kwargs.get('universal_newlines', False),
1377
def start_bzr_subprocess(self, process_args, env_changes=None,
1378
skip_if_plan_to_signal=False,
1380
allow_plugins=False):
1381
"""Start bzr in a subprocess for testing.
1383
This starts a new Python interpreter and runs bzr in there.
1384
This should only be used for tests that have a justifiable need for
1385
this isolation: e.g. they are testing startup time, or signal
1386
handling, or early startup code, etc. Subprocess code can't be
1387
profiled or debugged so easily.
1389
:param process_args: a list of arguments to pass to the bzr executable,
1390
for example `['--version']`.
1391
:param env_changes: A dictionary which lists changes to environment
1392
variables. A value of None will unset the env variable.
1393
The values must be strings. The change will only occur in the
1394
child, so you don't need to fix the environment after running.
1395
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1397
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1399
:returns: Popen object for the started process.
1401
if skip_if_plan_to_signal:
1402
if not getattr(os, 'kill', None):
1403
raise TestSkipped("os.kill not available.")
1405
if env_changes is None:
1409
def cleanup_environment():
1410
for env_var, value in env_changes.iteritems():
1411
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1413
def restore_environment():
1414
for env_var, value in old_env.iteritems():
1415
osutils.set_or_unset_env(env_var, value)
1417
bzr_path = self.get_bzr_path()
1420
if working_dir is not None:
1421
cwd = osutils.getcwd()
1422
os.chdir(working_dir)
1425
# win32 subprocess doesn't support preexec_fn
1426
# so we will avoid using it on all platforms, just to
1427
# make sure the code path is used, and we don't break on win32
1428
cleanup_environment()
1429
command = [sys.executable, bzr_path]
1430
if not allow_plugins:
1431
command.append('--no-plugins')
1432
command.extend(process_args)
1433
process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1435
restore_environment()
1441
def _popen(self, *args, **kwargs):
1442
"""Place a call to Popen.
1444
Allows tests to override this method to intercept the calls made to
1445
Popen for introspection.
1447
return Popen(*args, **kwargs)
1449
def get_bzr_path(self):
1450
"""Return the path of the 'bzr' executable for this test suite."""
1451
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
1452
if not os.path.isfile(bzr_path):
1453
# We are probably installed. Assume sys.argv is the right file
1454
bzr_path = sys.argv[0]
1457
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
1458
universal_newlines=False, process_args=None):
1459
"""Finish the execution of process.
1461
:param process: the Popen object returned from start_bzr_subprocess.
1462
:param retcode: The status code that is expected. Defaults to 0. If
1463
None is supplied, the status code is not checked.
1464
:param send_signal: an optional signal to send to the process.
1465
:param universal_newlines: Convert CRLF => LF
1466
:returns: (stdout, stderr)
1468
if send_signal is not None:
1469
os.kill(process.pid, send_signal)
1470
out, err = process.communicate()
1472
if universal_newlines:
1473
out = out.replace('\r\n', '\n')
1474
err = err.replace('\r\n', '\n')
1476
if retcode is not None and retcode != process.returncode:
1477
if process_args is None:
1478
process_args = "(unknown args)"
1479
mutter('Output of bzr %s:\n%s', process_args, out)
1480
mutter('Error for bzr %s:\n%s', process_args, err)
1481
self.fail('Command bzr %s failed with retcode %s != %s'
1482
% (process_args, retcode, process.returncode))
1485
def check_inventory_shape(self, inv, shape):
1486
"""Compare an inventory to a list of expected names.
1488
Fail if they are not precisely equal.
1491
shape = list(shape) # copy
1492
for path, ie in inv.entries():
1493
name = path.replace('\\', '/')
1494
if ie.kind == 'dir':
1501
self.fail("expected paths not found in inventory: %r" % shape)
1503
self.fail("unexpected paths found in inventory: %r" % extras)
1505
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
1506
a_callable=None, *args, **kwargs):
1507
"""Call callable with redirected std io pipes.
1509
Returns the return code."""
1510
if not callable(a_callable):
1511
raise ValueError("a_callable must be callable.")
1513
stdin = StringIO("")
1515
if getattr(self, "_log_file", None) is not None:
1516
stdout = self._log_file
1520
if getattr(self, "_log_file", None is not None):
1521
stderr = self._log_file
1524
real_stdin = sys.stdin
1525
real_stdout = sys.stdout
1526
real_stderr = sys.stderr
1531
return a_callable(*args, **kwargs)
1533
sys.stdout = real_stdout
1534
sys.stderr = real_stderr
1535
sys.stdin = real_stdin
1537
@symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
1538
def merge(self, branch_from, wt_to):
1539
"""A helper for tests to do a ui-less merge.
1541
This should move to the main library when someone has time to integrate
1544
# minimal ui-less merge.
1545
wt_to.branch.fetch(branch_from)
1546
base_rev = common_ancestor(branch_from.last_revision(),
1547
wt_to.branch.last_revision(),
1548
wt_to.branch.repository)
1549
merge_inner(wt_to.branch, branch_from.basis_tree(),
1550
wt_to.branch.repository.revision_tree(base_rev),
1552
wt_to.add_parent_tree_id(branch_from.last_revision())
1554
def reduceLockdirTimeout(self):
1555
"""Reduce the default lock timeout for the duration of the test, so that
1556
if LockContention occurs during a test, it does so quickly.
1558
Tests that expect to provoke LockContention errors should call this.
1560
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1562
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1563
self.addCleanup(resetTimeout)
1564
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1566
BzrTestBase = TestCase
1569
class TestCaseWithMemoryTransport(TestCase):
1570
"""Common test class for tests that do not need disk resources.
1572
Tests that need disk resources should derive from TestCaseWithTransport.
1574
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1576
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1577
a directory which does not exist. This serves to help ensure test isolation
1578
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1579
must exist. However, TestCaseWithMemoryTransport does not offer local
1580
file defaults for the transport in tests, nor does it obey the command line
1581
override, so tests that accidentally write to the common directory should
1584
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1585
a .bzr directory that stops us ascending higher into the filesystem.
1591
def __init__(self, methodName='runTest'):
1592
# allow test parameterisation after test construction and before test
1593
# execution. Variables that the parameteriser sets need to be
1594
# ones that are not set by setUp, or setUp will trash them.
1595
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1596
self.vfs_transport_factory = default_transport
1597
self.transport_server = None
1598
self.transport_readonly_server = None
1599
self.__vfs_server = None
1601
def get_transport(self):
1602
"""Return a writeable transport for the test scratch space"""
1603
t = get_transport(self.get_url())
1604
self.assertFalse(t.is_readonly())
1607
def get_readonly_transport(self):
1608
"""Return a readonly transport for the test scratch space
1610
This can be used to test that operations which should only need
1611
readonly access in fact do not try to write.
1613
t = get_transport(self.get_readonly_url())
1614
self.assertTrue(t.is_readonly())
1617
def create_transport_readonly_server(self):
1618
"""Create a transport server from class defined at init.
1620
This is mostly a hook for daughter classes.
1622
return self.transport_readonly_server()
1624
def get_readonly_server(self):
1625
"""Get the server instance for the readonly transport
1627
This is useful for some tests with specific servers to do diagnostics.
1629
if self.__readonly_server is None:
1630
if self.transport_readonly_server is None:
1631
# readonly decorator requested
1632
# bring up the server
1633
self.__readonly_server = ReadonlyServer()
1634
self.__readonly_server.setUp(self.get_vfs_only_server())
1636
self.__readonly_server = self.create_transport_readonly_server()
1637
self.__readonly_server.setUp(self.get_vfs_only_server())
1638
self.addCleanup(self.__readonly_server.tearDown)
1639
return self.__readonly_server
1641
def get_readonly_url(self, relpath=None):
1642
"""Get a URL for the readonly transport.
1644
This will either be backed by '.' or a decorator to the transport
1645
used by self.get_url()
1646
relpath provides for clients to get a path relative to the base url.
1647
These should only be downwards relative, not upwards.
1649
base = self.get_readonly_server().get_url()
1650
if relpath is not None:
1651
if not base.endswith('/'):
1653
base = base + relpath
1656
def get_vfs_only_server(self):
1657
"""Get the vfs only read/write server instance.
1659
This is useful for some tests with specific servers that need
1662
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1663
is no means to override it.
1665
if self.__vfs_server is None:
1666
self.__vfs_server = MemoryServer()
1667
self.__vfs_server.setUp()
1668
self.addCleanup(self.__vfs_server.tearDown)
1669
return self.__vfs_server
1671
def get_server(self):
1672
"""Get the read/write server instance.
1674
This is useful for some tests with specific servers that need
1677
This is built from the self.transport_server factory. If that is None,
1678
then the self.get_vfs_server is returned.
1680
if self.__server is None:
1681
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1682
return self.get_vfs_only_server()
1684
# bring up a decorated means of access to the vfs only server.
1685
self.__server = self.transport_server()
1687
self.__server.setUp(self.get_vfs_only_server())
1688
except TypeError, e:
1689
# This should never happen; the try:Except here is to assist
1690
# developers having to update code rather than seeing an
1691
# uninformative TypeError.
1692
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1693
self.addCleanup(self.__server.tearDown)
1694
return self.__server
1696
def _adjust_url(self, base, relpath):
1697
"""Get a URL (or maybe a path) for the readwrite transport.
1699
This will either be backed by '.' or to an equivalent non-file based
1701
relpath provides for clients to get a path relative to the base url.
1702
These should only be downwards relative, not upwards.
1704
if relpath is not None and relpath != '.':
1705
if not base.endswith('/'):
1707
# XXX: Really base should be a url; we did after all call
1708
# get_url()! But sometimes it's just a path (from
1709
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1710
# to a non-escaped local path.
1711
if base.startswith('./') or base.startswith('/'):
1714
base += urlutils.escape(relpath)
1717
def get_url(self, relpath=None):
1718
"""Get a URL (or maybe a path) for the readwrite transport.
1720
This will either be backed by '.' or to an equivalent non-file based
1722
relpath provides for clients to get a path relative to the base url.
1723
These should only be downwards relative, not upwards.
1725
base = self.get_server().get_url()
1726
return self._adjust_url(base, relpath)
1728
def get_vfs_only_url(self, relpath=None):
1729
"""Get a URL (or maybe a path for the plain old vfs transport.
1731
This will never be a smart protocol. It always has all the
1732
capabilities of the local filesystem, but it might actually be a
1733
MemoryTransport or some other similar virtual filesystem.
1735
This is the backing transport (if any) of the server returned by
1736
get_url and get_readonly_url.
1738
:param relpath: provides for clients to get a path relative to the base
1739
url. These should only be downwards relative, not upwards.
1741
base = self.get_vfs_only_server().get_url()
1742
return self._adjust_url(base, relpath)
1744
def _make_test_root(self):
1745
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1747
root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
1748
TestCaseWithMemoryTransport.TEST_ROOT = root
1750
# make a fake bzr directory there to prevent any tests propagating
1751
# up onto the source directory's real branch
1752
bzrdir.BzrDir.create_standalone_workingtree(root)
1754
# The same directory is used by all tests, and we're not specifically
1755
# told when all tests are finished. This will do.
1756
atexit.register(_rmtree_temp_dir, root)
1758
def makeAndChdirToTestDir(self):
1759
"""Create a temporary directories for this one test.
1761
This must set self.test_home_dir and self.test_dir and chdir to
1764
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1766
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1767
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1768
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1770
def make_branch(self, relpath, format=None):
1771
"""Create a branch on the transport at relpath."""
1772
repo = self.make_repository(relpath, format=format)
1773
return repo.bzrdir.create_branch()
1775
def make_bzrdir(self, relpath, format=None):
1777
# might be a relative or absolute path
1778
maybe_a_url = self.get_url(relpath)
1779
segments = maybe_a_url.rsplit('/', 1)
1780
t = get_transport(maybe_a_url)
1781
if len(segments) > 1 and segments[-1] not in ('', '.'):
1785
if isinstance(format, basestring):
1786
format = bzrdir.format_registry.make_bzrdir(format)
1787
return format.initialize_on_transport(t)
1788
except errors.UninitializableFormat:
1789
raise TestSkipped("Format %s is not initializable." % format)
1791
def make_repository(self, relpath, shared=False, format=None):
1792
"""Create a repository on our default transport at relpath.
1794
Note that relpath must be a relative path, not a full url.
1796
# FIXME: If you create a remoterepository this returns the underlying
1797
# real format, which is incorrect. Actually we should make sure that
1798
# RemoteBzrDir returns a RemoteRepository.
1799
# maybe mbp 20070410
1800
made_control = self.make_bzrdir(relpath, format=format)
1801
return made_control.create_repository(shared=shared)
1803
def make_branch_and_memory_tree(self, relpath, format=None):
1804
"""Create a branch on the default transport and a MemoryTree for it."""
1805
b = self.make_branch(relpath, format=format)
1806
return memorytree.MemoryTree.create_on_branch(b)
1808
def overrideEnvironmentForTesting(self):
1809
os.environ['HOME'] = self.test_home_dir
1810
os.environ['BZR_HOME'] = self.test_home_dir
1813
super(TestCaseWithMemoryTransport, self).setUp()
1814
self._make_test_root()
1815
_currentdir = os.getcwdu()
1816
def _leaveDirectory():
1817
os.chdir(_currentdir)
1818
self.addCleanup(_leaveDirectory)
1819
self.makeAndChdirToTestDir()
1820
self.overrideEnvironmentForTesting()
1821
self.__readonly_server = None
1822
self.__server = None
1823
self.reduceLockdirTimeout()
1826
class TestCaseInTempDir(TestCaseWithMemoryTransport):
1827
"""Derived class that runs a test within a temporary directory.
1829
This is useful for tests that need to create a branch, etc.
1831
The directory is created in a slightly complex way: for each
1832
Python invocation, a new temporary top-level directory is created.
1833
All test cases create their own directory within that. If the
1834
tests complete successfully, the directory is removed.
1836
:ivar test_base_dir: The path of the top-level directory for this
1837
test, which contains a home directory and a work directory.
1839
:ivar test_home_dir: An initially empty directory under test_base_dir
1840
which is used as $HOME for this test.
1842
:ivar test_dir: A directory under test_base_dir used as the current
1843
directory when the test proper is run.
1846
OVERRIDE_PYTHON = 'python'
1847
use_numbered_dirs = False
1849
def check_file_contents(self, filename, expect):
1850
self.log("check contents of file %s" % filename)
1851
contents = file(filename, 'r').read()
1852
if contents != expect:
1853
self.log("expected: %r" % expect)
1854
self.log("actually: %r" % contents)
1855
self.fail("contents of %s not as expected" % filename)
1857
def makeAndChdirToTestDir(self):
1858
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
1860
For TestCaseInTempDir we create a temporary directory based on the test
1861
name and then create two subdirs - test and home under it.
1863
# create a directory within the top level test directory
1864
candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
1865
# now create test and home directories within this dir
1866
self.test_base_dir = candidate_dir
1867
self.test_home_dir = self.test_base_dir + '/home'
1868
os.mkdir(self.test_home_dir)
1869
self.test_dir = self.test_base_dir + '/work'
1870
os.mkdir(self.test_dir)
1871
os.chdir(self.test_dir)
1872
# put name of test inside
1873
f = file(self.test_base_dir + '/name', 'w')
1878
self.addCleanup(self.deleteTestDir)
1880
def deleteTestDir(self):
1881
_rmtree_temp_dir(self.test_base_dir)
1883
def build_tree(self, shape, line_endings='binary', transport=None):
1884
"""Build a test tree according to a pattern.
1886
shape is a sequence of file specifications. If the final
1887
character is '/', a directory is created.
1889
This assumes that all the elements in the tree being built are new.
1891
This doesn't add anything to a branch.
1892
:param line_endings: Either 'binary' or 'native'
1893
in binary mode, exact contents are written
1894
in native mode, the line endings match the
1895
default platform endings.
1897
:param transport: A transport to write to, for building trees on
1898
VFS's. If the transport is readonly or None,
1899
"." is opened automatically.
1901
# It's OK to just create them using forward slashes on windows.
1902
if transport is None or transport.is_readonly():
1903
transport = get_transport(".")
1905
self.assert_(isinstance(name, basestring))
1907
transport.mkdir(urlutils.escape(name[:-1]))
1909
if line_endings == 'binary':
1911
elif line_endings == 'native':
1914
raise errors.BzrError(
1915
'Invalid line ending request %r' % line_endings)
1916
content = "contents of %s%s" % (name.encode('utf-8'), end)
1917
transport.put_bytes_non_atomic(urlutils.escape(name), content)
1919
def build_tree_contents(self, shape):
1920
build_tree_contents(shape)
1922
def assertFileEqual(self, content, path):
1923
"""Fail if path does not contain 'content'."""
1924
self.failUnlessExists(path)
1925
f = file(path, 'rb')
1930
self.assertEqualDiff(content, s)
1932
def failUnlessExists(self, path):
1933
"""Fail unless path or paths, which may be abs or relative, exist."""
1934
if not isinstance(path, basestring):
1936
self.failUnlessExists(p)
1938
self.failUnless(osutils.lexists(path),path+" does not exist")
1940
def failIfExists(self, path):
1941
"""Fail if path or paths, which may be abs or relative, exist."""
1942
if not isinstance(path, basestring):
1944
self.failIfExists(p)
1946
self.failIf(osutils.lexists(path),path+" exists")
1948
def assertInWorkingTree(self,path,root_path='.',tree=None):
1949
"""Assert whether path or paths are in the WorkingTree"""
1951
tree = workingtree.WorkingTree.open(root_path)
1952
if not isinstance(path, basestring):
1954
self.assertInWorkingTree(p,tree=tree)
1956
self.assertIsNot(tree.path2id(path), None,
1957
path+' not in working tree.')
1959
def assertNotInWorkingTree(self,path,root_path='.',tree=None):
1960
"""Assert whether path or paths are not in the WorkingTree"""
1962
tree = workingtree.WorkingTree.open(root_path)
1963
if not isinstance(path, basestring):
1965
self.assertNotInWorkingTree(p,tree=tree)
1967
self.assertIs(tree.path2id(path), None, path+' in working tree.')
1970
class TestCaseWithTransport(TestCaseInTempDir):
1971
"""A test case that provides get_url and get_readonly_url facilities.
1973
These back onto two transport servers, one for readonly access and one for
1976
If no explicit class is provided for readonly access, a
1977
ReadonlyTransportDecorator is used instead which allows the use of non disk
1978
based read write transports.
1980
If an explicit class is provided for readonly access, that server and the
1981
readwrite one must both define get_url() as resolving to os.getcwd().
1984
def get_vfs_only_server(self):
1985
"""See TestCaseWithMemoryTransport.
1987
This is useful for some tests with specific servers that need
1990
if self.__vfs_server is None:
1991
self.__vfs_server = self.vfs_transport_factory()
1992
self.__vfs_server.setUp()
1993
self.addCleanup(self.__vfs_server.tearDown)
1994
return self.__vfs_server
1996
def make_branch_and_tree(self, relpath, format=None):
1997
"""Create a branch on the transport and a tree locally.
1999
If the transport is not a LocalTransport, the Tree can't be created on
2000
the transport. In that case if the vfs_transport_factory is
2001
LocalURLServer the working tree is created in the local
2002
directory backing the transport, and the returned tree's branch and
2003
repository will also be accessed locally. Otherwise a lightweight
2004
checkout is created and returned.
2006
:param format: The BzrDirFormat.
2007
:returns: the WorkingTree.
2009
# TODO: always use the local disk path for the working tree,
2010
# this obviously requires a format that supports branch references
2011
# so check for that by checking bzrdir.BzrDirFormat.get_default_format()
2013
b = self.make_branch(relpath, format=format)
2015
return b.bzrdir.create_workingtree()
2016
except errors.NotLocalUrl:
2017
# We can only make working trees locally at the moment. If the
2018
# transport can't support them, then we keep the non-disk-backed
2019
# branch and create a local checkout.
2020
if self.vfs_transport_factory is LocalURLServer:
2021
# the branch is colocated on disk, we cannot create a checkout.
2022
# hopefully callers will expect this.
2023
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2024
return local_controldir.create_workingtree()
2026
return b.create_checkout(relpath, lightweight=True)
2028
def assertIsDirectory(self, relpath, transport):
2029
"""Assert that relpath within transport is a directory.
2031
This may not be possible on all transports; in that case it propagates
2032
a TransportNotPossible.
2035
mode = transport.stat(relpath).st_mode
2036
except errors.NoSuchFile:
2037
self.fail("path %s is not a directory; no such file"
2039
if not stat.S_ISDIR(mode):
2040
self.fail("path %s is not a directory; has mode %#o"
2043
def assertTreesEqual(self, left, right):
2044
"""Check that left and right have the same content and properties."""
2045
# we use a tree delta to check for equality of the content, and we
2046
# manually check for equality of other things such as the parents list.
2047
self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
2048
differences = left.changes_from(right)
2049
self.assertFalse(differences.has_changed(),
2050
"Trees %r and %r are different: %r" % (left, right, differences))
2053
super(TestCaseWithTransport, self).setUp()
2054
self.__vfs_server = None
2057
class ChrootedTestCase(TestCaseWithTransport):
2058
"""A support class that provides readonly urls outside the local namespace.
2060
This is done by checking if self.transport_server is a MemoryServer. if it
2061
is then we are chrooted already, if it is not then an HttpServer is used
2064
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
2065
be used without needed to redo it when a different
2066
subclass is in use ?
2070
super(ChrootedTestCase, self).setUp()
2071
if not self.vfs_transport_factory == MemoryServer:
2072
self.transport_readonly_server = HttpServer
2075
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2076
random_order=False):
2077
"""Create a test suite by filtering another one.
2079
:param suite: the source suite
2080
:param pattern: pattern that names must match
2081
:param exclude_pattern: pattern that names must not match, if any
2082
:param random_order: if True, tests in the new suite will be put in
2084
:returns: the newly created suite
2086
return sort_suite_by_re(suite, pattern, exclude_pattern,
2087
random_order, False)
2090
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2091
random_order=False, append_rest=True):
2092
"""Create a test suite by sorting another one.
2094
:param suite: the source suite
2095
:param pattern: pattern that names must match in order to go
2096
first in the new suite
2097
:param exclude_pattern: pattern that names must not match, if any
2098
:param random_order: if True, tests in the new suite will be put in
2100
:param append_rest: if False, pattern is a strict filter and not
2101
just an ordering directive
2102
:returns: the newly created suite
2106
filter_re = re.compile(pattern)
2107
if exclude_pattern is not None:
2108
exclude_re = re.compile(exclude_pattern)
2109
for test in iter_suite_tests(suite):
2111
if exclude_pattern is None or not exclude_re.search(test_id):
2112
if filter_re.search(test_id):
2117
random.shuffle(first)
2118
random.shuffle(second)
2119
return TestUtil.TestSuite(first + second)
2122
def run_suite(suite, name='test', verbose=False, pattern=".*",
2123
stop_on_failure=False,
2124
transport=None, lsprof_timed=None, bench_history=None,
2125
matching_tests_first=None,
2129
exclude_pattern=None,
2131
use_numbered_dirs = bool(numbered_dirs)
2133
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2134
if numbered_dirs is not None:
2135
TestCaseInTempDir.use_numbered_dirs = use_numbered_dirs
2140
runner = TextTestRunner(stream=sys.stdout,
2142
verbosity=verbosity,
2143
bench_history=bench_history,
2144
use_numbered_dirs=use_numbered_dirs,
2145
list_only=list_only,
2147
runner.stop_on_failure=stop_on_failure
2148
# Initialise the random number generator and display the seed used.
2149
# We convert the seed to a long to make it reuseable across invocations.
2150
random_order = False
2151
if random_seed is not None:
2153
if random_seed == "now":
2154
random_seed = long(time.time())
2156
# Convert the seed to a long if we can
2158
random_seed = long(random_seed)
2161
runner.stream.writeln("Randomizing test order using seed %s\n" %
2163
random.seed(random_seed)
2164
# Customise the list of tests if requested
2165
if pattern != '.*' or exclude_pattern is not None or random_order:
2166
if matching_tests_first:
2167
suite = sort_suite_by_re(suite, pattern, exclude_pattern,
2170
suite = filter_suite_by_re(suite, pattern, exclude_pattern,
2172
result = runner.run(suite)
2173
return result.wasSuccessful()
2176
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2178
test_suite_factory=None,
2181
matching_tests_first=None,
2185
exclude_pattern=None):
2186
"""Run the whole test suite under the enhanced runner"""
2187
# XXX: Very ugly way to do this...
2188
# Disable warning about old formats because we don't want it to disturb
2189
# any blackbox tests.
2190
from bzrlib import repository
2191
repository._deprecation_warning_done = True
2193
global default_transport
2194
if transport is None:
2195
transport = default_transport
2196
old_transport = default_transport
2197
default_transport = transport
2199
if test_suite_factory is None:
2200
suite = test_suite()
2202
suite = test_suite_factory()
2203
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2204
stop_on_failure=stop_on_failure,
2205
transport=transport,
2206
lsprof_timed=lsprof_timed,
2207
bench_history=bench_history,
2208
matching_tests_first=matching_tests_first,
2209
numbered_dirs=numbered_dirs,
2210
list_only=list_only,
2211
random_seed=random_seed,
2212
exclude_pattern=exclude_pattern)
2214
default_transport = old_transport
2218
"""Build and return TestSuite for the whole of bzrlib.
2220
This function can be replaced if you need to change the default test
2221
suite on a global basis, but it is not encouraged.
2224
'bzrlib.tests.test_ancestry',
2225
'bzrlib.tests.test_annotate',
2226
'bzrlib.tests.test_api',
2227
'bzrlib.tests.test_atomicfile',
2228
'bzrlib.tests.test_bad_files',
2229
'bzrlib.tests.test_branch',
2230
'bzrlib.tests.test_branchbuilder',
2231
'bzrlib.tests.test_bugtracker',
2232
'bzrlib.tests.test_bundle',
2233
'bzrlib.tests.test_bzrdir',
2234
'bzrlib.tests.test_cache_utf8',
2235
'bzrlib.tests.test_commands',
2236
'bzrlib.tests.test_commit',
2237
'bzrlib.tests.test_commit_merge',
2238
'bzrlib.tests.test_config',
2239
'bzrlib.tests.test_conflicts',
2240
'bzrlib.tests.test_counted_lock',
2241
'bzrlib.tests.test_decorators',
2242
'bzrlib.tests.test_delta',
2243
'bzrlib.tests.test_deprecated_graph',
2244
'bzrlib.tests.test_diff',
2245
'bzrlib.tests.test_dirstate',
2246
'bzrlib.tests.test_errors',
2247
'bzrlib.tests.test_escaped_store',
2248
'bzrlib.tests.test_extract',
2249
'bzrlib.tests.test_fetch',
2250
'bzrlib.tests.test_ftp_transport',
2251
'bzrlib.tests.test_generate_docs',
2252
'bzrlib.tests.test_generate_ids',
2253
'bzrlib.tests.test_globbing',
2254
'bzrlib.tests.test_gpg',
2255
'bzrlib.tests.test_graph',
2256
'bzrlib.tests.test_hashcache',
2257
'bzrlib.tests.test_help',
2258
'bzrlib.tests.test_http',
2259
'bzrlib.tests.test_http_response',
2260
'bzrlib.tests.test_https_ca_bundle',
2261
'bzrlib.tests.test_identitymap',
2262
'bzrlib.tests.test_ignores',
2263
'bzrlib.tests.test_info',
2264
'bzrlib.tests.test_inv',
2265
'bzrlib.tests.test_knit',
2266
'bzrlib.tests.test_lazy_import',
2267
'bzrlib.tests.test_lazy_regex',
2268
'bzrlib.tests.test_lockdir',
2269
'bzrlib.tests.test_lockable_files',
2270
'bzrlib.tests.test_log',
2271
'bzrlib.tests.test_lsprof',
2272
'bzrlib.tests.test_memorytree',
2273
'bzrlib.tests.test_merge',
2274
'bzrlib.tests.test_merge3',
2275
'bzrlib.tests.test_merge_core',
2276
'bzrlib.tests.test_merge_directive',
2277
'bzrlib.tests.test_missing',
2278
'bzrlib.tests.test_msgeditor',
2279
'bzrlib.tests.test_nonascii',
2280
'bzrlib.tests.test_options',
2281
'bzrlib.tests.test_osutils',
2282
'bzrlib.tests.test_osutils_encodings',
2283
'bzrlib.tests.test_patch',
2284
'bzrlib.tests.test_patches',
2285
'bzrlib.tests.test_permissions',
2286
'bzrlib.tests.test_plugins',
2287
'bzrlib.tests.test_progress',
2288
'bzrlib.tests.test_reconcile',
2289
'bzrlib.tests.test_registry',
2290
'bzrlib.tests.test_remote',
2291
'bzrlib.tests.test_repository',
2292
'bzrlib.tests.test_revert',
2293
'bzrlib.tests.test_revision',
2294
'bzrlib.tests.test_revisionnamespaces',
2295
'bzrlib.tests.test_revisiontree',
2296
'bzrlib.tests.test_rio',
2297
'bzrlib.tests.test_sampler',
2298
'bzrlib.tests.test_selftest',
2299
'bzrlib.tests.test_setup',
2300
'bzrlib.tests.test_sftp_transport',
2301
'bzrlib.tests.test_smart',
2302
'bzrlib.tests.test_smart_add',
2303
'bzrlib.tests.test_smart_transport',
2304
'bzrlib.tests.test_source',
2305
'bzrlib.tests.test_ssh_transport',
2306
'bzrlib.tests.test_status',
2307
'bzrlib.tests.test_store',
2308
'bzrlib.tests.test_strace',
2309
'bzrlib.tests.test_subsume',
2310
'bzrlib.tests.test_symbol_versioning',
2311
'bzrlib.tests.test_tag',
2312
'bzrlib.tests.test_testament',
2313
'bzrlib.tests.test_textfile',
2314
'bzrlib.tests.test_textmerge',
2315
'bzrlib.tests.test_timestamp',
2316
'bzrlib.tests.test_trace',
2317
'bzrlib.tests.test_transactions',
2318
'bzrlib.tests.test_transform',
2319
'bzrlib.tests.test_transport',
2320
'bzrlib.tests.test_tree',
2321
'bzrlib.tests.test_treebuilder',
2322
'bzrlib.tests.test_tsort',
2323
'bzrlib.tests.test_tuned_gzip',
2324
'bzrlib.tests.test_ui',
2325
'bzrlib.tests.test_upgrade',
2326
'bzrlib.tests.test_urlutils',
2327
'bzrlib.tests.test_versionedfile',
2328
'bzrlib.tests.test_version',
2329
'bzrlib.tests.test_version_info',
2330
'bzrlib.tests.test_weave',
2331
'bzrlib.tests.test_whitebox',
2332
'bzrlib.tests.test_workingtree',
2333
'bzrlib.tests.test_workingtree_4',
2334
'bzrlib.tests.test_wsgi',
2335
'bzrlib.tests.test_xml',
2337
test_transport_implementations = [
2338
'bzrlib.tests.test_transport_implementations',
2339
'bzrlib.tests.test_read_bundle',
2341
suite = TestUtil.TestSuite()
2342
loader = TestUtil.TestLoader()
2343
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2344
from bzrlib.transport import TransportTestProviderAdapter
2345
adapter = TransportTestProviderAdapter()
2346
adapt_modules(test_transport_implementations, adapter, loader, suite)
2347
for package in packages_to_test():
2348
suite.addTest(package.test_suite())
2349
for m in MODULES_TO_TEST:
2350
suite.addTest(loader.loadTestsFromModule(m))
2351
for m in MODULES_TO_DOCTEST:
2353
suite.addTest(doctest.DocTestSuite(m))
2354
except ValueError, e:
2355
print '**failed to get doctest for: %s\n%s' %(m,e)
2357
for name, plugin in bzrlib.plugin.all_plugins().items():
2358
if getattr(plugin, 'test_suite', None) is not None:
2359
default_encoding = sys.getdefaultencoding()
2361
plugin_suite = plugin.test_suite()
2362
except ImportError, e:
2363
bzrlib.trace.warning(
2364
'Unable to test plugin "%s": %s', name, e)
2366
suite.addTest(plugin_suite)
2367
if default_encoding != sys.getdefaultencoding():
2368
bzrlib.trace.warning(
2369
'Plugin "%s" tried to reset default encoding to: %s', name,
2370
sys.getdefaultencoding())
2372
sys.setdefaultencoding(default_encoding)
2376
def adapt_modules(mods_list, adapter, loader, suite):
2377
"""Adapt the modules in mods_list using adapter and add to suite."""
2378
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
2379
suite.addTests(adapter.adapt(test))
2382
def _rmtree_temp_dir(dirname):
2383
# If LANG=C we probably have created some bogus paths
2384
# which rmtree(unicode) will fail to delete
2385
# so make sure we are using rmtree(str) to delete everything
2386
# except on win32, where rmtree(str) will fail
2387
# since it doesn't have the property of byte-stream paths
2388
# (they are either ascii or mbcs)
2389
if sys.platform == 'win32':
2390
# make sure we are using the unicode win32 api
2391
dirname = unicode(dirname)
2393
dirname = dirname.encode(sys.getfilesystemencoding())
2395
osutils.rmtree(dirname)
2397
if sys.platform == 'win32' and e.errno == errno.EACCES:
2398
print >>sys.stderr, ('Permission denied: '
2399
'unable to remove testing dir '
2400
'%s' % os.path.basename(dirname))
2405
def clean_selftest_output(root=None, quiet=False):
2406
"""Remove all selftest output directories from root directory.
2408
:param root: root directory for clean
2409
(if ommitted or None then clean current directory).
2410
:param quiet: suppress report about deleting directories
2413
re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
2416
for i in os.listdir(root):
2417
if os.path.isdir(i) and re_dir.match(i):
2419
print 'delete directory:', i
2423
class Feature(object):
2424
"""An operating system Feature."""
2427
self._available = None
2429
def available(self):
2430
"""Is the feature available?
2432
:return: True if the feature is available.
2434
if self._available is None:
2435
self._available = self._probe()
2436
return self._available
2439
"""Implement this method in concrete features.
2441
:return: True if the feature is available.
2443
raise NotImplementedError
2446
if getattr(self, 'feature_name', None):
2447
return self.feature_name()
2448
return self.__class__.__name__