1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the test framework."""
19
from cStringIO import StringIO
30
from testtools import (
31
ExtendedToOriginalDecorator,
34
from testtools.content import Content
35
from testtools.content_type import ContentType
36
from testtools.matchers import (
40
import testtools.testresult.doubles
61
from bzrlib.repofmt import (
64
from bzrlib.symbol_versioning import (
69
from bzrlib.tests import (
75
from bzrlib.trace import note, mutter
76
from bzrlib.transport import memory
79
def _test_ids(test_suite):
    """Return a list holding the id of each test found in test_suite."""
    ids = []
    for test in tests.iter_suite_tests(test_suite):
        ids.append(test.id())
    return ids
84
class MetaTestLog(tests.TestCase):
86
def test_logging(self):
87
"""Test logs are captured when a test fails."""
88
self.log('a test message')
89
details = self.getDetails()
91
self.assertThat(log.content_type, Equals(ContentType(
92
"text", "plain", {"charset": "utf8"})))
93
self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
94
self.assertThat(self.get_log(),
95
DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
98
class TestTreeShape(tests.TestCaseInTempDir):
    """Check build_tree_contents with a non-ascii file name."""

    def test_unicode_paths(self):
        # Needs a filesystem that can store unicode file names.
        self.requireFeature(features.UnicodeFilenameFeature)

        unicode_name = u'hell\u00d8'
        shape = [(unicode_name, 'contents of hello')]
        self.build_tree_contents(shape)
        # The file must be found again under the same unicode name.
        self.assertPathExists(unicode_name)
108
class TestClassesAvailable(tests.TestCase):
    """bzrlib.tests exposes the Test* classes as a convenience.

    Each test only performs the import; the imported name is deliberately
    left unused.
    """

    def test_test_suite(self):
        # TestSuite must be importable from bzrlib.tests.
        from bzrlib.tests import TestSuite

    def test_test_loader(self):
        # TestLoader must be importable from bzrlib.tests.
        from bzrlib.tests import TestLoader

    def test_test_case(self):
        # TestCase must be importable from bzrlib.tests.
        from bzrlib.tests import TestCase
121
class TestTransportScenarios(tests.TestCase):
122
"""A group of tests that test the transport implementation adaption core.
124
This is a meta test that the tests are applied to all available
127
This will be generalised in the future which is why it is in this
128
test file even though it is specific to transport tests at the moment.
131
def test_get_transport_permutations(self):
    # get_transport_test_permutations() is expected to call the module's
    # own get_test_permutations() and hand back its result.
    from bzrlib.tests.per_transport import get_transport_test_permutations
    sample_permutation = [(1, 2), (3, 4)]

    class MockModule(object):
        # Stand-in for a transport module: it only provides the hook.
        def get_test_permutations(self):
            return sample_permutation

    fake_module = MockModule()
    found = get_transport_test_permutations(fake_module)
    self.assertEqual(sample_permutation, found)
142
def test_scenarios_include_all_modules(self):
143
# this checks that the scenario generator returns as many permutations
144
# as there are in all the registered transport modules - we assume if
145
# this matches its probably doing the right thing especially in
146
# combination with the tests for setting the right classes below.
147
from bzrlib.tests.per_transport import transport_test_permutations
148
from bzrlib.transport import _get_transport_modules
149
modules = _get_transport_modules()
150
permutation_count = 0
151
for module in modules:
153
permutation_count += len(reduce(getattr,
154
(module + ".get_test_permutations").split('.')[1:],
155
__import__(module))())
156
except errors.DependencyNotPresent:
158
scenarios = transport_test_permutations()
159
self.assertEqual(permutation_count, len(scenarios))
161
def test_scenarios_include_transport_class(self):
    # This test used to know every possible transport and the order in
    # which they were returned; that was overly brittle, so only the shape
    # of the first scenario is checked now.
    from bzrlib.tests.per_transport import transport_test_permutations
    scenarios = transport_test_permutations()
    # there are at least that many builtin transports
    self.assertTrue(len(scenarios) > 6)
    first = scenarios[0]
    # A scenario is indexed as (name, parameters).
    self.assertIsInstance(first[0], str)
    params = first[1]
    transport_class = params["transport_class"]
    self.assertTrue(issubclass(transport_class, bzrlib.transport.Transport))
    server_class = params["transport_server"]
    self.assertTrue(issubclass(server_class, bzrlib.transport.Server))
177
class TestBranchScenarios(tests.TestCase):
179
def test_scenarios(self):
180
# check that constructor parameters are passed through to the adapted
182
from bzrlib.tests.per_branch import make_scenarios
185
formats = [("c", "C"), ("d", "D")]
186
scenarios = make_scenarios(server1, server2, formats)
187
self.assertEqual(2, len(scenarios))
190
{'branch_format': 'c',
191
'bzrdir_format': 'C',
192
'transport_readonly_server': 'b',
193
'transport_server': 'a'}),
195
{'branch_format': 'd',
196
'bzrdir_format': 'D',
197
'transport_readonly_server': 'b',
198
'transport_server': 'a'})],
202
class TestBzrDirScenarios(tests.TestCase):
204
def test_scenarios(self):
205
# check that constructor parameters are passed through to the adapted
207
from bzrlib.tests.per_controldir import make_scenarios
212
scenarios = make_scenarios(vfs_factory, server1, server2, formats)
215
{'bzrdir_format': 'c',
216
'transport_readonly_server': 'b',
217
'transport_server': 'a',
218
'vfs_transport_factory': 'v'}),
220
{'bzrdir_format': 'd',
221
'transport_readonly_server': 'b',
222
'transport_server': 'a',
223
'vfs_transport_factory': 'v'})],
227
class TestRepositoryScenarios(tests.TestCase):
229
def test_formats_to_scenarios(self):
230
from bzrlib.tests.per_repository import formats_to_scenarios
231
formats = [("(c)", remote.RemoteRepositoryFormat()),
232
("(d)", repository.format_registry.get(
233
'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
234
no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
236
vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
237
vfs_transport_factory="vfs")
238
# no_vfs generate scenarios without vfs_transport_factory
240
('RemoteRepositoryFormat(c)',
241
{'bzrdir_format': remote.RemoteBzrDirFormat(),
242
'repository_format': remote.RemoteRepositoryFormat(),
243
'transport_readonly_server': 'readonly',
244
'transport_server': 'server'}),
245
('RepositoryFormat2a(d)',
246
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
247
'repository_format': groupcompress_repo.RepositoryFormat2a(),
248
'transport_readonly_server': 'readonly',
249
'transport_server': 'server'})]
250
self.assertEqual(expected, no_vfs_scenarios)
252
('RemoteRepositoryFormat(c)',
253
{'bzrdir_format': remote.RemoteBzrDirFormat(),
254
'repository_format': remote.RemoteRepositoryFormat(),
255
'transport_readonly_server': 'readonly',
256
'transport_server': 'server',
257
'vfs_transport_factory': 'vfs'}),
258
('RepositoryFormat2a(d)',
259
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
260
'repository_format': groupcompress_repo.RepositoryFormat2a(),
261
'transport_readonly_server': 'readonly',
262
'transport_server': 'server',
263
'vfs_transport_factory': 'vfs'})],
267
class TestTestScenarioApplication(tests.TestCase):
268
"""Tests for the test adaption facilities."""
270
def test_apply_scenario(self):
271
from bzrlib.tests import apply_scenario
272
input_test = TestTestScenarioApplication("test_apply_scenario")
273
# setup two adapted tests
274
adapted_test1 = apply_scenario(input_test,
276
{"bzrdir_format":"bzr_format",
277
"repository_format":"repo_fmt",
278
"transport_server":"transport_server",
279
"transport_readonly_server":"readonly-server"}))
280
adapted_test2 = apply_scenario(input_test,
281
("new id 2", {"bzrdir_format":None}))
282
# input_test should not have been altered.
283
self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
284
# the new tests are mutually incompatible, ensuring it has
285
# made new ones, and unspecified elements in the scenario
286
# should not have been altered.
287
self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
288
self.assertEqual("repo_fmt", adapted_test1.repository_format)
289
self.assertEqual("transport_server", adapted_test1.transport_server)
290
self.assertEqual("readonly-server",
291
adapted_test1.transport_readonly_server)
293
"bzrlib.tests.test_selftest.TestTestScenarioApplication."
294
"test_apply_scenario(new id)",
296
self.assertEqual(None, adapted_test2.bzrdir_format)
298
"bzrlib.tests.test_selftest.TestTestScenarioApplication."
299
"test_apply_scenario(new id 2)",
303
class TestInterRepositoryScenarios(tests.TestCase):
305
def test_scenarios(self):
306
# check that constructor parameters are passed through to the adapted
308
from bzrlib.tests.per_interrepository import make_scenarios
311
formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
312
scenarios = make_scenarios(server1, server2, formats)
315
{'repository_format': 'C1',
316
'repository_format_to': 'C2',
317
'transport_readonly_server': 'b',
318
'transport_server': 'a',
319
'extra_setup': 'C3'}),
321
{'repository_format': 'D1',
322
'repository_format_to': 'D2',
323
'transport_readonly_server': 'b',
324
'transport_server': 'a',
325
'extra_setup': 'D3'})],
329
class TestWorkingTreeScenarios(tests.TestCase):
331
def test_scenarios(self):
332
# check that constructor parameters are passed through to the adapted
334
from bzrlib.tests.per_workingtree import make_scenarios
337
formats = [workingtree_4.WorkingTreeFormat4(),
338
workingtree_3.WorkingTreeFormat3(),
339
workingtree_4.WorkingTreeFormat6()]
340
scenarios = make_scenarios(server1, server2, formats,
341
remote_server='c', remote_readonly_server='d',
342
remote_backing_server='e')
344
('WorkingTreeFormat4',
345
{'bzrdir_format': formats[0]._matchingbzrdir,
346
'transport_readonly_server': 'b',
347
'transport_server': 'a',
348
'workingtree_format': formats[0]}),
349
('WorkingTreeFormat3',
350
{'bzrdir_format': formats[1]._matchingbzrdir,
351
'transport_readonly_server': 'b',
352
'transport_server': 'a',
353
'workingtree_format': formats[1]}),
354
('WorkingTreeFormat6',
355
{'bzrdir_format': formats[2]._matchingbzrdir,
356
'transport_readonly_server': 'b',
357
'transport_server': 'a',
358
'workingtree_format': formats[2]}),
359
('WorkingTreeFormat6,remote',
360
{'bzrdir_format': formats[2]._matchingbzrdir,
361
'repo_is_remote': True,
362
'transport_readonly_server': 'd',
363
'transport_server': 'c',
364
'vfs_transport_factory': 'e',
365
'workingtree_format': formats[2]}),
369
class TestTreeScenarios(tests.TestCase):
371
def test_scenarios(self):
372
# the tree implementation scenario generator is meant to setup one
373
# instance for each working tree format, one additional instance
374
# that will use the default wt format, but create a revision tree for
375
# the tests, and one more that uses the default wt format as a
376
# lightweight checkout of a remote repository. This means that the wt
377
# ones should have the workingtree_to_test_tree attribute set to
378
# 'return_parameter' and the revision one set to
379
# revision_tree_from_workingtree.
381
from bzrlib.tests.per_tree import (
382
_dirstate_tree_from_workingtree,
387
revision_tree_from_workingtree
391
smart_server = test_server.SmartTCPServer_for_testing
392
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
393
mem_server = memory.MemoryServer
394
formats = [workingtree_4.WorkingTreeFormat4(),
395
workingtree_3.WorkingTreeFormat3(),]
396
scenarios = make_scenarios(server1, server2, formats)
397
self.assertEqual(8, len(scenarios))
398
default_wt_format = workingtree.format_registry.get_default()
399
wt4_format = workingtree_4.WorkingTreeFormat4()
400
wt5_format = workingtree_4.WorkingTreeFormat5()
401
wt6_format = workingtree_4.WorkingTreeFormat6()
402
expected_scenarios = [
403
('WorkingTreeFormat4',
404
{'bzrdir_format': formats[0]._matchingbzrdir,
405
'transport_readonly_server': 'b',
406
'transport_server': 'a',
407
'workingtree_format': formats[0],
408
'_workingtree_to_test_tree': return_parameter,
410
('WorkingTreeFormat3',
411
{'bzrdir_format': formats[1]._matchingbzrdir,
412
'transport_readonly_server': 'b',
413
'transport_server': 'a',
414
'workingtree_format': formats[1],
415
'_workingtree_to_test_tree': return_parameter,
417
('WorkingTreeFormat6,remote',
418
{'bzrdir_format': wt6_format._matchingbzrdir,
419
'repo_is_remote': True,
420
'transport_readonly_server': smart_readonly_server,
421
'transport_server': smart_server,
422
'vfs_transport_factory': mem_server,
423
'workingtree_format': wt6_format,
424
'_workingtree_to_test_tree': return_parameter,
427
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
428
'bzrdir_format': default_wt_format._matchingbzrdir,
429
'transport_readonly_server': 'b',
430
'transport_server': 'a',
431
'workingtree_format': default_wt_format,
433
('DirStateRevisionTree,WT4',
434
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
435
'bzrdir_format': wt4_format._matchingbzrdir,
436
'transport_readonly_server': 'b',
437
'transport_server': 'a',
438
'workingtree_format': wt4_format,
440
('DirStateRevisionTree,WT5',
441
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
442
'bzrdir_format': wt5_format._matchingbzrdir,
443
'transport_readonly_server': 'b',
444
'transport_server': 'a',
445
'workingtree_format': wt5_format,
448
{'_workingtree_to_test_tree': preview_tree_pre,
449
'bzrdir_format': default_wt_format._matchingbzrdir,
450
'transport_readonly_server': 'b',
451
'transport_server': 'a',
452
'workingtree_format': default_wt_format}),
454
{'_workingtree_to_test_tree': preview_tree_post,
455
'bzrdir_format': default_wt_format._matchingbzrdir,
456
'transport_readonly_server': 'b',
457
'transport_server': 'a',
458
'workingtree_format': default_wt_format}),
460
self.assertEqual(expected_scenarios, scenarios)
463
class TestInterTreeScenarios(tests.TestCase):
464
"""A group of tests that test the InterTreeTestAdapter."""
466
def test_scenarios(self):
467
# check that constructor parameters are passed through to the adapted
469
# for InterTree tests we want the machinery to bring up two trees in
470
# each instance: the base one, and the one we are interacting with.
471
# because each optimiser can be direction specific, we need to test
472
# each optimiser in its chosen direction.
473
# unlike the TestProviderAdapter we dont want to automatically add a
474
# parameterized one for WorkingTree - the optimisers will tell us what
476
from bzrlib.tests.per_tree import (
479
from bzrlib.tests.per_intertree import (
482
from bzrlib.workingtree_3 import WorkingTreeFormat3
483
from bzrlib.workingtree_4 import WorkingTreeFormat4
484
input_test = TestInterTreeScenarios(
488
format1 = WorkingTreeFormat4()
489
format2 = WorkingTreeFormat3()
490
formats = [("1", str, format1, format2, "converter1"),
491
("2", int, format2, format1, "converter2")]
492
scenarios = make_scenarios(server1, server2, formats)
493
self.assertEqual(2, len(scenarios))
494
expected_scenarios = [
496
"bzrdir_format": format1._matchingbzrdir,
497
"intertree_class": formats[0][1],
498
"workingtree_format": formats[0][2],
499
"workingtree_format_to": formats[0][3],
500
"mutable_trees_to_test_trees": formats[0][4],
501
"_workingtree_to_test_tree": return_parameter,
502
"transport_server": server1,
503
"transport_readonly_server": server2,
506
"bzrdir_format": format2._matchingbzrdir,
507
"intertree_class": formats[1][1],
508
"workingtree_format": formats[1][2],
509
"workingtree_format_to": formats[1][3],
510
"mutable_trees_to_test_trees": formats[1][4],
511
"_workingtree_to_test_tree": return_parameter,
512
"transport_server": server1,
513
"transport_readonly_server": server2,
516
self.assertEqual(scenarios, expected_scenarios)
519
class TestTestCaseInTempDir(tests.TestCaseInTempDir):
    """Checks on the directories and assertions of TestCaseInTempDir."""

    def test_home_is_not_working(self):
        # The home directory is distinct from the working directory; the
        # process runs in test_dir and $HOME points at test_home_dir.
        self.assertNotEqual(self.test_dir, self.test_home_dir)
        self.assertIsSameRealPath(self.test_dir, osutils.getcwd())
        home = os.environ['HOME']
        self.assertIsSameRealPath(self.test_home_dir, home)

    def test_assertEqualStat_equal(self):
        from bzrlib.tests.test_dirstate import _FakeStat
        self.build_tree(["foo"])
        real = os.lstat("foo")
        # Build a fake stat from the fields of the real one, in the
        # positional order _FakeStat is called with.
        field_names = ('st_size', 'st_mtime', 'st_ctime',
                       'st_dev', 'st_ino', 'st_mode')
        fake = _FakeStat(*[getattr(real, name) for name in field_names])
        self.assertEqualStat(real, fake)

    def test_assertEqualStat_notequal(self):
        # The stat results of two different files must not compare equal.
        self.build_tree(["foo", "longname"])
        foo_stat = os.lstat("foo")
        longname_stat = os.lstat("longname")
        self.assertRaises(AssertionError, self.assertEqualStat,
                          foo_stat, longname_stat)

    def test_failUnlessExists(self):
        """failUnlessExists and failIfExists are deprecated since 2.4."""
        since = deprecated_in((2, 4))
        self.applyDeprecated(since, self.failUnlessExists, '.')
        self.build_tree(['foo/', 'foo/bar'])
        self.applyDeprecated(since, self.failUnlessExists, 'foo/bar')
        self.applyDeprecated(since, self.failIfExists, 'foo/foo')

    def test_assertPathExists(self):
        # The current directory always exists.
        self.assertPathExists('.')
        self.build_tree(['foo/', 'foo/bar'])
        # A path that was built exists; one that was never built does not.
        self.assertPathExists('foo/bar')
        self.assertPathDoesNotExist('foo/foo')
560
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
562
def test_home_is_non_existant_dir_under_root(self):
563
"""The test_home_dir for TestCaseWithMemoryTransport is missing.
565
This is because TestCaseWithMemoryTransport is for tests that do not
566
need any disk resources: they should be hooked into bzrlib in such a
567
way that no global settings are being changed by the test (only a
568
few tests should need to do that), and having a missing dir as home is
569
an effective way to ensure that this is the case.
571
self.assertIsSameRealPath(
572
self.TEST_ROOT + "/MemoryTransportMissingHomeDir",
574
self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])
576
def test_cwd_is_TEST_ROOT(self):
    # test_dir must be the same real path as TEST_ROOT, and also the same
    # real path as the current working directory.
    self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT)
    self.assertIsSameRealPath(self.test_dir, osutils.getcwd())
581
def test_BZR_HOME_and_HOME_are_bytestrings(self):
    """$BZR_HOME and $HOME must be str objects, not unicode.

    See https://bugs.launchpad.net/bzr/+bug/464174
    """
    for variable in ('BZR_HOME', 'HOME'):
        self.assertIsInstance(os.environ[variable], str)
589
def test_make_branch_and_memory_tree(self):
    """make_branch_and_memory_tree must not create the branch on disk.

    Testing that comprehensively is hard, so this only makes a branch and
    checks that no directory appeared at its relpath.
    """
    memory_tree = self.make_branch_and_memory_tree('dir')
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    exists_on_disk = osutils.lexists('dir')
    self.assertFalse(exists_on_disk)
    self.assertIsInstance(memory_tree, memorytree.MemoryTree)
601
def test_make_branch_and_memory_tree_with_format(self):
    """make_branch_and_memory_tree takes an explicit format argument."""
    dir_format = bzrdir.BzrDirMetaFormat1()
    dir_format.repository_format = repository.format_registry.get_default()
    memory_tree = self.make_branch_and_memory_tree('dir', format=dir_format)
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    exists_on_disk = osutils.lexists('dir')
    self.assertFalse(exists_on_disk)
    self.assertIsInstance(memory_tree, memorytree.MemoryTree)
    # The repository behind the tree must use the requested format class.
    wanted_class = dir_format.repository_format.__class__
    actual_class = memory_tree.branch.repository._format.__class__
    self.assertEqual(wanted_class, actual_class)
613
def test_make_branch_builder(self):
    # make_branch_builder returns a BranchBuilder.
    branch_builder = self.make_branch_builder('dir')
    self.assertIsInstance(branch_builder, branchbuilder.BranchBuilder)
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    exists_on_disk = osutils.lexists('dir')
    self.assertFalse(exists_on_disk)
620
def test_make_branch_builder_with_format(self):
    # Use a repo layout that doesn't conform to a 'named' layout, to ensure
    # that the format objects are used.
    dir_format = bzrdir.BzrDirMetaFormat1()
    repo_format = repository.format_registry.get_default()
    dir_format.repository_format = repo_format
    branch_builder = self.make_branch_builder('dir', format=dir_format)
    built_branch = branch_builder.get_branch()
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    self.assertFalse(osutils.lexists('dir'))
    wanted_class = dir_format.repository_format.__class__
    actual_class = built_branch.repository._format.__class__
    self.assertEqual(wanted_class, actual_class)
    # The format string read back through the test transport must be the
    # one belonging to the requested repository format.
    wanted_string = repo_format.get_format_string()
    stored_string = self.get_transport().get_bytes(
        'dir/.bzr/repository/format')
    self.assertEqual(wanted_string, stored_string)
637
def test_make_branch_builder_with_format_name(self):
    # The format may also be given by its registry name.
    branch_builder = self.make_branch_builder('dir', format='knit')
    built_branch = branch_builder.get_branch()
    # Guard against regression into MemoryTransport leaking
    # files to disk instead of keeping them in memory.
    self.assertFalse(osutils.lexists('dir'))
    knit_dir_format = controldir.format_registry.make_bzrdir('knit')
    wanted_class = knit_dir_format.repository_format.__class__
    actual_class = built_branch.repository._format.__class__
    self.assertEqual(wanted_class, actual_class)
    stored_string = self.get_transport().get_bytes(
        'dir/.bzr/repository/format')
    self.assertEqual('Bazaar-NG Knit Repository Format 1', stored_string)
650
def test_dangling_locks_cause_failures(self):
651
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
652
def test_function(self):
653
t = self.get_transport_from_path('.')
654
l = lockdir.LockDir(t, 'lock')
657
test = TestDanglingLock('test_function')
659
total_failures = result.errors + result.failures
660
if self._lock_check_thorough:
661
self.assertEqual(1, len(total_failures))
663
# When _lock_check_thorough is disabled, then we don't trigger a
665
self.assertEqual(0, len(total_failures))
668
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
    """Exercise the convenience helpers TestCaseWithTransport introduces."""

    def _check_readonly_transports(self, expected_class):
        # Shared body of the two get_readonly_url tests: fetch the readonly
        # url of the root and of 'foo/bar', open a transport on each, check
        # the type of both, and check that the second url addresses
        # 'foo/bar' beneath the first.
        root_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        root_transport = transport.get_transport_from_url(root_url)
        nested_transport = transport.get_transport_from_url(nested_url)
        self.assertIsInstance(root_transport, expected_class)
        self.assertIsInstance(nested_transport, expected_class)
        self.assertEqual(nested_transport.base[:-1],
                         root_transport.abspath('foo/bar'))

    def test_get_readonly_url_none(self):
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
        self.vfs_transport_factory = memory.MemoryServer
        self.transport_readonly_server = None
        # With no readonly server configured, the readonly url yields a
        # ReadonlyTransportDecorator.
        self._check_readonly_transports(ReadonlyTransportDecorator)

    def test_get_readonly_url_http(self):
        from bzrlib.tests.http_server import HttpServer
        from bzrlib.transport.http import HttpTransportBase
        self.transport_server = test_server.LocalURLServer
        self.transport_readonly_server = HttpServer
        # With an HTTP readonly server, the transport returned may be any
        # HttpTransportBase subclass.
        self._check_readonly_transports(HttpTransportBase)

    def test_is_directory(self):
        """assertIsDirectory passes for a directory and fails otherwise."""
        t = self.get_transport()
        self.build_tree(['a_dir/', 'a_file'], transport=t)
        self.assertIsDirectory('a_dir', t)
        # A plain file and a missing path must both be rejected.
        for not_a_dir in ('a_file', 'not_here'):
            self.assertRaises(AssertionError, self.assertIsDirectory,
                              not_a_dir, t)

    def test_make_branch_builder(self):
        branch_builder = self.make_branch_builder('dir')
        rev_id = branch_builder.build_commit()
        # With this test case the branch is created on disk.
        self.assertPathExists('dir')
        opened_dir = controldir.ControlDir.open('dir')
        # The control dir has a branch but no working tree.
        self.assertRaises(errors.NoWorkingTree, opened_dir.open_workingtree)
        opened_branch = opened_dir.open_branch()
        built_branch = branch_builder.get_branch()
        self.assertEqual(opened_branch.base, built_branch.base)
        # Both views of the branch report the single commit just built.
        expected_info = (1, rev_id)
        self.assertEqual(expected_info, built_branch.last_revision_info())
        self.assertEqual(expected_info, opened_branch.last_revision_info())
721
class TestTestCaseTransports(tests.TestCaseWithTransport):
724
super(TestTestCaseTransports, self).setUp()
725
self.vfs_transport_factory = memory.MemoryServer
727
def test_make_bzrdir_preserves_transport(self):
    # The transport returned here is not used below. The call is kept
    # because this block does not show whether the test relies on it.
    self.get_transport()
    made_dir = self.make_bzrdir('subdir')
    dir_transport = made_dir.transport
    self.assertIsInstance(dir_transport, memory.MemoryTransport)
    # should not be on disk, should only be in memory
    self.assertPathDoesNotExist('subdir')
736
class TestChrootedTest(tests.ChrootedTestCase):
738
def test_root_is_root(self):
739
t = transport.get_transport_from_url(self.get_readonly_url())
741
self.assertEqual(url, t.clone('..').base)
744
class TestProfileResult(tests.TestCase):
746
def test_profiles_tests(self):
747
self.requireFeature(features.lsprof_feature)
748
terminal = testtools.testresult.doubles.ExtendedTestResult()
749
result = tests.ProfileResult(terminal)
750
class Sample(tests.TestCase):
752
self.sample_function()
753
def sample_function(self):
757
case = terminal._events[0][1]
758
self.assertLength(1, case._benchcalls)
759
# We must be able to unpack it as the test reporting code wants
760
(_, _, _), stats = case._benchcalls[0]
761
self.assertTrue(callable(stats.pprint))
764
class TestTestResult(tests.TestCase):
766
def check_timing(self, test_case, expected_re):
767
result = bzrlib.tests.TextTestResult(self._log_file,
771
capture = testtools.testresult.doubles.ExtendedTestResult()
772
test_case.run(MultiTestResult(result, capture))
773
run_case = capture._events[0][1]
774
timed_string = result._testTimeString(run_case)
775
self.assertContainsRe(timed_string, expected_re)
777
def test_test_reporting(self):
778
class ShortDelayTestCase(tests.TestCase):
779
def test_short_delay(self):
781
def test_short_benchmark(self):
782
self.time(time.sleep, 0.003)
783
self.check_timing(ShortDelayTestCase('test_short_delay'),
785
# if a benchmark time is given, we now show just that time followed by
787
self.check_timing(ShortDelayTestCase('test_short_benchmark'),
790
def test_unittest_reporting_unittest_class(self):
791
# getting the time from a non-bzrlib test works ok
792
class ShortDelayTestCase(unittest.TestCase):
793
def test_short_delay(self):
795
self.check_timing(ShortDelayTestCase('test_short_delay'),
798
def _time_hello_world_encoding(self):
    """Time two unicode() conversions: of 'hello', then of 'world'.

    This is used to exercise the test framework.
    """
    for word in ('hello', 'world'):
        self.time(unicode, word, errors='replace')
806
def test_lsprofiling(self):
807
"""Verbose test result prints lsprof statistics from test cases."""
808
self.requireFeature(features.lsprof_feature)
809
result_stream = StringIO()
810
result = bzrlib.tests.VerboseTestResult(
815
# we want profile a call of some sort and check it is output by
816
# addSuccess. We dont care about addError or addFailure as they
817
# are not that interesting for performance tuning.
818
# make a new test instance that when run will generate a profile
819
example_test_case = TestTestResult("_time_hello_world_encoding")
820
example_test_case._gather_lsprof_in_benchmarks = True
821
# execute the test, which should succeed and record profiles
822
example_test_case.run(result)
823
# lsprofile_something()
824
# if this worked we want
825
# LSProf output for <built in function unicode> (['hello'], {'errors': 'replace'})
826
# CallCount Recursive Total(ms) Inline(ms) module:lineno(function)
827
# (the lsprof header)
828
# ... an arbitrary number of lines
829
# and the function call which is time.sleep.
830
# 1 0 ??? ??? ???(sleep)
831
# and then repeated but with 'world', rather than 'hello'.
832
# this should appear in the output stream of our test result.
833
output = result_stream.getvalue()
834
self.assertContainsRe(output,
835
r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
836
self.assertContainsRe(output,
837
r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
838
self.assertContainsRe(output,
839
r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
840
self.assertContainsRe(output,
841
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
843
def test_uses_time_from_testtools(self):
844
"""Test case timings in verbose results should use testtools times"""
846
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
847
def startTest(self, test):
848
self.time(datetime.datetime.utcfromtimestamp(1.145))
849
super(TimeAddedVerboseTestResult, self).startTest(test)
850
def addSuccess(self, test):
851
self.time(datetime.datetime.utcfromtimestamp(51.147))
852
super(TimeAddedVerboseTestResult, self).addSuccess(test)
853
def report_tests_starting(self): pass
855
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
856
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
858
def test_known_failure(self):
859
"""Using knownFailure should trigger several result actions."""
860
class InstrumentedTestResult(tests.ExtendedTestResult):
861
def stopTestRun(self): pass
862
def report_tests_starting(self): pass
863
def report_known_failure(self, test, err=None, details=None):
864
self._call = test, 'known failure'
865
result = InstrumentedTestResult(None, None, None, None)
866
class Test(tests.TestCase):
867
def test_function(self):
868
self.knownFailure('failed!')
869
test = Test("test_function")
871
# it should invoke 'report_known_failure'.
872
self.assertEqual(2, len(result._call))
873
self.assertEqual(test.id(), result._call[0].id())
874
self.assertEqual('known failure', result._call[1])
875
# we don't introspect the traceback; if the rest is ok, it would be
876
# exceptional for it not to be.
877
# it should update the known_failure_count on the object.
878
self.assertEqual(1, result.known_failure_count)
879
# the result should be successful.
880
self.assertTrue(result.wasSuccessful())
882
def test_verbose_report_known_failure(self):
883
# verbose test output formatting
884
result_stream = StringIO()
885
result = bzrlib.tests.VerboseTestResult(
890
_get_test("test_xfail").run(result)
891
self.assertContainsRe(result_stream.getvalue(),
892
"\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
893
"\\s*(?:Text attachment: )?reason"
898
def get_passing_test(self):
899
"""Return a test object that can't be run usefully."""
902
return unittest.FunctionTestCase(passing_test)
904
def test_add_not_supported(self):
    """Check what addNotSupported reports and records on the result."""

    class InstrumentedTestResult(tests.ExtendedTestResult):
        # Silence run-level reporting and remember the arguments of the
        # last report_unsupported() call.
        def stopTestRun(self):
            pass

        def report_tests_starting(self):
            pass

        def report_unsupported(self, test, feature):
            self._call = (test, feature)

    result = InstrumentedTestResult(None, None, None, None)
    sample_test = SampleTestCase('_test_pass')
    missing_feature = features.Feature()
    result.startTest(sample_test)
    result.addNotSupported(sample_test, missing_feature)
    # it should invoke 'report_unsupported'.
    recorded = result._call
    self.assertEqual(2, len(recorded))
    self.assertEqual(sample_test, recorded[0])
    self.assertEqual(missing_feature, recorded[1])
    # the result should be successful.
    self.assertTrue(result.wasSuccessful())
    # it should count the test under the 'Feature' key of
    # result.unsupported.
    self.assertEqual(1, result.unsupported['Feature'])
    # and invoking it again should increment that counter
    result.addNotSupported(sample_test, missing_feature)
    self.assertEqual(2, result.unsupported['Feature'])
929
# Calls report_unsupported() on a VerboseTestResult and checks the two lines
# it appends to the stream after startTest(): one starting with 'NODEP' and
# one naming the missing feature.
# NOTE(review): original lines 933-936 are missing from this copy, so the
# VerboseTestResult(...) call below is truncated.
def test_verbose_report_unsupported(self):
930
# verbose test output formatting
931
result_stream = StringIO()
932
result = bzrlib.tests.VerboseTestResult(
937
test = self.get_passing_test()
938
feature = features.Feature()
939
result.startTest(test)
940
prefix = len(result_stream.getvalue())
941
result.report_unsupported(test, feature)
942
output = result_stream.getvalue()[prefix:]
943
lines = output.splitlines()
944
# We don't check for the final '0ms' since it may fail on slow hosts
945
self.assertStartsWith(lines[0], 'NODEP')
946
self.assertEqual(lines[1],
947
" The feature 'Feature' is not available.")
949
# The instrumented result overrides addNotSupported to record its arguments;
# the inner test raises UnavailableFeature(feature).
# NOTE(review): original line 962 is missing from this copy; no visible line
# runs `test` against `result` before the assertions -- confirm.
def test_unavailable_exception(self):
950
"""An UnavailableFeature being raised should invoke addNotSupported."""
951
class InstrumentedTestResult(tests.ExtendedTestResult):
952
def stopTestRun(self): pass
953
def report_tests_starting(self): pass
954
def addNotSupported(self, test, feature):
955
self._call = test, feature
956
result = InstrumentedTestResult(None, None, None, None)
957
feature = features.Feature()
958
class Test(tests.TestCase):
959
def test_function(self):
960
raise tests.UnavailableFeature(feature)
961
test = Test("test_function")
963
# it should invoke 'addNotSupported'.
964
self.assertEqual(2, len(result._call))
965
self.assertEqual(test.id(), result._call[0].id())
966
self.assertEqual(feature, result._call[1])
967
# and not count as an error
968
self.assertEqual(0, result.error_count)
970
# After addNotSupported() the result must not be strictly successful, and
# _extractBenchmarkTime() must return None for the test.
# NOTE(review): original line 972 is missing from this copy, so the
# TextTestResult(...) call below is truncated.
def test_strict_with_unsupported_feature(self):
971
result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
973
test = self.get_passing_test()
974
feature = "Unsupported Feature"
975
result.addNotSupported(test, feature)
976
self.assertFalse(result.wasStrictlySuccessful())
977
self.assertEqual(None, result._extractBenchmarkTime(test))
979
# Uses the "test_xfail" example: the result must not be strictly successful
# and _extractBenchmarkTime() must return None for the test.
# NOTE(review): original lines 981 and 983 are missing from this copy; the
# TextTestResult(...) call is truncated and no visible line runs the test.
def test_strict_with_known_failure(self):
980
result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
982
test = _get_test("test_xfail")
984
self.assertFalse(result.wasStrictlySuccessful())
985
self.assertEqual(None, result._extractBenchmarkTime(test))
987
# After addSuccess() the result is strictly successful, and
# _extractBenchmarkTime() returns None for the test.
# NOTE(review): original line 989 is missing from this copy, so the
# TextTestResult(...) call below is truncated.
def test_strict_with_success(self):
988
result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
990
test = self.get_passing_test()
991
result.addSuccess(test)
992
self.assertTrue(result.wasStrictlySuccessful())
993
self.assertEqual(None, result._extractBenchmarkTime(test))
995
# The instrumented result counts startTests() calls in `calls`; exactly one
# call is expected.
# NOTE(review): original lines 998, 1002 and 1004 are missing from this copy
# (the initial value of `calls`, the body of test_function and the line that
# runs the test are not visible).
def test_startTests(self):
996
"""Starting the first test should trigger startTests."""
997
class InstrumentedTestResult(tests.ExtendedTestResult):
999
def startTests(self): self.calls += 1
1000
result = InstrumentedTestResult(None, None, None, None)
1001
def test_function():
1003
test = unittest.FunctionTestCase(test_function)
1005
self.assertEquals(1, result.calls)
1007
# Builds a suite of two no-op FunctionTestCases and expects one startTests()
# call and a result count of 2.
# NOTE(review): original lines 1010 and 1016 are missing from this copy (the
# initial value of `calls` and the line that runs the suite are not visible).
def test_startTests_only_once(self):
1008
"""With multiple tests startTests should still only be called once"""
1009
class InstrumentedTestResult(tests.ExtendedTestResult):
1011
def startTests(self): self.calls += 1
1012
result = InstrumentedTestResult(None, None, None, None)
1013
suite = unittest.TestSuite([
1014
unittest.FunctionTestCase(lambda: None),
1015
unittest.FunctionTestCase(lambda: None)])
1017
self.assertEquals(1, result.calls)
1018
self.assertEquals(2, result.count)
1021
class TestRunner(tests.TestCase):
1023
# NOTE(review): the body of this method (original line 1024) is missing from
# this copy; only the signature is visible.
def dummy_test(self):
1026
# Helper used by the tests in this class: saves TestCaseInTempDir.TEST_ROOT,
# sets it to None, returns testrunner.run(test), and assigns the saved value
# back.
# NOTE(review): original lines 1035, 1037 and 1040 are missing from this copy
# (the docstring terminator and, presumably, the try/finally keywords around
# the run) -- confirm against the full file.
def run_test_runner(self, testrunner, test):
1027
"""Run suite in testrunner, saving global state and restoring it.
1029
This current saves and restores:
1030
TestCaseInTempDir.TEST_ROOT
1032
There should be no tests in this file that use
1033
bzrlib.tests.TextTestRunner without using this convenience method,
1034
because of our use of global state.
1036
old_root = tests.TestCaseInTempDir.TEST_ROOT
1038
tests.TestCaseInTempDir.TEST_ROOT = None
1039
return testrunner.run(test)
1041
tests.TestCaseInTempDir.TEST_ROOT = old_root
1043
# Runs a suite holding one expected-failure test and one failing function
# test, then matches the runner output against a multi-line regex that ends
# with the "FAILED (failures=1, known_failure_count=1)" summary.
# NOTE(review): original lines 1051, 1054, 1060, 1067, 1069 and 1071 are
# missing from this copy (including the definitions of `failing_test` and
# `stream` and parts of the regex).
def test_known_failure_failed_run(self):
1044
# run a test that generates a known failure which should be printed in
1045
# the final output when real failures occur.
1046
class Test(tests.TestCase):
1047
def known_failure_test(self):
1048
self.expectFailure('failed', self.assertTrue, False)
1049
test = unittest.TestSuite()
1050
test.addTest(Test("known_failure_test"))
1052
raise AssertionError('foo')
1053
test.addTest(unittest.FunctionTestCase(failing_test))
1055
runner = tests.TextTestRunner(stream=stream)
1056
result = self.run_test_runner(runner, test)
1057
lines = stream.getvalue().splitlines()
1058
self.assertContainsRe(stream.getvalue(),
1059
'(?sm)^bzr selftest.*$'
1061
'^======================================================================\n'
1062
'^FAIL: failing_test\n'
1063
'^----------------------------------------------------------------------\n'
1064
'Traceback \\(most recent call last\\):\n'
1065
' .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1066
' raise AssertionError\\(\'foo\'\\)\n'
1068
'^----------------------------------------------------------------------\n'
1070
'FAILED \\(failures=1, known_failure_count=1\\)'
1073
# Runs a single test that calls knownFailure() and matches the runner output
# against a regex ending with "OK (known_failures=1)".
# NOTE(review): original lines 1075, 1080, 1084-1085 and 1087 are missing
# from this copy (including the definition of `stream` and parts of the
# regex).
def test_known_failure_ok_run(self):
1074
# run a test that generates a known failure which should be printed in
1076
class Test(tests.TestCase):
1077
def known_failure_test(self):
1078
self.knownFailure("Never works...")
1079
test = Test("known_failure_test")
1081
runner = tests.TextTestRunner(stream=stream)
1082
result = self.run_test_runner(runner, test)
1083
self.assertContainsRe(stream.getvalue(),
1086
'Ran 1 test in .*\n'
1088
'OK \\(known_failures=1\\)\n')
1090
# The inner test wraps a passing assertion in expectFailure(); the runner
# output is matched against a regex ending with "FAILED (failures=1)".
# NOTE(review): original lines 1097, 1099-1100, 1102-1106 and 1108 are
# missing from this copy, so the regex below is incomplete.
def test_unexpected_success_bad(self):
1091
class Test(tests.TestCase):
1092
def test_truth(self):
1093
self.expectFailure("No absolute truth", self.assertTrue, True)
1094
runner = tests.TextTestRunner(stream=StringIO())
1095
result = self.run_test_runner(runner, Test("test_truth"))
1096
self.assertContainsRe(runner.stream.getvalue(),
1098
"FAIL: \\S+\.test_truth\n"
1101
"\\s*(?:Text attachment: )?reason"
1107
"Ran 1 test in .*\n"
1109
"FAILED \\(failures=1\\)\n\\Z")
1111
# Passes LoggingDecorator in result_decorators and expects its startTest()
# to have appended exactly one entry to `calls`.
# NOTE(review): original lines 1112-1113 and 1119 are missing from this copy
# (the definitions of `calls` and `stream` are not visible).
def test_result_decorator(self):
1114
class LoggingDecorator(ExtendedToOriginalDecorator):
1115
def startTest(self, test):
1116
ExtendedToOriginalDecorator.startTest(self, test)
1117
calls.append('start')
1118
test = unittest.FunctionTestCase(lambda:None)
1120
runner = tests.TextTestRunner(stream=stream,
1121
result_decorators=[LoggingDecorator])
1122
result = self.run_test_runner(runner, test)
1123
self.assertLength(1, calls)
1125
def test_skipped_test(self):
    # A test that raises TestSkipped must leave the overall run successful.
    # The skipping test is defined inside this method so that it is never
    # picked up and run as a real test.
    class SkippingTest(tests.TestCase):

        def skipping_test(self):
            raise tests.TestSkipped('test intentionally skipped')

    outcome = self.run_test_runner(
        tests.TextTestRunner(stream=self._log_file),
        SkippingTest("skipping_test"))
    self.assertTrue(outcome.wasSuccessful())
1137
# The visible statements record 'setUp', register a cleanup and raise
# TestSkipped; the run must be successful and `calls` must equal
# ['setUp', 'cleanup'].
# NOTE(review): original lines 1138, 1140-1141, 1145, 1148-1149 and 1151 are
# missing from this copy (including the `calls` definition and the def lines
# of the inner setUp/cleanup methods).
def test_skipped_from_setup(self):
1139
class SkippedSetupTest(tests.TestCase):
1142
calls.append('setUp')
1143
self.addCleanup(self.cleanup)
1144
raise tests.TestSkipped('skipped setup')
1146
def test_skip(self):
1147
self.fail('test reached')
1150
calls.append('cleanup')
1152
runner = tests.TextTestRunner(stream=self._log_file)
1153
test = SkippedSetupTest('test_skip')
1154
result = self.run_test_runner(runner, test)
1155
self.assertTrue(result.wasSuccessful())
1156
# Check if cleanup was called the right number of times.
1157
self.assertEqual(['setUp', 'cleanup'], calls)
1159
# The inner test_skip raises TestSkipped; the run must be successful and
# `calls` must equal ['setUp', 'cleanup'].
# NOTE(review): original lines 1160, 1162-1163, 1167, 1170-1171 and 1173 are
# missing from this copy (including the `calls` definition and the def lines
# of the inner setUp/cleanup methods).
def test_skipped_from_test(self):
1161
class SkippedTest(tests.TestCase):
1164
super(SkippedTest, self).setUp()
1165
calls.append('setUp')
1166
self.addCleanup(self.cleanup)
1168
def test_skip(self):
1169
raise tests.TestSkipped('skipped test')
1172
calls.append('cleanup')
1174
runner = tests.TextTestRunner(stream=self._log_file)
1175
test = SkippedTest('test_skip')
1176
result = self.run_test_runner(runner, test)
1177
self.assertTrue(result.wasSuccessful())
1178
# Check if cleanup was called the right number of times.
1179
self.assertEqual(['setUp', 'cleanup'], calls)
1181
# Runs a test raising TestNotApplicable with verbosity=2, copies the output
# to the log file, and expects a successful and strictly successful result
# plus an "N/A" line and the reason text in the output.
# NOTE(review): original line 1186 is missing from this copy (the definition
# of `out` is not visible).
def test_not_applicable(self):
1182
# run a test that is skipped because it's not applicable
1183
class Test(tests.TestCase):
1184
def not_applicable_test(self):
1185
raise tests.TestNotApplicable('this test never runs')
1187
runner = tests.TextTestRunner(stream=out, verbosity=2)
1188
test = Test("not_applicable_test")
1189
result = self.run_test_runner(runner, test)
1190
self._log_file.write(out.getvalue())
1191
self.assertTrue(result.wasSuccessful())
1192
self.assertTrue(result.wasStrictlySuccessful())
1193
self.assertContainsRe(out.getvalue(),
1194
r'(?m)not_applicable_test * N/A')
1195
self.assertContainsRe(out.getvalue(),
1196
r'(?m)^ this test never runs')
1198
# Two sample tests each need a feature whose _probe() returns False; the
# expected output names both missing features.
# NOTE(review): original lines 1210-1212, 1216-1217 and 1220-1221 are missing
# from this copy (adding the tests to the suite, the `stream` definition and
# the assertion wrapping the two expected strings are not visible).
def test_unsupported_features_listed(self):
1199
"""When unsupported features are encountered they are detailed."""
1200
class Feature1(features.Feature):
1201
def _probe(self): return False
1202
class Feature2(features.Feature):
1203
def _probe(self): return False
1204
# create sample tests
1205
test1 = SampleTestCase('_test_pass')
1206
test1._test_needs_features = [Feature1()]
1207
test2 = SampleTestCase('_test_pass')
1208
test2._test_needs_features = [Feature2()]
1209
test = unittest.TestSuite()
1213
runner = tests.TextTestRunner(stream=stream)
1214
result = self.run_test_runner(runner, test)
1215
lines = stream.getvalue().splitlines()
1218
"Missing feature 'Feature1' skipped 1 tests.",
1219
"Missing feature 'Feature2' skipped 1 tests.",
1223
# Wraps a two-test suite in tests.CountingDecorator and expects the verbose
# output to start with "running 2 tests".
# NOTE(review): original line 1229 is missing from this copy (the definition
# of `stream` is not visible).
def test_verbose_test_count(self):
1224
"""A verbose test run reports the right test count at the start"""
1225
suite = TestUtil.TestSuite([
1226
unittest.FunctionTestCase(lambda:None),
1227
unittest.FunctionTestCase(lambda:None)])
1228
self.assertEqual(suite.countTestCases(), 2)
1230
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1231
# Need to use the CountingDecorator as that's what sets num_tests
1232
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1233
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1235
# LoggingDecorator records each startTestRun() call in `calls`; exactly one
# entry is expected after the run.
# NOTE(review): original lines 1237 and 1243 are missing from this copy (the
# definitions of `calls` and `stream` are not visible).
def test_startTestRun(self):
1236
"""run should call result.startTestRun()"""
1238
class LoggingDecorator(ExtendedToOriginalDecorator):
1239
def startTestRun(self):
1240
ExtendedToOriginalDecorator.startTestRun(self)
1241
calls.append('startTestRun')
1242
test = unittest.FunctionTestCase(lambda:None)
1244
runner = tests.TextTestRunner(stream=stream,
1245
result_decorators=[LoggingDecorator])
1246
result = self.run_test_runner(runner, test)
1247
self.assertLength(1, calls)
1249
# LoggingDecorator records each stopTestRun() call in `calls`; exactly one
# entry is expected after the run.
# NOTE(review): original lines 1251 and 1257 are missing from this copy (the
# definitions of `calls` and `stream` are not visible).
def test_stopTestRun(self):
1250
"""run should call result.stopTestRun()"""
1252
class LoggingDecorator(ExtendedToOriginalDecorator):
1253
def stopTestRun(self):
1254
ExtendedToOriginalDecorator.stopTestRun(self)
1255
calls.append('stopTestRun')
1256
test = unittest.FunctionTestCase(lambda:None)
1258
runner = tests.TextTestRunner(stream=stream,
1259
result_decorators=[LoggingDecorator])
1260
result = self.run_test_runner(runner, test)
1261
self.assertLength(1, calls)
1263
# Overrides osutils.get_terminal_encoding to return "ascii", runs a failing
# test, and matches the output against a regex containing an escaped \u2606.
# NOTE(review): original lines 1267, 1269, 1276 and 1278 are missing from
# this copy (including the logging statement in the inner test, the `out`
# definition and parts of the regex).
def test_unicode_test_output_on_ascii_stream(self):
1264
"""Showing results should always succeed even on an ascii console"""
1265
class FailureWithUnicode(tests.TestCase):
1266
def test_log_unicode(self):
1268
self.fail("Now print that log!")
1270
self.overrideAttr(osutils, "get_terminal_encoding",
1271
lambda trace=False: "ascii")
1272
result = self.run_test_runner(tests.TextTestRunner(stream=out),
1273
FailureWithUnicode("test_log_unicode"))
1274
self.assertContainsRe(out.getvalue(),
1275
"(?:Text attachment: )?log"
1277
"\d+\.\d+ \\\\u2606"
1281
# Sample test case; other tests in this file instantiate it as
# SampleTestCase('_test_pass').
# NOTE(review): the body of _test_pass (original line 1284) is missing from
# this copy.
class SampleTestCase(tests.TestCase):
1283
def _test_pass(self):
1286
# Exception type raised and expected by the assertListRaises tests in this
# file.
# NOTE(review): the class body (original line 1287) is missing from this copy.
class _TestException(Exception):
1290
class TestTestCase(tests.TestCase):
1291
"""Tests that test the core bzrlib TestCase."""
1293
# assertLength(0, ...) is expected to pass for `a_list`.
# NOTE(review): original line 1294 (the definition of `a_list`) is missing
# from this copy.
def test_assertLength_matches_empty(self):
1295
self.assertLength(0, a_list)
1297
# assertLength(3, ...) is expected to pass for `a_list`.
# NOTE(review): original line 1298 (the definition of `a_list`) is missing
# from this copy.
def test_assertLength_matches_nonempty(self):
1299
self.assertLength(3, a_list)
1301
# assertLength(1, a_list) is expected to raise AssertionError.
# NOTE(review): original line 1302 (the definition of `a_list`) is missing
# from this copy.
def test_assertLength_fails_different(self):
1303
self.assertRaises(AssertionError, self.assertLength, 1, a_list)
1305
# The AssertionError from assertLength is compared with the message
# 'Incorrect length: wanted 2, got 3 for [1, 2, 3]'.
# NOTE(review): original lines 1306, 1308 and 1310 are missing from this
# copy, so both calls below are truncated.
def test_assertLength_shows_sequence_in_failure(self):
1307
exception = self.assertRaises(AssertionError, self.assertLength, 2,
1309
self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
1312
# A TestCase whose setUp does not call the base setUp must produce an
# unsuccessful unittest.TestResult with testsRun == 1.
# NOTE(review): original lines 1314, 1316-1317 and 1320 are missing from this
# copy (the inner method definitions and the line that runs the test are not
# visible).
def test_base_setUp_not_called_causes_failure(self):
1313
class TestCaseWithBrokenSetUp(tests.TestCase):
1315
pass # does not call TestCase.setUp
1318
test = TestCaseWithBrokenSetUp('test_foo')
1319
result = unittest.TestResult()
1321
self.assertFalse(result.wasSuccessful())
1322
self.assertEqual(1, result.testsRun)
1324
# A TestCase whose tearDown does not call the base tearDown must produce an
# unsuccessful unittest.TestResult with testsRun == 1.
# NOTE(review): original lines 1326, 1328-1329 and 1332 are missing from this
# copy (the inner method definitions and the line that runs the test are not
# visible).
def test_base_tearDown_not_called_causes_failure(self):
1325
class TestCaseWithBrokenTearDown(tests.TestCase):
1327
pass # does not call TestCase.tearDown
1330
test = TestCaseWithBrokenTearDown('test_foo')
1331
result = unittest.TestResult()
1333
self.assertFalse(result.wasSuccessful())
1334
self.assertEqual(1, result.testsRun)
1336
# Not applicable when 'allow_debug' is in tests.selftest_debug_flags.
# Otherwise bzrlib.debug.debug_flags must equal `flags`, to which
# 'strict_locks' is added when self._lock_check_thorough is true.
# NOTE(review): original line 1344 (the initial value of `flags`) is missing
# from this copy.
def test_debug_flags_sanitised(self):
1337
"""The bzrlib debug flags should be sanitised by setUp."""
1338
if 'allow_debug' in tests.selftest_debug_flags:
1339
raise tests.TestNotApplicable(
1340
'-Eallow_debug option prevents debug flag sanitisation')
1341
# we could set something and run a test that will check
1342
# it gets santised, but this is probably sufficient for now:
1343
# if someone runs the test with -Dsomething it will error.
1345
if self._lock_check_thorough:
1346
flags.add('strict_locks')
1347
self.assertEqual(flags, bzrlib.debug.debug_flags)
1349
def change_selftest_debug_flags(self, new_flags):
    # Swap tests.selftest_debug_flags for a fresh set built from new_flags,
    # going through overrideAttr rather than assigning directly.
    replacement = set(new_flags)
    self.overrideAttr(tests, 'selftest_debug_flags', replacement)
1352
# Sets selftest flag 'allow_debug' and debug flag 'a-flag', runs an inner
# test that copies bzrlib.debug.debug_flags onto the outer test, and compares
# them with the expected set ('strict_locks' is added unless
# 'disable_lock_checks' is a selftest flag).
# NOTE(review): original line 1355 (the docstring terminator) is missing from
# this copy.
def test_allow_debug_flag(self):
1353
"""The -Eallow_debug flag prevents bzrlib.debug.debug_flags from being
1354
sanitised (i.e. cleared) before running a test.
1356
self.change_selftest_debug_flags(set(['allow_debug']))
1357
bzrlib.debug.debug_flags = set(['a-flag'])
1358
class TestThatRecordsFlags(tests.TestCase):
1359
def test_foo(nested_self):
1360
self.flags = set(bzrlib.debug.debug_flags)
1361
test = TestThatRecordsFlags('test_foo')
1362
test.run(self.make_test_result())
1363
flags = set(['a-flag'])
1364
if 'disable_lock_checks' not in tests.selftest_debug_flags:
1365
flags.add('strict_locks')
1366
self.assertEqual(flags, self.flags)
1368
def test_disable_lock_checks(self):
    """-Edisable_lock_checks switches off the thorough lock checks."""
    outer = self

    class TestThatRecordsFlags(tests.TestCase):

        def test_foo(inner):
            # Copy what the inner test sees onto the outer test.
            outer.flags = set(bzrlib.debug.debug_flags)
            outer.test_lock_check_thorough = inner._lock_check_thorough

    def run_recorder(selftest_flags):
        # Install the selftest flags, then run a fresh recording test.
        self.change_selftest_debug_flags(selftest_flags)
        recorder = TestThatRecordsFlags('test_foo')
        recorder.run(self.make_test_result())

    # With no selftest flags: thorough checking is on and 'strict_locks'
    # is the only debug flag.
    run_recorder(set())
    self.assertTrue(self.test_lock_check_thorough)
    self.assertEqual(set(['strict_locks']), self.flags)
    # With disable_lock_checks set, both of those change.
    run_recorder(set(['disable_lock_checks']))
    self.assertFalse(self.test_lock_check_thorough)
    self.assertEqual(set(), self.flags)
1388
def test_this_fails_strict_lock_check(self):
    # The debug flags are sampled before and after a call to
    # thisFailsStrictLockCheck(); 'strict_locks' must be present in the
    # first sample and gone from the second.
    outer = self

    class TestThatRecordsFlags(tests.TestCase):

        def test_foo(inner):
            outer.flags1 = set(bzrlib.debug.debug_flags)
            outer.thisFailsStrictLockCheck()
            outer.flags2 = set(bzrlib.debug.debug_flags)

    # An empty selftest flag set makes sure lock checking is active.
    self.change_selftest_debug_flags(set())
    TestThatRecordsFlags('test_foo').run(self.make_test_result())
    self.assertEqual(set(['strict_locks']), self.flags1)
    self.assertEqual(set(), self.flags2)
1401
# Sets debug_flags to {'original-state'}, runs an inner test that assigns
# {'modified'}, and expects the original set to be back afterwards.
# NOTE(review): original lines 1404 and 1409 are missing from this copy (the
# docstring terminator and the def line of the inner test method).
def test_debug_flags_restored(self):
1402
"""The bzrlib debug flags should be restored to their original state
1403
after the test was run, even if allow_debug is set.
1405
self.change_selftest_debug_flags(set(['allow_debug']))
1406
# Now run a test that modifies debug.debug_flags.
1407
bzrlib.debug.debug_flags = set(['original-state'])
1408
class TestThatModifiesFlags(tests.TestCase):
1410
bzrlib.debug.debug_flags = set(['modified'])
1411
test = TestThatModifiesFlags('test_foo')
1412
test.run(self.make_test_result())
1413
self.assertEqual(set(['original-state']), bzrlib.debug.debug_flags)
1415
def make_test_result(self):
    """Build a TextTestResult whose stream is this test's log file."""
    options = {'descriptions': 0, 'verbosity': 1}
    return tests.TextTestResult(self._log_file, **options)
1419
# NOTE(review): the statement body of this method (original line 1421) is
# missing from this copy; only the signature and comment are visible.
def inner_test(self):
1420
# the inner child test
1423
# Not a test itself: test_trace_nesting runs it as TestTestCase("outer_child").
# It runs a nested TestTestCase against a fresh result, notes "outer finish",
# and registers deletion of its log file as a cleanup.
# NOTE(review): original line 1425 is missing from this copy. The nested case
# is created as "inner_child", while the visible sibling method is named
# inner_test -- confirm which name is intended.
def outer_child(self):
1424
# the outer child test
1426
self.inner_test = TestTestCase("inner_child")
1427
result = self.make_test_result()
1428
self.inner_test.run(result)
1429
note("outer finish")
1430
self.addCleanup(osutils.delete_any, self._log_file_name)
1432
def test_trace_nesting(self):
    # Checks that each test case nests its trace facility correctly, by
    # running a test case by hand.  That case (A) is meant to set up a new
    # log, write to it, set up a child case (B) that logs independently,
    # and then write a trailer of its own.
    # Two nested children are used so that the state of the logs can be
    # checked after the outer child finishes: a bad clean-up in tearDown
    # then shows up as a bad result inside this test.
    trace_before = bzrlib.trace._trace_file
    # "outer_child" is the outer of the two nested child tests.
    outer_case = TestTestCase("outer_child")
    outer_case.run(self.make_test_result())
    self.assertEqual(trace_before, bzrlib.trace._trace_file)
1450
def method_that_times_a_bit_twice(self):
    # Make two timed calls so that a caller can check self.time()
    # aggregates across calls.
    nap = 0.007
    for attempt in range(2):
        self.time(time.sleep, nap)
1455
# Runs method_that_times_a_bit_twice against a VerboseTestResult and matches
# the captured output with a regex.
# NOTE(review): original lines 1460-1462 and 1466 are missing from this copy,
# so the VerboseTestResult(...) call and the assertContainsRe(...) call are
# truncated (the regex itself is not visible).
def test_time_creates_benchmark_in_result(self):
1456
"""Test that the TestCase.time() method accumulates a benchmark time."""
1457
sample_test = TestTestCase("method_that_times_a_bit_twice")
1458
output_stream = StringIO()
1459
result = bzrlib.tests.VerboseTestResult(
1463
sample_test.run(result)
1464
self.assertContainsRe(
1465
output_stream.getvalue(),
1468
# Compares the hooks attribute of Branch, SmartTCPServer and Command with a
# freshly constructed hooks object of the matching type.
# NOTE(review): original lines 1474 and 1477 are missing from this copy (the
# opening of the second and third assertions is not visible).
def test_hooks_sanitised(self):
1469
"""The bzrlib hooks should be sanitised by setUp."""
1470
# Note this test won't fail with hooks that the core library doesn't
1471
# use - but it trigger with a plugin that adds hooks, so its still a
1472
# useful warning in that case.
1473
self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
1475
bzrlib.smart.server.SmartServerHooks(),
1476
bzrlib.smart.server.SmartTCPServer.hooks)
1478
bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
1480
# Requires the lsprof feature, turns on _gather_lsprof_in_benchmarks for this
# instance, makes two self.time() calls and checks that _benchcalls holds one
# (call, bzrlib.lsprof.Stats) entry per call; the list is emptied at the end.
# NOTE(review): original lines 1482, 1484 and 1487 are missing from this copy
# (including the docstring terminator).
def test__gather_lsprof_in_benchmarks(self):
1481
"""When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1483
Each self.time() call is individually and separately profiled.
1485
self.requireFeature(features.lsprof_feature)
1486
# overrides the class member with an instance member so no cleanup
1488
self._gather_lsprof_in_benchmarks = True
1489
self.time(time.sleep, 0.000)
1490
self.time(time.sleep, 0.003)
1491
self.assertEqual(2, len(self._benchcalls))
1492
self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0])
1493
self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
1494
self.assertIsInstance(self._benchcalls[0][1], bzrlib.lsprof.Stats)
1495
self.assertIsInstance(self._benchcalls[1][1], bzrlib.lsprof.Stats)
1496
del self._benchcalls[:]
1498
def test_knownFailure(self):
    """Calling self.knownFailure(reason) raises tests.KnownFailure."""
    expected = tests.KnownFailure
    reason = "A Failure"
    self.assertRaises(expected, self.knownFailure, reason)
1502
def test_open_bzrdir_safe_roots(self):
    # Opening a control directory must fail until its URL has been
    # permitted, even on a memory transport.  The server is set up by hand
    # here (TestCase deliberately provides no magic for this).
    server = memory.MemoryServer()
    server.start_server()
    self.addCleanup(server.stop_server)
    mem_transport = transport.get_transport_from_url(server.get_url())
    controldir.ControlDir.create(mem_transport.base)
    self.assertRaises(
        errors.BzrError,
        controldir.ControlDir.open_from_transport, mem_transport)
    # Once the URL is declared safe, the same open succeeds.
    self.permit_url(mem_transport.base)
    self._bzr_selftest_roots.append(mem_transport.base)
    controldir.ControlDir.open_from_transport(mem_transport)
1519
def test_requireFeature_available(self):
    """requireFeature() on an available feature does nothing."""
    class Available(features.Feature):

        def _probe(self):
            return True

    present = Available()
    self.requireFeature(present)
1526
def test_requireFeature_unavailable(self):
    """requireFeature() on an unavailable feature raises UnavailableFeature."""
    class Unavailable(features.Feature):

        def _probe(self):
            return False

    absent = Unavailable()
    self.assertRaises(
        tests.UnavailableFeature, self.requireFeature, absent)
1534
# NOTE(review): original line 1536 is missing from this copy; only the
# construction of the sample test is visible, not what is done with it.
def test_run_no_parameters(self):
1535
test = SampleTestCase('_test_pass')
1538
# Gives the sample test an EnabledFeature requirement and expects a plain
# unittest.TestResult with one test run and no errors or failures.
# NOTE(review): original lines 1543 and 1546 are missing from this copy (the
# body of available() and the line that runs the test are not visible).
def test_run_enabled_unittest_result(self):
1539
"""Test we revert to regular behaviour when the test is enabled."""
1540
test = SampleTestCase('_test_pass')
1541
class EnabledFeature(object):
1542
def available(self):
1544
test._test_needs_features = [EnabledFeature()]
1545
result = unittest.TestResult()
1547
self.assertEqual(1, result.testsRun)
1548
self.assertEqual([], result.errors)
1549
self.assertEqual([], result.failures)
1551
# Gives the sample test a DisabledFeature requirement and expects a plain
# unittest.TestResult with one test run and no errors or failures.
# NOTE(review): original lines 1556 and 1559 are missing from this copy (the
# body of available() and the line that runs the test are not visible).
def test_run_disabled_unittest_result(self):
1552
"""Test our compatability for disabled tests with unittest results."""
1553
test = SampleTestCase('_test_pass')
1554
class DisabledFeature(object):
1555
def available(self):
1557
test._test_needs_features = [DisabledFeature()]
1558
result = unittest.TestResult()
1560
self.assertEqual(1, result.testsRun)
1561
self.assertEqual([], result.errors)
1562
self.assertEqual([], result.failures)
1564
# The instrumented result appends ('startTest', ...), ('stopTest', ...) and
# ('addNotSupported', ...) tuples to `calls`; the expected sequence starts
# with startTest followed by addNotSupported for the disabled feature.
# NOTE(review): original lines 1571, 1575, 1577, 1585, 1587 and 1590-1592 are
# missing from this copy (including the body of available(), the result's
# __init__ def line, the line that runs the test and the assertion around
# the expected call list).
def test_run_disabled_supporting_result(self):
1565
"""Test disabled tests behaviour with support aware results."""
1566
test = SampleTestCase('_test_pass')
1567
class DisabledFeature(object):
1568
def __eq__(self, other):
1569
return isinstance(other, DisabledFeature)
1570
def available(self):
1572
the_feature = DisabledFeature()
1573
test._test_needs_features = [the_feature]
1574
class InstrumentedTestResult(unittest.TestResult):
1576
unittest.TestResult.__init__(self)
1578
def startTest(self, test):
1579
self.calls.append(('startTest', test))
1580
def stopTest(self, test):
1581
self.calls.append(('stopTest', test))
1582
def addNotSupported(self, test, feature):
1583
self.calls.append(('addNotSupported', test, feature))
1584
result = InstrumentedTestResult()
1586
case = result.calls[0][1]
1588
('startTest', case),
1589
('addNotSupported', case, the_feature),
1594
def test_start_server_registers_url(self):
    # start_server() must add the server's URL to _bzr_selftest_roots.
    server = memory.MemoryServer()
    # Requiring an empty list up front is a little strict, but unlikely to
    # need changing soon.
    self.assertEqual([], self._bzr_selftest_roots)
    self.start_server(server)
    expected_urls = [server.get_url()]
    self.assertSubset(expected_urls, self._bzr_selftest_roots)
1602
# assertListRaises must return the _TestException raised by the generator,
# both when that exact type and when the Exception base class is expected.
# NOTE(review): original lines 1605, 1607 and 1610 are missing from this copy
# (the statement the inner comment refers to is not visible).
def test_assert_list_raises_on_generator(self):
1603
def generator_which_will_raise():
1604
# This will not raise until after the first yield
1606
raise _TestException()
1608
e = self.assertListRaises(_TestException, generator_which_will_raise)
1609
self.assertIsInstance(e, _TestException)
1611
e = self.assertListRaises(Exception, generator_which_will_raise)
1612
self.assertIsInstance(e, _TestException)
1614
# assertListRaises must return the _TestException raised by plain_exception,
# both when that exact type and when the Exception base class is expected.
# NOTE(review): original lines 1617-1618 and 1621 are missing from this copy.
def test_assert_list_raises_on_plain(self):
1615
def plain_exception():
1616
raise _TestException()
1619
e = self.assertListRaises(_TestException, plain_exception)
1620
self.assertIsInstance(e, _TestException)
1622
e = self.assertListRaises(Exception, plain_exception)
1623
self.assertIsInstance(e, _TestException)
1625
# An exception of a type other than the expected one must propagate out of
# assertListRaises, for both the plain callable and the generator.
# NOTE(review): original lines 1627-1628, 1631, 1633-1634 and 1636 are
# missing from this copy (the body of _NotTestException and the first
# statements of the generator are not visible).
def test_assert_list_raises_assert_wrong_exception(self):
1626
class _NotTestException(Exception):
1629
def wrong_exception():
1630
raise _NotTestException()
1632
def wrong_exception_generator():
1635
raise _NotTestException()
1637
# Wrong exceptions are not intercepted
1638
self.assertRaises(_NotTestException,
1639
self.assertListRaises, _TestException, wrong_exception)
1640
self.assertRaises(_NotTestException,
1641
self.assertListRaises, _TestException, wrong_exception_generator)
1643
# assertListRaises must raise AssertionError for `success` and for
# `success_generator`.
# NOTE(review): original lines 1644-1646, 1648-1650 and 1653 are missing from
# this copy (the definition of `success` and the body of success_generator
# are not visible).
def test_assert_list_raises_no_exception(self):
1647
def success_generator():
1651
self.assertRaises(AssertionError,
1652
self.assertListRaises, _TestException, success)
1654
self.assertRaises(AssertionError,
1655
self.assertListRaises, _TestException, success_generator)
1657
# The inner test calls overrideAttr(obj, 'test_attr') without a new value and
# then modifies the attribute; after the inner run the outer attribute must
# read 'original' again.
# NOTE(review): original lines 1661-1662, 1665 and 1671 are missing from this
# copy (including the def line of the inner setUp).
def test_overrideAttr_without_value(self):
1658
self.test_attr = 'original' # Define a test attribute
1659
obj = self # Make 'obj' visible to the embedded test
1660
class Test(tests.TestCase):
1663
super(Test, self).setUp()
1664
self.orig = self.overrideAttr(obj, 'test_attr')
1666
def test_value(self):
1667
self.assertEqual('original', self.orig)
1668
self.assertEqual('original', obj.test_attr)
1669
obj.test_attr = 'modified'
1670
self.assertEqual('modified', obj.test_attr)
1672
test = Test('test_value')
1673
test.run(unittest.TestResult())
1674
self.assertEqual('original', obj.test_attr)
1676
# The inner test calls overrideAttr(obj, 'test_attr', new='modified'); the
# returned value must be 'original', the attribute 'modified' during the
# test, and 'original' again after the inner run.
# NOTE(review): original lines 1680-1681, 1684 and 1688 are missing from this
# copy (including the def line of the inner setUp).
def test_overrideAttr_with_value(self):
1677
self.test_attr = 'original' # Define a test attribute
1678
obj = self # Make 'obj' visible to the embedded test
1679
class Test(tests.TestCase):
1682
super(Test, self).setUp()
1683
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1685
def test_value(self):
1686
self.assertEqual('original', self.orig)
1687
self.assertEqual('modified', obj.test_attr)
1689
test = Test('test_value')
1690
test.run(unittest.TestResult())
1691
self.assertEqual('original', obj.test_attr)
1693
# Records calls to this module's _add_numbers via recordCalls() and expects
# one recorded entry, ((2, 10), {}), after calling it once.
# NOTE(review): original line 1698 is missing from this copy, so the first
# assertEqual(...) call below is truncated.
def test_recordCalls(self):
1694
from bzrlib.tests import test_selftest
1695
calls = self.recordCalls(
1696
test_selftest, '_add_numbers')
1697
self.assertEqual(test_selftest._add_numbers(2, 10),
1699
self.assertEquals(calls, [((2, 10), {})])
1702
# Module-level helper that test_recordCalls wraps with recordCalls().
# NOTE(review): the function body (original line 1703) is missing from this
# copy.
def _add_numbers(a, b):
1706
# Feature subclass plus the shared instance that the test_missing_feature
# example passes to requireFeature().
# NOTE(review): the class body (original line 1707) is missing from this copy.
class _MissingFeature(features.Feature):
1709
missing_feature = _MissingFeature()
1712
# Defines the ExampleTests case class locally and returns the instance for
# the test method called `name`. The example methods cover a failure, an
# error, a missing feature, a skip, a success, an expected failure and an
# unexpected success.
# NOTE(review): original lines 1714 and 1716-1718 are missing from this copy
# (including the rest of the docstring and its terminator).
def _get_test(name):
1713
"""Get an instance of a specific example test.
1715
We protect this in a function so that they don't auto-run in the test
1719
class ExampleTests(tests.TestCase):
1721
def test_fail(self):
1722
mutter('this was a failing test')
1723
self.fail('this test will fail')
1725
def test_error(self):
1726
mutter('this test errored')
1727
raise RuntimeError('gotcha')
1729
def test_missing_feature(self):
1730
mutter('missing the feature')
1731
self.requireFeature(missing_feature)
1733
def test_skip(self):
1734
mutter('this test will be skipped')
1735
raise tests.TestSkipped('reason')
1737
def test_success(self):
1738
mutter('this test succeeds')
1740
def test_xfail(self):
1741
mutter('test with expected failure')
1742
self.knownFailure('this_fails')
1744
def test_unexpected_success(self):
1745
mutter('test with unexpected success')
1746
self.expectFailure('should_fail', lambda: None)
1748
return ExampleTests(name)
1751
class TestTestCaseLogDetails(tests.TestCase):
1753
# Helper: fetches the named example test and creates a testtools.TestResult
# for it. Callers in this class use its return value as the result object.
# NOTE(review): original lines 1756-1757 are missing from this copy (running
# the test and returning the result are not visible).
def _run_test(self, test_name):
1754
test = _get_test(test_name)
1755
result = testtools.TestResult()
1759
def test_fail_has_log(self):
    # A failing test's report must carry the log attachment header and
    # the text the test wrote to the log.
    outcome = self._run_test('test_fail')
    self.assertEqual(1, len(outcome.failures))
    report = outcome.failures[0][1]
    expected_patterns = (
        '(?m)^(?:Text attachment: )?log(?:$|: )',
        'this was a failing test',
    )
    for pattern in expected_patterns:
        self.assertContainsRe(report, pattern)
1767
def test_error_has_log(self):
    # An erroring test's report must carry the log attachment header and
    # the text the test wrote to the log.
    outcome = self._run_test('test_error')
    self.assertEqual(1, len(outcome.errors))
    report = outcome.errors[0][1]
    expected_patterns = (
        '(?m)^(?:Text attachment: )?log(?:$|: )',
        'this test errored',
    )
    for pattern in expected_patterns:
        self.assertContainsRe(report, pattern)
1775
# The skipped example must be recorded once under the 'reason' key of
# result.skip_reasons, and the test's details must not contain 'log'.
# NOTE(review): original line 1780 is missing from this copy (the definition
# of `test` used in the last assertion is not visible).
def test_skip_has_no_log(self):
1776
result = self._run_test('test_skip')
1777
self.assertEqual(['reason'], result.skip_reasons.keys())
1778
skips = result.skip_reasons['reason']
1779
self.assertEqual(1, len(skips))
1781
self.assertFalse('log' in test.getDetails())
1783
# The missing-feature example must be recorded once in result.skip_reasons,
# keyed by the missing_feature object, and its details must not contain 'log'.
# NOTE(review): original line 1790 is missing from this copy (the definition
# of `test` used in the last assertion is not visible).
def test_missing_feature_has_no_log(self):
1784
# testtools doesn't know about addNotSupported, so it just gets
1785
# considered as a skip
1786
result = self._run_test('test_missing_feature')
1787
self.assertEqual([missing_feature], result.skip_reasons.keys())
1788
skips = result.skip_reasons[missing_feature]
1789
self.assertEqual(1, len(skips))
1791
self.assertFalse('log' in test.getDetails())
1793
def test_xfail_has_no_log(self):
    # An expected failure's report must contain neither the log attachment
    # header nor the text the test wrote to the log.
    outcome = self._run_test('test_xfail')
    self.assertEqual(1, len(outcome.expectedFailures))
    report = outcome.expectedFailures[0][1]
    unwanted_patterns = (
        '(?m)^(?:Text attachment: )?log(?:$|: )',
        'test with expected failure',
    )
    for pattern in unwanted_patterns:
        self.assertNotContainsRe(report, pattern)
1801
def test_unexpected_success_has_log(self):
    # An unexpected success must keep the 'log' detail on the test.
    outcome = self._run_test('test_unexpected_success')
    successes = outcome.unexpectedSuccesses
    self.assertEqual(1, len(successes))
    # Unlike expectedFailures (indexed above as (test, report) pairs),
    # unexpectedSuccesses holds the test objects themselves, so the
    # details are read straight from the entry.
    surprised_case = successes[0]
    attachments = surprised_case.getDetails()
    self.assertTrue('log' in attachments)
1811
class TestTestCloning(tests.TestCase):
1812
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1814
# Clones a test with tests.clone_test before running the original; the 'foo'
# detail added by the original must be absent from the clone's details.
# NOTE(review): original lines 1817 and 1819 are missing from this copy (the
# docstring terminator and the def line of the inner test method).
def test_cloned_testcase_does_not_share_details(self):
1815
"""A TestCase cloned with clone_test does not share mutable attributes
1816
such as details or cleanups.
1818
class Test(tests.TestCase):
1820
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1821
orig_test = Test('test_foo')
1822
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1823
orig_test.run(unittest.TestResult())
1824
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1825
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1827
# Multiplies one test by two x-scenarios and then by two y-scenarios; the
# four resulting tests must carry every (x, y) combination.
# NOTE(review): original lines 1830 and 1832-1833 are missing from this copy
# (the docstring terminator and the body of the inner Test class).
def test_double_apply_scenario_preserves_first_scenario(self):
1828
"""Applying two levels of scenarios to a test preserves the attributes
1829
added by both scenarios.
1831
class Test(tests.TestCase):
1834
test = Test('test_foo')
1835
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1836
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1837
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1838
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1839
all_tests = list(tests.iter_suite_tests(suite))
1840
self.assertLength(4, all_tests)
1841
all_xys = sorted((t.x, t.y) for t in all_tests)
1842
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1845
# Fixture for the applyDeprecated tests; the (0, 11, 0) version below is only
# a marker for those tests.
# NOTE(review): the function body (original line 1849) is missing from this
# copy.
# NB: Don't delete this; it's not actually from 0.11!
1846
@deprecated_function(deprecated_in((0, 11, 0)))
1847
def sample_deprecated_function():
1848
"""A deprecated function to test applyDeprecated with."""
1852
# Fixture for the applyDeprecated tests: a function carrying no deprecation
# decorator.
# NOTE(review): the function body (original line 1854) is missing from this
# copy.
def sample_undeprecated_function(a_param):
1853
"""A undeprecated function to test applyDeprecated with."""
1856
# Fixture class: one method deprecated in (0, 11, 0), one undeprecated
# method, and one method deprecated in (0, 10, 0) that itself calls the
# deprecated sample_deprecated_function().
# NOTE(review): original line 1862 is missing from this copy (the statement
# body of sample_deprecated_method is not visible).
class ApplyDeprecatedHelper(object):
1857
"""A helper class for ApplyDeprecated tests."""
1859
@deprecated_method(deprecated_in((0, 11, 0)))
1860
def sample_deprecated_method(self, param_one):
1861
"""A deprecated method for testing with."""
1864
def sample_normal_method(self):
1865
"""A undeprecated method."""
1867
@deprecated_method(deprecated_in((0, 10, 0)))
1868
def sample_nested_deprecation(self):
1869
return sample_deprecated_function()
1872
class TestExtraAssertions(tests.TestCase):
1873
"""Tests for new test assertions in bzrlib test suite"""
1875
# assertIsInstance passes for matching types and raises AssertionError
# otherwise; the message text is checked with and without the extra message
# argument.
# NOTE(review): original line 1886 is missing from this copy, so the last
# assertEquals(...) call below is truncated.
def test_assert_isinstance(self):
1876
self.assertIsInstance(2, int)
1877
self.assertIsInstance(u'', basestring)
1878
e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
1879
self.assertEquals(str(e),
1880
"None is an instance of <type 'NoneType'> rather than <type 'int'>")
1881
self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
1882
e = self.assertRaises(AssertionError,
1883
self.assertIsInstance, None, int, "it's just not")
1884
self.assertEquals(str(e),
1885
"None is an instance of <type 'NoneType'> rather than <type 'int'>"
1888
def test_assertEndsWith(self):
1889
self.assertEndsWith('foo', 'oo')
1890
self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')
1892
def test_assertEqualDiff(self):
1893
e = self.assertRaises(AssertionError,
1894
self.assertEqualDiff, '', '\n')
1895
self.assertEquals(str(e),
1896
# Don't blink ! The '+' applies to the second string
1897
'first string is missing a final newline.\n+ \n')
1898
e = self.assertRaises(AssertionError,
1899
self.assertEqualDiff, '\n', '')
1900
self.assertEquals(str(e),
1901
# Don't blink ! The '-' applies to the second string
1902
'second string is missing a final newline.\n- \n')
1905
class TestDeprecations(tests.TestCase):
    """Tests for the applyDeprecated and callDeprecated helpers."""

    def test_applyDeprecated_not_deprecated(self):
        sample_object = ApplyDeprecatedHelper()
        # calling an undeprecated callable raises an assertion
        self.assertRaises(AssertionError, self.applyDeprecated,
            deprecated_in((0, 11, 0)),
            sample_object.sample_normal_method)
        self.assertRaises(AssertionError, self.applyDeprecated,
            deprecated_in((0, 11, 0)),
            sample_undeprecated_function, "a param value")
        # calling a deprecated callable (function or method) with the wrong
        # expected deprecation fails.
        self.assertRaises(AssertionError, self.applyDeprecated,
            deprecated_in((0, 10, 0)),
            sample_object.sample_deprecated_method, "a param value")
        self.assertRaises(AssertionError, self.applyDeprecated,
            deprecated_in((0, 10, 0)),
            sample_deprecated_function)
        # calling a deprecated callable (function or method) with the right
        # expected deprecation returns the functions result.
        self.assertEqual("a param value",
            self.applyDeprecated(deprecated_in((0, 11, 0)),
            sample_object.sample_deprecated_method, "a param value"))
        self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)),
            sample_deprecated_function))
        # calling a nested deprecation with the wrong deprecation version
        # fails even if a deeper nested function was deprecated with the
        # supplied version.
        self.assertRaises(AssertionError, self.applyDeprecated,
            deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation)
        # calling a nested deprecation with the right deprecation value
        # returns the calls result.
        self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 10, 0)),
            sample_object.sample_nested_deprecation))

    def test_callDeprecated(self):
        def testfunc(be_deprecated, result=None):
            # Warn only when asked to, and always hand back `result`.
            if be_deprecated is True:
                # NOTE(review): the continuation of this call was missing
                # from the damaged listing; stacklevel=1 is reconstructed --
                # confirm.
                symbol_versioning.warn('i am deprecated', DeprecationWarning,
                                       stacklevel=1)
            # NOTE(review): restored line; the assertions below require the
            # `result` argument to be returned.
            return result
        result = self.callDeprecated(['i am deprecated'], testfunc, True)
        self.assertIs(None, result)
        result = self.callDeprecated([], testfunc, False, 'result')
        self.assertEqual('result', result)
        self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True)
        self.callDeprecated([], testfunc, be_deprecated=False)
1955
class TestWarningTests(tests.TestCase):
    """Tests for calling methods that raise warnings."""

    def test_callCatchWarnings(self):
        # NOTE(review): the `def meth`, `return` and `w0` lines were missing
        # from the damaged listing. They are reconstructed from the call
        # meth(1, 2) -> 3 and from the later use of w0 -- confirm.
        def meth(a, b):
            warnings.warn("this is your last warning")
            return a + b
        wlist, result = self.callCatchWarnings(meth, 1, 2)
        self.assertEqual(3, result)
        # would like just to compare them, but UserWarning doesn't implement
        # eq well
        w0, = wlist
        self.assertIsInstance(w0, UserWarning)
        self.assertEqual("this is your last warning", str(w0))
1971
class TestConvenienceMakers(tests.TestCaseWithTransport):
    """Test for the make_* convenience functions."""

    def test_make_branch_and_tree_with_format(self):
        # we should be able to supply a format to make_branch_and_tree
        self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
        self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format,
                              bzrlib.bzrdir.BzrDirMetaFormat1)

    def test_make_branch_and_memory_tree(self):
        # we should be able to get a new branch and a mutable tree from
        # TestCaseWithTransport
        tree = self.make_branch_and_memory_tree('a')
        self.assertIsInstance(tree, bzrlib.memorytree.MemoryTree)

    def test_make_tree_for_local_vfs_backed_transport(self):
        # make_branch_and_tree has to use local branch and repositories
        # when the vfs transport and local disk are colocated, even if
        # a different transport is in use for url generation.
        self.transport_server = test_server.FakeVFATServer
        # Precondition: the generated url is not a plain file:// url.
        self.assertFalse(self.get_url('t1').startswith('file://'))
        tree = self.make_branch_and_tree('t1')
        base = tree.bzrdir.root_transport.base
        self.assertStartsWith(base, 'file://')
        # Tree, branch and repository must all share one root transport.
        self.assertEqual(tree.bzrdir.root_transport,
                         tree.branch.bzrdir.root_transport)
        self.assertEqual(tree.bzrdir.root_transport,
                         tree.branch.repository.bzrdir.root_transport)
2001
class SelfTestHelper(object):
    """Mixin giving test cases a way to run selftest and capture its output."""

    def run_selftest(self, **kwargs):
        """Run selftest returning its output.

        :param kwargs: Passed through unchanged to tests.selftest.
        :return: The stream selftest wrote to, rewound to the start.
        """
        # NOTE(review): the `output = StringIO()`, `try:`, `finally:`,
        # `output.seek(0)` and `return output` lines were missing from the
        # damaged listing. They are reconstructed from the gaps in the
        # original numbering and from callers that use .readlines() and
        # .getvalue() on the result -- confirm.
        output = StringIO()
        # selftest may replace these module/class level settings; remember
        # them so they can be put back whatever happens.
        old_transport = bzrlib.tests.default_transport
        old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
        tests.TestCaseWithMemoryTransport.TEST_ROOT = None
        try:
            self.assertEqual(True, tests.selftest(stream=output, **kwargs))
        finally:
            bzrlib.tests.default_transport = old_transport
            tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
        output.seek(0)
        return output
2018
class TestSelftest(tests.TestCase, SelfTestHelper):
2019
"""Tests of bzrlib.tests.selftest."""
2021
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
2024
factory_called.append(True)
2025
return TestUtil.TestSuite()
2028
self.apply_redirected(out, err, None, bzrlib.tests.selftest,
2029
test_suite_factory=factory)
2030
self.assertEqual([True], factory_called)
2033
"""A test suite factory."""
2034
class Test(tests.TestCase):
2041
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
2043
def test_list_only(self):
2044
output = self.run_selftest(test_suite_factory=self.factory,
2046
self.assertEqual(3, len(output.readlines()))
2048
def test_list_only_filtered(self):
2049
output = self.run_selftest(test_suite_factory=self.factory,
2050
list_only=True, pattern="Test.b")
2051
self.assertEndsWith(output.getvalue(), "Test.b\n")
2052
self.assertLength(1, output.readlines())
2054
def test_list_only_excludes(self):
2055
output = self.run_selftest(test_suite_factory=self.factory,
2056
list_only=True, exclude_pattern="Test.b")
2057
self.assertNotContainsRe("Test.b", output.getvalue())
2058
self.assertLength(2, output.readlines())
2060
def test_lsprof_tests(self):
2061
self.requireFeature(features.lsprof_feature)
2064
def __call__(test, result):
2066
def run(test, result):
2067
results.append(result)
2068
def countTestCases(self):
2070
self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2071
self.assertLength(1, results)
2072
self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
2074
def test_random(self):
2075
# test randomising by listing a number of tests.
2076
output_123 = self.run_selftest(test_suite_factory=self.factory,
2077
list_only=True, random_seed="123")
2078
output_234 = self.run_selftest(test_suite_factory=self.factory,
2079
list_only=True, random_seed="234")
2080
self.assertNotEqual(output_123, output_234)
2081
# "Randominzing test order..\n\n
2082
self.assertLength(5, output_123.readlines())
2083
self.assertLength(5, output_234.readlines())
2085
def test_random_reuse_is_same_order(self):
2086
# test randomising by listing a number of tests.
2087
expected = self.run_selftest(test_suite_factory=self.factory,
2088
list_only=True, random_seed="123")
2089
repeated = self.run_selftest(test_suite_factory=self.factory,
2090
list_only=True, random_seed="123")
2091
self.assertEqual(expected.getvalue(), repeated.getvalue())
2093
def test_runner_class(self):
2094
self.requireFeature(features.subunit)
2095
from subunit import ProtocolTestCase
2096
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2097
test_suite_factory=self.factory)
2098
test = ProtocolTestCase(stream)
2099
result = unittest.TestResult()
2101
self.assertEqual(3, result.testsRun)
2103
def test_starting_with_single_argument(self):
2104
output = self.run_selftest(test_suite_factory=self.factory,
2105
starting_with=['bzrlib.tests.test_selftest.Test.a'],
2107
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
2110
def test_starting_with_multiple_argument(self):
2111
output = self.run_selftest(test_suite_factory=self.factory,
2112
starting_with=['bzrlib.tests.test_selftest.Test.a',
2113
'bzrlib.tests.test_selftest.Test.b'],
2115
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
2116
'bzrlib.tests.test_selftest.Test.b\n',
2119
def check_transport_set(self, transport_server):
2120
captured_transport = []
2121
def seen_transport(a_transport):
2122
captured_transport.append(a_transport)
2123
class Capture(tests.TestCase):
2125
seen_transport(bzrlib.tests.default_transport)
2127
return TestUtil.TestSuite([Capture("a")])
2128
self.run_selftest(transport=transport_server, test_suite_factory=factory)
2129
self.assertEqual(transport_server, captured_transport[0])
2131
def test_transport_sftp(self):
2132
self.requireFeature(features.paramiko)
2133
from bzrlib.tests import stub_sftp
2134
self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
2136
def test_transport_memory(self):
2137
self.check_transport_set(memory.MemoryServer)
2140
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
    # Does IO: reads test.list

    def test_load_list(self):
        # Provide a list with one test - this test.
        test_id_line = '%s\n' % self.id()
        self.build_tree_contents([('test.list', test_id_line)])
        # And generate a list of the tests in the suite.
        stream = self.run_selftest(load_list='test.list', list_only=True)
        self.assertEqual(test_id_line, stream.getvalue())

    def test_load_unknown(self):
        # Asking selftest to load a test id list from a file that does not
        # exist raises NoSuchFile.
        self.assertRaises(errors.NoSuchFile, self.run_selftest,
            load_list='missing file name', list_only=True)
2158
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2160
_test_needs_features = [features.subunit]
2162
def run_subunit_stream(self, test_name):
2163
from subunit import ProtocolTestCase
2165
return TestUtil.TestSuite([_get_test(test_name)])
2166
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2167
test_suite_factory=factory)
2168
test = ProtocolTestCase(stream)
2169
result = testtools.TestResult()
2171
content = stream.getvalue()
2172
return content, result
2174
def test_fail_has_log(self):
2175
content, result = self.run_subunit_stream('test_fail')
2176
self.assertEqual(1, len(result.failures))
2177
self.assertContainsRe(content, '(?m)^log$')
2178
self.assertContainsRe(content, 'this test will fail')
2180
def test_error_has_log(self):
2181
content, result = self.run_subunit_stream('test_error')
2182
self.assertContainsRe(content, '(?m)^log$')
2183
self.assertContainsRe(content, 'this test errored')
2185
def test_skip_has_no_log(self):
2186
content, result = self.run_subunit_stream('test_skip')
2187
self.assertNotContainsRe(content, '(?m)^log$')
2188
self.assertNotContainsRe(content, 'this test will be skipped')
2189
self.assertEqual(['reason'], result.skip_reasons.keys())
2190
skips = result.skip_reasons['reason']
2191
self.assertEqual(1, len(skips))
2193
# RemotedTestCase doesn't preserve the "details"
2194
## self.assertFalse('log' in test.getDetails())
2196
def test_missing_feature_has_no_log(self):
2197
content, result = self.run_subunit_stream('test_missing_feature')
2198
self.assertNotContainsRe(content, '(?m)^log$')
2199
self.assertNotContainsRe(content, 'missing the feature')
2200
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2201
skips = result.skip_reasons['_MissingFeature\n']
2202
self.assertEqual(1, len(skips))
2204
# RemotedTestCase doesn't preserve the "details"
2205
## self.assertFalse('log' in test.getDetails())
2207
def test_xfail_has_no_log(self):
2208
content, result = self.run_subunit_stream('test_xfail')
2209
self.assertNotContainsRe(content, '(?m)^log$')
2210
self.assertNotContainsRe(content, 'test with expected failure')
2211
self.assertEqual(1, len(result.expectedFailures))
2212
result_content = result.expectedFailures[0][1]
2213
self.assertNotContainsRe(result_content,
2214
'(?m)^(?:Text attachment: )?log(?:$|: )')
2215
self.assertNotContainsRe(result_content, 'test with expected failure')
2217
def test_unexpected_success_has_log(self):
2218
content, result = self.run_subunit_stream('test_unexpected_success')
2219
self.assertContainsRe(content, '(?m)^log$')
2220
self.assertContainsRe(content, 'test with unexpected success')
2221
# GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2222
# success, if a min version check is added remove this
2223
from subunit import TestProtocolClient as _Client
2224
if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
2225
self.expectFailure('subunit treats "unexpectedSuccess"'
2226
' as a plain success',
2227
self.assertEqual, 1, len(result.unexpectedSuccesses))
2228
self.assertEqual(1, len(result.unexpectedSuccesses))
2229
test = result.unexpectedSuccesses[0]
2230
# RemotedTestCase doesn't preserve the "details"
2231
## self.assertTrue('log' in test.getDetails())
2233
def test_success_has_no_log(self):
2234
content, result = self.run_subunit_stream('test_success')
2235
self.assertEqual(1, result.testsRun)
2236
self.assertNotContainsRe(content, '(?m)^log$')
2237
self.assertNotContainsRe(content, 'this test succeeds')
2240
class TestRunBzr(tests.TestCase):
2245
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
2247
"""Override _run_bzr_core to test how it is invoked by run_bzr.
2249
Attempts to run bzr from inside this class don't actually run it.
2251
We test how run_bzr actually invokes bzr in another location. Here we
2252
only need to test that it passes the right parameters to run_bzr.
2254
self.argv = list(argv)
2255
self.retcode = retcode
2256
self.encoding = encoding
2258
self.working_dir = working_dir
2259
return self.retcode, self.out, self.err
2261
def test_run_bzr_error(self):
2262
self.out = "It sure does!\n"
2263
out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
2264
self.assertEqual(['rocks'], self.argv)
2265
self.assertEqual(34, self.retcode)
2266
self.assertEqual('It sure does!\n', out)
2267
self.assertEquals(out, self.out)
2268
self.assertEqual('', err)
2269
self.assertEquals(err, self.err)
2271
def test_run_bzr_error_regexes(self):
2273
self.err = "bzr: ERROR: foobarbaz is not versioned"
2274
out, err = self.run_bzr_error(
2275
["bzr: ERROR: foobarbaz is not versioned"],
2276
['file-id', 'foobarbaz'])
2278
def test_encoding(self):
2279
"""Test that run_bzr passes encoding to _run_bzr_core"""
2280
self.run_bzr('foo bar')
2281
self.assertEqual(None, self.encoding)
2282
self.assertEqual(['foo', 'bar'], self.argv)
2284
self.run_bzr('foo bar', encoding='baz')
2285
self.assertEqual('baz', self.encoding)
2286
self.assertEqual(['foo', 'bar'], self.argv)
2288
def test_retcode(self):
2289
"""Test that run_bzr passes retcode to _run_bzr_core"""
2290
# Default is retcode == 0
2291
self.run_bzr('foo bar')
2292
self.assertEqual(0, self.retcode)
2293
self.assertEqual(['foo', 'bar'], self.argv)
2295
self.run_bzr('foo bar', retcode=1)
2296
self.assertEqual(1, self.retcode)
2297
self.assertEqual(['foo', 'bar'], self.argv)
2299
self.run_bzr('foo bar', retcode=None)
2300
self.assertEqual(None, self.retcode)
2301
self.assertEqual(['foo', 'bar'], self.argv)
2303
self.run_bzr(['foo', 'bar'], retcode=3)
2304
self.assertEqual(3, self.retcode)
2305
self.assertEqual(['foo', 'bar'], self.argv)
2307
def test_stdin(self):
2308
# test that the stdin keyword to run_bzr is passed through to
2309
# _run_bzr_core as-is. We do this by overriding
2310
# _run_bzr_core in this class, and then calling run_bzr,
2311
# which is a convenience function for _run_bzr_core, so
2313
self.run_bzr('foo bar', stdin='gam')
2314
self.assertEqual('gam', self.stdin)
2315
self.assertEqual(['foo', 'bar'], self.argv)
2317
self.run_bzr('foo bar', stdin='zippy')
2318
self.assertEqual('zippy', self.stdin)
2319
self.assertEqual(['foo', 'bar'], self.argv)
2321
def test_working_dir(self):
2322
"""Test that run_bzr passes working_dir to _run_bzr_core"""
2323
self.run_bzr('foo bar')
2324
self.assertEqual(None, self.working_dir)
2325
self.assertEqual(['foo', 'bar'], self.argv)
2327
self.run_bzr('foo bar', working_dir='baz')
2328
self.assertEqual('baz', self.working_dir)
2329
self.assertEqual(['foo', 'bar'], self.argv)
2331
def test_reject_extra_keyword_arguments(self):
2332
self.assertRaises(TypeError, self.run_bzr, "foo bar",
2333
error_regex=['error message'])
2336
class TestRunBzrCaptured(tests.TestCaseWithTransport):
    # Does IO when testing the working_dir parameter.

    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                         a_callable=None, *args, **kwargs):
        """Record how run_bzr invoked us instead of running a_callable.

        Stores the stdin object, the ui factory (and its stdin) and the
        working directory that are current at call time, then writes fixed
        text to stdout and stderr so tests can check where they end up.
        """
        # NOTE(review): this assignment was missing from the damaged
        # listing; it is restored because the tests below read self.stdin.
        self.stdin = stdin
        self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
        self.factory = bzrlib.ui.ui_factory
        self.working_dir = osutils.getcwd()
        stdout.write('foo\n')
        stderr.write('bar\n')
        # NOTE(review): reconstructed line -- presumably the value is used
        # as the command's return code, with 0 matching run_bzr's default
        # expectation; confirm.
        return 0

    def test_stdin(self):
        # test that the stdin keyword to _run_bzr_core is passed through to
        # apply_redirected as a StringIO. We do this by overriding
        # apply_redirected in this class, and then calling _run_bzr_core,
        # which calls apply_redirected.
        self.run_bzr(['foo', 'bar'], stdin='gam')
        self.assertEqual('gam', self.stdin.read())
        self.assertTrue(self.stdin is self.factory_stdin)
        self.run_bzr(['foo', 'bar'], stdin='zippy')
        self.assertEqual('zippy', self.stdin.read())
        self.assertTrue(self.stdin is self.factory_stdin)

    def test_ui_factory(self):
        # each invocation of self.run_bzr should get its
        # own UI factory, which is an instance of TestUIFactory,
        # with stdin, stdout and stderr attached to the stdin,
        # stdout and stderr of the invoked run_bzr
        current_factory = bzrlib.ui.ui_factory
        self.run_bzr(['foo'])
        self.assertFalse(current_factory is self.factory)
        self.assertNotEqual(sys.stdout, self.factory.stdout)
        self.assertNotEqual(sys.stderr, self.factory.stderr)
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
        self.assertEqual('bar\n', self.factory.stderr.getvalue())
        self.assertIsInstance(self.factory, tests.TestUIFactory)

    def test_working_dir(self):
        self.build_tree(['one/', 'two/'])
        cwd = osutils.getcwd()

        # Default is to work in the current directory
        self.run_bzr(['foo', 'bar'])
        self.assertEqual(cwd, self.working_dir)

        self.run_bzr(['foo', 'bar'], working_dir=None)
        self.assertEqual(cwd, self.working_dir)

        # The function should be run in the alternative directory
        # but afterwards the current working dir shouldn't be changed
        self.run_bzr(['foo', 'bar'], working_dir='one')
        self.assertNotEqual(cwd, self.working_dir)
        self.assertEndsWith(self.working_dir, 'one')
        self.assertEqual(cwd, osutils.getcwd())

        self.run_bzr(['foo', 'bar'], working_dir='two')
        self.assertNotEqual(cwd, self.working_dir)
        self.assertEndsWith(self.working_dir, 'two')
        self.assertEqual(cwd, osutils.getcwd())
2399
class StubProcess(object):
    """A stub process for testing run_bzr_subprocess."""

    def __init__(self, out="", err="", retcode=0):
        """Create a stub with canned results.

        :param out: Value communicate() reports as standard output.
        :param err: Value communicate() reports as standard error.
        :param retcode: Value exposed as the returncode attribute.
        """
        # The two assignments below were missing from the damaged listing;
        # communicate() reads both attributes, so they are restored here.
        self.out = out
        self.err = err
        self.returncode = retcode

    def communicate(self):
        """Return the canned (out, err) pair given to the constructor."""
        return self.out, self.err
2411
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
    """Base class for tests testing how we might run bzr."""

    # NOTE(review): the `def setUp(self):` header was missing from the
    # damaged listing; it is restored from the super(...).setUp() call.
    def setUp(self):
        super(TestWithFakedStartBzrSubprocess, self).setUp()
        # One dict per start_bzr_subprocess call, oldest first.
        self.subprocess_calls = []

    # NOTE(review): the `working_dir=None,` parameter line was missing from
    # the damaged listing; the name is required by the body, the default is
    # reconstructed -- confirm.
    def start_bzr_subprocess(self, process_args, env_changes=None,
                             skip_if_plan_to_signal=False,
                             working_dir=None,
                             allow_plugins=False):
        """capture what run_bzr_subprocess tries to do."""
        self.subprocess_calls.append({'process_args':process_args,
            'env_changes':env_changes,
            'skip_if_plan_to_signal':skip_if_plan_to_signal,
            'working_dir':working_dir, 'allow_plugins':allow_plugins})
        # Tests set next_subprocess before calling; no process is spawned.
        return self.next_subprocess
2430
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
2432
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2433
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2435
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2436
that will return static results. This assertion method populates those
2437
results and also checks the arguments run_bzr_subprocess generates.
2439
self.next_subprocess = process
2441
result = self.run_bzr_subprocess(*args, **kwargs)
2443
self.next_subprocess = None
2444
for key, expected in expected_args.iteritems():
2445
self.assertEqual(expected, self.subprocess_calls[-1][key])
2448
self.next_subprocess = None
2449
for key, expected in expected_args.iteritems():
2450
self.assertEqual(expected, self.subprocess_calls[-1][key])
2453
def test_run_bzr_subprocess(self):
2454
"""The run_bzr_helper_external command behaves nicely."""
2455
self.assertRunBzrSubprocess({'process_args':['--version']},
2456
StubProcess(), '--version')
2457
self.assertRunBzrSubprocess({'process_args':['--version']},
2458
StubProcess(), ['--version'])
2459
# retcode=None disables retcode checking
2460
result = self.assertRunBzrSubprocess({},
2461
StubProcess(retcode=3), '--version', retcode=None)
2462
result = self.assertRunBzrSubprocess({},
2463
StubProcess(out="is free software"), '--version')
2464
self.assertContainsRe(result[0], 'is free software')
2465
# Running a subcommand that is missing errors
2466
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2467
{'process_args':['--versionn']}, StubProcess(retcode=3),
2469
# Unless it is told to expect the error from the subprocess
2470
result = self.assertRunBzrSubprocess({},
2471
StubProcess(retcode=3), '--versionn', retcode=3)
2472
# Or to ignore retcode checking
2473
result = self.assertRunBzrSubprocess({},
2474
StubProcess(err="unknown command", retcode=3), '--versionn',
2476
self.assertContainsRe(result[1], 'unknown command')
2478
def test_env_change_passes_through(self):
2479
self.assertRunBzrSubprocess(
2480
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2482
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2484
def test_no_working_dir_passed_as_None(self):
2485
self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
2487
def test_no_working_dir_passed_through(self):
2488
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2491
def test_run_bzr_subprocess_no_plugins(self):
2492
self.assertRunBzrSubprocess({'allow_plugins': False},
2495
def test_allow_plugins(self):
2496
self.assertRunBzrSubprocess({'allow_plugins': True},
2497
StubProcess(), '', allow_plugins=True)
2500
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):

    def test_finish_bzr_subprocess_with_error(self):
        """finish_bzr_subprocess allows specification of the desired exit code.
        """
        process = StubProcess(err="unknown command", retcode=3)
        result = self.finish_bzr_subprocess(process, retcode=3)
        self.assertEqual('', result[0])
        self.assertContainsRe(result[1], 'unknown command')

    def test_finish_bzr_subprocess_ignoring_retcode(self):
        """finish_bzr_subprocess allows the exit code to be ignored."""
        process = StubProcess(err="unknown command", retcode=3)
        result = self.finish_bzr_subprocess(process, retcode=None)
        self.assertEqual('', result[0])
        self.assertContainsRe(result[1], 'unknown command')

    def test_finish_subprocess_with_unexpected_retcode(self):
        """finish_bzr_subprocess raises self.failureException if the retcode is
        not the expected one.
        """
        process = StubProcess(err="unknown command", retcode=3)
        # NOTE(review): the trailing `process)` argument line was missing
        # from the damaged listing and is restored here. No retcode is
        # passed, so the stub's exit code of 3 is the unexpected one.
        self.assertRaises(self.failureException, self.finish_bzr_subprocess,
                          process)
2526
class _DontSpawnProcess(Exception):
    """A simple exception which just allows us to skip unnecessary steps"""
2530
class TestStartBzrSubProcess(tests.TestCase):
2531
"""Stub test start_bzr_subprocess."""
2533
def _subprocess_log_cleanup(self):
2534
"""Inhibits the base version as we don't produce a log file."""
2536
def _popen(self, *args, **kwargs):
2537
"""Override the base version to record the command that is run.
2539
From there we can ensure it is correct without spawning a real process.
2541
self.check_popen_state()
2542
self._popen_args = args
2543
self._popen_kwargs = kwargs
2544
raise _DontSpawnProcess()
2546
def check_popen_state(self):
2547
"""Replace to make assertions when popen is called."""
2549
def test_run_bzr_subprocess_no_plugins(self):
2550
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2551
command = self._popen_args[0]
2552
self.assertEqual(sys.executable, command[0])
2553
self.assertEqual(self.get_bzr_path(), command[1])
2554
self.assertEqual(['--no-plugins'], command[2:])
2556
def test_allow_plugins(self):
2557
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2559
command = self._popen_args[0]
2560
self.assertEqual([], command[2:])
2562
def test_set_env(self):
2563
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2565
def check_environment():
2566
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2567
self.check_popen_state = check_environment
2568
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2569
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2570
# not set in theparent
2571
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2573
def test_run_bzr_subprocess_env_del(self):
2574
"""run_bzr_subprocess can remove environment variables too."""
2575
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2576
def check_environment():
2577
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2578
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2579
self.check_popen_state = check_environment
2580
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2581
env_changes={'EXISTANT_ENV_VAR':None})
2582
# Still set in parent
2583
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2584
del os.environ['EXISTANT_ENV_VAR']
2586
def test_env_del_missing(self):
2587
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2588
def check_environment():
2589
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2590
self.check_popen_state = check_environment
2591
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2592
env_changes={'NON_EXISTANT_ENV_VAR':None})
2594
def test_working_dir(self):
2595
"""Test that we can specify the working dir for the child"""
2596
orig_getcwd = osutils.getcwd
2597
orig_chdir = os.chdir
2601
self.overrideAttr(os, 'chdir', chdir)
2604
self.overrideAttr(osutils, 'getcwd', getcwd)
2605
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2607
self.assertEqual(['foo', 'current'], chdirs)
2609
def test_get_bzr_path_with_cwd_bzrlib(self):
2610
self.get_source_path = lambda: ""
2611
self.overrideAttr(os.path, "isfile", lambda path: True)
2612
self.assertEqual(self.get_bzr_path(), "bzr")
2615
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2616
"""Tests that really need to do things with an external bzr."""
2618
def test_start_and_stop_bzr_subprocess_send_signal(self):
2619
"""finish_bzr_subprocess raises self.failureException if the retcode is
2620
not the expected one.
2622
self.disable_missing_extensions_warning()
2623
process = self.start_bzr_subprocess(['wait-until-signalled'],
2624
skip_if_plan_to_signal=True)
2625
self.assertEqual('running\n', process.stdout.readline())
2626
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2628
self.assertEqual('', result[0])
2629
self.assertEqual('bzr: interrupted\n', result[1])
2632
class TestSelftestFiltering(tests.TestCase):
2635
super(TestSelftestFiltering, self).setUp()
2636
self.suite = TestUtil.TestSuite()
2637
self.loader = TestUtil.TestLoader()
2638
self.suite.addTest(self.loader.loadTestsFromModule(
2639
sys.modules['bzrlib.tests.test_selftest']))
2640
self.all_names = _test_ids(self.suite)
2642
def test_condition_id_re(self):
2643
test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2644
'test_condition_id_re')
2645
filtered_suite = tests.filter_suite_by_condition(
2646
self.suite, tests.condition_id_re('test_condition_id_re'))
2647
self.assertEqual([test_name], _test_ids(filtered_suite))
2649
def test_condition_id_in_list(self):
2650
test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
2651
'test_condition_id_in_list']
2652
id_list = tests.TestIdList(test_names)
2653
filtered_suite = tests.filter_suite_by_condition(
2654
self.suite, tests.condition_id_in_list(id_list))
2655
my_pattern = 'TestSelftestFiltering.*test_condition_id_in_list'
2656
re_filtered = tests.filter_suite_by_re(self.suite, my_pattern)
2657
self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))
2659
def test_condition_id_startswith(self):
2660
klass = 'bzrlib.tests.test_selftest.TestSelftestFiltering.'
2661
start1 = klass + 'test_condition_id_starts'
2662
start2 = klass + 'test_condition_id_in'
2663
test_names = [ klass + 'test_condition_id_in_list',
2664
klass + 'test_condition_id_startswith',
2666
filtered_suite = tests.filter_suite_by_condition(
2667
self.suite, tests.condition_id_startswith([start1, start2]))
2668
self.assertEqual(test_names, _test_ids(filtered_suite))
2670
def test_condition_isinstance(self):
2671
filtered_suite = tests.filter_suite_by_condition(
2672
self.suite, tests.condition_isinstance(self.__class__))
2673
class_pattern = 'bzrlib.tests.test_selftest.TestSelftestFiltering.'
2674
re_filtered = tests.filter_suite_by_re(self.suite, class_pattern)
2675
self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))
2677
def test_exclude_tests_by_condition(self):
2678
excluded_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2679
'test_exclude_tests_by_condition')
2680
filtered_suite = tests.exclude_tests_by_condition(self.suite,
2681
lambda x:x.id() == excluded_name)
2682
self.assertEqual(len(self.all_names) - 1,
2683
filtered_suite.countTestCases())
2684
self.assertFalse(excluded_name in _test_ids(filtered_suite))
2685
remaining_names = list(self.all_names)
2686
remaining_names.remove(excluded_name)
2687
self.assertEqual(remaining_names, _test_ids(filtered_suite))
2689
def test_exclude_tests_by_re(self):
2690
self.all_names = _test_ids(self.suite)
2691
filtered_suite = tests.exclude_tests_by_re(self.suite,
2692
'exclude_tests_by_re')
2693
excluded_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2694
'test_exclude_tests_by_re')
2695
self.assertEqual(len(self.all_names) - 1,
2696
filtered_suite.countTestCases())
2697
self.assertFalse(excluded_name in _test_ids(filtered_suite))
2698
remaining_names = list(self.all_names)
2699
remaining_names.remove(excluded_name)
2700
self.assertEqual(remaining_names, _test_ids(filtered_suite))
2702
def test_filter_suite_by_condition(self):
2703
test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2704
'test_filter_suite_by_condition')
2705
filtered_suite = tests.filter_suite_by_condition(self.suite,
2706
lambda x:x.id() == test_name)
2707
self.assertEqual([test_name], _test_ids(filtered_suite))
2709
def test_filter_suite_by_re(self):
2710
filtered_suite = tests.filter_suite_by_re(self.suite,
2711
'test_filter_suite_by_r')
2712
filtered_names = _test_ids(filtered_suite)
2713
self.assertEqual(filtered_names, ['bzrlib.tests.test_selftest.'
2714
'TestSelftestFiltering.test_filter_suite_by_re'])
2716
def test_filter_suite_by_id_list(self):
2717
test_list = ['bzrlib.tests.test_selftest.'
2718
'TestSelftestFiltering.test_filter_suite_by_id_list']
2719
filtered_suite = tests.filter_suite_by_id_list(
2720
self.suite, tests.TestIdList(test_list))
2721
filtered_names = _test_ids(filtered_suite)
2724
['bzrlib.tests.test_selftest.'
2725
'TestSelftestFiltering.test_filter_suite_by_id_list'])
2727
def test_filter_suite_by_id_startswith(self):
2728
# By design this test may fail if another test is added whose name also
2729
# begins with one of the start value used.
2730
klass = 'bzrlib.tests.test_selftest.TestSelftestFiltering.'
2731
start1 = klass + 'test_filter_suite_by_id_starts'
2732
start2 = klass + 'test_filter_suite_by_id_li'
2733
test_list = [klass + 'test_filter_suite_by_id_list',
2734
klass + 'test_filter_suite_by_id_startswith',
2736
filtered_suite = tests.filter_suite_by_id_startswith(
2737
self.suite, [start1, start2])
2740
_test_ids(filtered_suite),
2743
def test_preserve_input(self):
    # NB: Surely there is something in the stdlib to do this?
    # preserve_input must hand back the very object it was given, so the
    # checks below compare identity rather than equality.
    suite = self.suite
    self.assertTrue(tests.preserve_input(suite) is suite)
    token = "@#$"
    self.assertTrue(tests.preserve_input(token) is token)
2748
def test_randomize_suite(self):
    shuffled_ids = _test_ids(tests.randomize_suite(self.suite))
    # Randomizing should not add or remove test names.
    self.assertEqual(set(_test_ids(self.suite)), set(shuffled_ids))
    # It should change the order.
    # Technically, this *can* fail, because random.shuffle(list) can be
    # equal to list. Trying multiple times just pushes the frequency back.
    # As the odds are 1 in len(self.all_names)!, the failure frequency
    # should be low enough to ignore. RBC 20071021.
    self.assertNotEqual(self.all_names, shuffled_ids)
    # But not the length. (Possibly redundant with the set comparison
    # above.)
    self.assertEqual(len(self.all_names), len(shuffled_ids))
2763
def test_split_suit_by_condition(self):
    # Splitting must put the single matching test in the first part and
    # everything else, order preserved, in the second.
    self.all_names = _test_ids(self.suite)
    matcher = tests.condition_id_re('test_filter_suite_by_r')
    parts = tests.split_suite_by_condition(self.suite, matcher)
    matched_id = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
                  'test_filter_suite_by_re')
    self.assertEqual([matched_id], _test_ids(parts[0]))
    leftover_ids = _test_ids(parts[1])
    self.assertFalse(matched_id in leftover_ids)
    expected_leftover = list(self.all_names)
    expected_leftover.remove(matched_id)
    self.assertEqual(expected_leftover, leftover_ids)
2775
def test_split_suit_by_re(self):
    # Same expectations as the condition-based split, driven by a regex.
    self.all_names = _test_ids(self.suite)
    parts = tests.split_suite_by_re(self.suite, 'test_filter_suite_by_r')
    matched_id = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
                  'test_filter_suite_by_re')
    self.assertEqual([matched_id], _test_ids(parts[0]))
    leftover_ids = _test_ids(parts[1])
    self.assertFalse(matched_id in leftover_ids)
    expected_leftover = list(self.all_names)
    expected_leftover.remove(matched_id)
    self.assertEqual(expected_leftover, leftover_ids)
2788
class TestCheckTreeShape(tests.TestCaseWithTransport):
2790
def test_check_tree_shape(self):
2791
files = ['a', 'b/', 'b/c']
2792
tree = self.make_branch_and_tree('.')
2793
self.build_tree(files)
2797
self.check_tree_shape(tree, files)
2802
class TestBlackboxSupport(tests.TestCase):
2803
"""Tests for testsuite blackbox features."""
2805
def test_run_bzr_failure_not_caught(self):
2806
# When we run bzr in blackbox mode, we want any unexpected errors to
2807
# propagate up to the test suite so that it can show the error in the
2808
# usual way, and we won't get a double traceback.
2809
e = self.assertRaises(
2811
self.run_bzr, ['assert-fail'])
2812
# make sure we got the real thing, not an error from somewhere else in
2813
# the test framework
2814
self.assertEquals('always fails', str(e))
2815
# check that there's no traceback in the test log
2816
self.assertNotContainsRe(self.get_log(), r'Traceback')
2818
def test_run_bzr_user_error_caught(self):
    # Running bzr in blackbox mode, normal/expected/user errors should be
    # caught in the regular way and turned into an error message plus a
    # non-zero exit code (3 here), with nothing written to stdout.
    server = memory.MemoryServer()
    server.start_server()
    self.addCleanup(server.stop_server)
    base_url = server.get_url()
    self.permit_url(base_url)
    missing_branch = "%s/nonexistantpath" % base_url
    out, err = self.run_bzr(["log", missing_branch], retcode=3)
    self.assertEqual(out, '')
    self.assertContainsRe(
        err, 'bzr: ERROR: Not a branch: ".*nonexistantpath/".\n')
2833
class TestTestLoader(tests.TestCase):
2834
"""Tests for the test loader."""
2836
def _get_loader_and_module(self):
2837
"""Gets a TestLoader and a module with one test in it."""
2838
loader = TestUtil.TestLoader()
2840
class Stub(tests.TestCase):
2843
class MyModule(object):
2845
MyModule.a_class = Stub
2847
return loader, module
2849
def test_module_no_load_tests_attribute_loads_classes(self):
    # The fake module carries one test class with one test, so the
    # loader is expected to find exactly one test case.
    loader, module = self._get_loader_and_module()
    loaded = loader.loadTestsFromModule(module)
    self.assertEqual(1, loaded.countTestCases())
2853
def test_module_load_tests_attribute_gets_called(self):
2854
loader, module = self._get_loader_and_module()
2855
# 'self' is here because we're faking the module with a class. Regular
2856
# load_tests do not need that :)
2857
def load_tests(self, standard_tests, module, loader):
2858
result = loader.suiteClass()
2859
for test in tests.iter_suite_tests(standard_tests):
2860
result.addTests([test, test])
2862
# add a load_tests() method which multiplies the tests from the module.
2863
module.__class__.load_tests = load_tests
2864
self.assertEqual(2, loader.loadTestsFromModule(module).countTestCases())
2866
def test_load_tests_from_module_name_smoke_test(self):
2867
loader = TestUtil.TestLoader()
2868
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
2869
self.assertEquals(['bzrlib.tests.test_sampler.DemoTest.test_nothing'],
2872
def test_load_tests_from_module_name_with_bogus_module_name(self):
    # A module name that cannot be imported must raise ImportError.
    load_by_name = TestUtil.TestLoader().loadTestsFromModuleName
    self.assertRaises(ImportError, load_by_name, 'bogus')
2877
class TestTestIdList(tests.TestCase):
2879
def _create_id_list(self, test_list):
    """Wrap test_list in a tests.TestIdList and return it."""
    id_list = tests.TestIdList(test_list)
    return id_list
2882
def _create_suite(self, test_id_list):
2884
class Stub(tests.TestCase):
2888
def _create_test_id(id):
2891
suite = TestUtil.TestSuite()
2892
for id in test_id_list:
2893
t = Stub('test_foo')
2894
t.id = _create_test_id(id)
2898
def _test_ids(self, test_suite):
    """Return the id of every test found in test_suite, in iteration order."""
    ids = []
    for test in tests.iter_suite_tests(test_suite):
        ids.append(test.id())
    return ids
2902
def test_empty_list(self):
    # An id list built from no ids must expose empty 'tests' and
    # 'modules' mappings.
    empty = self._create_id_list([])
    self.assertEqual({}, empty.tests)
    self.assertEqual({}, empty.modules)
2907
def test_valid_list(self):
2908
id_list = self._create_id_list(
2909
['mod1.cl1.meth1', 'mod1.cl1.meth2',
2910
'mod1.func1', 'mod1.cl2.meth2',
2912
'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
2914
self.assertTrue(id_list.refers_to('mod1'))
2915
self.assertTrue(id_list.refers_to('mod1.submod1'))
2916
self.assertTrue(id_list.refers_to('mod1.submod2'))
2917
self.assertTrue(id_list.includes('mod1.cl1.meth1'))
2918
self.assertTrue(id_list.includes('mod1.submod1'))
2919
self.assertTrue(id_list.includes('mod1.func1'))
2921
def test_bad_chars_in_params(self):
    # An id carrying a parenthesised, dotted parameter suffix is still
    # attributed to module 'mod1' and is included verbatim.
    parametrized_id = 'mod1.cl1.meth1(xx.yy)'
    id_list = self._create_id_list([parametrized_id])
    self.assertTrue(id_list.refers_to('mod1'))
    self.assertTrue(id_list.includes(parametrized_id))
2926
def test_module_used(self):
    id_list = self._create_id_list(['mod.class.meth'])
    # Every dotted prefix of the id, and the id itself, is referred to.
    for dotted_name in ('mod', 'mod.class', 'mod.class.meth'):
        self.assertTrue(id_list.refers_to(dotted_name))
2932
def test_test_suite_matches_id_list_with_unknown(self):
2933
loader = TestUtil.TestLoader()
2934
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
2935
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing',
2937
not_found, duplicates = tests.suite_matches_id_list(suite, test_list)
2938
self.assertEquals(['bogus'], not_found)
2939
self.assertEquals([], duplicates)
2941
def test_suite_matches_id_list_with_duplicates(self):
2942
loader = TestUtil.TestLoader()
2943
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
2944
dupes = loader.suiteClass()
2945
for test in tests.iter_suite_tests(suite):
2947
dupes.addTest(test) # Add it again
2949
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing',]
2950
not_found, duplicates = tests.suite_matches_id_list(
2952
self.assertEquals([], not_found)
2953
self.assertEquals(['bzrlib.tests.test_sampler.DemoTest.test_nothing'],
2957
class TestTestSuite(tests.TestCase):
2959
def test__test_suite_testmod_names(self):
2960
# Test that a plausible list of test module names are returned
2961
# by _test_suite_testmod_names.
2962
test_list = tests._test_suite_testmod_names()
2964
'bzrlib.tests.blackbox',
2965
'bzrlib.tests.per_transport',
2966
'bzrlib.tests.test_selftest',
2970
def test__test_suite_modules_to_doctest(self):
2971
# Test that a plausible list of modules to doctest is returned
2972
# by _test_suite_modules_to_doctest.
2973
test_list = tests._test_suite_modules_to_doctest()
2975
# When docstrings are stripped, there are no modules to doctest
2976
self.assertEqual([], test_list)
2983
def test_test_suite(self):
2984
# test_suite() loads the entire test suite to operate. To avoid this
2985
# overhead, and yet still be confident that things are happening,
2986
# we temporarily replace two functions used by test_suite with
2987
# test doubles that supply a few sample tests to load, and check they
2990
def testmod_names():
2991
calls.append("testmod_names")
2993
'bzrlib.tests.blackbox.test_branch',
2994
'bzrlib.tests.per_transport',
2995
'bzrlib.tests.test_selftest',
2997
self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
2999
calls.append("modules_to_doctest")
3002
return ['bzrlib.timestamp']
3003
self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
3004
expected_test_list = [
3006
'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
3007
('bzrlib.tests.per_transport.TransportTests'
3008
'.test_abspath(LocalTransport,LocalURLServer)'),
3009
'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
3010
# plugins can't be tested that way since selftest may be run with
3013
if __doc__ is not None:
3014
expected_test_list.extend([
3015
# modules_to_doctest
3016
'bzrlib.timestamp.format_highres_date',
3018
suite = tests.test_suite()
3019
self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
3021
self.assertSubset(expected_test_list, _test_ids(suite))
3023
def test_test_suite_list_and_start(self):
    # We cannot test this at the same time as the main load, because we
    # want to know that starting_with == None works. So a second load is
    # incurred - note that the starting_with parameter causes a partial
    # load rather than a full load so this test should be pretty quick.
    wanted_ids = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
    class_prefix = 'bzrlib.tests.test_selftest.TestTestSuite'
    suite = tests.test_suite(wanted_ids, [class_prefix])
    # test_test_suite_list_and_start itself matches the prefix but is not
    # in the id list, so it must not be included.
    self.assertEqual(wanted_ids, _test_ids(suite))
3035
class TestLoadTestIdList(tests.TestCaseInTempDir):
3037
def _create_test_list_file(self, file_name, content):
3038
fl = open(file_name, 'wt')
3042
def test_load_unknown(self):
    # Asking for a list file that does not exist raises NoSuchFile.
    missing_name = 'i_do_not_exist'
    self.assertRaises(errors.NoSuchFile,
                      tests.load_test_id_list, missing_name)
3046
def test_load_test_list(self):
    # A clean two-line file yields the two ids, in file order.
    list_path = 'test.list'
    self._create_test_list_file(list_path,
                                'mod1.cl1.meth1\nmod2.cl2.meth2\n')
    loaded = tests.load_test_id_list(list_path)
    self.assertEqual(2, len(loaded))
    self.assertEqual('mod1.cl1.meth1', loaded[0])
    self.assertEqual('mod2.cl2.meth2', loaded[1])
3055
def test_load_dirty_file(self):
3056
test_list_fname = 'test.list'
3057
self._create_test_list_file(test_list_fname,
3058
' mod1.cl1.meth1\n\nmod2.cl2.meth2 \n'
3060
tlist = tests.load_test_id_list(test_list_fname)
3061
self.assertEquals(4, len(tlist))
3062
self.assertEquals('mod1.cl1.meth1', tlist[0])
3063
self.assertEquals('', tlist[1])
3064
self.assertEquals('mod2.cl2.meth2', tlist[2])
3065
self.assertEquals('bar baz', tlist[3])
3068
class TestFilteredByModuleTestLoader(tests.TestCase):
3070
def _create_loader(self, test_list):
3071
id_filter = tests.TestIdList(test_list)
3072
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3075
def test_load_tests(self):
    # Loading the sampler module through a loader filtered on its own
    # test id yields exactly that id.
    wanted_ids = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
    loader = self._create_loader(wanted_ids)
    suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
    self.assertEqual(wanted_ids, _test_ids(suite))
3081
def test_exclude_tests(self):
    # A loader filtered on an unrelated id loads nothing from the sampler
    # module.
    unrelated_ids = ['bogus']
    loader = self._create_loader(unrelated_ids)
    suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
    self.assertEqual([], _test_ids(suite))
3088
class TestFilteredByNameStartTestLoader(tests.TestCase):
3090
def _create_loader(self, name_start):
3091
def needs_module(name):
3092
return name.startswith(name_start) or name_start.startswith(name)
3093
loader = TestUtil.FilteredByModuleTestLoader(needs_module)
3096
def test_load_tests(self):
    # The start string is a prefix of the sampler module's name, so the
    # module's single test is expected to be loaded.
    expected_ids = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
    loader = self._create_loader('bzrlib.tests.test_samp')

    suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
    self.assertEqual(expected_ids, _test_ids(suite))
3103
def test_load_tests_inside_module(self):
    # Here the module name is a prefix of the start string (which points
    # inside the module); the module's test is still expected to load.
    expected_ids = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
    loader = self._create_loader('bzrlib.tests.test_sampler.Demo')

    suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
    self.assertEqual(expected_ids, _test_ids(suite))
3110
def test_exclude_tests(self):
    # 'bogus' shares no prefix relationship with the sampler module name,
    # so nothing is expected to be loaded from it.
    loader = self._create_loader('bogus')

    suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
    self.assertEqual([], _test_ids(suite))
3118
class TestTestPrefixRegistry(tests.TestCase):
3120
def _get_registry(self):
3121
tp_registry = tests.TestPrefixAliasRegistry()
3124
def test_register_new_prefix(self):
    # A freshly registered alias can be read back with get().
    registry = self._get_registry()
    registry.register('foo', 'fff.ooo.ooo')
    self.assertEqual('fff.ooo.ooo', registry.get('foo'))
3129
def test_register_existing_prefix(self):
3130
tpr = self._get_registry()
3131
tpr.register('bar', 'bbb.aaa.rrr')
3132
tpr.register('bar', 'bBB.aAA.rRR')
3133
self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3134
self.assertThat(self.get_log(),
3135
DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3138
def test_get_unknown_prefix(self):
    # get() on an alias that was never registered raises KeyError.
    registry = self._get_registry()
    unknown_alias = 'I am not a prefix'
    self.assertRaises(KeyError, registry.get, unknown_alias)
3142
def test_resolve_prefix(self):
    # resolve_alias() returns the module path registered for the alias.
    registry = self._get_registry()
    registry.register('bar', 'bb.aa.rr')
    self.assertEqual('bb.aa.rr', registry.resolve_alias('bar'))
3147
def test_resolve_unknown_alias(self):
    # Unlike get(), resolve_alias() reports an unknown alias as a
    # BzrCommandError.
    registry = self._get_registry()
    unknown_alias = 'I am not a prefix'
    self.assertRaises(errors.BzrCommandError,
                      registry.resolve_alias, unknown_alias)
3152
def test_predefined_prefixes(self):
    # The module-level registry ships with these aliases; each pair is
    # (alias, module path it must resolve to).
    registry = tests.test_prefix_alias_registry
    predefined = [
        ('bzrlib', 'bzrlib'),
        ('bd', 'bzrlib.doc'),
        ('bu', 'bzrlib.utils'),
        ('bt', 'bzrlib.tests'),
        ('bb', 'bzrlib.tests.blackbox'),
        ('bp', 'bzrlib.plugins'),
        ]
    for alias, module_path in predefined:
        self.assertEqual(module_path, registry.resolve_alias(alias))
3162
class TestThreadLeakDetection(tests.TestCase):
3163
"""Ensure when tests leak threads we detect and report it"""
3165
class LeakRecordingResult(tests.ExtendedTestResult):
3167
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3169
def _report_thread_leak(self, test, leaks, alive):
3170
self.leaks.append((test, leaks))
3172
def test_testcase_without_addCleanups(self):
3173
"""Check old TestCase instances don't break with leak detection"""
3174
class Test(unittest.TestCase):
3177
result = self.LeakRecordingResult()
3179
result.startTestRun()
3181
result.stopTestRun()
3182
self.assertEqual(result._tests_leaking_threads_count, 0)
3183
self.assertEqual(result.leaks, [])
3185
def test_thread_leak(self):
3186
"""Ensure a thread that outlives the running of a test is reported
3188
Uses a thread that blocks on an event, and is started by the inner
3189
test case. As the thread outlives the inner case's run, it should be
3190
detected as a leak, but the event is then set so that the thread can
3191
be safely joined in cleanup so it's not leaked for real.
3193
event = threading.Event()
3194
thread = threading.Thread(name="Leaker", target=event.wait)
3195
class Test(tests.TestCase):
3196
def test_leak(self):
3198
result = self.LeakRecordingResult()
3199
test = Test("test_leak")
3200
self.addCleanup(thread.join)
3201
self.addCleanup(event.set)
3202
result.startTestRun()
3204
result.stopTestRun()
3205
self.assertEqual(result._tests_leaking_threads_count, 1)
3206
self.assertEqual(result._first_thread_leaker_id, test.id())
3207
self.assertEqual(result.leaks, [(test, set([thread]))])
3208
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3210
def test_multiple_leaks(self):
3211
"""Check multiple leaks are blamed on the test cases at fault
3213
Same concept as the previous test, but has one inner test method that
3214
leaks two threads, and one that doesn't leak at all.
3216
event = threading.Event()
3217
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3218
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3219
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3220
class Test(tests.TestCase):
3221
def test_first_leak(self):
3223
def test_second_no_leak(self):
3225
def test_third_leak(self):
3228
result = self.LeakRecordingResult()
3229
first_test = Test("test_first_leak")
3230
third_test = Test("test_third_leak")
3231
self.addCleanup(thread_a.join)
3232
self.addCleanup(thread_b.join)
3233
self.addCleanup(thread_c.join)
3234
self.addCleanup(event.set)
3235
result.startTestRun()
3237
[first_test, Test("test_second_no_leak"), third_test]
3239
result.stopTestRun()
3240
self.assertEqual(result._tests_leaking_threads_count, 2)
3241
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3242
self.assertEqual(result.leaks, [
3243
(first_test, set([thread_b])),
3244
(third_test, set([thread_a, thread_c]))])
3245
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3248
class TestPostMortemDebugging(tests.TestCase):
3249
"""Check post mortem debugging works when tests fail or error"""
3251
class TracebackRecordingResult(tests.ExtendedTestResult):
3253
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3254
self.postcode = None
3255
def _post_mortem(self, tb=None):
3256
"""Record the code object at the end of the current traceback"""
3257
tb = tb or sys.exc_info()[2]
3260
while next is not None:
3263
self.postcode = tb.tb_frame.f_code
3264
def report_error(self, test, err):
3266
def report_failure(self, test, err):
3269
def test_location_unittest_error(self):
3270
"""Needs right post mortem traceback with erroring unittest case"""
3271
class Test(unittest.TestCase):
3274
result = self.TracebackRecordingResult()
3276
self.assertEqual(result.postcode, Test.runTest.func_code)
3278
def test_location_unittest_failure(self):
3279
"""Needs right post mortem traceback with failing unittest case"""
3280
class Test(unittest.TestCase):
3282
raise self.failureException
3283
result = self.TracebackRecordingResult()
3285
self.assertEqual(result.postcode, Test.runTest.func_code)
3287
def test_location_bt_error(self):
3288
"""Needs right post mortem traceback with erroring bzrlib.tests case"""
3289
class Test(tests.TestCase):
3290
def test_error(self):
3292
result = self.TracebackRecordingResult()
3293
Test("test_error").run(result)
3294
self.assertEqual(result.postcode, Test.test_error.func_code)
3296
def test_location_bt_failure(self):
    """Needs right post mortem traceback with failing bzrlib.tests case"""
    class Test(tests.TestCase):
        def test_failure(self):
            raise self.failureException

    recorder = self.TracebackRecordingResult()
    failing_case = Test("test_failure")
    failing_case.run(recorder)
    # The recorded code object must be that of the failing test method.
    self.assertEqual(recorder.postcode, Test.test_failure.func_code)
3305
def test_env_var_triggers_post_mortem(self):
3306
"""Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3308
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3309
post_mortem_calls = []
3310
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3311
self.overrideEnv('BZR_TEST_PDB', None)
3312
result._post_mortem(1)
3313
self.overrideEnv('BZR_TEST_PDB', 'on')
3314
result._post_mortem(2)
3315
self.assertEqual([2], post_mortem_calls)
3318
class TestRunSuite(tests.TestCase):
3320
def test_runner_class(self):
3321
"""run_suite accepts and uses a runner_class keyword argument."""
3322
class Stub(tests.TestCase):
3325
suite = Stub("test_foo")
3327
class MyRunner(tests.TextTestRunner):
3328
def run(self, test):
3330
return tests.ExtendedTestResult(self.stream, self.descriptions,
3332
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3333
self.assertLength(1, calls)
3336
class _Selftest(object):
3337
"""Mixin for tests needing full selftest output"""
3339
def _inject_stream_into_subunit(self, stream):
3340
"""To be overridden by subclasses that run tests out of process"""
3342
def _run_selftest(self, **kwargs):
3344
self._inject_stream_into_subunit(sio)
3345
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3346
return sio.getvalue()
3349
class _ForkedSelftest(_Selftest):
    """Mixin for tests needing full selftest output with forked children"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        Some APIs need rewriting so this kind of bogus hackery can be replaced
        by passing the stream param from run_tests down into ProtocolTestCase.
        """
        from subunit import ProtocolTestCase
        wrapped_init = ProtocolTestCase.__init__

        def init_then_redirect(protocol_case, *args, **kwargs):
            # Run the real constructor first, then point the passthrough
            # output at the caller's stream.
            wrapped_init(protocol_case, *args, **kwargs)
            protocol_case._passthrough = stream

        self.overrideAttr(ProtocolTestCase, "__init__", init_then_redirect)

    def _run_selftest(self, **kwargs):
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        fork = getattr(os, "fork", None)
        if fork is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")
        # Make sure the fork code is actually invoked by claiming two cores
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
        # Append to any decorators the caller supplied rather than
        # replacing them.
        decorators = kwargs.setdefault("suite_decorators", [])
        decorators.append(tests.fork_decorator)
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3377
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3378
"""Check operation of --parallel=fork selftest option"""
3380
def test_error_in_child_during_fork(self):
3381
"""Error in a forked child during test setup should get reported"""
3382
class Test(tests.TestCase):
3383
def testMethod(self):
3385
# We don't care what, just break something that a child will run
3386
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3387
out = self._run_selftest(test_suite_factory=Test)
3388
# Lines from the tracebacks of the two child processes may be mixed
3389
# together due to the way subunit parses and forwards the streams,
3390
# so permit extra lines between each part of the error output.
3391
self.assertContainsRe(out,
3394
".+ in fork_for_tests\n"
3396
"\s*workaround_zealous_crypto_random\(\)\n"
3401
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3402
"""Check a test case still alive after being run emits a warning"""
3404
class Test(tests.TestCase):
3405
def test_pass(self):
3407
def test_self_ref(self):
3408
self.also_self = self.test_self_ref
3409
def test_skip(self):
3410
self.skip("Don't need")
3412
def _get_suite(self):
3413
return TestUtil.TestSuite([
3414
self.Test("test_pass"),
3415
self.Test("test_self_ref"),
3416
self.Test("test_skip"),
3419
def _run_selftest_with_suite(self, **kwargs):
3420
old_flags = tests.selftest_debug_flags
3421
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3422
gc_on = gc.isenabled()
3426
output = self._run_selftest(test_suite_factory=self._get_suite,
3431
tests.selftest_debug_flags = old_flags
3432
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3433
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3436
def test_testsuite(self):
    # Plain run with no extra selftest options; the shared helper makes
    # the assertions on the output.
    self._run_selftest_with_suite()
3439
def test_pattern(self):
    # The pattern selects test_pass and test_self_ref only, so test_skip
    # must not show up in the output.
    selected = "test_(?:pass|self_ref)$"
    output = self._run_selftest_with_suite(pattern=selected)
    self.assertNotContainsRe(output, "test_skip")
3443
def test_exclude_pattern(self):
    # Excluding test_skip must keep it out of the output.
    excluded = "test_skip$"
    output = self._run_selftest_with_suite(exclude_pattern=excluded)
    self.assertNotContainsRe(output, "test_skip")
3447
def test_random_seed(self):
    # Same checks as the plain run, with random_seed="now" passed through
    # to selftest.
    seed = "now"
    self._run_selftest_with_suite(random_seed=seed)
3450
def test_matching_tests_first(self):
    # Same checks as the plain run, with matching_tests_first enabled for
    # the test_self_ref pattern.
    options = {
        "matching_tests_first": True,
        "pattern": "test_self_ref$",
        }
    self._run_selftest_with_suite(**options)
3454
def test_starting_with_and_exclude(self):
    # Combining starting_with and exclude_pattern must still keep
    # test_skip out of the output.
    options = {
        "starting_with": ["bt."],
        "exclude_pattern": "test_skip$",
        }
    output = self._run_selftest_with_suite(**options)
    self.assertNotContainsRe(output, "test_skip")
3459
def test_additonal_decorator(self):
    # Same checks as the plain run, with an extra suite decorator passed
    # in. The output is not inspected further here, so it is not bound
    # to a name.
    self._run_selftest_with_suite(
        suite_decorators=[tests.TestDecorator])
3464
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        # Re-run the parent's checks with the subunit runner selected.
        parent_impl = TestUncollectedWarnings._run_selftest_with_suite
        return parent_impl(self, runner_class=tests.SubUnitBzrRunner,
                           **kwargs)
3474
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Check the uncollected-test-case warnings are still emitted when forking

    Adds no tests of its own: the inherited TestUncollectedWarnings tests are
    run through the _ForkedSelftest mixin.
    """
3478
class TestEnvironHandling(tests.TestCase):
3480
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3481
self.assertFalse('MYVAR' in os.environ)
3482
self.overrideEnv('MYVAR', '42')
3483
# We use an embedded test to make sure we fix the _captureVar bug
3484
class Test(tests.TestCase):
3486
# The first call saves the 42 value
3487
self.overrideEnv('MYVAR', None)
3488
self.assertEquals(None, os.environ.get('MYVAR'))
3489
# Make sure we can call it twice
3490
self.overrideEnv('MYVAR', None)
3491
self.assertEquals(None, os.environ.get('MYVAR'))
3493
result = tests.TextTestResult(output, 0, 1)
3494
Test('test_me').run(result)
3495
if not result.wasStrictlySuccessful():
3496
self.fail(output.getvalue())
3497
# We get our value back
3498
self.assertEquals('42', os.environ.get('MYVAR'))
3501
class TestIsolatedEnv(tests.TestCase):
3502
"""Test isolating tests from os.environ.
3504
Since we use tests that are already isolated from os.environ a bit of care
3505
should be taken when designing the tests to avoid bootstrap side-effects.
3506
The tests start with an already clean os.environ, which allows making valid
3507
assertions about which variables are present or not and design tests around
3511
class ScratchMonkey(tests.TestCase):
3516
def test_basics(self):
    isolated = tests.isolated_environ
    # Make sure we know the definition of BZR_HOME: not part of os.environ
    # for tests.TestCase.
    self.assertTrue('BZR_HOME' in isolated)
    self.assertEqual(None, isolated['BZR_HOME'])
    # Being part of isolated_environ, BZR_HOME should not appear here
    self.assertFalse('BZR_HOME' in os.environ)
    # Make sure we know the definition of LINES: part of os.environ for
    # tests.TestCase, with the value given by isolated_environ.
    self.assertTrue('LINES' in isolated)
    self.assertEqual('25', isolated['LINES'])
    self.assertEqual('25', os.environ['LINES'])
3529
def test_injecting_unknown_variable(self):
    # BZR_HOME is known to be absent from os.environ
    monkey = self.ScratchMonkey('test_me')
    tests.override_os_environ(monkey, {'BZR_HOME': 'foo'})
    self.assertEqual('foo', os.environ['BZR_HOME'])
    # Restoring must remove the variable again rather than leave it set.
    tests.restore_os_environ(monkey)
    self.assertFalse('BZR_HOME' in os.environ)
3537
def test_injecting_known_variable(self):
    monkey = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    tests.override_os_environ(monkey, {'LINES': '42'})
    self.assertEqual('42', os.environ['LINES'])
    # Restoring must bring back the previous value.
    tests.restore_os_environ(monkey)
    self.assertEqual('25', os.environ['LINES'])
3545
def test_deleting_variable(self):
    monkey = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ; overriding it with None
    # must delete it.
    tests.override_os_environ(monkey, {'LINES': None})
    self.assertTrue('LINES' not in os.environ)
    # Restoring must bring back the previous value.
    tests.restore_os_environ(monkey)
    self.assertEqual('25', os.environ['LINES'])
3554
class TestDocTestSuiteIsolation(tests.TestCase):
3555
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3557
Since tests.TestCase already provides isolation from os.environ, we use
3558
the clean environment as a base for testing. To precisely capture the
3559
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3562
We want to make sure `tests.DocTestSuite` respects `tests.isolated_environ`,
3563
not `os.environ` so each test overrides it to suit its needs.
3567
def get_doctest_suite_for_string(self, klass, string):
3568
class Finder(doctest.DocTestFinder):
3570
def find(*args, **kwargs):
3571
test = doctest.DocTestParser().get_doctest(
3572
string, {}, 'foo', 'foo.py', 0)
3575
suite = klass(test_finder=Finder())
3578
def run_doctest_suite_for_string(self, klass, string):
3579
suite = self.get_doctest_suite_for_string(klass, string)
3581
result = tests.TextTestResult(output, 0, 1)
3583
return result, output
3585
def assertDocTestStringSucceds(self, klass, string):
    """Fail, reporting the captured output, unless the doctest run passes.

    :param klass: suite class handed to run_doctest_suite_for_string.
    :param string: doctest source handed to run_doctest_suite_for_string.
    """
    result, output = self.run_doctest_suite_for_string(klass, string)
    if result.wasStrictlySuccessful():
        return
    self.fail(output.getvalue())
3590
def assertDocTestStringFails(self, klass, string):
    """Fail, reporting the captured output, if the doctest run passes.

    :param klass: suite class handed to run_doctest_suite_for_string.
    :param string: doctest source handed to run_doctest_suite_for_string.
    """
    result, output = self.run_doctest_suite_for_string(klass, string)
    if not result.wasStrictlySuccessful():
        return
    self.fail(output.getvalue())
3595
def test_injected_variable(self):
3596
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3599
>>> os.environ['LINES']
3602
# doctest.DocTestSuite fails as it sees '25'
3603
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3604
# tests.DocTestSuite sees '42'
3605
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3607
def test_deleted_variable(self):
3608
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3611
>>> os.environ.get('LINES')
3613
# doctest.DocTestSuite fails as it sees '25'
3614
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3615
# tests.DocTestSuite sees None
3616
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3619
class TestSelftestExcludePatterns(tests.TestCase):
3622
super(TestSelftestExcludePatterns, self).setUp()
3623
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3625
def suite_factory(self, keep_only=None, starting_with=None):
3626
"""A test suite factory with only a few tests."""
3627
class Test(tests.TestCase):
3629
# We don't need the full class path
3630
return self._testMethodName
3637
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3639
def assertTestList(self, expected, *selftest_args):
    """Check that 'bzr selftest --list' prints exactly the expected ids.

    :param expected: list of test ids, one per expected output line.
    :param selftest_args: extra command-line arguments appended after
        'selftest --list'.
    """
    # We rely on setUp installing the right test suite factory so we can
    # test at the command level without loading the whole test suite
    command = ('selftest', '--list') + selftest_args
    out, err = self.run_bzr(command)
    listed = out.splitlines()
    self.assertEqual(expected, listed)
3646
def test_full_list(self):
    # With no exclusions all three tests from the stub factory are listed.
    everything = ['a', 'b', 'c']
    self.assertTestList(everything)
3649
def test_single_exclude(self):
    # A single -x option removes just the matching test.
    exclude_a = ('-x', 'a')
    self.assertTestList(['b', 'c'], *exclude_a)
3652
def test_mutiple_excludes(self):
    # Repeated -x options accumulate: both 'a' and 'b' are removed.
    exclude_a_and_b = ('-x', 'a', '-x', 'b')
    self.assertTestList(['c'], *exclude_a_and_b)
3656
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3658
_test_needs_features = [features.subunit]
3661
super(TestCounterHooks, self).setUp()
3662
class Test(tests.TestCase):
3665
super(Test, self).setUp()
3666
self.hooks = hooks.Hooks()
3667
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3668
self.install_counter_hook(self.hooks, 'myhook')
3673
def run_hook_once(self):
3674
for hook in self.hooks['myhook']:
3677
self.test_class = Test
3679
def assertHookCalls(self, expected_calls, test_name):
3680
test = self.test_class(test_name)
3681
result = unittest.TestResult()
3683
self.assertTrue(hasattr(test, '_counters'))
3684
self.assertTrue(test._counters.has_key('myhook'))
3685
self.assertEquals(expected_calls, test._counters['myhook'])
3687
def test_no_hook(self):
    # The 'no_hook' case is expected to record zero calls of 'myhook'.
    expected_calls = 0
    self.assertHookCalls(expected_calls, 'no_hook')
3690
def test_run_hook_once(self):
3691
tt = features.testtools
3692
if tt.module.__version__ < (0, 9, 8):
3693
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3694
self.assertHookCalls(1, 'run_hook_once')