~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/script.py

Merge checkout-tags-propagation-603395-2.2.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009, 2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Shell-like test scripts.
 
18
 
 
19
See developers/testing.html for more explanations.
 
20
"""
 
21
 
 
22
import doctest
 
23
import errno
 
24
import glob
 
25
import os
 
26
import shlex
 
27
import textwrap
 
28
from cStringIO import StringIO
 
29
 
 
30
from bzrlib import (
 
31
    errors,
 
32
    osutils,
 
33
    tests,
 
34
    )
 
35
 
 
36
 
 
37
def split(s):
 
38
    """Split a command line respecting quotes."""
 
39
    scanner = shlex.shlex(s)
 
40
    scanner.quotes = '\'"`'
 
41
    scanner.whitespace_split = True
 
42
    for t in list(scanner):
 
43
        yield t
 
44
 
 
45
 
 
46
def _script_to_commands(text, file_name=None):
 
47
    """Turn a script into a list of commands with their associated IOs.
 
48
 
 
49
    Each command appears on a line by itself starting with '$ '. It can be
 
50
    associated with an input that will feed it and an expected output.
 
51
 
 
52
    Comments starts with '#' until the end of line.
 
53
    Empty lines are ignored.
 
54
 
 
55
    Input and output are full lines terminated by a '\n'.
 
56
 
 
57
    Input lines start with '<'.
 
58
    Output lines start with nothing.
 
59
    Error lines start with '2>'.
 
60
 
 
61
    :return: A sequence of ([args], input, output, errors), where the args are
 
62
        split in to words, and the input, output, and errors are just strings,
 
63
        typically containing newlines.
 
64
    """
 
65
 
 
66
    commands = []
 
67
 
 
68
    def add_command(cmd, input, output, error):
 
69
        if cmd is not None:
 
70
            if input is not None:
 
71
                input = ''.join(input)
 
72
            if output is not None:
 
73
                output = ''.join(output)
 
74
            if error is not None:
 
75
                error = ''.join(error)
 
76
            commands.append((cmd, input, output, error))
 
77
 
 
78
    cmd_cur = None
 
79
    cmd_line = 1
 
80
    lineno = 0
 
81
    input, output, error = None, None, None
 
82
    text = textwrap.dedent(text)
 
83
    lines = text.split('\n')
 
84
    # to make use of triple-quoted strings easier, we ignore a blank line
 
85
    # right at the start and right at the end; the rest are meaningful
 
86
    if lines and lines[0] == '':
 
87
        del lines[0]
 
88
    if lines and lines[-1] == '':
 
89
        del lines[-1]
 
90
    for line in lines:
 
91
        lineno += 1
 
92
        # Keep a copy for error reporting
 
93
        orig = line
 
94
        comment =  line.find('#')
 
95
        if comment >= 0:
 
96
            # Delete comments
 
97
            # NB: this syntax means comments are allowed inside output, which
 
98
            # may be confusing...
 
99
            line = line[0:comment]
 
100
            line = line.rstrip()
 
101
            if line == '':
 
102
                continue
 
103
        if line.startswith('$'):
 
104
            # Time to output the current command
 
105
            add_command(cmd_cur, input, output, error)
 
106
            # And start a new one
 
107
            cmd_cur = list(split(line[1:]))
 
108
            cmd_line = lineno
 
109
            input, output, error = None, None, None
 
110
        elif line.startswith('<'):
 
111
            if input is None:
 
112
                if cmd_cur is None:
 
113
                    raise SyntaxError('No command for that input',
 
114
                                      (file_name, lineno, 1, orig))
 
115
                input = []
 
116
            input.append(line[1:] + '\n')
 
117
        elif line.startswith('2>'):
 
118
            if error is None:
 
119
                if cmd_cur is None:
 
120
                    raise SyntaxError('No command for that error',
 
121
                                      (file_name, lineno, 1, orig))
 
122
                error = []
 
123
            error.append(line[2:] + '\n')
 
124
        else:
 
125
            # can happen if the first line is not recognized as a command, eg
 
126
            # if the prompt has leading whitespace
 
127
            if output is None:
 
128
                if cmd_cur is None:
 
129
                    raise SyntaxError('No command for line %r' % (line,),
 
130
                                      (file_name, lineno, 1, orig))
 
131
                output = []
 
132
            output.append(line + '\n')
 
133
    # Add the last seen command
 
134
    add_command(cmd_cur, input, output, error)
 
135
    return commands
 
136
 
 
137
 
 
138
def _scan_redirection_options(args):
 
139
    """Recognize and process input and output redirections.
 
140
 
 
141
    :param args: The command line arguments
 
142
 
 
143
    :return: A tuple containing: 
 
144
        - The file name redirected from or None
 
145
        - The file name redirected to or None
 
146
        - The mode to open the output file or None
 
147
        - The reamining arguments
 
148
    """
 
149
    def redirected_file_name(direction, name, args):
 
150
        if name == '':
 
151
            try:
 
152
                name = args.pop(0)
 
153
            except IndexError:
 
154
                # We leave the error handling to higher levels, an empty name
 
155
                # can't be legal.
 
156
                name = ''
 
157
        return name
 
158
 
 
159
    remaining = []
 
160
    in_name = None
 
161
    out_name, out_mode = None, None
 
162
    while args:
 
163
        arg = args.pop(0)
 
164
        if arg.startswith('<'):
 
165
            in_name = redirected_file_name('<', arg[1:], args)
 
166
        elif arg.startswith('>>'):
 
167
            out_name = redirected_file_name('>>', arg[2:], args)
 
168
            out_mode = 'ab+'
 
169
        elif arg.startswith('>',):
 
170
            out_name = redirected_file_name('>', arg[1:], args)
 
171
            out_mode = 'wb+'
 
172
        else:
 
173
            remaining.append(arg)
 
174
    return in_name, out_name, out_mode, remaining
 
175
 
 
176
 
 
177
class ScriptRunner(object):
 
178
    """Run a shell-like script from a test.
 
179
    
 
180
    Can be used as:
 
181
 
 
182
    from bzrlib.tests import script
 
183
 
 
184
    ...
 
185
 
 
186
        def test_bug_nnnnn(self):
 
187
            sr = script.ScriptRunner()
 
188
            sr.run_script(self, '''
 
189
            $ bzr init
 
190
            $ bzr do-this
 
191
            # Boom, error
 
192
            ''')
 
193
    """
 
194
 
 
195
    def __init__(self):
 
196
        self.output_checker = doctest.OutputChecker()
 
197
        self.check_options = doctest.ELLIPSIS
 
198
 
 
199
    def run_script(self, test_case, text, null_output_matches_anything=False):
 
200
        """Run a shell-like script as a test.
 
201
 
 
202
        :param test_case: A TestCase instance that should provide the fail(),
 
203
            assertEqualDiff and _run_bzr_core() methods as well as a 'test_dir'
 
204
            attribute used as a jail root.
 
205
 
 
206
        :param text: A shell-like script (see _script_to_commands for syntax).
 
207
 
 
208
        :param null_output_matches_anything: For commands with no specified
 
209
            output, ignore any output that does happen, including output on
 
210
            standard error.
 
211
        """
 
212
        self.null_output_matches_anything = null_output_matches_anything
 
213
        for cmd, input, output, error in _script_to_commands(text):
 
214
            self.run_command(test_case, cmd, input, output, error)
 
215
 
 
216
    def run_command(self, test_case, cmd, input, output, error):
 
217
        mname = 'do_' + cmd[0]
 
218
        method = getattr(self, mname, None)
 
219
        if method is None:
 
220
            raise SyntaxError('Command not found "%s"' % (cmd[0],),
 
221
                              (None, 1, 1, ' '.join(cmd)))
 
222
        if input is None:
 
223
            str_input = ''
 
224
        else:
 
225
            str_input = ''.join(input)
 
226
        args = list(self._pre_process_args(cmd[1:]))
 
227
        retcode, actual_output, actual_error = method(test_case,
 
228
                                                      str_input, args)
 
229
 
 
230
        try:
 
231
            self._check_output(output, actual_output, test_case)
 
232
        except AssertionError, e:
 
233
            raise AssertionError(str(e) + " in stdout of command %s" % cmd)
 
234
        try:
 
235
            self._check_output(error, actual_error, test_case)
 
236
        except AssertionError, e:
 
237
            raise AssertionError(str(e) +
 
238
                " in stderr of running command %s" % cmd)
 
239
        if retcode and not error and actual_error:
 
240
            test_case.fail('In \n\t%s\nUnexpected error: %s'
 
241
                           % (' '.join(cmd), actual_error))
 
242
        return retcode, actual_output, actual_error
 
243
 
 
244
    def _check_output(self, expected, actual, test_case):
 
245
        if not actual:
 
246
            if expected is None:
 
247
                return
 
248
            elif expected == '...\n':
 
249
                return
 
250
            else:
 
251
                test_case.fail('expected output: %r, but found nothing'
 
252
                            % (expected,))
 
253
 
 
254
        null_output_matches_anything = getattr(
 
255
            self, 'null_output_matches_anything', False)
 
256
        if null_output_matches_anything and expected is None:
 
257
            return
 
258
 
 
259
        expected = expected or ''
 
260
        matching = self.output_checker.check_output(
 
261
            expected, actual, self.check_options)
 
262
        if not matching:
 
263
            # Note that we can't use output_checker.output_difference() here
 
264
            # because... the API is broken ('expected' must be a doctest
 
265
            # specific object of which a 'want' attribute will be our
 
266
            # 'expected' parameter. So we just fallback to our good old
 
267
            # assertEqualDiff since we know there *are* differences and the
 
268
            # output should be decently readable.
 
269
            #
 
270
            # As a special case, we allow output that's missing a final
 
271
            # newline to match an expected string that does have one, so that
 
272
            # we can match a prompt printed on one line, then input given on
 
273
            # the next line.
 
274
            if expected == actual + '\n':
 
275
                pass
 
276
            else:
 
277
                test_case.assertEqualDiff(expected, actual)
 
278
 
 
279
    def _pre_process_args(self, args):
 
280
        new_args = []
 
281
        for arg in args:
 
282
            # Strip the simple and double quotes since we don't care about
 
283
            # them.  We leave the backquotes in place though since they have a
 
284
            # different semantic.
 
285
            if arg[0] in  ('"', "'") and arg[0] == arg[-1]:
 
286
                yield arg[1:-1]
 
287
            else:
 
288
                if glob.has_magic(arg):
 
289
                    matches = glob.glob(arg)
 
290
                    if matches:
 
291
                        # We care more about order stability than performance
 
292
                        # here
 
293
                        matches.sort()
 
294
                        for m in matches:
 
295
                            yield m
 
296
                else:
 
297
                    yield arg
 
298
 
 
299
    def _read_input(self, input, in_name):
 
300
        if in_name is not None:
 
301
            infile = open(in_name, 'rb')
 
302
            try:
 
303
                # Command redirection takes precedence over provided input
 
304
                input = infile.read()
 
305
            finally:
 
306
                infile.close()
 
307
        return input
 
308
 
 
309
    def _write_output(self, output, out_name, out_mode):
 
310
        if out_name is not None:
 
311
            outfile = open(out_name, out_mode)
 
312
            try:
 
313
                outfile.write(output)
 
314
            finally:
 
315
                outfile.close()
 
316
            output = None
 
317
        return output
 
318
 
 
319
    def do_bzr(self, test_case, input, args):
 
320
        retcode, out, err = test_case._run_bzr_core(
 
321
            args, retcode=None, encoding=None, stdin=input, working_dir=None)
 
322
        return retcode, out, err
 
323
 
 
324
    def do_cat(self, test_case, input, args):
 
325
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
326
        if args and in_name is not None:
 
327
            raise SyntaxError('Specify a file OR use redirection')
 
328
 
 
329
        inputs = []
 
330
        if input:
 
331
            inputs.append(input)
 
332
        input_names = args
 
333
        if in_name:
 
334
            args.append(in_name)
 
335
        for in_name in input_names:
 
336
            try:
 
337
                inputs.append(self._read_input(None, in_name))
 
338
            except IOError, e:
 
339
                # Some filenames are illegal on Windows and generate EINVAL
 
340
                # rather than just saying the filename doesn't exist
 
341
                if e.errno in (errno.ENOENT, errno.EINVAL):
 
342
                    return (1, None,
 
343
                            '%s: No such file or directory\n' % (in_name,))
 
344
                raise
 
345
        # Basically cat copy input to output
 
346
        output = ''.join(inputs)
 
347
        # Handle output redirections
 
348
        try:
 
349
            output = self._write_output(output, out_name, out_mode)
 
350
        except IOError, e:
 
351
            # If out_name cannot be created, we may get 'ENOENT', however if
 
352
            # out_name is something like '', we can get EINVAL
 
353
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
354
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
355
            raise
 
356
        return 0, output, None
 
357
 
 
358
    def do_echo(self, test_case, input, args):
 
359
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
360
        if input or in_name:
 
361
            raise SyntaxError('echo doesn\'t read from stdin')
 
362
        if args:
 
363
            input = ' '.join(args)
 
364
        # Always append a \n'
 
365
        input += '\n'
 
366
        # Process output
 
367
        output = input
 
368
        # Handle output redirections
 
369
        try:
 
370
            output = self._write_output(output, out_name, out_mode)
 
371
        except IOError, e:
 
372
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
373
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
374
            raise
 
375
        return 0, output, None
 
376
 
 
377
    def _get_jail_root(self, test_case):
 
378
        return test_case.test_dir
 
379
 
 
380
    def _ensure_in_jail(self, test_case, path):
 
381
        jail_root = self._get_jail_root(test_case)
 
382
        if not osutils.is_inside(jail_root, osutils.normalizepath(path)):
 
383
            raise ValueError('%s is not inside %s' % (path, jail_root))
 
384
 
 
385
    def do_cd(self, test_case, input, args):
 
386
        if len(args) > 1:
 
387
            raise SyntaxError('Usage: cd [dir]')
 
388
        if len(args) == 1:
 
389
            d = args[0]
 
390
            self._ensure_in_jail(test_case, d)
 
391
        else:
 
392
            # The test "home" directory is the root of its jail
 
393
            d = self._get_jail_root(test_case)
 
394
        os.chdir(d)
 
395
        return 0, None, None
 
396
 
 
397
    def do_mkdir(self, test_case, input, args):
 
398
        if not args or len(args) != 1:
 
399
            raise SyntaxError('Usage: mkdir dir')
 
400
        d = args[0]
 
401
        self._ensure_in_jail(test_case, d)
 
402
        os.mkdir(d)
 
403
        return 0, None, None
 
404
 
 
405
    def do_rm(self, test_case, input, args):
 
406
        err = None
 
407
 
 
408
        def error(msg, path):
 
409
            return  "rm: cannot remove '%s': %s\n" % (path, msg)
 
410
 
 
411
        force, recursive = False, False
 
412
        opts = None
 
413
        if args and args[0][0] == '-':
 
414
            opts = args.pop(0)[1:]
 
415
            if 'f' in opts:
 
416
                force = True
 
417
                opts = opts.replace('f', '', 1)
 
418
            if 'r' in opts:
 
419
                recursive = True
 
420
                opts = opts.replace('r', '', 1)
 
421
        if not args or opts:
 
422
            raise SyntaxError('Usage: rm [-fr] path+')
 
423
        for p in args:
 
424
            self._ensure_in_jail(test_case, p)
 
425
            # FIXME: Should we put that in osutils ?
 
426
            try:
 
427
                os.remove(p)
 
428
            except OSError, e:
 
429
                # Various OSes raises different exceptions (linux: EISDIR,
 
430
                #   win32: EACCES, OSX: EPERM) when invoked on a directory
 
431
                if e.errno in (errno.EISDIR, errno.EPERM, errno.EACCES):
 
432
                    if recursive:
 
433
                        osutils.rmtree(p)
 
434
                    else:
 
435
                        err = error('Is a directory', p)
 
436
                        break
 
437
                elif e.errno == errno.ENOENT:
 
438
                    if not force:
 
439
                        err =  error('No such file or directory', p)
 
440
                        break
 
441
                else:
 
442
                    raise
 
443
        if err:
 
444
            retcode = 1
 
445
        else:
 
446
            retcode = 0
 
447
        return retcode, None, err
 
448
 
 
449
    def do_mv(self, test_case, input, args):
 
450
        err = None
 
451
        def error(msg, src, dst):
 
452
            return "mv: cannot move %s to %s: %s\n" % (src, dst, msg)
 
453
 
 
454
        if not args or len(args) != 2:
 
455
            raise SyntaxError("Usage: mv path1 path2")
 
456
        src, dst = args
 
457
        try:
 
458
            real_dst = dst
 
459
            if os.path.isdir(dst):
 
460
                real_dst = os.path.join(dst, os.path.basename(src))
 
461
            os.rename(src, real_dst)
 
462
        except OSError, e:
 
463
            if e.errno == errno.ENOENT:
 
464
                err = error('No such file or directory', src, dst)
 
465
            else:
 
466
                raise
 
467
        if err:
 
468
            retcode = 1
 
469
        else:
 
470
            retcode = 0
 
471
        return retcode, None, err
 
472
 
 
473
 
 
474
 
 
475
class TestCaseWithMemoryTransportAndScript(tests.TestCaseWithMemoryTransport):
 
476
    """Helper class to experiment shell-like test and memory fs.
 
477
 
 
478
    This not intended to be used outside of experiments in implementing memoy
 
479
    based file systems and evolving bzr so that test can use only memory based
 
480
    resources.
 
481
    """
 
482
 
 
483
    def setUp(self):
 
484
        super(TestCaseWithMemoryTransportAndScript, self).setUp()
 
485
        self.script_runner = ScriptRunner()
 
486
 
 
487
    def run_script(self, script, null_output_matches_anything=False):
 
488
        return self.script_runner.run_script(self, script, 
 
489
                   null_output_matches_anything=null_output_matches_anything)
 
490
 
 
491
    def run_command(self, cmd, input, output, error):
 
492
        return self.script_runner.run_command(self, cmd, input, output, error)
 
493
 
 
494
 
 
495
class TestCaseWithTransportAndScript(tests.TestCaseWithTransport):
 
496
    """Helper class to quickly define shell-like tests.
 
497
 
 
498
    Can be used as:
 
499
 
 
500
    from bzrlib.tests import script
 
501
 
 
502
 
 
503
    class TestBug(script.TestCaseWithTransportAndScript):
 
504
 
 
505
        def test_bug_nnnnn(self):
 
506
            self.run_script('''
 
507
            $ bzr init
 
508
            $ bzr do-this
 
509
            # Boom, error
 
510
            ''')
 
511
    """
 
512
 
 
513
    def setUp(self):
 
514
        super(TestCaseWithTransportAndScript, self).setUp()
 
515
        self.script_runner = ScriptRunner()
 
516
 
 
517
    def run_script(self, script, null_output_matches_anything=False):
 
518
        return self.script_runner.run_script(self, script,
 
519
                   null_output_matches_anything=null_output_matches_anything)
 
520
 
 
521
    def run_command(self, cmd, input, output, error):
 
522
        return self.script_runner.run_command(self, cmd, input, output, error)
 
523
 
 
524
 
 
525
def run_script(test_case, script_string, null_output_matches_anything=False):
 
526
    """Run the given script within a testcase"""
 
527
    return ScriptRunner().run_script(test_case, script_string,
 
528
               null_output_matches_anything=null_output_matches_anything)
 
529