128
117
import bzrlib.tests.blackbox
129
118
import bzrlib.tests.branch_implementations
130
119
import bzrlib.tests.bzrdir_implementations
131
import bzrlib.tests.commands
132
120
import bzrlib.tests.interrepository_implementations
133
121
import bzrlib.tests.interversionedfile_implementations
134
122
import bzrlib.tests.intertree_implementations
135
import bzrlib.tests.per_lock
136
123
import bzrlib.tests.repository_implementations
137
124
import bzrlib.tests.revisionstore_implementations
138
125
import bzrlib.tests.tree_implementations
142
129
bzrlib.tests.blackbox,
143
130
bzrlib.tests.branch_implementations,
144
131
bzrlib.tests.bzrdir_implementations,
145
bzrlib.tests.commands,
146
132
bzrlib.tests.interrepository_implementations,
147
133
bzrlib.tests.interversionedfile_implementations,
148
134
bzrlib.tests.intertree_implementations,
149
bzrlib.tests.per_lock,
150
135
bzrlib.tests.repository_implementations,
151
136
bzrlib.tests.revisionstore_implementations,
152
137
bzrlib.tests.tree_implementations,
236
219
"""Record that a test has started."""
237
220
self._start_time = time.time()
239
def _cleanupLogFile(self, test):
240
# We can only do this if we have one of our TestCases, not if
242
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
243
if setKeepLogfile is not None:
246
222
def addError(self, test, err):
247
self.extractBenchmarkTime(test)
248
self._cleanupLogFile(test)
249
223
if isinstance(err[1], TestSkipped):
250
return self.addSkipped(test, err)
251
elif isinstance(err[1], UnavailableFeature):
252
return self.addNotSupported(test, err[1].args[0])
224
return self.addSkipped(test, err)
253
225
unittest.TestResult.addError(self, test, err)
254
self.error_count += 1
226
# We can only do this if we have one of our TestCases, not if
228
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
229
if setKeepLogfile is not None:
231
self.extractBenchmarkTime(test)
255
232
self.report_error(test, err)
256
233
if self.stop_early:
259
236
def addFailure(self, test, err):
260
self._cleanupLogFile(test)
261
self.extractBenchmarkTime(test)
262
if isinstance(err[1], KnownFailure):
263
return self.addKnownFailure(test, err)
264
237
unittest.TestResult.addFailure(self, test, err)
265
self.failure_count += 1
238
# We can only do this if we have one of our TestCases, not if
240
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
241
if setKeepLogfile is not None:
243
self.extractBenchmarkTime(test)
266
244
self.report_failure(test, err)
267
245
if self.stop_early:
270
def addKnownFailure(self, test, err):
271
self.known_failure_count += 1
272
self.report_known_failure(test, err)
274
def addNotSupported(self, test, feature):
275
self.unsupported.setdefault(str(feature), 0)
276
self.unsupported[str(feature)] += 1
277
self.report_unsupported(test, feature)
279
248
def addSuccess(self, test):
280
249
self.extractBenchmarkTime(test)
281
250
if self._bench_history is not None:
327
296
class TextTestResult(ExtendedTestResult):
328
297
"""Displays progress and results of tests in text form"""
330
def __init__(self, stream, descriptions, verbosity,
335
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
336
bench_history, num_tests)
338
self.pb = self.ui.nested_progress_bar()
339
self._supplied_pb = False
342
self._supplied_pb = True
299
def __init__(self, *args, **kw):
300
ExtendedTestResult.__init__(self, *args, **kw)
301
self.pb = self.ui.nested_progress_bar()
343
302
self.pb.show_pct = False
344
303
self.pb.show_spinner = False
345
self.pb.show_eta = False,
304
self.pb.show_eta = False,
346
305
self.pb.show_count = False
347
306
self.pb.show_bar = False
375
330
+ self._shortened_test_description(test))
377
332
def _test_description(self, test):
378
return self._shortened_test_description(test)
334
return '#%d %s' % (self.count,
335
self._shortened_test_description(test))
337
return self._shortened_test_description(test)
380
339
def report_error(self, test, err):
340
self.error_count += 1
381
341
self.pb.note('ERROR: %s\n %s\n',
382
342
self._test_description(test),
386
346
def report_failure(self, test, err):
347
self.failure_count += 1
387
348
self.pb.note('FAIL: %s\n %s\n',
388
349
self._test_description(test),
392
def report_known_failure(self, test, err):
393
self.pb.note('XFAIL: %s\n%s\n',
394
self._test_description(test), err[1])
396
353
def report_skip(self, test, skip_excinfo):
397
354
self.skip_count += 1
440
393
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
441
394
# numbers, plus a trailing blank
442
395
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
443
self.stream.write(self._ellipsize_to_right(name,
444
osutils.terminal_width()-30))
397
self.stream.write('%5d ' % self.count)
398
self.stream.write(self._ellipsize_to_right(name,
399
osutils.terminal_width()-36))
401
self.stream.write(self._ellipsize_to_right(name,
402
osutils.terminal_width()-30))
445
403
self.stream.flush()
447
405
def _error_summary(self, err):
449
409
return '%s%s' % (indent, err[1])
451
411
def report_error(self, test, err):
412
self.error_count += 1
452
413
self.stream.writeln('ERROR %s\n%s'
453
414
% (self._testTimeString(),
454
415
self._error_summary(err)))
456
417
def report_failure(self, test, err):
418
self.failure_count += 1
457
419
self.stream.writeln(' FAIL %s\n%s'
458
420
% (self._testTimeString(),
459
421
self._error_summary(err)))
461
def report_known_failure(self, test, err):
462
self.stream.writeln('XFAIL %s\n%s'
463
% (self._testTimeString(),
464
self._error_summary(err)))
466
423
def report_success(self, test):
467
424
self.stream.writeln(' OK %s' % self._testTimeString())
468
425
for bench_called, stats in getattr(test, '_benchcalls', []):
469
426
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
470
427
stats.pprint(file=self.stream)
471
# flush the stream so that we get smooth output. This verbose mode is
472
# used to show the output in PQM.
473
428
self.stream.flush()
475
430
def report_skip(self, test, skip_excinfo):
517
465
result.stop_early = self.stop_on_failure
518
466
result.report_starting()
520
if self.verbosity >= 2:
521
self.stream.writeln("Listing tests only ...\n")
523
for t in iter_suite_tests(test):
524
self.stream.writeln("%s" % (t.id()))
526
actionTaken = "Listed"
529
run = result.testsRun
531
468
stopTime = time.time()
532
469
timeTaken = stopTime - startTime
533
470
result.printErrors()
534
471
self.stream.writeln(result.separator2)
535
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
536
run, run != 1 and "s" or "", timeTaken))
472
run = result.testsRun
473
self.stream.writeln("Ran %d test%s in %.3fs" %
474
(run, run != 1 and "s" or "", timeTaken))
537
475
self.stream.writeln()
538
476
if not result.wasSuccessful():
539
477
self.stream.write("FAILED (")
544
482
if failed: self.stream.write(", ")
545
483
self.stream.write("errors=%d" % errored)
546
if result.known_failure_count:
547
if failed or errored: self.stream.write(", ")
548
self.stream.write("known_failure_count=%d" %
549
result.known_failure_count)
550
484
self.stream.writeln(")")
552
if result.known_failure_count:
553
self.stream.writeln("OK (known_failures=%d)" %
554
result.known_failure_count)
556
self.stream.writeln("OK")
486
self.stream.writeln("OK")
557
487
if result.skip_count > 0:
558
488
skipped = result.skip_count
559
489
self.stream.writeln('%d test%s skipped' %
560
490
(skipped, skipped != 1 and "s" or ""))
561
if result.unsupported:
562
for feature, count in sorted(result.unsupported.items()):
563
self.stream.writeln("Missing feature '%s' skipped %d tests." %
491
result.report_cleaning_up()
492
# This is still a little bogus,
493
# but only a little. Folk not using our testrunner will
494
# have to delete their temp directories themselves.
495
test_root = TestCaseWithMemoryTransport.TEST_ROOT
496
if result.wasSuccessful() or not self.keep_output:
497
if test_root is not None:
498
# If LANG=C we probably have created some bogus paths
499
# which rmtree(unicode) will fail to delete
500
# so make sure we are using rmtree(str) to delete everything
501
# except on win32, where rmtree(str) will fail
502
# since it doesn't have the property of byte-stream paths
503
# (they are either ascii or mbcs)
504
if sys.platform == 'win32':
505
# make sure we are using the unicode win32 api
506
test_root = unicode(test_root)
508
test_root = test_root.encode(
509
sys.getfilesystemencoding())
511
osutils.rmtree(test_root)
513
if sys.platform == 'win32' and e.errno == errno.EACCES:
514
print >>sys.stderr, ('Permission denied: '
515
'unable to remove testing dir '
516
'%s' % os.path.basename(test_root))
520
note("Failed tests working directories are in '%s'\n", test_root)
521
TestCaseWithMemoryTransport.TEST_ROOT = None
565
522
result.finished()
583
540
"""Indicates that a test was intentionally skipped, rather than failing."""
586
class KnownFailure(AssertionError):
587
"""Indicates that a test failed in a precisely expected manner.
589
Such failures dont block the whole test suite from passing because they are
590
indicators of partially completed code or of future work. We have an
591
explicit error for them so that we can ensure that they are always visible:
592
KnownFailures are always shown in the output of bzr selftest.
596
class UnavailableFeature(Exception):
597
"""A feature required for this test was not available.
599
The feature should be used to construct the exception.
603
543
class CommandFailed(Exception):
734
674
self._startLogFile()
735
675
self._benchcalls = []
736
676
self._benchtime = None
738
self._clear_debug_flags()
740
def _clear_debug_flags(self):
741
"""Prevent externally set debug flags affecting tests.
743
Tests that want to use debug flags can just set them in the
744
debug_flags set during setup/teardown.
746
self._preserved_debug_flags = set(debug.debug_flags)
747
debug.debug_flags.clear()
748
self.addCleanup(self._restore_debug_flags)
750
def _clear_hooks(self):
751
677
# prevent hooks affecting tests
753
import bzrlib.smart.server
754
self._preserved_hooks = {
755
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
756
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
678
self._preserved_hooks = bzrlib.branch.Branch.hooks
758
679
self.addCleanup(self._restoreHooks)
759
# reset all hooks to an empty instance of the appropriate type
680
# this list of hooks must be kept in sync with the defaults
760
682
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
761
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
763
684
def _silenceUI(self):
764
685
"""Turn off UI for duration of test"""
785
706
return ''.join(difflines)
787
708
def assertEqual(self, a, b, message=''):
791
except UnicodeError, e:
792
# If we can't compare without getting a UnicodeError, then
793
# obviously they are different
794
mutter('UnicodeError: %s', e)
797
713
raise AssertionError("%snot equal:\na = %s\nb = %s\n"
799
pformat(a), pformat(b)))
715
pformat(a, indent=4), pformat(b, indent=4)))
801
717
assertEquals = assertEqual
812
728
if message is None:
813
729
message = "texts not equal:\n"
814
raise AssertionError(message +
815
self._ndiff_strings(a, b))
730
raise AssertionError(message +
731
self._ndiff_strings(a, b))
817
733
def assertEqualMode(self, mode, mode_test):
818
734
self.assertEqual(mode, mode_test,
819
735
'mode mismatch %o != %o' % (mode, mode_test))
821
def assertPositive(self, val):
822
"""Assert that val is greater than 0."""
823
self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
825
def assertNegative(self, val):
826
"""Assert that val is less than 0."""
827
self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
829
737
def assertStartsWith(self, s, prefix):
830
738
if not s.startswith(prefix):
831
739
raise AssertionError('string %r does not start with %r' % (s, prefix))
838
746
def assertContainsRe(self, haystack, needle_re):
839
747
"""Assert that a contains something matching a regular expression."""
840
748
if not re.search(needle_re, haystack):
841
if '\n' in haystack or len(haystack) > 60:
842
# a long string, format it in a more readable way
843
raise AssertionError(
844
'pattern "%s" not found in\n"""\\\n%s"""\n'
845
% (needle_re, haystack))
847
raise AssertionError('pattern "%s" not found in "%s"'
848
% (needle_re, haystack))
749
raise AssertionError('pattern "%r" not found in "%r"'
750
% (needle_re, haystack))
850
752
def assertNotContainsRe(self, haystack, needle_re):
851
753
"""Assert that a does not match a regular expression"""
881
783
excName = str(excClass)
882
784
raise self.failureException, "%s not raised" % excName
884
def assertRaises(self, excClass, callableObj, *args, **kwargs):
885
"""Assert that a callable raises a particular exception.
887
:param excClass: As for the except statement, this may be either an
888
exception class, or a tuple of classes.
889
:param callableObj: A callable, will be passed ``*args`` and
892
Returns the exception so that you can examine it.
895
callableObj(*args, **kwargs)
899
if getattr(excClass,'__name__', None) is not None:
900
excName = excClass.__name__
903
excName = str(excClass)
904
raise self.failureException, "%s not raised" % excName
906
786
def assertIs(self, left, right, message=None):
907
787
if not (left is right):
908
788
if message is not None:
935
815
self.fail("%r is an instance of %s rather than %s" % (
936
816
obj, obj.__class__, kls))
938
def expectFailure(self, reason, assertion, *args, **kwargs):
939
"""Invoke a test, expecting it to fail for the given reason.
941
This is for assertions that ought to succeed, but currently fail.
942
(The failure is *expected* but not *wanted*.) Please be very precise
943
about the failure you're expecting. If a new bug is introduced,
944
AssertionError should be raised, not KnownFailure.
946
Frequently, expectFailure should be followed by an opposite assertion.
949
Intended to be used with a callable that raises AssertionError as the
950
'assertion' parameter. args and kwargs are passed to the 'assertion'.
952
Raises KnownFailure if the test fails. Raises AssertionError if the
957
self.expectFailure('Math is broken', self.assertNotEqual, 54,
959
self.assertEqual(42, dynamic_val)
961
This means that a dynamic_val of 54 will cause the test to raise
962
a KnownFailure. Once math is fixed and the expectFailure is removed,
963
only a dynamic_val of 42 will allow the test to pass. Anything other
964
than 54 or 42 will cause an AssertionError.
967
assertion(*args, **kwargs)
968
except AssertionError:
969
raise KnownFailure(reason)
971
self.fail('Unexpected success. Should have failed: %s' % reason)
973
818
def _capture_warnings(self, a_callable, *args, **kwargs):
974
819
"""A helper for callDeprecated and applyDeprecated.
996
841
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
997
842
"""Call a deprecated callable without warning the user.
999
Note that this only captures warnings raised by symbol_versioning.warn,
1000
not other callers that go direct to the warning module.
1002
844
:param deprecation_format: The deprecation format that the callable
1003
should have been deprecated with. This is the same type as the
1004
parameter to deprecated_method/deprecated_function. If the
845
should have been deprecated with. This is the same type as the
846
parameter to deprecated_method/deprecated_function. If the
1005
847
callable is not deprecated with this format, an assertion error
1007
849
:param a_callable: A callable to call. This may be a bound method or
1008
a regular function. It will be called with ``*args`` and
850
a regular function. It will be called with *args and **kwargs.
1010
851
:param args: The positional arguments for the callable
1011
852
:param kwargs: The keyword arguments for the callable
1012
:return: The result of a_callable(``*args``, ``**kwargs``)
853
:return: The result of a_callable(*args, **kwargs)
1014
855
call_warnings, result = self._capture_warnings(a_callable,
1015
856
*args, **kwargs)
1118
953
"""Set an environment variable, and reset it when finished."""
1119
954
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1121
def _restore_debug_flags(self):
1122
debug.debug_flags.clear()
1123
debug.debug_flags.update(self._preserved_debug_flags)
1125
956
def _restoreEnvironment(self):
1126
957
for name, value in self.__old_env.iteritems():
1127
958
osutils.set_or_unset_env(name, value)
1129
960
def _restoreHooks(self):
1130
for klass, hooks in self._preserved_hooks.items():
1131
setattr(klass, 'hooks', hooks)
1133
def knownFailure(self, reason):
1134
"""This test has failed for some known reason."""
1135
raise KnownFailure(reason)
1137
def run(self, result=None):
1138
if result is None: result = self.defaultTestResult()
1139
for feature in getattr(self, '_test_needs_features', []):
1140
if not feature.available():
1141
result.startTest(self)
1142
if getattr(result, 'addNotSupported', None):
1143
result.addNotSupported(self, feature)
1145
result.addSuccess(self)
1146
result.stopTest(self)
1148
return unittest.TestCase.run(self, result)
961
bzrlib.branch.Branch.hooks = self._preserved_hooks
1150
963
def tearDown(self):
1151
964
self._runCleanups()
1219
1032
return "DELETED log file to reduce memory footprint"
1221
@deprecated_method(zero_eighteen)
1222
1034
def capture(self, cmd, retcode=0):
1223
1035
"""Shortcut that splits cmd into words, runs, and returns stdout"""
1224
1036
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
1226
def requireFeature(self, feature):
1227
"""This test requires a specific feature is available.
1229
:raises UnavailableFeature: When feature is not available.
1231
if not feature.available():
1232
raise UnavailableFeature(feature)
1234
@deprecated_method(zero_eighteen)
1235
1038
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
1236
1039
working_dir=None):
1237
1040
"""Invoke bzr and return (stdout, stderr).
1239
Don't call this method, just use run_bzr() which is equivalent.
1241
:param argv: Arguments to invoke bzr. This may be either a
1242
single string, in which case it is split by shlex into words,
1243
or a list of arguments.
1244
:param retcode: Expected return code, or None for don't-care.
1245
:param encoding: Encoding for sys.stdout and sys.stderr
1042
Useful for code that wants to check the contents of the
1043
output, the way error messages are presented, etc.
1045
This should be the main method for tests that want to exercise the
1046
overall behavior of the bzr application (rather than a unit test
1047
or a functional test of the library.)
1049
Much of the old code runs bzr by forking a new copy of Python, but
1050
that is slower, harder to debug, and generally not necessary.
1052
This runs bzr through the interface that catches and reports
1053
errors, and with logging set to something approximating the
1054
default, so that error reporting can be checked.
1056
:param argv: arguments to invoke bzr
1057
:param retcode: expected return code, or None for don't-care.
1058
:param encoding: encoding for sys.stdout and sys.stderr
1246
1059
:param stdin: A string to be used as stdin for the command.
1247
1060
:param working_dir: Change to this directory before running
1249
return self._run_bzr_autosplit(argv, retcode=retcode,
1250
encoding=encoding, stdin=stdin, working_dir=working_dir,
1253
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1255
"""Run bazaar command line, splitting up a string command line."""
1256
if isinstance(args, basestring):
1257
args = list(shlex.split(args))
1258
return self._run_bzr_core(args, retcode=retcode,
1259
encoding=encoding, stdin=stdin, working_dir=working_dir,
1262
def _run_bzr_core(self, args, retcode, encoding, stdin,
1264
1062
if encoding is None:
1265
1063
encoding = bzrlib.user_encoding
1266
1064
stdout = StringIOWrapper()
1283
1081
os.chdir(working_dir)
1286
result = self.apply_redirected(ui.ui_factory.stdin,
1288
bzrlib.commands.run_bzr_catch_errors,
1084
saved_debug_flags = frozenset(debug.debug_flags)
1085
debug.debug_flags.clear()
1087
result = self.apply_redirected(ui.ui_factory.stdin,
1089
bzrlib.commands.run_bzr_catch_errors,
1092
debug.debug_flags.update(saved_debug_flags)
1291
1094
logger.removeHandler(handler)
1292
1095
ui.ui_factory = old_ui_factory
1301
1104
self.log('errors:\n%r', err)
1302
1105
if retcode is not None:
1303
self.assertEquals(retcode, result,
1304
message='Unexpected return code')
1106
self.assertEquals(retcode, result)
1305
1107
return out, err
1307
1109
def run_bzr(self, *args, **kwargs):
1308
1110
"""Invoke bzr, as if it were run from the command line.
1310
The argument list should not include the bzr program name - the
1311
first argument is normally the bzr command. Arguments may be
1312
passed in three ways:
1314
1- A list of strings, eg ["commit", "a"]. This is recommended
1315
when the command contains whitespace or metacharacters, or
1316
is built up at run time.
1318
2- A single string, eg "add a". This is the most convenient
1319
for hardcoded commands.
1321
3- Several varargs parameters, eg run_bzr("add", "a").
1322
This is not recommended for new code.
1324
This runs bzr through the interface that catches and reports
1325
errors, and with logging set to something approximating the
1326
default, so that error reporting can be checked.
1328
1112
This should be the main method for tests that want to exercise the
1329
1113
overall behavior of the bzr application (rather than a unit test
1330
1114
or a functional test of the library.)
1332
1116
This sends the stdout/stderr results into the test's log,
1333
1117
where it may be useful for debugging. See also run_captured.
1335
:keyword stdin: A string to be used as stdin for the command.
1336
:keyword retcode: The status code the command should return;
1338
:keyword working_dir: The directory to run the command in
1339
:keyword error_regexes: A list of expected error messages. If
1340
specified they must be seen in the error output of the command.
1119
:param stdin: A string to be used as stdin for the command.
1120
:param retcode: The status code the command should return
1121
:param working_dir: The directory to run the command in
1342
1123
retcode = kwargs.pop('retcode', 0)
1343
1124
encoding = kwargs.pop('encoding', None)
1344
1125
stdin = kwargs.pop('stdin', None)
1345
1126
working_dir = kwargs.pop('working_dir', None)
1346
error_regexes = kwargs.pop('error_regexes', [])
1349
if isinstance(args[0], (list, basestring)):
1352
symbol_versioning.warn(zero_eighteen % "passing varargs to run_bzr",
1353
DeprecationWarning, stacklevel=3)
1355
out, err = self._run_bzr_autosplit(args=args,
1357
encoding=encoding, stdin=stdin, working_dir=working_dir,
1360
for regex in error_regexes:
1361
self.assertContainsRe(err, regex)
1127
return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
1128
stdin=stdin, working_dir=working_dir)
1364
1130
def run_bzr_decode(self, *args, **kwargs):
1365
1131
if 'encoding' in kwargs:
1371
1137
def run_bzr_error(self, error_regexes, *args, **kwargs):
1372
1138
"""Run bzr, and check that stderr contains the supplied regexes
1374
:param error_regexes: Sequence of regular expressions which
1140
:param error_regexes: Sequence of regular expressions which
1375
1141
must each be found in the error output. The relative ordering
1376
1142
is not enforced.
1377
1143
:param args: command-line arguments for bzr
1378
1144
:param kwargs: Keyword arguments which are interpreted by run_bzr
1379
1145
This function changes the default value of retcode to be 3,
1380
1146
since in most cases this is run when you expect bzr to fail.
1382
:return: (out, err) The actual output of running the command (in case
1383
you want to do more inspection)
1147
:return: (out, err) The actual output of running the command (in case you
1148
want to do more inspection)
1387
1151
# Make sure that commit is failing because there is nothing to do
1388
1152
self.run_bzr_error(['no changes to commit'],
1389
1153
'commit', '-m', 'my commit comment')
1394
1158
'commit', '--strict', '-m', 'my commit comment')
1396
1160
kwargs.setdefault('retcode', 3)
1397
kwargs['error_regexes'] = error_regexes
1398
1161
out, err = self.run_bzr(*args, **kwargs)
1162
for regex in error_regexes:
1163
self.assertContainsRe(err, regex)
1399
1164
return out, err
1401
1166
def run_bzr_subprocess(self, *args, **kwargs):
1407
1172
handling, or early startup code, etc. Subprocess code can't be
1408
1173
profiled or debugged so easily.
1410
:keyword retcode: The status code that is expected. Defaults to 0. If
1175
:param retcode: The status code that is expected. Defaults to 0. If
1411
1176
None is supplied, the status code is not checked.
1412
:keyword env_changes: A dictionary which lists changes to environment
1177
:param env_changes: A dictionary which lists changes to environment
1413
1178
variables. A value of None will unset the env variable.
1414
1179
The values must be strings. The change will only occur in the
1415
1180
child, so you don't need to fix the environment after running.
1416
:keyword universal_newlines: Convert CRLF => LF
1417
:keyword allow_plugins: By default the subprocess is run with
1181
:param universal_newlines: Convert CRLF => LF
1182
:param allow_plugins: By default the subprocess is run with
1418
1183
--no-plugins to ensure test reproducibility. Also, it is possible
1419
1184
for system-wide plugins to create unexpected output on stderr,
1420
1185
which can cause unnecessary test failures.
1591
1356
sys.stderr = real_stderr
1592
1357
sys.stdin = real_stdin
1594
def reduceLockdirTimeout(self):
1595
"""Reduce the default lock timeout for the duration of the test, so that
1596
if LockContention occurs during a test, it does so quickly.
1359
@symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
1360
def merge(self, branch_from, wt_to):
1361
"""A helper for tests to do a ui-less merge.
1598
Tests that expect to provoke LockContention errors should call this.
1363
This should move to the main library when someone has time to integrate
1600
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1602
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1603
self.addCleanup(resetTimeout)
1604
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1366
# minimal ui-less merge.
1367
wt_to.branch.fetch(branch_from)
1368
base_rev = common_ancestor(branch_from.last_revision(),
1369
wt_to.branch.last_revision(),
1370
wt_to.branch.repository)
1371
merge_inner(wt_to.branch, branch_from.basis_tree(),
1372
wt_to.branch.repository.revision_tree(base_rev),
1374
wt_to.add_parent_tree_id(branch_from.last_revision())
1377
BzrTestBase = TestCase
1607
1380
class TestCaseWithMemoryTransport(TestCase):
1618
1391
file defaults for the transport in tests, nor does it obey the command line
1619
1392
override, so tests that accidentally write to the common directory should
1622
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1623
a .bzr directory that stops us ascending higher into the filesystem.
1626
1396
TEST_ROOT = None
1627
1397
_TEST_NAME = 'test'
1629
1400
def __init__(self, methodName='runTest'):
1630
1401
# allow test parameterisation after test construction and before test
1631
1402
# execution. Variables that the parameteriser sets need to be
1632
1403
# ones that are not set by setUp, or setUp will trash them.
1633
1404
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1634
self.vfs_transport_factory = default_transport
1635
self.transport_server = None
1405
self.transport_server = default_transport
1636
1406
self.transport_readonly_server = None
1637
self.__vfs_server = None
1639
def get_transport(self, relpath=None):
1640
"""Return a writeable transport.
1642
This transport is for the test scratch space relative to
1645
:param relpath: a path relative to the base url.
1647
t = get_transport(self.get_url(relpath))
1408
def get_transport(self):
1409
"""Return a writeable transport for the test scratch space"""
1410
t = get_transport(self.get_url())
1648
1411
self.assertFalse(t.is_readonly())
1651
def get_readonly_transport(self, relpath=None):
1414
def get_readonly_transport(self):
1652
1415
"""Return a readonly transport for the test scratch space
1654
1417
This can be used to test that operations which should only need
1655
1418
readonly access in fact do not try to write.
1657
:param relpath: a path relative to the base url.
1659
t = get_transport(self.get_readonly_url(relpath))
1420
t = get_transport(self.get_readonly_url())
1660
1421
self.assertTrue(t.is_readonly())
1676
1437
if self.transport_readonly_server is None:
1677
1438
# readonly decorator requested
1678
1439
# bring up the server
1679
1441
self.__readonly_server = ReadonlyServer()
1680
self.__readonly_server.setUp(self.get_vfs_only_server())
1442
self.__readonly_server.setUp(self.__server)
1682
1444
self.__readonly_server = self.create_transport_readonly_server()
1683
self.__readonly_server.setUp(self.get_vfs_only_server())
1445
self.__readonly_server.setUp()
1684
1446
self.addCleanup(self.__readonly_server.tearDown)
1685
1447
return self.__readonly_server
1704
1470
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1705
1471
is no means to override it.
1707
if self.__vfs_server is None:
1708
self.__vfs_server = MemoryServer()
1709
self.__vfs_server.setUp()
1710
self.addCleanup(self.__vfs_server.tearDown)
1711
return self.__vfs_server
1713
def get_server(self):
1714
"""Get the read/write server instance.
1716
This is useful for some tests with specific servers that need
1719
This is built from the self.transport_server factory. If that is None,
1720
then the self.get_vfs_server is returned.
1722
1473
if self.__server is None:
1723
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1724
return self.get_vfs_only_server()
1726
# bring up a decorated means of access to the vfs only server.
1727
self.__server = self.transport_server()
1729
self.__server.setUp(self.get_vfs_only_server())
1730
except TypeError, e:
1731
# This should never happen; the try:Except here is to assist
1732
# developers having to update code rather than seeing an
1733
# uninformative TypeError.
1734
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1474
self.__server = MemoryServer()
1475
self.__server.setUp()
1735
1476
self.addCleanup(self.__server.tearDown)
1736
1477
return self.__server
1738
def _adjust_url(self, base, relpath):
1479
def get_url(self, relpath=None):
1739
1480
"""Get a URL (or maybe a path) for the readwrite transport.
1741
1482
This will either be backed by '.' or to an equivalent non-file based
1756
1498
base += urlutils.escape(relpath)
1759
def get_url(self, relpath=None):
1760
"""Get a URL (or maybe a path) for the readwrite transport.
1762
This will either be backed by '.' or to an equivalent non-file based
1764
relpath provides for clients to get a path relative to the base url.
1765
These should only be downwards relative, not upwards.
1767
base = self.get_server().get_url()
1768
return self._adjust_url(base, relpath)
1770
def get_vfs_only_url(self, relpath=None):
1771
"""Get a URL (or maybe a path for the plain old vfs transport.
1773
This will never be a smart protocol. It always has all the
1774
capabilities of the local filesystem, but it might actually be a
1775
MemoryTransport or some other similar virtual filesystem.
1777
This is the backing transport (if any) of the server returned by
1778
get_url and get_readonly_url.
1780
:param relpath: provides for clients to get a path relative to the base
1781
url. These should only be downwards relative, not upwards.
1784
base = self.get_vfs_only_server().get_url()
1785
return self._adjust_url(base, relpath)
1787
1501
def _make_test_root(self):
1788
1502
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1790
root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
1791
TestCaseWithMemoryTransport.TEST_ROOT = root
1506
root = u'test%04d.tmp' % i
1510
if e.errno == errno.EEXIST:
1515
# successfully created
1516
TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
1793
1518
# make a fake bzr directory there to prevent any tests propagating
1794
1519
# up onto the source directory's real branch
1795
bzrdir.BzrDir.create_standalone_workingtree(root)
1797
# The same directory is used by all tests, and we're not specifically
1798
# told when all tests are finished. This will do.
1799
atexit.register(_rmtree_temp_dir, root)
1520
bzrdir.BzrDir.create_standalone_workingtree(
1521
TestCaseWithMemoryTransport.TEST_ROOT)
1801
1523
def makeAndChdirToTestDir(self):
1802
1524
"""Create temporary directories for this one test.
1832
1557
raise TestSkipped("Format %s is not initializable." % format)
1834
1559
def make_repository(self, relpath, shared=False, format=None):
1835
"""Create a repository on our default transport at relpath.
1837
Note that relpath must be a relative path, not a full url.
1839
# FIXME: If you create a remoterepository this returns the underlying
1840
# real format, which is incorrect. Actually we should make sure that
1841
# RemoteBzrDir returns a RemoteRepository.
1842
# maybe mbp 20070410
1560
"""Create a repository on our default transport at relpath."""
1843
1561
made_control = self.make_bzrdir(relpath, format=format)
1844
1562
return made_control.create_repository(shared=shared)
1876
1593
All test cases create their own directory within that. If the
1877
1594
tests complete successfully, the directory is removed.
1879
:ivar test_base_dir: The path of the top-level directory for this
1880
test, which contains a home directory and a work directory.
1882
:ivar test_home_dir: An initially empty directory under test_base_dir
1883
which is used as $HOME for this test.
1885
:ivar test_dir: A directory under test_base_dir used as the current
1886
directory when the test proper is run.
1596
InTempDir is an old alias for FunctionalTestCase.
1889
1599
OVERRIDE_PYTHON = 'python'
1902
1612
For TestCaseInTempDir we create a temporary directory based on the test
1903
1613
name and then create two subdirs - test and home under it.
1905
# create a directory within the top level test directory
1906
candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
1907
# now create test and home directories within this dir
1908
self.test_base_dir = candidate_dir
1909
self.test_home_dir = self.test_base_dir + '/home'
1910
os.mkdir(self.test_home_dir)
1911
self.test_dir = self.test_base_dir + '/work'
1912
os.mkdir(self.test_dir)
1913
os.chdir(self.test_dir)
1914
# put name of test inside
1915
f = file(self.test_base_dir + '/name', 'w')
1615
if NUMBERED_DIRS: # strongly recommended on Windows
1616
# due to the path length limitation (260 chars)
1617
candidate_dir = '%s/%dK/%05d' % (self.TEST_ROOT,
1618
int(self.number/1000),
1620
os.makedirs(candidate_dir)
1621
self.test_home_dir = candidate_dir + '/home'
1622
os.mkdir(self.test_home_dir)
1623
self.test_dir = candidate_dir + '/work'
1624
os.mkdir(self.test_dir)
1625
os.chdir(self.test_dir)
1626
# put name of test inside
1627
f = file(candidate_dir + '/name', 'w')
1917
1628
f.write(self.id())
1920
self.addCleanup(self.deleteTestDir)
1922
def deleteTestDir(self):
1923
os.chdir(self.TEST_ROOT)
1924
_rmtree_temp_dir(self.test_base_dir)
1632
# shorten the name, to avoid test failures due to path length
1633
short_id = self.id().replace('bzrlib.tests.', '') \
1634
.replace('__main__.', '')[-100:]
1635
# it's possible the same test class is run several times for
1636
# parameterized tests, so make sure the names don't collide.
1640
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
1642
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
1643
if os.path.exists(candidate_dir):
1647
os.mkdir(candidate_dir)
1648
self.test_home_dir = candidate_dir + '/home'
1649
os.mkdir(self.test_home_dir)
1650
self.test_dir = candidate_dir + '/work'
1651
os.mkdir(self.test_dir)
1652
os.chdir(self.test_dir)
1926
1655
def build_tree(self, shape, line_endings='binary', transport=None):
1927
1656
"""Build a test tree according to a pattern.
1932
1661
This assumes that all the elements in the tree being built are new.
1934
1663
This doesn't add anything to a branch.
1936
1664
:param line_endings: Either 'binary' or 'native'
1937
in binary mode, exact contents are written; in native mode, the
1938
line endings match the default platform endings.
1939
:param transport: A transport to write to, for building trees on VFS's.
1940
If the transport is readonly or None, "." is opened automatically.
1665
in binary mode, exact contents are written
1666
in native mode, the line endings match the
1667
default platform endings.
1669
:param transport: A transport to write to, for building trees on
1670
VFS's. If the transport is readonly or None,
1671
"." is opened automatically.
1943
1673
# It's OK to just create them using forward slashes on windows.
1944
1674
if transport is None or transport.is_readonly():
1964
1694
def assertFileEqual(self, content, path):
1965
1695
"""Fail if path does not contain 'content'."""
1966
1696
self.failUnlessExists(path)
1967
f = file(path, 'rb')
1972
self.assertEqualDiff(content, s)
1697
# TODO: jam 20060427 Shouldn't this be 'rb'?
1698
self.assertEqualDiff(content, open(path, 'r').read())
1974
1700
def failUnlessExists(self, path):
1975
"""Fail unless path or paths, which may be abs or relative, exist."""
1976
if not isinstance(path, basestring):
1978
self.failUnlessExists(p)
1980
self.failUnless(osutils.lexists(path),path+" does not exist")
1701
"""Fail unless path, which may be abs or relative, exists."""
1702
self.failUnless(osutils.lexists(path),path+" does not exist")
1982
1704
def failIfExists(self, path):
1983
"""Fail if path or paths, which may be abs or relative, exist."""
1984
if not isinstance(path, basestring):
1986
self.failIfExists(p)
1988
self.failIf(osutils.lexists(path),path+" exists")
1990
def assertInWorkingTree(self,path,root_path='.',tree=None):
1991
"""Assert whether path or paths are in the WorkingTree"""
1993
tree = workingtree.WorkingTree.open(root_path)
1994
if not isinstance(path, basestring):
1996
self.assertInWorkingTree(p,tree=tree)
1998
self.assertIsNot(tree.path2id(path), None,
1999
path+' not in working tree.')
2001
def assertNotInWorkingTree(self,path,root_path='.',tree=None):
2002
"""Assert whether path or paths are not in the WorkingTree"""
2004
tree = workingtree.WorkingTree.open(root_path)
2005
if not isinstance(path, basestring):
2007
self.assertNotInWorkingTree(p,tree=tree)
2009
self.assertIs(tree.path2id(path), None, path+' in working tree.')
1705
"""Fail if path, which may be abs or relative, exists."""
1706
self.failIf(osutils.lexists(path),path+" exists")
2012
1709
class TestCaseWithTransport(TestCaseInTempDir):
2023
1720
readwrite one must both define get_url() as resolving to os.getcwd().
2026
def get_vfs_only_server(self):
1723
def create_transport_server(self):
1724
"""Create a transport server from class defined at init.
1726
This is mostly a hook for daughter classes.
1728
return self.transport_server()
1730
def get_server(self):
2027
1731
"""See TestCaseWithMemoryTransport.
2029
1733
This is useful for some tests with specific servers that need
2032
if self.__vfs_server is None:
2033
self.__vfs_server = self.vfs_transport_factory()
2034
self.__vfs_server.setUp()
2035
self.addCleanup(self.__vfs_server.tearDown)
2036
return self.__vfs_server
1736
if self.__server is None:
1737
self.__server = self.create_transport_server()
1738
self.__server.setUp()
1739
self.addCleanup(self.__server.tearDown)
1740
return self.__server
2038
1742
def make_branch_and_tree(self, relpath, format=None):
2039
1743
"""Create a branch on the transport and a tree locally.
2041
1745
If the transport is not a LocalTransport, the Tree can't be created on
2042
the transport. In that case if the vfs_transport_factory is
2043
LocalURLServer the working tree is created in the local
2044
directory backing the transport, and the returned tree's branch and
2045
repository will also be accessed locally. Otherwise a lightweight
2046
checkout is created and returned.
1746
the transport. In that case the working tree is created in the local
1747
directory, and the returned tree's branch and repository will also be
1750
This will fail if the original default transport for this test
1751
case wasn't backed by the working directory, as the branch won't
1752
be on disk for us to open it.
2048
1754
:param format: The BzrDirFormat.
2049
1755
:returns: the WorkingTree.
2057
1763
return b.bzrdir.create_workingtree()
2058
1764
except errors.NotLocalUrl:
2059
1765
# We can only make working trees locally at the moment. If the
2060
# transport can't support them, then we keep the non-disk-backed
2061
# branch and create a local checkout.
2062
if self.vfs_transport_factory is LocalURLServer:
2063
# the branch is colocated on disk, we cannot create a checkout.
2064
# hopefully callers will expect this.
2065
local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2066
return local_controldir.create_workingtree()
2068
return b.create_checkout(relpath, lightweight=True)
1766
# transport can't support them, then reopen the branch on a local
1767
# transport, and create the working tree there.
1769
# Possibly we should instead keep
1770
# the non-disk-backed branch and create a local checkout?
1771
bd = bzrdir.BzrDir.open(relpath)
1772
return bd.create_workingtree()
2070
1774
def assertIsDirectory(self, relpath, transport):
2071
1775
"""Assert that relpath within transport is a directory.
2111
1815
def setUp(self):
2112
1816
super(ChrootedTestCase, self).setUp()
2113
if not self.vfs_transport_factory == MemoryServer:
1817
if not self.transport_server == MemoryServer:
2114
1818
self.transport_readonly_server = HttpServer
2117
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2118
random_order=False):
2119
"""Create a test suite by filtering another one.
2121
:param suite: the source suite
2122
:param pattern: pattern that names must match
2123
:param exclude_pattern: pattern that names must not match, if any
2124
:param random_order: if True, tests in the new suite will be put in
2126
:returns: the newly created suite
2128
return sort_suite_by_re(suite, pattern, exclude_pattern,
2129
random_order, False)
2132
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2133
random_order=False, append_rest=True):
2134
"""Create a test suite by sorting another one.
2136
:param suite: the source suite
2137
:param pattern: pattern that names must match in order to go
2138
first in the new suite
2139
:param exclude_pattern: pattern that names must not match, if any
2140
:param random_order: if True, tests in the new suite will be put in
2142
:param append_rest: if False, pattern is a strict filter and not
2143
just an ordering directive
2144
:returns: the newly created suite
1821
def filter_suite_by_re(suite, pattern):
1822
result = TestUtil.TestSuite()
1823
filter_re = re.compile(pattern)
1824
for test in iter_suite_tests(suite):
1825
if filter_re.search(test.id()):
1826
result.addTest(test)
1830
def sort_suite_by_re(suite, pattern):
2148
1833
filter_re = re.compile(pattern)
2149
if exclude_pattern is not None:
2150
exclude_re = re.compile(exclude_pattern)
2151
1834
for test in iter_suite_tests(suite):
2153
if exclude_pattern is None or not exclude_re.search(test_id):
2154
if filter_re.search(test_id):
2159
random.shuffle(first)
2160
random.shuffle(second)
1835
if filter_re.search(test.id()):
2161
1839
return TestUtil.TestSuite(first + second)
2164
1842
def run_suite(suite, name='test', verbose=False, pattern=".*",
2165
stop_on_failure=False,
1843
stop_on_failure=False, keep_output=False,
2166
1844
transport=None, lsprof_timed=None, bench_history=None,
2167
1845
matching_tests_first=None,
2170
exclude_pattern=None,
1846
numbered_dirs=None):
1847
global NUMBERED_DIRS
1848
if numbered_dirs is not None:
1849
NUMBERED_DIRS = bool(numbered_dirs)
2172
1851
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2177
1856
runner = TextTestRunner(stream=sys.stdout,
2178
1857
descriptions=0,
2179
1858
verbosity=verbosity,
2180
bench_history=bench_history,
2181
list_only=list_only,
1859
keep_output=keep_output,
1860
bench_history=bench_history)
2183
1861
runner.stop_on_failure=stop_on_failure
2184
# Initialise the random number generator and display the seed used.
2185
# We convert the seed to a long to make it reusable across invocations.
2186
random_order = False
2187
if random_seed is not None:
2189
if random_seed == "now":
2190
random_seed = long(time.time())
2192
# Convert the seed to a long if we can
2194
random_seed = long(random_seed)
2197
runner.stream.writeln("Randomizing test order using seed %s\n" %
2199
random.seed(random_seed)
2200
# Customise the list of tests if requested
2201
if pattern != '.*' or exclude_pattern is not None or random_order:
2202
1863
if matching_tests_first:
2203
suite = sort_suite_by_re(suite, pattern, exclude_pattern,
1864
suite = sort_suite_by_re(suite, pattern)
2206
suite = filter_suite_by_re(suite, pattern, exclude_pattern,
1866
suite = filter_suite_by_re(suite, pattern)
2208
1867
result = runner.run(suite)
2209
1868
return result.wasSuccessful()
2212
1871
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2213
1873
transport=None,
2214
1874
test_suite_factory=None,
2215
1875
lsprof_timed=None,
2216
1876
bench_history=None,
2217
1877
matching_tests_first=None,
2220
exclude_pattern=None):
1878
numbered_dirs=None):
2221
1879
"""Run the whole test suite under the enhanced runner"""
2222
1880
# XXX: Very ugly way to do this...
2223
1881
# Disable warning about old formats because we don't want it to disturb
2237
1895
suite = test_suite_factory()
2238
1896
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2239
stop_on_failure=stop_on_failure,
1897
stop_on_failure=stop_on_failure, keep_output=keep_output,
2240
1898
transport=transport,
2241
1899
lsprof_timed=lsprof_timed,
2242
1900
bench_history=bench_history,
2243
1901
matching_tests_first=matching_tests_first,
2244
list_only=list_only,
2245
random_seed=random_seed,
2246
exclude_pattern=exclude_pattern)
1902
numbered_dirs=numbered_dirs)
2248
1904
default_transport = old_transport
2255
1911
suite on a global basis, but it is not encouraged.
2257
1913
testmod_names = [
2258
'bzrlib.util.tests.test_bencode',
2259
'bzrlib.tests.test__dirstate_helpers',
2260
1914
'bzrlib.tests.test_ancestry',
2261
1915
'bzrlib.tests.test_annotate',
2262
1916
'bzrlib.tests.test_api',
2263
1917
'bzrlib.tests.test_atomicfile',
2264
1918
'bzrlib.tests.test_bad_files',
2265
1919
'bzrlib.tests.test_branch',
2266
'bzrlib.tests.test_branchbuilder',
2267
'bzrlib.tests.test_bugtracker',
2268
1920
'bzrlib.tests.test_bundle',
2269
1921
'bzrlib.tests.test_bzrdir',
2270
1922
'bzrlib.tests.test_cache_utf8',
2273
1925
'bzrlib.tests.test_commit_merge',
2274
1926
'bzrlib.tests.test_config',
2275
1927
'bzrlib.tests.test_conflicts',
2276
'bzrlib.tests.test_counted_lock',
2277
1928
'bzrlib.tests.test_decorators',
2278
1929
'bzrlib.tests.test_delta',
2279
'bzrlib.tests.test_deprecated_graph',
2280
1930
'bzrlib.tests.test_diff',
2281
1931
'bzrlib.tests.test_dirstate',
2282
'bzrlib.tests.test_email_message',
1932
'bzrlib.tests.test_doc_generate',
2283
1933
'bzrlib.tests.test_errors',
2284
1934
'bzrlib.tests.test_escaped_store',
2285
1935
'bzrlib.tests.test_extract',
2286
1936
'bzrlib.tests.test_fetch',
2287
'bzrlib.tests.test_file_names',
2288
1937
'bzrlib.tests.test_ftp_transport',
2289
1938
'bzrlib.tests.test_generate_docs',
2290
1939
'bzrlib.tests.test_generate_ids',
2292
1941
'bzrlib.tests.test_gpg',
2293
1942
'bzrlib.tests.test_graph',
2294
1943
'bzrlib.tests.test_hashcache',
2295
'bzrlib.tests.test_help',
2296
'bzrlib.tests.test_hooks',
2297
1944
'bzrlib.tests.test_http',
2298
1945
'bzrlib.tests.test_http_response',
2299
1946
'bzrlib.tests.test_https_ca_bundle',
2300
1947
'bzrlib.tests.test_identitymap',
2301
1948
'bzrlib.tests.test_ignores',
2302
'bzrlib.tests.test_index',
2303
'bzrlib.tests.test_info',
2304
1949
'bzrlib.tests.test_inv',
2305
1950
'bzrlib.tests.test_knit',
2306
1951
'bzrlib.tests.test_lazy_import',
2307
1952
'bzrlib.tests.test_lazy_regex',
1953
'bzrlib.tests.test_lock',
2308
1954
'bzrlib.tests.test_lockdir',
2309
1955
'bzrlib.tests.test_lockable_files',
2310
1956
'bzrlib.tests.test_log',
2311
'bzrlib.tests.test_lsprof',
2312
1957
'bzrlib.tests.test_memorytree',
2313
1958
'bzrlib.tests.test_merge',
2314
1959
'bzrlib.tests.test_merge3',
2316
1961
'bzrlib.tests.test_merge_directive',
2317
1962
'bzrlib.tests.test_missing',
2318
1963
'bzrlib.tests.test_msgeditor',
2319
'bzrlib.tests.test_multiparent',
2320
1964
'bzrlib.tests.test_nonascii',
2321
1965
'bzrlib.tests.test_options',
2322
1966
'bzrlib.tests.test_osutils',
2323
1967
'bzrlib.tests.test_osutils_encodings',
2324
'bzrlib.tests.test_pack',
2325
1968
'bzrlib.tests.test_patch',
2326
1969
'bzrlib.tests.test_patches',
2327
1970
'bzrlib.tests.test_permissions',
2340
1982
'bzrlib.tests.test_selftest',
2341
1983
'bzrlib.tests.test_setup',
2342
1984
'bzrlib.tests.test_sftp_transport',
2343
'bzrlib.tests.test_smart',
2344
1985
'bzrlib.tests.test_smart_add',
2345
1986
'bzrlib.tests.test_smart_transport',
2346
'bzrlib.tests.test_smtp_connection',
2347
1987
'bzrlib.tests.test_source',
2348
1988
'bzrlib.tests.test_ssh_transport',
2349
1989
'bzrlib.tests.test_status',
2350
1990
'bzrlib.tests.test_store',
2351
'bzrlib.tests.test_strace',
2352
1991
'bzrlib.tests.test_subsume',
2353
1992
'bzrlib.tests.test_symbol_versioning',
2354
1993
'bzrlib.tests.test_tag',
2422
2061
suite.addTests(adapter.adapt(test))
2425
def _rmtree_temp_dir(dirname):
2426
# If LANG=C we probably have created some bogus paths
2427
# which rmtree(unicode) will fail to delete
2428
# so make sure we are using rmtree(str) to delete everything
2429
# except on win32, where rmtree(str) will fail
2430
# since it doesn't have the property of byte-stream paths
2431
# (they are either ascii or mbcs)
2432
if sys.platform == 'win32':
2433
# make sure we are using the unicode win32 api
2434
dirname = unicode(dirname)
2436
dirname = dirname.encode(sys.getfilesystemencoding())
2438
osutils.rmtree(dirname)
2440
if sys.platform == 'win32' and e.errno == errno.EACCES:
2441
print >>sys.stderr, ('Permission denied: '
2442
'unable to remove testing dir '
2443
'%s' % os.path.basename(dirname))
2448
class Feature(object):
2449
"""An operating system Feature."""
2452
self._available = None
2454
def available(self):
2455
"""Is the feature available?
2457
:return: True if the feature is available.
2459
if self._available is None:
2460
self._available = self._probe()
2461
return self._available
2464
"""Implement this method in concrete features.
2466
:return: True if the feature is available.
2468
raise NotImplementedError
2471
if getattr(self, 'feature_name', None):
2472
return self.feature_name()
2473
return self.__class__.__name__
2476
class TestScenarioApplier(object):
2477
"""A tool to apply scenarios to tests."""
2479
def adapt(self, test):
2480
"""Return a TestSuite containing a copy of test for each scenario."""
2481
result = unittest.TestSuite()
2482
for scenario in self.scenarios:
2483
result.addTest(self.adapt_test_to_scenario(test, scenario))
2486
def adapt_test_to_scenario(self, test, scenario):
2487
"""Copy test and apply scenario to it.
2489
:param test: A test to adapt.
2490
:param scenario: A tuple describing the scenario.
2491
The first element of the tuple is the new test id.
2492
The second element is a dict containing attributes to set on the
2494
:return: The adapted test.
2496
from copy import deepcopy
2497
new_test = deepcopy(test)
2498
for name, value in scenario[1].items():
2499
setattr(new_test, name, value)
2500
new_id = "%s(%s)" % (new_test.id(), scenario[0])
2501
new_test.id = lambda: new_id
2064
def clean_selftest_output(root=None, quiet=False):
2065
"""Remove all selftest output directories from root directory.
2067
:param root: root directory for clean
2068
(if omitted or None then clean current directory).
2069
:param quiet: suppress report about deleting directories
2074
re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
2077
for i in os.listdir(root):
2078
if os.path.isdir(i) and re_dir.match(i):
2080
print 'delete directory:', i