329
320
bench_history=None,
323
use_numbered_dirs=False,
333
325
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
334
bench_history, num_tests)
326
bench_history, num_tests, use_numbered_dirs)
336
328
self.pb = self.ui.nested_progress_bar()
337
329
self._supplied_pb = False
373
365
+ self._shortened_test_description(test))
375
367
def _test_description(self, test):
376
return self._shortened_test_description(test)
368
if self.use_numbered_dirs:
369
return '#%d %s' % (self.count,
370
self._shortened_test_description(test))
372
return self._shortened_test_description(test)
378
374
def report_error(self, test, err):
379
375
self.pb.note('ERROR: %s\n %s\n',
438
434
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
439
435
# numbers, plus a trailing blank
440
436
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
441
self.stream.write(self._ellipsize_to_right(name,
442
osutils.terminal_width()-30))
437
if self.use_numbered_dirs:
438
self.stream.write('%5d ' % self.count)
439
self.stream.write(self._ellipsize_to_right(name,
440
osutils.terminal_width()-36))
442
self.stream.write(self._ellipsize_to_right(name,
443
osutils.terminal_width()-30))
443
444
self.stream.flush()
445
446
def _error_summary(self, err):
448
if self.use_numbered_dirs:
447
450
return '%s%s' % (indent, err[1])
449
452
def report_error(self, test, err):
490
493
stream=sys.stderr,
493
497
bench_history=None,
498
use_numbered_dirs=False,
496
500
self.stream = unittest._WritelnDecorator(stream)
497
501
self.descriptions = descriptions
498
502
self.verbosity = verbosity
503
self.keep_output = keep_output
499
504
self._bench_history = bench_history
500
self.list_only = list_only
505
self.use_numbered_dirs = use_numbered_dirs
502
507
def run(self, test):
503
508
"Run the given test case or test suite."
512
517
bench_history=self._bench_history,
513
518
num_tests=test.countTestCases(),
519
use_numbered_dirs=self.use_numbered_dirs,
515
521
result.stop_early = self.stop_on_failure
516
522
result.report_starting()
518
if self.verbosity >= 2:
519
self.stream.writeln("Listing tests only ...\n")
521
for t in iter_suite_tests(test):
522
self.stream.writeln("%s" % (t.id()))
524
actionTaken = "Listed"
527
run = result.testsRun
529
524
stopTime = time.time()
530
525
timeTaken = stopTime - startTime
531
526
result.printErrors()
532
527
self.stream.writeln(result.separator2)
533
self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
534
run, run != 1 and "s" or "", timeTaken))
528
run = result.testsRun
529
self.stream.writeln("Ran %d test%s in %.3fs" %
530
(run, run != 1 and "s" or "", timeTaken))
535
531
self.stream.writeln()
536
532
if not result.wasSuccessful():
537
533
self.stream.write("FAILED (")
560
556
for feature, count in sorted(result.unsupported.items()):
561
557
self.stream.writeln("Missing feature '%s' skipped %d tests." %
562
558
(feature, count))
559
result.report_cleaning_up()
560
# This is still a little bogus,
561
# but only a little. Folk not using our testrunner will
562
# have to delete their temp directories themselves.
563
test_root = TestCaseWithMemoryTransport.TEST_ROOT
564
if result.wasSuccessful() or not self.keep_output:
565
if test_root is not None:
566
# If LANG=C we probably have created some bogus paths
567
# which rmtree(unicode) will fail to delete
568
# so make sure we are using rmtree(str) to delete everything
569
# except on win32, where rmtree(str) will fail
570
# since it doesn't have the property of byte-stream paths
571
# (they are either ascii or mbcs)
572
if sys.platform == 'win32':
573
# make sure we are using the unicode win32 api
574
test_root = unicode(test_root)
576
test_root = test_root.encode(
577
sys.getfilesystemencoding())
578
_rmtree_temp_dir(test_root)
580
note("Failed tests working directories are in '%s'\n", test_root)
581
TestCaseWithMemoryTransport.TEST_ROOT = None
563
582
result.finished()
732
751
self._startLogFile()
733
752
self._benchcalls = []
734
753
self._benchtime = None
736
self._clear_debug_flags()
738
def _clear_debug_flags(self):
739
"""Prevent externally set debug flags affecting tests.
741
Tests that want to use debug flags can just set them in the
742
debug_flags set during setup/teardown.
744
self._preserved_debug_flags = set(debug.debug_flags)
745
debug.debug_flags.clear()
746
self.addCleanup(self._restore_debug_flags)
748
def _clear_hooks(self):
749
754
# prevent hooks affecting tests
751
import bzrlib.smart.server
752
755
self._preserved_hooks = {
753
bzrlib.branch.Branch: bzrlib.branch.Branch.hooks,
754
bzrlib.smart.server.SmartTCPServer: bzrlib.smart.server.SmartTCPServer.hooks,
756
bzrlib.branch.Branch:bzrlib.branch.Branch.hooks,
757
bzrlib.smart.server.SmartTCPServer:bzrlib.smart.server.SmartTCPServer.hooks,
756
759
self.addCleanup(self._restoreHooks)
757
# reset all hooks to an empty instance of the appropriate type
760
# this list of hooks must be kept in sync with the defaults
758
762
bzrlib.branch.Branch.hooks = bzrlib.branch.BranchHooks()
759
bzrlib.smart.server.SmartTCPServer.hooks = bzrlib.smart.server.SmartServerHooks()
761
764
def _silenceUI(self):
762
765
"""Turn off UI for duration of test"""
810
813
if message is None:
811
814
message = "texts not equal:\n"
812
raise AssertionError(message +
813
self._ndiff_strings(a, b))
815
raise AssertionError(message +
816
self._ndiff_strings(a, b))
815
818
def assertEqualMode(self, mode, mode_test):
816
819
self.assertEqual(mode, mode_test,
828
831
def assertContainsRe(self, haystack, needle_re):
829
832
"""Assert that a contains something matching a regular expression."""
830
833
if not re.search(needle_re, haystack):
831
if '\n' in haystack or len(haystack) > 60:
832
# a long string, format it in a more readable way
833
raise AssertionError(
834
'pattern "%s" not found in\n"""\\\n%s"""\n'
835
% (needle_re, haystack))
837
raise AssertionError('pattern "%s" not found in "%s"'
838
% (needle_re, haystack))
834
raise AssertionError('pattern "%r" not found in "%r"'
835
% (needle_re, haystack))
840
837
def assertNotContainsRe(self, haystack, needle_re):
841
838
"""Assert that a does not match a regular expression"""
871
868
excName = str(excClass)
872
869
raise self.failureException, "%s not raised" % excName
874
def assertRaises(self, excClass, callableObj, *args, **kwargs):
871
def assertRaises(self, excClass, func, *args, **kwargs):
875
872
"""Assert that a callable raises a particular exception.
877
874
:param excClass: As for the except statement, this may be either an
878
exception class, or a tuple of classes.
879
:param callableObj: A callable, will be passed ``*args`` and
875
exception class, or a tuple of classes.
882
877
Returns the exception so that you can examine it.
885
callableObj(*args, **kwargs)
880
func(*args, **kwargs)
886
881
except excClass, e:
967
962
:param args: The positional arguments for the callable
968
963
:param kwargs: The keyword arguments for the callable
969
964
:return: A tuple (warnings, result). result is the result of calling
970
a_callable(``*args``, ``**kwargs``).
965
a_callable(*args, **kwargs).
972
967
local_warnings = []
973
968
def capture_warnings(msg, cls=None, stacklevel=None):
986
981
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
987
982
"""Call a deprecated callable without warning the user.
989
Note that this only captures warnings raised by symbol_versioning.warn,
990
not other callers that go direct to the warning module.
992
984
:param deprecation_format: The deprecation format that the callable
993
should have been deprecated with. This is the same type as the
994
parameter to deprecated_method/deprecated_function. If the
985
should have been deprecated with. This is the same type as the
986
parameter to deprecated_method/deprecated_function. If the
995
987
callable is not deprecated with this format, an assertion error
997
989
:param a_callable: A callable to call. This may be a bound method or
998
a regular function. It will be called with ``*args`` and
990
a regular function. It will be called with *args and **kwargs.
1000
991
:param args: The positional arguments for the callable
1001
992
:param kwargs: The keyword arguments for the callable
1002
:return: The result of a_callable(``*args``, ``**kwargs``)
993
:return: The result of a_callable(*args, **kwargs)
1004
995
call_warnings, result = self._capture_warnings(a_callable,
1005
996
*args, **kwargs)
1019
1010
as it allows you to simply specify the deprecation format being used
1020
1011
and will ensure that that is issued for the function being called.
1022
Note that this only captures warnings raised by symbol_versioning.warn,
1023
not other callers that go direct to the warning module.
1025
1013
:param expected: a list of the deprecation warnings expected, in order
1026
1014
:param callable: The callable to call
1027
1015
:param args: The positional arguments for the callable
1106
1093
"""Set an environment variable, and reset it when finished."""
1107
1094
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1109
def _restore_debug_flags(self):
1110
debug.debug_flags.clear()
1111
debug.debug_flags.update(self._preserved_debug_flags)
1113
1096
def _restoreEnvironment(self):
1114
1097
for name, value in self.__old_env.iteritems():
1115
1098
osutils.set_or_unset_env(name, value)
1219
1201
if not feature.available():
1220
1202
raise UnavailableFeature(feature)
1222
@deprecated_method(zero_eighteen)
1223
1204
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
1224
1205
working_dir=None):
1225
1206
"""Invoke bzr and return (stdout, stderr).
1227
Don't call this method, just use run_bzr() which is equivalent.
1229
:param argv: Arguments to invoke bzr. This may be either a
1230
single string, in which case it is split by shlex into words,
1231
or a list of arguments.
1232
:param retcode: Expected return code, or None for don't-care.
1233
:param encoding: Encoding for sys.stdout and sys.stderr
1208
Useful for code that wants to check the contents of the
1209
output, the way error messages are presented, etc.
1211
This should be the main method for tests that want to exercise the
1212
overall behavior of the bzr application (rather than a unit test
1213
or a functional test of the library.)
1215
Much of the old code runs bzr by forking a new copy of Python, but
1216
that is slower, harder to debug, and generally not necessary.
1218
This runs bzr through the interface that catches and reports
1219
errors, and with logging set to something approximating the
1220
default, so that error reporting can be checked.
1222
:param argv: arguments to invoke bzr
1223
:param retcode: expected return code, or None for don't-care.
1224
:param encoding: encoding for sys.stdout and sys.stderr
1234
1225
:param stdin: A string to be used as stdin for the command.
1235
1226
:param working_dir: Change to this directory before running
1237
return self._run_bzr_autosplit(argv, retcode=retcode,
1238
encoding=encoding, stdin=stdin, working_dir=working_dir,
1241
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1243
"""Run bazaar command line, splitting up a string command line."""
1244
if isinstance(args, basestring):
1245
args = list(shlex.split(args))
1246
return self._run_bzr_core(args, retcode=retcode,
1247
encoding=encoding, stdin=stdin, working_dir=working_dir,
1250
def _run_bzr_core(self, args, retcode, encoding, stdin,
1252
1228
if encoding is None:
1253
1229
encoding = bzrlib.user_encoding
1254
1230
stdout = StringIOWrapper()
1256
1232
stdout.encoding = encoding
1257
1233
stderr.encoding = encoding
1259
self.log('run bzr: %r', args)
1235
self.log('run bzr: %r', argv)
1260
1236
# FIXME: don't call into logging here
1261
1237
handler = logging.StreamHandler(stderr)
1262
1238
handler.setLevel(logging.INFO)
1271
1247
os.chdir(working_dir)
1274
result = self.apply_redirected(ui.ui_factory.stdin,
1276
bzrlib.commands.run_bzr_catch_errors,
1250
saved_debug_flags = frozenset(debug.debug_flags)
1251
debug.debug_flags.clear()
1253
result = self.apply_redirected(ui.ui_factory.stdin,
1255
bzrlib.commands.run_bzr_catch_errors,
1258
debug.debug_flags.update(saved_debug_flags)
1279
1260
logger.removeHandler(handler)
1280
1261
ui.ui_factory = old_ui_factory
1289
1270
self.log('errors:\n%r', err)
1290
1271
if retcode is not None:
1291
self.assertEquals(retcode, result,
1292
message='Unexpected return code')
1272
self.assertEquals(retcode, result)
1293
1273
return out, err
1295
1275
def run_bzr(self, *args, **kwargs):
1296
1276
"""Invoke bzr, as if it were run from the command line.
1298
The argument list should not include the bzr program name - the
1299
first argument is normally the bzr command. Arguments may be
1300
passed in three ways:
1302
1- A list of strings, eg ["commit", "a"]. This is recommended
1303
when the command contains whitespace or metacharacters, or
1304
is built up at run time.
1306
2- A single string, eg "add a". This is the most convenient
1307
for hardcoded commands.
1309
3- Several varargs parameters, eg run_bzr("add", "a").
1310
This is not recommended for new code.
1312
This runs bzr through the interface that catches and reports
1313
errors, and with logging set to something approximating the
1314
default, so that error reporting can be checked.
1316
1278
This should be the main method for tests that want to exercise the
1317
1279
overall behavior of the bzr application (rather than a unit test
1318
1280
or a functional test of the library.)
1320
1282
This sends the stdout/stderr results into the test's log,
1321
1283
where it may be useful for debugging. See also run_captured.
1323
:keyword stdin: A string to be used as stdin for the command.
1324
:keyword retcode: The status code the command should return;
1326
:keyword working_dir: The directory to run the command in
1327
:keyword error_regexes: A list of expected error messages. If
1328
specified they must be seen in the error output of the command.
1285
:param stdin: A string to be used as stdin for the command.
1286
:param retcode: The status code the command should return
1287
:param working_dir: The directory to run the command in
1330
1289
retcode = kwargs.pop('retcode', 0)
1331
1290
encoding = kwargs.pop('encoding', None)
1332
1291
stdin = kwargs.pop('stdin', None)
1333
1292
working_dir = kwargs.pop('working_dir', None)
1334
error_regexes = kwargs.pop('error_regexes', [])
1337
if isinstance(args[0], (list, basestring)):
1340
symbol_versioning.warn(zero_eighteen % "passing varargs to run_bzr",
1341
DeprecationWarning, stacklevel=3)
1343
out, err = self._run_bzr_autosplit(args=args,
1345
encoding=encoding, stdin=stdin, working_dir=working_dir,
1348
for regex in error_regexes:
1349
self.assertContainsRe(err, regex)
1293
return self.run_bzr_captured(args, retcode=retcode, encoding=encoding,
1294
stdin=stdin, working_dir=working_dir)
1352
1296
def run_bzr_decode(self, *args, **kwargs):
1353
1297
if 'encoding' in kwargs:
1359
1303
def run_bzr_error(self, error_regexes, *args, **kwargs):
1360
1304
"""Run bzr, and check that stderr contains the supplied regexes
1362
:param error_regexes: Sequence of regular expressions which
1306
:param error_regexes: Sequence of regular expressions which
1363
1307
must each be found in the error output. The relative ordering
1364
1308
is not enforced.
1365
1309
:param args: command-line arguments for bzr
1366
1310
:param kwargs: Keyword arguments which are interpreted by run_bzr
1367
1311
This function changes the default value of retcode to be 3,
1368
1312
since in most cases this is run when you expect bzr to fail.
1370
:return: (out, err) The actual output of running the command (in case
1371
you want to do more inspection)
1313
:return: (out, err) The actual output of running the command (in case you
1314
want to do more inspection)
1375
1317
# Make sure that commit is failing because there is nothing to do
1376
1318
self.run_bzr_error(['no changes to commit'],
1377
1319
'commit', '-m', 'my commit comment')
1382
1324
'commit', '--strict', '-m', 'my commit comment')
1384
1326
kwargs.setdefault('retcode', 3)
1385
kwargs['error_regexes'] = error_regexes
1386
1327
out, err = self.run_bzr(*args, **kwargs)
1328
for regex in error_regexes:
1329
self.assertContainsRe(err, regex)
1387
1330
return out, err
1389
1332
def run_bzr_subprocess(self, *args, **kwargs):
1395
1338
handling, or early startup code, etc. Subprocess code can't be
1396
1339
profiled or debugged so easily.
1398
:keyword retcode: The status code that is expected. Defaults to 0. If
1341
:param retcode: The status code that is expected. Defaults to 0. If
1399
1342
None is supplied, the status code is not checked.
1400
:keyword env_changes: A dictionary which lists changes to environment
1343
:param env_changes: A dictionary which lists changes to environment
1401
1344
variables. A value of None will unset the env variable.
1402
1345
The values must be strings. The change will only occur in the
1403
1346
child, so you don't need to fix the environment after running.
1404
:keyword universal_newlines: Convert CRLF => LF
1405
:keyword allow_plugins: By default the subprocess is run with
1347
:param universal_newlines: Convert CRLF => LF
1348
:param allow_plugins: By default the subprocess is run with
1406
1349
--no-plugins to ensure test reproducibility. Also, it is possible
1407
1350
for system-wide plugins to create unexpected output on stderr,
1408
1351
which can cause unnecessary test failures.
1432
1375
profiled or debugged so easily.
1434
1377
:param process_args: a list of arguments to pass to the bzr executable,
1435
for example ``['--version']``.
1378
for example `['--version']`.
1436
1379
:param env_changes: A dictionary which lists changes to environment
1437
1380
variables. A value of None will unset the env variable.
1438
1381
The values must be strings. The change will only occur in the
1579
1522
sys.stderr = real_stderr
1580
1523
sys.stdin = real_stdin
1525
@symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
1526
def merge(self, branch_from, wt_to):
1527
"""A helper for tests to do a ui-less merge.
1529
This should move to the main library when someone has time to integrate
1532
# minimal ui-less merge.
1533
wt_to.branch.fetch(branch_from)
1534
base_rev = common_ancestor(branch_from.last_revision(),
1535
wt_to.branch.last_revision(),
1536
wt_to.branch.repository)
1537
merge_inner(wt_to.branch, branch_from.basis_tree(),
1538
wt_to.branch.repository.revision_tree(base_rev),
1540
wt_to.add_parent_tree_id(branch_from.last_revision())
1582
1542
def reduceLockdirTimeout(self):
1583
1543
"""Reduce the default lock timeout for the duration of the test, so that
1584
1544
if LockContention occurs during a test, it does so quickly.
1606
1568
file defaults for the transport in tests, nor does it obey the command line
1607
1569
override, so tests that accidentally write to the common directory should
1610
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1611
a .bzr directory that stops us ascending higher into the filesystem.
1614
1573
TEST_ROOT = None
1615
1574
_TEST_NAME = 'test'
1617
1577
def __init__(self, methodName='runTest'):
1618
1578
# allow test parameterisation after test construction and before test
1619
1579
# execution. Variables that the parameteriser sets need to be
1624
1584
self.transport_readonly_server = None
1625
1585
self.__vfs_server = None
1627
def get_transport(self, relpath=None):
1628
"""Return a writeable transport.
1630
This transport is for the test scratch space relative to
1633
:param relpath: a path relative to the base url.
1635
t = get_transport(self.get_url(relpath))
1587
def get_transport(self):
1588
"""Return a writeable transport for the test scratch space"""
1589
t = get_transport(self.get_url())
1636
1590
self.assertFalse(t.is_readonly())
1639
def get_readonly_transport(self, relpath=None):
1593
def get_readonly_transport(self):
1640
1594
"""Return a readonly transport for the test scratch space
1642
1596
This can be used to test that operations which should only need
1643
1597
readonly access in fact do not try to write.
1645
:param relpath: a path relative to the base url.
1647
t = get_transport(self.get_readonly_url(relpath))
1599
t = get_transport(self.get_readonly_url())
1648
1600
self.assertTrue(t.is_readonly())
1758
1714
def get_vfs_only_url(self, relpath=None):
1759
1715
"""Get a URL (or maybe a path for the plain old vfs transport.
1761
This will never be a smart protocol. It always has all the
1762
capabilities of the local filesystem, but it might actually be a
1763
MemoryTransport or some other similar virtual filesystem.
1765
This is the backing transport (if any) of the server returned by
1766
get_url and get_readonly_url.
1717
This will never be a smart protocol.
1768
1718
:param relpath: provides for clients to get a path relative to the base
1769
1719
url. These should only be downwards relative, not upwards.
1772
1721
base = self.get_vfs_only_server().get_url()
1773
1722
return self._adjust_url(base, relpath)
1775
1724
def _make_test_root(self):
1776
1725
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1778
root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
1779
TestCaseWithMemoryTransport.TEST_ROOT = root
1729
root = u'test%04d.tmp' % i
1733
if e.errno == errno.EEXIST:
1738
# successfully created
1739
TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
1781
1741
# make a fake bzr directory there to prevent any tests propagating
1782
1742
# up onto the source directory's real branch
1783
bzrdir.BzrDir.create_standalone_workingtree(root)
1785
# The same directory is used by all tests, and we're not specifically
1786
# told when all tests are finished. This will do.
1787
atexit.register(_rmtree_temp_dir, root)
1743
bzrdir.BzrDir.create_standalone_workingtree(
1744
TestCaseWithMemoryTransport.TEST_ROOT)
1789
1746
def makeAndChdirToTestDir(self):
1790
1747
"""Create a temporary directories for this one test.
1820
1780
raise TestSkipped("Format %s is not initializable." % format)
1822
1782
def make_repository(self, relpath, shared=False, format=None):
1823
"""Create a repository on our default transport at relpath.
1825
Note that relpath must be a relative path, not a full url.
1827
# FIXME: If you create a remoterepository this returns the underlying
1828
# real format, which is incorrect. Actually we should make sure that
1829
# RemoteBzrDir returns a RemoteRepository.
1830
# maybe mbp 20070410
1783
"""Create a repository on our default transport at relpath."""
1831
1784
made_control = self.make_bzrdir(relpath, format=format)
1832
1785
return made_control.create_repository(shared=shared)
1864
1817
All test cases create their own directory within that. If the
1865
1818
tests complete successfully, the directory is removed.
1867
:ivar test_base_dir: The path of the top-level directory for this
1868
test, which contains a home directory and a work directory.
1870
:ivar test_home_dir: An initially empty directory under test_base_dir
1871
which is used as $HOME for this test.
1873
:ivar test_dir: A directory under test_base_dir used as the current
1874
directory when the test proper is run.
1820
InTempDir is an old alias for FunctionalTestCase.
1877
1823
OVERRIDE_PYTHON = 'python'
1824
use_numbered_dirs = False
1879
1826
def check_file_contents(self, filename, expect):
1880
1827
self.log("check contents of file %s" % filename)
1890
1837
For TestCaseInTempDir we create a temporary directory based on the test
1891
1838
name and then create two subdirs - test and home under it.
1893
# create a directory within the top level test directory
1894
candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
1895
# now create test and home directories within this dir
1896
self.test_base_dir = candidate_dir
1897
self.test_home_dir = self.test_base_dir + '/home'
1898
os.mkdir(self.test_home_dir)
1899
self.test_dir = self.test_base_dir + '/work'
1900
os.mkdir(self.test_dir)
1901
os.chdir(self.test_dir)
1902
# put name of test inside
1903
f = file(self.test_base_dir + '/name', 'w')
1840
if self.use_numbered_dirs: # strongly recommended on Windows
1841
# due the path length limitation (260 ch.)
1842
candidate_dir = '%s/%dK/%05d' % (self.TEST_ROOT,
1843
int(self.number/1000),
1845
os.makedirs(candidate_dir)
1846
self.test_home_dir = candidate_dir + '/home'
1847
os.mkdir(self.test_home_dir)
1848
self.test_dir = candidate_dir + '/work'
1849
os.mkdir(self.test_dir)
1850
os.chdir(self.test_dir)
1851
# put name of test inside
1852
f = file(candidate_dir + '/name', 'w')
1905
1853
f.write(self.id())
1908
self.addCleanup(self.deleteTestDir)
1910
def deleteTestDir(self):
1911
os.chdir(self.TEST_ROOT)
1912
_rmtree_temp_dir(self.test_base_dir)
1857
# shorten the name, to avoid test failures due to path length
1858
short_id = self.id().replace('bzrlib.tests.', '') \
1859
.replace('__main__.', '')[-100:]
1860
# it's possible the same test class is run several times for
1861
# parameterized tests, so make sure the names don't collide.
1865
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
1867
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
1868
if os.path.exists(candidate_dir):
1872
os.mkdir(candidate_dir)
1873
self.test_home_dir = candidate_dir + '/home'
1874
os.mkdir(self.test_home_dir)
1875
self.test_dir = candidate_dir + '/work'
1876
os.mkdir(self.test_dir)
1877
os.chdir(self.test_dir)
1914
1880
def build_tree(self, shape, line_endings='binary', transport=None):
1915
1881
"""Build a test tree according to a pattern.
1920
1886
This assumes that all the elements in the tree being built are new.
1922
1888
This doesn't add anything to a branch.
1924
1889
:param line_endings: Either 'binary' or 'native'
1925
in binary mode, exact contents are written in native mode, the
1926
line endings match the default platform endings.
1927
:param transport: A transport to write to, for building trees on VFS's.
1928
If the transport is readonly or None, "." is opened automatically.
1890
in binary mode, exact contents are written
1891
in native mode, the line endings match the
1892
default platform endings.
1894
:param transport: A transport to write to, for building trees on
1895
VFS's. If the transport is readonly or None,
1896
"." is opened automatically.
1931
1898
# It's OK to just create them using forward slashes on windows.
1932
1899
if transport is None or transport.is_readonly():
1960
1928
self.assertEqualDiff(content, s)
1962
1930
def failUnlessExists(self, path):
1963
"""Fail unless path or paths, which may be abs or relative, exist."""
1964
if not isinstance(path, basestring):
1966
self.failUnlessExists(p)
1968
self.failUnless(osutils.lexists(path),path+" does not exist")
1931
"""Fail unless path, which may be abs or relative, exists."""
1932
self.failUnless(osutils.lexists(path),path+" does not exist")
1970
1934
def failIfExists(self, path):
1971
"""Fail if path or paths, which may be abs or relative, exist."""
1972
if not isinstance(path, basestring):
1974
self.failIfExists(p)
1976
self.failIf(osutils.lexists(path),path+" exists")
1978
def assertInWorkingTree(self,path,root_path='.',tree=None):
1979
"""Assert whether path or paths are in the WorkingTree"""
1981
tree = workingtree.WorkingTree.open(root_path)
1982
if not isinstance(path, basestring):
1984
self.assertInWorkingTree(p,tree=tree)
1986
self.assertIsNot(tree.path2id(path), None,
1987
path+' not in working tree.')
1989
def assertNotInWorkingTree(self,path,root_path='.',tree=None):
1990
"""Assert whether path or paths are not in the WorkingTree"""
1992
tree = workingtree.WorkingTree.open(root_path)
1993
if not isinstance(path, basestring):
1995
self.assertNotInWorkingTree(p,tree=tree)
1997
self.assertIs(tree.path2id(path), None, path+' in working tree.')
1935
"""Fail if path, which may be abs or relative, exists."""
1936
self.failIf(osutils.lexists(path),path+" exists")
2000
1939
class TestCaseWithTransport(TestCaseInTempDir):
2102
2041
self.transport_readonly_server = HttpServer
2105
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2106
random_order=False):
2107
"""Create a test suite by filtering another one.
2109
:param suite: the source suite
2110
:param pattern: pattern that names must match
2111
:param exclude_pattern: pattern that names must not match, if any
2112
:param random_order: if True, tests in the new suite will be put in
2114
:returns: the newly created suite
2116
return sort_suite_by_re(suite, pattern, exclude_pattern,
2117
random_order, False)
2120
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2121
random_order=False, append_rest=True):
2122
"""Create a test suite by sorting another one.
2124
:param suite: the source suite
2125
:param pattern: pattern that names must match in order to go
2126
first in the new suite
2127
:param exclude_pattern: pattern that names must not match, if any
2128
:param random_order: if True, tests in the new suite will be put in
2130
:param append_rest: if False, pattern is a strict filter and not
2131
just an ordering directive
2132
:returns: the newly created suite
2044
def filter_suite_by_re(suite, pattern):
2045
result = TestUtil.TestSuite()
2046
filter_re = re.compile(pattern)
2047
for test in iter_suite_tests(suite):
2048
if filter_re.search(test.id()):
2049
result.addTest(test)
2053
def sort_suite_by_re(suite, pattern):
2136
2056
filter_re = re.compile(pattern)
2137
if exclude_pattern is not None:
2138
exclude_re = re.compile(exclude_pattern)
2139
2057
for test in iter_suite_tests(suite):
2141
if exclude_pattern is None or not exclude_re.search(test_id):
2142
if filter_re.search(test_id):
2147
random.shuffle(first)
2148
random.shuffle(second)
2058
if filter_re.search(test.id()):
2149
2062
return TestUtil.TestSuite(first + second)
2152
2065
def run_suite(suite, name='test', verbose=False, pattern=".*",
2153
stop_on_failure=False,
2066
stop_on_failure=False, keep_output=False,
2154
2067
transport=None, lsprof_timed=None, bench_history=None,
2155
2068
matching_tests_first=None,
2158
exclude_pattern=None,
2069
numbered_dirs=None):
2070
use_numbered_dirs = bool(numbered_dirs)
2160
2072
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2073
if numbered_dirs is not None:
2074
TestCaseInTempDir.use_numbered_dirs = use_numbered_dirs
2165
2079
runner = TextTestRunner(stream=sys.stdout,
2166
2080
descriptions=0,
2167
2081
verbosity=verbosity,
2082
keep_output=keep_output,
2168
2083
bench_history=bench_history,
2169
list_only=list_only,
2084
use_numbered_dirs=use_numbered_dirs,
2171
2086
runner.stop_on_failure=stop_on_failure
2172
# Initialise the random number generator and display the seed used.
2173
# We convert the seed to a long to make it reuseable across invocations.
2174
random_order = False
2175
if random_seed is not None:
2177
if random_seed == "now":
2178
random_seed = long(time.time())
2180
# Convert the seed to a long if we can
2182
random_seed = long(random_seed)
2185
runner.stream.writeln("Randomizing test order using seed %s\n" %
2187
random.seed(random_seed)
2188
# Customise the list of tests if requested
2189
if pattern != '.*' or exclude_pattern is not None or random_order:
2190
2088
if matching_tests_first:
2191
suite = sort_suite_by_re(suite, pattern, exclude_pattern,
2089
suite = sort_suite_by_re(suite, pattern)
2194
suite = filter_suite_by_re(suite, pattern, exclude_pattern,
2091
suite = filter_suite_by_re(suite, pattern)
2196
2092
result = runner.run(suite)
2197
2093
return result.wasSuccessful()
2200
2096
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
2201
2098
transport=None,
2202
2099
test_suite_factory=None,
2203
2100
lsprof_timed=None,
2204
2101
bench_history=None,
2205
2102
matching_tests_first=None,
2208
exclude_pattern=None):
2103
numbered_dirs=None):
2209
2104
"""Run the whole test suite under the enhanced runner"""
2210
2105
# XXX: Very ugly way to do this...
2211
2106
# Disable warning about old formats because we don't want it to disturb
2225
2120
suite = test_suite_factory()
2226
2121
return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
2227
stop_on_failure=stop_on_failure,
2122
stop_on_failure=stop_on_failure, keep_output=keep_output,
2228
2123
transport=transport,
2229
2124
lsprof_timed=lsprof_timed,
2230
2125
bench_history=bench_history,
2231
2126
matching_tests_first=matching_tests_first,
2232
list_only=list_only,
2233
random_seed=random_seed,
2234
exclude_pattern=exclude_pattern)
2127
numbered_dirs=numbered_dirs)
2236
2129
default_transport = old_transport
2243
2136
suite on a global basis, but it is not encouraged.
2245
2138
testmod_names = [
2246
'bzrlib.util.tests.test_bencode',
2247
2139
'bzrlib.tests.test_ancestry',
2248
2140
'bzrlib.tests.test_annotate',
2249
2141
'bzrlib.tests.test_api',
2250
2142
'bzrlib.tests.test_atomicfile',
2251
2143
'bzrlib.tests.test_bad_files',
2252
2144
'bzrlib.tests.test_branch',
2253
'bzrlib.tests.test_branchbuilder',
2254
'bzrlib.tests.test_bugtracker',
2255
2145
'bzrlib.tests.test_bundle',
2256
2146
'bzrlib.tests.test_bzrdir',
2257
2147
'bzrlib.tests.test_cache_utf8',
2260
2150
'bzrlib.tests.test_commit_merge',
2261
2151
'bzrlib.tests.test_config',
2262
2152
'bzrlib.tests.test_conflicts',
2263
'bzrlib.tests.test_pack',
2264
'bzrlib.tests.test_counted_lock',
2265
2153
'bzrlib.tests.test_decorators',
2266
2154
'bzrlib.tests.test_delta',
2267
'bzrlib.tests.test_deprecated_graph',
2268
2155
'bzrlib.tests.test_diff',
2269
2156
'bzrlib.tests.test_dirstate',
2157
'bzrlib.tests.test_doc_generate',
2270
2158
'bzrlib.tests.test_errors',
2271
2159
'bzrlib.tests.test_escaped_store',
2272
2160
'bzrlib.tests.test_extract',
2278
2166
'bzrlib.tests.test_gpg',
2279
2167
'bzrlib.tests.test_graph',
2280
2168
'bzrlib.tests.test_hashcache',
2281
'bzrlib.tests.test_help',
2282
'bzrlib.tests.test_hooks',
2283
2169
'bzrlib.tests.test_http',
2284
2170
'bzrlib.tests.test_http_response',
2285
2171
'bzrlib.tests.test_https_ca_bundle',
2286
2172
'bzrlib.tests.test_identitymap',
2287
2173
'bzrlib.tests.test_ignores',
2288
'bzrlib.tests.test_info',
2289
2174
'bzrlib.tests.test_inv',
2290
2175
'bzrlib.tests.test_knit',
2291
2176
'bzrlib.tests.test_lazy_import',
2293
2178
'bzrlib.tests.test_lockdir',
2294
2179
'bzrlib.tests.test_lockable_files',
2295
2180
'bzrlib.tests.test_log',
2296
'bzrlib.tests.test_lsprof',
2297
2181
'bzrlib.tests.test_memorytree',
2298
2182
'bzrlib.tests.test_merge',
2299
2183
'bzrlib.tests.test_merge3',
2312
2196
'bzrlib.tests.test_progress',
2313
2197
'bzrlib.tests.test_reconcile',
2314
2198
'bzrlib.tests.test_registry',
2315
'bzrlib.tests.test_remote',
2316
2199
'bzrlib.tests.test_repository',
2317
2200
'bzrlib.tests.test_revert',
2318
2201
'bzrlib.tests.test_revision',
2323
2206
'bzrlib.tests.test_selftest',
2324
2207
'bzrlib.tests.test_setup',
2325
2208
'bzrlib.tests.test_sftp_transport',
2326
'bzrlib.tests.test_smart',
2327
2209
'bzrlib.tests.test_smart_add',
2328
2210
'bzrlib.tests.test_smart_transport',
2329
'bzrlib.tests.test_smtp_connection',
2330
2211
'bzrlib.tests.test_source',
2331
2212
'bzrlib.tests.test_ssh_transport',
2332
2213
'bzrlib.tests.test_status',
2367
2248
suite = TestUtil.TestSuite()
2368
2249
loader = TestUtil.TestLoader()
2369
2250
suite.addTest(loader.loadTestsFromModuleNames(testmod_names))
2370
from bzrlib.tests.test_transport_implementations import TransportTestProviderAdapter
2251
from bzrlib.transport import TransportTestProviderAdapter
2371
2252
adapter = TransportTestProviderAdapter()
2372
2253
adapt_modules(test_transport_implementations, adapter, loader, suite)
2373
2254
for package in packages_to_test():
2408
2289
def _rmtree_temp_dir(dirname):
2409
# If LANG=C we probably have created some bogus paths
2410
# which rmtree(unicode) will fail to delete
2411
# so make sure we are using rmtree(str) to delete everything
2412
# except on win32, where rmtree(str) will fail
2413
# since it doesn't have the property of byte-stream paths
2414
# (they are either ascii or mbcs)
2415
if sys.platform == 'win32':
2416
# make sure we are using the unicode win32 api
2417
dirname = unicode(dirname)
2419
dirname = dirname.encode(sys.getfilesystemencoding())
2421
2291
osutils.rmtree(dirname)
2422
2292
except OSError, e:
2301
def clean_selftest_output(root=None, quiet=False):
2302
"""Remove all selftest output directories from root directory.
2304
:param root: root directory for clean
2305
(if ommitted or None then clean current directory).
2306
:param quiet: suppress report about deleting directories
2311
re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
2314
for i in os.listdir(root):
2315
if os.path.isdir(i) and re_dir.match(i):
2317
print 'delete directory:', i
2431
2321
class Feature(object):
2432
2322
"""An operating system Feature."""
2454
2344
if getattr(self, 'feature_name', None):
2455
2345
return self.feature_name()
2456
2346
return self.__class__.__name__
2459
class TestScenarioApplier(object):
2460
"""A tool to apply scenarios to tests."""
2462
def adapt(self, test):
2463
"""Return a TestSuite containing a copy of test for each scenario."""
2464
result = unittest.TestSuite()
2465
for scenario in self.scenarios:
2466
result.addTest(self.adapt_test_to_scenario(test, scenario))
2469
def adapt_test_to_scenario(self, test, scenario):
2470
"""Copy test and apply scenario to it.
2472
:param test: A test to adapt.
2473
:param scenario: A tuple describing the scenarion.
2474
The first element of the tuple is the new test id.
2475
The second element is a dict containing attributes to set on the
2477
:return: The adapted test.
2479
from copy import deepcopy
2480
new_test = deepcopy(test)
2481
for name, value in scenario[1].items():
2482
setattr(new_test, name, value)
2483
new_id = "%s(%s)" % (new_test.id(), scenario[0])
2484
new_test.id = lambda: new_id