13
12
# You should have received a copy of the GNU General Public License
14
13
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
14
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
16
"""Tests for the test framework."""
19
from cStringIO import StringIO
19
from StringIO import StringIO
30
from testtools import (
31
ExtendedToOriginalDecorator,
34
from testtools.content import Content
35
from testtools.content_type import ContentType
36
from testtools.matchers import (
40
import testtools.testresult.doubles
25
from bzrlib import osutils
61
from bzrlib.repofmt import (
64
from bzrlib.symbol_versioning import (
27
from bzrlib.progress import _BaseProgressBar
69
28
from bzrlib.tests import (
75
from bzrlib.trace import note, mutter
76
from bzrlib.transport import memory
79
def _test_ids(test_suite):
    """Return a list with the id of each test in ``test_suite``.

    The suite is walked with ``tests.iter_suite_tests`` and ``id()`` is
    called on every test it yields, in iteration order.
    """
    collected = []
    for case in tests.iter_suite_tests(test_suite):
        collected.append(case.id())
    return collected
84
class MetaTestLog(tests.TestCase):
32
TestCaseWithTransport,
37
from bzrlib.tests.TestUtil import _load_module_by_name
38
import bzrlib.errors as errors
39
from bzrlib import symbol_versioning
40
from bzrlib.trace import note
43
class SelftestTests(TestCase):
45
def test_import_tests(self):
46
mod = _load_module_by_name('bzrlib.tests.test_selftest')
47
self.assertEqual(mod.SelftestTests, SelftestTests)
49
def test_import_test_failure(self):
50
self.assertRaises(ImportError,
55
class MetaTestLog(TestCase):
86
57
def test_logging(self):
87
58
"""Test logs are captured when a test fails."""
88
59
self.log('a test message')
89
details = self.getDetails()
91
self.assertThat(log.content_type, Equals(ContentType(
92
"text", "plain", {"charset": "utf8"})))
93
self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
94
self.assertThat(self.get_log(),
95
DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
98
class TestTreeShape(tests.TestCaseInTempDir):
60
self._log_file.flush()
61
self.assertContainsRe(self._get_log(), 'a test message\n')
64
class TestTreeShape(TestCaseInTempDir):
100
66
def test_unicode_paths(self):
101
self.requireFeature(features.UnicodeFilenameFeature)
103
67
filename = u'hell\u00d8'
104
self.build_tree_contents([(filename, 'contents of hello')])
105
self.assertPathExists(filename)
108
class TestClassesAvailable(tests.TestCase):
    """As a convenience we expose Test* classes from bzrlib.tests.

    Each test body consists solely of an import from ``bzrlib.tests``; the
    imported name is deliberately left unused.
    """

    def test_test_case(self):
        # TestCase must be importable from bzrlib.tests.
        from bzrlib.tests import TestCase

    def test_test_loader(self):
        # TestLoader must be importable from bzrlib.tests.
        from bzrlib.tests import TestLoader

    def test_test_suite(self):
        # TestSuite must be importable from bzrlib.tests.
        from bzrlib.tests import TestSuite
121
class TestTransportScenarios(tests.TestCase):
69
self.build_tree_contents([(filename, 'contents of hello')])
70
except UnicodeEncodeError:
71
raise TestSkipped("can't build unicode working tree in "
72
"filesystem encoding %s" % sys.getfilesystemencoding())
73
self.failUnlessExists(filename)
76
class TestTransportProviderAdapter(TestCase):
122
77
"""A group of tests that test the transport implementation adaption core.
124
This is a meta test that the tests are applied to all available
79
This is a meta test that the tests are applied to all available
127
This will be generalised in the future which is why it is in this
82
This will be generalised in the future which is why it is in this
128
83
test file even though it is specific to transport tests at the moment.
131
86
def test_get_transport_permutations(self):
132
# this checks that get_test_permutations defined by the module is
133
# called by the get_transport_test_permutations function.
87
# this checks that the module's get_test_permutations call
88
# is made by the adapter get_transport_test_permutations method.
134
89
class MockModule(object):
135
90
def get_test_permutations(self):
136
91
return sample_permutation
137
92
sample_permutation = [(1,2), (3,4)]
138
from bzrlib.tests.per_transport import get_transport_test_permutations
93
from bzrlib.transport import TransportTestProviderAdapter
94
adapter = TransportTestProviderAdapter()
139
95
self.assertEqual(sample_permutation,
140
get_transport_test_permutations(MockModule()))
96
adapter.get_transport_test_permutations(MockModule()))
142
def test_scenarios_include_all_modules(self):
143
# this checks that the scenario generator returns as many permutations
144
# as there are in all the registered transport modules - we assume if
145
# this matches its probably doing the right thing especially in
146
# combination with the tests for setting the right classes below.
147
from bzrlib.tests.per_transport import transport_test_permutations
148
from bzrlib.transport import _get_transport_modules
98
def test_adapter_checks_all_modules(self):
99
# this checks that the adapter returns as many permutations as
100
# there are in all the registered transport modules
101
# - we assume if this matches its probably doing the right thing
102
# especially in combination with the tests for setting the right
104
from bzrlib.transport import (TransportTestProviderAdapter,
105
_get_transport_modules
149
107
modules = _get_transport_modules()
150
108
permutation_count = 0
151
109
for module in modules:
153
permutation_count += len(reduce(getattr,
111
permutation_count += len(reduce(getattr,
154
112
(module + ".get_test_permutations").split('.')[1:],
155
113
__import__(module))())
156
114
except errors.DependencyNotPresent:
158
scenarios = transport_test_permutations()
159
self.assertEqual(permutation_count, len(scenarios))
116
input_test = TestTransportProviderAdapter(
117
"test_adapter_sets_transport_class")
118
adapter = TransportTestProviderAdapter()
119
self.assertEqual(permutation_count,
120
len(list(iter(adapter.adapt(input_test)))))
161
def test_scenarios_include_transport_class(self):
122
def test_adapter_sets_transport_class(self):
123
# Check that the test adapter inserts a transport and server into the
162
126
# This test used to know about all the possible transports and the
163
127
# order they were returned but that seems overly brittle (mbp
165
from bzrlib.tests.per_transport import transport_test_permutations
166
scenarios = transport_test_permutations()
129
input_test = TestTransportProviderAdapter(
130
"test_adapter_sets_transport_class")
131
from bzrlib.transport import TransportTestProviderAdapter
132
suite = TransportTestProviderAdapter().adapt(input_test)
133
tests = list(iter(suite))
134
self.assertTrue(len(tests) > 6)
167
135
# there are at least that many builtin transports
168
self.assertTrue(len(scenarios) > 6)
169
one_scenario = scenarios[0]
170
self.assertIsInstance(one_scenario[0], str)
171
self.assertTrue(issubclass(one_scenario[1]["transport_class"],
137
self.assertTrue(issubclass(one_test.transport_class,
172
138
bzrlib.transport.Transport))
173
self.assertTrue(issubclass(one_scenario[1]["transport_server"],
139
self.assertTrue(issubclass(one_test.transport_server,
174
140
bzrlib.transport.Server))
177
class TestBranchScenarios(tests.TestCase):
143
class TestBranchProviderAdapter(TestCase):
144
"""A group of tests that test the branch implementation test adapter."""
179
def test_scenarios(self):
146
def test_adapted_tests(self):
180
147
# check that constructor parameters are passed through to the adapted
182
from bzrlib.tests.per_branch import make_scenarios
149
from bzrlib.branch import BranchTestProviderAdapter
150
input_test = TestBranchProviderAdapter(
151
"test_adapted_tests")
185
154
formats = [("c", "C"), ("d", "D")]
186
scenarios = make_scenarios(server1, server2, formats)
187
self.assertEqual(2, len(scenarios))
190
{'branch_format': 'c',
191
'bzrdir_format': 'C',
192
'transport_readonly_server': 'b',
193
'transport_server': 'a'}),
195
{'branch_format': 'd',
196
'bzrdir_format': 'D',
197
'transport_readonly_server': 'b',
198
'transport_server': 'a'})],
202
class TestBzrDirScenarios(tests.TestCase):
204
def test_scenarios(self):
155
adapter = BranchTestProviderAdapter(server1, server2, formats)
156
suite = adapter.adapt(input_test)
157
tests = list(iter(suite))
158
self.assertEqual(2, len(tests))
159
self.assertEqual(tests[0].branch_format, formats[0][0])
160
self.assertEqual(tests[0].bzrdir_format, formats[0][1])
161
self.assertEqual(tests[0].transport_server, server1)
162
self.assertEqual(tests[0].transport_readonly_server, server2)
163
self.assertEqual(tests[1].branch_format, formats[1][0])
164
self.assertEqual(tests[1].bzrdir_format, formats[1][1])
165
self.assertEqual(tests[1].transport_server, server1)
166
self.assertEqual(tests[1].transport_readonly_server, server2)
169
class TestBzrDirProviderAdapter(TestCase):
170
"""A group of tests that test the bzr dir implementation test adapter."""
172
def test_adapted_tests(self):
205
173
# check that constructor parameters are passed through to the adapted
207
from bzrlib.tests.per_controldir import make_scenarios
175
from bzrlib.bzrdir import BzrDirTestProviderAdapter
176
input_test = TestBzrDirProviderAdapter(
177
"test_adapted_tests")
211
180
formats = ["c", "d"]
212
scenarios = make_scenarios(vfs_factory, server1, server2, formats)
215
{'bzrdir_format': 'c',
216
'transport_readonly_server': 'b',
217
'transport_server': 'a',
218
'vfs_transport_factory': 'v'}),
220
{'bzrdir_format': 'd',
221
'transport_readonly_server': 'b',
222
'transport_server': 'a',
223
'vfs_transport_factory': 'v'})],
227
class TestRepositoryScenarios(tests.TestCase):
229
def test_formats_to_scenarios(self):
230
from bzrlib.tests.per_repository import formats_to_scenarios
231
formats = [("(c)", remote.RemoteRepositoryFormat()),
232
("(d)", repository.format_registry.get(
233
'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))]
234
no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
236
vfs_scenarios = formats_to_scenarios(formats, "server", "readonly",
237
vfs_transport_factory="vfs")
238
# no_vfs generate scenarios without vfs_transport_factory
240
('RemoteRepositoryFormat(c)',
241
{'bzrdir_format': remote.RemoteBzrDirFormat(),
242
'repository_format': remote.RemoteRepositoryFormat(),
243
'transport_readonly_server': 'readonly',
244
'transport_server': 'server'}),
245
('RepositoryFormat2a(d)',
246
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
247
'repository_format': groupcompress_repo.RepositoryFormat2a(),
248
'transport_readonly_server': 'readonly',
249
'transport_server': 'server'})]
250
self.assertEqual(expected, no_vfs_scenarios)
252
('RemoteRepositoryFormat(c)',
253
{'bzrdir_format': remote.RemoteBzrDirFormat(),
254
'repository_format': remote.RemoteRepositoryFormat(),
255
'transport_readonly_server': 'readonly',
256
'transport_server': 'server',
257
'vfs_transport_factory': 'vfs'}),
258
('RepositoryFormat2a(d)',
259
{'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
260
'repository_format': groupcompress_repo.RepositoryFormat2a(),
261
'transport_readonly_server': 'readonly',
262
'transport_server': 'server',
263
'vfs_transport_factory': 'vfs'})],
267
class TestTestScenarioApplication(tests.TestCase):
268
"""Tests for the test adaption facilities."""
270
def test_apply_scenario(self):
271
from bzrlib.tests import apply_scenario
272
input_test = TestTestScenarioApplication("test_apply_scenario")
273
# setup two adapted tests
274
adapted_test1 = apply_scenario(input_test,
276
{"bzrdir_format":"bzr_format",
277
"repository_format":"repo_fmt",
278
"transport_server":"transport_server",
279
"transport_readonly_server":"readonly-server"}))
280
adapted_test2 = apply_scenario(input_test,
281
("new id 2", {"bzrdir_format":None}))
282
# input_test should not have been altered.
283
self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
284
# the new tests are mutually incompatible, ensuring it has
285
# made new ones, and unspecified elements in the scenario
286
# should not have been altered.
287
self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
288
self.assertEqual("repo_fmt", adapted_test1.repository_format)
289
self.assertEqual("transport_server", adapted_test1.transport_server)
290
self.assertEqual("readonly-server",
291
adapted_test1.transport_readonly_server)
293
"bzrlib.tests.test_selftest.TestTestScenarioApplication."
294
"test_apply_scenario(new id)",
296
self.assertEqual(None, adapted_test2.bzrdir_format)
298
"bzrlib.tests.test_selftest.TestTestScenarioApplication."
299
"test_apply_scenario(new id 2)",
303
class TestInterRepositoryScenarios(tests.TestCase):
305
def test_scenarios(self):
306
# check that constructor parameters are passed through to the adapted
308
from bzrlib.tests.per_interrepository import make_scenarios
311
formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
312
scenarios = make_scenarios(server1, server2, formats)
315
{'repository_format': 'C1',
316
'repository_format_to': 'C2',
317
'transport_readonly_server': 'b',
318
'transport_server': 'a',
319
'extra_setup': 'C3'}),
321
{'repository_format': 'D1',
322
'repository_format_to': 'D2',
323
'transport_readonly_server': 'b',
324
'transport_server': 'a',
325
'extra_setup': 'D3'})],
329
class TestWorkingTreeScenarios(tests.TestCase):
331
def test_scenarios(self):
332
# check that constructor parameters are passed through to the adapted
334
from bzrlib.tests.per_workingtree import make_scenarios
337
formats = [workingtree_4.WorkingTreeFormat4(),
338
workingtree_3.WorkingTreeFormat3(),
339
workingtree_4.WorkingTreeFormat6()]
340
scenarios = make_scenarios(server1, server2, formats,
341
remote_server='c', remote_readonly_server='d',
342
remote_backing_server='e')
344
('WorkingTreeFormat4',
345
{'bzrdir_format': formats[0]._matchingbzrdir,
346
'transport_readonly_server': 'b',
347
'transport_server': 'a',
348
'workingtree_format': formats[0]}),
349
('WorkingTreeFormat3',
350
{'bzrdir_format': formats[1]._matchingbzrdir,
351
'transport_readonly_server': 'b',
352
'transport_server': 'a',
353
'workingtree_format': formats[1]}),
354
('WorkingTreeFormat6',
355
{'bzrdir_format': formats[2]._matchingbzrdir,
356
'transport_readonly_server': 'b',
357
'transport_server': 'a',
358
'workingtree_format': formats[2]}),
359
('WorkingTreeFormat6,remote',
360
{'bzrdir_format': formats[2]._matchingbzrdir,
361
'repo_is_remote': True,
362
'transport_readonly_server': 'd',
363
'transport_server': 'c',
364
'vfs_transport_factory': 'e',
365
'workingtree_format': formats[2]}),
369
class TestTreeScenarios(tests.TestCase):
371
def test_scenarios(self):
372
# the tree implementation scenario generator is meant to setup one
373
# instance for each working tree format, one additional instance
374
# that will use the default wt format, but create a revision tree for
375
# the tests, and one more that uses the default wt format as a
376
# lightweight checkout of a remote repository. This means that the wt
377
# ones should have the workingtree_to_test_tree attribute set to
378
# 'return_parameter' and the revision one set to
181
adapter = BzrDirTestProviderAdapter(server1, server2, formats)
182
suite = adapter.adapt(input_test)
183
tests = list(iter(suite))
184
self.assertEqual(2, len(tests))
185
self.assertEqual(tests[0].bzrdir_format, formats[0])
186
self.assertEqual(tests[0].transport_server, server1)
187
self.assertEqual(tests[0].transport_readonly_server, server2)
188
self.assertEqual(tests[1].bzrdir_format, formats[1])
189
self.assertEqual(tests[1].transport_server, server1)
190
self.assertEqual(tests[1].transport_readonly_server, server2)
193
class TestRepositoryProviderAdapter(TestCase):
194
"""A group of tests that test the repository implementation test adapter."""
196
def test_adapted_tests(self):
197
# check that constructor parameters are passed through to the adapted
199
from bzrlib.repository import RepositoryTestProviderAdapter
200
input_test = TestRepositoryProviderAdapter(
201
"test_adapted_tests")
204
formats = [("c", "C"), ("d", "D")]
205
adapter = RepositoryTestProviderAdapter(server1, server2, formats)
206
suite = adapter.adapt(input_test)
207
tests = list(iter(suite))
208
self.assertEqual(2, len(tests))
209
self.assertEqual(tests[0].bzrdir_format, formats[0][1])
210
self.assertEqual(tests[0].repository_format, formats[0][0])
211
self.assertEqual(tests[0].transport_server, server1)
212
self.assertEqual(tests[0].transport_readonly_server, server2)
213
self.assertEqual(tests[1].bzrdir_format, formats[1][1])
214
self.assertEqual(tests[1].repository_format, formats[1][0])
215
self.assertEqual(tests[1].transport_server, server1)
216
self.assertEqual(tests[1].transport_readonly_server, server2)
219
class TestInterRepositoryProviderAdapter(TestCase):
220
"""A group of tests that test the InterRepository test adapter."""
222
def test_adapted_tests(self):
223
# check that constructor parameters are passed through to the adapted
225
from bzrlib.repository import InterRepositoryTestProviderAdapter
226
input_test = TestInterRepositoryProviderAdapter(
227
"test_adapted_tests")
230
formats = [(str, "C1", "C2"), (int, "D1", "D2")]
231
adapter = InterRepositoryTestProviderAdapter(server1, server2, formats)
232
suite = adapter.adapt(input_test)
233
tests = list(iter(suite))
234
self.assertEqual(2, len(tests))
235
self.assertEqual(tests[0].interrepo_class, formats[0][0])
236
self.assertEqual(tests[0].repository_format, formats[0][1])
237
self.assertEqual(tests[0].repository_format_to, formats[0][2])
238
self.assertEqual(tests[0].transport_server, server1)
239
self.assertEqual(tests[0].transport_readonly_server, server2)
240
self.assertEqual(tests[1].interrepo_class, formats[1][0])
241
self.assertEqual(tests[1].repository_format, formats[1][1])
242
self.assertEqual(tests[1].repository_format_to, formats[1][2])
243
self.assertEqual(tests[1].transport_server, server1)
244
self.assertEqual(tests[1].transport_readonly_server, server2)
247
class TestInterVersionedFileProviderAdapter(TestCase):
248
"""A group of tests that test the InterVersionedFile test adapter."""
250
def test_adapted_tests(self):
251
# check that constructor parameters are passed through to the adapted
253
from bzrlib.versionedfile import InterVersionedFileTestProviderAdapter
254
input_test = TestInterRepositoryProviderAdapter(
255
"test_adapted_tests")
258
formats = [(str, "C1", "C2"), (int, "D1", "D2")]
259
adapter = InterVersionedFileTestProviderAdapter(server1, server2, formats)
260
suite = adapter.adapt(input_test)
261
tests = list(iter(suite))
262
self.assertEqual(2, len(tests))
263
self.assertEqual(tests[0].interversionedfile_class, formats[0][0])
264
self.assertEqual(tests[0].versionedfile_factory, formats[0][1])
265
self.assertEqual(tests[0].versionedfile_factory_to, formats[0][2])
266
self.assertEqual(tests[0].transport_server, server1)
267
self.assertEqual(tests[0].transport_readonly_server, server2)
268
self.assertEqual(tests[1].interversionedfile_class, formats[1][0])
269
self.assertEqual(tests[1].versionedfile_factory, formats[1][1])
270
self.assertEqual(tests[1].versionedfile_factory_to, formats[1][2])
271
self.assertEqual(tests[1].transport_server, server1)
272
self.assertEqual(tests[1].transport_readonly_server, server2)
275
class TestRevisionStoreProviderAdapter(TestCase):
276
"""A group of tests that test the RevisionStore test adapter."""
278
def test_adapted_tests(self):
279
# check that constructor parameters are passed through to the adapted
281
from bzrlib.store.revision import RevisionStoreTestProviderAdapter
282
input_test = TestRevisionStoreProviderAdapter(
283
"test_adapted_tests")
284
# revision stores need a store factory - i.e. RevisionKnit
285
#, a readonly and rw transport
289
store_factories = ["c", "d"]
290
adapter = RevisionStoreTestProviderAdapter(server1, server2, store_factories)
291
suite = adapter.adapt(input_test)
292
tests = list(iter(suite))
293
self.assertEqual(2, len(tests))
294
self.assertEqual(tests[0].store_factory, store_factories[0][0])
295
self.assertEqual(tests[0].transport_server, server1)
296
self.assertEqual(tests[0].transport_readonly_server, server2)
297
self.assertEqual(tests[1].store_factory, store_factories[1][0])
298
self.assertEqual(tests[1].transport_server, server1)
299
self.assertEqual(tests[1].transport_readonly_server, server2)
302
class TestWorkingTreeProviderAdapter(TestCase):
303
"""A group of tests that test the workingtree implementation test adapter."""
305
def test_adapted_tests(self):
306
# check that constructor parameters are passed through to the adapted
308
from bzrlib.workingtree import WorkingTreeTestProviderAdapter
309
input_test = TestWorkingTreeProviderAdapter(
310
"test_adapted_tests")
313
formats = [("c", "C"), ("d", "D")]
314
adapter = WorkingTreeTestProviderAdapter(server1, server2, formats)
315
suite = adapter.adapt(input_test)
316
tests = list(iter(suite))
317
self.assertEqual(2, len(tests))
318
self.assertEqual(tests[0].workingtree_format, formats[0][0])
319
self.assertEqual(tests[0].bzrdir_format, formats[0][1])
320
self.assertEqual(tests[0].transport_server, server1)
321
self.assertEqual(tests[0].transport_readonly_server, server2)
322
self.assertEqual(tests[1].workingtree_format, formats[1][0])
323
self.assertEqual(tests[1].bzrdir_format, formats[1][1])
324
self.assertEqual(tests[1].transport_server, server1)
325
self.assertEqual(tests[1].transport_readonly_server, server2)
328
class TestTreeProviderAdapter(TestCase):
329
"""Test the setup of tree_implementation tests."""
331
def test_adapted_tests(self):
332
# the tree implementation adapter is meant to setup one instance for
333
# each working tree format, and one additional instance that will
334
# use the default wt format, but create a revision tree for the tests.
335
# this means that the wt ones should have the workingtree_to_test_tree
336
# attribute set to 'return_parameter' and the revision one set to
379
337
# revision_tree_from_workingtree.
381
from bzrlib.tests.per_tree import (
382
_dirstate_tree_from_workingtree,
339
from bzrlib.tests.tree_implementations import (
340
TreeTestProviderAdapter,
386
341
return_parameter,
387
342
revision_tree_from_workingtree
344
from bzrlib.workingtree import WorkingTreeFormat
345
input_test = TestTreeProviderAdapter(
346
"test_adapted_tests")
391
smart_server = test_server.SmartTCPServer_for_testing
392
smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
393
mem_server = memory.MemoryServer
394
formats = [workingtree_4.WorkingTreeFormat4(),
395
workingtree_3.WorkingTreeFormat3(),]
396
scenarios = make_scenarios(server1, server2, formats)
397
self.assertEqual(8, len(scenarios))
398
default_wt_format = workingtree.format_registry.get_default()
399
wt4_format = workingtree_4.WorkingTreeFormat4()
400
wt5_format = workingtree_4.WorkingTreeFormat5()
401
wt6_format = workingtree_4.WorkingTreeFormat6()
402
expected_scenarios = [
403
('WorkingTreeFormat4',
404
{'bzrdir_format': formats[0]._matchingbzrdir,
405
'transport_readonly_server': 'b',
406
'transport_server': 'a',
407
'workingtree_format': formats[0],
408
'_workingtree_to_test_tree': return_parameter,
410
('WorkingTreeFormat3',
411
{'bzrdir_format': formats[1]._matchingbzrdir,
412
'transport_readonly_server': 'b',
413
'transport_server': 'a',
414
'workingtree_format': formats[1],
415
'_workingtree_to_test_tree': return_parameter,
417
('WorkingTreeFormat6,remote',
418
{'bzrdir_format': wt6_format._matchingbzrdir,
419
'repo_is_remote': True,
420
'transport_readonly_server': smart_readonly_server,
421
'transport_server': smart_server,
422
'vfs_transport_factory': mem_server,
423
'workingtree_format': wt6_format,
424
'_workingtree_to_test_tree': return_parameter,
427
{'_workingtree_to_test_tree': revision_tree_from_workingtree,
428
'bzrdir_format': default_wt_format._matchingbzrdir,
429
'transport_readonly_server': 'b',
430
'transport_server': 'a',
431
'workingtree_format': default_wt_format,
433
('DirStateRevisionTree,WT4',
434
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
435
'bzrdir_format': wt4_format._matchingbzrdir,
436
'transport_readonly_server': 'b',
437
'transport_server': 'a',
438
'workingtree_format': wt4_format,
440
('DirStateRevisionTree,WT5',
441
{'_workingtree_to_test_tree': _dirstate_tree_from_workingtree,
442
'bzrdir_format': wt5_format._matchingbzrdir,
443
'transport_readonly_server': 'b',
444
'transport_server': 'a',
445
'workingtree_format': wt5_format,
448
{'_workingtree_to_test_tree': preview_tree_pre,
449
'bzrdir_format': default_wt_format._matchingbzrdir,
450
'transport_readonly_server': 'b',
451
'transport_server': 'a',
452
'workingtree_format': default_wt_format}),
454
{'_workingtree_to_test_tree': preview_tree_post,
455
'bzrdir_format': default_wt_format._matchingbzrdir,
456
'transport_readonly_server': 'b',
457
'transport_server': 'a',
458
'workingtree_format': default_wt_format}),
460
self.assertEqual(expected_scenarios, scenarios)
463
class TestInterTreeScenarios(tests.TestCase):
349
formats = [("c", "C"), ("d", "D")]
350
adapter = TreeTestProviderAdapter(server1, server2, formats)
351
suite = adapter.adapt(input_test)
352
tests = list(iter(suite))
353
self.assertEqual(3, len(tests))
354
default_format = WorkingTreeFormat.get_default_format()
355
self.assertEqual(tests[0].workingtree_format, formats[0][0])
356
self.assertEqual(tests[0].bzrdir_format, formats[0][1])
357
self.assertEqual(tests[0].transport_server, server1)
358
self.assertEqual(tests[0].transport_readonly_server, server2)
359
self.assertEqual(tests[0].workingtree_to_test_tree, return_parameter)
360
self.assertEqual(tests[1].workingtree_format, formats[1][0])
361
self.assertEqual(tests[1].bzrdir_format, formats[1][1])
362
self.assertEqual(tests[1].transport_server, server1)
363
self.assertEqual(tests[1].transport_readonly_server, server2)
364
self.assertEqual(tests[1].workingtree_to_test_tree, return_parameter)
365
self.assertEqual(tests[2].workingtree_format, default_format)
366
self.assertEqual(tests[2].bzrdir_format, default_format._matchingbzrdir)
367
self.assertEqual(tests[2].transport_server, server1)
368
self.assertEqual(tests[2].transport_readonly_server, server2)
369
self.assertEqual(tests[2].workingtree_to_test_tree,
370
revision_tree_from_workingtree)
373
class TestInterTreeProviderAdapter(TestCase):
464
374
"""A group of tests that test the InterTreeTestAdapter."""
466
def test_scenarios(self):
376
def test_adapted_tests(self):
467
377
# check that constructor parameters are passed through to the adapted
469
379
# for InterTree tests we want the machinery to bring up two trees in
1493
794
self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
1494
795
self.assertIsInstance(self._benchcalls[0][1], bzrlib.lsprof.Stats)
1495
796
self.assertIsInstance(self._benchcalls[1][1], bzrlib.lsprof.Stats)
1496
del self._benchcalls[:]
1498
def test_knownFailure(self):
    """Calling self.knownFailure() must raise tests.KnownFailure."""
    expected_exc = tests.KnownFailure
    reason = "A Failure"
    self.assertRaises(expected_exc, self.knownFailure, reason)
1502
def test_open_bzrdir_safe_roots(self):
1503
# even a memory transport should fail to open when its url isn't
1505
# Manually set one up (TestCase doesn't and shouldn't provide magic
1507
transport_server = memory.MemoryServer()
1508
transport_server.start_server()
1509
self.addCleanup(transport_server.stop_server)
1510
t = transport.get_transport_from_url(transport_server.get_url())
1511
controldir.ControlDir.create(t.base)
1512
self.assertRaises(errors.BzrError,
1513
controldir.ControlDir.open_from_transport, t)
1514
# But if we declare this as safe, we can open the bzrdir.
1515
self.permit_url(t.base)
1516
self._bzr_selftest_roots.append(t.base)
1517
controldir.ControlDir.open_from_transport(t)
1519
def test_requireFeature_available(self):
    """requireFeature() on an available feature does nothing.

    The test passes simply by the call returning without raising.
    """
    class Available(features.Feature):
        # The probe always reports success.
        def _probe(self):
            return True

    self.requireFeature(Available())
1526
def test_requireFeature_unavailable(self):
    """requireFeature() on an unavailable feature raises UnavailableFeature."""
    class Unavailable(features.Feature):
        # The probe always reports failure.
        def _probe(self):
            return False

    missing = Unavailable()
    self.assertRaises(tests.UnavailableFeature, self.requireFeature, missing)
1534
def test_run_no_parameters(self):
1535
test = SampleTestCase('_test_pass')
1538
def test_run_enabled_unittest_result(self):
1539
"""Test we revert to regular behaviour when the test is enabled."""
1540
test = SampleTestCase('_test_pass')
1541
class EnabledFeature(object):
1542
def available(self):
1544
test._test_needs_features = [EnabledFeature()]
1545
result = unittest.TestResult()
1547
self.assertEqual(1, result.testsRun)
1548
self.assertEqual([], result.errors)
1549
self.assertEqual([], result.failures)
1551
def test_run_disabled_unittest_result(self):
1552
"""Test our compatibility for disabled tests with unittest results."""
1553
test = SampleTestCase('_test_pass')
1554
class DisabledFeature(object):
1555
def available(self):
1557
test._test_needs_features = [DisabledFeature()]
1558
result = unittest.TestResult()
1560
self.assertEqual(1, result.testsRun)
1561
self.assertEqual([], result.errors)
1562
self.assertEqual([], result.failures)
1564
def test_run_disabled_supporting_result(self):
1565
"""Test disabled tests behaviour with support aware results."""
1566
test = SampleTestCase('_test_pass')
1567
class DisabledFeature(object):
1568
def __eq__(self, other):
1569
return isinstance(other, DisabledFeature)
1570
def available(self):
1572
the_feature = DisabledFeature()
1573
test._test_needs_features = [the_feature]
1574
class InstrumentedTestResult(unittest.TestResult):
1576
unittest.TestResult.__init__(self)
1578
def startTest(self, test):
1579
self.calls.append(('startTest', test))
1580
def stopTest(self, test):
1581
self.calls.append(('stopTest', test))
1582
def addNotSupported(self, test, feature):
1583
self.calls.append(('addNotSupported', test, feature))
1584
result = InstrumentedTestResult()
1586
case = result.calls[0][1]
1588
('startTest', case),
1589
('addNotSupported', case, the_feature),
1594
def test_start_server_registers_url(self):
    """start_server() adds the server's url to _bzr_selftest_roots."""
    server = memory.MemoryServer()
    # A little strict, but unlikely to be changed soon.
    self.assertEqual([], self._bzr_selftest_roots)
    self.start_server(server)
    # After starting, the server url must be among the registered roots.
    expected_urls = [server.get_url()]
    self.assertSubset(expected_urls, self._bzr_selftest_roots)
1602
def test_assert_list_raises_on_generator(self):
1603
def generator_which_will_raise():
1604
# This will not raise until after the first yield
1606
raise _TestException()
1608
e = self.assertListRaises(_TestException, generator_which_will_raise)
1609
self.assertIsInstance(e, _TestException)
1611
e = self.assertListRaises(Exception, generator_which_will_raise)
1612
self.assertIsInstance(e, _TestException)
1614
def test_assert_list_raises_on_plain(self):
1615
def plain_exception():
1616
raise _TestException()
1619
e = self.assertListRaises(_TestException, plain_exception)
1620
self.assertIsInstance(e, _TestException)
1622
e = self.assertListRaises(Exception, plain_exception)
1623
self.assertIsInstance(e, _TestException)
1625
def test_assert_list_raises_assert_wrong_exception(self):
1626
class _NotTestException(Exception):
1629
def wrong_exception():
1630
raise _NotTestException()
1632
def wrong_exception_generator():
1635
raise _NotTestException()
1637
# Wrong exceptions are not intercepted
1638
self.assertRaises(_NotTestException,
1639
self.assertListRaises, _TestException, wrong_exception)
1640
self.assertRaises(_NotTestException,
1641
self.assertListRaises, _TestException, wrong_exception_generator)
1643
def test_assert_list_raises_no_exception(self):
1647
def success_generator():
1651
self.assertRaises(AssertionError,
1652
self.assertListRaises, _TestException, success)
1654
self.assertRaises(AssertionError,
1655
self.assertListRaises, _TestException, success_generator)
1657
def _run_successful_test(self, test):
1658
result = testtools.TestResult()
1660
self.assertTrue(result.wasSuccessful())
1663
def test_overrideAttr_without_value(self):
1664
self.test_attr = 'original' # Define a test attribute
1665
obj = self # Make 'obj' visible to the embedded test
1666
class Test(tests.TestCase):
1669
super(Test, self).setUp()
1670
self.orig = self.overrideAttr(obj, 'test_attr')
1672
def test_value(self):
1673
self.assertEqual('original', self.orig)
1674
self.assertEqual('original', obj.test_attr)
1675
obj.test_attr = 'modified'
1676
self.assertEqual('modified', obj.test_attr)
1678
self._run_successful_test(Test('test_value'))
1679
self.assertEqual('original', obj.test_attr)
1681
def test_overrideAttr_with_value(self):
1682
self.test_attr = 'original' # Define a test attribute
1683
obj = self # Make 'obj' visible to the embedded test
1684
class Test(tests.TestCase):
1687
super(Test, self).setUp()
1688
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1690
def test_value(self):
1691
self.assertEqual('original', self.orig)
1692
self.assertEqual('modified', obj.test_attr)
1694
self._run_successful_test(Test('test_value'))
1695
self.assertEqual('original', obj.test_attr)
1697
def test_overrideAttr_with_no_existing_value_and_value(self):
1698
# Do not define the test_attribute
1699
obj = self # Make 'obj' visible to the embedded test
1700
class Test(tests.TestCase):
1703
tests.TestCase.setUp(self)
1704
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1706
def test_value(self):
1707
self.assertEqual(tests._unitialized_attr, self.orig)
1708
self.assertEqual('modified', obj.test_attr)
1710
self._run_successful_test(Test('test_value'))
1711
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1713
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1714
# Do not define the test_attribute
1715
obj = self # Make 'obj' visible to the embedded test
1716
class Test(tests.TestCase):
1719
tests.TestCase.setUp(self)
1720
self.orig = self.overrideAttr(obj, 'test_attr')
1722
def test_value(self):
1723
self.assertEqual(tests._unitialized_attr, self.orig)
1724
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1726
self._run_successful_test(Test('test_value'))
1727
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1729
def test_recordCalls(self):
1730
from bzrlib.tests import test_selftest
1731
calls = self.recordCalls(
1732
test_selftest, '_add_numbers')
1733
self.assertEqual(test_selftest._add_numbers(2, 10),
1735
self.assertEqual(calls, [((2, 10), {})])
1738
def _add_numbers(a, b):
1742
class _MissingFeature(features.Feature):
1745
missing_feature = _MissingFeature()
1748
def _get_test(name):
1749
"""Get an instance of a specific example test.
1751
We protect this in a function so that they don't auto-run in the test
1755
class ExampleTests(tests.TestCase):
1757
def test_fail(self):
1758
mutter('this was a failing test')
1759
self.fail('this test will fail')
1761
def test_error(self):
1762
mutter('this test errored')
1763
raise RuntimeError('gotcha')
1765
def test_missing_feature(self):
1766
mutter('missing the feature')
1767
self.requireFeature(missing_feature)
1769
def test_skip(self):
1770
mutter('this test will be skipped')
1771
raise tests.TestSkipped('reason')
1773
def test_success(self):
1774
mutter('this test succeeds')
1776
def test_xfail(self):
1777
mutter('test with expected failure')
1778
self.knownFailure('this_fails')
1780
def test_unexpected_success(self):
1781
mutter('test with unexpected success')
1782
self.expectFailure('should_fail', lambda: None)
1784
return ExampleTests(name)
1787
class TestTestCaseLogDetails(tests.TestCase):
1789
def _run_test(self, test_name):
1790
test = _get_test(test_name)
1791
result = testtools.TestResult()
1795
def test_fail_has_log(self):
1796
result = self._run_test('test_fail')
1797
self.assertEqual(1, len(result.failures))
1798
result_content = result.failures[0][1]
1799
self.assertContainsRe(result_content,
1800
'(?m)^(?:Text attachment: )?log(?:$|: )')
1801
self.assertContainsRe(result_content, 'this was a failing test')
1803
def test_error_has_log(self):
1804
result = self._run_test('test_error')
1805
self.assertEqual(1, len(result.errors))
1806
result_content = result.errors[0][1]
1807
self.assertContainsRe(result_content,
1808
'(?m)^(?:Text attachment: )?log(?:$|: )')
1809
self.assertContainsRe(result_content, 'this test errored')
1811
def test_skip_has_no_log(self):
1812
result = self._run_test('test_skip')
1813
self.assertEqual(['reason'], result.skip_reasons.keys())
1814
skips = result.skip_reasons['reason']
1815
self.assertEqual(1, len(skips))
1817
self.assertFalse('log' in test.getDetails())
1819
def test_missing_feature_has_no_log(self):
1820
# testtools doesn't know about addNotSupported, so it just gets
1821
# considered as a skip
1822
result = self._run_test('test_missing_feature')
1823
self.assertEqual([missing_feature], result.skip_reasons.keys())
1824
skips = result.skip_reasons[missing_feature]
1825
self.assertEqual(1, len(skips))
1827
self.assertFalse('log' in test.getDetails())
1829
def test_xfail_has_no_log(self):
1830
result = self._run_test('test_xfail')
1831
self.assertEqual(1, len(result.expectedFailures))
1832
result_content = result.expectedFailures[0][1]
1833
self.assertNotContainsRe(result_content,
1834
'(?m)^(?:Text attachment: )?log(?:$|: )')
1835
self.assertNotContainsRe(result_content, 'test with expected failure')
1837
def test_unexpected_success_has_log(self):
1838
result = self._run_test('test_unexpected_success')
1839
self.assertEqual(1, len(result.unexpectedSuccesses))
1840
# Inconsistency, unexpectedSuccesses is a list of tests,
1841
# expectedFailures is a list of reasons?
1842
test = result.unexpectedSuccesses[0]
1843
details = test.getDetails()
1844
self.assertTrue('log' in details)
1847
class TestTestCloning(tests.TestCase):
1848
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1850
def test_cloned_testcase_does_not_share_details(self):
1851
"""A TestCase cloned with clone_test does not share mutable attributes
1852
such as details or cleanups.
1854
class Test(tests.TestCase):
1856
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1857
orig_test = Test('test_foo')
1858
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1859
orig_test.run(unittest.TestResult())
1860
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1861
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1863
def test_double_apply_scenario_preserves_first_scenario(self):
1864
"""Applying two levels of scenarios to a test preserves the attributes
1865
added by both scenarios.
1867
class Test(tests.TestCase):
1870
test = Test('test_foo')
1871
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1872
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1873
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1874
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1875
all_tests = list(tests.iter_suite_tests(suite))
1876
self.assertLength(4, all_tests)
1877
all_xys = sorted((t.x, t.y) for t in all_tests)
1878
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1881
# NB: Don't delete this; it's not actually from 0.11!
1882
@deprecated_function(deprecated_in((0, 11, 0)))
1883
def sample_deprecated_function():
1884
"""A deprecated function to test applyDeprecated with."""
1888
def sample_undeprecated_function(a_param):
1889
"""A undeprecated function to test applyDeprecated with."""
1892
class ApplyDeprecatedHelper(object):
1893
"""A helper class for ApplyDeprecated tests."""
1895
@deprecated_method(deprecated_in((0, 11, 0)))
1896
def sample_deprecated_method(self, param_one):
1897
"""A deprecated method for testing with."""
1900
def sample_normal_method(self):
1901
"""A undeprecated method."""
1903
@deprecated_method(deprecated_in((0, 10, 0)))
1904
def sample_nested_deprecation(self):
1905
return sample_deprecated_function()
1908
class TestExtraAssertions(tests.TestCase):
799
class TestExtraAssertions(TestCase):
1909
800
"""Tests for new test assertions in bzrlib test suite"""
1911
802
def test_assert_isinstance(self):
1912
803
self.assertIsInstance(2, int)
1913
804
self.assertIsInstance(u'', basestring)
1914
e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
1915
self.assertEqual(str(e),
1916
"None is an instance of <type 'NoneType'> rather than <type 'int'>")
805
self.assertRaises(AssertionError, self.assertIsInstance, None, int)
1917
806
self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
1918
e = self.assertRaises(AssertionError,
1919
self.assertIsInstance, None, int, "it's just not")
1920
self.assertEqual(str(e),
1921
"None is an instance of <type 'NoneType'> rather than <type 'int'>"
1924
808
def test_assertEndsWith(self):
1925
809
self.assertEndsWith('foo', 'oo')
1926
810
self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')
1928
def test_assertEqualDiff(self):
1929
e = self.assertRaises(AssertionError,
1930
self.assertEqualDiff, '', '\n')
1931
self.assertEqual(str(e),
1932
# Don't blink ! The '+' applies to the second string
1933
'first string is missing a final newline.\n+ \n')
1934
e = self.assertRaises(AssertionError,
1935
self.assertEqualDiff, '\n', '')
1936
self.assertEqual(str(e),
1937
# Don't blink ! The '-' applies to the second string
1938
'second string is missing a final newline.\n- \n')
1941
class TestDeprecations(tests.TestCase):
1943
def test_applyDeprecated_not_deprecated(self):
1944
sample_object = ApplyDeprecatedHelper()
1945
# calling an undeprecated callable raises an assertion
1946
self.assertRaises(AssertionError, self.applyDeprecated,
1947
deprecated_in((0, 11, 0)),
1948
sample_object.sample_normal_method)
1949
self.assertRaises(AssertionError, self.applyDeprecated,
1950
deprecated_in((0, 11, 0)),
1951
sample_undeprecated_function, "a param value")
1952
# calling a deprecated callable (function or method) with the wrong
1953
# expected deprecation fails.
1954
self.assertRaises(AssertionError, self.applyDeprecated,
1955
deprecated_in((0, 10, 0)),
1956
sample_object.sample_deprecated_method, "a param value")
1957
self.assertRaises(AssertionError, self.applyDeprecated,
1958
deprecated_in((0, 10, 0)),
1959
sample_deprecated_function)
1960
# calling a deprecated callable (function or method) with the right
1961
# expected deprecation returns the functions result.
1962
self.assertEqual("a param value",
1963
self.applyDeprecated(deprecated_in((0, 11, 0)),
1964
sample_object.sample_deprecated_method, "a param value"))
1965
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)),
1966
sample_deprecated_function))
1967
# calling a nested deprecation with the wrong deprecation version
1968
# fails even if a deeper nested function was deprecated with the
1970
self.assertRaises(AssertionError, self.applyDeprecated,
1971
deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation)
1972
# calling a nested deprecation with the right deprecation value
1973
# returns the calls result.
1974
self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 10, 0)),
1975
sample_object.sample_nested_deprecation))
1977
def test_callDeprecated(self):
1978
def testfunc(be_deprecated, result=None):
812
def test_assertDeprecated(self):
813
def testfunc(be_deprecated):
1979
814
if be_deprecated is True:
1980
symbol_versioning.warn('i am deprecated', DeprecationWarning,
815
symbol_versioning.warn('i am deprecated', DeprecationWarning,
1983
result = self.callDeprecated(['i am deprecated'], testfunc, True)
1984
self.assertIs(None, result)
1985
result = self.callDeprecated([], testfunc, False, 'result')
1986
self.assertEqual('result', result)
1987
self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True)
1988
self.callDeprecated([], testfunc, be_deprecated=False)
1991
class TestWarningTests(tests.TestCase):
1992
"""Tests for calling methods that raise warnings."""
1994
def test_callCatchWarnings(self):
1996
warnings.warn("this is your last warning")
1998
wlist, result = self.callCatchWarnings(meth, 1, 2)
1999
self.assertEqual(3, result)
2000
# would like just to compare them, but UserWarning doesn't implement
2003
self.assertIsInstance(w0, UserWarning)
2004
self.assertEqual("this is your last warning", str(w0))
2007
class TestConvenienceMakers(tests.TestCaseWithTransport):
817
self.assertDeprecated(['i am deprecated'], testfunc, True)
818
self.assertDeprecated([], testfunc, False)
819
self.assertDeprecated(['i am deprecated'], testfunc,
821
self.assertDeprecated([], testfunc, be_deprecated=False)
824
class TestConvenienceMakers(TestCaseWithTransport):
2008
825
"""Test for the make_* convenience functions."""
2010
827
def test_make_branch_and_tree_with_format(self):
2011
828
# we should be able to supply a format to make_branch_and_tree
2012
829
self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1())
2013
self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format,
830
self.make_branch_and_tree('b', format=bzrlib.bzrdir.BzrDirFormat6())
831
self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('a')._format,
2014
832
bzrlib.bzrdir.BzrDirMetaFormat1)
2016
def test_make_branch_and_memory_tree(self):
2017
# we should be able to get a new branch and a mutable tree from
2018
# TestCaseWithTransport
2019
tree = self.make_branch_and_memory_tree('a')
2020
self.assertIsInstance(tree, bzrlib.memorytree.MemoryTree)
2022
def test_make_tree_for_local_vfs_backed_transport(self):
2023
# make_branch_and_tree has to use local branch and repositories
2024
# when the vfs transport and local disk are colocated, even if
2025
# a different transport is in use for url generation.
2026
self.transport_server = test_server.FakeVFATServer
2027
self.assertFalse(self.get_url('t1').startswith('file://'))
2028
tree = self.make_branch_and_tree('t1')
2029
base = tree.bzrdir.root_transport.base
2030
self.assertStartsWith(base, 'file://')
2031
self.assertEqual(tree.bzrdir.root_transport,
2032
tree.branch.bzrdir.root_transport)
2033
self.assertEqual(tree.bzrdir.root_transport,
2034
tree.branch.repository.bzrdir.root_transport)
2037
class SelfTestHelper(object):
2039
def run_selftest(self, **kwargs):
2040
"""Run selftest returning its output."""
2042
old_transport = bzrlib.tests.default_transport
2043
old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
2044
tests.TestCaseWithMemoryTransport.TEST_ROOT = None
2046
self.assertEqual(True, tests.selftest(stream=output, **kwargs))
2048
bzrlib.tests.default_transport = old_transport
2049
tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
2054
class TestSelftest(tests.TestCase, SelfTestHelper):
833
self.assertIsInstance(bzrlib.bzrdir.BzrDir.open('b')._format,
834
bzrlib.bzrdir.BzrDirFormat6)
837
class TestSelftest(TestCase):
2055
838
"""Tests of bzrlib.tests.selftest."""
2057
840
def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
2058
841
factory_called = []
2060
843
factory_called.append(True)
2061
return TestUtil.TestSuite()
2062
845
out = StringIO()
2063
846
err = StringIO()
2064
self.apply_redirected(out, err, None, bzrlib.tests.selftest,
847
self.apply_redirected(out, err, None, bzrlib.tests.selftest,
2065
848
test_suite_factory=factory)
2066
849
self.assertEqual([True], factory_called)
2069
"""A test suite factory."""
2070
class Test(tests.TestCase):
2077
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
2079
def test_list_only(self):
2080
output = self.run_selftest(test_suite_factory=self.factory,
2082
self.assertEqual(3, len(output.readlines()))
2084
def test_list_only_filtered(self):
2085
output = self.run_selftest(test_suite_factory=self.factory,
2086
list_only=True, pattern="Test.b")
2087
self.assertEndsWith(output.getvalue(), "Test.b\n")
2088
self.assertLength(1, output.readlines())
2090
def test_list_only_excludes(self):
2091
output = self.run_selftest(test_suite_factory=self.factory,
2092
list_only=True, exclude_pattern="Test.b")
2093
self.assertNotContainsRe("Test.b", output.getvalue())
2094
self.assertLength(2, output.readlines())
2096
def test_lsprof_tests(self):
2097
self.requireFeature(features.lsprof_feature)
2100
def __call__(test, result):
2102
def run(test, result):
2103
results.append(result)
2104
def countTestCases(self):
2106
self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2107
self.assertLength(1, results)
2108
self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
2110
def test_random(self):
2111
# test randomising by listing a number of tests.
2112
output_123 = self.run_selftest(test_suite_factory=self.factory,
2113
list_only=True, random_seed="123")
2114
output_234 = self.run_selftest(test_suite_factory=self.factory,
2115
list_only=True, random_seed="234")
2116
self.assertNotEqual(output_123, output_234)
2117
# "Randominzing test order..\n\n
2118
self.assertLength(5, output_123.readlines())
2119
self.assertLength(5, output_234.readlines())
2121
def test_random_reuse_is_same_order(self):
2122
# test randomising by listing a number of tests.
2123
expected = self.run_selftest(test_suite_factory=self.factory,
2124
list_only=True, random_seed="123")
2125
repeated = self.run_selftest(test_suite_factory=self.factory,
2126
list_only=True, random_seed="123")
2127
self.assertEqual(expected.getvalue(), repeated.getvalue())
2129
def test_runner_class(self):
2130
self.requireFeature(features.subunit)
2131
from subunit import ProtocolTestCase
2132
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2133
test_suite_factory=self.factory)
2134
test = ProtocolTestCase(stream)
2135
result = unittest.TestResult()
2137
self.assertEqual(3, result.testsRun)
2139
def test_starting_with_single_argument(self):
2140
output = self.run_selftest(test_suite_factory=self.factory,
2141
starting_with=['bzrlib.tests.test_selftest.Test.a'],
2143
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
2146
def test_starting_with_multiple_argument(self):
2147
output = self.run_selftest(test_suite_factory=self.factory,
2148
starting_with=['bzrlib.tests.test_selftest.Test.a',
2149
'bzrlib.tests.test_selftest.Test.b'],
2151
self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
2152
'bzrlib.tests.test_selftest.Test.b\n',
2155
def check_transport_set(self, transport_server):
2156
captured_transport = []
2157
def seen_transport(a_transport):
2158
captured_transport.append(a_transport)
2159
class Capture(tests.TestCase):
2161
seen_transport(bzrlib.tests.default_transport)
2163
return TestUtil.TestSuite([Capture("a")])
2164
self.run_selftest(transport=transport_server, test_suite_factory=factory)
2165
self.assertEqual(transport_server, captured_transport[0])
2167
def test_transport_sftp(self):
2168
self.requireFeature(features.paramiko)
2169
from bzrlib.tests import stub_sftp
2170
self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
2172
def test_transport_memory(self):
2173
self.check_transport_set(memory.MemoryServer)
2176
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
2177
# Does IO: reads test.list
2179
def test_load_list(self):
2180
# Provide a list with one test - this test.
2181
test_id_line = '%s\n' % self.id()
2182
self.build_tree_contents([('test.list', test_id_line)])
2183
# And generate a list of the tests in the suite.
2184
stream = self.run_selftest(load_list='test.list', list_only=True)
2185
self.assertEqual(test_id_line, stream.getvalue())
2187
def test_load_unknown(self):
2188
# Provide a list with one test - this test.
2189
# And generate a list of the tests in the suite.
2190
err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
2191
load_list='missing file name', list_only=True)
2194
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2196
_test_needs_features = [features.subunit]
2198
def run_subunit_stream(self, test_name):
2199
from subunit import ProtocolTestCase
2201
return TestUtil.TestSuite([_get_test(test_name)])
2202
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2203
test_suite_factory=factory)
2204
test = ProtocolTestCase(stream)
2205
result = testtools.TestResult()
2207
content = stream.getvalue()
2208
return content, result
2210
def test_fail_has_log(self):
2211
content, result = self.run_subunit_stream('test_fail')
2212
self.assertEqual(1, len(result.failures))
2213
self.assertContainsRe(content, '(?m)^log$')
2214
self.assertContainsRe(content, 'this test will fail')
2216
def test_error_has_log(self):
2217
content, result = self.run_subunit_stream('test_error')
2218
self.assertContainsRe(content, '(?m)^log$')
2219
self.assertContainsRe(content, 'this test errored')
2221
def test_skip_has_no_log(self):
2222
content, result = self.run_subunit_stream('test_skip')
2223
self.assertNotContainsRe(content, '(?m)^log$')
2224
self.assertNotContainsRe(content, 'this test will be skipped')
2225
self.assertEqual(['reason'], result.skip_reasons.keys())
2226
skips = result.skip_reasons['reason']
2227
self.assertEqual(1, len(skips))
2229
# RemotedTestCase doesn't preserve the "details"
2230
## self.assertFalse('log' in test.getDetails())
2232
def test_missing_feature_has_no_log(self):
2233
content, result = self.run_subunit_stream('test_missing_feature')
2234
self.assertNotContainsRe(content, '(?m)^log$')
2235
self.assertNotContainsRe(content, 'missing the feature')
2236
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2237
skips = result.skip_reasons['_MissingFeature\n']
2238
self.assertEqual(1, len(skips))
2240
# RemotedTestCase doesn't preserve the "details"
2241
## self.assertFalse('log' in test.getDetails())
2243
def test_xfail_has_no_log(self):
2244
content, result = self.run_subunit_stream('test_xfail')
2245
self.assertNotContainsRe(content, '(?m)^log$')
2246
self.assertNotContainsRe(content, 'test with expected failure')
2247
self.assertEqual(1, len(result.expectedFailures))
2248
result_content = result.expectedFailures[0][1]
2249
self.assertNotContainsRe(result_content,
2250
'(?m)^(?:Text attachment: )?log(?:$|: )')
2251
self.assertNotContainsRe(result_content, 'test with expected failure')
2253
def test_unexpected_success_has_log(self):
2254
content, result = self.run_subunit_stream('test_unexpected_success')
2255
self.assertContainsRe(content, '(?m)^log$')
2256
self.assertContainsRe(content, 'test with unexpected success')
2257
# GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2258
# success, if a min version check is added remove this
2259
from subunit import TestProtocolClient as _Client
2260
if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
2261
self.expectFailure('subunit treats "unexpectedSuccess"'
2262
' as a plain success',
2263
self.assertEqual, 1, len(result.unexpectedSuccesses))
2264
self.assertEqual(1, len(result.unexpectedSuccesses))
2265
test = result.unexpectedSuccesses[0]
2266
# RemotedTestCase doesn't preserve the "details"
2267
## self.assertTrue('log' in test.getDetails())
2269
def test_success_has_no_log(self):
2270
content, result = self.run_subunit_stream('test_success')
2271
self.assertEqual(1, result.testsRun)
2272
self.assertNotContainsRe(content, '(?m)^log$')
2273
self.assertNotContainsRe(content, 'this test succeeds')
2276
class TestRunBzr(tests.TestCase):
2281
def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
2283
"""Override _run_bzr_core to test how it is invoked by run_bzr.
2285
Attempts to run bzr from inside this class don't actually run it.
2287
We test how run_bzr actually invokes bzr in another location. Here we
2288
only need to test that it passes the right parameters to run_bzr.
2290
self.argv = list(argv)
2291
self.retcode = retcode
2292
self.encoding = encoding
2294
self.working_dir = working_dir
2295
return self.retcode, self.out, self.err
2297
def test_run_bzr_error(self):
2298
self.out = "It sure does!\n"
2299
out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
2300
self.assertEqual(['rocks'], self.argv)
2301
self.assertEqual(34, self.retcode)
2302
self.assertEqual('It sure does!\n', out)
2303
self.assertEqual(out, self.out)
2304
self.assertEqual('', err)
2305
self.assertEqual(err, self.err)
2307
def test_run_bzr_error_regexes(self):
2309
self.err = "bzr: ERROR: foobarbaz is not versioned"
2310
out, err = self.run_bzr_error(
2311
["bzr: ERROR: foobarbaz is not versioned"],
2312
['file-id', 'foobarbaz'])
2314
def test_encoding(self):
2315
"""Test that run_bzr passes encoding to _run_bzr_core"""
2316
self.run_bzr('foo bar')
2317
self.assertEqual(None, self.encoding)
2318
self.assertEqual(['foo', 'bar'], self.argv)
2320
self.run_bzr('foo bar', encoding='baz')
2321
self.assertEqual('baz', self.encoding)
2322
self.assertEqual(['foo', 'bar'], self.argv)
2324
def test_retcode(self):
2325
"""Test that run_bzr passes retcode to _run_bzr_core"""
2326
# Default is retcode == 0
2327
self.run_bzr('foo bar')
2328
self.assertEqual(0, self.retcode)
2329
self.assertEqual(['foo', 'bar'], self.argv)
2331
self.run_bzr('foo bar', retcode=1)
2332
self.assertEqual(1, self.retcode)
2333
self.assertEqual(['foo', 'bar'], self.argv)
2335
self.run_bzr('foo bar', retcode=None)
2336
self.assertEqual(None, self.retcode)
2337
self.assertEqual(['foo', 'bar'], self.argv)
2339
self.run_bzr(['foo', 'bar'], retcode=3)
2340
self.assertEqual(3, self.retcode)
2341
self.assertEqual(['foo', 'bar'], self.argv)
2343
def test_stdin(self):
2344
# test that the stdin keyword to run_bzr is passed through to
2345
# _run_bzr_core as-is. We do this by overriding
2346
# _run_bzr_core in this class, and then calling run_bzr,
2347
# which is a convenience function for _run_bzr_core, so
2349
self.run_bzr('foo bar', stdin='gam')
2350
self.assertEqual('gam', self.stdin)
2351
self.assertEqual(['foo', 'bar'], self.argv)
2353
self.run_bzr('foo bar', stdin='zippy')
2354
self.assertEqual('zippy', self.stdin)
2355
self.assertEqual(['foo', 'bar'], self.argv)
2357
def test_working_dir(self):
2358
"""Test that run_bzr passes working_dir to _run_bzr_core"""
2359
self.run_bzr('foo bar')
2360
self.assertEqual(None, self.working_dir)
2361
self.assertEqual(['foo', 'bar'], self.argv)
2363
self.run_bzr('foo bar', working_dir='baz')
2364
self.assertEqual('baz', self.working_dir)
2365
self.assertEqual(['foo', 'bar'], self.argv)
2367
def test_reject_extra_keyword_arguments(self):
2368
self.assertRaises(TypeError, self.run_bzr, "foo bar",
2369
error_regex=['error message'])
2372
class TestRunBzrCaptured(tests.TestCaseWithTransport):
2373
# Does IO when testing the working_dir parameter.
2375
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2376
a_callable=None, *args, **kwargs):
2378
self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
2379
self.factory = bzrlib.ui.ui_factory
2380
self.working_dir = osutils.getcwd()
2381
stdout.write('foo\n')
2382
stderr.write('bar\n')
2385
def test_stdin(self):
2386
# test that the stdin keyword to _run_bzr_core is passed through to
2387
# apply_redirected as a StringIO. We do this by overriding
2388
# apply_redirected in this class, and then calling _run_bzr_core,
2389
# which calls apply_redirected.
2390
self.run_bzr(['foo', 'bar'], stdin='gam')
2391
self.assertEqual('gam', self.stdin.read())
2392
self.assertTrue(self.stdin is self.factory_stdin)
2393
self.run_bzr(['foo', 'bar'], stdin='zippy')
2394
self.assertEqual('zippy', self.stdin.read())
2395
self.assertTrue(self.stdin is self.factory_stdin)
2397
def test_ui_factory(self):
2398
# each invocation of self.run_bzr should get its
2399
# own UI factory, which is an instance of TestUIFactory,
2400
# with stdin, stdout and stderr attached to the stdin,
2401
# stdout and stderr of the invoked run_bzr
2402
current_factory = bzrlib.ui.ui_factory
2403
self.run_bzr(['foo'])
2404
self.assertFalse(current_factory is self.factory)
2405
self.assertNotEqual(sys.stdout, self.factory.stdout)
2406
self.assertNotEqual(sys.stderr, self.factory.stderr)
2407
self.assertEqual('foo\n', self.factory.stdout.getvalue())
2408
self.assertEqual('bar\n', self.factory.stderr.getvalue())
2409
self.assertIsInstance(self.factory, tests.TestUIFactory)
2411
def test_working_dir(self):
2412
self.build_tree(['one/', 'two/'])
2413
cwd = osutils.getcwd()
2415
# Default is to work in the current directory
2416
self.run_bzr(['foo', 'bar'])
2417
self.assertEqual(cwd, self.working_dir)
2419
self.run_bzr(['foo', 'bar'], working_dir=None)
2420
self.assertEqual(cwd, self.working_dir)
2422
# The function should be run in the alternative directory
2423
# but afterwards the current working dir shouldn't be changed
2424
self.run_bzr(['foo', 'bar'], working_dir='one')
2425
self.assertNotEqual(cwd, self.working_dir)
2426
self.assertEndsWith(self.working_dir, 'one')
2427
self.assertEqual(cwd, osutils.getcwd())
2429
self.run_bzr(['foo', 'bar'], working_dir='two')
2430
self.assertNotEqual(cwd, self.working_dir)
2431
self.assertEndsWith(self.working_dir, 'two')
2432
self.assertEqual(cwd, osutils.getcwd())
2435
class StubProcess(object):
2436
"""A stub process for testing run_bzr_subprocess."""
2438
def __init__(self, out="", err="", retcode=0):
2441
self.returncode = retcode
2443
def communicate(self):
2444
return self.out, self.err
2447
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
2448
"""Base class for tests testing how we might run bzr."""
2451
super(TestWithFakedStartBzrSubprocess, self).setUp()
2452
self.subprocess_calls = []
2454
def start_bzr_subprocess(self, process_args, env_changes=None,
2455
skip_if_plan_to_signal=False,
2457
allow_plugins=False):
2458
"""capture what run_bzr_subprocess tries to do."""
2459
self.subprocess_calls.append({'process_args':process_args,
2460
'env_changes':env_changes,
2461
'skip_if_plan_to_signal':skip_if_plan_to_signal,
2462
'working_dir':working_dir, 'allow_plugins':allow_plugins})
2463
return self.next_subprocess
2466
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
2468
def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2469
"""Run run_bzr_subprocess with args and kwargs using a stubbed process.
2471
Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2472
that will return static results. This assertion method populates those
2473
results and also checks the arguments run_bzr_subprocess generates.
2475
self.next_subprocess = process
2477
result = self.run_bzr_subprocess(*args, **kwargs)
2479
self.next_subprocess = None
2480
for key, expected in expected_args.iteritems():
2481
self.assertEqual(expected, self.subprocess_calls[-1][key])
2484
self.next_subprocess = None
2485
for key, expected in expected_args.iteritems():
2486
self.assertEqual(expected, self.subprocess_calls[-1][key])
2489
def test_run_bzr_subprocess(self):
2490
"""The run_bzr_helper_external command behaves nicely."""
2491
self.assertRunBzrSubprocess({'process_args':['--version']},
2492
StubProcess(), '--version')
2493
self.assertRunBzrSubprocess({'process_args':['--version']},
2494
StubProcess(), ['--version'])
2495
# retcode=None disables retcode checking
2496
result = self.assertRunBzrSubprocess({},
2497
StubProcess(retcode=3), '--version', retcode=None)
2498
result = self.assertRunBzrSubprocess({},
2499
StubProcess(out="is free software"), '--version')
2500
self.assertContainsRe(result[0], 'is free software')
2501
# Running a subcommand that is missing errors
2502
self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2503
{'process_args':['--versionn']}, StubProcess(retcode=3),
2505
# Unless it is told to expect the error from the subprocess
2506
result = self.assertRunBzrSubprocess({},
2507
StubProcess(retcode=3), '--versionn', retcode=3)
2508
# Or to ignore retcode checking
2509
result = self.assertRunBzrSubprocess({},
2510
StubProcess(err="unknown command", retcode=3), '--versionn',
2512
self.assertContainsRe(result[1], 'unknown command')
2514
def test_env_change_passes_through(self):
2515
self.assertRunBzrSubprocess(
2516
{'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2518
env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2520
def test_no_working_dir_passed_as_None(self):
    """Without a working_dir argument the stub sees working_dir=None."""
    expected_args = {'working_dir': None}
    self.assertRunBzrSubprocess(expected_args, StubProcess(), '')
2523
def test_no_working_dir_passed_through(self):
2524
self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2527
def test_run_bzr_subprocess_no_plugins(self):
2528
self.assertRunBzrSubprocess({'allow_plugins': False},
2531
def test_allow_plugins(self):
    """allow_plugins=True is handed on unchanged to the stubbed start."""
    self.assertRunBzrSubprocess(
        {'allow_plugins': True}, StubProcess(), '', allow_plugins=True)
2536
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
2538
def test_finish_bzr_subprocess_with_error(self):
2539
"""finish_bzr_subprocess allows specification of the desired exit code.
2541
process = StubProcess(err="unknown command", retcode=3)
2542
result = self.finish_bzr_subprocess(process, retcode=3)
2543
self.assertEqual('', result[0])
2544
self.assertContainsRe(result[1], 'unknown command')
2546
def test_finish_bzr_subprocess_ignoring_retcode(self):
    """finish_bzr_subprocess allows the exit code to be ignored."""
    stub = StubProcess(err="unknown command", retcode=3)
    # retcode=None: the stub's exit status of 3 must not be checked.
    outputs = self.finish_bzr_subprocess(stub, retcode=None)
    self.assertEqual('', outputs[0])
    self.assertContainsRe(outputs[1], 'unknown command')
2553
def test_finish_subprocess_with_unexpected_retcode(self):
2554
"""finish_bzr_subprocess raises self.failureException if the retcode is
2555
not the expected one.
2557
process = StubProcess(err="unknown command", retcode=3)
2558
self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2562
class _DontSpawnProcess(Exception):
2563
"""A simple exception which just allows us to skip unnecessary steps"""
2566
class TestStartBzrSubProcess(tests.TestCase):
2567
"""Stub test start_bzr_subprocess."""
2569
def _subprocess_log_cleanup(self):
2570
"""Inhibits the base version as we don't produce a log file."""
2572
def _popen(self, *args, **kwargs):
2573
"""Override the base version to record the command that is run.
2575
From there we can ensure it is correct without spawning a real process.
2577
self.check_popen_state()
2578
self._popen_args = args
2579
self._popen_kwargs = kwargs
2580
raise _DontSpawnProcess()
2582
def check_popen_state(self):
    """Replace to make assertions when popen is called.

    The default does nothing; individual tests assign their own callable
    to this attribute before starting the subprocess.
    """
2585
def test_run_bzr_subprocess_no_plugins(self):
    """With no arguments the command is: python, bzr path, --no-plugins."""
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
    # The stubbed _popen stored its positional args before raising.
    argv = self._popen_args[0]
    self.assertEqual(sys.executable, argv[0])
    self.assertEqual(self.get_bzr_path(), argv[1])
    self.assertEqual(['--no-plugins'], argv[2:])
2592
def test_allow_plugins(self):
2593
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2595
command = self._popen_args[0]
2596
self.assertEqual([], command[2:])
2598
def test_set_env(self):
2599
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2601
def check_environment():
2602
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2603
self.check_popen_state = check_environment
2604
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2605
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2606
# not set in the parent
2607
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2609
def test_run_bzr_subprocess_env_del(self):
    """run_bzr_subprocess can remove environment variables too."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        # Called from the stubbed _popen: the variable must be gone there.
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    os.environ['EXISTANT_ENV_VAR'] = 'set variable'
    try:
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR':None})
        # Still set in parent
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    finally:
        # Always undo our change, even when an assertion above fails, so
        # the variable cannot leak into tests that run afterwards.
        if 'EXISTANT_ENV_VAR' in os.environ:
            del os.environ['EXISTANT_ENV_VAR']
2622
def test_env_del_missing(self):
    """Asking to delete a variable that is not set is harmless."""
    self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        # Called from the stubbed _popen while env_changes are applied.
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'NON_EXISTANT_ENV_VAR':None})
2630
def test_working_dir(self):
2631
"""Test that we can specify the working dir for the child"""
2632
orig_getcwd = osutils.getcwd
2633
orig_chdir = os.chdir
2637
self.overrideAttr(os, 'chdir', chdir)
2640
self.overrideAttr(osutils, 'getcwd', getcwd)
2641
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2643
self.assertEqual(['foo', 'current'], chdirs)
2645
def test_get_bzr_path_with_cwd_bzrlib(self):
    """An empty source path plus an existing file gives the path "bzr"."""
    self.get_source_path = lambda: ""
    # Pretend every candidate path exists as a regular file.
    self.overrideAttr(os.path, "isfile", lambda path: True)
    self.assertEqual(self.get_bzr_path(), "bzr")
2651
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2652
"""Tests that really need to do things with an external bzr."""
2654
def test_start_and_stop_bzr_subprocess_send_signal(self):
2655
"""finish_bzr_subprocess raises self.failureException if the retcode is
2656
not the expected one.
2658
self.disable_missing_extensions_warning()
2659
process = self.start_bzr_subprocess(['wait-until-signalled'],
2660
skip_if_plan_to_signal=True)
2661
self.assertEqual('running\n', process.stdout.readline())
2662
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2664
self.assertEqual('', result[0])
2665
self.assertEqual('bzr: interrupted\n', result[1])
2668
class TestSelftestFiltering(tests.TestCase):
2671
super(TestSelftestFiltering, self).setUp()
2672
self.suite = TestUtil.TestSuite()
2673
self.loader = TestUtil.TestLoader()
2674
self.suite.addTest(self.loader.loadTestsFromModule(
2675
sys.modules['bzrlib.tests.test_selftest']))
2676
self.all_names = _test_ids(self.suite)
2678
def test_condition_id_re(self):
    """condition_id_re selects exactly the test whose id matches."""
    expected_id = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
                   'test_condition_id_re')
    condition = tests.condition_id_re('test_condition_id_re')
    filtered_suite = tests.filter_suite_by_condition(self.suite, condition)
    self.assertEqual([expected_id], _test_ids(filtered_suite))
2685
def test_condition_id_in_list(self):
    """condition_id_in_list picks the same tests as an equivalent regex."""
    test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
                  'test_condition_id_in_list']
    id_list = tests.TestIdList(test_names)
    filtered_suite = tests.filter_suite_by_condition(
        self.suite, tests.condition_id_in_list(id_list))
    # Build the reference result with the regex based filter.
    my_pattern = 'TestSelftestFiltering.*test_condition_id_in_list'
    re_filtered = tests.filter_suite_by_re(self.suite, my_pattern)
    self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))
2695
def test_condition_id_startswith(self):
2696
klass = 'bzrlib.tests.test_selftest.TestSelftestFiltering.'
2697
start1 = klass + 'test_condition_id_starts'
2698
start2 = klass + 'test_condition_id_in'
2699
test_names = [ klass + 'test_condition_id_in_list',
2700
klass + 'test_condition_id_startswith',
2702
filtered_suite = tests.filter_suite_by_condition(
2703
self.suite, tests.condition_id_startswith([start1, start2]))
2704
self.assertEqual(test_names, _test_ids(filtered_suite))
2706
def test_condition_isinstance(self):
    """condition_isinstance matches the tests a class-name regex finds."""
    condition = tests.condition_isinstance(self.__class__)
    filtered_suite = tests.filter_suite_by_condition(self.suite, condition)
    class_pattern = 'bzrlib.tests.test_selftest.TestSelftestFiltering.'
    re_filtered = tests.filter_suite_by_re(self.suite, class_pattern)
    self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))
2713
def test_exclude_tests_by_condition(self):
    """exclude_tests_by_condition drops only the test the condition hits."""
    excluded_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
                     'test_exclude_tests_by_condition')
    filtered_suite = tests.exclude_tests_by_condition(
        self.suite, lambda x: x.id() == excluded_name)
    self.assertEqual(len(self.all_names) - 1,
                     filtered_suite.countTestCases())
    self.assertFalse(excluded_name in _test_ids(filtered_suite))
    # Everything else must survive, in the original order.
    remaining_names = list(self.all_names)
    remaining_names.remove(excluded_name)
    self.assertEqual(remaining_names, _test_ids(filtered_suite))
2725
def test_exclude_tests_by_re(self):
    """exclude_tests_by_re drops only the test whose id matches."""
    self.all_names = _test_ids(self.suite)
    filtered_suite = tests.exclude_tests_by_re(self.suite,
                                               'exclude_tests_by_re')
    excluded_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
                     'test_exclude_tests_by_re')
    self.assertEqual(len(self.all_names) - 1,
                     filtered_suite.countTestCases())
    self.assertFalse(excluded_name in _test_ids(filtered_suite))
    # Everything else must survive, in the original order.
    remaining_names = list(self.all_names)
    remaining_names.remove(excluded_name)
    self.assertEqual(remaining_names, _test_ids(filtered_suite))
2738
def test_filter_suite_by_condition(self):
    """filter_suite_by_condition keeps only tests the callable accepts."""
    test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
                 'test_filter_suite_by_condition')
    filtered_suite = tests.filter_suite_by_condition(
        self.suite, lambda x: x.id() == test_name)
    self.assertEqual([test_name], _test_ids(filtered_suite))
2745
def test_filter_suite_by_re(self):
    """filter_suite_by_re keeps only the test whose id matches."""
    filtered_suite = tests.filter_suite_by_re(self.suite,
                                              'test_filter_suite_by_r')
    filtered_names = _test_ids(filtered_suite)
    self.assertEqual(filtered_names, ['bzrlib.tests.test_selftest.'
        'TestSelftestFiltering.test_filter_suite_by_re'])
2752
def test_filter_suite_by_id_list(self):
2753
test_list = ['bzrlib.tests.test_selftest.'
2754
'TestSelftestFiltering.test_filter_suite_by_id_list']
2755
filtered_suite = tests.filter_suite_by_id_list(
2756
self.suite, tests.TestIdList(test_list))
2757
filtered_names = _test_ids(filtered_suite)
2760
['bzrlib.tests.test_selftest.'
2761
'TestSelftestFiltering.test_filter_suite_by_id_list'])
2763
def test_filter_suite_by_id_startswith(self):
2764
# By design this test may fail if another test is added whose name also
2765
# begins with one of the start value used.
2766
klass = 'bzrlib.tests.test_selftest.TestSelftestFiltering.'
2767
start1 = klass + 'test_filter_suite_by_id_starts'
2768
start2 = klass + 'test_filter_suite_by_id_li'
2769
test_list = [klass + 'test_filter_suite_by_id_list',
2770
klass + 'test_filter_suite_by_id_startswith',
2772
filtered_suite = tests.filter_suite_by_id_startswith(
2773
self.suite, [start1, start2])
2776
_test_ids(filtered_suite),
2779
def test_preserve_input(self):
    """preserve_input hands back the very object it was given."""
    # NB: Surely there is something in the stdlib to do this?
    self.assertTrue(self.suite is tests.preserve_input(self.suite))
    # Identity, not equality, is the point here, so reuse one object.
    marker = "@#$"
    self.assertTrue(marker is tests.preserve_input(marker))
2784
def test_randomize_suite(self):
2785
randomized_suite = tests.randomize_suite(self.suite)
2786
# randomizing should not add or remove test names.
2787
self.assertEqual(set(_test_ids(self.suite)),
2788
set(_test_ids(randomized_suite)))
2789
# Technically, this *can* fail, because random.shuffle(list) can be
2790
# equal to list. Trying multiple times just pushes the frequency back.
2791
# As it's len(self.all_names)!:1, the failure frequency should be low
2792
# enough to ignore. RBC 20071021.
2793
# It should change the order.
2794
self.assertNotEqual(self.all_names, _test_ids(randomized_suite))
2795
# But not the length. (Possibly redundant with the set test, but not
2797
self.assertEqual(len(self.all_names), len(_test_ids(randomized_suite)))
2799
def test_split_suit_by_condition(self):
    """split_suite_by_condition returns (matching, non-matching) suites."""
    self.all_names = _test_ids(self.suite)
    condition = tests.condition_id_re('test_filter_suite_by_r')
    split_suite = tests.split_suite_by_condition(self.suite, condition)
    filtered_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
                     'test_filter_suite_by_re')
    self.assertEqual([filtered_name], _test_ids(split_suite[0]))
    self.assertFalse(filtered_name in _test_ids(split_suite[1]))
    # The second suite holds every other test, in the original order.
    remaining_names = list(self.all_names)
    remaining_names.remove(filtered_name)
    self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2811
def test_split_suit_by_re(self):
    """split_suite_by_re returns (matching, non-matching) suites."""
    self.all_names = _test_ids(self.suite)
    split_suite = tests.split_suite_by_re(self.suite,
                                          'test_filter_suite_by_r')
    filtered_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
                     'test_filter_suite_by_re')
    self.assertEqual([filtered_name], _test_ids(split_suite[0]))
    self.assertFalse(filtered_name in _test_ids(split_suite[1]))
    # The second suite holds every other test, in the original order.
    remaining_names = list(self.all_names)
    remaining_names.remove(filtered_name)
    self.assertEqual(remaining_names, _test_ids(split_suite[1]))
2824
class TestCheckTreeShape(tests.TestCaseWithTransport):
2826
def test_check_tree_shape(self):
2827
files = ['a', 'b/', 'b/c']
2828
tree = self.make_branch_and_tree('.')
2829
self.build_tree(files)
2833
self.check_tree_shape(tree, files)
2838
class TestBlackboxSupport(tests.TestCase):
2839
"""Tests for testsuite blackbox features."""
2841
def test_run_bzr_failure_not_caught(self):
2842
# When we run bzr in blackbox mode, we want any unexpected errors to
2843
# propagate up to the test suite so that it can show the error in the
2844
# usual way, and we won't get a double traceback.
2845
e = self.assertRaises(
2847
self.run_bzr, ['assert-fail'])
2848
# make sure we got the real thing, not an error from somewhere else in
2849
# the test framework
2850
self.assertEqual('always fails', str(e))
2851
# check that there's no traceback in the test log
2852
self.assertNotContainsRe(self.get_log(), r'Traceback')
2854
def test_run_bzr_user_error_caught(self):
2855
# Running bzr in blackbox mode, normal/expected/user errors should be
2856
# caught in the regular way and turned into an error message plus exit
2858
transport_server = memory.MemoryServer()
2859
transport_server.start_server()
2860
self.addCleanup(transport_server.stop_server)
2861
url = transport_server.get_url()
2862
self.permit_url(url)
2863
out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3)
2864
self.assertEqual(out, '')
2865
self.assertContainsRe(err,
2866
'bzr: ERROR: Not a branch: ".*nonexistantpath/".\n')
2869
class TestTestLoader(tests.TestCase):
2870
"""Tests for the test loader."""
2872
def _get_loader_and_module(self):
2873
"""Gets a TestLoader and a module with one test in it."""
2874
loader = TestUtil.TestLoader()
2876
class Stub(tests.TestCase):
2879
class MyModule(object):
2881
MyModule.a_class = Stub
2883
return loader, module
2885
def test_module_no_load_tests_attribute_loads_classes(self):
    """Without a load_tests hook the loader collects the module's test."""
    loader, module = self._get_loader_and_module()
    suite = loader.loadTestsFromModule(module)
    self.assertEqual(1, suite.countTestCases())
2889
def test_module_load_tests_attribute_gets_called(self):
2890
loader, module = self._get_loader_and_module()
2891
# 'self' is here because we're faking the module with a class. Regular
2892
# load_tests do not need that :)
2893
def load_tests(self, standard_tests, module, loader):
2894
result = loader.suiteClass()
2895
for test in tests.iter_suite_tests(standard_tests):
2896
result.addTests([test, test])
2898
# add a load_tests() method which multiplies the tests from the module.
2899
module.__class__.load_tests = load_tests
2900
self.assertEqual(2, loader.loadTestsFromModule(module).countTestCases())
2902
def test_load_tests_from_module_name_smoke_test(self):
2903
loader = TestUtil.TestLoader()
2904
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
2905
self.assertEqual(['bzrlib.tests.test_sampler.DemoTest.test_nothing'],
2908
def test_load_tests_from_module_name_with_bogus_module_name(self):
    """Loading from a module name that cannot be imported raises."""
    loader = TestUtil.TestLoader()
    self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus')
2913
class TestTestIdList(tests.TestCase):
2915
def _create_id_list(self, test_list):
    """Return a tests.TestIdList built from the given list of test ids."""
    return tests.TestIdList(test_list)
2918
def _create_suite(self, test_id_list):
2920
class Stub(tests.TestCase):
2924
def _create_test_id(id):
2927
suite = TestUtil.TestSuite()
2928
for id in test_id_list:
2929
t = Stub('test_foo')
2930
t.id = _create_test_id(id)
2934
def _test_ids(self, test_suite):
    """Get the ids for the tests in a test suite."""
    return [case.id() for case in tests.iter_suite_tests(test_suite)]
2938
def test_empty_list(self):
    """An id list built from nothing has empty tests and modules maps."""
    id_list = self._create_id_list([])
    self.assertEqual({}, id_list.tests)
    self.assertEqual({}, id_list.modules)
2943
def test_valid_list(self):
2944
id_list = self._create_id_list(
2945
['mod1.cl1.meth1', 'mod1.cl1.meth2',
2946
'mod1.func1', 'mod1.cl2.meth2',
2948
'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
2950
self.assertTrue(id_list.refers_to('mod1'))
2951
self.assertTrue(id_list.refers_to('mod1.submod1'))
2952
self.assertTrue(id_list.refers_to('mod1.submod2'))
2953
self.assertTrue(id_list.includes('mod1.cl1.meth1'))
2954
self.assertTrue(id_list.includes('mod1.submod1'))
2955
self.assertTrue(id_list.includes('mod1.func1'))
2957
def test_bad_chars_in_params(self):
    """Dots inside a parenthesised parameter list do not confuse lookup."""
    test_id = 'mod1.cl1.meth1(xx.yy)'
    id_list = self._create_id_list([test_id])
    self.assertTrue(id_list.refers_to('mod1'))
    self.assertTrue(id_list.includes(test_id))
2962
def test_module_used(self):
    """Every dotted prefix of a listed id is reported by refers_to."""
    id_list = self._create_id_list(['mod.class.meth'])
    for prefix in ('mod', 'mod.class', 'mod.class.meth'):
        self.assertTrue(id_list.refers_to(prefix))
2968
def test_test_suite_matches_id_list_with_unknown(self):
2969
loader = TestUtil.TestLoader()
2970
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
2971
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing',
2973
not_found, duplicates = tests.suite_matches_id_list(suite, test_list)
2974
self.assertEqual(['bogus'], not_found)
2975
self.assertEqual([], duplicates)
2977
def test_suite_matches_id_list_with_duplicates(self):
2978
loader = TestUtil.TestLoader()
2979
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
2980
dupes = loader.suiteClass()
2981
for test in tests.iter_suite_tests(suite):
2983
dupes.addTest(test) # Add it again
2985
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing',]
2986
not_found, duplicates = tests.suite_matches_id_list(
2988
self.assertEqual([], not_found)
2989
self.assertEqual(['bzrlib.tests.test_sampler.DemoTest.test_nothing'],
2993
class TestTestSuite(tests.TestCase):
2995
def test__test_suite_testmod_names(self):
2996
# Test that a plausible list of test module names are returned
2997
# by _test_suite_testmod_names.
2998
test_list = tests._test_suite_testmod_names()
3000
'bzrlib.tests.blackbox',
3001
'bzrlib.tests.per_transport',
3002
'bzrlib.tests.test_selftest',
3006
def test__test_suite_modules_to_doctest(self):
3007
# Test that a plausible list of modules to doctest is returned
3008
# by _test_suite_modules_to_doctest.
3009
test_list = tests._test_suite_modules_to_doctest()
3011
# When docstrings are stripped, there are no modules to doctest
3012
self.assertEqual([], test_list)
3019
def test_test_suite(self):
3020
# test_suite() loads the entire test suite to operate. To avoid this
3021
# overhead, and yet still be confident that things are happening,
3022
# we temporarily replace two functions used by test_suite with
3023
# test doubles that supply a few sample tests to load, and check they
3026
def testmod_names():
3027
calls.append("testmod_names")
3029
'bzrlib.tests.blackbox.test_branch',
3030
'bzrlib.tests.per_transport',
3031
'bzrlib.tests.test_selftest',
3033
self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
3035
calls.append("modules_to_doctest")
3038
return ['bzrlib.timestamp']
3039
self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
3040
expected_test_list = [
3042
'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
3043
('bzrlib.tests.per_transport.TransportTests'
3044
'.test_abspath(LocalTransport,LocalURLServer)'),
3045
'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
3046
# plugins can't be tested that way since selftest may be run with
3049
if __doc__ is not None:
3050
expected_test_list.extend([
3051
# modules_to_doctest
3052
'bzrlib.timestamp.format_highres_date',
3054
suite = tests.test_suite()
3055
self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
3057
self.assertSubset(expected_test_list, _test_ids(suite))
3059
def test_test_suite_list_and_start(self):
    """test_suite honours both an id list and a starting_with prefix."""
    # We cannot test this at the same time as the main load, because we want
    # to know that starting_with == None works. So a second load is
    # incurred - note that the starting_with parameter causes a partial load
    # rather than a full load so this test should be pretty quick.
    test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
    starting_with = ['bzrlib.tests.test_selftest.TestTestSuite']
    suite = tests.test_suite(test_list, starting_with)
    # test_test_suite_list_and_start is not included
    self.assertEqual(test_list, _test_ids(suite))
3071
class TestLoadTestIdList(tests.TestCaseInTempDir):
3073
def _create_test_list_file(self, file_name, content):
3074
fl = open(file_name, 'wt')
3078
def test_load_unknown(self):
    """Loading an id list from a missing file raises NoSuchFile."""
    self.assertRaises(errors.NoSuchFile,
                      tests.load_test_id_list, 'i_do_not_exist')
3082
def test_load_test_list(self):
    """A clean two-line file loads as two ids, in file order."""
    test_list_fname = 'test.list'
    self._create_test_list_file(test_list_fname,
                                'mod1.cl1.meth1\nmod2.cl2.meth2\n')
    tlist = tests.load_test_id_list(test_list_fname)
    self.assertEqual(2, len(tlist))
    self.assertEqual('mod1.cl1.meth1', tlist[0])
    self.assertEqual('mod2.cl2.meth2', tlist[1])
3091
def test_load_dirty_file(self):
3092
test_list_fname = 'test.list'
3093
self._create_test_list_file(test_list_fname,
3094
' mod1.cl1.meth1\n\nmod2.cl2.meth2 \n'
3096
tlist = tests.load_test_id_list(test_list_fname)
3097
self.assertEqual(4, len(tlist))
3098
self.assertEqual('mod1.cl1.meth1', tlist[0])
3099
self.assertEqual('', tlist[1])
3100
self.assertEqual('mod2.cl2.meth2', tlist[2])
3101
self.assertEqual('bar baz', tlist[3])
3104
class TestFilteredByModuleTestLoader(tests.TestCase):
3106
def _create_loader(self, test_list):
3107
id_filter = tests.TestIdList(test_list)
3108
loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)
3111
def test_load_tests(self):
    """A module referred to by the id list is loaded in full."""
    test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
    loader = self._create_loader(test_list)
    suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
    self.assertEqual(test_list, _test_ids(suite))
3117
def test_exclude_tests(self):
    """A module the id list does not refer to yields no tests."""
    test_list = ['bogus']
    loader = self._create_loader(test_list)
    suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
    self.assertEqual([], _test_ids(suite))
3124
class TestFilteredByNameStartTestLoader(tests.TestCase):
3126
def _create_loader(self, name_start):
3127
def needs_module(name):
3128
return name.startswith(name_start) or name_start.startswith(name)
3129
loader = TestUtil.FilteredByModuleTestLoader(needs_module)
3132
def test_load_tests(self):
3133
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
3134
loader = self._create_loader('bzrlib.tests.test_samp')
3136
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3137
self.assertEqual(test_list, _test_ids(suite))
3139
def test_load_tests_inside_module(self):
3140
test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
3141
loader = self._create_loader('bzrlib.tests.test_sampler.Demo')
3143
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3144
self.assertEqual(test_list, _test_ids(suite))
3146
def test_exclude_tests(self):
3147
test_list = ['bogus']
3148
loader = self._create_loader('bogus')
3150
suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
3151
self.assertEqual([], _test_ids(suite))
3154
class TestTestPrefixRegistry(tests.TestCase):
3156
def _get_registry(self):
3157
tp_registry = tests.TestPrefixAliasRegistry()
3160
def test_register_new_prefix(self):
    """A freshly registered alias can be read back with get()."""
    tpr = self._get_registry()
    tpr.register('foo', 'fff.ooo.ooo')
    self.assertEqual('fff.ooo.ooo', tpr.get('foo'))
3165
def test_register_existing_prefix(self):
3166
tpr = self._get_registry()
3167
tpr.register('bar', 'bbb.aaa.rrr')
3168
tpr.register('bar', 'bBB.aAA.rRR')
3169
self.assertEqual('bbb.aaa.rrr', tpr.get('bar'))
3170
self.assertThat(self.get_log(),
3171
DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
3174
def test_get_unknown_prefix(self):
    """get() on an alias that was never registered raises KeyError."""
    tpr = self._get_registry()
    self.assertRaises(KeyError, tpr.get, 'I am not a prefix')
3178
def test_resolve_prefix(self):
    """resolve_alias maps a registered alias to its full prefix."""
    tpr = self._get_registry()
    tpr.register('bar', 'bb.aa.rr')
    self.assertEqual('bb.aa.rr', tpr.resolve_alias('bar'))
3183
def test_resolve_unknown_alias(self):
    """resolve_alias on an unknown alias raises BzrCommandError."""
    tpr = self._get_registry()
    self.assertRaises(errors.BzrCommandError,
                      tpr.resolve_alias, 'I am not a prefix')
3188
def test_predefined_prefixes(self):
    """The module-level registry resolves the standard short aliases."""
    tpr = tests.test_prefix_alias_registry
    # (alias, expected prefix) pairs, checked in this order.
    expected_aliases = [
        ('bzrlib', 'bzrlib'),
        ('bd', 'bzrlib.doc'),
        ('bu', 'bzrlib.utils'),
        ('bt', 'bzrlib.tests'),
        ('bb', 'bzrlib.tests.blackbox'),
        ('bp', 'bzrlib.plugins'),
        ]
    for alias, prefix in expected_aliases:
        self.assertEqual(prefix, tpr.resolve_alias(alias))
3198
class TestThreadLeakDetection(tests.TestCase):
3199
"""Ensure when tests leak threads we detect and report it"""
3201
class LeakRecordingResult(tests.ExtendedTestResult):
3203
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3205
def _report_thread_leak(self, test, leaks, alive):
3206
self.leaks.append((test, leaks))
3208
def test_testcase_without_addCleanups(self):
3209
"""Check old TestCase instances don't break with leak detection"""
3210
class Test(unittest.TestCase):
3213
result = self.LeakRecordingResult()
3215
result.startTestRun()
3217
result.stopTestRun()
3218
self.assertEqual(result._tests_leaking_threads_count, 0)
3219
self.assertEqual(result.leaks, [])
3221
def test_thread_leak(self):
3222
"""Ensure a thread that outlives the running of a test is reported
3224
Uses a thread that blocks on an event, and is started by the inner
3225
test case. As the thread outlives the inner case's run, it should be
3226
detected as a leak, but the event is then set so that the thread can
3227
be safely joined in cleanup so it's not leaked for real.
3229
event = threading.Event()
3230
thread = threading.Thread(name="Leaker", target=event.wait)
3231
class Test(tests.TestCase):
3232
def test_leak(self):
3234
result = self.LeakRecordingResult()
3235
test = Test("test_leak")
3236
self.addCleanup(thread.join)
3237
self.addCleanup(event.set)
3238
result.startTestRun()
3240
result.stopTestRun()
3241
self.assertEqual(result._tests_leaking_threads_count, 1)
3242
self.assertEqual(result._first_thread_leaker_id, test.id())
3243
self.assertEqual(result.leaks, [(test, set([thread]))])
3244
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3246
def test_multiple_leaks(self):
3247
"""Check multiple leaks are blamed on the test cases at fault
3249
Same concept as the previous test, but has one inner test method that
3250
leaks two threads, and one that doesn't leak at all.
3252
event = threading.Event()
3253
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3254
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3255
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3256
class Test(tests.TestCase):
3257
def test_first_leak(self):
3259
def test_second_no_leak(self):
3261
def test_third_leak(self):
3264
result = self.LeakRecordingResult()
3265
first_test = Test("test_first_leak")
3266
third_test = Test("test_third_leak")
3267
self.addCleanup(thread_a.join)
3268
self.addCleanup(thread_b.join)
3269
self.addCleanup(thread_c.join)
3270
self.addCleanup(event.set)
3271
result.startTestRun()
3273
[first_test, Test("test_second_no_leak"), third_test]
3275
result.stopTestRun()
3276
self.assertEqual(result._tests_leaking_threads_count, 2)
3277
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3278
self.assertEqual(result.leaks, [
3279
(first_test, set([thread_b])),
3280
(third_test, set([thread_a, thread_c]))])
3281
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3284
class TestPostMortemDebugging(tests.TestCase):
3285
"""Check post mortem debugging works when tests fail or error"""
3287
class TracebackRecordingResult(tests.ExtendedTestResult):
3289
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3290
self.postcode = None
3291
def _post_mortem(self, tb=None):
3292
"""Record the code object at the end of the current traceback"""
3293
tb = tb or sys.exc_info()[2]
3296
while next is not None:
3299
self.postcode = tb.tb_frame.f_code
3300
def report_error(self, test, err):
3302
def report_failure(self, test, err):
3305
def test_location_unittest_error(self):
3306
"""Needs right post mortem traceback with erroring unittest case"""
3307
class Test(unittest.TestCase):
3310
result = self.TracebackRecordingResult()
3312
self.assertEqual(result.postcode, Test.runTest.func_code)
3314
def test_location_unittest_failure(self):
3315
"""Needs right post mortem traceback with failing unittest case"""
3316
class Test(unittest.TestCase):
3318
raise self.failureException
3319
result = self.TracebackRecordingResult()
3321
self.assertEqual(result.postcode, Test.runTest.func_code)
3323
def test_location_bt_error(self):
3324
"""Needs right post mortem traceback with erroring bzrlib.tests case"""
3325
class Test(tests.TestCase):
3326
def test_error(self):
3328
result = self.TracebackRecordingResult()
3329
Test("test_error").run(result)
3330
self.assertEqual(result.postcode, Test.test_error.func_code)
3332
def test_location_bt_failure(self):
    """Needs right post mortem traceback with failing bzrlib.tests case"""
    class Test(tests.TestCase):
        def test_failure(self):
            raise self.failureException
    result = self.TracebackRecordingResult()
    Test("test_failure").run(result)
    # The recorded code object must be the failing test method itself.
    self.assertEqual(result.postcode, Test.test_failure.func_code)
3341
def test_env_var_triggers_post_mortem(self):
3342
"""Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3344
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3345
post_mortem_calls = []
3346
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3347
self.overrideEnv('BZR_TEST_PDB', None)
3348
result._post_mortem(1)
3349
self.overrideEnv('BZR_TEST_PDB', 'on')
3350
result._post_mortem(2)
3351
self.assertEqual([2], post_mortem_calls)
3354
class TestRunSuite(tests.TestCase):
3356
def test_runner_class(self):
3357
"""run_suite accepts and uses a runner_class keyword argument."""
3358
class Stub(tests.TestCase):
3361
suite = Stub("test_foo")
3363
class MyRunner(tests.TextTestRunner):
3364
def run(self, test):
3366
return tests.ExtendedTestResult(self.stream, self.descriptions,
3368
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3369
self.assertLength(1, calls)
3372
class _Selftest(object):
3373
"""Mixin for tests needing full selftest output"""
3375
def _inject_stream_into_subunit(self, stream):
3376
"""To be overridden by subclasses that run tests out of process"""
3378
def _run_selftest(self, **kwargs):
3380
self._inject_stream_into_subunit(sio)
3381
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3382
return sio.getvalue()
3385
class _ForkedSelftest(_Selftest):
    """Mixin for tests needing full selftest output with forked children"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        Some APIs need rewriting so this kind of bogus hackery can be replaced
        by passing the stream param from run_tests down into ProtocolTestCase.
        """
        from subunit import ProtocolTestCase
        wrapped_init = ProtocolTestCase.__init__

        def init_then_redirect(protocol_case, *args, **kwargs):
            # Run the real constructor first, then point the passthrough
            # attribute at the stream supplied by the caller.
            wrapped_init(protocol_case, *args, **kwargs)
            protocol_case._passthrough = stream

        self.overrideAttr(ProtocolTestCase, "__init__", init_then_redirect)

    def _run_selftest(self, **kwargs):
        """Run selftest with the fork decorator appended to the decorators.

        Raises tests.TestNotApplicable when os has no fork attribute.
        """
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        fork = getattr(os, "fork", None)
        if fork is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")
        # Make sure the fork code is actually invoked by claiming two cores
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
        decorators = kwargs.setdefault("suite_decorators", [])
        decorators.append(tests.fork_decorator)
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3413
class TestParallelFork(_ForkedSelftest, tests.TestCase):
    """Check operation of --parallel=fork selftest option"""

    def test_error_in_child_during_fork(self):
        """Error in a forked child during test setup should get reported"""
        class Test(tests.TestCase):
            def testMethod(self):
                pass
        # We don't care what, just break something that a child will run
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
        out = self._run_selftest(test_suite_factory=Test)
        # Lines from the tracebacks of the two child processes may be mixed
        # together due to the way subunit parses and forwards the streams,
        # so permit extra lines between each part of the error output.
        # Raw strings keep the regex escapes (\s, \() out of the hands of the
        # string-literal parser.
        # NOTE(review): only the frames named below are checked; extend the
        # pattern if the traceback header or the exception type should be
        # pinned as well.
        self.assertContainsRe(out,
            r".+ in fork_for_tests\n"
            r"(?:.*\n)*"
            r"\s*workaround_zealous_crypto_random\(\)\n")
3437
class TestUncollectedWarnings(_Selftest, tests.TestCase):
    """Check a test case still alive after being run emits a warning"""

    class Test(tests.TestCase):
        """Sample cases fed to selftest by the tests below."""

        def test_pass(self):
            pass

        def test_self_ref(self):
            # Storing a bound method on the instance makes it refer to itself
            self.also_self = self.test_self_ref

        def test_skip(self):
            self.skip("Don't need")

    def _get_suite(self):
        """Return a suite holding one instance of each sample case."""
        return TestUtil.TestSuite([
            self.Test("test_pass"),
            self.Test("test_self_ref"),
            self.Test("test_skip"),
            ])

    def _run_selftest_with_suite(self, **kwargs):
        """Run selftest on the sample suite with uncollected_cases enabled.

        Asserts that test_self_ref is reported as uncollected and test_pass
        is not.

        :param kwargs: Extra keyword arguments passed on to _run_selftest.
        :return: The selftest output.
        """
        old_flags = tests.selftest_debug_flags
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
        # NOTE(review): the collector is assumed to be switched off for the
        # duration of the run so that the self-referencing case stays alive
        # -- confirm.
        gc_on = gc.isenabled()
        if gc_on:
            gc.disable()
        try:
            output = self._run_selftest(test_suite_factory=self._get_suite,
                **kwargs)
        finally:
            # Put global state back even when the run itself blows up.
            if gc_on:
                gc.enable()
            tests.selftest_debug_flags = old_flags
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
        return output

    def test_testsuite(self):
        self._run_selftest_with_suite()

    def test_pattern(self):
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
        self.assertNotContainsRe(out, "test_skip")

    def test_exclude_pattern(self):
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_random_seed(self):
        self._run_selftest_with_suite(random_seed="now")

    def test_matching_tests_first(self):
        self._run_selftest_with_suite(matching_tests_first=True,
            pattern="test_self_ref$")

    def test_starting_with_and_exclude(self):
        out = self._run_selftest_with_suite(starting_with=["bt."],
            exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_additonal_decorator(self):
        # The assertions made by _run_selftest_with_suite are the whole test,
        # so the returned output is not needed here.
        self._run_selftest_with_suite(
            suite_decorators=[tests.TestDecorator])
3500
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Same as the base class run, but using tests.SubUnitBzrRunner."""
        base_run = TestUncollectedWarnings._run_selftest_with_suite
        return base_run(self, runner_class=tests.SubUnitBzrRunner, **kwargs)
3510
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted when forking

    All test methods come from TestUncollectedWarnings; _ForkedSelftest
    supplies the selftest runner used to execute them.
    """
3514
class TestEnvironHandling(tests.TestCase):
    """Tests for overrideEnv bookkeeping."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        """Overriding a variable with None twice still restores its value."""
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')
        # We use an embedded test to make sure we fix the _captureVar bug
        class Test(tests.TestCase):
            def test_me(self):
                # The first call save the 42 value
                self.overrideEnv('MYVAR', None)
                self.assertEqual(None, os.environ.get('MYVAR'))
                # Make sure we can call it twice
                self.overrideEnv('MYVAR', None)
                self.assertEqual(None, os.environ.get('MYVAR'))
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        Test('test_me').run(result)
        if not result.wasStrictlySuccessful():
            self.fail(output.getvalue())
        # We get our value back
        self.assertEqual('42', os.environ.get('MYVAR'))
3537
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    Since we use tests that are already isolated from os.environ a bit of care
    should be taken when designing the tests to avoid bootstrap side-effects.
    The tests start an already clean os.environ which allow doing valid
    assertions about which variables are present or not and design tests around
    these assertions.
    """

    class ScratchMonkey(tests.TestCase):
        """Throwaway case used only as the target of the environ helpers."""

        def test_me(self):
            pass

    def test_basics(self):
        # Make sure we know the definition of BZR_HOME: not part of os.environ
        # for tests.TestCase.
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
        self.assertEqual(None, tests.isolated_environ['BZR_HOME'])
        # Being part of isolated_environ, BZR_HOME should not appear here
        self.assertFalse('BZR_HOME' in os.environ)
        # Make sure we know the definition of LINES: part of os.environ for
        # tests.TestCase
        self.assertTrue('LINES' in tests.isolated_environ)
        self.assertEqual('25', tests.isolated_environ['LINES'])
        self.assertEqual('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # BZR_HOME is known to be absent from os.environ
        test = self.ScratchMonkey('test_me')
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
        self.assertEqual('foo', os.environ['BZR_HOME'])
        tests.restore_os_environ(test)
        self.assertFalse('BZR_HOME' in os.environ)

    def test_injecting_known_variable(self):
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': '42'})
        self.assertEqual('42', os.environ['LINES'])
        tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])

    def test_deleting_variable(self):
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': None})
        self.assertTrue('LINES' not in os.environ)
        tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])
3590
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.

    Since tests.TestCase already provides an isolation from os.environ, we use
    the clean environment as a base for testing. To precisely capture the
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
    compare against.

    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
    not `os.environ` so each test overrides it to suit its needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a suite of class klass holding one doctest made from string.

        :param klass: Suite factory accepting a test_finder keyword.
        :param string: Doctest source text.
        :return: The suite produced by klass.
        """
        class Finder(doctest.DocTestFinder):

            def find(*args, **kwargs):
                # Ignore whatever is being searched and always hand back the
                # single doctest parsed from the supplied string.
                test = doctest.DocTestParser().get_doctest(
                    string, {}, 'foo', 'foo.py', 0)
                return [test]

        suite = klass(test_finder=Finder())
        return suite

    def run_doctest_suite_for_string(self, klass, string):
        """Run the doctest in string through klass.

        :return: A (result, output) pair: the TextTestResult and the stream
            it wrote to.
        """
        suite = self.get_doctest_suite_for_string(klass, string)
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        suite.run(result)
        return result, output

    def assertDocTestStringSucceds(self, klass, string):
        """Fail, showing the run output, unless the doctest passes."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if not result.wasStrictlySuccessful():
            self.fail(output.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail, showing the run output, if the doctest passes."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if result.wasStrictlySuccessful():
            self.fail(output.getvalue())

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        # The doctest globals are empty, so the snippet imports os itself.
        test = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
        # tests.DocTestSuite sees '42'
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        # A None result prints nothing, hence no expected-output line.
        test = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
        # tests.DocTestSuite sees None
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3655
class TestSelftestExcludePatterns(tests.TestCase):
    """Check the -x option of selftest against a tiny stand-in suite."""

    def setUp(self):
        super(TestSelftestExcludePatterns, self).setUp()
        self.overrideAttr(tests, 'test_suite', self.suite_factory)

    def suite_factory(self, keep_only=None, starting_with=None):
        """A test suite factory with only a few tests."""
        class Test(tests.TestCase):
            def id(self):
                # We don't need the full class path
                return self._testMethodName

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])

    def assertTestList(self, expected, *selftest_args):
        """Assert `selftest --list` prints exactly the expected test ids.

        :param expected: List of ids, one per expected output line.
        :param selftest_args: Extra command line arguments for selftest.
        """
        # We rely on setUp installing the right test suite factory so we can
        # test at the command level without loading the whole test suite
        out, err = self.run_bzr(('selftest', '--list') + selftest_args)
        actual = out.splitlines()
        self.assertEqual(expected, actual)

    def test_full_list(self):
        self.assertTestList(['a', 'b', 'c'])

    def test_single_exclude(self):
        self.assertTestList(['b', 'c'], '-x', 'a')

    def test_mutiple_excludes(self):
        self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3692
class TestCounterHooks(tests.TestCase, SelfTestHelper):
    """Check counters installed with install_counter_hook are updated."""

    _test_needs_features = [features.subunit]

    def setUp(self):
        super(TestCounterHooks, self).setUp()
        class Test(tests.TestCase):

            def setUp(self):
                super(Test, self).setUp()
                self.hooks = hooks.Hooks()
                self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
                self.install_counter_hook(self.hooks, 'myhook')

            def no_hook(self):
                pass

            def run_hook_once(self):
                for hook in self.hooks['myhook']:
                    # NOTE(review): argument list assumed; confirm against
                    # the callable registered by install_counter_hook.
                    hook(self)

        self.test_class = Test

    def assertHookCalls(self, expected_calls, test_name):
        """Run the named inner test and check its 'myhook' counter.

        :param expected_calls: Expected value of the 'myhook' counter.
        :param test_name: Name of the method of the inner Test class to run.
        """
        test = self.test_class(test_name)
        result = unittest.TestResult()
        test.run(result)
        self.assertTrue(hasattr(test, '_counters'))
        # `in` replaces dict.has_key(), which no longer exists on Python 3.
        self.assertTrue('myhook' in test._counters)
        self.assertEqual(expected_calls, test._counters['myhook'])

    def test_no_hook(self):
        self.assertHookCalls(0, 'no_hook')

    def test_run_hook_once(self):
        tt = features.testtools
        if tt.module.__version__ < (0, 9, 8):
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
        self.assertHookCalls(1, 'run_hook_once')