~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/script.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-09-18 09:49:39 UTC
  • mfrom: (4702.1.1 integration2)
  • Revision ID: pqm@pqm.ubuntu.com-20090918094939-xmi0ihhvas4qlks9
(vila) Introduce shell-like tests feature

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Shell-like test scripts.
 
18
 
 
19
See developers/testing.html for more explanations.
 
20
"""
 
21
 
 
22
import doctest
 
23
import errno
 
24
import glob
 
25
import os
 
26
import shlex
 
27
from cStringIO import StringIO
 
28
 
 
29
from bzrlib import (
 
30
    osutils,
 
31
    tests,
 
32
    )
 
33
 
 
34
 
 
35
def split(s):
 
36
    """Split a command line respecting quotes."""
 
37
    scanner = shlex.shlex(s)
 
38
    scanner.quotes = '\'"`'
 
39
    scanner.whitespace_split = True
 
40
    for t in list(scanner):
 
41
        yield t
 
42
 
 
43
 
 
44
def _script_to_commands(text, file_name=None):
 
45
    """Turn a script into a list of commands with their associated IOs.
 
46
 
 
47
    Each command appears on a line by itself starting with '$ '. It can be
 
48
    associated with an input that will feed it and an expected output.
 
49
 
 
50
    Comments starts with '#' until the end of line.
 
51
    Empty lines are ignored.
 
52
 
 
53
    Input and output are full lines terminated by a '\n'.
 
54
 
 
55
    Input lines start with '<'.
 
56
    Output lines start with nothing.
 
57
    Error lines start with '2>'.
 
58
    """
 
59
 
 
60
    commands = []
 
61
 
 
62
    def add_command(cmd, input, output, error):
 
63
        if cmd is not None:
 
64
            if input is not None:
 
65
                input = ''.join(input)
 
66
            if output is not None:
 
67
                output = ''.join(output)
 
68
            if error is not None:
 
69
                error = ''.join(error)
 
70
            commands.append((cmd, input, output, error))
 
71
 
 
72
    cmd_cur = None
 
73
    cmd_line = 1
 
74
    lineno = 0
 
75
    input, output, error = None, None, None
 
76
    for line in text.split('\n'):
 
77
        lineno += 1
 
78
        # Keep a copy for error reporting
 
79
        orig = line
 
80
        comment =  line.find('#')
 
81
        if comment >= 0:
 
82
            # Delete comments
 
83
            line = line[0:comment]
 
84
            line = line.rstrip()
 
85
        if line == '':
 
86
            # Ignore empty lines
 
87
            continue
 
88
        if line.startswith('$'):
 
89
            # Time to output the current command
 
90
            add_command(cmd_cur, input, output, error)
 
91
            # And start a new one
 
92
            cmd_cur = list(split(line[1:]))
 
93
            cmd_line = lineno
 
94
            input, output, error = None, None, None
 
95
        elif line.startswith('<'):
 
96
            if input is None:
 
97
                if cmd_cur is None:
 
98
                    raise SyntaxError('No command for that input',
 
99
                                      (file_name, lineno, 1, orig))
 
100
                input = []
 
101
            input.append(line[1:] + '\n')
 
102
        elif line.startswith('2>'):
 
103
            if error is None:
 
104
                if cmd_cur is None:
 
105
                    raise SyntaxError('No command for that error',
 
106
                                      (file_name, lineno, 1, orig))
 
107
                error = []
 
108
            error.append(line[2:] + '\n')
 
109
        else:
 
110
            if output is None:
 
111
                if cmd_cur is None:
 
112
                    raise SyntaxError('No command for that output',
 
113
                                      (file_name, lineno, 1, orig))
 
114
                output = []
 
115
            output.append(line + '\n')
 
116
    # Add the last seen command
 
117
    add_command(cmd_cur, input, output, error)
 
118
    return commands
 
119
 
 
120
 
 
121
def _scan_redirection_options(args):
 
122
    """Recognize and process input and output redirections.
 
123
 
 
124
    :param args: The command line arguments
 
125
 
 
126
    :return: A tuple containing: 
 
127
        - The file name redirected from or None
 
128
        - The file name redirected to or None
 
129
        - The mode to open the output file or None
 
130
        - The reamining arguments
 
131
    """
 
132
    def redirected_file_name(direction, name, args):
 
133
        if name == '':
 
134
            try:
 
135
                name = args.pop(0)
 
136
            except IndexError:
 
137
                # We leave the error handling to higher levels, an empty name
 
138
                # can't be legal.
 
139
                name = ''
 
140
        return name
 
141
 
 
142
    remaining = []
 
143
    in_name = None
 
144
    out_name, out_mode = None, None
 
145
    while args:
 
146
        arg = args.pop(0)
 
147
        if arg.startswith('<'):
 
148
            in_name = redirected_file_name('<', arg[1:], args)
 
149
        elif arg.startswith('>>'):
 
150
            out_name = redirected_file_name('>>', arg[2:], args)
 
151
            out_mode = 'ab+'
 
152
        elif arg.startswith('>',):
 
153
            out_name = redirected_file_name('>', arg[1:], args)
 
154
            out_mode = 'wb+'
 
155
        else:
 
156
            remaining.append(arg)
 
157
    return in_name, out_name, out_mode, remaining
 
158
 
 
159
 
 
160
class ScriptRunner(object):
 
161
 
 
162
    def __init__(self, test_case):
 
163
        self.test_case = test_case
 
164
        self.output_checker = doctest.OutputChecker()
 
165
        self.check_options = doctest.ELLIPSIS
 
166
 
 
167
    def run_script(self, text):
 
168
        for cmd, input, output, error in _script_to_commands(text):
 
169
            self.run_command(cmd, input, output, error)
 
170
 
 
171
    def _check_output(self, expected, actual):
 
172
        if expected is None:
 
173
            # Specifying None means: any output is accepted
 
174
            return
 
175
        if actual is None:
 
176
            self.test_case.fail('Unexpected: %s' % actual)
 
177
        matching = self.output_checker.check_output(
 
178
            expected, actual, self.check_options)
 
179
        if not matching:
 
180
            # Note that we can't use output_checker.output_difference() here
 
181
            # because... the API is broken ('expected' must be a doctest
 
182
            # specific object of which a 'want' attribute will be our
 
183
            # 'expected' parameter. So we just fallback to our good old
 
184
            # assertEqualDiff since we know there *are* differences and the
 
185
            # output should be decently readable.
 
186
            self.test_case.assertEqualDiff(expected, actual)
 
187
 
 
188
    def _pre_process_args(self, args):
 
189
        new_args = []
 
190
        for arg in args:
 
191
            # Strip the simple and double quotes since we don't care about
 
192
            # them.  We leave the backquotes in place though since they have a
 
193
            # different semantic.
 
194
            if arg[0] in  ('"', "'") and arg[0] == arg[-1]:
 
195
                yield arg[1:-1]
 
196
            else:
 
197
                if glob.has_magic(arg):
 
198
                    matches = glob.glob(arg)
 
199
                    if matches:
 
200
                        # We care more about order stability than performance
 
201
                        # here
 
202
                        matches.sort()
 
203
                        for m in matches:
 
204
                            yield m
 
205
                else:
 
206
                    yield arg
 
207
 
 
208
    def run_command(self, cmd, input, output, error):
 
209
        mname = 'do_' + cmd[0]
 
210
        method = getattr(self, mname, None)
 
211
        if method is None:
 
212
            raise SyntaxError('Command not found "%s"' % (cmd[0],),
 
213
                              None, 1, ' '.join(cmd))
 
214
        if input is None:
 
215
            str_input = ''
 
216
        else:
 
217
            str_input = ''.join(input)
 
218
        args = list(self._pre_process_args(cmd[1:]))
 
219
        retcode, actual_output, actual_error = method(str_input, args)
 
220
 
 
221
        self._check_output(output, actual_output)
 
222
        self._check_output(error, actual_error)
 
223
        if retcode and not error and actual_error:
 
224
            self.test_case.fail('In \n\t%s\nUnexpected error: %s'
 
225
                                % (' '.join(cmd), actual_error))
 
226
        return retcode, actual_output, actual_error
 
227
 
 
228
    def _read_input(self, input, in_name):
 
229
        if in_name is not None:
 
230
            infile = open(in_name, 'rb')
 
231
            try:
 
232
                # Command redirection takes precedence over provided input
 
233
                input = infile.read()
 
234
            finally:
 
235
                infile.close()
 
236
        return input
 
237
 
 
238
    def _write_output(self, output, out_name, out_mode):
 
239
        if out_name is not None:
 
240
            outfile = open(out_name, out_mode)
 
241
            try:
 
242
                outfile.write(output)
 
243
            finally:
 
244
                outfile.close()
 
245
            output = None
 
246
        return output
 
247
 
 
248
    def do_bzr(self, input, args):
 
249
        retcode, out, err = self.test_case._run_bzr_core(
 
250
            args, retcode=None, encoding=None, stdin=input, working_dir=None)
 
251
        return retcode, out, err
 
252
 
 
253
    def do_cat(self, input, args):
 
254
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
255
        if args and in_name is not None:
 
256
            raise SyntaxError('Specify a file OR use redirection')
 
257
 
 
258
        inputs = []
 
259
        if input:
 
260
            inputs.append(input)
 
261
        input_names = args
 
262
        if in_name:
 
263
            args.append(in_name)
 
264
        for in_name in input_names:
 
265
            try:
 
266
                inputs.append(self._read_input(None, in_name))
 
267
            except IOError, e:
 
268
                if e.errno == errno.ENOENT:
 
269
                    return (1, None,
 
270
                            '%s: No such file or directory\n' % (in_name,))
 
271
        # Basically cat copy input to output
 
272
        output = ''.join(inputs)
 
273
        # Handle output redirections
 
274
        try:
 
275
            output = self._write_output(output, out_name, out_mode)
 
276
        except IOError, e:
 
277
            if e.errno == errno.ENOENT:
 
278
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
279
        return 0, output, None
 
280
 
 
281
    def do_echo(self, input, args):
 
282
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
283
        if input and args:
 
284
                raise SyntaxError('Specify parameters OR use redirection')
 
285
        if args:
 
286
            input = ' '.join(args)
 
287
        try:
 
288
            input = self._read_input(input, in_name)
 
289
        except IOError, e:
 
290
            if e.errno == errno.ENOENT:
 
291
                return 1, None, '%s: No such file or directory\n' % (in_name,)
 
292
        # Always append a \n'
 
293
        input += '\n'
 
294
        # Process output
 
295
        output = input
 
296
        # Handle output redirections
 
297
        try:
 
298
            output = self._write_output(output, out_name, out_mode)
 
299
        except IOError, e:
 
300
            if e.errno == errno.ENOENT:
 
301
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
302
        return 0, output, None
 
303
 
 
304
    def _ensure_in_jail(self, path):
 
305
        jail_root = self.test_case.get_jail_root()
 
306
        if not osutils.is_inside(jail_root, osutils.normalizepath(path)):
 
307
            raise ValueError('%s is not inside %s' % (path, jail_root))
 
308
 
 
309
    def do_cd(self, input, args):
 
310
        if len(args) > 1:
 
311
            raise SyntaxError('Usage: cd [dir]')
 
312
        if len(args) == 1:
 
313
            d = args[0]
 
314
            self._ensure_in_jail(d)
 
315
        else:
 
316
            d = self.test_case.get_jail_root()
 
317
        os.chdir(d)
 
318
        return 0, None, None
 
319
 
 
320
    def do_mkdir(self, input, args):
 
321
        if not args or len(args) != 1:
 
322
            raise SyntaxError('Usage: mkdir dir')
 
323
        d = args[0]
 
324
        self._ensure_in_jail(d)
 
325
        os.mkdir(d)
 
326
        return 0, None, None
 
327
 
 
328
    def do_rm(self, input, args):
 
329
        err = None
 
330
 
 
331
        def error(msg, path):
 
332
            return  "rm: cannot remove '%s': %s\n" % (path, msg)
 
333
 
 
334
        force, recursive = False, False
 
335
        opts = None
 
336
        if args and args[0][0] == '-':
 
337
            opts = args.pop(0)[1:]
 
338
            if 'f' in opts:
 
339
                force = True
 
340
                opts = opts.replace('f', '', 1)
 
341
            if 'r' in opts:
 
342
                recursive = True
 
343
                opts = opts.replace('r', '', 1)
 
344
        if not args or opts:
 
345
            raise SyntaxError('Usage: rm [-fr] path+')
 
346
        for p in args:
 
347
            self._ensure_in_jail(p)
 
348
            # FIXME: Should we put that in osutils ?
 
349
            try:
 
350
                os.remove(p)
 
351
            except OSError, e:
 
352
                if e.errno == errno.EISDIR:
 
353
                    if recursive:
 
354
                        osutils.rmtree(p)
 
355
                    else:
 
356
                        err = error('Is a directory', p)
 
357
                        break
 
358
                elif e.errno == errno.ENOENT:
 
359
                    if not force:
 
360
                        err =  error('No such file or directory', p)
 
361
                        break
 
362
                else:
 
363
                    raise
 
364
        if err:
 
365
            retcode = 1
 
366
        else:
 
367
            retcode = 0
 
368
        return retcode, None, err
 
369
 
 
370
 
 
371
class TestCaseWithMemoryTransportAndScript(tests.TestCaseWithMemoryTransport):
 
372
 
 
373
    def setUp(self):
 
374
        super(TestCaseWithMemoryTransportAndScript, self).setUp()
 
375
        self.script_runner = ScriptRunner(self)
 
376
        # Break the circular dependency
 
377
        def break_dependency():
 
378
            self.script_runner = None
 
379
        self.addCleanup(break_dependency)
 
380
 
 
381
    def get_jail_root(self):
 
382
        raise NotImplementedError(self.get_jail_root)
 
383
 
 
384
    def run_script(self, script):
 
385
        return self.script_runner.run_script(script)
 
386
 
 
387
    def run_command(self, cmd, input, output, error):
 
388
        return self.script_runner.run_command(cmd, input, output, error)
 
389
 
 
390
 
 
391
class TestCaseWithTransportAndScript(tests.TestCaseWithTransport):
 
392
 
 
393
    def setUp(self):
 
394
        super(TestCaseWithTransportAndScript, self).setUp()
 
395
        self.script_runner = ScriptRunner(self)
 
396
        # Break the circular dependency
 
397
        def break_dependency():
 
398
            self.script_runner = None
 
399
        self.addCleanup(break_dependency)
 
400
 
 
401
    def get_jail_root(self):
 
402
        return self.test_dir
 
403
 
 
404
    def run_script(self, script):
 
405
        return self.script_runner.run_script(script)
 
406
 
 
407
    def run_command(self, cmd, input, output, error):
 
408
        return self.script_runner.run_command(cmd, input, output, error)