~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/blackbox/test_selftest.py

Smart server mediums now detect which protocol version a request is and dispatch accordingly.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2005 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""UI tests for the test framework."""
18
18
 
19
19
import os
 
20
import signal
 
21
import sys
20
22
 
 
23
import bzrlib
21
24
from bzrlib import (
22
 
    tests,
 
25
    osutils,
23
26
    )
 
27
from bzrlib.errors import ParamikoNotPresent
24
28
from bzrlib.tests import (
25
 
    features,
26
 
    )
27
 
from bzrlib.transport import memory
28
 
 
29
 
class SelfTestPatch:
30
 
 
31
 
    def get_params_passed_to_core(self, cmdline):
32
 
        params = []
33
 
        def selftest(*args, **kwargs):
34
 
            """Capture the arguments selftest was run with."""
35
 
            params.append((args, kwargs))
36
 
            return True
37
 
        # Yes this prevents using threads to run the test suite in parallel,
38
 
        # however we don't have a clean dependency injector for commands, 
39
 
        # and even if we did - we'd still be testing that the glue is wired
40
 
        # up correctly. XXX: TODO: Solve this testing problem.
41
 
        original_selftest = tests.selftest
42
 
        tests.selftest = selftest
43
 
        try:
44
 
            self.run_bzr(cmdline)
45
 
            return params[0]
46
 
        finally:
47
 
            tests.selftest = original_selftest
48
 
 
49
 
 
50
 
class TestOptions(tests.TestCase, SelfTestPatch):
51
 
 
52
 
    def test_load_list(self):
53
 
        params = self.get_params_passed_to_core('selftest --load-list foo')
54
 
        self.assertEqual('foo', params[1]['load_list'])
 
29
                          TestCase,
 
30
                          TestCaseInTempDir,
 
31
                          TestCaseWithMemoryTransport,
 
32
                          TestCaseWithTransport,
 
33
                          TestUIFactory,
 
34
                          TestSkipped,
 
35
                          )
 
36
from bzrlib.tests.blackbox import ExternalBase
 
37
 
 
38
 
 
39
class TestOptions(TestCase):
 
40
 
 
41
    current_test = None
55
42
 
56
43
    def test_transport_set_to_sftp(self):
57
 
        # Test that we can pass a transport to the selftest core - sftp
58
 
        # version.
59
 
        self.requireFeature(features.paramiko)
60
 
        from bzrlib.tests import stub_sftp
61
 
        params = self.get_params_passed_to_core('selftest --transport=sftp')
62
 
        self.assertEqual(stub_sftp.SFTPAbsoluteServer,
63
 
            params[1]["transport"])
 
44
        # test the --transport option has taken effect from within the
 
45
        # test_transport test
 
46
        try:
 
47
            import bzrlib.transport.sftp
 
48
        except ParamikoNotPresent:
 
49
            raise TestSkipped("Paramiko not present")
 
50
        if TestOptions.current_test != "test_transport_set_to_sftp":
 
51
            return
 
52
        self.assertEqual(bzrlib.transport.sftp.SFTPAbsoluteServer,
 
53
                         bzrlib.tests.default_transport)
64
54
 
65
55
    def test_transport_set_to_memory(self):
66
 
        # Test that we can pass a transport to the selftest core - memory
67
 
        # version.
68
 
        params = self.get_params_passed_to_core('selftest --transport=memory')
69
 
        self.assertEqual(memory.MemoryServer, params[1]["transport"])
70
 
 
71
 
    def test_parameters_passed_to_core(self):
72
 
        params = self.get_params_passed_to_core('selftest --list-only')
73
 
        self.assertTrue("list_only" in params[1])
74
 
        params = self.get_params_passed_to_core('selftest --list-only selftest')
75
 
        self.assertTrue("list_only" in params[1])
76
 
        params = self.get_params_passed_to_core(['selftest', '--list-only',
77
 
            '--exclude', 'selftest'])
78
 
        self.assertTrue("list_only" in params[1])
79
 
        params = self.get_params_passed_to_core(['selftest', '--list-only',
80
 
            'selftest', '--randomize', 'now'])
81
 
        self.assertSubset(["list_only", "random_seed"], params[1])
82
 
 
83
 
    def test_starting_with(self):
84
 
        params = self.get_params_passed_to_core('selftest --starting-with foo')
85
 
        self.assertEqual(['foo'], params[1]['starting_with'])
86
 
 
87
 
    def test_starting_with_multiple_argument(self):
88
 
        params = self.get_params_passed_to_core(
89
 
            'selftest --starting-with foo --starting-with bar')
90
 
        self.assertEqual(['foo', 'bar'], params[1]['starting_with'])
91
 
 
92
 
    def test_subunit(self):
93
 
        self.requireFeature(features.subunit)
94
 
        params = self.get_params_passed_to_core('selftest --subunit')
95
 
        self.assertEqual(tests.SubUnitBzrRunner, params[1]['runner_class'])
96
 
 
97
 
    def _parse_test_list(self, lines, newlines_in_header=0):
98
 
        "Parse a list of lines into a tuple of 3 lists (header,body,footer)."
99
 
        in_header = newlines_in_header != 0
100
 
        in_footer = False
101
 
        header = []
102
 
        body = []
103
 
        footer = []
104
 
        header_newlines_found = 0
105
 
        for line in lines:
106
 
            if in_header:
107
 
                if line == '':
108
 
                    header_newlines_found += 1
109
 
                    if header_newlines_found >= newlines_in_header:
110
 
                        in_header = False
111
 
                        continue
112
 
                header.append(line)
113
 
            elif not in_footer:
114
 
                if line.startswith('-------'):
115
 
                    in_footer = True
116
 
                else:
117
 
                    body.append(line)
118
 
            else:
119
 
                footer.append(line)
120
 
        # If the last body line is blank, drop it off the list
121
 
        if len(body) > 0 and body[-1] == '':
122
 
            body.pop()
123
 
        return (header,body,footer)
124
 
 
125
 
    def test_list_only(self):
126
 
        # check that bzr selftest --list-only outputs no ui noise
127
 
        def selftest(*args, **kwargs):
128
 
            """Capture the arguments selftest was run with."""
129
 
            return True
130
 
        def outputs_nothing(cmdline):
131
 
            out,err = self.run_bzr(cmdline)
132
 
            (header,body,footer) = self._parse_test_list(out.splitlines())
133
 
            num_tests = len(body)
134
 
            self.assertLength(0, header)
135
 
            self.assertLength(0, footer)
136
 
            self.assertEqual('', err)
137
 
        # Yes this prevents using threads to run the test suite in parallel,
138
 
        # however we don't have a clean dependency injector for commands, 
139
 
        # and even if we did - we'd still be testing that the glue is wired
140
 
        # up correctly. XXX: TODO: Solve this testing problem.
141
 
        original_selftest = tests.selftest
142
 
        tests.selftest = selftest
143
 
        try:
144
 
            outputs_nothing('selftest --list-only')
145
 
            outputs_nothing('selftest --list-only selftest')
146
 
            outputs_nothing(['selftest', '--list-only', '--exclude', 'selftest'])
147
 
        finally:
148
 
            tests.selftest = original_selftest
149
 
 
150
 
    def test_lsprof_tests(self):
151
 
        params = self.get_params_passed_to_core('selftest --lsprof-tests')
152
 
        self.assertEqual(True, params[1]["lsprof_tests"])
153
 
 
154
 
    def test_parallel_fork_unsupported(self):
155
 
        if getattr(os, "fork", None) is not None:
156
 
            self.addCleanup(setattr, os, "fork", os.fork)
157
 
            del os.fork
158
 
        out, err = self.run_bzr(["selftest", "--parallel=fork", "-s", "bt.x"],
159
 
            retcode=3)
160
 
        self.assertIn("platform does not support fork", err)
161
 
        self.assertFalse(out)
 
56
        # test the --transport option has taken effect from within the
 
57
        # test_transport test
 
58
        import bzrlib.transport.memory
 
59
        if TestOptions.current_test != "test_transport_set_to_memory":
 
60
            return
 
61
        self.assertEqual(bzrlib.transport.memory.MemoryServer,
 
62
                         bzrlib.tests.default_transport)
 
63
 
 
64
    def test_transport(self):
 
65
        # test that --transport=sftp works
 
66
        try:
 
67
            import bzrlib.transport.sftp
 
68
        except ParamikoNotPresent:
 
69
            raise TestSkipped("Paramiko not present")
 
70
        old_transport = bzrlib.tests.default_transport
 
71
        old_root = TestCaseWithMemoryTransport.TEST_ROOT
 
72
        TestCaseWithMemoryTransport.TEST_ROOT = None
 
73
        try:
 
74
            TestOptions.current_test = "test_transport_set_to_sftp"
 
75
            stdout = self.capture('selftest --transport=sftp test_transport_set_to_sftp')
 
76
            
 
77
            self.assertContainsRe(stdout, 'Ran 1 test')
 
78
            self.assertEqual(old_transport, bzrlib.tests.default_transport)
 
79
 
 
80
            TestOptions.current_test = "test_transport_set_to_memory"
 
81
            stdout = self.capture('selftest --transport=memory test_transport_set_to_memory')
 
82
            self.assertContainsRe(stdout, 'Ran 1 test')
 
83
            self.assertEqual(old_transport, bzrlib.tests.default_transport)
 
84
        finally:
 
85
            bzrlib.tests.default_transport = old_transport
 
86
            TestOptions.current_test = None
 
87
            TestCaseWithMemoryTransport.TEST_ROOT = old_root
 
88
 
 
89
 
 
90
class TestRunBzr(ExternalBase):
 
91
 
 
92
    def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
 
93
                         working_dir=None):
 
94
        """Override run_bzr_captured to test how it is invoked by run_bzr.
 
95
 
 
96
        We test how run_bzr_captured actually invokes bzr in another location.
 
97
        Here we only need to test that it is run_bzr passes the right
 
98
        parameters to run_bzr_captured.
 
99
        """
 
100
        self.argv = argv
 
101
        self.retcode = retcode
 
102
        self.encoding = encoding
 
103
        self.stdin = stdin
 
104
        self.working_dir = working_dir
 
105
 
 
106
    def test_args(self):
 
107
        """Test that run_bzr passes args correctly to run_bzr_captured"""
 
108
        self.run_bzr('arg1', 'arg2', 'arg3', retcode=1)
 
109
        self.assertEqual(('arg1', 'arg2', 'arg3'), self.argv)
 
110
 
 
111
    def test_encoding(self):
 
112
        """Test that run_bzr passes encoding to run_bzr_captured"""
 
113
        self.run_bzr('foo', 'bar')
 
114
        self.assertEqual(None, self.encoding)
 
115
        self.assertEqual(('foo', 'bar'), self.argv)
 
116
 
 
117
        self.run_bzr('foo', 'bar', encoding='baz')
 
118
        self.assertEqual('baz', self.encoding)
 
119
        self.assertEqual(('foo', 'bar'), self.argv)
 
120
 
 
121
    def test_retcode(self):
 
122
        """Test that run_bzr passes retcode to run_bzr_captured"""
 
123
        # Default is retcode == 0
 
124
        self.run_bzr('foo', 'bar')
 
125
        self.assertEqual(0, self.retcode)
 
126
        self.assertEqual(('foo', 'bar'), self.argv)
 
127
 
 
128
        self.run_bzr('foo', 'bar', retcode=1)
 
129
        self.assertEqual(1, self.retcode)
 
130
        self.assertEqual(('foo', 'bar'), self.argv)
 
131
 
 
132
        self.run_bzr('foo', 'bar', retcode=None)
 
133
        self.assertEqual(None, self.retcode)
 
134
        self.assertEqual(('foo', 'bar'), self.argv)
 
135
 
 
136
        self.run_bzr('foo', 'bar', retcode=3)
 
137
        self.assertEqual(3, self.retcode)
 
138
        self.assertEqual(('foo', 'bar'), self.argv)
 
139
 
 
140
    def test_stdin(self):
 
141
        # test that the stdin keyword to run_bzr is passed through to
 
142
        # run_bzr_captured as-is. We do this by overriding
 
143
        # run_bzr_captured in this class, and then calling run_bzr,
 
144
        # which is a convenience function for run_bzr_captured, so 
 
145
        # should invoke it.
 
146
        self.run_bzr('foo', 'bar', stdin='gam')
 
147
        self.assertEqual('gam', self.stdin)
 
148
        self.assertEqual(('foo', 'bar'), self.argv)
 
149
 
 
150
        self.run_bzr('foo', 'bar', stdin='zippy')
 
151
        self.assertEqual('zippy', self.stdin)
 
152
        self.assertEqual(('foo', 'bar'), self.argv)
 
153
 
 
154
    def test_working_dir(self):
 
155
        """Test that run_bzr passes working_dir to run_bzr_captured"""
 
156
        self.run_bzr('foo', 'bar')
 
157
        self.assertEqual(None, self.working_dir)
 
158
        self.assertEqual(('foo', 'bar'), self.argv)
 
159
 
 
160
        self.run_bzr('foo', 'bar', working_dir='baz')
 
161
        self.assertEqual('baz', self.working_dir)
 
162
        self.assertEqual(('foo', 'bar'), self.argv)
 
163
 
 
164
 
 
165
class TestBenchmarkTests(TestCaseWithTransport):
 
166
 
 
167
    def test_benchmark_runs_benchmark_tests(self):
 
168
        """bzr selftest --benchmark should not run the default test suite."""
 
169
        # We test this by passing a regression test name to --benchmark, which
 
170
        # should result in 0 rests run.
 
171
        old_root = TestCaseWithMemoryTransport.TEST_ROOT
 
172
        try:
 
173
            TestCaseWithMemoryTransport.TEST_ROOT = None
 
174
            out, err = self.run_bzr('selftest', '--benchmark', 'workingtree_implementations')
 
175
        finally:
 
176
            TestCaseWithMemoryTransport.TEST_ROOT = old_root
 
177
        self.assertContainsRe(out, 'Ran 0 tests.*\n\nOK')
 
178
        self.assertEqual(
 
179
            'tests passed\n',
 
180
            err)
 
181
        benchfile = open(".perf_history", "rt")
 
182
        try:
 
183
            lines = benchfile.readlines()
 
184
        finally:
 
185
            benchfile.close()
 
186
        self.assertEqual(1, len(lines))
 
187
        self.assertContainsRe(lines[0], "--date [0-9.]+")
 
188
 
 
189
 
 
190
class TestRunBzrCaptured(ExternalBase):
 
191
 
 
192
    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
 
193
                         a_callable=None, *args, **kwargs):
 
194
        self.stdin = stdin
 
195
        self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
 
196
        self.factory = bzrlib.ui.ui_factory
 
197
        self.working_dir = osutils.getcwd()
 
198
        stdout.write('foo\n')
 
199
        stderr.write('bar\n')
 
200
        return 0
 
201
 
 
202
    def test_stdin(self):
 
203
        # test that the stdin keyword to run_bzr_captured is passed through to
 
204
        # apply_redirected as a StringIO. We do this by overriding
 
205
        # apply_redirected in this class, and then calling run_bzr_captured,
 
206
        # which calls apply_redirected. 
 
207
        self.run_bzr_captured(['foo', 'bar'], stdin='gam')
 
208
        self.assertEqual('gam', self.stdin.read())
 
209
        self.assertTrue(self.stdin is self.factory_stdin)
 
210
        self.run_bzr_captured(['foo', 'bar'], stdin='zippy')
 
211
        self.assertEqual('zippy', self.stdin.read())
 
212
        self.assertTrue(self.stdin is self.factory_stdin)
 
213
 
 
214
    def test_ui_factory(self):
 
215
        # each invocation of self.run_bzr_captured should get its
 
216
        # own UI factory, which is an instance of TestUIFactory,
 
217
        # with stdin, stdout and stderr attached to the stdin,
 
218
        # stdout and stderr of the invoked run_bzr_captured
 
219
        current_factory = bzrlib.ui.ui_factory
 
220
        self.run_bzr_captured(['foo'])
 
221
        self.failIf(current_factory is self.factory)
 
222
        self.assertNotEqual(sys.stdout, self.factory.stdout)
 
223
        self.assertNotEqual(sys.stderr, self.factory.stderr)
 
224
        self.assertEqual('foo\n', self.factory.stdout.getvalue())
 
225
        self.assertEqual('bar\n', self.factory.stderr.getvalue())
 
226
        self.assertIsInstance(self.factory, TestUIFactory)
 
227
 
 
228
    def test_working_dir(self):
 
229
        self.build_tree(['one/', 'two/'])
 
230
        cwd = osutils.getcwd()
 
231
 
 
232
        # Default is to work in the current directory
 
233
        self.run_bzr_captured(['foo', 'bar'])
 
234
        self.assertEqual(cwd, self.working_dir)
 
235
 
 
236
        self.run_bzr_captured(['foo', 'bar'], working_dir=None)
 
237
        self.assertEqual(cwd, self.working_dir)
 
238
 
 
239
        # The function should be run in the alternative directory
 
240
        # but afterwards the current working dir shouldn't be changed
 
241
        self.run_bzr_captured(['foo', 'bar'], working_dir='one')
 
242
        self.assertNotEqual(cwd, self.working_dir)
 
243
        self.assertEndsWith(self.working_dir, 'one')
 
244
        self.assertEqual(cwd, osutils.getcwd())
 
245
 
 
246
        self.run_bzr_captured(['foo', 'bar'], working_dir='two')
 
247
        self.assertNotEqual(cwd, self.working_dir)
 
248
        self.assertEndsWith(self.working_dir, 'two')
 
249
        self.assertEqual(cwd, osutils.getcwd())
 
250
 
 
251
 
 
252
class TestRunBzrSubprocess(TestCaseWithTransport):
 
253
 
 
254
    def test_run_bzr_subprocess(self):
 
255
        """The run_bzr_helper_external comand behaves nicely."""
 
256
        result = self.run_bzr_subprocess('--version')
 
257
        result = self.run_bzr_subprocess('--version', retcode=None)
 
258
        self.assertContainsRe(result[0], 'is free software')
 
259
        self.assertRaises(AssertionError, self.run_bzr_subprocess, 
 
260
                          '--versionn')
 
261
        result = self.run_bzr_subprocess('--versionn', retcode=3)
 
262
        result = self.run_bzr_subprocess('--versionn', retcode=None)
 
263
        self.assertContainsRe(result[1], 'unknown command')
 
264
        err = self.run_bzr_subprocess('merge', '--merge-type', 'magic merge', 
 
265
                                      retcode=3)[1]
 
266
        self.assertContainsRe(err, 'Bad value "magic merge" for option'
 
267
                              ' "merge-type"')
 
268
 
 
269
    def test_run_bzr_subprocess_env(self):
 
270
        """run_bzr_subprocess can set environment variables in the child only.
 
271
 
 
272
        These changes should not change the running process, only the child.
 
273
        """
 
274
        # The test suite should unset this variable
 
275
        self.assertEqual(None, os.environ.get('BZR_EMAIL'))
 
276
        out, err = self.run_bzr_subprocess('whoami', env_changes={
 
277
                                            'BZR_EMAIL':'Joe Foo <joe@foo.com>'
 
278
                                          }, universal_newlines=True)
 
279
        self.assertEqual('', err)
 
280
        self.assertEqual('Joe Foo <joe@foo.com>\n', out)
 
281
        # And it should not be modified
 
282
        self.assertEqual(None, os.environ.get('BZR_EMAIL'))
 
283
 
 
284
        # Do it again with a different address, just to make sure
 
285
        # it is actually changing
 
286
        out, err = self.run_bzr_subprocess('whoami', env_changes={
 
287
                                            'BZR_EMAIL':'Barry <bar@foo.com>'
 
288
                                          }, universal_newlines=True)
 
289
        self.assertEqual('', err)
 
290
        self.assertEqual('Barry <bar@foo.com>\n', out)
 
291
        self.assertEqual(None, os.environ.get('BZR_EMAIL'))
 
292
 
 
293
    def test_run_bzr_subprocess_env_del(self):
 
294
        """run_bzr_subprocess can remove environment variables too."""
 
295
        # Create a random email, so we are sure this won't collide
 
296
        rand_bzr_email = 'John Doe <jdoe@%s.com>' % (osutils.rand_chars(20),)
 
297
        rand_email = 'Jane Doe <jdoe@%s.com>' % (osutils.rand_chars(20),)
 
298
        os.environ['BZR_EMAIL'] = rand_bzr_email
 
299
        os.environ['EMAIL'] = rand_email
 
300
        try:
 
301
            # By default, the child will inherit the current env setting
 
302
            out, err = self.run_bzr_subprocess('whoami', universal_newlines=True)
 
303
            self.assertEqual('', err)
 
304
            self.assertEqual(rand_bzr_email + '\n', out)
 
305
 
 
306
            # Now that BZR_EMAIL is not set, it should fall back to EMAIL
 
307
            out, err = self.run_bzr_subprocess('whoami',
 
308
                                               env_changes={'BZR_EMAIL':None},
 
309
                                               universal_newlines=True)
 
310
            self.assertEqual('', err)
 
311
            self.assertEqual(rand_email + '\n', out)
 
312
 
 
313
            # This switches back to the default email guessing logic
 
314
            # Which shouldn't match either of the above addresses
 
315
            out, err = self.run_bzr_subprocess('whoami',
 
316
                           env_changes={'BZR_EMAIL':None, 'EMAIL':None},
 
317
                           universal_newlines=True)
 
318
 
 
319
            self.assertEqual('', err)
 
320
            self.assertNotEqual(rand_bzr_email + '\n', out)
 
321
            self.assertNotEqual(rand_email + '\n', out)
 
322
        finally:
 
323
            # TestCase cleans up BZR_EMAIL, and EMAIL at startup
 
324
            del os.environ['BZR_EMAIL']
 
325
            del os.environ['EMAIL']
 
326
 
 
327
    def test_run_bzr_subprocess_env_del_missing(self):
 
328
        """run_bzr_subprocess won't fail if deleting a nonexistant env var"""
 
329
        self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
 
330
        out, err = self.run_bzr_subprocess('rocks',
 
331
                        env_changes={'NON_EXISTANT_ENV_VAR':None},
 
332
                        universal_newlines=True)
 
333
        self.assertEqual('It sure does!\n', out)
 
334
        self.assertEqual('', err)
 
335
 
 
336
    def test_run_bzr_subprocess_working_dir(self):
 
337
        """Test that we can specify the working dir for the child"""
 
338
        cwd = osutils.getcwd()
 
339
 
 
340
        self.make_branch_and_tree('.')
 
341
        self.make_branch_and_tree('one')
 
342
        self.make_branch_and_tree('two')
 
343
 
 
344
        def get_root(**kwargs):
 
345
            """Spawn a process to get the 'root' of the tree.
 
346
 
 
347
            You can pass in arbitrary new arguments. This just makes
 
348
            sure that the returned path doesn't have trailing whitespace.
 
349
            """
 
350
            return self.run_bzr_subprocess('root', **kwargs)[0].rstrip()
 
351
 
 
352
        self.assertEqual(cwd, get_root())
 
353
        self.assertEqual(cwd, get_root(working_dir=None))
 
354
        # Has our path changed?
 
355
        self.assertEqual(cwd, osutils.getcwd())
 
356
 
 
357
        dir1 = get_root(working_dir='one')
 
358
        self.assertEndsWith(dir1, 'one')
 
359
        self.assertEqual(cwd, osutils.getcwd())
 
360
 
 
361
        dir2 = get_root(working_dir='two')
 
362
        self.assertEndsWith(dir2, 'two')
 
363
        self.assertEqual(cwd, osutils.getcwd())
 
364
 
 
365
 
 
366
class _DontSpawnProcess(Exception):
 
367
    """A simple exception which just allows us to skip unnecessary steps"""
 
368
 
 
369
 
 
370
class TestRunBzrSubprocessCommands(TestCaseWithTransport):
 
371
 
 
372
    def _popen(self, *args, **kwargs):
 
373
        """Record the command that is run, so that we can ensure it is correct"""
 
374
        self._popen_args = args
 
375
        self._popen_kwargs = kwargs
 
376
        raise _DontSpawnProcess()
 
377
 
 
378
    def test_run_bzr_subprocess_no_plugins(self):
 
379
        self.assertRaises(_DontSpawnProcess, self.run_bzr_subprocess)
 
380
        command = self._popen_args[0]
 
381
        self.assertEqual(sys.executable, command[0])
 
382
        self.assertEqual(self.get_bzr_path(), command[1])
 
383
        self.assertEqual(['--no-plugins'], command[2:])
 
384
 
 
385
    def test_allow_plugins(self):
 
386
        self.assertRaises(_DontSpawnProcess,
 
387
                          self.run_bzr_subprocess, allow_plugins=True)
 
388
        command = self._popen_args[0]
 
389
        self.assertEqual([], command[2:])
 
390
 
 
391
 
 
392
class TestBzrSubprocess(TestCaseWithTransport):
 
393
 
 
394
    def test_start_and_stop_bzr_subprocess(self):
 
395
        """We can start and perform other test actions while that process is
 
396
        still alive.
 
397
        """
 
398
        process = self.start_bzr_subprocess(['--version'])
 
399
        result = self.finish_bzr_subprocess(process)
 
400
        self.assertContainsRe(result[0], 'is free software')
 
401
        self.assertEqual('', result[1])
 
402
 
 
403
    def test_start_and_stop_bzr_subprocess_with_error(self):
 
404
        """finish_bzr_subprocess allows specification of the desired exit code.
 
405
        """
 
406
        process = self.start_bzr_subprocess(['--versionn'])
 
407
        result = self.finish_bzr_subprocess(process, retcode=3)
 
408
        self.assertEqual('', result[0])
 
409
        self.assertContainsRe(result[1], 'unknown command')
 
410
 
 
411
    def test_start_and_stop_bzr_subprocess_ignoring_retcode(self):
 
412
        """finish_bzr_subprocess allows the exit code to be ignored."""
 
413
        process = self.start_bzr_subprocess(['--versionn'])
 
414
        result = self.finish_bzr_subprocess(process, retcode=None)
 
415
        self.assertEqual('', result[0])
 
416
        self.assertContainsRe(result[1], 'unknown command')
 
417
 
 
418
    def test_start_and_stop_bzr_subprocess_with_unexpected_retcode(self):
 
419
        """finish_bzr_subprocess raises self.failureException if the retcode is
 
420
        not the expected one.
 
421
        """
 
422
        process = self.start_bzr_subprocess(['--versionn'])
 
423
        self.assertRaises(self.failureException, self.finish_bzr_subprocess,
 
424
                          process, retcode=0)
 
425
        
 
426
    def test_start_and_stop_bzr_subprocess_send_signal(self):
 
427
        """finish_bzr_subprocess raises self.failureException if the retcode is
 
428
        not the expected one.
 
429
        """
 
430
        process = self.start_bzr_subprocess(['wait-until-signalled'],
 
431
                                            skip_if_plan_to_signal=True)
 
432
        self.assertEqual('running\n', process.stdout.readline())
 
433
        result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
 
434
                                            retcode=3)
 
435
        self.assertEqual('', result[0])
 
436
        self.assertEqual('bzr: interrupted\n', result[1])
 
437
 
 
438
    def test_start_and_stop_working_dir(self):
 
439
        cwd = osutils.getcwd()
 
440
 
 
441
        self.make_branch_and_tree('one')
 
442
 
 
443
        process = self.start_bzr_subprocess(['root'], working_dir='one')
 
444
        result = self.finish_bzr_subprocess(process, universal_newlines=True)
 
445
        self.assertEndsWith(result[0], 'one\n')
 
446
        self.assertEqual('', result[1])
 
447
 
 
448
 
 
449
class TestRunBzrError(ExternalBase):
 
450
 
 
451
    def test_run_bzr_error(self):
 
452
        out, err = self.run_bzr_error(['^$'], 'rocks', retcode=0)
 
453
        self.assertEqual(out, 'It sure does!\n')
 
454
 
 
455
        out, err = self.run_bzr_error(["bzr: ERROR: foobarbaz is not versioned"],
 
456
                                      'file-id', 'foobarbaz')
 
457
 
 
458
 
 
459
class TestSelftestCleanOutput(TestCaseInTempDir):
 
460
 
 
461
    def test_clean_output(self):
 
462
        # check that 'bzr selftest --clean-output' works correct
 
463
        dirs = ('test0000.tmp', 'test0001.tmp', 'bzrlib', 'tests')
 
464
        files = ('bzr', 'setup.py', 'test9999.tmp')
 
465
        for i in dirs:
 
466
            os.mkdir(i)
 
467
        for i in files:
 
468
            f = file(i, 'wb')
 
469
            f.write('content of ')
 
470
            f.write(i)
 
471
            f.close()
 
472
 
 
473
        root = os.getcwdu()
 
474
        before = os.listdir(root)
 
475
        before.sort()
 
476
        self.assertEquals(['bzr','bzrlib','setup.py',
 
477
                           'test0000.tmp','test0001.tmp',
 
478
                           'test9999.tmp','tests'],
 
479
                           before)
 
480
 
 
481
        out,err = self.run_bzr_captured(['selftest','--clean-output'],
 
482
                                        working_dir=root)
 
483
 
 
484
        self.assertEquals(['delete directory: test0000.tmp',
 
485
                          'delete directory: test0001.tmp'],
 
486
                          sorted(out.splitlines()))
 
487
        self.assertEquals('', err)
 
488
 
 
489
        after = os.listdir(root)
 
490
        after.sort()
 
491
        self.assertEquals(['bzr','bzrlib','setup.py',
 
492
                           'test9999.tmp','tests'],
 
493
                           after)