159
155
bench_history.write("--date %s %s\n" % (time.time(), revision_id))
160
156
self._bench_history = bench_history
162
def extractBenchmarkTime(self, testCase):
157
self.ui = ui.ui_factory
158
self.num_tests = num_tests
160
self.failure_count = 0
161
self.known_failure_count = 0
163
self.not_applicable_count = 0
164
self.unsupported = {}
166
self._overall_start_time = time.time()
168
def _extractBenchmarkTime(self, testCase):
163
169
"""Add a benchmark time for the current test case."""
164
self._benchmarkTime = getattr(testCase, "_benchtime", None)
170
return getattr(testCase, "_benchtime", None)
166
172
def _elapsedTestTimeString(self):
167
173
"""Return a time string for the overall time the current test has taken."""
168
174
return self._formatTime(time.time() - self._start_time)
170
def _testTimeString(self):
171
if self._benchmarkTime is not None:
176
def _testTimeString(self, testCase):
177
benchmark_time = self._extractBenchmarkTime(testCase)
178
if benchmark_time is not None:
172
179
return "%s/%s" % (
173
self._formatTime(self._benchmarkTime),
180
self._formatTime(benchmark_time),
174
181
self._elapsedTestTimeString())
176
return " %s" % self._elapsedTestTimeString()
183
return " %s" % self._elapsedTestTimeString()
178
185
def _formatTime(self, seconds):
179
186
"""Format seconds as milliseconds with leading spaces."""
180
return "%5dms" % (1000 * seconds)
187
# some benchmarks can take thousands of seconds to run, so we need 8
189
return "%8dms" % (1000 * seconds)
182
def _ellipsise_unimportant_words(self, a_string, final_width,
184
"""Add ellipses (sp?) for overly long strings.
186
:param keep_start: If true preserve the start of a_string rather
190
if len(a_string) > final_width:
191
result = a_string[:final_width-3] + '...'
195
if len(a_string) > final_width:
196
result = '...' + a_string[3-final_width:]
199
return result.ljust(final_width)
191
def _shortened_test_description(self, test):
193
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
201
196
def startTest(self, test):
202
197
unittest.TestResult.startTest(self, test)
203
# In a short description, the important words are in
204
# the beginning, but in an id, the important words are
206
SHOW_DESCRIPTIONS = False
208
if not self.showAll and self.dots and self.pb is not None:
211
final_width = osutils.terminal_width()
212
final_width = final_width - 15 - 8
214
if SHOW_DESCRIPTIONS:
215
what = test.shortDescription()
217
what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
220
if what.startswith('bzrlib.tests.'):
222
what = self._ellipsise_unimportant_words(what, final_width)
224
self.stream.write(what)
225
elif self.dots and self.pb is not None:
226
self.pb.update(what, self.testsRun - 1, None)
198
self.report_test_start(test)
199
test.number = self.count
228
200
self._recordTestStartTime()
230
202
def _recordTestStartTime(self):
231
203
"""Record that a test has started."""
232
204
self._start_time = time.time()
206
def _cleanupLogFile(self, test):
207
# We can only do this if we have one of our TestCases, not if
209
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
210
if setKeepLogfile is not None:
234
213
def addError(self, test, err):
235
if isinstance(err[1], TestSkipped):
236
return self.addSkipped(test, err)
237
unittest.TestResult.addError(self, test, err)
238
self.extractBenchmarkTime(test)
240
self.stream.writeln("ERROR %s" % self._testTimeString())
241
elif self.dots and self.pb is None:
242
self.stream.write('E')
244
self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
245
self.pb.note(self._ellipsise_unimportant_words(
246
test.id() + ': ERROR',
247
osutils.terminal_width()))
214
"""Tell result that test finished with an error.
216
Called from the TestCase run() method when the test
217
fails with an unexpected error.
219
self._testConcluded(test)
220
if isinstance(err[1], TestNotApplicable):
221
return self._addNotApplicable(test, err)
222
elif isinstance(err[1], UnavailableFeature):
223
return self.addNotSupported(test, err[1].args[0])
225
unittest.TestResult.addError(self, test, err)
226
self.error_count += 1
227
self.report_error(test, err)
230
self._cleanupLogFile(test)
252
232
def addFailure(self, test, err):
253
unittest.TestResult.addFailure(self, test, err)
254
self.extractBenchmarkTime(test)
256
self.stream.writeln(" FAIL %s" % self._testTimeString())
257
elif self.dots and self.pb is None:
258
self.stream.write('F')
260
self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
261
self.pb.note(self._ellipsise_unimportant_words(
262
test.id() + ': FAIL',
263
osutils.terminal_width()))
233
"""Tell result that test failed.
235
Called from the TestCase run() method when the test
236
fails because e.g. an assert() method failed.
238
self._testConcluded(test)
239
if isinstance(err[1], KnownFailure):
240
return self._addKnownFailure(test, err)
242
unittest.TestResult.addFailure(self, test, err)
243
self.failure_count += 1
244
self.report_failure(test, err)
247
self._cleanupLogFile(test)
268
249
def addSuccess(self, test):
269
self.extractBenchmarkTime(test)
250
"""Tell result that test completed successfully.
252
Called from the TestCase run()
254
self._testConcluded(test)
270
255
if self._bench_history is not None:
271
if self._benchmarkTime is not None:
256
benchmark_time = self._extractBenchmarkTime(test)
257
if benchmark_time is not None:
272
258
self._bench_history.write("%s %s\n" % (
273
self._formatTime(self._benchmarkTime),
259
self._formatTime(benchmark_time),
276
self.stream.writeln(' OK %s' % self._testTimeString())
277
for bench_called, stats in getattr(test, '_benchcalls', []):
278
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
279
stats.pprint(file=self.stream)
280
elif self.dots and self.pb is None:
281
self.stream.write('~')
283
self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
261
self.report_success(test)
262
self._cleanupLogFile(test)
285
263
unittest.TestResult.addSuccess(self, test)
287
def addSkipped(self, test, skip_excinfo):
288
self.extractBenchmarkTime(test)
290
print >>self.stream, ' SKIP %s' % self._testTimeString()
291
print >>self.stream, ' %s' % skip_excinfo[1]
292
elif self.dots and self.pb is None:
293
self.stream.write('S')
295
self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
297
# seems best to treat this as success from point-of-view of unittest
298
# -- it actually does nothing so it barely matters :)
264
test._log_contents = ''
266
def _testConcluded(self, test):
267
"""Common code when a test has finished.
269
Called regardless of whether it succeded, failed, etc.
273
def _addKnownFailure(self, test, err):
274
self.known_failure_count += 1
275
self.report_known_failure(test, err)
277
def addNotSupported(self, test, feature):
278
"""The test will not be run because of a missing feature.
280
# this can be called in two different ways: it may be that the
281
# test started running, and then raised (through addError)
282
# UnavailableFeature. Alternatively this method can be called
283
# while probing for features before running the tests; in that
284
# case we will see startTest and stopTest, but the test will never
286
self.unsupported.setdefault(str(feature), 0)
287
self.unsupported[str(feature)] += 1
288
self.report_unsupported(test, feature)
290
def addSkip(self, test, reason):
291
"""A test has not run for 'reason'."""
293
self.report_skip(test, reason)
295
def _addNotApplicable(self, test, skip_excinfo):
296
if isinstance(skip_excinfo[1], TestNotApplicable):
297
self.not_applicable_count += 1
298
self.report_not_applicable(test, skip_excinfo)
301
301
except KeyboardInterrupt:
304
self.addError(test, test.__exc_info())
304
self.addError(test, test.exc_info())
306
# seems best to treat this as success from point-of-view of unittest
307
# -- it actually does nothing so it barely matters :)
306
308
unittest.TestResult.addSuccess(self, test)
309
test._log_contents = ''
308
311
def printErrorList(self, flavour, errors):
309
312
for test, err in errors:
310
313
self.stream.writeln(self.separator1)
311
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
314
self.stream.write("%s: " % flavour)
315
self.stream.writeln(self.getDescription(test))
312
316
if getattr(test, '_get_log', None) is not None:
314
print >>self.stream, \
315
('vvvv[log from %s]' % test.id()).ljust(78,'-')
316
print >>self.stream, test._get_log()
317
print >>self.stream, \
318
('^^^^[log from %s]' % test.id()).ljust(78,'-')
317
self.stream.write('\n')
319
('vvvv[log from %s]' % test.id()).ljust(78,'-'))
320
self.stream.write('\n')
321
self.stream.write(test._get_log())
322
self.stream.write('\n')
324
('^^^^[log from %s]' % test.id()).ljust(78,'-'))
325
self.stream.write('\n')
319
326
self.stream.writeln(self.separator2)
320
327
self.stream.writeln("%s" % err)
332
def report_cleaning_up(self):
335
def report_success(self, test):
338
def wasStrictlySuccessful(self):
339
if self.unsupported or self.known_failure_count:
341
return self.wasSuccessful()
344
class TextTestResult(ExtendedTestResult):
345
"""Displays progress and results of tests in text form"""
347
def __init__(self, stream, descriptions, verbosity,
352
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
353
bench_history, num_tests)
355
self.pb = self.ui.nested_progress_bar()
356
self._supplied_pb = False
359
self._supplied_pb = True
360
self.pb.show_pct = False
361
self.pb.show_spinner = False
362
self.pb.show_eta = False,
363
self.pb.show_count = False
364
self.pb.show_bar = False
366
def report_starting(self):
367
self.pb.update('[test 0/%d] Starting' % (self.num_tests))
369
def _progress_prefix_text(self):
370
# the longer this text, the less space we have to show the test
372
a = '[%d' % self.count # total that have been run
373
# tests skipped as known not to be relevant are not important enough
375
## if self.skip_count:
376
## a += ', %d skip' % self.skip_count
377
## if self.known_failure_count:
378
## a += '+%dX' % self.known_failure_count
379
if self.num_tests is not None:
380
a +='/%d' % self.num_tests
382
runtime = time.time() - self._overall_start_time
384
a += '%dm%ds' % (runtime / 60, runtime % 60)
388
a += ', %d err' % self.error_count
389
if self.failure_count:
390
a += ', %d fail' % self.failure_count
392
a += ', %d missing' % len(self.unsupported)
396
def report_test_start(self, test):
399
self._progress_prefix_text()
401
+ self._shortened_test_description(test))
403
def _test_description(self, test):
404
return self._shortened_test_description(test)
406
def report_error(self, test, err):
407
self.pb.note('ERROR: %s\n %s\n',
408
self._test_description(test),
412
def report_failure(self, test, err):
413
self.pb.note('FAIL: %s\n %s\n',
414
self._test_description(test),
418
def report_known_failure(self, test, err):
419
self.pb.note('XFAIL: %s\n%s\n',
420
self._test_description(test), err[1])
422
def report_skip(self, test, reason):
425
def report_not_applicable(self, test, skip_excinfo):
428
def report_unsupported(self, test, feature):
429
"""test cannot be run because feature is missing."""
431
def report_cleaning_up(self):
432
self.pb.update('Cleaning up')
435
if not self._supplied_pb:
439
class VerboseTestResult(ExtendedTestResult):
440
"""Produce long output, with one line per test run plus times"""
442
def _ellipsize_to_right(self, a_string, final_width):
443
"""Truncate and pad a string, keeping the right hand side"""
444
if len(a_string) > final_width:
445
result = '...' + a_string[3-final_width:]
448
return result.ljust(final_width)
450
def report_starting(self):
451
self.stream.write('running %d tests...\n' % self.num_tests)
453
def report_test_start(self, test):
455
name = self._shortened_test_description(test)
456
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
457
# numbers, plus a trailing blank
458
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
459
self.stream.write(self._ellipsize_to_right(name,
460
osutils.terminal_width()-30))
463
def _error_summary(self, err):
465
return '%s%s' % (indent, err[1])
467
def report_error(self, test, err):
468
self.stream.writeln('ERROR %s\n%s'
469
% (self._testTimeString(test),
470
self._error_summary(err)))
472
def report_failure(self, test, err):
473
self.stream.writeln(' FAIL %s\n%s'
474
% (self._testTimeString(test),
475
self._error_summary(err)))
477
def report_known_failure(self, test, err):
478
self.stream.writeln('XFAIL %s\n%s'
479
% (self._testTimeString(test),
480
self._error_summary(err)))
482
def report_success(self, test):
483
self.stream.writeln(' OK %s' % self._testTimeString(test))
484
for bench_called, stats in getattr(test, '_benchcalls', []):
485
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
486
stats.pprint(file=self.stream)
487
# flush the stream so that we get smooth output. This verbose mode is
488
# used to show the output in PQM.
491
def report_skip(self, test, reason):
492
self.stream.writeln(' SKIP %s\n%s'
493
% (self._testTimeString(test), reason))
495
def report_not_applicable(self, test, skip_excinfo):
496
self.stream.writeln(' N/A %s\n%s'
497
% (self._testTimeString(test),
498
self._error_summary(skip_excinfo)))
500
def report_unsupported(self, test, feature):
501
"""test cannot be run because feature is missing."""
502
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
503
%(self._testTimeString(test), feature))
323
506
class TextTestRunner(object):
324
507
stop_on_failure = False
822
1549
This sends the stdout/stderr results into the test's log,
823
1550
where it may be useful for debugging. See also run_captured.
825
:param stdin: A string to be used as stdin for the command.
1552
:keyword stdin: A string to be used as stdin for the command.
1553
:keyword retcode: The status code the command should return;
1555
:keyword working_dir: The directory to run the command in
1556
:keyword error_regexes: A list of expected error messages. If
1557
specified they must be seen in the error output of the command.
827
retcode = kwargs.pop('retcode', 0)
828
encoding = kwargs.pop('encoding', None)
829
stdin = kwargs.pop('stdin', None)
830
return self.run_bzr_captured(args, retcode=retcode, encoding=encoding, stdin=stdin)
832
def run_bzr_decode(self, *args, **kwargs):
833
if 'encoding' in kwargs:
834
encoding = kwargs['encoding']
836
encoding = bzrlib.user_encoding
837
return self.run_bzr(*args, **kwargs)[0].decode(encoding)
1559
out, err = self._run_bzr_autosplit(
1564
working_dir=working_dir,
1566
for regex in error_regexes:
1567
self.assertContainsRe(err, regex)
839
1570
def run_bzr_error(self, error_regexes, *args, **kwargs):
840
1571
"""Run bzr, and check that stderr contains the supplied regexes
842
:param error_regexes: Sequence of regular expressions which
1573
:param error_regexes: Sequence of regular expressions which
843
1574
must each be found in the error output. The relative ordering
844
1575
is not enforced.
845
1576
:param args: command-line arguments for bzr
846
1577
:param kwargs: Keyword arguments which are interpreted by run_bzr
847
1578
This function changes the default value of retcode to be 3,
848
1579
since in most cases this is run when you expect bzr to fail.
849
:return: (out, err) The actual output of running the command (in case you
850
want to do more inspection)
1581
:return: (out, err) The actual output of running the command (in case
1582
you want to do more inspection)
853
1586
# Make sure that commit is failing because there is nothing to do
854
1587
self.run_bzr_error(['no changes to commit'],
855
'commit', '-m', 'my commit comment')
1588
['commit', '-m', 'my commit comment'])
856
1589
# Make sure --strict is handling an unknown file, rather than
857
1590
# giving us the 'nothing to do' error
858
1591
self.build_tree(['unknown'])
859
1592
self.run_bzr_error(['Commit refused because there are unknown files'],
860
'commit', '--strict', '-m', 'my commit comment')
1593
['commit', --strict', '-m', 'my commit comment'])
862
1595
kwargs.setdefault('retcode', 3)
1596
kwargs['error_regexes'] = error_regexes
863
1597
out, err = self.run_bzr(*args, **kwargs)
864
for regex in error_regexes:
865
self.assertContainsRe(err, regex)
868
1600
def run_bzr_subprocess(self, *args, **kwargs):
869
1601
"""Run bzr in a subprocess for testing.
871
This starts a new Python interpreter and runs bzr in there.
1603
This starts a new Python interpreter and runs bzr in there.
872
1604
This should only be used for tests that have a justifiable need for
873
1605
this isolation: e.g. they are testing startup time, or signal
874
handling, or early startup code, etc. Subprocess code can't be
1606
handling, or early startup code, etc. Subprocess code can't be
875
1607
profiled or debugged so easily.
877
:param retcode: The status code that is expected. Defaults to 0. If
1609
:keyword retcode: The status code that is expected. Defaults to 0. If
878
1610
None is supplied, the status code is not checked.
1611
:keyword env_changes: A dictionary which lists changes to environment
1612
variables. A value of None will unset the env variable.
1613
The values must be strings. The change will only occur in the
1614
child, so you don't need to fix the environment after running.
1615
:keyword universal_newlines: Convert CRLF => LF
1616
:keyword allow_plugins: By default the subprocess is run with
1617
--no-plugins to ensure test reproducibility. Also, it is possible
1618
for system-wide plugins to create unexpected output on stderr,
1619
which can cause unnecessary test failures.
1621
env_changes = kwargs.get('env_changes', {})
1622
working_dir = kwargs.get('working_dir', None)
1623
allow_plugins = kwargs.get('allow_plugins', False)
1625
if isinstance(args[0], list):
1627
elif isinstance(args[0], basestring):
1628
args = list(shlex.split(args[0]))
1630
raise ValueError("passing varargs to run_bzr_subprocess")
1631
process = self.start_bzr_subprocess(args, env_changes=env_changes,
1632
working_dir=working_dir,
1633
allow_plugins=allow_plugins)
1634
# We distinguish between retcode=None and retcode not passed.
1635
supplied_retcode = kwargs.get('retcode', 0)
1636
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
1637
universal_newlines=kwargs.get('universal_newlines', False),
1640
def start_bzr_subprocess(self, process_args, env_changes=None,
1641
skip_if_plan_to_signal=False,
1643
allow_plugins=False):
1644
"""Start bzr in a subprocess for testing.
1646
This starts a new Python interpreter and runs bzr in there.
1647
This should only be used for tests that have a justifiable need for
1648
this isolation: e.g. they are testing startup time, or signal
1649
handling, or early startup code, etc. Subprocess code can't be
1650
profiled or debugged so easily.
1652
:param process_args: a list of arguments to pass to the bzr executable,
1653
for example ``['--version']``.
879
1654
:param env_changes: A dictionary which lists changes to environment
880
1655
variables. A value of None will unset the env variable.
881
1656
The values must be strings. The change will only occur in the
882
1657
child, so you don't need to fix the environment after running.
883
:param universal_newlines: Convert CRLF => LF
1658
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
1660
:param allow_plugins: If False (default) pass --no-plugins to bzr.
1662
:returns: Popen object for the started process.
885
env_changes = kwargs.get('env_changes', {})
1664
if skip_if_plan_to_signal:
1665
if not getattr(os, 'kill', None):
1666
raise TestSkipped("os.kill not available.")
1668
if env_changes is None:
889
1672
def cleanup_environment():
972
1800
sys.stderr = real_stderr
973
1801
sys.stdin = real_stdin
975
@symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
976
def merge(self, branch_from, wt_to):
977
"""A helper for tests to do a ui-less merge.
979
This should move to the main library when someone has time to integrate
982
# minimal ui-less merge.
983
wt_to.branch.fetch(branch_from)
984
base_rev = common_ancestor(branch_from.last_revision(),
985
wt_to.branch.last_revision(),
986
wt_to.branch.repository)
987
merge_inner(wt_to.branch, branch_from.basis_tree(),
988
wt_to.branch.repository.revision_tree(base_rev),
990
wt_to.add_parent_tree_id(branch_from.last_revision())
993
BzrTestBase = TestCase
996
class TestCaseInTempDir(TestCase):
1803
def reduceLockdirTimeout(self):
1804
"""Reduce the default lock timeout for the duration of the test, so that
1805
if LockContention occurs during a test, it does so quickly.
1807
Tests that expect to provoke LockContention errors should call this.
1809
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1811
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1812
self.addCleanup(resetTimeout)
1813
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1815
def make_utf8_encoded_stringio(self, encoding_type=None):
1816
"""Return a StringIOWrapper instance, that will encode Unicode
1819
if encoding_type is None:
1820
encoding_type = 'strict'
1822
output_encoding = 'utf-8'
1823
sio = codecs.getwriter(output_encoding)(sio, errors=encoding_type)
1824
sio.encoding = output_encoding
1828
class CapturedCall(object):
1829
"""A helper for capturing smart server calls for easy debug analysis."""
1831
def __init__(self, params, prefix_length):
1832
"""Capture the call with params and skip prefix_length stack frames."""
1835
# The last 5 frames are the __init__, the hook frame, and 3 smart
1836
# client frames. Beyond this we could get more clever, but this is good
1838
stack = traceback.extract_stack()[prefix_length:-5]
1839
self.stack = ''.join(traceback.format_list(stack))
1842
return self.call.method
1845
return self.call.method
1851
class TestCaseWithMemoryTransport(TestCase):
1852
"""Common test class for tests that do not need disk resources.
1854
Tests that need disk resources should derive from TestCaseWithTransport.
1856
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1858
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1859
a directory which does not exist. This serves to help ensure test isolation
1860
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1861
must exist. However, TestCaseWithMemoryTransport does not offer local
1862
file defaults for the transport in tests, nor does it obey the command line
1863
override, so tests that accidentally write to the common directory should
1866
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1867
a .bzr directory that stops us ascending higher into the filesystem.
1873
def __init__(self, methodName='runTest'):
1874
# allow test parameterization after test construction and before test
1875
# execution. Variables that the parameterizer sets need to be
1876
# ones that are not set by setUp, or setUp will trash them.
1877
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1878
self.vfs_transport_factory = default_transport
1879
self.transport_server = None
1880
self.transport_readonly_server = None
1881
self.__vfs_server = None
1883
def get_transport(self, relpath=None):
1884
"""Return a writeable transport.
1886
This transport is for the test scratch space relative to
1889
:param relpath: a path relative to the base url.
1891
t = get_transport(self.get_url(relpath))
1892
self.assertFalse(t.is_readonly())
1895
def get_readonly_transport(self, relpath=None):
1896
"""Return a readonly transport for the test scratch space
1898
This can be used to test that operations which should only need
1899
readonly access in fact do not try to write.
1901
:param relpath: a path relative to the base url.
1903
t = get_transport(self.get_readonly_url(relpath))
1904
self.assertTrue(t.is_readonly())
1907
def create_transport_readonly_server(self):
1908
"""Create a transport server from class defined at init.
1910
This is mostly a hook for daughter classes.
1912
return self.transport_readonly_server()
1914
def get_readonly_server(self):
1915
"""Get the server instance for the readonly transport
1917
This is useful for some tests with specific servers to do diagnostics.
1919
if self.__readonly_server is None:
1920
if self.transport_readonly_server is None:
1921
# readonly decorator requested
1922
# bring up the server
1923
self.__readonly_server = ReadonlyServer()
1924
self.__readonly_server.setUp(self.get_vfs_only_server())
1926
self.__readonly_server = self.create_transport_readonly_server()
1927
self.__readonly_server.setUp(self.get_vfs_only_server())
1928
self.addCleanup(self.__readonly_server.tearDown)
1929
return self.__readonly_server
1931
def get_readonly_url(self, relpath=None):
1932
"""Get a URL for the readonly transport.
1934
This will either be backed by '.' or a decorator to the transport
1935
used by self.get_url()
1936
relpath provides for clients to get a path relative to the base url.
1937
These should only be downwards relative, not upwards.
1939
base = self.get_readonly_server().get_url()
1940
return self._adjust_url(base, relpath)
1942
def get_vfs_only_server(self):
1943
"""Get the vfs only read/write server instance.
1945
This is useful for some tests with specific servers that need
1948
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1949
is no means to override it.
1951
if self.__vfs_server is None:
1952
self.__vfs_server = MemoryServer()
1953
self.__vfs_server.setUp()
1954
self.addCleanup(self.__vfs_server.tearDown)
1955
return self.__vfs_server
1957
def get_server(self):
1958
"""Get the read/write server instance.
1960
This is useful for some tests with specific servers that need
1963
This is built from the self.transport_server factory. If that is None,
1964
then the self.get_vfs_server is returned.
1966
if self.__server is None:
1967
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1968
return self.get_vfs_only_server()
1970
# bring up a decorated means of access to the vfs only server.
1971
self.__server = self.transport_server()
1973
self.__server.setUp(self.get_vfs_only_server())
1974
except TypeError, e:
1975
# This should never happen; the try:Except here is to assist
1976
# developers having to update code rather than seeing an
1977
# uninformative TypeError.
1978
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1979
self.addCleanup(self.__server.tearDown)
1980
return self.__server
1982
def _adjust_url(self, base, relpath):
1983
"""Get a URL (or maybe a path) for the readwrite transport.
1985
This will either be backed by '.' or to an equivalent non-file based
1987
relpath provides for clients to get a path relative to the base url.
1988
These should only be downwards relative, not upwards.
1990
if relpath is not None and relpath != '.':
1991
if not base.endswith('/'):
1993
# XXX: Really base should be a url; we did after all call
1994
# get_url()! But sometimes it's just a path (from
1995
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1996
# to a non-escaped local path.
1997
if base.startswith('./') or base.startswith('/'):
2000
base += urlutils.escape(relpath)
2003
def get_url(self, relpath=None):
2004
"""Get a URL (or maybe a path) for the readwrite transport.
2006
This will either be backed by '.' or to an equivalent non-file based
2008
relpath provides for clients to get a path relative to the base url.
2009
These should only be downwards relative, not upwards.
2011
base = self.get_server().get_url()
2012
return self._adjust_url(base, relpath)
2014
def get_vfs_only_url(self, relpath=None):
2015
"""Get a URL (or maybe a path for the plain old vfs transport.
2017
This will never be a smart protocol. It always has all the
2018
capabilities of the local filesystem, but it might actually be a
2019
MemoryTransport or some other similar virtual filesystem.
2021
This is the backing transport (if any) of the server returned by
2022
get_url and get_readonly_url.
2024
:param relpath: provides for clients to get a path relative to the base
2025
url. These should only be downwards relative, not upwards.
2028
base = self.get_vfs_only_server().get_url()
2029
return self._adjust_url(base, relpath)
2031
def _create_safety_net(self):
2032
"""Make a fake bzr directory.
2034
This prevents any tests propagating up onto the TEST_ROOT directory's
2037
root = TestCaseWithMemoryTransport.TEST_ROOT
2038
bzrdir.BzrDir.create_standalone_workingtree(root)
2040
def _check_safety_net(self):
2041
"""Check that the safety .bzr directory have not been touched.
2043
_make_test_root have created a .bzr directory to prevent tests from
2044
propagating. This method ensures than a test did not leaked.
2046
root = TestCaseWithMemoryTransport.TEST_ROOT
2047
wt = workingtree.WorkingTree.open(root)
2048
last_rev = wt.last_revision()
2049
if last_rev != 'null:':
2050
# The current test have modified the /bzr directory, we need to
2051
# recreate a new one or all the followng tests will fail.
2052
# If you need to inspect its content uncomment the following line
2053
# import pdb; pdb.set_trace()
2054
_rmtree_temp_dir(root + '/.bzr')
2055
self._create_safety_net()
2056
raise AssertionError('%s/.bzr should not be modified' % root)
2058
def _make_test_root(self):
2059
if TestCaseWithMemoryTransport.TEST_ROOT is None:
2060
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
2061
TestCaseWithMemoryTransport.TEST_ROOT = root
2063
self._create_safety_net()
2065
# The same directory is used by all tests, and we're not
2066
# specifically told when all tests are finished. This will do.
2067
atexit.register(_rmtree_temp_dir, root)
2069
self.addCleanup(self._check_safety_net)
2071
def makeAndChdirToTestDir(self):
2072
"""Create a temporary directories for this one test.
2074
This must set self.test_home_dir and self.test_dir and chdir to
2077
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
2079
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2080
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2081
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
2083
def make_branch(self, relpath, format=None):
2084
"""Create a branch on the transport at relpath."""
2085
repo = self.make_repository(relpath, format=format)
2086
return repo.bzrdir.create_branch()
2088
def make_bzrdir(self, relpath, format=None):
2090
# might be a relative or absolute path
2091
maybe_a_url = self.get_url(relpath)
2092
segments = maybe_a_url.rsplit('/', 1)
2093
t = get_transport(maybe_a_url)
2094
if len(segments) > 1 and segments[-1] not in ('', '.'):
2098
if isinstance(format, basestring):
2099
format = bzrdir.format_registry.make_bzrdir(format)
2100
return format.initialize_on_transport(t)
2101
except errors.UninitializableFormat:
2102
raise TestSkipped("Format %s is not initializable." % format)
2104
def make_repository(self, relpath, shared=False, format=None):
2105
"""Create a repository on our default transport at relpath.
2107
Note that relpath must be a relative path, not a full url.
2109
# FIXME: If you create a remoterepository this returns the underlying
2110
# real format, which is incorrect. Actually we should make sure that
2111
# RemoteBzrDir returns a RemoteRepository.
2112
# maybe mbp 20070410
2113
made_control = self.make_bzrdir(relpath, format=format)
2114
return made_control.create_repository(shared=shared)
2116
def make_smart_server(self, path):
2117
smart_server = server.SmartTCPServer_for_testing()
2118
smart_server.setUp(self.get_server())
2119
remote_transport = get_transport(smart_server.get_url()).clone(path)
2120
self.addCleanup(smart_server.tearDown)
2121
return remote_transport
2123
def make_branch_and_memory_tree(self, relpath, format=None):
2124
"""Create a branch on the default transport and a MemoryTree for it."""
2125
b = self.make_branch(relpath, format=format)
2126
return memorytree.MemoryTree.create_on_branch(b)
2128
def make_branch_builder(self, relpath, format=None):
2129
return branchbuilder.BranchBuilder(self.get_transport(relpath),
2132
def overrideEnvironmentForTesting(self):
2133
os.environ['HOME'] = self.test_home_dir
2134
os.environ['BZR_HOME'] = self.test_home_dir
2137
super(TestCaseWithMemoryTransport, self).setUp()
2138
self._make_test_root()
2139
_currentdir = os.getcwdu()
2140
def _leaveDirectory():
2141
os.chdir(_currentdir)
2142
self.addCleanup(_leaveDirectory)
2143
self.makeAndChdirToTestDir()
2144
self.overrideEnvironmentForTesting()
2145
self.__readonly_server = None
2146
self.__server = None
2147
self.reduceLockdirTimeout()
2149
def setup_smart_server_with_call_log(self):
2150
"""Sets up a smart server as the transport server with a call log."""
2151
self.transport_server = server.SmartTCPServer_for_testing
2152
self.hpss_calls = []
2154
# Skip the current stack down to the caller of
2155
# setup_smart_server_with_call_log
2156
prefix_length = len(traceback.extract_stack()) - 2
2157
def capture_hpss_call(params):
2158
self.hpss_calls.append(
2159
CapturedCall(params, prefix_length))
2160
client._SmartClient.hooks.install_named_hook(
2161
'call', capture_hpss_call, None)
2163
def reset_smart_call_log(self):
2164
self.hpss_calls = []
2167
class TestCaseInTempDir(TestCaseWithMemoryTransport):
997
2168
"""Derived class that runs a test within a temporary directory.
999
2170
This is useful for tests that need to create a branch, etc.
1301
2403
for readonly urls.
1303
2405
TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
1304
be used without needed to redo it when a different
2406
be used without needed to redo it when a different
1305
2407
subclass is in use ?
1308
2410
def setUp(self):
1309
2411
super(ChrootedTestCase, self).setUp()
1310
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1311
self.transport_readonly_server = bzrlib.transport.http.HttpServer
2412
if not self.vfs_transport_factory == MemoryServer:
2413
self.transport_readonly_server = HttpServer
2416
def condition_id_re(pattern):
2417
"""Create a condition filter which performs a re check on a test's id.
2419
:param pattern: A regular expression string.
2420
:return: A callable that returns True if the re matches.
2422
filter_re = re.compile(pattern)
2423
def condition(test):
2425
return filter_re.search(test_id)
2429
def condition_isinstance(klass_or_klass_list):
2430
"""Create a condition filter which returns isinstance(param, klass).
2432
:return: A callable which when called with one parameter obj return the
2433
result of isinstance(obj, klass_or_klass_list).
2436
return isinstance(obj, klass_or_klass_list)
2440
def condition_id_in_list(id_list):
2441
"""Create a condition filter which verify that test's id in a list.
2443
:param id_list: A TestIdList object.
2444
:return: A callable that returns True if the test's id appears in the list.
2446
def condition(test):
2447
return id_list.includes(test.id())
2451
def condition_id_startswith(starts):
2452
"""Create a condition filter verifying that test's id starts with a string.
2454
:param starts: A list of string.
2455
:return: A callable that returns True if the test's id starts with one of
2458
def condition(test):
2459
for start in starts:
2460
if test.id().startswith(start):
2466
def exclude_tests_by_condition(suite, condition):
2467
"""Create a test suite which excludes some tests from suite.
2469
:param suite: The suite to get tests from.
2470
:param condition: A callable whose result evaluates True when called with a
2471
test case which should be excluded from the result.
2472
:return: A suite which contains the tests found in suite that fail
2476
for test in iter_suite_tests(suite):
2477
if not condition(test):
2479
return TestUtil.TestSuite(result)
2482
def filter_suite_by_condition(suite, condition):
2483
"""Create a test suite by filtering another one.
2485
:param suite: The source suite.
2486
:param condition: A callable whose result evaluates True when called with a
2487
test case which should be included in the result.
2488
:return: A suite which contains the tests found in suite that pass
2492
for test in iter_suite_tests(suite):
2495
return TestUtil.TestSuite(result)
1314
2498
def filter_suite_by_re(suite, pattern):
1315
result = TestUtil.TestSuite()
1316
filter_re = re.compile(pattern)
2499
"""Create a test suite by filtering another one.
2501
:param suite: the source suite
2502
:param pattern: pattern that names must match
2503
:returns: the newly created suite
2505
condition = condition_id_re(pattern)
2506
result_suite = filter_suite_by_condition(suite, condition)
2510
def filter_suite_by_id_list(suite, test_id_list):
2511
"""Create a test suite by filtering another one.
2513
:param suite: The source suite.
2514
:param test_id_list: A list of the test ids to keep as strings.
2515
:returns: the newly created suite
2517
condition = condition_id_in_list(test_id_list)
2518
result_suite = filter_suite_by_condition(suite, condition)
2522
def filter_suite_by_id_startswith(suite, start):
2523
"""Create a test suite by filtering another one.
2525
:param suite: The source suite.
2526
:param start: A list of string the test id must start with one of.
2527
:returns: the newly created suite
2529
condition = condition_id_startswith(start)
2530
result_suite = filter_suite_by_condition(suite, condition)
2534
def exclude_tests_by_re(suite, pattern):
2535
"""Create a test suite which excludes some tests from suite.
2537
:param suite: The suite to get tests from.
2538
:param pattern: A regular expression string. Test ids that match this
2539
pattern will be excluded from the result.
2540
:return: A TestSuite that contains all the tests from suite without the
2541
tests that matched pattern. The order of tests is the same as it was in
2544
return exclude_tests_by_condition(suite, condition_id_re(pattern))
2547
def preserve_input(something):
2548
"""A helper for performing test suite transformation chains.
2550
:param something: Anything you want to preserve.
2556
def randomize_suite(suite):
2557
"""Return a new TestSuite with suite's tests in random order.
2559
The tests in the input suite are flattened into a single suite in order to
2560
accomplish this. Any nested TestSuites are removed to provide global
2563
tests = list(iter_suite_tests(suite))
2564
random.shuffle(tests)
2565
return TestUtil.TestSuite(tests)
2568
def split_suite_by_condition(suite, condition):
2569
"""Split a test suite into two by a condition.
2571
:param suite: The suite to split.
2572
:param condition: The condition to match on. Tests that match this
2573
condition are returned in the first test suite, ones that do not match
2574
are in the second suite.
2575
:return: A tuple of two test suites, where the first contains tests from
2576
suite matching the condition, and the second contains the remainder
2577
from suite. The order within each output suite is the same as it was in
1317
2582
for test in iter_suite_tests(suite):
1318
if filter_re.search(test.id()):
1319
result.addTest(test)
2584
matched.append(test)
2586
did_not_match.append(test)
2587
return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
2590
def split_suite_by_re(suite, pattern):
2591
"""Split a test suite into two by a regular expression.
2593
:param suite: The suite to split.
2594
:param pattern: A regular expression string. Test ids that match this
2595
pattern will be in the first test suite returned, and the others in the
2596
second test suite returned.
2597
:return: A tuple of two test suites, where the first contains tests from
2598
suite matching pattern, and the second contains the remainder from
2599
suite. The order within each output suite is the same as it was in
2602
return split_suite_by_condition(suite, condition_id_re(pattern))
1323
2605
def run_suite(suite, name='test', verbose=False, pattern=".*",
1324
stop_on_failure=False, keep_output=False,
1325
transport=None, lsprof_timed=None, bench_history=None):
1326
TestCaseInTempDir._TEST_NAME = name
2606
stop_on_failure=False,
2607
transport=None, lsprof_timed=None, bench_history=None,
2608
matching_tests_first=None,
2611
exclude_pattern=None,
2614
"""Run a test suite for bzr selftest.
2616
:param runner_class: The class of runner to use. Must support the
2617
constructor arguments passed by run_suite which are more than standard
2619
:return: A boolean indicating success.
1327
2621
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
1333
pb = progress.ProgressBar()
1334
runner = TextTestRunner(stream=sys.stdout,
2626
if runner_class is None:
2627
runner_class = TextTestRunner
2628
runner = runner_class(stream=sys.stdout,
1335
2629
descriptions=0,
1336
2630
verbosity=verbosity,
1337
keep_output=keep_output,
1339
bench_history=bench_history)
2631
bench_history=bench_history,
2632
list_only=list_only,
1340
2634
runner.stop_on_failure=stop_on_failure
1342
suite = filter_suite_by_re(suite, pattern)
2635
# Initialise the random number generator and display the seed used.
2636
# We convert the seed to a long to make it reuseable across invocations.
2637
random_order = False
2638
if random_seed is not None:
2640
if random_seed == "now":
2641
random_seed = long(time.time())
2643
# Convert the seed to a long if we can
2645
random_seed = long(random_seed)
2648
runner.stream.writeln("Randomizing test order using seed %s\n" %
2650
random.seed(random_seed)
2651
# Customise the list of tests if requested
2652
if exclude_pattern is not None:
2653
suite = exclude_tests_by_re(suite, exclude_pattern)
2655
order_changer = randomize_suite
2657
order_changer = preserve_input
2658
if pattern != '.*' or random_order:
2659
if matching_tests_first:
2660
suites = map(order_changer, split_suite_by_re(suite, pattern))
2661
suite = TestUtil.TestSuite(suites)
2663
suite = order_changer(filter_suite_by_re(suite, pattern))
1343
2665
result = runner.run(suite)
2668
return result.wasStrictlySuccessful()
1344
2670
return result.wasSuccessful()
2673
# Controlled by "bzr selftest -E=..." option
2674
selftest_debug_flags = set()
1347
2677
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1349
2678
transport=None,
1350
2679
test_suite_factory=None,
1351
2680
lsprof_timed=None,
1352
bench_history=None):
2682
matching_tests_first=None,
2685
exclude_pattern=None,
1353
2692
"""Run the whole test suite under the enhanced runner"""
1354
2693
# XXX: Very ugly way to do this...
1355
2694
# Disable warning about old formats because we don't want it to disturb
1362
2701
transport = default_transport
1363
2702
old_transport = default_transport
1364
2703
default_transport = transport
2704
global selftest_debug_flags
2705
old_debug_flags = selftest_debug_flags
2706
if debug_flags is not None:
2707
selftest_debug_flags = set(debug_flags)
2709
if load_list is None:
2712
keep_only = load_test_id_list(load_list)
1366
2713
if test_suite_factory is None:
1367
suite = test_suite()
2714
suite = test_suite(keep_only, starting_with)
1369
2716
suite = test_suite_factory()
1370
2717
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
1371
stop_on_failure=stop_on_failure, keep_output=keep_output,
2718
stop_on_failure=stop_on_failure,
1372
2719
transport=transport,
1373
2720
lsprof_timed=lsprof_timed,
1374
bench_history=bench_history)
2721
bench_history=bench_history,
2722
matching_tests_first=matching_tests_first,
2723
list_only=list_only,
2724
random_seed=random_seed,
2725
exclude_pattern=exclude_pattern,
2727
runner_class=runner_class,
1376
2730
default_transport = old_transport
2731
selftest_debug_flags = old_debug_flags
2734
def load_test_id_list(file_name):
2735
"""Load a test id list from a text file.
2737
The format is one test id by line. No special care is taken to impose
2738
strict rules, these test ids are used to filter the test suite so a test id
2739
that do not match an existing test will do no harm. This allows user to add
2740
comments, leave blank lines, etc.
2744
ftest = open(file_name, 'rt')
2746
if e.errno != errno.ENOENT:
2749
raise errors.NoSuchFile(file_name)
2751
for test_name in ftest.readlines():
2752
test_list.append(test_name.strip())
2757
def suite_matches_id_list(test_suite, id_list):
2758
"""Warns about tests not appearing or appearing more than once.
2760
:param test_suite: A TestSuite object.
2761
:param test_id_list: The list of test ids that should be found in
2764
:return: (absents, duplicates) absents is a list containing the test found
2765
in id_list but not in test_suite, duplicates is a list containing the
2766
test found multiple times in test_suite.
2768
When using a prefined test id list, it may occurs that some tests do not
2769
exist anymore or that some tests use the same id. This function warns the
2770
tester about potential problems in his workflow (test lists are volatile)
2771
or in the test suite itself (using the same id for several tests does not
2772
help to localize defects).
2774
# Build a dict counting id occurrences
2776
for test in iter_suite_tests(test_suite):
2778
tests[id] = tests.get(id, 0) + 1
2783
occurs = tests.get(id, 0)
2785
not_found.append(id)
2787
duplicates.append(id)
2789
return not_found, duplicates
2792
class TestIdList(object):
2793
"""Test id list to filter a test suite.
2795
Relying on the assumption that test ids are built as:
2796
<module>[.<class>.<method>][(<param>+)], <module> being in python dotted
2797
notation, this class offers methods to :
2798
- avoid building a test suite for modules not refered to in the test list,
2799
- keep only the tests listed from the module test suite.
2802
def __init__(self, test_id_list):
2803
# When a test suite needs to be filtered against us we compare test ids
2804
# for equality, so a simple dict offers a quick and simple solution.
2805
self.tests = dict().fromkeys(test_id_list, True)
2807
# While unittest.TestCase have ids like:
2808
# <module>.<class>.<method>[(<param+)],
2809
# doctest.DocTestCase can have ids like:
2812
# <module>.<function>
2813
# <module>.<class>.<method>
2815
# Since we can't predict a test class from its name only, we settle on
2816
# a simple constraint: a test id always begins with its module name.
2819
for test_id in test_id_list:
2820
parts = test_id.split('.')
2821
mod_name = parts.pop(0)
2822
modules[mod_name] = True
2824
mod_name += '.' + part
2825
modules[mod_name] = True
2826
self.modules = modules
2828
def refers_to(self, module_name):
2829
"""Is there tests for the module or one of its sub modules."""
2830
return self.modules.has_key(module_name)
2832
def includes(self, test_id):
2833
return self.tests.has_key(test_id)
2836
class TestPrefixAliasRegistry(registry.Registry):
2837
"""A registry for test prefix aliases.
2839
This helps implement shorcuts for the --starting-with selftest
2840
option. Overriding existing prefixes is not allowed but not fatal (a
2841
warning will be emitted).
2844
def register(self, key, obj, help=None, info=None,
2845
override_existing=False):
2846
"""See Registry.register.
2848
Trying to override an existing alias causes a warning to be emitted,
2849
not a fatal execption.
2852
super(TestPrefixAliasRegistry, self).register(
2853
key, obj, help=help, info=info, override_existing=False)
2855
actual = self.get(key)
2856
note('Test prefix alias %s is already used for %s, ignoring %s'
2857
% (key, actual, obj))
2859
def resolve_alias(self, id_start):
2860
"""Replace the alias by the prefix in the given string.
2862
Using an unknown prefix is an error to help catching typos.
2864
parts = id_start.split('.')
2866
parts[0] = self.get(parts[0])
2868
raise errors.BzrCommandError(
2869
'%s is not a known test prefix alias' % parts[0])
2870
return '.'.join(parts)
2873
test_prefix_alias_registry = TestPrefixAliasRegistry()
2874
"""Registry of test prefix aliases."""
2877
# This alias allows to detect typos ('bzrlin.') by making all valid test ids
2878
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
2879
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
2881
# Obvious higest levels prefixes, feel free to add your own via a plugin
2882
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
2883
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
2884
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
2885
test_prefix_alias_registry.register('bb', 'bzrlib.tests.blackbox')
2886
test_prefix_alias_registry.register('bp', 'bzrlib.plugins')
2889
def test_suite(keep_only=None, starting_with=None):
1380
2890
"""Build and return TestSuite for the whole of bzrlib.
2892
:param keep_only: A list of test ids limiting the suite returned.
2894
:param starting_with: An id limiting the suite returned to the tests
1382
2897
This function can be replaced if you need to change the default test
1383
2898
suite on a global basis, but it is not encouraged.
1385
2900
testmod_names = [
2902
'bzrlib.tests.blackbox',
2903
'bzrlib.tests.branch_implementations',
2904
'bzrlib.tests.bzrdir_implementations',
2905
'bzrlib.tests.commands',
2906
'bzrlib.tests.interrepository_implementations',
2907
'bzrlib.tests.intertree_implementations',
2908
'bzrlib.tests.inventory_implementations',
2909
'bzrlib.tests.per_interbranch',
2910
'bzrlib.tests.per_lock',
2911
'bzrlib.tests.per_repository',
2912
'bzrlib.tests.per_repository_reference',
2913
'bzrlib.tests.test__dirstate_helpers',
2914
'bzrlib.tests.test__walkdirs_win32',
1386
2915
'bzrlib.tests.test_ancestry',
2916
'bzrlib.tests.test_annotate',
1387
2917
'bzrlib.tests.test_api',
1388
2918
'bzrlib.tests.test_atomicfile',
1389
2919
'bzrlib.tests.test_bad_files',
2920
'bzrlib.tests.test_bisect_multi',
1390
2921
'bzrlib.tests.test_branch',
2922
'bzrlib.tests.test_branchbuilder',
2923
'bzrlib.tests.test_btree_index',
2924
'bzrlib.tests.test_bugtracker',
1391
2925
'bzrlib.tests.test_bundle',
1392
2926
'bzrlib.tests.test_bzrdir',
1393
2927
'bzrlib.tests.test_cache_utf8',
1394
'bzrlib.tests.test_command',
2928
'bzrlib.tests.test_clean_tree',
2929
'bzrlib.tests.test_chunk_writer',
2930
'bzrlib.tests.test__chunks_to_lines',
2931
'bzrlib.tests.test_commands',
1395
2932
'bzrlib.tests.test_commit',
1396
2933
'bzrlib.tests.test_commit_merge',
1397
2934
'bzrlib.tests.test_config',
1398
2935
'bzrlib.tests.test_conflicts',
2936
'bzrlib.tests.test_counted_lock',
1399
2937
'bzrlib.tests.test_decorators',
2938
'bzrlib.tests.test_delta',
2939
'bzrlib.tests.test_debug',
2940
'bzrlib.tests.test_deprecated_graph',
1400
2941
'bzrlib.tests.test_diff',
1401
'bzrlib.tests.test_doc_generate',
2942
'bzrlib.tests.test_directory_service',
2943
'bzrlib.tests.test_dirstate',
2944
'bzrlib.tests.test_email_message',
1402
2945
'bzrlib.tests.test_errors',
1403
'bzrlib.tests.test_escaped_store',
2946
'bzrlib.tests.test_export',
2947
'bzrlib.tests.test_extract',
1404
2948
'bzrlib.tests.test_fetch',
2949
'bzrlib.tests.test_fifo_cache',
2950
'bzrlib.tests.test_ftp_transport',
2951
'bzrlib.tests.test_foreign',
2952
'bzrlib.tests.test_generate_docs',
2953
'bzrlib.tests.test_generate_ids',
2954
'bzrlib.tests.test_globbing',
1405
2955
'bzrlib.tests.test_gpg',
1406
2956
'bzrlib.tests.test_graph',
1407
2957
'bzrlib.tests.test_hashcache',
2958
'bzrlib.tests.test_help',
2959
'bzrlib.tests.test_hooks',
1408
2960
'bzrlib.tests.test_http',
2961
'bzrlib.tests.test_http_implementations',
1409
2962
'bzrlib.tests.test_http_response',
2963
'bzrlib.tests.test_https_ca_bundle',
1410
2964
'bzrlib.tests.test_identitymap',
1411
2965
'bzrlib.tests.test_ignores',
2966
'bzrlib.tests.test_index',
2967
'bzrlib.tests.test_info',
1412
2968
'bzrlib.tests.test_inv',
1413
2969
'bzrlib.tests.test_knit',
1414
2970
'bzrlib.tests.test_lazy_import',
2971
'bzrlib.tests.test_lazy_regex',
2972
'bzrlib.tests.test_lockable_files',
1415
2973
'bzrlib.tests.test_lockdir',
1416
'bzrlib.tests.test_lockable_files',
1417
2974
'bzrlib.tests.test_log',
2975
'bzrlib.tests.test_lru_cache',
2976
'bzrlib.tests.test_lsprof',
2977
'bzrlib.tests.test_mail_client',
2978
'bzrlib.tests.test_memorytree',
1418
2979
'bzrlib.tests.test_merge',
1419
2980
'bzrlib.tests.test_merge3',
1420
2981
'bzrlib.tests.test_merge_core',
2982
'bzrlib.tests.test_merge_directive',
1421
2983
'bzrlib.tests.test_missing',
1422
2984
'bzrlib.tests.test_msgeditor',
2985
'bzrlib.tests.test_multiparent',
2986
'bzrlib.tests.test_mutabletree',
1423
2987
'bzrlib.tests.test_nonascii',
1424
2988
'bzrlib.tests.test_options',
1425
2989
'bzrlib.tests.test_osutils',
2990
'bzrlib.tests.test_osutils_encodings',
2991
'bzrlib.tests.test_pack',
2992
'bzrlib.tests.test_pack_repository',
1426
2993
'bzrlib.tests.test_patch',
1427
2994
'bzrlib.tests.test_patches',
1428
2995
'bzrlib.tests.test_permissions',
1429
2996
'bzrlib.tests.test_plugins',
1430
2997
'bzrlib.tests.test_progress',
2998
'bzrlib.tests.test_read_bundle',
1431
2999
'bzrlib.tests.test_reconcile',
3000
'bzrlib.tests.test_reconfigure',
3001
'bzrlib.tests.test_registry',
3002
'bzrlib.tests.test_remote',
1432
3003
'bzrlib.tests.test_repository',
1433
3004
'bzrlib.tests.test_revert',
1434
3005
'bzrlib.tests.test_revision',
1435
'bzrlib.tests.test_revisionnamespaces',
3006
'bzrlib.tests.test_revisionspec',
1436
3007
'bzrlib.tests.test_revisiontree',
1437
3008
'bzrlib.tests.test_rio',
3009
'bzrlib.tests.test_rules',
1438
3010
'bzrlib.tests.test_sampler',
1439
3011
'bzrlib.tests.test_selftest',
1440
3012
'bzrlib.tests.test_setup',
1441
3013
'bzrlib.tests.test_sftp_transport',
1442
'bzrlib.tests.test_ftp_transport',
3014
'bzrlib.tests.test_shelf',
3015
'bzrlib.tests.test_shelf_ui',
3016
'bzrlib.tests.test_smart',
1443
3017
'bzrlib.tests.test_smart_add',
3018
'bzrlib.tests.test_smart_request',
3019
'bzrlib.tests.test_smart_transport',
3020
'bzrlib.tests.test_smtp_connection',
1444
3021
'bzrlib.tests.test_source',
3022
'bzrlib.tests.test_ssh_transport',
1445
3023
'bzrlib.tests.test_status',
1446
3024
'bzrlib.tests.test_store',
3025
'bzrlib.tests.test_strace',
3026
'bzrlib.tests.test_subsume',
3027
'bzrlib.tests.test_switch',
1447
3028
'bzrlib.tests.test_symbol_versioning',
3029
'bzrlib.tests.test_tag',
1448
3030
'bzrlib.tests.test_testament',
1449
3031
'bzrlib.tests.test_textfile',
1450
3032
'bzrlib.tests.test_textmerge',
3033
'bzrlib.tests.test_timestamp',
1451
3034
'bzrlib.tests.test_trace',
1452
3035
'bzrlib.tests.test_transactions',
1453
3036
'bzrlib.tests.test_transform',
1454
3037
'bzrlib.tests.test_transport',
3038
'bzrlib.tests.test_transport_implementations',
3039
'bzrlib.tests.test_transport_log',
1455
3040
'bzrlib.tests.test_tree',
3041
'bzrlib.tests.test_treebuilder',
1456
3042
'bzrlib.tests.test_tsort',
1457
3043
'bzrlib.tests.test_tuned_gzip',
1458
3044
'bzrlib.tests.test_ui',
3045
'bzrlib.tests.test_uncommit',
1459
3046
'bzrlib.tests.test_upgrade',
3047
'bzrlib.tests.test_upgrade_stacked',
1460
3048
'bzrlib.tests.test_urlutils',
3049
'bzrlib.tests.test_version',
3050
'bzrlib.tests.test_version_info',
1461
3051
'bzrlib.tests.test_versionedfile',
1462
'bzrlib.tests.test_version',
1463
3052
'bzrlib.tests.test_weave',
1464
3053
'bzrlib.tests.test_whitebox',
3054
'bzrlib.tests.test_win32utils',
1465
3055
'bzrlib.tests.test_workingtree',
3056
'bzrlib.tests.test_workingtree_4',
3057
'bzrlib.tests.test_wsgi',
1466
3058
'bzrlib.tests.test_xml',
3059
'bzrlib.tests.tree_implementations',
3060
'bzrlib.tests.workingtree_implementations',
3061
'bzrlib.util.tests.test_bencode',
1468
test_transport_implementations = [
1469
'bzrlib.tests.test_transport_implementations',
1470
'bzrlib.tests.test_read_bundle',
1472
suite = TestUtil.TestSuite()
1473
3064
loader = TestUtil.TestLoader()
3067
starting_with = [test_prefix_alias_registry.resolve_alias(start)
3068
for start in starting_with]
3069
# We take precedence over keep_only because *at loading time* using
3070
# both options means we will load less tests for the same final result.
3071
def interesting_module(name):
3072
for start in starting_with:
3074
# Either the module name starts with the specified string
3075
name.startswith(start)
3076
# or it may contain tests starting with the specified string
3077
or start.startswith(name)
3081
loader = TestUtil.FilteredByModuleTestLoader(interesting_module)
3083
elif keep_only is not None:
3084
id_filter = TestIdList(keep_only)
3085
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3086
def interesting_module(name):
3087
return id_filter.refers_to(name)
3090
loader = TestUtil.TestLoader()
3091
def interesting_module(name):
3092
# No filtering, all modules are interesting
3095
suite = loader.suiteClass()
3097
# modules building their suite with loadTestsFromModuleNames
1474
3098
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
1475
from bzrlib.transport import TransportTestProviderAdapter
1476
adapter = TransportTestProviderAdapter()
1477
adapt_modules(test_transport_implementations, adapter, loader, suite)
1478
for package in packages_to_test():
1479
suite.addTest(package.test_suite())
1480
for m in MODULES_TO_TEST:
1481
suite.addTest(loader.loadTestsFromModule(m))
1482
for m in MODULES_TO_DOCTEST:
3100
modules_to_doctest = [
3102
'bzrlib.branchbuilder',
3105
'bzrlib.iterablefile',
3109
'bzrlib.symbol_versioning',
3112
'bzrlib.version_info_formats.format_custom',
3115
for mod in modules_to_doctest:
3116
if not interesting_module(mod):
3117
# No tests to keep here, move along
1484
suite.addTest(doctest.DocTestSuite(m))
3120
# note that this really does mean "report only" -- doctest
3121
# still runs the rest of the examples
3122
doc_suite = doctest.DocTestSuite(mod,
3123
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
1485
3124
except ValueError, e:
1486
print '**failed to get doctest for: %s\n%s' %(m,e)
3125
print '**failed to get doctest for: %s\n%s' % (mod, e)
1488
for name, plugin in bzrlib.plugin.all_plugins().items():
1489
if getattr(plugin, 'test_suite', None) is not None:
1490
suite.addTest(plugin.test_suite())
3127
if len(doc_suite._tests) == 0:
3128
raise errors.BzrError("no doctests found in %s" % (mod,))
3129
suite.addTest(doc_suite)
3131
default_encoding = sys.getdefaultencoding()
3132
for name, plugin in bzrlib.plugin.plugins().items():
3133
if not interesting_module(plugin.module.__name__):
3135
plugin_suite = plugin.test_suite()
3136
# We used to catch ImportError here and turn it into just a warning,
3137
# but really if you don't have --no-plugins this should be a failure.
3138
# mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
3139
if plugin_suite is None:
3140
plugin_suite = plugin.load_plugin_tests(loader)
3141
if plugin_suite is not None:
3142
suite.addTest(plugin_suite)
3143
if default_encoding != sys.getdefaultencoding():
3144
bzrlib.trace.warning(
3145
'Plugin "%s" tried to reset default encoding to: %s', name,
3146
sys.getdefaultencoding())
3148
sys.setdefaultencoding(default_encoding)
3151
suite = filter_suite_by_id_startswith(suite, starting_with)
3153
if keep_only is not None:
3154
# Now that the referred modules have loaded their tests, keep only the
3156
suite = filter_suite_by_id_list(suite, id_filter)
3157
# Do some sanity checks on the id_list filtering
3158
not_found, duplicates = suite_matches_id_list(suite, keep_only)
3160
# The tester has used both keep_only and starting_with, so he is
3161
# already aware that some tests are excluded from the list, there
3162
# is no need to tell him which.
3165
# Some tests mentioned in the list are not in the test suite. The
3166
# list may be out of date, report to the tester.
3167
for id in not_found:
3168
bzrlib.trace.warning('"%s" not found in the test suite', id)
3169
for id in duplicates:
3170
bzrlib.trace.warning('"%s" is used as an id by several tests', id)
1494
def adapt_modules(mods_list, adapter, loader, suite):
1495
"""Adapt the modules in mods_list using adapter and add to suite."""
1496
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
1497
suite.addTests(adapter.adapt(test))
3175
def multiply_scenarios(scenarios_left, scenarios_right):
3176
"""Multiply two sets of scenarios.
3178
:returns: the cartesian product of the two sets of scenarios, that is
3179
a scenario for every possible combination of a left scenario and a
3183
('%s,%s' % (left_name, right_name),
3184
dict(left_dict.items() + right_dict.items()))
3185
for left_name, left_dict in scenarios_left
3186
for right_name, right_dict in scenarios_right]
3189
def multiply_tests(tests, scenarios, result):
3190
"""Multiply tests_list by scenarios into result.
3192
This is the core workhorse for test parameterisation.
3194
Typically the load_tests() method for a per-implementation test suite will
3195
call multiply_tests and return the result.
3197
:param tests: The tests to parameterise.
3198
:param scenarios: The scenarios to apply: pairs of (scenario_name,
3199
scenario_param_dict).
3200
:param result: A TestSuite to add created tests to.
3202
This returns the passed in result TestSuite with the cross product of all
3203
the tests repeated once for each scenario. Each test is adapted by adding
3204
the scenario name at the end of its id(), and updating the test object's
3205
__dict__ with the scenario_param_dict.
3207
>>> r = multiply_tests(
3208
... bzrlib.tests.test_sampler.DemoTest('test_nothing'),
3209
... [('one', dict(param=1)),
3210
... ('two', dict(param=2))],
3212
>>> tests = list(iter_suite_tests(r))
3216
'bzrlib.tests.test_sampler.DemoTest.test_nothing(one)'
3222
for test in iter_suite_tests(tests):
3223
apply_scenarios(test, scenarios, result)
3227
def apply_scenarios(test, scenarios, result):
3228
"""Apply the scenarios in scenarios to test and add to result.
3230
:param test: The test to apply scenarios to.
3231
:param scenarios: An iterable of scenarios to apply to test.
3233
:seealso: apply_scenario
3235
for scenario in scenarios:
3236
result.addTest(apply_scenario(test, scenario))
3240
def apply_scenario(test, scenario):
3241
"""Copy test and apply scenario to it.
3243
:param test: A test to adapt.
3244
:param scenario: A tuple describing the scenarion.
3245
The first element of the tuple is the new test id.
3246
The second element is a dict containing attributes to set on the
3248
:return: The adapted test.
3250
new_id = "%s(%s)" % (test.id(), scenario[0])
3251
new_test = clone_test(test, new_id)
3252
for name, value in scenario[1].items():
3253
setattr(new_test, name, value)
3257
def clone_test(test, new_id):
3258
"""Clone a test giving it a new id.
3260
:param test: The test to clone.
3261
:param new_id: The id to assign to it.
3262
:return: The new test.
3264
from copy import deepcopy
3265
new_test = deepcopy(test)
3266
new_test.id = lambda: new_id
3270
def _rmtree_temp_dir(dirname):
3271
# If LANG=C we probably have created some bogus paths
3272
# which rmtree(unicode) will fail to delete
3273
# so make sure we are using rmtree(str) to delete everything
3274
# except on win32, where rmtree(str) will fail
3275
# since it doesn't have the property of byte-stream paths
3276
# (they are either ascii or mbcs)
3277
if sys.platform == 'win32':
3278
# make sure we are using the unicode win32 api
3279
dirname = unicode(dirname)
3281
dirname = dirname.encode(sys.getfilesystemencoding())
3283
osutils.rmtree(dirname)
3285
if sys.platform == 'win32' and e.errno == errno.EACCES:
3286
sys.stderr.write(('Permission denied: '
3287
'unable to remove testing dir '
3288
'%s\n' % os.path.basename(dirname)))
3293
class Feature(object):
3294
"""An operating system Feature."""
3297
self._available = None
3299
def available(self):
3300
"""Is the feature available?
3302
:return: True if the feature is available.
3304
if self._available is None:
3305
self._available = self._probe()
3306
return self._available
3309
"""Implement this method in concrete features.
3311
:return: True if the feature is available.
3313
raise NotImplementedError
3316
if getattr(self, 'feature_name', None):
3317
return self.feature_name()
3318
return self.__class__.__name__
3321
class _SymlinkFeature(Feature):
3324
return osutils.has_symlinks()
3326
def feature_name(self):
3329
SymlinkFeature = _SymlinkFeature()
3332
class _HardlinkFeature(Feature):
3335
return osutils.has_hardlinks()
3337
def feature_name(self):
3340
HardlinkFeature = _HardlinkFeature()
3343
class _OsFifoFeature(Feature):
3346
return getattr(os, 'mkfifo', None)
3348
def feature_name(self):
3349
return 'filesystem fifos'
3351
OsFifoFeature = _OsFifoFeature()
3354
class _UnicodeFilenameFeature(Feature):
3355
"""Does the filesystem support Unicode filenames?"""
3359
# Check for character combinations unlikely to be covered by any
3360
# single non-unicode encoding. We use the characters
3361
# - greek small letter alpha (U+03B1) and
3362
# - braille pattern dots-123456 (U+283F).
3363
os.stat(u'\u03b1\u283f')
3364
except UnicodeEncodeError:
3366
except (IOError, OSError):
3367
# The filesystem allows the Unicode filename but the file doesn't
3371
# The filesystem allows the Unicode filename and the file exists,
3375
UnicodeFilenameFeature = _UnicodeFilenameFeature()
3378
def probe_unicode_in_user_encoding():
3379
"""Try to encode several unicode strings to use in unicode-aware tests.
3380
Return first successfull match.
3382
:return: (unicode value, encoded plain string value) or (None, None)
3384
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
3385
for uni_val in possible_vals:
3387
str_val = uni_val.encode(osutils.get_user_encoding())
3388
except UnicodeEncodeError:
3389
# Try a different character
3392
return uni_val, str_val
3396
def probe_bad_non_ascii(encoding):
3397
"""Try to find [bad] character with code [128..255]
3398
that cannot be decoded to unicode in some encoding.
3399
Return None if all non-ascii characters is valid
3402
for i in xrange(128, 256):
3405
char.decode(encoding)
3406
except UnicodeDecodeError:
3411
class _FTPServerFeature(Feature):
3412
"""Some tests want an FTP Server, check if one is available.
3414
Right now, the only way this is available is if 'medusa' is installed.
3415
http://www.amk.ca/python/code/medusa.html
3420
import bzrlib.tests.ftp_server
3425
def feature_name(self):
3429
FTPServerFeature = _FTPServerFeature()
3432
class _HTTPSServerFeature(Feature):
3433
"""Some tests want an https Server, check if one is available.
3435
Right now, the only way this is available is under python2.6 which provides
3446
def feature_name(self):
3447
return 'HTTPSServer'
3450
HTTPSServerFeature = _HTTPSServerFeature()
3453
class _UnicodeFilename(Feature):
3454
"""Does the filesystem support Unicode filenames?"""
3459
except UnicodeEncodeError:
3461
except (IOError, OSError):
3462
# The filesystem allows the Unicode filename but the file doesn't
3466
# The filesystem allows the Unicode filename and the file exists,
3470
UnicodeFilename = _UnicodeFilename()
3473
class _UTF8Filesystem(Feature):
3474
"""Is the filesystem UTF-8?"""
3477
if osutils._fs_enc.upper() in ('UTF-8', 'UTF8'):
3481
UTF8Filesystem = _UTF8Filesystem()
3484
class _CaseInsCasePresFilenameFeature(Feature):
3485
"""Is the file-system case insensitive, but case-preserving?"""
3488
fileno, name = tempfile.mkstemp(prefix='MixedCase')
3490
# first check truly case-preserving for created files, then check
3491
# case insensitive when opening existing files.
3492
name = osutils.normpath(name)
3493
base, rel = osutils.split(name)
3494
found_rel = osutils.canonical_relpath(base, name)
3495
return (found_rel == rel
3496
and os.path.isfile(name.upper())
3497
and os.path.isfile(name.lower()))
3502
def feature_name(self):
3503
return "case-insensitive case-preserving filesystem"
3505
CaseInsCasePresFilenameFeature = _CaseInsCasePresFilenameFeature()
3508
class _CaseInsensitiveFilesystemFeature(Feature):
3509
"""Check if underlying filesystem is case-insensitive but *not* case
3512
# Note that on Windows, Cygwin, MacOS etc, the file-systems are far
3513
# more likely to be case preserving, so this case is rare.
3516
if CaseInsCasePresFilenameFeature.available():
3519
if TestCaseWithMemoryTransport.TEST_ROOT is None:
3520
root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
3521
TestCaseWithMemoryTransport.TEST_ROOT = root
3523
root = TestCaseWithMemoryTransport.TEST_ROOT
3524
tdir = osutils.mkdtemp(prefix='case-sensitive-probe-', suffix='',
3526
name_a = osutils.pathjoin(tdir, 'a')
3527
name_A = osutils.pathjoin(tdir, 'A')
3529
result = osutils.isdir(name_A)
3530
_rmtree_temp_dir(tdir)
3533
def feature_name(self):
3534
return 'case-insensitive filesystem'
3536
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()