238
209
'transport_readonly_server': 'b',
239
210
'transport_server': 'a',
240
211
'vfs_transport_factory': 'v'})],
244
class TestRepositoryParameterisation(TestCase):
245
"""A group of tests that test the repository implementation test adapter."""
247
def test_setting_vfs_transport(self):
248
"""The vfs_transport_factory can be set optionally."""
249
from bzrlib.tests.repository_implementations import formats_to_scenarios
250
scenarios = formats_to_scenarios(
251
[("(one)", "a", "b"), ("(two)", "c", "d")],
254
vfs_transport_factory="vfs")
257
{'bzrdir_format': 'b',
258
'repository_format': 'a',
259
'transport_readonly_server': None,
260
'transport_server': None,
261
'vfs_transport_factory': 'vfs'}),
263
{'bzrdir_format': 'd',
264
'repository_format': 'c',
265
'transport_readonly_server': None,
266
'transport_server': None,
267
'vfs_transport_factory': 'vfs'})],
215
class TestRepositoryScenarios(tests.TestCase):
270
217
def test_formats_to_scenarios(self):
271
"""The adapter can generate all the scenarios needed."""
272
from bzrlib.tests.repository_implementations import formats_to_scenarios
273
formats = [("(c)", "c", "C"), ("(d)", 1, "D")]
218
from bzrlib.tests.per_repository import formats_to_scenarios
219
formats = [("(c)", remote.RemoteRepositoryFormat()),
220
("(d)", repository.format_registry.get(
221
'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
274
222
no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
276
224
vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
277
225
vfs_transport_factory="vfs")
278
# no_vfs generate scenarios without vfs_transport_factor
281
{'bzrdir_format': 'C',
282
'repository_format': 'c',
226
# no_vfs generate scenarios without vfs_transport_factory
228
('RemoteRepositoryFormat(c)',
229
{'bzrdir_format': remote.RemoteBzrDirFormat(),
230
'repository_format': remote.RemoteRepositoryFormat(),
283
231
'transport_readonly_server': 'readonly',
284
232
'transport_server': 'server'}),
286
{'bzrdir_format': 'D',
287
'repository_format': 1,
233
('RepositoryFormat2a(d)',
234
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
235
'repository_format': groupcompress_repo.RepositoryFormat2a(),
288
236
'transport_readonly_server': 'readonly',
289
'transport_server': 'server'})],
237
'transport_server': 'server'})]
238
self.assertEqual(expected, no_vfs_scenarios)
291
239
self.assertEqual([
293
{'bzrdir_format': 'C',
294
'repository_format': 'c',
240
('RemoteRepositoryFormat(c)',
241
{'bzrdir_format': remote.RemoteBzrDirFormat(),
242
'repository_format': remote.RemoteRepositoryFormat(),
295
243
'transport_readonly_server': 'readonly',
296
244
'transport_server': 'server',
297
245
'vfs_transport_factory': 'vfs'}),
299
{'bzrdir_format': 'D',
300
'repository_format': 1,
246
('RepositoryFormat2a(d)',
247
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
248
'repository_format': groupcompress_repo.RepositoryFormat2a(),
301
249
'transport_readonly_server': 'readonly',
302
250
'transport_server': 'server',
303
251
'vfs_transport_factory': 'vfs'})],
307
class TestTestScenarioApplier(TestCase):
255
class TestTestScenarioApplication(tests.TestCase):
308
256
"""Tests for the test adaption facilities."""
310
def test_adapt_applies_scenarios(self):
311
from bzrlib.tests.repository_implementations import TestScenarioApplier
312
input_test = TestTestScenarioApplier("test_adapt_test_to_scenario")
313
adapter = TestScenarioApplier()
314
adapter.scenarios = [("1", "dict"), ("2", "settings")]
316
def capture_call(test, scenario):
317
calls.append((test, scenario))
319
adapter.adapt_test_to_scenario = capture_call
320
adapter.adapt(input_test)
321
self.assertEqual([(input_test, ("1", "dict")),
322
(input_test, ("2", "settings"))], calls)
324
def test_adapt_test_to_scenario(self):
325
from bzrlib.tests.repository_implementations import TestScenarioApplier
326
input_test = TestTestScenarioApplier("test_adapt_test_to_scenario")
327
adapter = TestScenarioApplier()
258
def test_apply_scenario(self):
259
from bzrlib.tests import apply_scenario
260
input_test = TestTestScenarioApplication("test_apply_scenario")
328
261
# setup two adapted tests
329
adapted_test1 = adapter.adapt_test_to_scenario(input_test,
262
adapted_test1 = apply_scenario(input_test,
331
264
{"bzrdir_format":"bzr_format",
332
265
"repository_format":"repo_fmt",
333
266
"transport_server":"transport_server",
334
267
"transport_readonly_server":"readonly-server"}))
335
adapted_test2 = adapter.adapt_test_to_scenario(input_test,
268
adapted_test2 = apply_scenario(input_test,
336
269
("new id 2", {"bzrdir_format":None}))
337
270
# input_test should have been altered.
338
271
self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
339
# the new tests are mutually incompatible, ensuring it has
272
# the new tests are mutually incompatible, ensuring it has
340
273
# made new ones, and unspecified elements in the scenario
341
274
# should not have been altered.
342
275
self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
345
278
self.assertEqual("readonly-server",
346
279
adapted_test1.transport_readonly_server)
347
280
self.assertEqual(
348
"bzrlib.tests.test_selftest.TestTestScenarioApplier."
349
"test_adapt_test_to_scenario(new id)",
281
"bzrlib.tests.test_selftest.TestTestScenarioApplication."
282
"test_apply_scenario(new id)",
350
283
adapted_test1.id())
351
284
self.assertEqual(None, adapted_test2.bzrdir_format)
352
285
self.assertEqual(
353
"bzrlib.tests.test_selftest.TestTestScenarioApplier."
354
"test_adapt_test_to_scenario(new id 2)",
286
"bzrlib.tests.test_selftest.TestTestScenarioApplication."
287
"test_apply_scenario(new id 2)",
355
288
adapted_test2.id())
358
class TestInterRepositoryProviderAdapter(TestCase):
359
"""A group of tests that test the InterRepository test adapter."""
291
class TestInterRepositoryScenarios(tests.TestCase):
361
def test_adapted_tests(self):
293
def test_scenarios(self):
362
294
# check that constructor parameters are passed through to the adapted
364
from bzrlib.tests.interrepository_implementations import \
365
InterRepositoryTestProviderAdapter
296
from bzrlib.tests.per_interrepository import make_scenarios
368
formats = [(str, "C1", "C2"), (int, "D1", "D2")]
369
adapter = InterRepositoryTestProviderAdapter(server1, server2, formats)
299
formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
300
scenarios = make_scenarios(server1, server2, formats)
370
301
self.assertEqual([
372
{'interrepo_class': str,
373
'repository_format': 'C1',
303
{'repository_format': 'C1',
374
304
'repository_format_to': 'C2',
375
305
'transport_readonly_server': 'b',
376
306
'transport_server': 'a'}),
378
{'interrepo_class': int,
379
'repository_format': 'D1',
308
{'repository_format': 'D1',
380
309
'repository_format_to': 'D2',
381
310
'transport_readonly_server': 'b',
382
311
'transport_server': 'a'})],
383
adapter.formats_to_scenarios(formats))
386
class TestWorkingTreeProviderAdapter(TestCase):
387
"""A group of tests that test the workingtree implementation test adapter."""
315
class TestWorkingTreeScenarios(tests.TestCase):
389
317
def test_scenarios(self):
390
318
# check that constructor parameters are passed through to the adapted
392
from bzrlib.tests.workingtree_implementations \
393
import WorkingTreeTestProviderAdapter
320
from bzrlib.tests.per_workingtree import make_scenarios
396
formats = [("c", "C"), ("d", "D")]
397
adapter = WorkingTreeTestProviderAdapter(server1, server2, formats)
323
formats = [workingtree.WorkingTreeFormat2(),
324
workingtree.WorkingTreeFormat3(),]
325
scenarios = make_scenarios(server1, server2, formats)
398
326
self.assertEqual([
400
{'bzrdir_format': 'C',
401
'transport_readonly_server': 'b',
402
'transport_server': 'a',
403
'workingtree_format': 'c'}),
405
{'bzrdir_format': 'D',
406
'transport_readonly_server': 'b',
407
'transport_server': 'a',
408
'workingtree_format': 'd'})],
412
class TestTreeProviderAdapter(TestCase):
413
"""Test the setup of tree_implementation tests."""
415
def test_adapted_tests(self):
416
# the tree implementation adapter is meant to setup one instance for
417
# each working tree format, and one additional instance that will
418
# use the default wt format, but create a revision tree for the tests.
419
# this means that the wt ones should have the workingtree_to_test_tree
420
# attribute set to 'return_parameter' and the revision one set to
421
# revision_tree_from_workingtree.
423
from bzrlib.tests.tree_implementations import (
424
TreeTestProviderAdapter,
327
('WorkingTreeFormat2',
328
{'bzrdir_format': formats[0]._matchingbzrdir,
329
'transport_readonly_server': 'b',
330
'transport_server': 'a',
331
'workingtree_format': formats[0]}),
332
('WorkingTreeFormat3',
333
{'bzrdir_format': formats[1]._matchingbzrdir,
334
'transport_readonly_server': 'b',
335
'transport_server': 'a',
336
'workingtree_format': formats[1]})],
340
class TestTreeScenarios(tests.TestCase):
342
def test_scenarios(self):
343
# the tree implementation scenario generator is meant to setup one
344
# instance for each working tree format, and one additional instance
345
# that will use the default wt format, but create a revision tree for
346
# the tests. this means that the wt ones should have the
347
# workingtree_to_test_tree attribute set to 'return_parameter' and the
348
# revision one set to revision_tree_from_workingtree.
350
from bzrlib.tests.per_tree import (
351
_dirstate_tree_from_workingtree,
425
355
return_parameter,
426
356
revision_tree_from_workingtree
428
from bzrlib.workingtree import WorkingTreeFormat, WorkingTreeFormat3
429
input_test = TestTreeProviderAdapter(
430
"test_adapted_tests")
433
formats = [("c", "C"), ("d", "D")]
434
adapter = TreeTestProviderAdapter(server1, server2, formats)
435
suite = adapter.adapt(input_test)
436
tests = list(iter(suite))
437
# XXX We should not have tests fail as we add more scenarios
439
self.assertEqual(5, len(tests))
440
# this must match the default format setp up in
441
# TreeTestProviderAdapter.adapt
442
default_format = WorkingTreeFormat3
443
self.assertEqual(tests[0].workingtree_format, formats[0][0])
444
self.assertEqual(tests[0].bzrdir_format, formats[0][1])
445
self.assertEqual(tests[0].transport_server, server1)
446
self.assertEqual(tests[0].transport_readonly_server, server2)
447
self.assertEqual(tests[0]._workingtree_to_test_tree, return_parameter)
448
self.assertEqual(tests[1].workingtree_format, formats[1][0])
449
self.assertEqual(tests[1].bzrdir_format, formats[1][1])
450
self.assertEqual(tests[1].transport_server, server1)
451
self.assertEqual(tests[1].transport_readonly_server, server2)
452
self.assertEqual(tests[1]._workingtree_to_test_tree, return_parameter)
453
self.assertIsInstance(tests[2].workingtree_format, default_format)
454
#self.assertEqual(tests[2].bzrdir_format,
455
# default_format._matchingbzrdir)
456
self.assertEqual(tests[2].transport_server, server1)
457
self.assertEqual(tests[2].transport_readonly_server, server2)
458
self.assertEqual(tests[2]._workingtree_to_test_tree,
459
revision_tree_from_workingtree)
462
class TestInterTreeProviderAdapter(TestCase):
360
formats = [workingtree.WorkingTreeFormat2(),
361
workingtree.WorkingTreeFormat3(),]
362
scenarios = make_scenarios(server1, server2, formats)
363
self.assertEqual(7, len(scenarios))
364
default_wt_format = workingtree.WorkingTreeFormat4._default_format
365
wt4_format = workingtree.WorkingTreeFormat4()
366
wt5_format = workingtree.WorkingTreeFormat5()
367
expected_scenarios = [
368
('WorkingTreeFormat2',
369
{'bzrdir_format': formats[0]._matchingbzrdir,
370
'transport_readonly_server': 'b',
371
'transport_server': 'a',
372
'workingtree_format': formats[0],
373
'_workingtree_to_test_tree': return_parameter,
375
('WorkingTreeFormat3',
376
{'bzrdir_format': formats[1]._matchingbzrdir,
377
'transport_readonly_server': 'b',
378
'transport_server': 'a',
379
'workingtree_format': formats[1],
380
'_workingtree_to_test_tree': return_parameter,
383
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
384
'bzrdir_format': default_wt_format._matchingbzrdir,
385
'transport_readonly_server': 'b',
386
'transport_server': 'a',
387
'workingtree_format': default_wt_format,
389
('DirStateRevisionTree,WT4',
390
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
391
'bzrdir_format': wt4_format._matchingbzrdir,
392
'transport_readonly_server': 'b',
393
'transport_server': 'a',
394
'workingtree_format': wt4_format,
396
('DirStateRevisionTree,WT5',
397
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
398
'bzrdir_format': wt5_format._matchingbzrdir,
399
'transport_readonly_server': 'b',
400
'transport_server': 'a',
401
'workingtree_format': wt5_format,
404
{'_workingtree_to_test_tree': preview_tree_pre,
405
'bzrdir_format': default_wt_format._matchingbzrdir,
406
'transport_readonly_server': 'b',
407
'transport_server': 'a',
408
'workingtree_format': default_wt_format}),
410
{'_workingtree_to_test_tree': preview_tree_post,
411
'bzrdir_format': default_wt_format._matchingbzrdir,
412
'transport_readonly_server': 'b',
413
'transport_server': 'a',
414
'workingtree_format': default_wt_format}),
416
self.assertEqual(expected_scenarios, scenarios)
419
class TestInterTreeScenarios(tests.TestCase):
463
420
"""A group of tests that test the InterTreeTestAdapter."""
465
def test_adapted_tests(self):
422
def test_scenarios(self):
466
423
# check that constructor parameters are passed through to the adapted
468
425
# for InterTree tests we want the machinery to bring up two trees in
1293
1272
class _TestException(Exception):
1296
class TestTestCase(TestCase):
1276
class TestTestCase(tests.TestCase):
1297
1277
"""Tests that test the core bzrlib TestCase."""
1279
def test_assertLength_matches_empty(self):
1281
self.assertLength(0, a_list)
1283
def test_assertLength_matches_nonempty(self):
1285
self.assertLength(3, a_list)
1287
def test_assertLength_fails_different(self):
1289
self.assertRaises(AssertionError, self.assertLength, 1, a_list)
1291
def test_assertLength_shows_sequence_in_failure(self):
1293
exception = self.assertRaises(AssertionError, self.assertLength, 2,
1295
self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
1298
def test_base_setUp_not_called_causes_failure(self):
1299
class TestCaseWithBrokenSetUp(tests.TestCase):
1301
pass # does not call TestCase.setUp
1304
test = TestCaseWithBrokenSetUp('test_foo')
1305
result = unittest.TestResult()
1307
self.assertFalse(result.wasSuccessful())
1308
self.assertEqual(1, result.testsRun)
1310
def test_base_tearDown_not_called_causes_failure(self):
1311
class TestCaseWithBrokenTearDown(tests.TestCase):
1313
pass # does not call TestCase.tearDown
1316
test = TestCaseWithBrokenTearDown('test_foo')
1317
result = unittest.TestResult()
1319
self.assertFalse(result.wasSuccessful())
1320
self.assertEqual(1, result.testsRun)
1299
1322
def test_debug_flags_sanitised(self):
1300
1323
"""The bzrlib debug flags should be sanitised by setUp."""
1324
if 'allow_debug' in tests.selftest_debug_flags:
1325
raise tests.TestNotApplicable(
1326
'-Eallow_debug option prevents debug flag sanitisation')
1301
1327
# we could set something and run a test that will check
1302
1328
# it gets santised, but this is probably sufficient for now:
1303
1329
# if someone runs the test with -Dsomething it will error.
1304
self.assertEqual(set(), bzrlib.debug.debug_flags)
1331
if self._lock_check_thorough:
1332
flags.add('strict_locks')
1333
self.assertEqual(flags, bzrlib.debug.debug_flags)
1335
def change_selftest_debug_flags(self, new_flags):
1336
orig_selftest_flags = tests.selftest_debug_flags
1337
self.addCleanup(self._restore_selftest_debug_flags, orig_selftest_flags)
1338
tests.selftest_debug_flags = set(new_flags)
1340
def _restore_selftest_debug_flags(self, flags):
1341
tests.selftest_debug_flags = flags
1343
def test_allow_debug_flag(self):
1344
"""The -Eallow_debug flag prevents bzrlib.debug.debug_flags from being
1345
sanitised (i.e. cleared) before running a test.
1347
self.change_selftest_debug_flags(set(['allow_debug']))
1348
bzrlib.debug.debug_flags = set(['a-flag'])
1349
class TestThatRecordsFlags(tests.TestCase):
1350
def test_foo(nested_self):
1351
self.flags = set(bzrlib.debug.debug_flags)
1352
test = TestThatRecordsFlags('test_foo')
1353
test.run(self.make_test_result())
1354
flags = set(['a-flag'])
1355
if 'disable_lock_checks' not in tests.selftest_debug_flags:
1356
flags.add('strict_locks')
1357
self.assertEqual(flags, self.flags)
1359
def test_disable_lock_checks(self):
1360
"""The -Edisable_lock_checks flag disables thorough checks."""
1361
class TestThatRecordsFlags(tests.TestCase):
1362
def test_foo(nested_self):
1363
self.flags = set(bzrlib.debug.debug_flags)
1364
self.test_lock_check_thorough = nested_self._lock_check_thorough
1365
self.change_selftest_debug_flags(set())
1366
test = TestThatRecordsFlags('test_foo')
1367
test.run(self.make_test_result())
1368
# By default we do strict lock checking and thorough lock/unlock
1370
self.assertTrue(self.test_lock_check_thorough)
1371
self.assertEqual(set(['strict_locks']), self.flags)
1372
# Now set the disable_lock_checks flag, and show that this changed.
1373
self.change_selftest_debug_flags(set(['disable_lock_checks']))
1374
test = TestThatRecordsFlags('test_foo')
1375
test.run(self.make_test_result())
1376
self.assertFalse(self.test_lock_check_thorough)
1377
self.assertEqual(set(), self.flags)
1379
def test_this_fails_strict_lock_check(self):
1380
class TestThatRecordsFlags(tests.TestCase):
1381
def test_foo(nested_self):
1382
self.flags1 = set(bzrlib.debug.debug_flags)
1383
self.thisFailsStrictLockCheck()
1384
self.flags2 = set(bzrlib.debug.debug_flags)
1385
# Make sure lock checking is active
1386
self.change_selftest_debug_flags(set())
1387
test = TestThatRecordsFlags('test_foo')
1388
test.run(self.make_test_result())
1389
self.assertEqual(set(['strict_locks']), self.flags1)
1390
self.assertEqual(set(), self.flags2)
1392
def test_debug_flags_restored(self):
1393
"""The bzrlib debug flags should be restored to their original state
1394
after the test was run, even if allow_debug is set.
1396
self.change_selftest_debug_flags(set(['allow_debug']))
1397
# Now run a test that modifies debug.debug_flags.
1398
bzrlib.debug.debug_flags = set(['original-state'])
1399
class TestThatModifiesFlags(tests.TestCase):
1401
bzrlib.debug.debug_flags = set(['modified'])
1402
test = TestThatModifiesFlags('test_foo')
1403
test.run(self.make_test_result())
1404
self.assertEqual(set(['original-state']), bzrlib.debug.debug_flags)
1406
def make_test_result(self):
1407
return tests.TextTestResult(self._log_file, descriptions=0, verbosity=1)
1306
1409
def inner_test(self):
1307
1410
# the inner child test
1643
1760
tree.branch.repository.bzrdir.root_transport)
1646
class TestSelftest(TestCase):
1763
class SelfTestHelper:
1765
def run_selftest(self, **kwargs):
1766
"""Run selftest returning its output."""
1768
old_transport = bzrlib.tests.default_transport
1769
old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
1770
tests.TestCaseWithMemoryTransport.TEST_ROOT = None
1772
self.assertEqual(True, tests.selftest(stream=output, **kwargs))
1774
bzrlib.tests.default_transport = old_transport
1775
tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
1780
class TestSelftest(tests.TestCase, SelfTestHelper):
1647
1781
"""Tests of bzrlib.tests.selftest."""
1649
1783
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
1650
1784
factory_called = []
1652
1786
factory_called.append(True)
1787
return TestUtil.TestSuite()
1654
1788
out = StringIO()
1655
1789
err = StringIO()
1656
self.apply_redirected(out, err, None, bzrlib.tests.selftest,
1790
self.apply_redirected(out, err, None, bzrlib.tests.selftest,
1657
1791
test_suite_factory=factory)
1658
1792
self.assertEqual([True], factory_called)
1661
class TestKnownFailure(TestCase):
1795
"""A test suite factory."""
1796
class Test(tests.TestCase):
1803
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
1805
def test_list_only(self):
1806
output = self.run_selftest(test_suite_factory=self.factory,
1808
self.assertEqual(3, len(output.readlines()))
1810
def test_list_only_filtered(self):
1811
output = self.run_selftest(test_suite_factory=self.factory,
1812
list_only=True, pattern="Test.b")
1813
self.assertEndsWith(output.getvalue(), "Test.b\n")
1814
self.assertLength(1, output.readlines())
1816
def test_list_only_excludes(self):
1817
output = self.run_selftest(test_suite_factory=self.factory,
1818
list_only=True, exclude_pattern="Test.b")
1819
self.assertNotContainsRe("Test.b", output.getvalue())
1820
self.assertLength(2, output.readlines())
1822
def test_random(self):
1823
# test randomising by listing a number of tests.
1824
output_123 = self.run_selftest(test_suite_factory=self.factory,
1825
list_only=True, random_seed="123")
1826
output_234 = self.run_selftest(test_suite_factory=self.factory,
1827
list_only=True, random_seed="234")
1828
self.assertNotEqual(output_123, output_234)
1829
# "Randominzing test order..\n\n
1830
self.assertLength(5, output_123.readlines())
1831
self.assertLength(5, output_234.readlines())
1833
def test_random_reuse_is_same_order(self):
1834
# test randomising by listing a number of tests.
1835
expected = self.run_selftest(test_suite_factory=self.factory,
1836
list_only=True, random_seed="123")
1837
repeated = self.run_selftest(test_suite_factory=self.factory,
1838
list_only=True, random_seed="123")
1839
self.assertEqual(expected.getvalue(), repeated.getvalue())
1841
def test_runner_class(self):
1842
self.requireFeature(SubUnitFeature)
1843
from subunit import ProtocolTestCase
1844
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
1845
test_suite_factory=self.factory)
1846
test = ProtocolTestCase(stream)
1847
result = unittest.TestResult()
1849
self.assertEqual(3, result.testsRun)
1851
def test_starting_with_single_argument(self):
1852
output = self.run_selftest(test_suite_factory=self.factory,
1853
starting_with=['bzrlib.tests.test_selftest.Test.a'],
1855
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
1858
def test_starting_with_multiple_argument(self):
1859
output = self.run_selftest(test_suite_factory=self.factory,
1860
starting_with=['bzrlib.tests.test_selftest.Test.a',
1861
'bzrlib.tests.test_selftest.Test.b'],
1863
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
1864
'bzrlib.tests.test_selftest.Test.b\n',
1867
def check_transport_set(self, transport_server):
1868
captured_transport = []
1869
def seen_transport(a_transport):
1870
captured_transport.append(a_transport)
1871
class Capture(tests.TestCase):
1873
seen_transport(bzrlib.tests.default_transport)
1875
return TestUtil.TestSuite([Capture("a")])
1876
self.run_selftest(transport=transport_server, test_suite_factory=factory)
1877
self.assertEqual(transport_server, captured_transport[0])
1879
def test_transport_sftp(self):
1881
import bzrlib.transport.sftp
1882
except errors.ParamikoNotPresent:
1883
raise tests.TestSkipped("Paramiko not present")
1884
self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
1886
def test_transport_memory(self):
1887
self.check_transport_set(bzrlib.transport.memory.MemoryServer)
1890
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
1891
# Does IO: reads test.list
1893
def test_load_list(self):
1894
# Provide a list with one test - this test.
1895
test_id_line = '%s\n' % self.id()
1896
self.build_tree_contents([('test.list', test_id_line)])
1897
# And generate a list of the tests in the suite.
1898
stream = self.run_selftest(load_list='test.list', list_only=True)
1899
self.assertEqual(test_id_line, stream.getvalue())
1901
def test_load_unknown(self):
1902
# Provide a list with one test - this test.
1903
# And generate a list of the tests in the suite.
1904
err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
1905
load_list='missing file name', list_only=True)
1908
class TestRunBzr(tests.TestCase):
1913
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
1915
"""Override _run_bzr_core to test how it is invoked by run_bzr.
1917
Attempts to run bzr from inside this class don't actually run it.
1919
We test how run_bzr actually invokes bzr in another location.
1920
Here we only need to test that it is run_bzr passes the right
1921
parameters to run_bzr.
1923
self.argv = list(argv)
1924
self.retcode = retcode
1925
self.encoding = encoding
1927
self.working_dir = working_dir
1928
return self.out, self.err
1930
def test_run_bzr_error(self):
1931
self.out = "It sure does!\n"
1932
out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
1933
self.assertEqual(['rocks'], self.argv)
1934
self.assertEqual(34, self.retcode)
1935
self.assertEqual(out, 'It sure does!\n')
1937
def test_run_bzr_error_regexes(self):
1939
self.err = "bzr: ERROR: foobarbaz is not versioned"
1940
out, err = self.run_bzr_error(
1941
["bzr: ERROR: foobarbaz is not versioned"],
1942
['file-id', 'foobarbaz'])
1944
def test_encoding(self):
1945
"""Test that run_bzr passes encoding to _run_bzr_core"""
1946
self.run_bzr('foo bar')
1947
self.assertEqual(None, self.encoding)
1948
self.assertEqual(['foo', 'bar'], self.argv)
1950
self.run_bzr('foo bar', encoding='baz')
1951
self.assertEqual('baz', self.encoding)
1952
self.assertEqual(['foo', 'bar'], self.argv)
1954
def test_retcode(self):
1955
"""Test that run_bzr passes retcode to _run_bzr_core"""
1956
# Default is retcode == 0
1957
self.run_bzr('foo bar')
1958
self.assertEqual(0, self.retcode)
1959
self.assertEqual(['foo', 'bar'], self.argv)
1961
self.run_bzr('foo bar', retcode=1)
1962
self.assertEqual(1, self.retcode)
1963
self.assertEqual(['foo', 'bar'], self.argv)
1965
self.run_bzr('foo bar', retcode=None)
1966
self.assertEqual(None, self.retcode)
1967
self.assertEqual(['foo', 'bar'], self.argv)
1969
self.run_bzr(['foo', 'bar'], retcode=3)
1970
self.assertEqual(3, self.retcode)
1971
self.assertEqual(['foo', 'bar'], self.argv)
1973
def test_stdin(self):
1974
# test that the stdin keyword to run_bzr is passed through to
1975
# _run_bzr_core as-is. We do this by overriding
1976
# _run_bzr_core in this class, and then calling run_bzr,
1977
# which is a convenience function for _run_bzr_core, so
1979
self.run_bzr('foo bar', stdin='gam')
1980
self.assertEqual('gam', self.stdin)
1981
self.assertEqual(['foo', 'bar'], self.argv)
1983
self.run_bzr('foo bar', stdin='zippy')
1984
self.assertEqual('zippy', self.stdin)
1985
self.assertEqual(['foo', 'bar'], self.argv)
1987
def test_working_dir(self):
1988
"""Test that run_bzr passes working_dir to _run_bzr_core"""
1989
self.run_bzr('foo bar')
1990
self.assertEqual(None, self.working_dir)
1991
self.assertEqual(['foo', 'bar'], self.argv)
1993
self.run_bzr('foo bar', working_dir='baz')
1994
self.assertEqual('baz', self.working_dir)
1995
self.assertEqual(['foo', 'bar'], self.argv)
1997
def test_reject_extra_keyword_arguments(self):
1998
self.assertRaises(TypeError, self.run_bzr, "foo bar",
1999
error_regex=['error message'])
2002
class TestRunBzrCaptured(tests.TestCaseWithTransport):
2003
# Does IO when testing the working_dir parameter.
2005
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2006
a_callable=None, *args, **kwargs):
2008
self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
2009
self.factory = bzrlib.ui.ui_factory
2010
self.working_dir = osutils.getcwd()
2011
stdout.write('foo\n')
2012
stderr.write('bar\n')
2015
def test_stdin(self):
2016
# test that the stdin keyword to _run_bzr_core is passed through to
2017
# apply_redirected as a StringIO. We do this by overriding
2018
# apply_redirected in this class, and then calling _run_bzr_core,
2019
# which calls apply_redirected.
2020
self.run_bzr(['foo', 'bar'], stdin='gam')
2021
self.assertEqual('gam', self.stdin.read())
2022
self.assertTrue(self.stdin is self.factory_stdin)
2023
self.run_bzr(['foo', 'bar'], stdin='zippy')
2024
self.assertEqual('zippy', self.stdin.read())
2025
self.assertTrue(self.stdin is self.factory_stdin)
2027
def test_ui_factory(self):
2028
# each invocation of self.run_bzr should get its
2029
# own UI factory, which is an instance of TestUIFactory,
2030
# with stdin, stdout and stderr attached to the stdin,
2031
# stdout and stderr of the invoked run_bzr
2032
current_factory = bzrlib.ui.ui_factory
2033
self.run_bzr(['foo'])
2034
self.failIf(current_factory is self.factory)
2035
self.assertNotEqual(sys.stdout, self.factory.stdout)
2036
self.assertNotEqual(sys.stderr, self.factory.stderr)
2037
self.assertEqual('foo\n', self.factory.stdout.getvalue())
2038
self.assertEqual('bar\n', self.factory.stderr.getvalue())
2039
self.assertIsInstance(self.factory, tests.TestUIFactory)
2041
def test_working_dir(self):
2042
self.build_tree(['one/', 'two/'])
2043
cwd = osutils.getcwd()
2045
# Default is to work in the current directory
2046
self.run_bzr(['foo', 'bar'])
2047
self.assertEqual(cwd, self.working_dir)
2049
self.run_bzr(['foo', 'bar'], working_dir=None)
2050
self.assertEqual(cwd, self.working_dir)
2052
# The function should be run in the alternative directory
2053
# but afterwards the current working dir shouldn't be changed
2054
self.run_bzr(['foo', 'bar'], working_dir='one')
2055
self.assertNotEqual(cwd, self.working_dir)
2056
self.assertEndsWith(self.working_dir, 'one')
2057
self.assertEqual(cwd, osutils.getcwd())
2059
self.run_bzr(['foo', 'bar'], working_dir='two')
2060
self.assertNotEqual(cwd, self.working_dir)
2061
self.assertEndsWith(self.working_dir, 'two')
2062
self.assertEqual(cwd, osutils.getcwd())
2065
class StubProcess(object):
2066
"""A stub process for testing run_bzr_subprocess."""
2068
def __init__(self, out="", err="", retcode=0):
2071
self.returncode = retcode
2073
def communicate(self):
2074
return self.out, self.err
2077
class TestRunBzrSubprocess(tests.TestCaseWithTransport):
2080
tests.TestCaseWithTransport.setUp(self)
2081
self.subprocess_calls = []
2083
def start_bzr_subprocess(self, process_args, env_changes=None,
2084
skip_if_plan_to_signal=False,
2086
allow_plugins=False):
2087
"""capture what run_bzr_subprocess tries to do."""
2088
self.subprocess_calls.append({'process_args':process_args,
2089
'env_changes':env_changes,
2090
'skip_if_plan_to_signal':skip_if_plan_to_signal,
2091
'working_dir':working_dir, 'allow_plugins':allow_plugins})
2092
return self.next_subprocess
2094
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2095
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2097
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2098
that will return static results. This assertion method populates those
2099
results and also checks the arguments run_bzr_subprocess generates.
2101
self.next_subprocess = process
2103
result = self.run_bzr_subprocess(*args, **kwargs)
2105
self.next_subprocess = None
2106
for key, expected in expected_args.iteritems():
2107
self.assertEqual(expected, self.subprocess_calls[-1][key])
2110
self.next_subprocess = None
2111
for key, expected in expected_args.iteritems():
2112
self.assertEqual(expected, self.subprocess_calls[-1][key])
2115
def test_run_bzr_subprocess(self):
2116
"""The run_bzr_helper_external command behaves nicely."""
2117
self.assertRunBzrSubprocess({'process_args':['--version']},
2118
StubProcess(), '--version')
2119
self.assertRunBzrSubprocess({'process_args':['--version']},
2120
StubProcess(), ['--version'])
2121
# retcode=None disables retcode checking
2122
result = self.assertRunBzrSubprocess({},
2123
StubProcess(retcode=3), '--version', retcode=None)
2124
result = self.assertRunBzrSubprocess({},
2125
StubProcess(out="is free software"), '--version')
2126
self.assertContainsRe(result[0], 'is free software')
2127
# Running a subcommand that is missing errors
2128
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2129
{'process_args':['--versionn']}, StubProcess(retcode=3),
2131
# Unless it is told to expect the error from the subprocess
2132
result = self.assertRunBzrSubprocess({},
2133
StubProcess(retcode=3), '--versionn', retcode=3)
2134
# Or to ignore retcode checking
2135
result = self.assertRunBzrSubprocess({},
2136
StubProcess(err="unknown command", retcode=3), '--versionn',
2138
self.assertContainsRe(result[1], 'unknown command')
2140
def test_env_change_passes_through(self):
2141
self.assertRunBzrSubprocess(
2142
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2144
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2146
def test_no_working_dir_passed_as_None(self):
    """Without a working_dir argument the stub call records working_dir=None."""
    expected_call = {'working_dir': None}
    self.assertRunBzrSubprocess(expected_call, StubProcess(), '')
2149
def test_no_working_dir_passed_through(self):
2150
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2153
def test_run_bzr_subprocess_no_plugins(self):
2154
self.assertRunBzrSubprocess({'allow_plugins': False},
2157
def test_allow_plugins(self):
    """allow_plugins=True shows up unchanged in the stubbed subprocess call."""
    expected_call = {'allow_plugins': True}
    self.assertRunBzrSubprocess(
        expected_call, StubProcess(), '', allow_plugins=True)
2162
class _DontSpawnProcess(Exception):
    """Raised by the stub ``_popen`` so that no real process is started."""
2166
class TestStartBzrSubProcess(tests.TestCase):
2168
def check_popen_state(self):
    """Hook called from ``_popen``; tests rebind it to add their own checks.

    The default implementation deliberately does nothing.
    """
2171
def _popen(self, *args, **kwargs):
    """Capture the popen call for later inspection instead of running it.

    ``check_popen_state`` is invoked first, then the positional and keyword
    arguments are stored as ``_popen_args`` / ``_popen_kwargs``, and finally
    ``_DontSpawnProcess`` is raised so the caller stops right here.
    """
    self.check_popen_state()
    self._popen_args, self._popen_kwargs = args, kwargs
    raise _DontSpawnProcess()
2178
def test_run_bzr_subprocess_no_plugins(self):
    """By default the recorded command line ends with --no-plugins."""
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
    # First positional argument handed to _popen is the argv list.
    argv = self._popen_args[0]
    self.assertEqual(sys.executable, argv[0])
    self.assertEqual(self.get_bzr_path(), argv[1])
    # Everything after the interpreter and the bzr script.
    self.assertEqual(['--no-plugins'], argv[2:])
2185
def test_allow_plugins(self):
2186
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2188
command = self._popen_args[0]
2189
self.assertEqual([], command[2:])
2191
def test_set_env(self):
    """A variable given in env_changes is set while popen runs, then unset."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    # set at the time popen is called
    def check_environment():
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'EXISTANT_ENV_VAR':'set variable'})
    # not set in the parent
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2202
def test_run_bzr_subprocess_env_del(self):
    """run_bzr_subprocess can remove environment variables too."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        # While popen is being called the variable must be gone.
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    os.environ['EXISTANT_ENV_VAR'] = 'set variable'
    try:
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR':None})
        # Still set in parent
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    finally:
        # Always undo our change to the real environment, even when an
        # assertion above fails, so the variable cannot leak into other
        # tests.
        if 'EXISTANT_ENV_VAR' in os.environ:
            del os.environ['EXISTANT_ENV_VAR']
2215
def test_env_del_missing(self):
    """Asking to delete a variable that is not set leaves it unset."""
    self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'NON_EXISTANT_ENV_VAR':None})
2223
def test_working_dir(self):
2224
"""Test that we can specify the working dir for the child"""
2225
orig_getcwd = osutils.getcwd
2226
orig_chdir = os.chdir
2234
osutils.getcwd = getcwd
2236
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2239
osutils.getcwd = orig_getcwd
2241
os.chdir = orig_chdir
2242
self.assertEqual(['foo', 'current'], chdirs)
2245
class TestBzrSubprocess(tests.TestCaseWithTransport):
2247
def test_start_and_stop_bzr_subprocess(self):
2248
"""We can start and perform other test actions while that process is
2251
process = self.start_bzr_subprocess(['--version'])
2252
result = self.finish_bzr_subprocess(process)
2253
self.assertContainsRe(result[0], 'is free software')
2254
self.assertEqual('', result[1])
2256
def test_start_and_stop_bzr_subprocess_with_error(self):
2257
"""finish_bzr_subprocess allows specification of the desired exit code.
2259
process = self.start_bzr_subprocess(['--versionn'])
2260
result = self.finish_bzr_subprocess(process, retcode=3)
2261
self.assertEqual('', result[0])
2262
self.assertContainsRe(result[1], 'unknown command')
2264
def test_start_and_stop_bzr_subprocess_ignoring_retcode(self):
2265
"""finish_bzr_subprocess allows the exit code to be ignored."""
2266
process = self.start_bzr_subprocess(['--versionn'])
2267
result = self.finish_bzr_subprocess(process, retcode=None)
2268
self.assertEqual('', result[0])
2269
self.assertContainsRe(result[1], 'unknown command')
2271
def test_start_and_stop_bzr_subprocess_with_unexpected_retcode(self):
2272
"""finish_bzr_subprocess raises self.failureException if the retcode is
2273
not the expected one.
2275
process = self.start_bzr_subprocess(['--versionn'])
2276
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2279
def test_start_and_stop_bzr_subprocess_send_signal(self):
2280
"""finish_bzr_subprocess can deliver a signal (here SIGINT) to the
2281
subprocess, which then reports that it was interrupted.
2283
process = self.start_bzr_subprocess(['wait-until-signalled'],
2284
skip_if_plan_to_signal=True)
2285
self.assertEqual('running\n', process.stdout.readline())
2286
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2288
self.assertEqual('', result[0])
2289
self.assertEqual('bzr: interrupted\n', result[1])
2291
def test_start_and_stop_working_dir(self):
2292
cwd = osutils.getcwd()
2293
self.make_branch_and_tree('one')
2294
process = self.start_bzr_subprocess(['root'], working_dir='one')
2295
result = self.finish_bzr_subprocess(process, universal_newlines=True)
2296
self.assertEndsWith(result[0], 'one\n')
2297
self.assertEqual('', result[1])
2300
class TestKnownFailure(tests.TestCase):
1663
2302
def test_known_failure(self):
1664
2303
"""Check that KnownFailure is defined appropriately."""
1665
2304
# a KnownFailure is an assertion error for compatibility with unaware
1667
self.assertIsInstance(KnownFailure(""), AssertionError)
2306
self.assertIsInstance(tests.KnownFailure(""), AssertionError)
1669
2308
def test_expect_failure(self):
1671
2310
self.expectFailure("Doomed to failure", self.assertTrue, False)
1672
except KnownFailure, e:
2311
except tests.KnownFailure, e:
1673
2312
self.assertEqual('Doomed to failure', e.args[0])
1675
2314
self.expectFailure("Doomed to failure", self.assertTrue, True)