~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/script.py

  • Committer: Andrew Bennetts
  • Date: 2010-10-13 00:26:41 UTC
  • mto: This revision was merged to the branch mainline in revision 5498.
  • Revision ID: andrew.bennetts@canonical.com-20101013002641-9tlh9k89mlj1666m
Keep docs-plain working.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009, 2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Shell-like test scripts.
 
18
 
 
19
See developers/testing.html for more explanations.
 
20
"""
 
21
 
 
22
import doctest
 
23
import errno
 
24
import glob
 
25
import os
 
26
import shlex
 
27
import textwrap
 
28
from cStringIO import StringIO
 
29
 
 
30
from bzrlib import (
 
31
    osutils,
 
32
    tests,
 
33
    )
 
34
 
 
35
 
 
36
def split(s):
 
37
    """Split a command line respecting quotes."""
 
38
    scanner = shlex.shlex(s)
 
39
    scanner.quotes = '\'"`'
 
40
    scanner.whitespace_split = True
 
41
    for t in list(scanner):
 
42
        yield t
 
43
 
 
44
 
 
45
def _script_to_commands(text, file_name=None):
 
46
    """Turn a script into a list of commands with their associated IOs.
 
47
 
 
48
    Each command appears on a line by itself starting with '$ '. It can be
 
49
    associated with an input that will feed it and an expected output.
 
50
 
 
51
    Comments starts with '#' until the end of line.
 
52
    Empty lines are ignored.
 
53
 
 
54
    Input and output are full lines terminated by a '\n'.
 
55
 
 
56
    Input lines start with '<'.
 
57
    Output lines start with nothing.
 
58
    Error lines start with '2>'.
 
59
 
 
60
    :return: A sequence of ([args], input, output, errors), where the args are
 
61
        split in to words, and the input, output, and errors are just strings,
 
62
        typically containing newlines.
 
63
    """
 
64
 
 
65
    commands = []
 
66
 
 
67
    def add_command(cmd, input, output, error):
 
68
        if cmd is not None:
 
69
            if input is not None:
 
70
                input = ''.join(input)
 
71
            if output is not None:
 
72
                output = ''.join(output)
 
73
            if error is not None:
 
74
                error = ''.join(error)
 
75
            commands.append((cmd, input, output, error))
 
76
 
 
77
    cmd_cur = None
 
78
    cmd_line = 1
 
79
    lineno = 0
 
80
    input, output, error = None, None, None
 
81
    text = textwrap.dedent(text)
 
82
    lines = text.split('\n')
 
83
    # to make use of triple-quoted strings easier, we ignore a blank line
 
84
    # right at the start and right at the end; the rest are meaningful
 
85
    if lines and lines[0] == '':
 
86
        del lines[0]
 
87
    if lines and lines[-1] == '':
 
88
        del lines[-1]
 
89
    for line in lines:
 
90
        lineno += 1
 
91
        # Keep a copy for error reporting
 
92
        orig = line
 
93
        comment =  line.find('#')
 
94
        if comment >= 0:
 
95
            # Delete comments
 
96
            # NB: this syntax means comments are allowed inside output, which
 
97
            # may be confusing...
 
98
            line = line[0:comment]
 
99
            line = line.rstrip()
 
100
            if line == '':
 
101
                continue
 
102
        if line.startswith('$'):
 
103
            # Time to output the current command
 
104
            add_command(cmd_cur, input, output, error)
 
105
            # And start a new one
 
106
            cmd_cur = list(split(line[1:]))
 
107
            cmd_line = lineno
 
108
            input, output, error = None, None, None
 
109
        elif line.startswith('<'):
 
110
            if input is None:
 
111
                if cmd_cur is None:
 
112
                    raise SyntaxError('No command for that input',
 
113
                                      (file_name, lineno, 1, orig))
 
114
                input = []
 
115
            input.append(line[1:] + '\n')
 
116
        elif line.startswith('2>'):
 
117
            if error is None:
 
118
                if cmd_cur is None:
 
119
                    raise SyntaxError('No command for that error',
 
120
                                      (file_name, lineno, 1, orig))
 
121
                error = []
 
122
            error.append(line[2:] + '\n')
 
123
        else:
 
124
            # can happen if the first line is not recognized as a command, eg
 
125
            # if the prompt has leading whitespace
 
126
            if output is None:
 
127
                if cmd_cur is None:
 
128
                    raise SyntaxError('No command for line %r' % (line,),
 
129
                                      (file_name, lineno, 1, orig))
 
130
                output = []
 
131
            output.append(line + '\n')
 
132
    # Add the last seen command
 
133
    add_command(cmd_cur, input, output, error)
 
134
    return commands
 
135
 
 
136
 
 
137
def _scan_redirection_options(args):
 
138
    """Recognize and process input and output redirections.
 
139
 
 
140
    :param args: The command line arguments
 
141
 
 
142
    :return: A tuple containing: 
 
143
        - The file name redirected from or None
 
144
        - The file name redirected to or None
 
145
        - The mode to open the output file or None
 
146
        - The reamining arguments
 
147
    """
 
148
    def redirected_file_name(direction, name, args):
 
149
        if name == '':
 
150
            try:
 
151
                name = args.pop(0)
 
152
            except IndexError:
 
153
                # We leave the error handling to higher levels, an empty name
 
154
                # can't be legal.
 
155
                name = ''
 
156
        return name
 
157
 
 
158
    remaining = []
 
159
    in_name = None
 
160
    out_name, out_mode = None, None
 
161
    while args:
 
162
        arg = args.pop(0)
 
163
        if arg.startswith('<'):
 
164
            in_name = redirected_file_name('<', arg[1:], args)
 
165
        elif arg.startswith('>>'):
 
166
            out_name = redirected_file_name('>>', arg[2:], args)
 
167
            out_mode = 'ab+'
 
168
        elif arg.startswith('>',):
 
169
            out_name = redirected_file_name('>', arg[1:], args)
 
170
            out_mode = 'wb+'
 
171
        else:
 
172
            remaining.append(arg)
 
173
    return in_name, out_name, out_mode, remaining
 
174
 
 
175
 
 
176
class ScriptRunner(object):
 
177
    """Run a shell-like script from a test.
 
178
    
 
179
    Can be used as:
 
180
 
 
181
    from bzrlib.tests import script
 
182
 
 
183
    ...
 
184
 
 
185
        def test_bug_nnnnn(self):
 
186
            sr = script.ScriptRunner()
 
187
            sr.run_script(self, '''
 
188
            $ bzr init
 
189
            $ bzr do-this
 
190
            # Boom, error
 
191
            ''')
 
192
    """
 
193
 
 
194
    def __init__(self):
 
195
        self.output_checker = doctest.OutputChecker()
 
196
        self.check_options = doctest.ELLIPSIS
 
197
 
 
198
    def run_script(self, test_case, text):
 
199
        """Run a shell-like script as a test.
 
200
 
 
201
        :param test_case: A TestCase instance that should provide the fail(),
 
202
            assertEqualDiff and _run_bzr_core() methods as well as a 'test_dir'
 
203
            attribute used as a jail root.
 
204
 
 
205
        :param text: A shell-like script (see _script_to_commands for syntax).
 
206
        """
 
207
        for cmd, input, output, error in _script_to_commands(text):
 
208
            self.run_command(test_case, cmd, input, output, error)
 
209
 
 
210
    def run_command(self, test_case, cmd, input, output, error):
 
211
        mname = 'do_' + cmd[0]
 
212
        method = getattr(self, mname, None)
 
213
        if method is None:
 
214
            raise SyntaxError('Command not found "%s"' % (cmd[0],),
 
215
                              None, 1, ' '.join(cmd))
 
216
        if input is None:
 
217
            str_input = ''
 
218
        else:
 
219
            str_input = ''.join(input)
 
220
        args = list(self._pre_process_args(cmd[1:]))
 
221
        retcode, actual_output, actual_error = method(test_case,
 
222
                                                      str_input, args)
 
223
 
 
224
        self._check_output(output, actual_output, test_case)
 
225
        self._check_output(error, actual_error, test_case)
 
226
        if retcode and not error and actual_error:
 
227
            test_case.fail('In \n\t%s\nUnexpected error: %s'
 
228
                           % (' '.join(cmd), actual_error))
 
229
        return retcode, actual_output, actual_error
 
230
 
 
231
    def _check_output(self, expected, actual, test_case):
 
232
        if expected is None:
 
233
            # Specifying None means: any output is accepted
 
234
            return
 
235
        if actual is None:
 
236
            test_case.fail('We expected output: %r, but found None'
 
237
                           % (expected,))
 
238
        matching = self.output_checker.check_output(
 
239
            expected, actual, self.check_options)
 
240
        if not matching:
 
241
            # Note that we can't use output_checker.output_difference() here
 
242
            # because... the API is broken ('expected' must be a doctest
 
243
            # specific object of which a 'want' attribute will be our
 
244
            # 'expected' parameter. So we just fallback to our good old
 
245
            # assertEqualDiff since we know there *are* differences and the
 
246
            # output should be decently readable.
 
247
            #
 
248
            # As a special case, we allow output that's missing a final
 
249
            # newline to match an expected string that does have one, so that
 
250
            # we can match a prompt printed on one line, then input given on
 
251
            # the next line.
 
252
            if expected == actual + '\n':
 
253
                pass
 
254
            else:
 
255
                test_case.assertEqualDiff(expected, actual)
 
256
 
 
257
    def _pre_process_args(self, args):
 
258
        new_args = []
 
259
        for arg in args:
 
260
            # Strip the simple and double quotes since we don't care about
 
261
            # them.  We leave the backquotes in place though since they have a
 
262
            # different semantic.
 
263
            if arg[0] in  ('"', "'") and arg[0] == arg[-1]:
 
264
                yield arg[1:-1]
 
265
            else:
 
266
                if glob.has_magic(arg):
 
267
                    matches = glob.glob(arg)
 
268
                    if matches:
 
269
                        # We care more about order stability than performance
 
270
                        # here
 
271
                        matches.sort()
 
272
                        for m in matches:
 
273
                            yield m
 
274
                else:
 
275
                    yield arg
 
276
 
 
277
    def _read_input(self, input, in_name):
 
278
        if in_name is not None:
 
279
            infile = open(in_name, 'rb')
 
280
            try:
 
281
                # Command redirection takes precedence over provided input
 
282
                input = infile.read()
 
283
            finally:
 
284
                infile.close()
 
285
        return input
 
286
 
 
287
    def _write_output(self, output, out_name, out_mode):
 
288
        if out_name is not None:
 
289
            outfile = open(out_name, out_mode)
 
290
            try:
 
291
                outfile.write(output)
 
292
            finally:
 
293
                outfile.close()
 
294
            output = None
 
295
        return output
 
296
 
 
297
    def do_bzr(self, test_case, input, args):
 
298
        retcode, out, err = test_case._run_bzr_core(
 
299
            args, retcode=None, encoding=None, stdin=input, working_dir=None)
 
300
        return retcode, out, err
 
301
 
 
302
    def do_cat(self, test_case, input, args):
 
303
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
304
        if args and in_name is not None:
 
305
            raise SyntaxError('Specify a file OR use redirection')
 
306
 
 
307
        inputs = []
 
308
        if input:
 
309
            inputs.append(input)
 
310
        input_names = args
 
311
        if in_name:
 
312
            args.append(in_name)
 
313
        for in_name in input_names:
 
314
            try:
 
315
                inputs.append(self._read_input(None, in_name))
 
316
            except IOError, e:
 
317
                # Some filenames are illegal on Windows and generate EINVAL
 
318
                # rather than just saying the filename doesn't exist
 
319
                if e.errno in (errno.ENOENT, errno.EINVAL):
 
320
                    return (1, None,
 
321
                            '%s: No such file or directory\n' % (in_name,))
 
322
                raise
 
323
        # Basically cat copy input to output
 
324
        output = ''.join(inputs)
 
325
        # Handle output redirections
 
326
        try:
 
327
            output = self._write_output(output, out_name, out_mode)
 
328
        except IOError, e:
 
329
            # If out_name cannot be created, we may get 'ENOENT', however if
 
330
            # out_name is something like '', we can get EINVAL
 
331
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
332
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
333
            raise
 
334
        return 0, output, None
 
335
 
 
336
    def do_echo(self, test_case, input, args):
 
337
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
338
        if input or in_name:
 
339
            raise SyntaxError('echo doesn\'t read from stdin')
 
340
        if args:
 
341
            input = ' '.join(args)
 
342
        # Always append a \n'
 
343
        input += '\n'
 
344
        # Process output
 
345
        output = input
 
346
        # Handle output redirections
 
347
        try:
 
348
            output = self._write_output(output, out_name, out_mode)
 
349
        except IOError, e:
 
350
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
351
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
352
            raise
 
353
        return 0, output, None
 
354
 
 
355
    def _get_jail_root(self, test_case):
 
356
        return test_case.test_dir
 
357
 
 
358
    def _ensure_in_jail(self, test_case, path):
 
359
        jail_root = self._get_jail_root(test_case)
 
360
        if not osutils.is_inside(jail_root, osutils.normalizepath(path)):
 
361
            raise ValueError('%s is not inside %s' % (path, jail_root))
 
362
 
 
363
    def do_cd(self, test_case, input, args):
 
364
        if len(args) > 1:
 
365
            raise SyntaxError('Usage: cd [dir]')
 
366
        if len(args) == 1:
 
367
            d = args[0]
 
368
            self._ensure_in_jail(test_case, d)
 
369
        else:
 
370
            # The test "home" directory is the root of its jail
 
371
            d = self._get_jail_root(test_case)
 
372
        os.chdir(d)
 
373
        return 0, None, None
 
374
 
 
375
    def do_mkdir(self, test_case, input, args):
 
376
        if not args or len(args) != 1:
 
377
            raise SyntaxError('Usage: mkdir dir')
 
378
        d = args[0]
 
379
        self._ensure_in_jail(test_case, d)
 
380
        os.mkdir(d)
 
381
        return 0, None, None
 
382
 
 
383
    def do_rm(self, test_case, input, args):
 
384
        err = None
 
385
 
 
386
        def error(msg, path):
 
387
            return  "rm: cannot remove '%s': %s\n" % (path, msg)
 
388
 
 
389
        force, recursive = False, False
 
390
        opts = None
 
391
        if args and args[0][0] == '-':
 
392
            opts = args.pop(0)[1:]
 
393
            if 'f' in opts:
 
394
                force = True
 
395
                opts = opts.replace('f', '', 1)
 
396
            if 'r' in opts:
 
397
                recursive = True
 
398
                opts = opts.replace('r', '', 1)
 
399
        if not args or opts:
 
400
            raise SyntaxError('Usage: rm [-fr] path+')
 
401
        for p in args:
 
402
            self._ensure_in_jail(test_case, p)
 
403
            # FIXME: Should we put that in osutils ?
 
404
            try:
 
405
                os.remove(p)
 
406
            except OSError, e:
 
407
                # Various OSes raises different exceptions (linux: EISDIR,
 
408
                #   win32: EACCES, OSX: EPERM) when invoked on a directory
 
409
                if e.errno in (errno.EISDIR, errno.EPERM, errno.EACCES):
 
410
                    if recursive:
 
411
                        osutils.rmtree(p)
 
412
                    else:
 
413
                        err = error('Is a directory', p)
 
414
                        break
 
415
                elif e.errno == errno.ENOENT:
 
416
                    if not force:
 
417
                        err =  error('No such file or directory', p)
 
418
                        break
 
419
                else:
 
420
                    raise
 
421
        if err:
 
422
            retcode = 1
 
423
        else:
 
424
            retcode = 0
 
425
        return retcode, None, err
 
426
 
 
427
    def do_mv(self, test_case, input, args):
 
428
        err = None
 
429
        def error(msg, src, dst):
 
430
            return "mv: cannot move %s to %s: %s\n" % (src, dst, msg)
 
431
 
 
432
        if not args or len(args) != 2:
 
433
            raise SyntaxError("Usage: mv path1 path2")
 
434
        src, dst = args
 
435
        try:
 
436
            real_dst = dst
 
437
            if os.path.isdir(dst):
 
438
                real_dst = os.path.join(dst, os.path.basename(src))
 
439
            os.rename(src, real_dst)
 
440
        except OSError, e:
 
441
            if e.errno == errno.ENOENT:
 
442
                err = error('No such file or directory', src, dst)
 
443
            else:
 
444
                raise
 
445
        if err:
 
446
            retcode = 1
 
447
        else:
 
448
            retcode = 0
 
449
        return retcode, None, err
 
450
 
 
451
 
 
452
 
 
453
class TestCaseWithMemoryTransportAndScript(tests.TestCaseWithMemoryTransport):
 
454
    """Helper class to experiment shell-like test and memory fs.
 
455
 
 
456
    This not intended to be used outside of experiments in implementing memoy
 
457
    based file systems and evolving bzr so that test can use only memory based
 
458
    resources.
 
459
    """
 
460
 
 
461
    def setUp(self):
 
462
        super(TestCaseWithMemoryTransportAndScript, self).setUp()
 
463
        self.script_runner = ScriptRunner()
 
464
 
 
465
    def run_script(self, script):
 
466
        return self.script_runner.run_script(self, script)
 
467
 
 
468
    def run_command(self, cmd, input, output, error):
 
469
        return self.script_runner.run_command(self, cmd, input, output, error)
 
470
 
 
471
 
 
472
class TestCaseWithTransportAndScript(tests.TestCaseWithTransport):
 
473
    """Helper class to quickly define shell-like tests.
 
474
 
 
475
    Can be used as:
 
476
 
 
477
    from bzrlib.tests import script
 
478
 
 
479
 
 
480
    class TestBug(script.TestCaseWithTransportAndScript):
 
481
 
 
482
        def test_bug_nnnnn(self):
 
483
            self.run_script('''
 
484
            $ bzr init
 
485
            $ bzr do-this
 
486
            # Boom, error
 
487
            ''')
 
488
    """
 
489
 
 
490
    def setUp(self):
 
491
        super(TestCaseWithTransportAndScript, self).setUp()
 
492
        self.script_runner = ScriptRunner()
 
493
 
 
494
    def run_script(self, script):
 
495
        return self.script_runner.run_script(self, script)
 
496
 
 
497
    def run_command(self, cmd, input, output, error):
 
498
        return self.script_runner.run_command(self, cmd, input, output, error)
 
499
 
 
500
 
 
501
def run_script(test_case, script_string):
 
502
    """Run the given script within a testcase"""
 
503
    return ScriptRunner().run_script(test_case, script_string)