~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/script.py

Rework test_script a little bit.


Don't allow someone to request a stdin request to echo.
Echo never reads from stdin, it just echos its arguments.
You use 'cat' if you want to read from stdin.

A few other fixes because the tests were using filenames
that are actually illegal on Windows, rather than just
nonexistant.


Change the exception handling for commands so that
unknown errors don't get silently squashed and then
turn into hard-to-debug errors later.

test_script now passes on Windows.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Shell-like test scripts.
 
18
 
 
19
See developers/testing.html for more explanations.
 
20
"""
 
21
 
 
22
import doctest
 
23
import errno
 
24
import glob
 
25
import os
 
26
import shlex
 
27
from cStringIO import StringIO
 
28
 
 
29
from bzrlib import (
 
30
    osutils,
 
31
    tests,
 
32
    )
 
33
 
 
34
 
 
35
def split(s):
 
36
    """Split a command line respecting quotes."""
 
37
    scanner = shlex.shlex(s)
 
38
    scanner.quotes = '\'"`'
 
39
    scanner.whitespace_split = True
 
40
    for t in list(scanner):
 
41
        yield t
 
42
 
 
43
 
 
44
def _script_to_commands(text, file_name=None):
 
45
    """Turn a script into a list of commands with their associated IOs.
 
46
 
 
47
    Each command appears on a line by itself starting with '$ '. It can be
 
48
    associated with an input that will feed it and an expected output.
 
49
 
 
50
    Comments starts with '#' until the end of line.
 
51
    Empty lines are ignored.
 
52
 
 
53
    Input and output are full lines terminated by a '\n'.
 
54
 
 
55
    Input lines start with '<'.
 
56
    Output lines start with nothing.
 
57
    Error lines start with '2>'.
 
58
    """
 
59
 
 
60
    commands = []
 
61
 
 
62
    def add_command(cmd, input, output, error):
 
63
        if cmd is not None:
 
64
            if input is not None:
 
65
                input = ''.join(input)
 
66
            if output is not None:
 
67
                output = ''.join(output)
 
68
            if error is not None:
 
69
                error = ''.join(error)
 
70
            commands.append((cmd, input, output, error))
 
71
 
 
72
    cmd_cur = None
 
73
    cmd_line = 1
 
74
    lineno = 0
 
75
    input, output, error = None, None, None
 
76
    for line in text.split('\n'):
 
77
        lineno += 1
 
78
        # Keep a copy for error reporting
 
79
        orig = line
 
80
        comment =  line.find('#')
 
81
        if comment >= 0:
 
82
            # Delete comments
 
83
            line = line[0:comment]
 
84
            line = line.rstrip()
 
85
        if line == '':
 
86
            # Ignore empty lines
 
87
            continue
 
88
        if line.startswith('$'):
 
89
            # Time to output the current command
 
90
            add_command(cmd_cur, input, output, error)
 
91
            # And start a new one
 
92
            cmd_cur = list(split(line[1:]))
 
93
            cmd_line = lineno
 
94
            input, output, error = None, None, None
 
95
        elif line.startswith('<'):
 
96
            if input is None:
 
97
                if cmd_cur is None:
 
98
                    raise SyntaxError('No command for that input',
 
99
                                      (file_name, lineno, 1, orig))
 
100
                input = []
 
101
            input.append(line[1:] + '\n')
 
102
        elif line.startswith('2>'):
 
103
            if error is None:
 
104
                if cmd_cur is None:
 
105
                    raise SyntaxError('No command for that error',
 
106
                                      (file_name, lineno, 1, orig))
 
107
                error = []
 
108
            error.append(line[2:] + '\n')
 
109
        else:
 
110
            if output is None:
 
111
                if cmd_cur is None:
 
112
                    raise SyntaxError('No command for that output',
 
113
                                      (file_name, lineno, 1, orig))
 
114
                output = []
 
115
            output.append(line + '\n')
 
116
    # Add the last seen command
 
117
    add_command(cmd_cur, input, output, error)
 
118
    return commands
 
119
 
 
120
 
 
121
def _scan_redirection_options(args):
 
122
    """Recognize and process input and output redirections.
 
123
 
 
124
    :param args: The command line arguments
 
125
 
 
126
    :return: A tuple containing: 
 
127
        - The file name redirected from or None
 
128
        - The file name redirected to or None
 
129
        - The mode to open the output file or None
 
130
        - The reamining arguments
 
131
    """
 
132
    def redirected_file_name(direction, name, args):
 
133
        if name == '':
 
134
            try:
 
135
                name = args.pop(0)
 
136
            except IndexError:
 
137
                # We leave the error handling to higher levels, an empty name
 
138
                # can't be legal.
 
139
                name = ''
 
140
        return name
 
141
 
 
142
    remaining = []
 
143
    in_name = None
 
144
    out_name, out_mode = None, None
 
145
    while args:
 
146
        arg = args.pop(0)
 
147
        if arg.startswith('<'):
 
148
            in_name = redirected_file_name('<', arg[1:], args)
 
149
        elif arg.startswith('>>'):
 
150
            out_name = redirected_file_name('>>', arg[2:], args)
 
151
            out_mode = 'ab+'
 
152
        elif arg.startswith('>',):
 
153
            out_name = redirected_file_name('>', arg[1:], args)
 
154
            out_mode = 'wb+'
 
155
        else:
 
156
            remaining.append(arg)
 
157
    return in_name, out_name, out_mode, remaining
 
158
 
 
159
 
 
160
class ScriptRunner(object):
 
161
    """Run a shell-like script from a test.
 
162
    
 
163
    Can be used as:
 
164
 
 
165
    from bzrlib.tests import script
 
166
 
 
167
    ...
 
168
 
 
169
        def test_bug_nnnnn(self):
 
170
            sr = script.ScriptRunner()
 
171
            sr.run_script(self, '''
 
172
            $ bzr init
 
173
            $ bzr do-this
 
174
            # Boom, error
 
175
            ''')
 
176
    """
 
177
 
 
178
    def __init__(self):
 
179
        self.output_checker = doctest.OutputChecker()
 
180
        self.check_options = doctest.ELLIPSIS
 
181
 
 
182
    def run_script(self, test_case, text):
 
183
        """Run a shell-like script as a test.
 
184
 
 
185
        :param test_case: A TestCase instance that should provide the fail(),
 
186
            assertEqualDiff and _run_bzr_core() methods as well as a 'test_dir'
 
187
            attribute used as a jail root.
 
188
 
 
189
        :param text: A shell-like script (see _script_to_commands for syntax).
 
190
        """
 
191
        for cmd, input, output, error in _script_to_commands(text):
 
192
            self.run_command(test_case, cmd, input, output, error)
 
193
 
 
194
    def run_command(self, test_case, cmd, input, output, error):
 
195
        mname = 'do_' + cmd[0]
 
196
        method = getattr(self, mname, None)
 
197
        if method is None:
 
198
            raise SyntaxError('Command not found "%s"' % (cmd[0],),
 
199
                              None, 1, ' '.join(cmd))
 
200
        if input is None:
 
201
            str_input = ''
 
202
        else:
 
203
            str_input = ''.join(input)
 
204
        args = list(self._pre_process_args(cmd[1:]))
 
205
        retcode, actual_output, actual_error = method(test_case,
 
206
                                                      str_input, args)
 
207
 
 
208
        self._check_output(output, actual_output, test_case)
 
209
        self._check_output(error, actual_error, test_case)
 
210
        if retcode and not error and actual_error:
 
211
            test_case.fail('In \n\t%s\nUnexpected error: %s'
 
212
                           % (' '.join(cmd), actual_error))
 
213
        return retcode, actual_output, actual_error
 
214
 
 
215
    def _check_output(self, expected, actual, test_case):
 
216
        if expected is None:
 
217
            # Specifying None means: any output is accepted
 
218
            return
 
219
        if actual is None:
 
220
            test_case.fail('We expected output: %r, but found None'
 
221
                           % (expected,))
 
222
        matching = self.output_checker.check_output(
 
223
            expected, actual, self.check_options)
 
224
        if not matching:
 
225
            # Note that we can't use output_checker.output_difference() here
 
226
            # because... the API is broken ('expected' must be a doctest
 
227
            # specific object of which a 'want' attribute will be our
 
228
            # 'expected' parameter. So we just fallback to our good old
 
229
            # assertEqualDiff since we know there *are* differences and the
 
230
            # output should be decently readable.
 
231
            test_case.assertEqualDiff(expected, actual)
 
232
 
 
233
    def _pre_process_args(self, args):
 
234
        new_args = []
 
235
        for arg in args:
 
236
            # Strip the simple and double quotes since we don't care about
 
237
            # them.  We leave the backquotes in place though since they have a
 
238
            # different semantic.
 
239
            if arg[0] in  ('"', "'") and arg[0] == arg[-1]:
 
240
                yield arg[1:-1]
 
241
            else:
 
242
                if glob.has_magic(arg):
 
243
                    matches = glob.glob(arg)
 
244
                    if matches:
 
245
                        # We care more about order stability than performance
 
246
                        # here
 
247
                        matches.sort()
 
248
                        for m in matches:
 
249
                            yield m
 
250
                else:
 
251
                    yield arg
 
252
 
 
253
    def _read_input(self, input, in_name):
 
254
        if in_name is not None:
 
255
            infile = open(in_name, 'rb')
 
256
            try:
 
257
                # Command redirection takes precedence over provided input
 
258
                input = infile.read()
 
259
            finally:
 
260
                infile.close()
 
261
        return input
 
262
 
 
263
    def _write_output(self, output, out_name, out_mode):
 
264
        if out_name is not None:
 
265
            outfile = open(out_name, out_mode)
 
266
            try:
 
267
                outfile.write(output)
 
268
            finally:
 
269
                outfile.close()
 
270
            output = None
 
271
        return output
 
272
 
 
273
    def do_bzr(self, test_case, input, args):
 
274
        retcode, out, err = test_case._run_bzr_core(
 
275
            args, retcode=None, encoding=None, stdin=input, working_dir=None)
 
276
        return retcode, out, err
 
277
 
 
278
    def do_cat(self, test_case, input, args):
 
279
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
280
        if args and in_name is not None:
 
281
            raise SyntaxError('Specify a file OR use redirection')
 
282
 
 
283
        inputs = []
 
284
        if input:
 
285
            inputs.append(input)
 
286
        input_names = args
 
287
        if in_name:
 
288
            args.append(in_name)
 
289
        for in_name in input_names:
 
290
            try:
 
291
                inputs.append(self._read_input(None, in_name))
 
292
            except IOError, e:
 
293
                # Some filenames are illegal on Windows and generate EINVAL
 
294
                # rather than just saying the filename doesn't exist
 
295
                if e.errno in (errno.ENOENT, errno.EINVAL):
 
296
                    return (1, None,
 
297
                            '%s: No such file or directory\n' % (in_name,))
 
298
                raise
 
299
        # Basically cat copy input to output
 
300
        output = ''.join(inputs)
 
301
        # Handle output redirections
 
302
        try:
 
303
            output = self._write_output(output, out_name, out_mode)
 
304
        except IOError, e:
 
305
            # If out_name cannot be created, we may get 'ENOENT', however if
 
306
            # out_name is something like '', we can get EINVAL
 
307
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
308
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
309
            raise
 
310
        return 0, output, None
 
311
 
 
312
    def do_echo(self, test_case, input, args):
 
313
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
314
        if input or in_name:
 
315
            raise SyntaxError('echo doesn\'t read from stdin')
 
316
        if args:
 
317
            input = ' '.join(args)
 
318
        # Always append a \n'
 
319
        input += '\n'
 
320
        # Process output
 
321
        output = input
 
322
        # Handle output redirections
 
323
        try:
 
324
            output = self._write_output(output, out_name, out_mode)
 
325
        except IOError, e:
 
326
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
327
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
328
            raise
 
329
        return 0, output, None
 
330
 
 
331
    def _get_jail_root(self, test_case):
 
332
        return test_case.test_dir
 
333
 
 
334
    def _ensure_in_jail(self, test_case, path):
 
335
        jail_root = self._get_jail_root(test_case)
 
336
        if not osutils.is_inside(jail_root, osutils.normalizepath(path)):
 
337
            raise ValueError('%s is not inside %s' % (path, jail_root))
 
338
 
 
339
    def do_cd(self, test_case, input, args):
 
340
        if len(args) > 1:
 
341
            raise SyntaxError('Usage: cd [dir]')
 
342
        if len(args) == 1:
 
343
            d = args[0]
 
344
            self._ensure_in_jail(test_case, d)
 
345
        else:
 
346
            # The test "home" directory is the root of its jail
 
347
            d = self._get_jail_root(test_case)
 
348
        os.chdir(d)
 
349
        return 0, None, None
 
350
 
 
351
    def do_mkdir(self, test_case, input, args):
 
352
        if not args or len(args) != 1:
 
353
            raise SyntaxError('Usage: mkdir dir')
 
354
        d = args[0]
 
355
        self._ensure_in_jail(test_case, d)
 
356
        os.mkdir(d)
 
357
        return 0, None, None
 
358
 
 
359
    def do_rm(self, test_case, input, args):
 
360
        err = None
 
361
 
 
362
        def error(msg, path):
 
363
            return  "rm: cannot remove '%s': %s\n" % (path, msg)
 
364
 
 
365
        force, recursive = False, False
 
366
        opts = None
 
367
        if args and args[0][0] == '-':
 
368
            opts = args.pop(0)[1:]
 
369
            if 'f' in opts:
 
370
                force = True
 
371
                opts = opts.replace('f', '', 1)
 
372
            if 'r' in opts:
 
373
                recursive = True
 
374
                opts = opts.replace('r', '', 1)
 
375
        if not args or opts:
 
376
            raise SyntaxError('Usage: rm [-fr] path+')
 
377
        for p in args:
 
378
            self._ensure_in_jail(test_case, p)
 
379
            # FIXME: Should we put that in osutils ?
 
380
            try:
 
381
                os.remove(p)
 
382
            except OSError, e:
 
383
                # Various OSes raises different exceptions (linux: EISDIR,
 
384
                #   win32: EACCES, OSX: EPERM) when invoked on a directory
 
385
                if e.errno in (errno.EISDIR, errno.EPERM, errno.EACCES):
 
386
                    if recursive:
 
387
                        osutils.rmtree(p)
 
388
                    else:
 
389
                        err = error('Is a directory', p)
 
390
                        break
 
391
                elif e.errno == errno.ENOENT:
 
392
                    if not force:
 
393
                        err =  error('No such file or directory', p)
 
394
                        break
 
395
                else:
 
396
                    raise
 
397
        if err:
 
398
            retcode = 1
 
399
        else:
 
400
            retcode = 0
 
401
        return retcode, None, err
 
402
 
 
403
 
 
404
class TestCaseWithMemoryTransportAndScript(tests.TestCaseWithMemoryTransport):
 
405
    """Helper class to experiment shell-like test and memory fs.
 
406
 
 
407
    This not intended to be used outside of experiments in implementing memoy
 
408
    based file systems and evolving bzr so that test can use only memory based
 
409
    resources.
 
410
    """
 
411
 
 
412
    def setUp(self):
 
413
        super(TestCaseWithMemoryTransportAndScript, self).setUp()
 
414
        self.script_runner = ScriptRunner()
 
415
 
 
416
    def run_script(self, script):
 
417
        return self.script_runner.run_script(self, script)
 
418
 
 
419
    def run_command(self, cmd, input, output, error):
 
420
        return self.script_runner.run_command(self, cmd, input, output, error)
 
421
 
 
422
 
 
423
class TestCaseWithTransportAndScript(tests.TestCaseWithTransport):
 
424
    """Helper class to quickly define shell-like tests.
 
425
 
 
426
    Can be used as:
 
427
 
 
428
    from bzrlib.tests import script
 
429
 
 
430
 
 
431
    class TestBug(script.TestCaseWithTransportAndScript):
 
432
 
 
433
        def test_bug_nnnnn(self):
 
434
            self.run_script('''
 
435
            $ bzr init
 
436
            $ bzr do-this
 
437
            # Boom, error
 
438
            ''')
 
439
    """
 
440
 
 
441
    def setUp(self):
 
442
        super(TestCaseWithTransportAndScript, self).setUp()
 
443
        self.script_runner = ScriptRunner()
 
444
 
 
445
    def run_script(self, script):
 
446
        return self.script_runner.run_script(self, script)
 
447
 
 
448
    def run_command(self, cmd, input, output, error):
 
449
        return self.script_runner.run_command(self, cmd, input, output, error)
 
450