354
350
{'bzrdir_format': formats[1]._matchingbzrdir,
355
351
'transport_readonly_server': 'b',
356
352
'transport_server': 'a',
357
'workingtree_format': formats[1]})],
353
'workingtree_format': formats[1]}),
354
('WorkingTreeFormat6',
355
{'bzrdir_format': formats[2]._matchingbzrdir,
356
'transport_readonly_server': 'b',
357
'transport_server': 'a',
358
'workingtree_format': formats[2]}),
359
('WorkingTreeFormat6,remote',
360
{'bzrdir_format': formats[2]._matchingbzrdir,
361
'repo_is_remote': True,
362
'transport_readonly_server': 'd',
363
'transport_server': 'c',
364
'vfs_transport_factory': 'e',
365
'workingtree_format': formats[2]}),
361
369
class TestTreeScenarios(tests.TestCase):
363
371
def test_scenarios(self):
364
372
# the tree implementation scenario generator is meant to setup one
365
# instance for each working tree format, and one additional instance
373
# instance for each working tree format, one additional instance
366
374
# that will use the default wt format, but create a revision tree for
367
# the tests. this means that the wt ones should have the
368
# workingtree_to_test_tree attribute set to 'return_parameter' and the
369
# revision one set to revision_tree_from_workingtree.
375
# the tests, and one more that uses the default wt format as a
376
# lightweight checkout of a remote repository. This means that the wt
377
# ones should have the workingtree_to_test_tree attribute set to
378
# 'return_parameter' and the revision one set to
379
# revision_tree_from_workingtree.
371
381
from bzrlib.tests.per_tree import (
372
382
_dirstate_tree_from_workingtree,
867
test = self.get_passing_test()
868
result.startTest(test)
869
prefix = len(result_stream.getvalue())
870
# the err parameter has the shape:
871
# (class, exception object, traceback)
872
# KnownFailures dont get their tracebacks shown though, so we
874
err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
875
result.report_known_failure(test, err)
876
output = result_stream.getvalue()[prefix:]
877
lines = output.splitlines()
878
self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
879
if sys.version_info > (2, 7):
880
self.expectFailure("_ExpectedFailure on 2.7 loses the message",
881
self.assertNotEqual, lines[1], ' ')
882
self.assertEqual(lines[1], ' foo')
883
self.assertEqual(2, len(lines))
890
_get_test("test_xfail").run(result)
891
self.assertContainsRe(result_stream.getvalue(),
892
"\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
893
"\\s*(?:Text attachment: )?reason"
885
898
def get_passing_test(self):
886
899
"""Return a test object that can't be run usefully."""
1504
1507
transport_server = memory.MemoryServer()
1505
1508
transport_server.start_server()
1506
1509
self.addCleanup(transport_server.stop_server)
1507
t = transport.get_transport(transport_server.get_url())
1508
bzrdir.BzrDir.create(t.base)
1510
t = transport.get_transport_from_url(transport_server.get_url())
1511
controldir.ControlDir.create(t.base)
1509
1512
self.assertRaises(errors.BzrError,
1510
bzrdir.BzrDir.open_from_transport, t)
1513
controldir.ControlDir.open_from_transport, t)
1511
1514
# But if we declare this as safe, we can open the bzrdir.
1512
1515
self.permit_url(t.base)
1513
1516
self._bzr_selftest_roots.append(t.base)
1514
bzrdir.BzrDir.open_from_transport(t)
1517
controldir.ControlDir.open_from_transport(t)
1516
1519
def test_requireFeature_available(self):
1517
1520
"""self.requireFeature(available) is a no-op."""
1518
class Available(tests.Feature):
1521
class Available(features.Feature):
1519
1522
def _probe(self):return True
1520
1523
feature = Available()
1521
1524
self.requireFeature(feature)
1523
1526
def test_requireFeature_unavailable(self):
1524
1527
"""self.requireFeature(unavailable) raises UnavailableFeature."""
1525
class Unavailable(tests.Feature):
1528
class Unavailable(features.Feature):
1526
1529
def _probe(self):return False
1527
1530
feature = Unavailable()
1528
1531
self.assertRaises(tests.UnavailableFeature,
1676
1684
class Test(tests.TestCase):
1678
1686
def setUp(self):
1679
tests.TestCase.setUp(self)
1687
super(Test, self).setUp()
1680
1688
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1682
1690
def test_value(self):
1683
1691
self.assertEqual('original', self.orig)
1684
1692
self.assertEqual('modified', obj.test_attr)
1686
test = Test('test_value')
1687
test.run(unittest.TestResult())
1694
self._run_successful_test(Test('test_value'))
1688
1695
self.assertEqual('original', obj.test_attr)
1697
def test_overrideAttr_with_no_existing_value_and_value(self):
1698
# Do not define the test_attribute
1699
obj = self # Make 'obj' visible to the embedded test
1700
class Test(tests.TestCase):
1703
tests.TestCase.setUp(self)
1704
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1706
def test_value(self):
1707
self.assertEqual(tests._unitialized_attr, self.orig)
1708
self.assertEqual('modified', obj.test_attr)
1710
self._run_successful_test(Test('test_value'))
1711
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1713
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1714
# Do not define the test_attribute
1715
obj = self # Make 'obj' visible to the embedded test
1716
class Test(tests.TestCase):
1719
tests.TestCase.setUp(self)
1720
self.orig = self.overrideAttr(obj, 'test_attr')
1722
def test_value(self):
1723
self.assertEqual(tests._unitialized_attr, self.orig)
1724
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1726
self._run_successful_test(Test('test_value'))
1727
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1690
1729
def test_recordCalls(self):
1691
1730
from bzrlib.tests import test_selftest
1692
1731
calls = self.recordCalls(
1693
1732
test_selftest, '_add_numbers')
1694
1733
self.assertEqual(test_selftest._add_numbers(2, 10),
1696
self.assertEquals(calls, [((2, 10), {})])
1735
self.assertEqual(calls, [((2, 10), {})])
1699
1738
def _add_numbers(a, b):
1703
class _MissingFeature(tests.Feature):
1742
class _MissingFeature(features.Feature):
1704
1743
def _probe(self):
1706
1745
missing_feature = _MissingFeature()
2626
2665
self.assertEqual('bzr: interrupted\n', result[1])
2629
class TestFeature(tests.TestCase):
2631
def test_caching(self):
2632
"""Feature._probe is called by the feature at most once."""
2633
class InstrumentedFeature(tests.Feature):
2635
super(InstrumentedFeature, self).__init__()
2638
self.calls.append('_probe')
2640
feature = InstrumentedFeature()
2642
self.assertEqual(['_probe'], feature.calls)
2644
self.assertEqual(['_probe'], feature.calls)
2646
def test_named_str(self):
2647
"""Feature.__str__ should thunk to feature_name()."""
2648
class NamedFeature(tests.Feature):
2649
def feature_name(self):
2651
feature = NamedFeature()
2652
self.assertEqual('symlinks', str(feature))
2654
def test_default_str(self):
2655
"""Feature.__str__ should default to __class__.__name__."""
2656
class NamedFeature(tests.Feature):
2658
feature = NamedFeature()
2659
self.assertEqual('NamedFeature', str(feature))
2662
class TestUnavailableFeature(tests.TestCase):
2664
def test_access_feature(self):
2665
feature = tests.Feature()
2666
exception = tests.UnavailableFeature(feature)
2667
self.assertIs(feature, exception.args[0])
2670
simple_thunk_feature = tests._CompatabilityThunkFeature(
2671
deprecated_in((2, 1, 0)),
2672
'bzrlib.tests.test_selftest',
2673
'simple_thunk_feature','UnicodeFilename',
2674
replacement_module='bzrlib.tests'
2677
class Test_CompatibilityFeature(tests.TestCase):
2679
def test_does_thunk(self):
2680
res = self.callDeprecated(
2681
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2682
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2683
simple_thunk_feature.available)
2684
self.assertEqual(tests.UnicodeFilename.available(), res)
2687
class TestModuleAvailableFeature(tests.TestCase):
2689
def test_available_module(self):
2690
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2691
self.assertEqual('bzrlib.tests', feature.module_name)
2692
self.assertEqual('bzrlib.tests', str(feature))
2693
self.assertTrue(feature.available())
2694
self.assertIs(tests, feature.module)
2696
def test_unavailable_module(self):
2697
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2698
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2699
self.assertFalse(feature.available())
2700
self.assertIs(None, feature.module)
2703
2668
class TestSelftestFiltering(tests.TestCase):
2705
2670
def setUp(self):
2706
tests.TestCase.setUp(self)
2671
super(TestSelftestFiltering, self).setUp()
2707
2672
self.suite = TestUtil.TestSuite()
2708
2673
self.loader = TestUtil.TestLoader()
2709
2674
self.suite.addTest(self.loader.loadTestsFromModule(
3169
3134
loader = self._create_loader('bzrlib.tests.test_samp')
3171
3136
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3172
self.assertEquals(test_list, _test_ids(suite))
3137
self.assertEqual(test_list, _test_ids(suite))
3174
3139
def test_load_tests_inside_module(self):
3175
3140
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
3176
3141
loader = self._create_loader('bzrlib.tests.test_sampler.Demo')
3178
3143
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3179
self.assertEquals(test_list, _test_ids(suite))
3144
self.assertEqual(test_list, _test_ids(suite))
3181
3146
def test_exclude_tests(self):
3182
3147
test_list = ['bogus']
3183
3148
loader = self._create_loader('bogus')
3185
3150
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3186
self.assertEquals([], _test_ids(suite))
3151
self.assertEqual([], _test_ids(suite))
3189
3154
class TestTestPrefixRegistry(tests.TestCase):
3223
3188
def test_predefined_prefixes(self):
3224
3189
tpr = tests.test_prefix_alias_registry
3225
self.assertEquals('bzrlib', tpr.resolve_alias('bzrlib'))
3226
self.assertEquals('bzrlib.doc', tpr.resolve_alias('bd'))
3227
self.assertEquals('bzrlib.utils', tpr.resolve_alias('bu'))
3228
self.assertEquals('bzrlib.tests', tpr.resolve_alias('bt'))
3229
self.assertEquals('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
3230
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3190
self.assertEqual('bzrlib', tpr.resolve_alias('bzrlib'))
3191
self.assertEqual('bzrlib.doc', tpr.resolve_alias('bd'))
3192
self.assertEqual('bzrlib.utils', tpr.resolve_alias('bu'))
3193
self.assertEqual('bzrlib.tests', tpr.resolve_alias('bt'))
3194
self.assertEqual('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
3195
self.assertEqual('bzrlib.plugins', tpr.resolve_alias('bp'))
3233
3198
class TestThreadLeakDetection(tests.TestCase):
3404
3369
self.assertLength(1, calls)
3372
class _Selftest(object):
3373
"""Mixin for tests needing full selftest output"""
3375
def _inject_stream_into_subunit(self, stream):
3376
"""To be overridden by subclasses that run tests out of process"""
3378
def _run_selftest(self, **kwargs):
3380
self._inject_stream_into_subunit(sio)
3381
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3382
return sio.getvalue()
3385
class _ForkedSelftest(_Selftest):
3386
"""Mixin for tests needing full selftest output with forked children"""
3388
_test_needs_features = [features.subunit]
3390
def _inject_stream_into_subunit(self, stream):
3391
"""Monkey-patch subunit so the extra output goes to stream not stdout
3393
Some APIs need rewriting so this kind of bogus hackery can be replaced
3394
by passing the stream param from run_tests down into ProtocolTestCase.
3396
from subunit import ProtocolTestCase
3397
_original_init = ProtocolTestCase.__init__
3398
def _init_with_passthrough(self, *args, **kwargs):
3399
_original_init(self, *args, **kwargs)
3400
self._passthrough = stream
3401
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3403
def _run_selftest(self, **kwargs):
3404
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3405
if getattr(os, "fork", None) is None:
3406
raise tests.TestNotApplicable("Platform doesn't support forking")
3407
# Make sure the fork code is actually invoked by claiming two cores
3408
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3409
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3410
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3413
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3414
"""Check operation of --parallel=fork selftest option"""
3416
def test_error_in_child_during_fork(self):
3417
"""Error in a forked child during test setup should get reported"""
3418
class Test(tests.TestCase):
3419
def testMethod(self):
3421
# We don't care what, just break something that a child will run
3422
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3423
out = self._run_selftest(test_suite_factory=Test)
3424
# Lines from the tracebacks of the two child processes may be mixed
3425
# together due to the way subunit parses and forwards the streams,
3426
# so permit extra lines between each part of the error output.
3427
self.assertContainsRe(out,
3430
".+ in fork_for_tests\n"
3432
"\s*workaround_zealous_crypto_random\(\)\n"
3437
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3438
"""Check a test case still alive after being run emits a warning"""
3440
class Test(tests.TestCase):
3441
def test_pass(self):
3443
def test_self_ref(self):
3444
self.also_self = self.test_self_ref
3445
def test_skip(self):
3446
self.skip("Don't need")
3448
def _get_suite(self):
3449
return TestUtil.TestSuite([
3450
self.Test("test_pass"),
3451
self.Test("test_self_ref"),
3452
self.Test("test_skip"),
3455
def _run_selftest_with_suite(self, **kwargs):
3456
old_flags = tests.selftest_debug_flags
3457
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3458
gc_on = gc.isenabled()
3462
output = self._run_selftest(test_suite_factory=self._get_suite,
3467
tests.selftest_debug_flags = old_flags
3468
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3469
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3472
def test_testsuite(self):
3473
self._run_selftest_with_suite()
3475
def test_pattern(self):
3476
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3477
self.assertNotContainsRe(out, "test_skip")
3479
def test_exclude_pattern(self):
3480
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3481
self.assertNotContainsRe(out, "test_skip")
3483
def test_random_seed(self):
3484
self._run_selftest_with_suite(random_seed="now")
3486
def test_matching_tests_first(self):
3487
self._run_selftest_with_suite(matching_tests_first=True,
3488
pattern="test_self_ref$")
3490
def test_starting_with_and_exclude(self):
3491
out = self._run_selftest_with_suite(starting_with=["bt."],
3492
exclude_pattern="test_skip$")
3493
self.assertNotContainsRe(out, "test_skip")
3495
def test_additonal_decorator(self):
3496
out = self._run_selftest_with_suite(
3497
suite_decorators=[tests.TestDecorator])
3500
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3501
"""Check warnings from tests staying alive are emitted with subunit"""
3503
_test_needs_features = [features.subunit]
3505
def _run_selftest_with_suite(self, **kwargs):
3506
return TestUncollectedWarnings._run_selftest_with_suite(self,
3507
runner_class=tests.SubUnitBzrRunner, **kwargs)
3510
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3511
"""Check warnings from tests staying alive are emitted when forking"""
3407
3514
class TestEnvironHandling(tests.TestCase):
3409
3516
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3414
3521
def test_me(self):
3415
3522
# The first call save the 42 value
3416
3523
self.overrideEnv('MYVAR', None)
3417
self.assertEquals(None, os.environ.get('MYVAR'))
3524
self.assertEqual(None, os.environ.get('MYVAR'))
3418
3525
# Make sure we can call it twice
3419
3526
self.overrideEnv('MYVAR', None)
3420
self.assertEquals(None, os.environ.get('MYVAR'))
3527
self.assertEqual(None, os.environ.get('MYVAR'))
3421
3528
output = StringIO()
3422
3529
result = tests.TextTestResult(output, 0, 1)
3423
3530
Test('test_me').run(result)
3424
3531
if not result.wasStrictlySuccessful():
3425
3532
self.fail(output.getvalue())
3426
3533
# We get our value back
3427
self.assertEquals('42', os.environ.get('MYVAR'))
3534
self.assertEqual('42', os.environ.get('MYVAR'))
3430
3537
class TestIsolatedEnv(tests.TestCase):
3446
3553
# Make sure we know the definition of BZR_HOME: not part of os.environ
3447
3554
# for tests.TestCase.
3448
3555
self.assertTrue('BZR_HOME' in tests.isolated_environ)
3449
self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
3556
self.assertEqual(None, tests.isolated_environ['BZR_HOME'])
3450
3557
# Being part of isolated_environ, BZR_HOME should not appear here
3451
3558
self.assertFalse('BZR_HOME' in os.environ)
3452
3559
# Make sure we know the definition of LINES: part of os.environ for
3453
3560
# tests.TestCase
3454
3561
self.assertTrue('LINES' in tests.isolated_environ)
3455
self.assertEquals('25', tests.isolated_environ['LINES'])
3456
self.assertEquals('25', os.environ['LINES'])
3562
self.assertEqual('25', tests.isolated_environ['LINES'])
3563
self.assertEqual('25', os.environ['LINES'])
3458
3565
def test_injecting_unknown_variable(self):
3459
3566
# BZR_HOME is known to be absent from os.environ
3460
3567
test = self.ScratchMonkey('test_me')
3461
3568
tests.override_os_environ(test, {'BZR_HOME': 'foo'})
3462
self.assertEquals('foo', os.environ['BZR_HOME'])
3569
self.assertEqual('foo', os.environ['BZR_HOME'])
3463
3570
tests.restore_os_environ(test)
3464
3571
self.assertFalse('BZR_HOME' in os.environ)