~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/script.py

  • Committer: John Arbash Meinel
  • Date: 2009-12-22 17:14:45 UTC
  • mto: This revision was merged to the branch mainline in revision 4922.
  • Revision ID: john@arbash-meinel.com-20091222171445-fm7lcwhmkfdkc01q
Handle the fact that osutils requires the feature to be available.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Shell-like test scripts.
 
18
 
 
19
See developers/testing.html for more explanations.
 
20
"""
 
21
 
 
22
import doctest
 
23
import errno
 
24
import glob
 
25
import os
 
26
import shlex
 
27
from cStringIO import StringIO
 
28
 
 
29
from bzrlib import (
 
30
    osutils,
 
31
    tests,
 
32
    )
 
33
 
 
34
 
 
35
def split(s):
 
36
    """Split a command line respecting quotes."""
 
37
    scanner = shlex.shlex(s)
 
38
    scanner.quotes = '\'"`'
 
39
    scanner.whitespace_split = True
 
40
    for t in list(scanner):
 
41
        yield t
 
42
 
 
43
 
 
44
def _script_to_commands(text, file_name=None):
 
45
    """Turn a script into a list of commands with their associated IOs.
 
46
 
 
47
    Each command appears on a line by itself starting with '$ '. It can be
 
48
    associated with an input that will feed it and an expected output.
 
49
 
 
50
    Comments starts with '#' until the end of line.
 
51
    Empty lines are ignored.
 
52
 
 
53
    Input and output are full lines terminated by a '\n'.
 
54
 
 
55
    Input lines start with '<'.
 
56
    Output lines start with nothing.
 
57
    Error lines start with '2>'.
 
58
    """
 
59
 
 
60
    commands = []
 
61
 
 
62
    def add_command(cmd, input, output, error):
 
63
        if cmd is not None:
 
64
            if input is not None:
 
65
                input = ''.join(input)
 
66
            if output is not None:
 
67
                output = ''.join(output)
 
68
            if error is not None:
 
69
                error = ''.join(error)
 
70
            commands.append((cmd, input, output, error))
 
71
 
 
72
    cmd_cur = None
 
73
    cmd_line = 1
 
74
    lineno = 0
 
75
    input, output, error = None, None, None
 
76
    for line in text.split('\n'):
 
77
        lineno += 1
 
78
        # Keep a copy for error reporting
 
79
        orig = line
 
80
        comment =  line.find('#')
 
81
        if comment >= 0:
 
82
            # Delete comments
 
83
            line = line[0:comment]
 
84
            line = line.rstrip()
 
85
        if line == '':
 
86
            # Ignore empty lines
 
87
            continue
 
88
        if line.startswith('$'):
 
89
            # Time to output the current command
 
90
            add_command(cmd_cur, input, output, error)
 
91
            # And start a new one
 
92
            cmd_cur = list(split(line[1:]))
 
93
            cmd_line = lineno
 
94
            input, output, error = None, None, None
 
95
        elif line.startswith('<'):
 
96
            if input is None:
 
97
                if cmd_cur is None:
 
98
                    raise SyntaxError('No command for that input',
 
99
                                      (file_name, lineno, 1, orig))
 
100
                input = []
 
101
            input.append(line[1:] + '\n')
 
102
        elif line.startswith('2>'):
 
103
            if error is None:
 
104
                if cmd_cur is None:
 
105
                    raise SyntaxError('No command for that error',
 
106
                                      (file_name, lineno, 1, orig))
 
107
                error = []
 
108
            error.append(line[2:] + '\n')
 
109
        else:
 
110
            # can happen if the first line is not recognized as a command, eg
 
111
            # if the prompt has leading whitespace
 
112
            if output is None:
 
113
                if cmd_cur is None:
 
114
                    raise SyntaxError('No command for line %r' % (line,),
 
115
                                      (file_name, lineno, 1, orig))
 
116
                output = []
 
117
            output.append(line + '\n')
 
118
    # Add the last seen command
 
119
    add_command(cmd_cur, input, output, error)
 
120
    return commands
 
121
 
 
122
 
 
123
def _scan_redirection_options(args):
 
124
    """Recognize and process input and output redirections.
 
125
 
 
126
    :param args: The command line arguments
 
127
 
 
128
    :return: A tuple containing: 
 
129
        - The file name redirected from or None
 
130
        - The file name redirected to or None
 
131
        - The mode to open the output file or None
 
132
        - The reamining arguments
 
133
    """
 
134
    def redirected_file_name(direction, name, args):
 
135
        if name == '':
 
136
            try:
 
137
                name = args.pop(0)
 
138
            except IndexError:
 
139
                # We leave the error handling to higher levels, an empty name
 
140
                # can't be legal.
 
141
                name = ''
 
142
        return name
 
143
 
 
144
    remaining = []
 
145
    in_name = None
 
146
    out_name, out_mode = None, None
 
147
    while args:
 
148
        arg = args.pop(0)
 
149
        if arg.startswith('<'):
 
150
            in_name = redirected_file_name('<', arg[1:], args)
 
151
        elif arg.startswith('>>'):
 
152
            out_name = redirected_file_name('>>', arg[2:], args)
 
153
            out_mode = 'ab+'
 
154
        elif arg.startswith('>',):
 
155
            out_name = redirected_file_name('>', arg[1:], args)
 
156
            out_mode = 'wb+'
 
157
        else:
 
158
            remaining.append(arg)
 
159
    return in_name, out_name, out_mode, remaining
 
160
 
 
161
 
 
162
class ScriptRunner(object):
 
163
    """Run a shell-like script from a test.
 
164
    
 
165
    Can be used as:
 
166
 
 
167
    from bzrlib.tests import script
 
168
 
 
169
    ...
 
170
 
 
171
        def test_bug_nnnnn(self):
 
172
            sr = script.ScriptRunner()
 
173
            sr.run_script(self, '''
 
174
            $ bzr init
 
175
            $ bzr do-this
 
176
            # Boom, error
 
177
            ''')
 
178
    """
 
179
 
 
180
    def __init__(self):
 
181
        self.output_checker = doctest.OutputChecker()
 
182
        self.check_options = doctest.ELLIPSIS
 
183
 
 
184
    def run_script(self, test_case, text):
 
185
        """Run a shell-like script as a test.
 
186
 
 
187
        :param test_case: A TestCase instance that should provide the fail(),
 
188
            assertEqualDiff and _run_bzr_core() methods as well as a 'test_dir'
 
189
            attribute used as a jail root.
 
190
 
 
191
        :param text: A shell-like script (see _script_to_commands for syntax).
 
192
        """
 
193
        for cmd, input, output, error in _script_to_commands(text):
 
194
            self.run_command(test_case, cmd, input, output, error)
 
195
 
 
196
    def run_command(self, test_case, cmd, input, output, error):
 
197
        mname = 'do_' + cmd[0]
 
198
        method = getattr(self, mname, None)
 
199
        if method is None:
 
200
            raise SyntaxError('Command not found "%s"' % (cmd[0],),
 
201
                              None, 1, ' '.join(cmd))
 
202
        if input is None:
 
203
            str_input = ''
 
204
        else:
 
205
            str_input = ''.join(input)
 
206
        args = list(self._pre_process_args(cmd[1:]))
 
207
        retcode, actual_output, actual_error = method(test_case,
 
208
                                                      str_input, args)
 
209
 
 
210
        self._check_output(output, actual_output, test_case)
 
211
        self._check_output(error, actual_error, test_case)
 
212
        if retcode and not error and actual_error:
 
213
            test_case.fail('In \n\t%s\nUnexpected error: %s'
 
214
                           % (' '.join(cmd), actual_error))
 
215
        return retcode, actual_output, actual_error
 
216
 
 
217
    def _check_output(self, expected, actual, test_case):
 
218
        if expected is None:
 
219
            # Specifying None means: any output is accepted
 
220
            return
 
221
        if actual is None:
 
222
            test_case.fail('We expected output: %r, but found None'
 
223
                           % (expected,))
 
224
        matching = self.output_checker.check_output(
 
225
            expected, actual, self.check_options)
 
226
        if not matching:
 
227
            # Note that we can't use output_checker.output_difference() here
 
228
            # because... the API is broken ('expected' must be a doctest
 
229
            # specific object of which a 'want' attribute will be our
 
230
            # 'expected' parameter. So we just fallback to our good old
 
231
            # assertEqualDiff since we know there *are* differences and the
 
232
            # output should be decently readable.
 
233
            test_case.assertEqualDiff(expected, actual)
 
234
 
 
235
    def _pre_process_args(self, args):
 
236
        new_args = []
 
237
        for arg in args:
 
238
            # Strip the simple and double quotes since we don't care about
 
239
            # them.  We leave the backquotes in place though since they have a
 
240
            # different semantic.
 
241
            if arg[0] in  ('"', "'") and arg[0] == arg[-1]:
 
242
                yield arg[1:-1]
 
243
            else:
 
244
                if glob.has_magic(arg):
 
245
                    matches = glob.glob(arg)
 
246
                    if matches:
 
247
                        # We care more about order stability than performance
 
248
                        # here
 
249
                        matches.sort()
 
250
                        for m in matches:
 
251
                            yield m
 
252
                else:
 
253
                    yield arg
 
254
 
 
255
    def _read_input(self, input, in_name):
 
256
        if in_name is not None:
 
257
            infile = open(in_name, 'rb')
 
258
            try:
 
259
                # Command redirection takes precedence over provided input
 
260
                input = infile.read()
 
261
            finally:
 
262
                infile.close()
 
263
        return input
 
264
 
 
265
    def _write_output(self, output, out_name, out_mode):
 
266
        if out_name is not None:
 
267
            outfile = open(out_name, out_mode)
 
268
            try:
 
269
                outfile.write(output)
 
270
            finally:
 
271
                outfile.close()
 
272
            output = None
 
273
        return output
 
274
 
 
275
    def do_bzr(self, test_case, input, args):
 
276
        retcode, out, err = test_case._run_bzr_core(
 
277
            args, retcode=None, encoding=None, stdin=input, working_dir=None)
 
278
        return retcode, out, err
 
279
 
 
280
    def do_cat(self, test_case, input, args):
 
281
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
282
        if args and in_name is not None:
 
283
            raise SyntaxError('Specify a file OR use redirection')
 
284
 
 
285
        inputs = []
 
286
        if input:
 
287
            inputs.append(input)
 
288
        input_names = args
 
289
        if in_name:
 
290
            args.append(in_name)
 
291
        for in_name in input_names:
 
292
            try:
 
293
                inputs.append(self._read_input(None, in_name))
 
294
            except IOError, e:
 
295
                # Some filenames are illegal on Windows and generate EINVAL
 
296
                # rather than just saying the filename doesn't exist
 
297
                if e.errno in (errno.ENOENT, errno.EINVAL):
 
298
                    return (1, None,
 
299
                            '%s: No such file or directory\n' % (in_name,))
 
300
                raise
 
301
        # Basically cat copy input to output
 
302
        output = ''.join(inputs)
 
303
        # Handle output redirections
 
304
        try:
 
305
            output = self._write_output(output, out_name, out_mode)
 
306
        except IOError, e:
 
307
            # If out_name cannot be created, we may get 'ENOENT', however if
 
308
            # out_name is something like '', we can get EINVAL
 
309
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
310
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
311
            raise
 
312
        return 0, output, None
 
313
 
 
314
    def do_echo(self, test_case, input, args):
 
315
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
316
        if input or in_name:
 
317
            raise SyntaxError('echo doesn\'t read from stdin')
 
318
        if args:
 
319
            input = ' '.join(args)
 
320
        # Always append a \n'
 
321
        input += '\n'
 
322
        # Process output
 
323
        output = input
 
324
        # Handle output redirections
 
325
        try:
 
326
            output = self._write_output(output, out_name, out_mode)
 
327
        except IOError, e:
 
328
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
329
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
330
            raise
 
331
        return 0, output, None
 
332
 
 
333
    def _get_jail_root(self, test_case):
 
334
        return test_case.test_dir
 
335
 
 
336
    def _ensure_in_jail(self, test_case, path):
 
337
        jail_root = self._get_jail_root(test_case)
 
338
        if not osutils.is_inside(jail_root, osutils.normalizepath(path)):
 
339
            raise ValueError('%s is not inside %s' % (path, jail_root))
 
340
 
 
341
    def do_cd(self, test_case, input, args):
 
342
        if len(args) > 1:
 
343
            raise SyntaxError('Usage: cd [dir]')
 
344
        if len(args) == 1:
 
345
            d = args[0]
 
346
            self._ensure_in_jail(test_case, d)
 
347
        else:
 
348
            # The test "home" directory is the root of its jail
 
349
            d = self._get_jail_root(test_case)
 
350
        os.chdir(d)
 
351
        return 0, None, None
 
352
 
 
353
    def do_mkdir(self, test_case, input, args):
 
354
        if not args or len(args) != 1:
 
355
            raise SyntaxError('Usage: mkdir dir')
 
356
        d = args[0]
 
357
        self._ensure_in_jail(test_case, d)
 
358
        os.mkdir(d)
 
359
        return 0, None, None
 
360
 
 
361
    def do_rm(self, test_case, input, args):
 
362
        err = None
 
363
 
 
364
        def error(msg, path):
 
365
            return  "rm: cannot remove '%s': %s\n" % (path, msg)
 
366
 
 
367
        force, recursive = False, False
 
368
        opts = None
 
369
        if args and args[0][0] == '-':
 
370
            opts = args.pop(0)[1:]
 
371
            if 'f' in opts:
 
372
                force = True
 
373
                opts = opts.replace('f', '', 1)
 
374
            if 'r' in opts:
 
375
                recursive = True
 
376
                opts = opts.replace('r', '', 1)
 
377
        if not args or opts:
 
378
            raise SyntaxError('Usage: rm [-fr] path+')
 
379
        for p in args:
 
380
            self._ensure_in_jail(test_case, p)
 
381
            # FIXME: Should we put that in osutils ?
 
382
            try:
 
383
                os.remove(p)
 
384
            except OSError, e:
 
385
                # Various OSes raises different exceptions (linux: EISDIR,
 
386
                #   win32: EACCES, OSX: EPERM) when invoked on a directory
 
387
                if e.errno in (errno.EISDIR, errno.EPERM, errno.EACCES):
 
388
                    if recursive:
 
389
                        osutils.rmtree(p)
 
390
                    else:
 
391
                        err = error('Is a directory', p)
 
392
                        break
 
393
                elif e.errno == errno.ENOENT:
 
394
                    if not force:
 
395
                        err =  error('No such file or directory', p)
 
396
                        break
 
397
                else:
 
398
                    raise
 
399
        if err:
 
400
            retcode = 1
 
401
        else:
 
402
            retcode = 0
 
403
        return retcode, None, err
 
404
 
 
405
 
 
406
class TestCaseWithMemoryTransportAndScript(tests.TestCaseWithMemoryTransport):
 
407
    """Helper class to experiment shell-like test and memory fs.
 
408
 
 
409
    This not intended to be used outside of experiments in implementing memoy
 
410
    based file systems and evolving bzr so that test can use only memory based
 
411
    resources.
 
412
    """
 
413
 
 
414
    def setUp(self):
 
415
        super(TestCaseWithMemoryTransportAndScript, self).setUp()
 
416
        self.script_runner = ScriptRunner()
 
417
 
 
418
    def run_script(self, script):
 
419
        return self.script_runner.run_script(self, script)
 
420
 
 
421
    def run_command(self, cmd, input, output, error):
 
422
        return self.script_runner.run_command(self, cmd, input, output, error)
 
423
 
 
424
 
 
425
class TestCaseWithTransportAndScript(tests.TestCaseWithTransport):
 
426
    """Helper class to quickly define shell-like tests.
 
427
 
 
428
    Can be used as:
 
429
 
 
430
    from bzrlib.tests import script
 
431
 
 
432
 
 
433
    class TestBug(script.TestCaseWithTransportAndScript):
 
434
 
 
435
        def test_bug_nnnnn(self):
 
436
            self.run_script('''
 
437
            $ bzr init
 
438
            $ bzr do-this
 
439
            # Boom, error
 
440
            ''')
 
441
    """
 
442
 
 
443
    def setUp(self):
 
444
        super(TestCaseWithTransportAndScript, self).setUp()
 
445
        self.script_runner = ScriptRunner()
 
446
 
 
447
    def run_script(self, script):
 
448
        return self.script_runner.run_script(self, script)
 
449
 
 
450
    def run_command(self, cmd, input, output, error):
 
451
        return self.script_runner.run_command(self, cmd, input, output, error)
 
452