~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

Merge bzr.dev to resolve conflicts

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the test framework."""
 
18
 
 
19
from cStringIO import StringIO
 
20
import gc
 
21
import doctest
 
22
import os
 
23
import signal
 
24
import sys
 
25
import threading
 
26
import time
 
27
import unittest
 
28
import warnings
 
29
 
 
30
from testtools import (
 
31
    ExtendedToOriginalDecorator,
 
32
    MultiTestResult,
 
33
    )
 
34
from testtools.content import Content
 
35
from testtools.content_type import ContentType
 
36
from testtools.matchers import (
 
37
    DocTestMatches,
 
38
    Equals,
 
39
    )
 
40
import testtools.testresult.doubles
 
41
 
 
42
import bzrlib
 
43
from bzrlib import (
 
44
    branchbuilder,
 
45
    bzrdir,
 
46
    errors,
 
47
    lockdir,
 
48
    memorytree,
 
49
    osutils,
 
50
    remote,
 
51
    repository,
 
52
    symbol_versioning,
 
53
    tests,
 
54
    transport,
 
55
    workingtree,
 
56
    workingtree_3,
 
57
    workingtree_4,
 
58
    )
 
59
from bzrlib.repofmt import (
 
60
    groupcompress_repo,
 
61
    )
 
62
from bzrlib.symbol_versioning import (
 
63
    deprecated_function,
 
64
    deprecated_in,
 
65
    deprecated_method,
 
66
    )
 
67
from bzrlib.tests import (
 
68
    features,
 
69
    test_lsprof,
 
70
    test_server,
 
71
    TestUtil,
 
72
    )
 
73
from bzrlib.trace import note, mutter
 
74
from bzrlib.transport import memory
 
75
 
 
76
 
 
77
def _test_ids(test_suite):
    """Return a list holding the id() of each test found in test_suite."""
    collected_ids = []
    for case in tests.iter_suite_tests(test_suite):
        collected_ids.append(case.id())
    return collected_ids
 
80
 
 
81
 
 
82
class MetaTestLog(tests.TestCase):
    """Check that a message logged by a test shows up in its details."""

    def test_logging(self):
        """Test logs are captured when a test fails."""
        self.log('a test message')
        log_detail = self.getDetails()['log']
        # The log detail is exposed as utf8 encoded plain text.
        expected_type = ContentType("text", "plain", {"charset": "utf8"})
        self.assertThat(log_detail.content_type, Equals(expected_type))
        # The detail content and get_log() must agree with each other.
        detail_text = u"".join(log_detail.iter_text())
        self.assertThat(detail_text, Equals(self.get_log()))
        # The message itself must be the tail of the captured log.
        current_log = self.get_log()
        self.assertThat(
            current_log,
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
 
94
 
 
95
 
 
96
class TestUnicodeFilename(tests.TestCase):
    """Smoke test for the UnicodeFilename feature probe."""

    def test_probe_passes(self):
        """UnicodeFilename._probe passes."""
        # The result of the probe is platform dependent, so the only thing
        # that can be checked here is that calling it does not blow up.
        probe = tests.UnicodeFilename._probe
        probe()
 
103
 
 
104
 
 
105
class TestTreeShape(tests.TestCaseInTempDir):
    """Tests for building tree shapes on disk."""

    def test_unicode_paths(self):
        # Only meaningful where the UnicodeFilename feature is available.
        self.requireFeature(tests.UnicodeFilename)

        unicode_name = u'hell\u00d8'
        shape = [(unicode_name, 'contents of hello')]
        self.build_tree_contents(shape)
        self.assertPathExists(unicode_name)
 
113
 
 
114
 
 
115
class TestClassesAvailable(tests.TestCase):
    """bzrlib.tests re-exports the Test* classes as a convenience."""

    # Each test below passes as long as the import itself succeeds; the
    # imported names are deliberately left unused.

    def test_test_case(self):
        from bzrlib.tests import TestCase

    def test_test_loader(self):
        from bzrlib.tests import TestLoader

    def test_test_suite(self):
        from bzrlib.tests import TestSuite
 
126
 
 
127
 
 
128
class TestTransportScenarios(tests.TestCase):
    """Meta tests for the transport test scenario generation.

    These check that the per-transport tests get applied to all the
    available transports.

    The mechanism is expected to be generalised later, which is why these
    tests live in this file although they only cover transports for now.
    """

    def test_get_transport_permutations(self):
        # get_transport_test_permutations must hand back whatever the
        # module's own get_test_permutations hook returns.
        from bzrlib.tests.per_transport import get_transport_test_permutations
        sample_permutation = [(1,2), (3,4)]

        class MockModule(object):
            def get_test_permutations(self):
                return sample_permutation

        fake_module = MockModule()
        self.assertEqual(sample_permutation,
                         get_transport_test_permutations(fake_module))

    def test_scenarios_include_all_modules(self):
        # The scenario generator should yield exactly as many permutations
        # as all the registered transport modules provide together.  If the
        # counts agree it is probably doing the right thing, especially in
        # combination with the class checks in the test below.
        from bzrlib.tests.per_transport import transport_test_permutations
        from bzrlib.transport import _get_transport_modules
        permutation_count = 0
        for module_name in _get_transport_modules():
            # __import__ returns the top-level package, so walk down the
            # remaining dotted components to reach the hook.
            attr_path = (module_name + ".get_test_permutations").split('.')[1:]
            try:
                get_permutations = reduce(getattr, attr_path,
                                          __import__(module_name))
                permutation_count += len(get_permutations())
            except errors.DependencyNotPresent:
                # Transports whose dependencies are missing contribute
                # nothing to the count.
                pass
        scenarios = transport_test_permutations()
        self.assertEqual(permutation_count, len(scenarios))

    def test_scenarios_include_transport_class(self):
        # This used to know every possible transport and the order they
        # were returned in, which was overly brittle (mbp 20060307).
        from bzrlib.tests.per_transport import transport_test_permutations
        scenarios = transport_test_permutations()
        # there are at least that many builtin transports
        self.assertTrue(len(scenarios) > 6)
        first_scenario = scenarios[0]
        self.assertIsInstance(first_scenario[0], str)
        transport_class = first_scenario[1]["transport_class"]
        self.assertTrue(issubclass(transport_class,
                                   bzrlib.transport.Transport))
        server_class = first_scenario[1]["transport_server"]
        self.assertTrue(issubclass(server_class,
                                   bzrlib.transport.Server))
 
182
 
 
183
 
 
184
class TestBranchScenarios(tests.TestCase):
    """Tests for the per_branch scenario generator."""

    def test_scenarios(self):
        # The servers and each (branch format, bzrdir format) pair must be
        # passed through to the generated scenarios.
        from bzrlib.tests.per_branch import make_scenarios
        transport_server = "a"
        readonly_server = "b"
        format_pairs = [("c", "C"), ("d", "D")]
        scenarios = make_scenarios(transport_server, readonly_server,
                                   format_pairs)
        self.assertEqual(2, len(scenarios))
        expected = [
            ('str', {
                'branch_format': 'c',
                'bzrdir_format': 'C',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                }),
            ('str', {
                'branch_format': 'd',
                'bzrdir_format': 'D',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                }),
            ]
        self.assertEqual(expected, scenarios)
 
207
 
 
208
 
 
209
class TestBzrDirScenarios(tests.TestCase):
    """Tests for the per_controldir scenario generator."""

    def test_scenarios(self):
        # The vfs factory, both servers and each format must be passed
        # through to the generated scenarios.
        from bzrlib.tests.per_controldir import make_scenarios
        vfs_factory = "v"
        transport_server = "a"
        readonly_server = "b"
        formats = ["c", "d"]
        scenarios = make_scenarios(vfs_factory, transport_server,
                                   readonly_server, formats)
        expected = [
            ('str', {
                'bzrdir_format': 'c',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'vfs_transport_factory': 'v',
                }),
            ('str', {
                'bzrdir_format': 'd',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'vfs_transport_factory': 'v',
                }),
            ]
        self.assertEqual(expected, scenarios)
 
232
 
 
233
 
 
234
class TestRepositoryScenarios(tests.TestCase):
    """Tests for the per_repository scenario generator."""

    def test_formats_to_scenarios(self):
        from bzrlib.tests.per_repository import formats_to_scenarios
        format_2a = repository.format_registry.get(
            'Bazaar repository format 2a (needs bzr 1.16 or later)\n')
        formats = [("(c)", remote.RemoteRepositoryFormat()),
                   ("(d)", format_2a)]
        no_vfs_scenarios = formats_to_scenarios(
            formats, "server", "readonly", None)
        vfs_scenarios = formats_to_scenarios(
            formats, "server", "readonly", vfs_transport_factory="vfs")
        # Without a vfs factory the scenarios carry no
        # vfs_transport_factory entry at all.
        expected_no_vfs = [
            ('RemoteRepositoryFormat(c)', {
                'bzrdir_format': remote.RemoteBzrDirFormat(),
                'repository_format': remote.RemoteRepositoryFormat(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                }),
            ('RepositoryFormat2a(d)', {
                'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
                'repository_format': groupcompress_repo.RepositoryFormat2a(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                }),
            ]
        self.assertEqual(expected_no_vfs, no_vfs_scenarios)
        # With a vfs factory the same scenarios gain that one extra entry.
        expected_vfs = [
            ('RemoteRepositoryFormat(c)', {
                'bzrdir_format': remote.RemoteBzrDirFormat(),
                'repository_format': remote.RemoteRepositoryFormat(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                'vfs_transport_factory': 'vfs',
                }),
            ('RepositoryFormat2a(d)', {
                'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
                'repository_format': groupcompress_repo.RepositoryFormat2a(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server',
                'vfs_transport_factory': 'vfs',
                }),
            ]
        self.assertEqual(expected_vfs, vfs_scenarios)
 
272
 
 
273
 
 
274
class TestTestScenarioApplication(tests.TestCase):
    """Tests for the test adaption facilities."""

    def test_apply_scenario(self):
        from bzrlib.tests import apply_scenario
        input_test = TestTestScenarioApplication("test_apply_scenario")
        # Adapt the same input test with two different scenarios.
        full_scenario = ("new id", {
            "bzrdir_format":"bzr_format",
            "repository_format":"repo_fmt",
            "transport_server":"transport_server",
            "transport_readonly_server":"readonly-server",
            })
        minimal_scenario = ("new id 2", {"bzrdir_format":None})
        adapted_full = apply_scenario(input_test, full_scenario)
        adapted_minimal = apply_scenario(input_test, minimal_scenario)
        # input_test itself must NOT have been altered by the adaption.
        self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
        # The two scenarios set conflicting values, so matching results
        # show that separate tests were created; each one only carries
        # the attributes its own scenario specified.
        self.assertEqual("bzr_format", adapted_full.bzrdir_format)
        self.assertEqual("repo_fmt", adapted_full.repository_format)
        self.assertEqual("transport_server", adapted_full.transport_server)
        self.assertEqual("readonly-server",
                         adapted_full.transport_readonly_server)
        id_prefix = "bzrlib.tests.test_selftest.TestTestScenarioApplication."
        self.assertEqual(id_prefix + "test_apply_scenario(new id)",
                         adapted_full.id())
        self.assertEqual(None, adapted_minimal.bzrdir_format)
        self.assertEqual(id_prefix + "test_apply_scenario(new id 2)",
                         adapted_minimal.id())
 
308
 
 
309
 
 
310
class TestInterRepositoryScenarios(tests.TestCase):
    """Tests for the per_interrepository scenario generator."""

    def test_scenarios(self):
        # The servers and each (label, from format, to format, extra setup)
        # tuple must be passed through to the generated scenarios.
        from bzrlib.tests.per_interrepository import make_scenarios
        transport_server = "a"
        readonly_server = "b"
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
        scenarios = make_scenarios(transport_server, readonly_server, formats)
        expected = [
            ('C0,str,str', {
                'repository_format': 'C1',
                'repository_format_to': 'C2',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'extra_setup': 'C3',
                }),
            ('D0,str,str', {
                'repository_format': 'D1',
                'repository_format_to': 'D2',
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'extra_setup': 'D3',
                }),
            ]
        self.assertEqual(expected, scenarios)
 
334
 
 
335
 
 
336
class TestWorkingTreeScenarios(tests.TestCase):
    """Tests for the per_workingtree scenario generator."""

    def test_scenarios(self):
        # The servers and each working tree format must be passed through
        # to the generated scenarios.
        from bzrlib.tests.per_workingtree import make_scenarios
        transport_server = "a"
        readonly_server = "b"
        wt4_format = workingtree_4.WorkingTreeFormat4()
        wt3_format = workingtree_3.WorkingTreeFormat3()
        formats = [wt4_format, wt3_format]
        scenarios = make_scenarios(transport_server, readonly_server, formats)
        expected = [
            ('WorkingTreeFormat4', {
                'bzrdir_format': wt4_format._matchingbzrdir,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'workingtree_format': wt4_format,
                }),
            ('WorkingTreeFormat3', {
                'bzrdir_format': wt3_format._matchingbzrdir,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'workingtree_format': wt3_format,
                }),
            ]
        self.assertEqual(expected, scenarios)
 
359
 
 
360
 
 
361
class TestTreeScenarios(tests.TestCase):
    """Tests for the per_tree scenario generator."""

    def test_scenarios(self):
        # The tree scenario generator is meant to produce one scenario per
        # working tree format given, plus extra scenarios that build other
        # kinds of tree (revision, dirstate revision, preview) from a
        # working tree.  The working tree scenarios therefore carry
        # return_parameter as _workingtree_to_test_tree, while the others
        # carry their own conversion function.
        from bzrlib.tests.per_tree import (
            _dirstate_tree_from_workingtree,
            make_scenarios,
            preview_tree_pre,
            preview_tree_post,
            return_parameter,
            revision_tree_from_workingtree
            )

        def scenario(name, wt_format, to_test_tree):
            # Every expected scenario has the same shape; only the name,
            # the working tree format and the converter vary.
            return (name, {
                'bzrdir_format': wt_format._matchingbzrdir,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'workingtree_format': wt_format,
                '_workingtree_to_test_tree': to_test_tree,
                })

        transport_server = "a"
        readonly_server = "b"
        formats = [workingtree_4.WorkingTreeFormat4(),
                   workingtree_3.WorkingTreeFormat3()]
        scenarios = make_scenarios(transport_server, readonly_server, formats)
        self.assertEqual(7, len(scenarios))
        default_wt_format = workingtree.format_registry.get_default()
        wt4_format = workingtree_4.WorkingTreeFormat4()
        wt5_format = workingtree_4.WorkingTreeFormat5()
        expected_scenarios = [
            scenario('WorkingTreeFormat4', formats[0], return_parameter),
            scenario('WorkingTreeFormat3', formats[1], return_parameter),
            scenario('RevisionTree', default_wt_format,
                     revision_tree_from_workingtree),
            scenario('DirStateRevisionTree,WT4', wt4_format,
                     _dirstate_tree_from_workingtree),
            scenario('DirStateRevisionTree,WT5', wt5_format,
                     _dirstate_tree_from_workingtree),
            scenario('PreviewTree', default_wt_format, preview_tree_pre),
            scenario('PreviewTreePost', default_wt_format,
                     preview_tree_post),
            ]
        self.assertEqual(expected_scenarios, scenarios)
 
438
 
 
439
 
 
440
class TestInterTreeScenarios(tests.TestCase):
    """A group of tests that test the InterTreeTestAdapter."""

    def test_scenarios(self):
        # Check that the parameters given to make_scenarios are passed
        # through to the generated scenarios.
        #
        # For InterTree tests the machinery has to bring up two trees in
        # each instance: the base one, and the one being interacted with.
        # Because an optimiser can be direction specific, each optimiser is
        # tested in its chosen direction.  Unlike the TestProviderAdapter,
        # no parameterized scenario is added automatically for WorkingTree:
        # the optimisers say which ones to add.
        from bzrlib.tests.per_tree import (
            return_parameter,
            )
        from bzrlib.tests.per_intertree import (
            make_scenarios,
            )
        from bzrlib.workingtree_3 import WorkingTreeFormat3
        from bzrlib.workingtree_4 import WorkingTreeFormat4
        server1 = "a"
        server2 = "b"
        format1 = WorkingTreeFormat4()
        format2 = WorkingTreeFormat3()
        # Each entry is (name, intertree class, from format, to format,
        # converter); str and int merely stand in for intertree classes.
        formats = [("1", str, format1, format2, "converter1"),
            ("2", int, format2, format1, "converter2")]
        scenarios = make_scenarios(server1, server2, formats)
        self.assertEqual(2, len(scenarios))
        expected_scenarios = [
            ("1", {
                "bzrdir_format": format1._matchingbzrdir,
                "intertree_class": formats[0][1],
                "workingtree_format": formats[0][2],
                "workingtree_format_to": formats[0][3],
                "mutable_trees_to_test_trees": formats[0][4],
                "_workingtree_to_test_tree": return_parameter,
                "transport_server": server1,
                "transport_readonly_server": server2,
                }),
            ("2", {
                "bzrdir_format": format2._matchingbzrdir,
                "intertree_class": formats[1][1],
                "workingtree_format": formats[1][2],
                "workingtree_format_to": formats[1][3],
                "mutable_trees_to_test_trees": formats[1][4],
                "_workingtree_to_test_tree": return_parameter,
                "transport_server": server1,
                "transport_readonly_server": server2,
                }),
            ]
        # Expected value first, like the other scenario tests in this file.
        self.assertEqual(expected_scenarios, scenarios)
 
494
 
 
495
 
 
496
class TestTestCaseInTempDir(tests.TestCaseInTempDir):
    """Tests for the facilities TestCaseInTempDir provides."""

    def test_home_is_not_working(self):
        # The home directory and the working directory are distinct, and
        # both the cwd and $HOME point at the expected places.
        self.assertNotEqual(self.test_dir, self.test_home_dir)
        current_dir = osutils.getcwd()
        self.assertIsSameRealPath(self.test_dir, current_dir)
        home_from_env = os.environ['HOME']
        self.assertIsSameRealPath(self.test_home_dir, home_from_env)

    def test_assertEqualStat_equal(self):
        from bzrlib.tests.test_dirstate import _FakeStat
        self.build_tree(["foo"])
        real_stat = os.lstat("foo")
        # Build a fake stat from the fields of the real one; the two must
        # then be considered equal.
        fake_stat = _FakeStat(
            real_stat.st_size, real_stat.st_mtime, real_stat.st_ctime,
            real_stat.st_dev, real_stat.st_ino, real_stat.st_mode)
        self.assertEqualStat(real_stat, fake_stat)

    def test_assertEqualStat_notequal(self):
        self.build_tree(["foo", "longname"])
        foo_stat = os.lstat("foo")
        longname_stat = os.lstat("longname")
        self.assertRaises(AssertionError, self.assertEqualStat,
                          foo_stat, longname_stat)

    def test_failUnlessExists(self):
        """Deprecated failUnlessExists and failIfExists"""
        self.applyDeprecated(
            deprecated_in((2, 4)), self.failUnlessExists, '.')
        self.build_tree(['foo/', 'foo/bar'])
        self.applyDeprecated(
            deprecated_in((2, 4)), self.failUnlessExists, 'foo/bar')
        self.applyDeprecated(
            deprecated_in((2, 4)), self.failIfExists, 'foo/foo')

    def test_assertPathExists(self):
        self.assertPathExists('.')
        tree_shape = ['foo/', 'foo/bar']
        self.build_tree(tree_shape)
        self.assertPathExists('foo/bar')
        self.assertPathDoesNotExist('foo/foo')
 
535
 
 
536
 
 
537
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
    """Tests for the facilities TestCaseWithMemoryTransport provides."""

    def test_home_is_non_existant_dir_under_root(self):
        """The test_home_dir for TestCaseWithMemoryTransport is missing.

        TestCaseWithMemoryTransport is meant for tests that need no disk
        resources: they should be hooked into bzrlib in such a way that no
        global settings get changed by the test (only a few tests should
        need to do that).  Using a missing directory as home is an
        effective way to ensure that this is the case.
        """
        expected_home = self.TEST_ROOT + "/MemoryTransportMissingHomeDir"
        self.assertIsSameRealPath(expected_home, self.test_home_dir)
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])

    def test_cwd_is_TEST_ROOT(self):
        self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT)
        current_dir = osutils.getcwd()
        self.assertIsSameRealPath(self.test_dir, current_dir)

    def test_BZR_HOME_and_HOME_are_bytestrings(self):
        """The $BZR_HOME and $HOME environment variables should not be unicode.

        See https://bugs.launchpad.net/bzr/+bug/464174
        """
        bzr_home = os.environ['BZR_HOME']
        self.assertIsInstance(bzr_home, str)
        home = os.environ['HOME']
        self.assertIsInstance(home, str)

    def test_make_branch_and_memory_tree(self):
        """In TestCaseWithMemoryTransport we should not make the branch on disk.

        Testing this comprehensively and robustly is hard, so this settles
        for making a branch and checking that no directory was created at
        its relpath.
        """
        memory_tree = self.make_branch_and_memory_tree('dir')
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertIsInstance(memory_tree, memorytree.MemoryTree)

    def test_make_branch_and_memory_tree_with_format(self):
        """make_branch_and_memory_tree should accept a format option."""
        dir_format = bzrdir.BzrDirMetaFormat1()
        dir_format.repository_format = repository.format_registry.get_default()
        memory_tree = self.make_branch_and_memory_tree('dir',
                                                       format=dir_format)
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertIsInstance(memory_tree, memorytree.MemoryTree)
        self.assertEqual(dir_format.repository_format.__class__,
                         memory_tree.branch.repository._format.__class__)

    def test_make_branch_builder(self):
        branch_builder = self.make_branch_builder('dir')
        self.assertIsInstance(branch_builder, branchbuilder.BranchBuilder)
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))

    def test_make_branch_builder_with_format(self):
        # Use a repo layout that doesn't conform to a 'named' layout, to
        # ensure that the format objects themselves are used.
        dir_format = bzrdir.BzrDirMetaFormat1()
        repo_format = repository.format_registry.get_default()
        dir_format.repository_format = repo_format
        branch_builder = self.make_branch_builder('dir', format=dir_format)
        built_branch = branch_builder.get_branch()
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertEqual(dir_format.repository_format.__class__,
                         built_branch.repository._format.__class__)
        # The format file stored through the transport names the
        # requested repository format.
        self.assertEqual(
            repo_format.get_format_string(),
            self.get_transport().get_bytes('dir/.bzr/repository/format'))

    def test_make_branch_builder_with_format_name(self):
        branch_builder = self.make_branch_builder('dir', format='knit')
        built_branch = branch_builder.get_branch()
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        knit_dir_format = bzrdir.format_registry.make_bzrdir('knit')
        self.assertEqual(knit_dir_format.repository_format.__class__,
                         built_branch.repository._format.__class__)
        self.assertEqual(
            'Bazaar-NG Knit Repository Format 1',
            self.get_transport().get_bytes('dir/.bzr/repository/format'))

    def test_dangling_locks_cause_failures(self):
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
            def test_function(self):
                # Take a lock and deliberately never release it.
                t = self.get_transport('.')
                l = lockdir.LockDir(t, 'lock')
                l.create()
                l.attempt_lock()

        inner_test = TestDanglingLock('test_function')
        result = inner_test.run()
        total_failures = result.errors + result.failures
        # A dangling lock only produces a failure when thorough lock
        # checking is enabled.
        if self._lock_check_thorough:
            expected_failure_count = 1
        else:
            expected_failure_count = 0
        self.assertEqual(expected_failure_count, len(total_failures))
 
643
 
 
644
 
 
645
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
    """Tests for the convenience functions TestCaseWithTransport introduces."""

    def test_get_readonly_url_none(self):
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
        self.vfs_transport_factory = memory.MemoryServer
        self.transport_readonly_server = None
        # With no readonly server configured, the readonly url resolves to
        # a readonly decorator transport.
        root_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        root_transport = transport.get_transport(root_url)
        nested_transport = transport.get_transport(nested_url)
        self.assertIsInstance(root_transport, ReadonlyTransportDecorator)
        self.assertIsInstance(nested_transport, ReadonlyTransportDecorator)
        # Drop the last character of the nested base before comparing it
        # with the path computed from the root transport.
        self.assertEqual(nested_transport.base[:-1],
                         root_transport.abspath('foo/bar'))

    def test_get_readonly_url_http(self):
        from bzrlib.tests.http_server import HttpServer
        from bzrlib.transport.http import HttpTransportBase
        self.transport_server = test_server.LocalURLServer
        self.transport_readonly_server = HttpServer
        # With an HTTP readonly server configured, the readonly url
        # resolves to an HTTP transport.
        root_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        # the transport returned may be any HttpTransportBase subclass
        root_transport = transport.get_transport(root_url)
        nested_transport = transport.get_transport(nested_url)
        self.assertIsInstance(root_transport, HttpTransportBase)
        self.assertIsInstance(nested_transport, HttpTransportBase)
        self.assertEqual(nested_transport.base[:-1],
                         root_transport.abspath('foo/bar'))

    def test_is_directory(self):
        """Test assertIsDirectory assertion"""
        t = self.get_transport()
        self.build_tree(['a_dir/', 'a_file'], transport=t)
        self.assertIsDirectory('a_dir', t)
        # Both a plain file and a missing path must be rejected.
        self.assertRaises(AssertionError,
                          self.assertIsDirectory, 'a_file', t)
        self.assertRaises(AssertionError,
                          self.assertIsDirectory, 'not_here', t)

    def test_make_branch_builder(self):
        branch_builder = self.make_branch_builder('dir')
        rev_id = branch_builder.build_commit()
        self.assertPathExists('dir')
        opened_dir = bzrdir.BzrDir.open('dir')
        # The builder makes a branch but no working tree.
        self.assertRaises(errors.NoWorkingTree, opened_dir.open_workingtree)
        opened_branch = opened_dir.open_branch()
        builder_branch = branch_builder.get_branch()
        self.assertEqual(opened_branch.base, builder_branch.base)
        expected_info = (1, rev_id)
        self.assertEqual(expected_info, builder_branch.last_revision_info())
        self.assertEqual(expected_info, opened_branch.last_revision_info())
 
696
 
 
697
 
 
698
class TestTestCaseTransports(tests.TestCaseWithTransport):
    """Checks that TestCaseWithTransport helpers honour the vfs factory."""

    def setUp(self):
        super(TestTestCaseTransports, self).setUp()
        # Every test in this class runs against an in-memory server.
        self.vfs_transport_factory = memory.MemoryServer

    def test_make_bzrdir_preserves_transport(self):
        # The original bound this result to an unused local; the call is
        # kept in case it has side effects (presumably it starts the
        # transport server -- TODO confirm).
        self.get_transport()
        result_bzrdir = self.make_bzrdir('subdir')
        self.assertIsInstance(result_bzrdir.transport,
                              memory.MemoryTransport)
        # should not be on disk, should only be in memory
        self.assertPathDoesNotExist('subdir')
 
711
 
 
712
 
 
713
class TestChrootedTest(tests.ChrootedTestCase):
    """Checks the readonly url of a ChrootedTestCase."""

    def test_root_is_root(self):
        # Cloning to '..' must leave the base url unchanged.
        readonly_transport = transport.get_transport(self.get_readonly_url())
        parent_base = readonly_transport.clone('..').base
        self.assertEqual(readonly_transport.base, parent_base)
 
719
 
 
720
 
 
721
class TestProfileResult(tests.TestCase):
    """Checks that tests.ProfileResult records profile data on the test."""

    def test_profiles_tests(self):
        self.requireFeature(test_lsprof.LSProfFeature)
        recorder = testtools.testresult.doubles.ExtendedTestResult()
        profiling_result = tests.ProfileResult(recorder)

        class Sample(tests.TestCase):
            def a(self):
                self.sample_function()
            def sample_function(self):
                pass

        Sample("a").run(profiling_result)
        # The first event recorded by the double carries the test that ran.
        profiled_case = recorder._events[0][1]
        self.assertLength(1, profiled_case._benchcalls)
        # We must be able to unpack it as the test reporting code wants
        (_, _, _), stats = profiled_case._benchcalls[0]
        self.assertTrue(callable(stats.pprint))
 
739
 
 
740
 
 
741
class TestTestResult(tests.TestCase):
    """Tests for the bzrlib test result classes."""

    def check_timing(self, test_case, expected_re):
        """Run test_case and match its reported time against expected_re.

        :param test_case: the test instance to run.
        :param expected_re: regular expression that the time string produced
            by TextTestResult._testTimeString must contain.
        """
        text_result = bzrlib.tests.TextTestResult(
            self._log_file,
            descriptions=0,
            verbosity=1,
            )
        recorder = testtools.testresult.doubles.ExtendedTestResult()
        test_case.run(MultiTestResult(text_result, recorder))
        # The first event recorded by the double carries the test that ran.
        executed_case = recorder._events[0][1]
        self.assertContainsRe(
            text_result._testTimeString(executed_case), expected_re)

    def test_test_reporting(self):
        class ShortDelayTestCase(tests.TestCase):
            def test_short_delay(self):
                time.sleep(0.003)
            def test_short_benchmark(self):
                self.time(time.sleep, 0.003)
        self.check_timing(ShortDelayTestCase('test_short_delay'),
                          r"^ +[0-9]+ms$")
        # if a benchmark time is given, we now show just that time followed by
        # a star
        self.check_timing(ShortDelayTestCase('test_short_benchmark'),
                          r"^ +[0-9]+ms\*$")

    def test_unittest_reporting_unittest_class(self):
        # getting the time from a non-bzrlib test works ok
        class ShortDelayTestCase(unittest.TestCase):
            def test_short_delay(self):
                time.sleep(0.003)
        self.check_timing(ShortDelayTestCase('test_short_delay'),
                          r"^ +[0-9]+ms$")

    def _time_hello_world_encoding(self):
        """Profile two unicode() conversions.

        This is used to exercise the test framework.
        """
        for word in ('hello', 'world'):
            self.time(unicode, word, errors='replace')

    def test_lsprofiling(self):
        """Verbose test result prints lsprof statistics from test cases."""
        self.requireFeature(test_lsprof.LSProfFeature)
        result_stream = StringIO()
        result = bzrlib.tests.VerboseTestResult(
            result_stream,
            descriptions=0,
            verbosity=2,
            )
        # We want to profile a call of some sort and check that it is output
        # by addSuccess. We don't care about addError or addFailure as they
        # are not that interesting for performance tuning.
        # Make a new test instance that when run will generate a profile.
        example_test_case = TestTestResult("_time_hello_world_encoding")
        example_test_case._gather_lsprof_in_benchmarks = True
        # execute the test, which should succeed and record profiles
        example_test_case.run(result)
        # If this worked the output stream of our test result contains, for
        # the 'hello' call and then again for the 'world' call:
        #  - an "LSProf output for <type 'unicode'>(...)" line naming the
        #    profiled callable and its arguments,
        #  - the lsprof header:
        #    CallCount    Recursive    Total(ms)   Inline(ms) module:lineno(function)
        #  - an arbitrary number of statistics lines.
        output = result_stream.getvalue()
        self.assertContainsRe(output,
            r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)")
        self.assertContainsRe(output,
            r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
        # NOTE(review): the whole group is optional, so this pattern matches
        # any output; kept as in the original.
        self.assertContainsRe(output,
            r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
        self.assertContainsRe(output,
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")

    def test_uses_time_from_testtools(self):
        """Test case timings in verbose results should use testtools times"""
        import datetime
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
            def startTest(self, test):
                self.time(datetime.datetime.utcfromtimestamp(1.145))
                super(TimeAddedVerboseTestResult, self).startTest(test)
            def addSuccess(self, test):
                self.time(datetime.datetime.utcfromtimestamp(51.147))
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
            def report_tests_starting(self): pass
        sio = StringIO()
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
        # 51.147 - 1.145 seconds between the two injected timestamps
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")

    def test_known_failure(self):
        """A KnownFailure being raised should trigger several result actions."""
        class InstrumentedTestResult(tests.ExtendedTestResult):
            def stopTestRun(self): pass
            def report_tests_starting(self): pass
            def report_known_failure(self, test, err=None, details=None):
                self._call = test, 'known failure'
        result = InstrumentedTestResult(None, None, None, None)
        class Test(tests.TestCase):
            def test_function(self):
                raise tests.KnownFailure('failed!')
        test = Test("test_function")
        test.run(result)
        # it should invoke 'report_known_failure'.
        self.assertEqual(2, len(result._call))
        self.assertEqual(test.id(), result._call[0].id())
        self.assertEqual('known failure', result._call[1])
        # We don't introspect the traceback; if the rest is ok, it would be
        # exceptional for it not to be.
        # it should update the known_failure_count on the object.
        self.assertEqual(1, result.known_failure_count)
        # the result should be successful.
        self.assertTrue(result.wasSuccessful())

    def test_verbose_report_known_failure(self):
        # verbose test output formatting
        result_stream = StringIO()
        result = bzrlib.tests.VerboseTestResult(
            result_stream,
            descriptions=0,
            verbosity=2,
            )
        test = self.get_passing_test()
        result.startTest(test)
        # Only look at what is written after startTest's own output.
        prefix = len(result_stream.getvalue())
        # the err parameter has the shape:
        # (class, exception object, traceback)
        # KnownFailures don't get their tracebacks shown though, so we
        # can skip that.
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
        result.report_known_failure(test, err)
        output = result_stream.getvalue()[prefix:]
        lines = output.splitlines()
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
        if sys.version_info > (2, 7):
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
                self.assertNotEqual, lines[1], '    ')
        self.assertEqual(lines[1], '    foo')
        self.assertEqual(2, len(lines))

    def get_passing_test(self):
        """Return a unittest.FunctionTestCase whose body does nothing."""
        def passing_test():
            pass
        return unittest.FunctionTestCase(passing_test)

    def test_add_not_supported(self):
        """Test the behaviour of invoking addNotSupported."""
        class InstrumentedTestResult(tests.ExtendedTestResult):
            def stopTestRun(self): pass
            def report_tests_starting(self): pass
            def report_unsupported(self, test, feature):
                self._call = test, feature
        result = InstrumentedTestResult(None, None, None, None)
        test = SampleTestCase('_test_pass')
        feature = tests.Feature()
        result.startTest(test)
        result.addNotSupported(test, feature)
        # it should invoke 'report_unsupported'.
        self.assertEqual(2, len(result._call))
        self.assertEqual(test, result._call[0])
        self.assertEqual(feature, result._call[1])
        # the result should be successful.
        self.assertTrue(result.wasSuccessful())
        # it should record the test against a count of tests not run due to
        # this feature.
        self.assertEqual(1, result.unsupported['Feature'])
        # and invoking it again should increment that counter
        result.addNotSupported(test, feature)
        self.assertEqual(2, result.unsupported['Feature'])

    def test_verbose_report_unsupported(self):
        # verbose test output formatting
        result_stream = StringIO()
        result = bzrlib.tests.VerboseTestResult(
            result_stream,
            descriptions=0,
            verbosity=2,
            )
        test = self.get_passing_test()
        feature = tests.Feature()
        result.startTest(test)
        # Only look at what is written after startTest's own output.
        prefix = len(result_stream.getvalue())
        result.report_unsupported(test, feature)
        output = result_stream.getvalue()[prefix:]
        lines = output.splitlines()
        # We don't check for the final '0ms' since it may fail on slow hosts
        self.assertStartsWith(lines[0], 'NODEP')
        self.assertEqual(lines[1],
                         "    The feature 'Feature' is not available.")

    def test_unavailable_exception(self):
        """An UnavailableFeature being raised should invoke addNotSupported."""
        class InstrumentedTestResult(tests.ExtendedTestResult):
            def stopTestRun(self): pass
            def report_tests_starting(self): pass
            def addNotSupported(self, test, feature):
                self._call = test, feature
        result = InstrumentedTestResult(None, None, None, None)
        feature = tests.Feature()
        class Test(tests.TestCase):
            def test_function(self):
                raise tests.UnavailableFeature(feature)
        test = Test("test_function")
        test.run(result)
        # it should invoke 'addNotSupported'.
        self.assertEqual(2, len(result._call))
        self.assertEqual(test.id(), result._call[0].id())
        self.assertEqual(feature, result._call[1])
        # and not count as an error
        self.assertEqual(0, result.error_count)

    def test_strict_with_unsupported_feature(self):
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
                                             verbosity=1)
        test = self.get_passing_test()
        feature = "Unsupported Feature"
        result.addNotSupported(test, feature)
        # A missing feature keeps the run from being strictly successful.
        self.assertFalse(result.wasStrictlySuccessful())
        self.assertEqual(None, result._extractBenchmarkTime(test))

    def test_strict_with_known_failure(self):
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
                                             verbosity=1)
        test = self.get_passing_test()
        err = (tests.KnownFailure, tests.KnownFailure('foo'), None)
        result.addExpectedFailure(test, err)
        # An expected failure keeps the run from being strictly successful.
        self.assertFalse(result.wasStrictlySuccessful())
        self.assertEqual(None, result._extractBenchmarkTime(test))

    def test_strict_with_success(self):
        result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0,
                                             verbosity=1)
        test = self.get_passing_test()
        result.addSuccess(test)
        self.assertTrue(result.wasStrictlySuccessful())
        self.assertEqual(None, result._extractBenchmarkTime(test))

    def test_startTests(self):
        """Starting the first test should trigger startTests."""
        class InstrumentedTestResult(tests.ExtendedTestResult):
            calls = 0
            def startTests(self): self.calls += 1
        result = InstrumentedTestResult(None, None, None, None)
        def test_function():
            pass
        test = unittest.FunctionTestCase(test_function)
        test.run(result)
        # assertEqual rather than the deprecated assertEquals alias, matching
        # the rest of this class.
        self.assertEqual(1, result.calls)

    def test_startTests_only_once(self):
        """With multiple tests startTests should still only be called once"""
        class InstrumentedTestResult(tests.ExtendedTestResult):
            calls = 0
            def startTests(self): self.calls += 1
        result = InstrumentedTestResult(None, None, None, None)
        suite = unittest.TestSuite([
            unittest.FunctionTestCase(lambda: None),
            unittest.FunctionTestCase(lambda: None)])
        suite.run(result)
        self.assertEqual(1, result.calls)
        self.assertEqual(2, result.count)
 
1007
 
 
1008
 
 
1009
class TestUnicodeFilenameFeature(tests.TestCase):
    """Smoke test for tests.UnicodeFilenameFeature."""

    def test_probe_passes(self):
        """UnicodeFilenameFeature._probe passes."""
        # Only check that probing does not raise: the value it returns
        # depends on the platform, so nothing more can be asserted.
        tests.UnicodeFilenameFeature._probe()
 
1016
 
 
1017
 
 
1018
class TestRunner(tests.TestCase):
    """Tests for tests.TextTestRunner."""

    def dummy_test(self):
        # Intentionally empty.
        pass

    def run_test_runner(self, testrunner, test):
        """Run suite in testrunner, saving global state and restoring it.

        This currently saves and restores:
        TestCaseInTempDir.TEST_ROOT

        There should be no tests in this file that use
        bzrlib.tests.TextTestRunner without using this convenience method,
        because of our use of global state.

        :param testrunner: the runner whose run() method is invoked.
        :param test: the test or suite handed to the runner.
        :return: whatever testrunner.run(test) returns.
        """
        saved_root = tests.TestCaseInTempDir.TEST_ROOT
        try:
            tests.TestCaseInTempDir.TEST_ROOT = None
            return testrunner.run(test)
        finally:
            # Restore even when the run raises.
            tests.TestCaseInTempDir.TEST_ROOT = saved_root

    def test_known_failure_failed_run(self):
        # run a test that generates a known failure which should be printed in
        # the final output when real failures occur.
        class Test(tests.TestCase):
            def known_failure_test(self):
                self.expectFailure('failed', self.assertTrue, False)
        test = unittest.TestSuite()
        test.addTest(Test("known_failure_test"))
        def failing_test():
            raise AssertionError('foo')
        test.addTest(unittest.FunctionTestCase(failing_test))
        stream = StringIO()
        runner = tests.TextTestRunner(stream=stream)
        # Only the text written to the stream is inspected below.
        self.run_test_runner(runner, test)
        self.assertContainsRe(stream.getvalue(),
            '(?sm)^bzr selftest.*$'
            '.*'
            '^======================================================================\n'
            '^FAIL: failing_test\n'
            '^----------------------------------------------------------------------\n'
            'Traceback \\(most recent call last\\):\n'
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
            '    raise AssertionError\\(\'foo\'\\)\n'
            '.*'
            '^----------------------------------------------------------------------\n'
            '.*'
            'FAILED \\(failures=1, known_failure_count=1\\)'
            )

    def test_known_failure_ok_run(self):
        # run a test that generates a known failure which should be printed in
        # the final output.
        class Test(tests.TestCase):
            def known_failure_test(self):
                self.knownFailure("Never works...")
        test = Test("known_failure_test")
        stream = StringIO()
        runner = tests.TextTestRunner(stream=stream)
        self.run_test_runner(runner, test)
        self.assertContainsRe(stream.getvalue(),
            '\n'
            '-*\n'
            'Ran 1 test in .*\n'
            '\n'
            'OK \\(known_failures=1\\)\n')

    def test_unexpected_success_bad(self):
        class Test(tests.TestCase):
            def test_truth(self):
                self.expectFailure("No absolute truth", self.assertTrue, True)
        runner = tests.TextTestRunner(stream=StringIO())
        self.run_test_runner(runner, Test("test_truth"))
        # An expected failure that unexpectedly passes is reported as FAIL.
        self.assertContainsRe(runner.stream.getvalue(),
            "=+\n"
            "FAIL: \\S+\.test_truth\n"
            "-+\n"
            "(?:.*\n)*"
            "No absolute truth\n"
            "(?:.*\n)*"
            "-+\n"
            "Ran 1 test in .*\n"
            "\n"
            "FAILED \\(failures=1\\)\n\\Z")

    def test_result_decorator(self):
        # decorate results
        calls = []
        class LoggingDecorator(ExtendedToOriginalDecorator):
            def startTest(self, test):
                ExtendedToOriginalDecorator.startTest(self, test)
                calls.append('start')
        test = unittest.FunctionTestCase(lambda:None)
        stream = StringIO()
        runner = tests.TextTestRunner(stream=stream,
            result_decorators=[LoggingDecorator])
        self.run_test_runner(runner, test)
        self.assertLength(1, calls)

    def test_skipped_test(self):
        # run a test that is skipped, and check the suite as a whole still
        # succeeds.
        # skipping_test must be hidden in here so it's not run as a real test
        class SkippingTest(tests.TestCase):
            def skipping_test(self):
                raise tests.TestSkipped('test intentionally skipped')
        runner = tests.TextTestRunner(stream=self._log_file)
        test = SkippingTest("skipping_test")
        result = self.run_test_runner(runner, test)
        self.assertTrue(result.wasSuccessful())

    def test_skipped_from_setup(self):
        calls = []
        class SkippedSetupTest(tests.TestCase):

            def setUp(self):
                calls.append('setUp')
                self.addCleanup(self.cleanup)
                raise tests.TestSkipped('skipped setup')

            def test_skip(self):
                self.fail('test reached')

            def cleanup(self):
                calls.append('cleanup')

        runner = tests.TextTestRunner(stream=self._log_file)
        test = SkippedSetupTest('test_skip')
        result = self.run_test_runner(runner, test)
        self.assertTrue(result.wasSuccessful())
        # Check if cleanup was called the right number of times.
        self.assertEqual(['setUp', 'cleanup'], calls)

    def test_skipped_from_test(self):
        calls = []
        class SkippedTest(tests.TestCase):

            def setUp(self):
                tests.TestCase.setUp(self)
                calls.append('setUp')
                self.addCleanup(self.cleanup)

            def test_skip(self):
                raise tests.TestSkipped('skipped test')

            def cleanup(self):
                calls.append('cleanup')

        runner = tests.TextTestRunner(stream=self._log_file)
        test = SkippedTest('test_skip')
        result = self.run_test_runner(runner, test)
        self.assertTrue(result.wasSuccessful())
        # Check if cleanup was called the right number of times.
        self.assertEqual(['setUp', 'cleanup'], calls)

    def test_not_applicable(self):
        # run a test that is skipped because it's not applicable
        class Test(tests.TestCase):
            def not_applicable_test(self):
                raise tests.TestNotApplicable('this test never runs')
        out = StringIO()
        runner = tests.TextTestRunner(stream=out, verbosity=2)
        test = Test("not_applicable_test")
        result = self.run_test_runner(runner, test)
        # Copy the captured runner output into this test's own log.
        self._log_file.write(out.getvalue())
        self.assertTrue(result.wasSuccessful())
        self.assertTrue(result.wasStrictlySuccessful())
        self.assertContainsRe(out.getvalue(),
                r'(?m)not_applicable_test   * N/A')
        self.assertContainsRe(out.getvalue(),
                r'(?m)^    this test never runs')

    def test_unsupported_features_listed(self):
        """When unsupported features are encountered they are detailed."""
        class Feature1(tests.Feature):
            def _probe(self): return False
        class Feature2(tests.Feature):
            def _probe(self): return False
        # create sample tests
        test1 = SampleTestCase('_test_pass')
        test1._test_needs_features = [Feature1()]
        test2 = SampleTestCase('_test_pass')
        test2._test_needs_features = [Feature2()]
        test = unittest.TestSuite()
        test.addTest(test1)
        test.addTest(test2)
        stream = StringIO()
        runner = tests.TextTestRunner(stream=stream)
        self.run_test_runner(runner, test)
        lines = stream.getvalue().splitlines()
        self.assertEqual([
            'OK',
            "Missing feature 'Feature1' skipped 1 tests.",
            "Missing feature 'Feature2' skipped 1 tests.",
            ],
            lines[-3:])

    def test_verbose_test_count(self):
        """A verbose test run reports the right test count at the start"""
        suite = TestUtil.TestSuite([
            unittest.FunctionTestCase(lambda:None),
            unittest.FunctionTestCase(lambda:None)])
        self.assertEqual(suite.countTestCases(), 2)
        stream = StringIO()
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
        # Need to use the CountingDecorator as that's what sets num_tests
        self.run_test_runner(runner, tests.CountingDecorator(suite))
        self.assertStartsWith(stream.getvalue(), "running 2 tests")

    def test_startTestRun(self):
        """run should call result.startTestRun()"""
        calls = []
        class LoggingDecorator(ExtendedToOriginalDecorator):
            def startTestRun(self):
                ExtendedToOriginalDecorator.startTestRun(self)
                calls.append('startTestRun')
        test = unittest.FunctionTestCase(lambda:None)
        stream = StringIO()
        runner = tests.TextTestRunner(stream=stream,
            result_decorators=[LoggingDecorator])
        self.run_test_runner(runner, test)
        self.assertLength(1, calls)

    def test_stopTestRun(self):
        """run should call result.stopTestRun()"""
        calls = []
        class LoggingDecorator(ExtendedToOriginalDecorator):
            def stopTestRun(self):
                ExtendedToOriginalDecorator.stopTestRun(self)
                calls.append('stopTestRun')
        test = unittest.FunctionTestCase(lambda:None)
        stream = StringIO()
        runner = tests.TextTestRunner(stream=stream,
            result_decorators=[LoggingDecorator])
        self.run_test_runner(runner, test)
        self.assertLength(1, calls)

    def test_unicode_test_output_on_ascii_stream(self):
        """Showing results should always succeed even on an ascii console"""
        class FailureWithUnicode(tests.TestCase):
            def test_log_unicode(self):
                self.log(u"\u2606")
                self.fail("Now print that log!")
        out = StringIO()
        # Make the terminal encoding lookup report "ascii" for this test.
        self.overrideAttr(osutils, "get_terminal_encoding",
            lambda trace=False: "ascii")
        self.run_test_runner(tests.TextTestRunner(stream=out),
            FailureWithUnicode("test_log_unicode"))
        self.assertContainsRe(out.getvalue(),
            "Text attachment: log\n"
            "-+\n"
            "\d+\.\d+  \\\\u2606\n"
            "-+\n")
 
1273
 
 
1274
 
 
1275
class SampleTestCase(tests.TestCase):
    """Minimal test case instantiated by other tests in this module."""

    def _test_pass(self):
        # Intentionally empty; the leading underscore keeps the name from
        # matching the usual test_ prefix.
        pass
 
1279
 
 
1280
class _TestException(Exception):
    """Private exception type for use by the tests in this module."""
 
1282
 
 
1283
 
 
1284
class TestTestCase(tests.TestCase):
 
1285
    """Tests that test the core bzrlib TestCase."""
 
1286
 
 
1287
    def test_assertLength_matches_empty(self):
 
1288
        a_list = []
 
1289
        self.assertLength(0, a_list)
 
1290
 
 
1291
    def test_assertLength_matches_nonempty(self):
 
1292
        a_list = [1, 2, 3]
 
1293
        self.assertLength(3, a_list)
 
1294
 
 
1295
    def test_assertLength_fails_different(self):
 
1296
        a_list = []
 
1297
        self.assertRaises(AssertionError, self.assertLength, 1, a_list)
 
1298
 
 
1299
    def test_assertLength_shows_sequence_in_failure(self):
 
1300
        a_list = [1, 2, 3]
 
1301
        exception = self.assertRaises(AssertionError, self.assertLength, 2,
 
1302
            a_list)
 
1303
        self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
 
1304
            exception.args[0])
 
1305
 
 
1306
    def test_base_setUp_not_called_causes_failure(self):
 
1307
        class TestCaseWithBrokenSetUp(tests.TestCase):
 
1308
            def setUp(self):
 
1309
                pass # does not call TestCase.setUp
 
1310
            def test_foo(self):
 
1311
                pass
 
1312
        test = TestCaseWithBrokenSetUp('test_foo')
 
1313
        result = unittest.TestResult()
 
1314
        test.run(result)
 
1315
        self.assertFalse(result.wasSuccessful())
 
1316
        self.assertEqual(1, result.testsRun)
 
1317
 
 
1318
    def test_base_tearDown_not_called_causes_failure(self):
 
1319
        class TestCaseWithBrokenTearDown(tests.TestCase):
 
1320
            def tearDown(self):
 
1321
                pass # does not call TestCase.tearDown
 
1322
            def test_foo(self):
 
1323
                pass
 
1324
        test = TestCaseWithBrokenTearDown('test_foo')
 
1325
        result = unittest.TestResult()
 
1326
        test.run(result)
 
1327
        self.assertFalse(result.wasSuccessful())
 
1328
        self.assertEqual(1, result.testsRun)
 
1329
 
 
1330
    def test_debug_flags_sanitised(self):
 
1331
        """The bzrlib debug flags should be sanitised by setUp."""
 
1332
        if 'allow_debug' in tests.selftest_debug_flags:
 
1333
            raise tests.TestNotApplicable(
 
1334
                '-Eallow_debug option prevents debug flag sanitisation')
 
1335
        # we could set something and run a test that will check
 
1336
        # it gets santised, but this is probably sufficient for now:
 
1337
        # if someone runs the test with -Dsomething it will error.
 
1338
        flags = set()
 
1339
        if self._lock_check_thorough:
 
1340
            flags.add('strict_locks')
 
1341
        self.assertEqual(flags, bzrlib.debug.debug_flags)
 
1342
 
 
1343
    def change_selftest_debug_flags(self, new_flags):
 
1344
        self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags))
 
1345
 
 
1346
    def test_allow_debug_flag(self):
 
1347
        """The -Eallow_debug flag prevents bzrlib.debug.debug_flags from being
 
1348
        sanitised (i.e. cleared) before running a test.
 
1349
        """
 
1350
        self.change_selftest_debug_flags(set(['allow_debug']))
 
1351
        bzrlib.debug.debug_flags = set(['a-flag'])
 
1352
        class TestThatRecordsFlags(tests.TestCase):
 
1353
            def test_foo(nested_self):
 
1354
                self.flags = set(bzrlib.debug.debug_flags)
 
1355
        test = TestThatRecordsFlags('test_foo')
 
1356
        test.run(self.make_test_result())
 
1357
        flags = set(['a-flag'])
 
1358
        if 'disable_lock_checks' not in tests.selftest_debug_flags:
 
1359
            flags.add('strict_locks')
 
1360
        self.assertEqual(flags, self.flags)
 
1361
 
 
1362
    def test_disable_lock_checks(self):
 
1363
        """The -Edisable_lock_checks flag disables thorough checks."""
 
1364
        class TestThatRecordsFlags(tests.TestCase):
 
1365
            def test_foo(nested_self):
 
1366
                self.flags = set(bzrlib.debug.debug_flags)
 
1367
                self.test_lock_check_thorough = nested_self._lock_check_thorough
 
1368
        self.change_selftest_debug_flags(set())
 
1369
        test = TestThatRecordsFlags('test_foo')
 
1370
        test.run(self.make_test_result())
 
1371
        # By default we do strict lock checking and thorough lock/unlock
 
1372
        # tracking.
 
1373
        self.assertTrue(self.test_lock_check_thorough)
 
1374
        self.assertEqual(set(['strict_locks']), self.flags)
 
1375
        # Now set the disable_lock_checks flag, and show that this changed.
 
1376
        self.change_selftest_debug_flags(set(['disable_lock_checks']))
 
1377
        test = TestThatRecordsFlags('test_foo')
 
1378
        test.run(self.make_test_result())
 
1379
        self.assertFalse(self.test_lock_check_thorough)
 
1380
        self.assertEqual(set(), self.flags)
 
1381
 
 
1382
    def test_this_fails_strict_lock_check(self):
 
1383
        class TestThatRecordsFlags(tests.TestCase):
 
1384
            def test_foo(nested_self):
 
1385
                self.flags1 = set(bzrlib.debug.debug_flags)
 
1386
                self.thisFailsStrictLockCheck()
 
1387
                self.flags2 = set(bzrlib.debug.debug_flags)
 
1388
        # Make sure lock checking is active
 
1389
        self.change_selftest_debug_flags(set())
 
1390
        test = TestThatRecordsFlags('test_foo')
 
1391
        test.run(self.make_test_result())
 
1392
        self.assertEqual(set(['strict_locks']), self.flags1)
 
1393
        self.assertEqual(set(), self.flags2)
 
1394
 
 
1395
    def test_debug_flags_restored(self):
 
1396
        """The bzrlib debug flags should be restored to their original state
 
1397
        after the test was run, even if allow_debug is set.
 
1398
        """
 
1399
        self.change_selftest_debug_flags(set(['allow_debug']))
 
1400
        # Now run a test that modifies debug.debug_flags.
 
1401
        bzrlib.debug.debug_flags = set(['original-state'])
 
1402
        class TestThatModifiesFlags(tests.TestCase):
 
1403
            def test_foo(self):
 
1404
                bzrlib.debug.debug_flags = set(['modified'])
 
1405
        test = TestThatModifiesFlags('test_foo')
 
1406
        test.run(self.make_test_result())
 
1407
        self.assertEqual(set(['original-state']), bzrlib.debug.debug_flags)
 
1408
 
 
1409
    def make_test_result(self):
 
1410
        """Get a test result that writes to the test log file."""
 
1411
        return tests.TextTestResult(self._log_file, descriptions=0, verbosity=1)
 
1412
 
 
1413
    def inner_test(self):
 
1414
        # the inner child test
 
1415
        note("inner_test")
 
1416
 
 
1417
    def outer_child(self):
 
1418
        # the outer child test
 
1419
        note("outer_start")
 
1420
        self.inner_test = TestTestCase("inner_child")
 
1421
        result = self.make_test_result()
 
1422
        self.inner_test.run(result)
 
1423
        note("outer finish")
 
1424
        self.addCleanup(osutils.delete_any, self._log_file_name)
 
1425
 
 
1426
    def test_trace_nesting(self):
 
1427
        # this tests that each test case nests its trace facility correctly.
 
1428
        # we do this by running a test case manually. That test case (A)
 
1429
        # should setup a new log, log content to it, setup a child case (B),
 
1430
        # which should log independently, then case (A) should log a trailer
 
1431
        # and return.
 
1432
        # we do two nested children so that we can verify the state of the
 
1433
        # logs after the outer child finishes is correct, which a bad clean
 
1434
        # up routine in tearDown might trigger a fault in our test with only
 
1435
        # one child, we should instead see the bad result inside our test with
 
1436
        # the two children.
 
1437
        # the outer child test
 
1438
        original_trace = bzrlib.trace._trace_file
 
1439
        outer_test = TestTestCase("outer_child")
 
1440
        result = self.make_test_result()
 
1441
        outer_test.run(result)
 
1442
        self.assertEqual(original_trace, bzrlib.trace._trace_file)
 
1443
 
 
1444
    def method_that_times_a_bit_twice(self):
 
1445
        # call self.time twice to ensure it aggregates
 
1446
        self.time(time.sleep, 0.007)
 
1447
        self.time(time.sleep, 0.007)
 
1448
 
 
1449
    def test_time_creates_benchmark_in_result(self):
 
1450
        """Test that the TestCase.time() method accumulates a benchmark time."""
 
1451
        sample_test = TestTestCase("method_that_times_a_bit_twice")
 
1452
        output_stream = StringIO()
 
1453
        result = bzrlib.tests.VerboseTestResult(
 
1454
            output_stream,
 
1455
            descriptions=0,
 
1456
            verbosity=2)
 
1457
        sample_test.run(result)
 
1458
        self.assertContainsRe(
 
1459
            output_stream.getvalue(),
 
1460
            r"\d+ms\*\n$")
 
1461
 
 
1462
    def test_hooks_sanitised(self):
 
1463
        """The bzrlib hooks should be sanitised by setUp."""
 
1464
        # Note this test won't fail with hooks that the core library doesn't
 
1465
        # use - but it trigger with a plugin that adds hooks, so its still a
 
1466
        # useful warning in that case.
 
1467
        self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks)
 
1468
        self.assertEqual(
 
1469
            bzrlib.smart.server.SmartServerHooks(),
 
1470
            bzrlib.smart.server.SmartTCPServer.hooks)
 
1471
        self.assertEqual(
 
1472
            bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks)
 
1473
 
 
1474
    def test__gather_lsprof_in_benchmarks(self):
 
1475
        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
 
1476
 
 
1477
        Each self.time() call is individually and separately profiled.
 
1478
        """
 
1479
        self.requireFeature(test_lsprof.LSProfFeature)
 
1480
        # overrides the class member with an instance member so no cleanup
 
1481
        # needed.
 
1482
        self._gather_lsprof_in_benchmarks = True
 
1483
        self.time(time.sleep, 0.000)
 
1484
        self.time(time.sleep, 0.003)
 
1485
        self.assertEqual(2, len(self._benchcalls))
 
1486
        self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0])
 
1487
        self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
 
1488
        self.assertIsInstance(self._benchcalls[0][1], bzrlib.lsprof.Stats)
 
1489
        self.assertIsInstance(self._benchcalls[1][1], bzrlib.lsprof.Stats)
 
1490
        del self._benchcalls[:]
 
1491
 
 
1492
    def test_knownFailure(self):
 
1493
        """Self.knownFailure() should raise a KnownFailure exception."""
 
1494
        self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure")
 
1495
 
 
1496
    def test_open_bzrdir_safe_roots(self):
 
1497
        # even a memory transport should fail to open when its url isn't 
 
1498
        # permitted.
 
1499
        # Manually set one up (TestCase doesn't and shouldn't provide magic
 
1500
        # machinery)
 
1501
        transport_server = memory.MemoryServer()
 
1502
        transport_server.start_server()
 
1503
        self.addCleanup(transport_server.stop_server)
 
1504
        t = transport.get_transport(transport_server.get_url())
 
1505
        bzrdir.BzrDir.create(t.base)
 
1506
        self.assertRaises(errors.BzrError,
 
1507
            bzrdir.BzrDir.open_from_transport, t)
 
1508
        # But if we declare this as safe, we can open the bzrdir.
 
1509
        self.permit_url(t.base)
 
1510
        self._bzr_selftest_roots.append(t.base)
 
1511
        bzrdir.BzrDir.open_from_transport(t)
 
1512
 
 
1513
    def test_requireFeature_available(self):
 
1514
        """self.requireFeature(available) is a no-op."""
 
1515
        class Available(tests.Feature):
 
1516
            def _probe(self):return True
 
1517
        feature = Available()
 
1518
        self.requireFeature(feature)
 
1519
 
 
1520
    def test_requireFeature_unavailable(self):
 
1521
        """self.requireFeature(unavailable) raises UnavailableFeature."""
 
1522
        class Unavailable(tests.Feature):
 
1523
            def _probe(self):return False
 
1524
        feature = Unavailable()
 
1525
        self.assertRaises(tests.UnavailableFeature,
 
1526
                          self.requireFeature, feature)
 
1527
 
 
1528
    def test_run_no_parameters(self):
 
1529
        test = SampleTestCase('_test_pass')
 
1530
        test.run()
 
1531
 
 
1532
    def test_run_enabled_unittest_result(self):
 
1533
        """Test we revert to regular behaviour when the test is enabled."""
 
1534
        test = SampleTestCase('_test_pass')
 
1535
        class EnabledFeature(object):
 
1536
            def available(self):
 
1537
                return True
 
1538
        test._test_needs_features = [EnabledFeature()]
 
1539
        result = unittest.TestResult()
 
1540
        test.run(result)
 
1541
        self.assertEqual(1, result.testsRun)
 
1542
        self.assertEqual([], result.errors)
 
1543
        self.assertEqual([], result.failures)
 
1544
 
 
1545
    def test_run_disabled_unittest_result(self):
 
1546
        """Test our compatability for disabled tests with unittest results."""
 
1547
        test = SampleTestCase('_test_pass')
 
1548
        class DisabledFeature(object):
 
1549
            def available(self):
 
1550
                return False
 
1551
        test._test_needs_features = [DisabledFeature()]
 
1552
        result = unittest.TestResult()
 
1553
        test.run(result)
 
1554
        self.assertEqual(1, result.testsRun)
 
1555
        self.assertEqual([], result.errors)
 
1556
        self.assertEqual([], result.failures)
 
1557
 
 
1558
    def test_run_disabled_supporting_result(self):
 
1559
        """Test disabled tests behaviour with support aware results."""
 
1560
        test = SampleTestCase('_test_pass')
 
1561
        class DisabledFeature(object):
 
1562
            def __eq__(self, other):
 
1563
                return isinstance(other, DisabledFeature)
 
1564
            def available(self):
 
1565
                return False
 
1566
        the_feature = DisabledFeature()
 
1567
        test._test_needs_features = [the_feature]
 
1568
        class InstrumentedTestResult(unittest.TestResult):
 
1569
            def __init__(self):
 
1570
                unittest.TestResult.__init__(self)
 
1571
                self.calls = []
 
1572
            def startTest(self, test):
 
1573
                self.calls.append(('startTest', test))
 
1574
            def stopTest(self, test):
 
1575
                self.calls.append(('stopTest', test))
 
1576
            def addNotSupported(self, test, feature):
 
1577
                self.calls.append(('addNotSupported', test, feature))
 
1578
        result = InstrumentedTestResult()
 
1579
        test.run(result)
 
1580
        case = result.calls[0][1]
 
1581
        self.assertEqual([
 
1582
            ('startTest', case),
 
1583
            ('addNotSupported', case, the_feature),
 
1584
            ('stopTest', case),
 
1585
            ],
 
1586
            result.calls)
 
1587
 
 
1588
    def test_start_server_registers_url(self):
 
1589
        transport_server = memory.MemoryServer()
 
1590
        # A little strict, but unlikely to be changed soon.
 
1591
        self.assertEqual([], self._bzr_selftest_roots)
 
1592
        self.start_server(transport_server)
 
1593
        self.assertSubset([transport_server.get_url()],
 
1594
            self._bzr_selftest_roots)
 
1595
 
 
1596
    def test_assert_list_raises_on_generator(self):
 
1597
        def generator_which_will_raise():
 
1598
            # This will not raise until after the first yield
 
1599
            yield 1
 
1600
            raise _TestException()
 
1601
 
 
1602
        e = self.assertListRaises(_TestException, generator_which_will_raise)
 
1603
        self.assertIsInstance(e, _TestException)
 
1604
 
 
1605
        e = self.assertListRaises(Exception, generator_which_will_raise)
 
1606
        self.assertIsInstance(e, _TestException)
 
1607
 
 
1608
    def test_assert_list_raises_on_plain(self):
 
1609
        def plain_exception():
 
1610
            raise _TestException()
 
1611
            return []
 
1612
 
 
1613
        e = self.assertListRaises(_TestException, plain_exception)
 
1614
        self.assertIsInstance(e, _TestException)
 
1615
 
 
1616
        e = self.assertListRaises(Exception, plain_exception)
 
1617
        self.assertIsInstance(e, _TestException)
 
1618
 
 
1619
    def test_assert_list_raises_assert_wrong_exception(self):
 
1620
        class _NotTestException(Exception):
 
1621
            pass
 
1622
 
 
1623
        def wrong_exception():
 
1624
            raise _NotTestException()
 
1625
 
 
1626
        def wrong_exception_generator():
 
1627
            yield 1
 
1628
            yield 2
 
1629
            raise _NotTestException()
 
1630
 
 
1631
        # Wrong exceptions are not intercepted
 
1632
        self.assertRaises(_NotTestException,
 
1633
            self.assertListRaises, _TestException, wrong_exception)
 
1634
        self.assertRaises(_NotTestException,
 
1635
            self.assertListRaises, _TestException, wrong_exception_generator)
 
1636
 
 
1637
    def test_assert_list_raises_no_exception(self):
 
1638
        def success():
 
1639
            return []
 
1640
 
 
1641
        def success_generator():
 
1642
            yield 1
 
1643
            yield 2
 
1644
 
 
1645
        self.assertRaises(AssertionError,
 
1646
            self.assertListRaises, _TestException, success)
 
1647
 
 
1648
        self.assertRaises(AssertionError,
 
1649
            self.assertListRaises, _TestException, success_generator)
 
1650
 
 
1651
    def test_overrideAttr_without_value(self):
 
1652
        self.test_attr = 'original' # Define a test attribute
 
1653
        obj = self # Make 'obj' visible to the embedded test
 
1654
        class Test(tests.TestCase):
 
1655
 
 
1656
            def setUp(self):
 
1657
                tests.TestCase.setUp(self)
 
1658
                self.orig = self.overrideAttr(obj, 'test_attr')
 
1659
 
 
1660
            def test_value(self):
 
1661
                self.assertEqual('original', self.orig)
 
1662
                self.assertEqual('original', obj.test_attr)
 
1663
                obj.test_attr = 'modified'
 
1664
                self.assertEqual('modified', obj.test_attr)
 
1665
 
 
1666
        test = Test('test_value')
 
1667
        test.run(unittest.TestResult())
 
1668
        self.assertEqual('original', obj.test_attr)
 
1669
 
 
1670
    def test_overrideAttr_with_value(self):
 
1671
        self.test_attr = 'original' # Define a test attribute
 
1672
        obj = self # Make 'obj' visible to the embedded test
 
1673
        class Test(tests.TestCase):
 
1674
 
 
1675
            def setUp(self):
 
1676
                tests.TestCase.setUp(self)
 
1677
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
 
1678
 
 
1679
            def test_value(self):
 
1680
                self.assertEqual('original', self.orig)
 
1681
                self.assertEqual('modified', obj.test_attr)
 
1682
 
 
1683
        test = Test('test_value')
 
1684
        test.run(unittest.TestResult())
 
1685
        self.assertEqual('original', obj.test_attr)
 
1686
 
 
1687
 
 
1688
class _MissingFeature(tests.Feature):
    """A feature whose probe always reports it as unavailable."""

    def _probe(self):
        return False


# Shared instance used by the example tests and their expectations below.
missing_feature = _MissingFeature()
 
1692
 
 
1693
 
 
1694
def _get_test(name):
    """Get an instance of a specific example test.

    The example class is defined inside this function so that its tests do
    not auto-run as part of the test suite.

    :param name: Name of the ExampleTests method to instantiate.
    :return: An ExampleTests instance bound to that method.
    """

    def _do_nothing():
        # Stand-in callable for expectFailure; it never fails.
        return None

    class ExampleTests(tests.TestCase):
        # Each method logs a distinctive line and then ends with a different
        # outcome (failure, error, skip, success, ...).

        def test_fail(self):
            mutter('this was a failing test')
            self.fail('this test will fail')

        def test_error(self):
            mutter('this test errored')
            raise RuntimeError('gotcha')

        def test_missing_feature(self):
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        def test_skip(self):
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        def test_success(self):
            mutter('this test succeeds')

        def test_xfail(self):
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        def test_unexpected_success(self):
            mutter('test with unexpected success')
            self.expectFailure('should_fail', _do_nothing)

    return ExampleTests(name)
 
1731
 
 
1732
 
 
1733
class TestTestCaseLogDetails(tests.TestCase):
    """Checks which test outcomes carry the 'log' detail."""

    def _run_test(self, test_name):
        """Run the named example test and return its testtools result."""
        outcome = testtools.TestResult()
        _get_test(test_name).run(outcome)
        return outcome

    def _check_lone_skip_has_no_log(self, result, reason):
        """Assert result holds one skip, keyed by reason, with no log."""
        self.assertEqual([reason], result.skip_reasons.keys())
        skips = result.skip_reasons[reason]
        self.assertEqual(1, len(skips))
        self.assertFalse('log' in skips[0].getDetails())

    def test_fail_has_log(self):
        result = self._run_test('test_fail')
        self.assertEqual(1, len(result.failures))
        report = result.failures[0][1]
        for pattern in ('Text attachment: log', 'this was a failing test'):
            self.assertContainsRe(report, pattern)

    def test_error_has_log(self):
        result = self._run_test('test_error')
        self.assertEqual(1, len(result.errors))
        report = result.errors[0][1]
        for pattern in ('Text attachment: log', 'this test errored'):
            self.assertContainsRe(report, pattern)

    def test_skip_has_no_log(self):
        result = self._run_test('test_skip')
        self._check_lone_skip_has_no_log(result, 'reason')

    def test_missing_feature_has_no_log(self):
        # testtools doesn't know about addNotSupported, so it just gets
        # considered as a skip
        result = self._run_test('test_missing_feature')
        self._check_lone_skip_has_no_log(result, missing_feature)

    def test_xfail_has_no_log(self):
        result = self._run_test('test_xfail')
        self.assertEqual(1, len(result.expectedFailures))
        report = result.expectedFailures[0][1]
        for pattern in ('Text attachment: log', 'test with expected failure'):
            self.assertNotContainsRe(report, pattern)

    def test_unexpected_success_has_log(self):
        result = self._run_test('test_unexpected_success')
        self.assertEqual(1, len(result.unexpectedSuccesses))
        # Inconsistency, unexpectedSuccesses is a list of tests,
        # expectedFailures is a list of reasons?
        unexpected = result.unexpectedSuccesses[0]
        self.assertTrue('log' in unexpected.getDetails())
 
1788
 
 
1789
 
 
1790
class TestTestCloning(tests.TestCase):
    """Tests that test cloning of TestCases (as used by multiply_tests)."""

    def test_cloned_testcase_does_not_share_details(self):
        """A TestCase cloned with clone_test does not share mutable attributes
        such as details or cleanups.
        """
        class Test(tests.TestCase):

            def test_foo(self):
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))

        original = Test('test_foo')
        clone = tests.clone_test(original, original.id() + '(cloned)')
        # Only the original is run, so only it should gain the detail.
        original.run(unittest.TestResult())
        original_detail = original.getDetails()['foo']
        self.assertEqual('foo', original_detail.iter_bytes())
        self.assertEqual(None, clone.getDetails().get('foo'))

    def test_double_apply_scenario_preserves_first_scenario(self):
        """Applying two levels of scenarios to a test preserves the attributes
        added by both scenarios.
        """
        class Test(tests.TestCase):

            def test_foo(self):
                pass

        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
        by_x = tests.multiply_tests(
            Test('test_foo'), scenarios_x, unittest.TestSuite())
        by_x_and_y = tests.multiply_tests(
            by_x, scenarios_y, unittest.TestSuite())
        all_tests = list(tests.iter_suite_tests(by_x_and_y))
        self.assertLength(4, all_tests)
        all_xys = sorted((t.x, t.y) for t in all_tests)
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1822
 
 
1823
 
 
1824
# NB: Don't delete this; it's not actually from 0.11!
 
1825
@deprecated_function(deprecated_in((0, 11, 0)))
def sample_deprecated_function():
    """A deprecated function to test applyDeprecated with.

    :return: The constant 2.
    """
    sample_result = 2
    return sample_result
 
1829
 
 
1830
 
 
1831
def sample_undeprecated_function(a_param):
    """An undeprecated function to test applyDeprecated with.

    :param a_param: Accepted but not used.
    :return: None.
    """
    return None
 
1833
 
 
1834
 
 
1835
class ApplyDeprecatedHelper(object):
    """A helper class for ApplyDeprecated tests."""

    @deprecated_method(deprecated_in((0, 11, 0)))
    def sample_deprecated_method(self, param_one):
        """A deprecated method for testing with.

        :return: param_one, unchanged.
        """
        return param_one

    def sample_normal_method(self):
        """An undeprecated method."""
        return None

    @deprecated_method(deprecated_in((0, 10, 0)))
    def sample_nested_deprecation(self):
        """A deprecated method that calls another deprecated callable."""
        nested_result = sample_deprecated_function()
        return nested_result
 
1849
 
 
1850
 
 
1851
class TestExtraAssertions(tests.TestCase):
    """Tests for new test assertions in bzrlib test suite"""

    def test_assert_isinstance(self):
        self.assertIsInstance(2, int)
        self.assertIsInstance(u'', basestring)
        base_message = (
            "None is an instance of <type 'NoneType'> rather than <type 'int'>")
        e = self.assertRaises(AssertionError, self.assertIsInstance, None, int)
        self.assertEqual(str(e), base_message)
        self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
        # A caller-supplied message is appended after a colon.
        e = self.assertRaises(AssertionError,
            self.assertIsInstance, None, int, "it's just not")
        self.assertEqual(str(e), base_message + ": it's just not")

    def test_assertEndsWith(self):
        self.assertEndsWith('foo', 'oo')
        self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')

    def test_assertEqualDiff(self):
        e = self.assertRaises(AssertionError,
                              self.assertEqualDiff, '', '\n')
        # Don't blink ! The '+' applies to the second string
        missing_in_first = 'first string is missing a final newline.\n+ \n'
        self.assertEqual(str(e), missing_in_first)
        e = self.assertRaises(AssertionError,
                              self.assertEqualDiff, '\n', '')
        # Don't blink ! The '-' applies to the second string
        missing_in_second = 'second string is missing a final newline.\n- \n'
        self.assertEqual(str(e), missing_in_second)
 
1882
 
 
1883
 
 
1884
class TestDeprecations(tests.TestCase):
    """Tests for the applyDeprecated and callDeprecated helpers."""

    def test_applyDeprecated_not_deprecated(self):
        helper = ApplyDeprecatedHelper()
        in_0_10 = deprecated_in((0, 10, 0))
        in_0_11 = deprecated_in((0, 11, 0))
        apply_deprecated = self.applyDeprecated
        # calling an undeprecated callable raises an assertion
        self.assertRaises(AssertionError, apply_deprecated,
            in_0_11, helper.sample_normal_method)
        self.assertRaises(AssertionError, apply_deprecated,
            in_0_11, sample_undeprecated_function, "a param value")
        # calling a deprecated callable (function or method) with the wrong
        # expected deprecation fails.
        self.assertRaises(AssertionError, apply_deprecated,
            in_0_10, helper.sample_deprecated_method, "a param value")
        self.assertRaises(AssertionError, apply_deprecated,
            in_0_10, sample_deprecated_function)
        # calling a deprecated callable (function or method) with the right
        # expected deprecation returns the functions result.
        method_result = apply_deprecated(
            in_0_11, helper.sample_deprecated_method, "a param value")
        self.assertEqual("a param value", method_result)
        function_result = apply_deprecated(in_0_11, sample_deprecated_function)
        self.assertEqual(2, function_result)
        # calling a nested deprecation with the wrong deprecation version
        # fails even if a deeper nested function was deprecated with the
        # supplied version.
        self.assertRaises(AssertionError, apply_deprecated,
            in_0_11, helper.sample_nested_deprecation)
        # calling a nested deprecation with the right deprecation value
        # returns the calls result.
        nested_result = apply_deprecated(
            in_0_10, helper.sample_nested_deprecation)
        self.assertEqual(2, nested_result)

    def test_callDeprecated(self):
        def testfunc(be_deprecated, result=None):
            # Only a literal True triggers the warning.
            if be_deprecated is True:
                symbol_versioning.warn('i am deprecated', DeprecationWarning,
                                       stacklevel=1)
            return result

        warned = self.callDeprecated(['i am deprecated'], testfunc, True)
        self.assertIs(None, warned)
        quiet = self.callDeprecated([], testfunc, False, 'result')
        self.assertEqual('result', quiet)
        # Keyword arguments are passed through to the callable as well.
        self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True)
        self.callDeprecated([], testfunc, be_deprecated=False)
 
1932
 
 
1933
 
 
1934
class TestWarningTests(tests.TestCase):
    """Tests for calling methods that raise warnings."""

    def test_callCatchWarnings(self):
        def meth(a, b):
            warnings.warn("this is your last warning")
            return a + b

        caught, total = self.callCatchWarnings(meth, 1, 2)
        self.assertEqual(3, total)
        # would like just to compare them, but UserWarning doesn't implement
        # eq well
        [only_warning] = caught
        self.assertIsInstance(only_warning, UserWarning)
        self.assertEqual("this is your last warning", str(only_warning))
 
1948
 
 
1949
 
 
1950
class TestConvenienceMakers(tests.TestCaseWithTransport):
    """Test for the make_* convenience functions."""

    def test_make_branch_and_tree_with_format(self):
        # we should be able to supply a format to make_branch_and_tree
        requested_format = bzrlib.bzrdir.BzrDirMetaFormat1()
        self.make_branch_and_tree('a', format=requested_format)
        opened = bzrlib.bzrdir.BzrDir.open('a')
        self.assertIsInstance(opened._format,
                              bzrlib.bzrdir.BzrDirMetaFormat1)

    def test_make_branch_and_memory_tree(self):
        # we should be able to get a new branch and a mutable tree from
        # TestCaseWithTransport
        memory_tree = self.make_branch_and_memory_tree('a')
        self.assertIsInstance(memory_tree, bzrlib.memorytree.MemoryTree)

    def test_make_tree_for_local_vfs_backed_transport(self):
        # make_branch_and_tree has to use local branch and repositories
        # when the vfs transport and local disk are colocated, even if
        # a different transport is in use for url generation.
        self.transport_server = test_server.FakeVFATServer
        self.assertFalse(self.get_url('t1').startswith('file://'))
        tree = self.make_branch_and_tree('t1')
        self.assertStartsWith(tree.bzrdir.root_transport.base, 'file://')
        # The tree, its branch and its repository share one root transport.
        self.assertEqual(tree.bzrdir.root_transport,
                tree.branch.bzrdir.root_transport)
        self.assertEqual(tree.bzrdir.root_transport,
                tree.branch.repository.bzrdir.root_transport)
 
1978
 
 
1979
 
 
1980
class SelfTestHelper(object):
    """Mixin providing run_selftest for tests that drive tests.selftest."""

    def run_selftest(self, **kwargs):
        """Run selftest returning its output.

        bzrlib.tests.default_transport and
        TestCaseWithMemoryTransport.TEST_ROOT are saved beforehand and put
        back afterwards, whether or not selftest succeeds.

        :param kwargs: Passed through to tests.selftest.
        :return: The output stream, rewound to its start.
        """
        saved_transport = bzrlib.tests.default_transport
        saved_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
        stream = StringIO()
        tests.TestCaseWithMemoryTransport.TEST_ROOT = None
        try:
            succeeded = tests.selftest(stream=stream, **kwargs)
            self.assertEqual(True, succeeded)
        finally:
            bzrlib.tests.default_transport = saved_transport
            tests.TestCaseWithMemoryTransport.TEST_ROOT = saved_root
        stream.seek(0)
        return stream
 
1995
 
 
1996
 
 
1997
class TestSelftest(tests.TestCase, SelfTestHelper):
    """Tests of bzrlib.tests.selftest."""

    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
        # selftest must call the supplied test_suite_factory exactly once.
        factory_called = []
        def factory():
            factory_called.append(True)
            return TestUtil.TestSuite()
        out = StringIO()
        err = StringIO()
        # NOTE(review): judging by the apply_redirected override in
        # TestRunBzrCaptured the positional order is (stdin, stdout, stderr),
        # which would put `out` in the stdin slot here - confirm against
        # bzrlib.tests.TestCase.apply_redirected.
        self.apply_redirected(out, err, None, bzrlib.tests.selftest,
            test_suite_factory=factory)
        self.assertEqual([True], factory_called)

    def factory(self):
        """A test suite factory.

        :return: A suite of three no-op tests, Test.a, Test.b and Test.c.
        """
        class Test(tests.TestCase):
            def a(self):
                pass
            def b(self):
                pass
            def c(self):
                pass
        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])

    def test_list_only(self):
        # list_only prints one line per test in the suite.
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True)
        self.assertEqual(3, len(output.readlines()))

    def test_list_only_filtered(self):
        # pattern restricts the listing to the matching test.
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, pattern="Test.b")
        self.assertEndsWith(output.getvalue(), "Test.b\n")
        self.assertLength(1, output.readlines())

    def test_list_only_excludes(self):
        # exclude_pattern drops the matching test from the listing.
        output = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, exclude_pattern="Test.b")
        self.assertNotContainsRe("Test.b", output.getvalue())
        self.assertLength(2, output.readlines())

    def test_lsprof_tests(self):
        # With lsprof_tests=True the result handed to the test is an
        # ExtendedToOriginalDecorator.
        self.requireFeature(test_lsprof.LSProfFeature)
        results = []
        class Test(object):
            def __call__(test, result):
                test.run(result)
            def run(test, result):
                results.append(result)
            def countTestCases(self):
                return 1
        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
        self.assertLength(1, results)
        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)

    def test_random(self):
        # test randomising by listing a number of tests.
        output_123 = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        output_234 = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="234")
        # Compare the captured text, not the stream objects: two distinct
        # StringIO instances never compare equal, so asserting on the
        # objects themselves could not fail.
        self.assertNotEqual(output_123.getvalue(), output_234.getvalue())
        # "Randomizing test order..\n\n
        self.assertLength(5, output_123.readlines())
        self.assertLength(5, output_234.readlines())

    def test_random_reuse_is_same_order(self):
        # Listing twice with the same seed must give identical output.
        expected = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        repeated = self.run_selftest(test_suite_factory=self.factory,
            list_only=True, random_seed="123")
        self.assertEqual(expected.getvalue(), repeated.getvalue())

    def test_runner_class(self):
        # A runner_class of SubUnitBzrRunner yields a stream that subunit can
        # replay, reporting the three tests from the factory.
        self.requireFeature(features.subunit)
        from subunit import ProtocolTestCase
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=self.factory)
        test = ProtocolTestCase(stream)
        result = unittest.TestResult()
        test.run(result)
        self.assertEqual(3, result.testsRun)

    def test_starting_with_single_argument(self):
        # A single starting_with prefix selects just the matching test.
        output = self.run_selftest(test_suite_factory=self.factory,
            starting_with=['bzrlib.tests.test_selftest.Test.a'],
            list_only=True)
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
            output.getvalue())

    def test_starting_with_multiple_argument(self):
        # Several starting_with prefixes select the union of their matches.
        output = self.run_selftest(test_suite_factory=self.factory,
            starting_with=['bzrlib.tests.test_selftest.Test.a',
                'bzrlib.tests.test_selftest.Test.b'],
            list_only=True)
        self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
            'bzrlib.tests.test_selftest.Test.b\n',
            output.getvalue())

    def check_transport_set(self, transport_server):
        """Check that selftest(transport=...) sets the default transport.

        Runs a one-test suite whose test records
        bzrlib.tests.default_transport, then asserts that the recorded value
        equals transport_server.

        :param transport_server: The object passed as selftest's transport
            argument.
        """
        captured_transport = []
        def seen_transport(a_transport):
            captured_transport.append(a_transport)
        class Capture(tests.TestCase):
            def a(self):
                seen_transport(bzrlib.tests.default_transport)
        def factory():
            return TestUtil.TestSuite([Capture("a")])
        self.run_selftest(transport=transport_server, test_suite_factory=factory)
        self.assertEqual(transport_server, captured_transport[0])

    def test_transport_sftp(self):
        self.requireFeature(features.paramiko)
        from bzrlib.tests import stub_sftp
        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)

    def test_transport_memory(self):
        self.check_transport_set(memory.MemoryServer)
 
2117
 
 
2118
 
 
2119
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
    """Tests of selftest's load_list option."""
    # Does IO: reads test.list

    def test_load_list(self):
        # Provide a list with one test - this test.
        test_id_line = '%s\n' % self.id()
        self.build_tree_contents([('test.list', test_id_line)])
        # And generate a list of the tests in the suite.
        stream = self.run_selftest(load_list='test.list', list_only=True)
        self.assertEqual(test_id_line, stream.getvalue())

    def test_load_unknown(self):
        # Pointing load_list at a file that was never created must raise
        # NoSuchFile rather than listing anything.
        self.assertRaises(errors.NoSuchFile, self.run_selftest,
            load_list='missing file name', list_only=True)
 
2135
 
 
2136
 
 
2137
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check which test outcomes carry a log in the subunit stream."""

    _test_needs_features = [features.subunit]

    def run_subunit_stream(self, test_name):
        """Run one sample test through SubUnitBzrRunner and replay the stream.

        :param test_name: Name handed to _get_test to build the single test.
        :return: A (content, result) pair: the raw text of the subunit
            stream and the testtools.TestResult obtained by replaying it.
        """
        from subunit import ProtocolTestCase
        def single_test_suite():
            return TestUtil.TestSuite([_get_test(test_name)])
        subunit_stream = self.run_selftest(
            runner_class=tests.SubUnitBzrRunner,
            test_suite_factory=single_test_suite)
        replay = ProtocolTestCase(subunit_stream)
        outcome = testtools.TestResult()
        replay.run(outcome)
        raw_text = subunit_stream.getvalue()
        return raw_text, outcome

    def test_fail_has_log(self):
        raw, outcome = self.run_subunit_stream('test_fail')
        self.assertEqual(1, len(outcome.failures))
        self.assertContainsRe(raw, '(?m)^log$')
        self.assertContainsRe(raw, 'this test will fail')

    def test_error_has_log(self):
        raw, outcome = self.run_subunit_stream('test_error')
        self.assertContainsRe(raw, '(?m)^log$')
        self.assertContainsRe(raw, 'this test errored')

    def test_skip_has_no_log(self):
        raw, outcome = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(raw, '(?m)^log$')
        self.assertNotContainsRe(raw, 'this test will be skipped')
        reasons = outcome.skip_reasons
        self.assertEqual(['reason'], reasons.keys())
        skipped = reasons['reason']
        self.assertEqual(1, len(skipped))
        remoted = skipped[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertFalse('log' in remoted.getDetails())

    def test_missing_feature_has_no_log(self):
        raw, outcome = self.run_subunit_stream('test_missing_feature')
        self.assertNotContainsRe(raw, '(?m)^log$')
        self.assertNotContainsRe(raw, 'missing the feature')
        reasons = outcome.skip_reasons
        self.assertEqual(['_MissingFeature\n'], reasons.keys())
        skipped = reasons['_MissingFeature\n']
        self.assertEqual(1, len(skipped))
        remoted = skipped[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertFalse('log' in remoted.getDetails())

    def test_xfail_has_no_log(self):
        raw, outcome = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(raw, '(?m)^log$')
        self.assertNotContainsRe(raw, 'test with expected failure')
        self.assertEqual(1, len(outcome.expectedFailures))
        # Each entry is indexed for its second item, the reported text.
        reported = outcome.expectedFailures[0][1]
        self.assertNotContainsRe(reported, 'Text attachment: log')
        self.assertNotContainsRe(reported, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        raw, outcome = self.run_subunit_stream('test_unexpected_success')
        self.assertContainsRe(raw, '(?m)^log$')
        self.assertContainsRe(raw, 'test with unexpected success')
        # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
        #                success, if a min version check is added remove this
        from subunit import TestProtocolClient as _Client
        if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func:
            self.expectFailure('subunit treats "unexpectedSuccess"'
                               ' as a plain success',
                self.assertEqual, 1, len(outcome.unexpectedSuccesses))
        self.assertEqual(1, len(outcome.unexpectedSuccesses))
        remoted = outcome.unexpectedSuccesses[0]
        # RemotedTestCase doesn't preserve the "details"
        ## self.assertTrue('log' in remoted.getDetails())

    def test_success_has_no_log(self):
        raw, outcome = self.run_subunit_stream('test_success')
        self.assertEqual(1, outcome.testsRun)
        self.assertNotContainsRe(raw, '(?m)^log$')
        self.assertNotContainsRe(raw, 'this test succeeds')
 
2216
 
 
2217
 
 
2218
class TestRunBzr(tests.TestCase):
    """Check how run_bzr and run_bzr_error call down into _run_bzr_core.

    _run_bzr_core is overridden here to record its arguments and hand back
    canned results, so nothing in this class really runs bzr.
    """

    # Canned stdout / stderr text returned by the overridden _run_bzr_core.
    out = ''
    err = ''

    def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
                         working_dir=None):
        """Override _run_bzr_core to test how it is invoked by run_bzr.

        Attempts to run bzr from inside this class don't actually run it.

        We test how run_bzr actually invokes bzr in another location.  Here we
        only need to test that it passes the right parameters to run_bzr.

        Every argument is stored on self under its own name (argv as a list
        copy) so the tests can inspect what was passed.

        :return: The tuple (self.retcode, self.out, self.err).
        """
        self.argv = list(argv)
        self.retcode = retcode
        self.encoding = encoding
        self.stdin = stdin
        self.working_dir = working_dir
        return self.retcode, self.out, self.err

    def test_run_bzr_error(self):
        # run_bzr_error forwards argv and retcode, and returns the canned
        # stdout / stderr unchanged.
        self.out = "It sure does!\n"
        out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
        self.assertEqual(['rocks'], self.argv)
        self.assertEqual(34, self.retcode)
        self.assertEqual('It sure does!\n', out)
        self.assertEqual(out, self.out)
        self.assertEqual('', err)
        self.assertEqual(err, self.err)

    def test_run_bzr_error_regexes(self):
        # The call itself is the check: it must not raise when the canned
        # stderr matches the supplied regex.
        self.out = ''
        self.err = "bzr: ERROR: foobarbaz is not versioned"
        self.run_bzr_error(
            ["bzr: ERROR: foobarbaz is not versioned"],
            ['file-id', 'foobarbaz'])

    def test_encoding(self):
        """Test that run_bzr passes encoding to _run_bzr_core"""
        self.run_bzr('foo bar')
        self.assertEqual(None, self.encoding)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', encoding='baz')
        self.assertEqual('baz', self.encoding)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_retcode(self):
        """Test that run_bzr passes retcode to _run_bzr_core"""
        # Default is retcode == 0
        self.run_bzr('foo bar')
        self.assertEqual(0, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', retcode=1)
        self.assertEqual(1, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', retcode=None)
        self.assertEqual(None, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr(['foo', 'bar'], retcode=3)
        self.assertEqual(3, self.retcode)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_stdin(self):
        # test that the stdin keyword to run_bzr is passed through to
        # _run_bzr_core as-is. We do this by overriding
        # _run_bzr_core in this class, and then calling run_bzr,
        # which is a convenience function for _run_bzr_core, so
        # should invoke it.
        self.run_bzr('foo bar', stdin='gam')
        self.assertEqual('gam', self.stdin)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', stdin='zippy')
        self.assertEqual('zippy', self.stdin)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_working_dir(self):
        """Test that run_bzr passes working_dir to _run_bzr_core"""
        self.run_bzr('foo bar')
        self.assertEqual(None, self.working_dir)
        self.assertEqual(['foo', 'bar'], self.argv)

        self.run_bzr('foo bar', working_dir='baz')
        self.assertEqual('baz', self.working_dir)
        self.assertEqual(['foo', 'bar'], self.argv)

    def test_reject_extra_keyword_arguments(self):
        # run_bzr must refuse keyword arguments it does not know about.
        self.assertRaises(TypeError, self.run_bzr, "foo bar",
                          error_regex=['error message'])
 
2312
 
 
2313
 
 
2314
class TestRunBzrCaptured(tests.TestCaseWithTransport):
    """Check what run_bzr hands to apply_redirected.

    apply_redirected is overridden to record its environment instead of
    running anything.
    """
    # Does IO when testing the working_dir parameter.

    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                         a_callable=None, *args, **kwargs):
        """Record the call context and emit fixed output.

        Stores stdin, the current bzrlib.ui.ui_factory (and its stdin
        attribute, or None when absent) and the current working directory on
        self, writes 'foo\\n' to stdout and 'bar\\n' to stderr, and never
        invokes a_callable.

        :return: 0
        """
        ui_factory = bzrlib.ui.ui_factory
        self.stdin = stdin
        self.factory_stdin = getattr(ui_factory, "stdin", None)
        self.factory = ui_factory
        self.working_dir = osutils.getcwd()
        stdout.write('foo\n')
        stderr.write('bar\n')
        return 0

    def test_stdin(self):
        # test that the stdin keyword to _run_bzr_core is passed through to
        # apply_redirected as a StringIO. We do this by overriding
        # apply_redirected in this class, and then calling _run_bzr_core,
        # which calls apply_redirected.
        for text in ('gam', 'zippy'):
            self.run_bzr(['foo', 'bar'], stdin=text)
            self.assertEqual(text, self.stdin.read())
            self.assertTrue(self.stdin is self.factory_stdin)

    def test_ui_factory(self):
        # each invocation of self.run_bzr should get its
        # own UI factory, which is an instance of TestUIFactory,
        # with stdin, stdout and stderr attached to the stdin,
        # stdout and stderr of the invoked run_bzr
        factory_before = bzrlib.ui.ui_factory
        self.run_bzr(['foo'])
        factory_during = self.factory
        self.assertFalse(factory_before is factory_during)
        self.assertNotEqual(sys.stdout, factory_during.stdout)
        self.assertNotEqual(sys.stderr, factory_during.stderr)
        self.assertEqual('foo\n', factory_during.stdout.getvalue())
        self.assertEqual('bar\n', factory_during.stderr.getvalue())
        self.assertIsInstance(factory_during, tests.TestUIFactory)

    def test_working_dir(self):
        self.build_tree(['one/', 'two/'])
        start_dir = osutils.getcwd()

        # Default is to work in the current directory
        self.run_bzr(['foo', 'bar'])
        self.assertEqual(start_dir, self.working_dir)

        self.run_bzr(['foo', 'bar'], working_dir=None)
        self.assertEqual(start_dir, self.working_dir)

        # The function should be run in the alternative directory
        # but afterwards the current working dir shouldn't be changed
        for subdir in ('one', 'two'):
            self.run_bzr(['foo', 'bar'], working_dir=subdir)
            self.assertNotEqual(start_dir, self.working_dir)
            self.assertEndsWith(self.working_dir, subdir)
            self.assertEqual(start_dir, osutils.getcwd())
 
2375
 
 
2376
 
 
2377
class StubProcess(object):
    """A stub process for testing run_bzr_subprocess.

    Holds canned output, error text and a return code, and offers the
    communicate() method and returncode attribute that callers read.
    """

    def __init__(self, out="", err="", retcode=0):
        """Remember the canned results.

        :param out: Text that communicate() reports as stdout.
        :param err: Text that communicate() reports as stderr.
        :param retcode: Value exposed as the returncode attribute.
        """
        self.out, self.err = out, err
        self.returncode = retcode

    def communicate(self):
        """Return the canned (out, err) pair."""
        return (self.out, self.err)
 
2387
 
 
2388
 
 
2389
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
    """Base class for tests testing how we might run bzr."""

    def setUp(self):
        """Start each test with an empty log of subprocess requests."""
        tests.TestCaseWithTransport.setUp(self)
        self.subprocess_calls = []

    def start_bzr_subprocess(self, process_args, env_changes=None,
                             skip_if_plan_to_signal=False,
                             working_dir=None,
                             allow_plugins=False):
        """capture what run_bzr_subprocess tries to do.

        The arguments are appended to self.subprocess_calls as one dict
        keyed by parameter name.

        :return: self.next_subprocess, which the test must have set.
        """
        recorded = {
            'process_args': process_args,
            'env_changes': env_changes,
            'skip_if_plan_to_signal': skip_if_plan_to_signal,
            'working_dir': working_dir,
            'allow_plugins': allow_plugins,
            }
        self.subprocess_calls.append(recorded)
        return self.next_subprocess
 
2406
 
 
2407
 
 
2408
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Tests of run_bzr_subprocess against a stubbed start_bzr_subprocess."""

    def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
        """Run run_bzr_subprocess with args and kwargs using a stubbed process.

        The inherited stub start_bzr_subprocess returns static results. This
        assertion method populates those results and also checks the
        arguments run_bzr_subprocess generates.

        :param expected_args: Dict mapping start_bzr_subprocess parameter
            names to the values the most recent recorded call must have.
        :param process: The object the stub start_bzr_subprocess returns.
        :return: Whatever run_bzr_subprocess returns.
        """
        self.next_subprocess = process
        try:
            return self.run_bzr_subprocess(*args, **kwargs)
        finally:
            # Runs on success and on failure alike; on failure the original
            # exception propagates unless one of these checks fails first.
            self.next_subprocess = None
            for key, expected in expected_args.iteritems():
                self.assertEqual(expected, self.subprocess_calls[-1][key])

    def test_run_bzr_subprocess(self):
        """The run_bzr_helper_external command behaves nicely."""
        self.assertRunBzrSubprocess({'process_args':['--version']},
            StubProcess(), '--version')
        self.assertRunBzrSubprocess({'process_args':['--version']},
            StubProcess(), ['--version'])
        # retcode=None disables retcode checking
        self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--version', retcode=None)
        result = self.assertRunBzrSubprocess({},
            StubProcess(out="is free software"), '--version')
        self.assertContainsRe(result[0], 'is free software')
        # Running a subcommand that is missing errors
        self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
            {'process_args':['--versionn']}, StubProcess(retcode=3),
            '--versionn')
        # Unless it is told to expect the error from the subprocess
        self.assertRunBzrSubprocess({},
            StubProcess(retcode=3), '--versionn', retcode=3)
        # Or to ignore retcode checking
        result = self.assertRunBzrSubprocess({},
            StubProcess(err="unknown command", retcode=3), '--versionn',
            retcode=None)
        self.assertContainsRe(result[1], 'unknown command')

    def test_env_change_passes_through(self):
        self.assertRunBzrSubprocess(
            {'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
            StubProcess(), '',
            env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})

    def test_no_working_dir_passed_as_None(self):
        self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')

    def test_no_working_dir_passed_through(self):
        # Despite the name, this passes working_dir='dir' and checks that it
        # reaches start_bzr_subprocess unchanged.
        self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
            working_dir='dir')

    def test_run_bzr_subprocess_no_plugins(self):
        self.assertRunBzrSubprocess({'allow_plugins': False},
            StubProcess(), '')

    def test_allow_plugins(self):
        self.assertRunBzrSubprocess({'allow_plugins': True},
            StubProcess(), '', allow_plugins=True)
 
2476
 
 
2477
 
 
2478
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Tests of finish_bzr_subprocess driven with StubProcess objects."""

    def test_finish_bzr_subprocess_with_error(self):
        """finish_bzr_subprocess allows specification of the desired exit code.
        """
        failing = StubProcess(err="unknown command", retcode=3)
        out, err = self.finish_bzr_subprocess(failing, retcode=3)[:2]
        self.assertEqual('', out)
        self.assertContainsRe(err, 'unknown command')

    def test_finish_bzr_subprocess_ignoring_retcode(self):
        """finish_bzr_subprocess allows the exit code to be ignored."""
        failing = StubProcess(err="unknown command", retcode=3)
        out, err = self.finish_bzr_subprocess(failing, retcode=None)[:2]
        self.assertEqual('', out)
        self.assertContainsRe(err, 'unknown command')

    def test_finish_subprocess_with_unexpected_retcode(self):
        """finish_bzr_subprocess raises self.failureException if the retcode is
        not the expected one.
        """
        failing = StubProcess(err="unknown command", retcode=3)
        self.assertRaises(self.failureException, self.finish_bzr_subprocess,
                          failing)
 
2502
 
 
2503
 
 
2504
class _DontSpawnProcess(Exception):
 
2505
    """A simple exception which just allows us to skip unnecessary steps"""
 
2506
 
 
2507
 
 
2508
class TestStartBzrSubProcess(tests.TestCase):
    """Tests of start_bzr_subprocess with _popen stubbed out.

    The stub records its arguments and raises _DontSpawnProcess, so no child
    process is ever created.
    """

    def check_popen_state(self):
        """Replace to make assertions when popen is called."""

    def _popen(self, *args, **kwargs):
        """Record the command that is run, so that we can ensure it is correct"""
        self.check_popen_state()
        self._popen_args = args
        self._popen_kwargs = kwargs
        raise _DontSpawnProcess()

    def test_run_bzr_subprocess_no_plugins(self):
        # By default the command line is: python, the bzr script, --no-plugins.
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
        command = self._popen_args[0]
        self.assertEqual(sys.executable, command[0])
        self.assertEqual(self.get_bzr_path(), command[1])
        self.assertEqual(['--no-plugins'], command[2:])

    def test_allow_plugins(self):
        # allow_plugins=True leaves --no-plugins off the command line.
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            allow_plugins=True)
        command = self._popen_args[0]
        self.assertEqual([], command[2:])

    def test_set_env(self):
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        # set in the child
        def check_environment():
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'EXISTANT_ENV_VAR':'set variable'})
        # not set in the parent
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def test_run_bzr_subprocess_env_del(self):
        """run_bzr_subprocess can remove environment variables too."""
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        def check_environment():
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
        try:
            self.check_popen_state = check_environment
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                env_changes={'EXISTANT_ENV_VAR':None})
            # Still set in parent
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        finally:
            # Always undo the change to the real environment, so a failing
            # assertion above cannot leak the variable into later tests.
            if 'EXISTANT_ENV_VAR' in os.environ:
                del os.environ['EXISTANT_ENV_VAR']

    def test_env_del_missing(self):
        # Asking to delete a variable that is not set must be harmless.
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
        def check_environment():
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
            env_changes={'NON_EXISTANT_ENV_VAR':None})

    def test_working_dir(self):
        """Test that we can specify the working dir for the child"""
        orig_getcwd = osutils.getcwd
        orig_chdir = os.chdir
        chdirs = []
        def chdir(path):
            chdirs.append(path)
        # Swap in recording stand-ins; both are restored in the finally
        # clauses below.
        os.chdir = chdir
        try:
            def getcwd():
                return 'current'
            osutils.getcwd = getcwd
            try:
                self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                    working_dir='foo')
            finally:
                osutils.getcwd = orig_getcwd
        finally:
            os.chdir = orig_chdir
        # First into the requested directory, then back to where getcwd said
        # we started.
        self.assertEqual(['foo', 'current'], chdirs)

    def test_get_bzr_path_with_cwd_bzrlib(self):
        # With an empty source path and every path reported as a file,
        # get_bzr_path yields the bare name "bzr".
        self.get_source_path = lambda: ""
        self.overrideAttr(os.path, "isfile", lambda path: True)
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2590
 
 
2591
 
 
2592
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
    """Tests that really need to do things with an external bzr."""

    def test_start_and_stop_bzr_subprocess_send_signal(self):
        """A started bzr subprocess can be finished by sending it a signal.

        Starts 'wait-until-signalled', waits for it to print 'running', then
        finishes it with SIGINT, expecting retcode 3, empty stdout and
        'bzr: interrupted' on stderr.
        """
        self.disable_missing_extensions_warning()
        process = self.start_bzr_subprocess(['wait-until-signalled'],
                                            skip_if_plan_to_signal=True)
        # Do not signal until the child has reported that it is up.
        self.assertEqual('running\n', process.stdout.readline())
        result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
                                            retcode=3)
        self.assertEqual('', result[0])
        self.assertEqual('bzr: interrupted\n', result[1])
 
2607
 
 
2608
 
 
2609
class TestFeature(tests.TestCase):
    """Tests of the tests.Feature base class."""

    def test_caching(self):
        """Feature._probe is called by the feature at most once."""
        class InstrumentedFeature(tests.Feature):
            def __init__(self):
                super(InstrumentedFeature, self).__init__()
                self.calls = []
            def _probe(self):
                self.calls.append('_probe')
                return False
        probed = InstrumentedFeature()
        # The second available() call must not add another '_probe' entry.
        for _ in (1, 2):
            probed.available()
            self.assertEqual(['_probe'], probed.calls)

    def test_named_str(self):
        """Feature.__str__ should thunk to feature_name()."""
        class NamedFeature(tests.Feature):
            def feature_name(self):
                return 'symlinks'
        shown = str(NamedFeature())
        self.assertEqual('symlinks', shown)

    def test_default_str(self):
        """Feature.__str__ should default to __class__.__name__."""
        class NamedFeature(tests.Feature):
            pass
        shown = str(NamedFeature())
        self.assertEqual('NamedFeature', shown)
 
2640
 
 
2641
 
 
2642
class TestUnavailableFeature(tests.TestCase):
    """Tests of the tests.UnavailableFeature exception."""

    def test_access_feature(self):
        # The feature given to the exception is retrievable as its first arg.
        wanted = tests.Feature()
        raised = tests.UnavailableFeature(wanted)
        first_arg = raised.args[0]
        self.assertIs(wanted, first_arg)
 
2648
 
 
2649
 
 
2650
# Module-level compatibility thunk used by Test_CompatibilityFeature: it is
# registered as bzrlib.tests.test_selftest.simple_thunk_feature, marked as
# deprecated in 2.1.0, and names UnicodeFilename in bzrlib.tests as its
# replacement.
simple_thunk_feature = tests._CompatabilityThunkFeature(
    deprecated_in((2, 1, 0)),
    'bzrlib.tests.test_selftest',
    'simple_thunk_feature',
    'UnicodeFilename',
    replacement_module='bzrlib.tests',
    )
 
2656
 
 
2657
class Test_CompatibilityFeature(tests.TestCase):
    """Tests of tests._CompatabilityThunkFeature via simple_thunk_feature."""

    def test_does_thunk(self):
        # Calling available() on the thunk must emit the deprecation warning
        # and give the same answer as the replacement feature.
        expected_warnings = [
            'bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
            ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.']
        thunked = self.callDeprecated(
            expected_warnings, simple_thunk_feature.available)
        self.assertEqual(tests.UnicodeFilename.available(), thunked)
 
2665
 
 
2666
 
 
2667
class TestModuleAvailableFeature(tests.TestCase):
    """Tests of tests.ModuleAvailableFeature."""

    def test_available_module(self):
        # A module that can be imported: the feature is available and
        # exposes the module object itself.
        module_name = 'bzrlib.tests'
        present = tests.ModuleAvailableFeature(module_name)
        self.assertEqual('bzrlib.tests', present.module_name)
        self.assertEqual('bzrlib.tests', str(present))
        self.assertTrue(present.available())
        self.assertIs(tests, present.module)

    def test_unavailable_module(self):
        # A module that cannot be imported: the feature is unavailable and
        # its module attribute is None.
        module_name = 'bzrlib.no_such_module_exists'
        absent = tests.ModuleAvailableFeature(module_name)
        self.assertEqual('bzrlib.no_such_module_exists', str(absent))
        self.assertFalse(absent.available())
        self.assertIs(None, absent.module)
 
2681
 
 
2682
 
 
2683
class TestSelftestFiltering(tests.TestCase):
    """Tests for the suite filtering, excluding and splitting helpers.

    Each test runs against a suite loaded from this module itself, so the
    expected ids are spelled from this class' own test method names.
    """

    # Id prefix common to every test method of this class.  It is also used
    # verbatim as a regular expression by test_condition_isinstance.
    _id_prefix = 'bzrlib.tests.test_selftest.TestSelftestFiltering.'

    def setUp(self):
        tests.TestCase.setUp(self)
        self.suite = TestUtil.TestSuite()
        self.loader = TestUtil.TestLoader()
        self.suite.addTest(self.loader.loadTestsFromModule(
            sys.modules['bzrlib.tests.test_selftest']))
        # Ids of the unfiltered suite, in loading order; the tests below
        # compare filtered results against it.
        self.all_names = _test_ids(self.suite)

    def _own_id(self, method_name):
        """Return the full id of this class' test named method_name."""
        return self._id_prefix + method_name

    def _check_all_but(self, missing_name, suite):
        """Check suite holds self.all_names, in order, minus missing_name.

        :param missing_name: The single test id expected to be absent.
        :param suite: The suite whose test ids are checked.
        """
        found_names = _test_ids(suite)
        self.assertFalse(missing_name in found_names)
        remaining_names = list(self.all_names)
        remaining_names.remove(missing_name)
        self.assertEqual(remaining_names, found_names)

    def test_condition_id_re(self):
        test_name = self._own_id('test_condition_id_re')
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_id_re('test_condition_id_re'))
        self.assertEqual([test_name], _test_ids(filtered_suite))

    def test_condition_id_in_list(self):
        test_names = [self._own_id('test_condition_id_in_list')]
        id_list = tests.TestIdList(test_names)
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_id_in_list(id_list))
        # Filtering by regexp must select the very same tests.
        my_pattern = 'TestSelftestFiltering.*test_condition_id_in_list'
        re_filtered = tests.filter_suite_by_re(self.suite, my_pattern)
        self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))

    def test_condition_id_startswith(self):
        start1 = self._own_id('test_condition_id_starts')
        start2 = self._own_id('test_condition_id_in')
        test_names = [
            self._own_id('test_condition_id_in_list'),
            self._own_id('test_condition_id_startswith'),
            ]
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_id_startswith([start1, start2]))
        self.assertEqual(test_names, _test_ids(filtered_suite))

    def test_condition_isinstance(self):
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, tests.condition_isinstance(self.__class__))
        # The id prefix of this class doubles as the regexp selecting it.
        class_pattern = self._id_prefix
        re_filtered = tests.filter_suite_by_re(self.suite, class_pattern)
        self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite))

    def test_exclude_tests_by_condition(self):
        excluded_name = self._own_id('test_exclude_tests_by_condition')
        filtered_suite = tests.exclude_tests_by_condition(
            self.suite, lambda x: x.id() == excluded_name)
        self.assertEqual(len(self.all_names) - 1,
            filtered_suite.countTestCases())
        self._check_all_but(excluded_name, filtered_suite)

    def test_exclude_tests_by_re(self):
        filtered_suite = tests.exclude_tests_by_re(self.suite,
                                                   'exclude_tests_by_re')
        excluded_name = self._own_id('test_exclude_tests_by_re')
        self.assertEqual(len(self.all_names) - 1,
            filtered_suite.countTestCases())
        self._check_all_but(excluded_name, filtered_suite)

    def test_filter_suite_by_condition(self):
        test_name = self._own_id('test_filter_suite_by_condition')
        filtered_suite = tests.filter_suite_by_condition(
            self.suite, lambda x: x.id() == test_name)
        self.assertEqual([test_name], _test_ids(filtered_suite))

    def test_filter_suite_by_re(self):
        filtered_suite = tests.filter_suite_by_re(self.suite,
                                                  'test_filter_suite_by_r')
        filtered_names = _test_ids(filtered_suite)
        self.assertEqual(filtered_names,
                         [self._own_id('test_filter_suite_by_re')])

    def test_filter_suite_by_id_list(self):
        test_list = [self._own_id('test_filter_suite_by_id_list')]
        filtered_suite = tests.filter_suite_by_id_list(
            self.suite, tests.TestIdList(test_list))
        filtered_names = _test_ids(filtered_suite)
        self.assertEqual(
            filtered_names,
            [self._own_id('test_filter_suite_by_id_list')])

    def test_filter_suite_by_id_startswith(self):
        # By design this test may fail if another test is added whose name
        # also begins with one of the start values used.
        start1 = self._own_id('test_filter_suite_by_id_starts')
        start2 = self._own_id('test_filter_suite_by_id_li')
        test_list = [
            self._own_id('test_filter_suite_by_id_list'),
            self._own_id('test_filter_suite_by_id_startswith'),
            ]
        filtered_suite = tests.filter_suite_by_id_startswith(
            self.suite, [start1, start2])
        self.assertEqual(
            test_list,
            _test_ids(filtered_suite),
            )

    def test_preserve_input(self):
        # NB: Surely this is something in the stdlib to do this?
        self.assertIs(self.suite, tests.preserve_input(self.suite))
        # Bind the string once so the identity check compares one object
        # with itself rather than relying on equal literals being shared.
        marker = "@#$"
        self.assertIs(marker, tests.preserve_input(marker))

    def test_randomize_suite(self):
        randomized_suite = tests.randomize_suite(self.suite)
        randomized_names = _test_ids(randomized_suite)
        # randomizing should not add or remove test names.
        self.assertEqual(set(_test_ids(self.suite)), set(randomized_names))
        # Technically, this *can* fail, because random.shuffle(list) can be
        # equal to list. Trying multiple times just pushes the frequency back.
        # As its len(self.all_names)!:1, the failure frequency should be low
        # enough to ignore. RBC 20071021.
        # It should change the order.
        self.assertNotEqual(self.all_names, randomized_names)
        # But not the length. (Possibly redundant with the set test, but not
        # necessarily.)
        self.assertEqual(len(self.all_names), len(randomized_names))

    def test_split_suit_by_condition(self):
        condition = tests.condition_id_re('test_filter_suite_by_r')
        split_suite = tests.split_suite_by_condition(self.suite, condition)
        filtered_name = self._own_id('test_filter_suite_by_re')
        # First part: the matching tests; second part: all the others.
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
        self._check_all_but(filtered_name, split_suite[1])

    def test_split_suit_by_re(self):
        split_suite = tests.split_suite_by_re(self.suite,
                                              'test_filter_suite_by_r')
        filtered_name = self._own_id('test_filter_suite_by_re')
        # First part: the matching tests; second part: all the others.
        self.assertEqual([filtered_name], _test_ids(split_suite[0]))
        self._check_all_but(filtered_name, split_suite[1])
 
2837
 
 
2838
 
 
2839
class TestCheckTreeShape(tests.TestCaseWithTransport):
    """Tests for the check_tree_shape() helper."""

    def test_check_tree_shape(self):
        """A tree built and added from a shape passes check_tree_shape."""
        shape = ['a', 'b/', 'b/c']
        wt = self.make_branch_and_tree('.')
        self.build_tree(shape)
        wt.add(shape)
        # The check runs with the tree read-locked; always release the lock.
        wt.lock_read()
        try:
            self.check_tree_shape(wt, shape)
        finally:
            wt.unlock()
 
2851
 
 
2852
 
 
2853
class TestBlackboxSupport(tests.TestCase):
    """Tests for testsuite blackbox features."""

    def test_run_bzr_failure_not_caught(self):
        # When we run bzr in blackbox mode, we want any unexpected errors to
        # propagate up to the test suite so that it can show the error in the
        # usual way, and we won't get a double traceback.
        e = self.assertRaises(
            AssertionError,
            self.run_bzr, ['assert-fail'])
        # make sure we got the real thing, not an error from somewhere else in
        # the test framework
        self.assertEqual('always fails', str(e))
        # check that there's no traceback in the test log
        self.assertNotContainsRe(self.get_log(), r'Traceback')

    def test_run_bzr_user_error_caught(self):
        # Running bzr in blackbox mode, normal/expected/user errors should be
        # caught in the regular way and turned into an error message plus exit
        # code.
        transport_server = memory.MemoryServer()
        transport_server.start_server()
        self.addCleanup(transport_server.stop_server)
        url = transport_server.get_url()
        # The test is explicitly allowed to access the memory server url.
        self.permit_url(url)
        out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3)
        self.assertEqual(out, '')
        self.assertContainsRe(err,
            'bzr: ERROR: Not a branch: ".*nonexistantpath/".\n')
 
2882
 
 
2883
 
 
2884
class TestTestLoader(tests.TestCase):
    """Tests for the test loader."""

    def _get_loader_and_module(self):
        """Gets a TestLoader and a module with one test in it.

        :return: A (loader, module) tuple where module is a plain object
            faking a module that holds a single test class.
        """
        loader = TestUtil.TestLoader()

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        class MyModule(object):
            pass

        MyModule.a_class = Stub
        return loader, MyModule()

    def test_module_no_load_tests_attribute_loads_classes(self):
        loader, module = self._get_loader_and_module()
        self.assertEqual(1, loader.loadTestsFromModule(module).countTestCases())

    def test_module_load_tests_attribute_gets_called(self):
        loader, module = self._get_loader_and_module()
        # 'self' is here because we're faking the module with a class. Regular
        # load_tests do not need that :)
        def load_tests(self, standard_tests, module, loader):
            result = loader.suiteClass()
            for test in tests.iter_suite_tests(standard_tests):
                result.addTests([test, test])
            return result
        # add a load_tests() method which multiplies the tests from the module.
        module.__class__.load_tests = load_tests
        self.assertEqual(2, loader.loadTestsFromModule(module).countTestCases())

    def test_load_tests_from_module_name_smoke_test(self):
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        self.assertEqual(['bzrlib.tests.test_sampler.DemoTest.test_nothing'],
                         _test_ids(suite))

    def test_load_tests_from_module_name_with_bogus_module_name(self):
        loader = TestUtil.TestLoader()
        self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus')
 
2926
 
 
2927
 
 
2928
class TestTestIdList(tests.TestCase):
    """Tests for TestIdList and for suite_matches_id_list()."""

    def _create_id_list(self, test_list):
        """Return a TestIdList built from test_list."""
        return tests.TestIdList(test_list)

    def _create_suite(self, test_id_list):
        """Return a suite with one stub test per id in test_id_list.

        Every stub gets its id() replaced so that it reports the wanted id.
        """

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        def _create_test_id(test_id):
            # A factory, so that each callable is bound to its own id.
            return lambda: test_id

        suite = TestUtil.TestSuite()
        for test_id in test_id_list:
            stub = Stub('test_foo')
            stub.id = _create_test_id(test_id)
            suite.addTest(stub)
        return suite

    def _test_ids(self, test_suite):
        """Get the ids for the tests in a test suite."""
        return [t.id() for t in tests.iter_suite_tests(test_suite)]

    def test_empty_list(self):
        id_list = self._create_id_list([])
        self.assertEqual({}, id_list.tests)
        self.assertEqual({}, id_list.modules)

    def test_valid_list(self):
        id_list = self._create_id_list(
            ['mod1.cl1.meth1', 'mod1.cl1.meth2',
             'mod1.func1', 'mod1.cl2.meth2',
             'mod1.submod1',
             'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
             ])
        self.assertTrue(id_list.refers_to('mod1'))
        self.assertTrue(id_list.refers_to('mod1.submod1'))
        self.assertTrue(id_list.refers_to('mod1.submod2'))
        self.assertTrue(id_list.includes('mod1.cl1.meth1'))
        self.assertTrue(id_list.includes('mod1.submod1'))
        self.assertTrue(id_list.includes('mod1.func1'))

    def test_bad_chars_in_params(self):
        id_list = self._create_id_list(['mod1.cl1.meth1(xx.yy)'])
        self.assertTrue(id_list.refers_to('mod1'))
        self.assertTrue(id_list.includes('mod1.cl1.meth1(xx.yy)'))

    def test_module_used(self):
        id_list = self._create_id_list(['mod.class.meth'])
        self.assertTrue(id_list.refers_to('mod'))
        self.assertTrue(id_list.refers_to('mod.class'))
        self.assertTrue(id_list.refers_to('mod.class.meth'))

    def test_test_suite_matches_id_list_with_unknown(self):
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing',
                     'bogus']
        not_found, duplicates = tests.suite_matches_id_list(suite, test_list)
        self.assertEqual(['bogus'], not_found)
        self.assertEqual([], duplicates)

    def test_suite_matches_id_list_with_duplicates(self):
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        dupes = loader.suiteClass()
        for test in tests.iter_suite_tests(suite):
            dupes.addTest(test)
            dupes.addTest(test) # Add it again

        test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing',]
        not_found, duplicates = tests.suite_matches_id_list(
            dupes, test_list)
        self.assertEqual([], not_found)
        self.assertEqual(['bzrlib.tests.test_sampler.DemoTest.test_nothing'],
                         duplicates)
 
3006
 
 
3007
 
 
3008
class TestTestSuite(tests.TestCase):
    """Tests for test_suite() and the functions listing what it loads."""

    def test__test_suite_testmod_names(self):
        # Test that a plausible list of test module names are returned
        # by _test_suite_testmod_names.
        test_list = tests._test_suite_testmod_names()
        self.assertSubset([
            'bzrlib.tests.blackbox',
            'bzrlib.tests.per_transport',
            'bzrlib.tests.test_selftest',
            ],
            test_list)

    def test__test_suite_modules_to_doctest(self):
        # Test that a plausible list of modules to doctest is returned
        # by _test_suite_modules_to_doctest.
        test_list = tests._test_suite_modules_to_doctest()
        if __doc__ is None:
            # When docstrings are stripped, there are no modules to doctest
            self.assertEqual([], test_list)
            return
        self.assertSubset([
            'bzrlib.timestamp',
            ],
            test_list)

    def test_test_suite(self):
        # test_suite() loads the entire test suite to operate. To avoid this
        # overhead, and yet still be confident that things are happening,
        # we temporarily replace two functions used by test_suite with
        # test doubles that supply a few sample tests to load, and check they
        # are loaded.
        calls = []

        def testmod_names():
            calls.append("testmod_names")
            return [
                'bzrlib.tests.blackbox.test_branch',
                'bzrlib.tests.per_transport',
                'bzrlib.tests.test_selftest',
                ]
        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)

        def doctests():
            calls.append("modules_to_doctest")
            if __doc__ is None:
                return []
            return ['bzrlib.timestamp']
        self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)

        expected_test_list = [
            # testmod_names
            'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
            ('bzrlib.tests.per_transport.TransportTests'
             '.test_abspath(LocalTransport,LocalURLServer)'),
            'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
            # plugins can't be tested that way since selftest may be run with
            # --no-plugins
            ]
        if __doc__ is not None:
            expected_test_list.extend([
                # modules_to_doctest
                'bzrlib.timestamp.format_highres_date',
                ])
        suite = tests.test_suite()
        # Both doubles must have been consulted by test_suite().
        self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
            set(calls))
        self.assertSubset(expected_test_list, _test_ids(suite))

    def test_test_suite_list_and_start(self):
        # We cannot test this at the same time as the main load, because we want
        # to know that starting_with == None works. So a second load is
        # incurred - note that the starting_with parameter causes a partial load
        # rather than a full load so this test should be pretty quick.
        test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
        suite = tests.test_suite(test_list,
                                 ['bzrlib.tests.test_selftest.TestTestSuite'])
        # test_test_suite_list_and_start is not included
        self.assertEqual(test_list, _test_ids(suite))
 
3084
 
 
3085
 
 
3086
class TestLoadTestIdList(tests.TestCaseInTempDir):
    """Tests for load_test_id_list()."""

    def _create_test_list_file(self, file_name, content):
        """Write content to a new text file named file_name.

        The file is closed even if writing fails.
        """
        fl = open(file_name, 'wt')
        try:
            fl.write(content)
        finally:
            fl.close()

    def test_load_unknown(self):
        self.assertRaises(errors.NoSuchFile,
                          tests.load_test_id_list, 'i_do_not_exist')

    def test_load_test_list(self):
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    'mod1.cl1.meth1\nmod2.cl2.meth2\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEqual(2, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('mod2.cl2.meth2', tlist[1])

    def test_load_dirty_file(self):
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    '  mod1.cl1.meth1\n\nmod2.cl2.meth2  \n'
                                    'bar baz\n')
        tlist = tests.load_test_id_list(test_list_fname)
        # Surrounding whitespace is stripped; empty lines and lines with
        # embedded spaces are kept.
        self.assertEqual(4, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('', tlist[1])
        self.assertEqual('mod2.cl2.meth2', tlist[2])
        self.assertEqual('bar baz', tlist[3])
 
3117
 
 
3118
 
 
3119
class TestFilteredByModuleTestLoader(tests.TestCase):
    """Tests for FilteredByModuleTestLoader driven by a TestIdList."""

    def _create_loader(self, test_list):
        """Return a loader keeping only what test_list refers to."""
        id_filter = tests.TestIdList(test_list)
        return TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)

    def test_load_tests(self):
        test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader(test_list)
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        self.assertEqual(test_list, _test_ids(suite))

    def test_exclude_tests(self):
        # No id in the list refers to the sampler module: nothing is loaded.
        loader = self._create_loader(['bogus'])
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        self.assertEqual([], _test_ids(suite))
 
3137
 
 
3138
 
 
3139
class TestFilteredByNameStartTestLoader(tests.TestCase):
    """Tests for FilteredByModuleTestLoader driven by a name prefix."""

    def _create_loader(self, name_start):
        """Return a loader for modules related to name_start.

        A module is wanted when its name starts with name_start or when
        name_start itself starts with the module name.
        """
        def needs_module(name):
            return name.startswith(name_start) or name_start.startswith(name)
        return TestUtil.FilteredByModuleTestLoader(needs_module)

    def test_load_tests(self):
        test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader('bzrlib.tests.test_samp')

        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        self.assertEqual(test_list, _test_ids(suite))

    def test_load_tests_inside_module(self):
        test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
        loader = self._create_loader('bzrlib.tests.test_sampler.Demo')

        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        self.assertEqual(test_list, _test_ids(suite))

    def test_exclude_tests(self):
        loader = self._create_loader('bogus')

        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
        self.assertEqual([], _test_ids(suite))
 
3167
 
 
3168
 
 
3169
class TestTestPrefixRegistry(tests.TestCase):
    """Tests for TestPrefixAliasRegistry and the predefined aliases."""

    def _get_registry(self):
        """Return a fresh, empty TestPrefixAliasRegistry."""
        return tests.TestPrefixAliasRegistry()

    def test_register_new_prefix(self):
        tpr = self._get_registry()
        tpr.register('foo', 'fff.ooo.ooo')
        self.assertEqual('fff.ooo.ooo', tpr.get('foo'))

    def test_register_existing_prefix(self):
        tpr = self._get_registry()
        tpr.register('bar', 'bbb.aaa.rrr')
        tpr.register('bar', 'bBB.aAA.rRR')
        # The first registration wins and the second one is logged.
        self.assertEqual('bbb.aaa.rrr', tpr.get('bar'))
        self.assertThat(self.get_log(),
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
                           doctest.ELLIPSIS))

    def test_get_unknown_prefix(self):
        tpr = self._get_registry()
        self.assertRaises(KeyError, tpr.get, 'I am not a prefix')

    def test_resolve_prefix(self):
        tpr = self._get_registry()
        tpr.register('bar', 'bb.aa.rr')
        self.assertEqual('bb.aa.rr', tpr.resolve_alias('bar'))

    def test_resolve_unknown_alias(self):
        tpr = self._get_registry()
        self.assertRaises(errors.BzrCommandError,
                          tpr.resolve_alias, 'I am not a prefix')

    def test_predefined_prefixes(self):
        tpr = tests.test_prefix_alias_registry
        self.assertEqual('bzrlib', tpr.resolve_alias('bzrlib'))
        self.assertEqual('bzrlib.doc', tpr.resolve_alias('bd'))
        self.assertEqual('bzrlib.utils', tpr.resolve_alias('bu'))
        self.assertEqual('bzrlib.tests', tpr.resolve_alias('bt'))
        self.assertEqual('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
        self.assertEqual('bzrlib.plugins', tpr.resolve_alias('bp'))
 
3211
 
 
3212
 
 
3213
class TestThreadLeakDetection(tests.TestCase):
    """Ensure when tests leak threads we detect and report it"""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """Result that collects every reported thread leak in self.leaks."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            # Only the test and the threads it leaked are remembered.
            self.leaks.append((test, leaks))

    def test_testcase_without_addCleanups(self):
        """Check old TestCase instances don't break with leak detection"""
        class Test(unittest.TestCase):
            def runTest(self):
                pass

        recorder = self.LeakRecordingResult()
        plain_case = Test()
        recorder.startTestRun()
        plain_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 0)
        self.assertEqual(recorder.leaks, [])

    def test_thread_leak(self):
        """Ensure a thread that outlives the running of a test is reported

        The inner test case starts a thread blocked on an event. The thread
        is still alive once the inner case has run, so it should be detected
        as a leak; the event is then set so that the thread can be safely
        joined in cleanup and is not leaked for real.
        """
        release = threading.Event()
        leaker = threading.Thread(name="Leaker", target=release.wait)

        class Test(tests.TestCase):
            def test_leak(self):
                leaker.start()

        recorder = self.LeakRecordingResult()
        leaky_case = Test("test_leak")
        # Registered in this order so that, with cleanups running last-in
        # first-out, the event is set before the thread is joined.
        self.addCleanup(leaker.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        leaky_case.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 1)
        self.assertEqual(recorder._first_thread_leaker_id, leaky_case.id())
        self.assertEqual(recorder.leaks, [(leaky_case, set([leaker]))])
        self.assertContainsString(recorder.stream.getvalue(), "leaking threads")

    def test_multiple_leaks(self):
        """Check multiple leaks are blamed on the test cases at fault

        Same idea as test_thread_leak, except that one inner test method
        leaks a single thread, one leaks two threads and one does not leak
        at all.
        """
        release = threading.Event()
        leaker_a = threading.Thread(name="LeakerA", target=release.wait)
        leaker_b = threading.Thread(name="LeakerB", target=release.wait)
        leaker_c = threading.Thread(name="LeakerC", target=release.wait)

        class Test(tests.TestCase):
            def test_first_leak(self):
                leaker_b.start()

            def test_second_no_leak(self):
                pass

            def test_third_leak(self):
                leaker_c.start()
                leaker_a.start()

        recorder = self.LeakRecordingResult()
        first_case = Test("test_first_leak")
        third_case = Test("test_third_leak")
        # As in test_thread_leak: the event has to be set before any join.
        self.addCleanup(leaker_a.join)
        self.addCleanup(leaker_b.join)
        self.addCleanup(leaker_c.join)
        self.addCleanup(release.set)
        recorder.startTestRun()
        inner_suite = unittest.TestSuite(
            [first_case, Test("test_second_no_leak"), third_case])
        inner_suite.run(recorder)
        recorder.stopTestRun()
        self.assertEqual(recorder._tests_leaking_threads_count, 2)
        self.assertEqual(recorder._first_thread_leaker_id, first_case.id())
        self.assertEqual(recorder.leaks, [
            (first_case, set([leaker_b])),
            (third_case, set([leaker_a, leaker_c]))])
        self.assertContainsString(recorder.stream.getvalue(), "leaking threads")
 
3297
 
 
3298
 
 
3299
class TestPostMortemDebugging(tests.TestCase):
    """Check post mortem debugging works when tests fail or error"""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result that records where a post mortem would have started.

        Instead of doing anything interactive, _post_mortem stores the code
        object of the innermost traceback frame in self.postcode.
        """

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            # Stays None until _post_mortem is given, or finds, a traceback.
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Record the code object at the end of the current traceback"""
            if not tb:
                tb = sys.exc_info()[2]
            if tb is None:
                return
            # Follow the chain down to the frame that actually raised.
            while tb.tb_next is not None:
                tb = tb.tb_next
            self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            """Overridden as a no-op."""

        def report_failure(self, test, err):
            """Overridden as a no-op."""

    def _postcode_after_running(self, case):
        """Run case against a recording result and return its postcode."""
        recorder = self.TracebackRecordingResult()
        case.run(recorder)
        return recorder.postcode

    def test_location_unittest_error(self):
        """Needs right post mortem traceback with erroring unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError
        postcode = self._postcode_after_running(Test())
        self.assertEqual(postcode, Test.runTest.func_code)

    def test_location_unittest_failure(self):
        """Needs right post mortem traceback with failing unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException
        postcode = self._postcode_after_running(Test())
        self.assertEqual(postcode, Test.runTest.func_code)

    def test_location_bt_error(self):
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError
        postcode = self._postcode_after_running(Test("test_error"))
        self.assertEqual(postcode, Test.test_error.func_code)

    def test_location_bt_failure(self):
        """Needs right post mortem traceback with failing bzrlib.tests case"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException
        postcode = self._postcode_after_running(Test("test_failure"))
        self.assertEqual(postcode, Test.test_failure.func_code)

    def test_env_var_triggers_post_mortem(self):
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
        import pdb
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
        # Stand-in for pdb.post_mortem that just remembers its argument.
        recorded_args = []
        self.overrideAttr(pdb, "post_mortem", recorded_args.append)
        # With the variable unset the call must not reach pdb...
        self.overrideEnv('BZR_TEST_PDB', None)
        result._post_mortem(1)
        # ...and with it set the argument must be passed through.
        self.overrideEnv('BZR_TEST_PDB', 'on')
        result._post_mortem(2)
        self.assertEqual([2], recorded_args)
 
3367
 
 
3368
 
 
3369
class TestRunSuite(tests.TestCase):
    """Tests for tests.run_suite."""

    def test_runner_class(self):
        """run_suite accepts and uses a runner_class keyword argument."""
        # Every test object handed to MyRunner.run ends up in here.
        seen = []

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        class MyRunner(tests.TextTestRunner):
            def run(self, test):
                seen.append(test)
                return tests.ExtendedTestResult(
                    self.stream, self.descriptions, self.verbosity)

        tests.run_suite(Stub("test_foo"), runner_class=MyRunner,
                        stream=StringIO())
        self.assertLength(1, seen)
 
3385
 
 
3386
 
 
3387
class TestUncollectedWarnings(tests.TestCase):
    """Check a test case still alive after being run emits a warning"""

    class Test(tests.TestCase):
        """Sample cases fed to selftest by the tests of the outer class."""

        def test_pass(self):
            pass

        def test_self_ref(self):
            # Storing a bound method on the instance makes the test case
            # refer to itself.
            self.also_self = self.test_self_ref

        def test_skip(self):
            self.skip("Don't need")

    def _get_suite(self):
        """Return a suite with one instance of each sample case."""
        return TestUtil.TestSuite([
            self.Test("test_pass"),
            self.Test("test_self_ref"),
            self.Test("test_skip"),
            ])

    def _inject_stream_into_subunit(self, stream):
        """To be overridden by subclasses that run tests out of process"""

    def _run_selftest_with_suite(self, **kwargs):
        """Run selftest over the sample suite and return what it wrote.

        The "uncollected_cases" debug flag is added and the garbage collector
        is disabled while selftest runs; both are put back afterwards.  The
        two checks shared by every caller are made here: test_pass must not be
        reported as uncollected and test_self_ref must be.

        :param kwargs: Extra keyword arguments passed on to tests.selftest.
        :return: The text selftest wrote to its stream.
        """
        sio = StringIO()
        self._inject_stream_into_subunit(sio)
        old_flags = tests.selftest_debug_flags
        tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
        # Only re-enable the collector afterwards if it was on to begin with.
        gc_on = gc.isenabled()
        if gc_on:
            gc.disable()
        try:
            tests.selftest(test_suite_factory=self._get_suite, stream=sio,
                **kwargs)
        finally:
            if gc_on:
                gc.enable()
            tests.selftest_debug_flags = old_flags
        output = sio.getvalue()
        self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
        self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
        return output

    def test_testsuite(self):
        self._run_selftest_with_suite()

    def test_pattern(self):
        out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
        self.assertNotContainsRe(out, "test_skip")

    def test_exclude_pattern(self):
        out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_random_seed(self):
        self._run_selftest_with_suite(random_seed="now")

    def test_matching_tests_first(self):
        self._run_selftest_with_suite(matching_tests_first=True,
            pattern="test_self_ref$")

    def test_starting_with_and_exclude(self):
        out = self._run_selftest_with_suite(starting_with=["bt."],
            exclude_pattern="test_skip$")
        self.assertNotContainsRe(out, "test_skip")

    def test_additonal_decorator(self):
        # The checks made inside _run_selftest_with_suite are all that is
        # needed here, so the returned output is not kept.
        self._run_selftest_with_suite(
            suite_decorators=[tests.TestDecorator])
 
3454
 
 
3455
 
 
3456
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Run the parent's checks with tests.SubUnitBzrRunner as runner."""
        run_in_parent = TestUncollectedWarnings._run_selftest_with_suite
        return run_in_parent(
            self, runner_class=tests.SubUnitBzrRunner, **kwargs)
 
3464
 
 
3465
 
 
3466
class TestUncollectedWarningsForking(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted when forking"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        Some APIs need rewriting so this kind of bogus hackery can be replaced
        by passing the stream param from run_tests down into ProtocolTestCase.

        :param stream: Object assigned to the _passthrough attribute of every
            ProtocolTestCase created while the patch is in place.
        """
        from subunit import ProtocolTestCase
        _original_init = ProtocolTestCase.__init__
        def _init_with_passthrough(self, *args, **kwargs):
            _original_init(self, *args, **kwargs)
            self._passthrough = stream
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)

    def _run_selftest_with_suite(self, **kwargs):
        """Run the parent's checks with tests.fork_decorator applied.

        :param kwargs: Extra keyword arguments for tests.selftest.  Any
            "suite_decorators" given are kept, with tests.fork_decorator
            added after them.
        :return: The text selftest wrote to its stream.
        :raises tests.TestNotApplicable: If os has no fork attribute.
        """
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        if getattr(os, "fork", None) is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")
        # Make sure the fork code is actually invoked by claiming two cores
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
        # Build a new list rather than appending in place, so a list object
        # supplied by the caller is left exactly as it was passed in.
        decorators = list(kwargs.get("suite_decorators") or [])
        decorators.append(tests.fork_decorator)
        kwargs["suite_decorators"] = decorators
        return TestUncollectedWarnings._run_selftest_with_suite(self, **kwargs)
 
3492
 
 
3493
 
 
3494
class TestEnvironHandling(tests.TestCase):
    """Check overrideEnv restores the environment however often it is used."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')
        # We use an embedded test to make sure we fix the _captureVar bug
        class Test(tests.TestCase):
            def test_me(self):
                # The first call saves the 42 value
                self.overrideEnv('MYVAR', None)
                self.assertEqual(None, os.environ.get('MYVAR'))
                # Make sure we can call it twice
                self.overrideEnv('MYVAR', None)
                self.assertEqual(None, os.environ.get('MYVAR'))
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        Test('test_me').run(result)
        if not result.wasStrictlySuccessful():
            # Surface whatever the embedded test reported.
            self.fail(output.getvalue())
        # We get our value back
        self.assertEqual('42', os.environ.get('MYVAR'))
 
3515
 
 
3516
 
 
3517
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    Since we use tests that are already isolated from os.environ, a bit of
    care should be taken when designing the tests to avoid bootstrap
    side-effects.  The tests start with an already clean os.environ, which
    allows making valid assertions about which variables are present or not
    and designing tests around these assertions.
    """

    class ScratchMonkey(tests.TestCase):
        """Throwaway case handed to override_os_environ/restore_os_environ."""

        def test_me(self):
            pass

    def test_basics(self):
        # Make sure we know the definition of BZR_HOME: not part of os.environ
        # for tests.TestCase.
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
        self.assertEqual(None, tests.isolated_environ['BZR_HOME'])
        # Being part of isolated_environ, BZR_HOME should not appear here
        self.assertFalse('BZR_HOME' in os.environ)
        # Make sure we know the definition of LINES: part of os.environ for
        # tests.TestCase
        self.assertTrue('LINES' in tests.isolated_environ)
        self.assertEqual('25', tests.isolated_environ['LINES'])
        self.assertEqual('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # BZR_HOME is known to be absent from os.environ
        test = self.ScratchMonkey('test_me')
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
        self.assertEqual('foo', os.environ['BZR_HOME'])
        tests.restore_os_environ(test)
        self.assertFalse('BZR_HOME' in os.environ)

    def test_injecting_known_variable(self):
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': '42'})
        self.assertEqual('42', os.environ['LINES'])
        tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])

    def test_deleting_variable(self):
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': None})
        self.assertTrue('LINES' not in os.environ)
        tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])
 
3568
 
 
3569
 
 
3570
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.

    Since tests.TestCase already provides an isolation from os.environ, we use
    the clean environment as a base for testing. To precisely capture the
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
    compare against.

    We want to make sure `tests.DocTestSuite` respects
    `tests.isolated_environ`, not `os.environ`, so each test overrides it to
    suit its needs.
    """

    def get_doctest_suite_for_string(self, klass, string):
        """Build a doctest suite whose only test is parsed from string.

        :param klass: Suite factory called as klass(test_finder=...), such as
            doctest.DocTestSuite or tests.IsolatedDocTestSuite.
        :param string: Doctest source text.
        :return: Whatever klass returns.
        """
        class Finder(doctest.DocTestFinder):

            def find(self, *args, **kwargs):
                # Ignore whatever we are asked to search and always hand back
                # the single doctest parsed from the given string.
                test = doctest.DocTestParser().get_doctest(
                    string, {}, 'foo', 'foo.py', 0)
                return [test]

        suite = klass(test_finder=Finder())
        return suite

    def run_doctest_suite_for_string(self, klass, string):
        """Run the doctest in string and return (result, output stream)."""
        suite = self.get_doctest_suite_for_string(klass, string)
        output = StringIO()
        result = tests.TextTestResult(output, 0, 1)
        suite.run(result)
        return result, output

    def assertDocTestStringSucceds(self, klass, string):
        """Fail, showing the captured output, unless the doctest passes."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if not result.wasStrictlySuccessful():
            self.fail(output.getvalue())

    def assertDocTestStringFails(self, klass, string):
        """Fail if the doctest in string passes when run with klass."""
        result, output = self.run_doctest_suite_for_string(klass, string)
        if result.wasStrictlySuccessful():
            # Say what went wrong explicitly so the failure is still readable
            # if nothing was written to the stream.
            self.fail("doctest unexpectedly succeeded: %s"
                      % (output.getvalue(),))

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        test = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
        # tests.DocTestSuite sees '42'
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        test = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
        # tests.DocTestSuite sees None
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)