~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/script.py

Turn completion assertions into separate methods.

Many common assertions used to be expressed as arguments to the complete
method.  This makes the checks more explicit, and the code easier to read.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2009, 2010, 2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Shell-like test scripts.
18
 
 
19
 
See developers/testing.html for more explanations.
20
 
"""
21
 
 
22
 
import doctest
23
 
import errno
24
 
import glob
25
 
import os
26
 
import shlex
27
 
import textwrap
28
 
 
29
 
from bzrlib import (
30
 
    osutils,
31
 
    tests,
32
 
    )
33
 
 
34
 
 
35
 
def split(s):
36
 
    """Split a command line respecting quotes."""
37
 
    scanner = shlex.shlex(s)
38
 
    scanner.quotes = '\'"`'
39
 
    scanner.whitespace_split = True
40
 
    for t in list(scanner):
41
 
        yield t
42
 
 
43
 
 
44
 
def _script_to_commands(text, file_name=None):
45
 
    """Turn a script into a list of commands with their associated IOs.
46
 
 
47
 
    Each command appears on a line by itself starting with '$ '. It can be
48
 
    associated with an input that will feed it and an expected output.
49
 
 
50
 
    Comments starts with '#' until the end of line.
51
 
    Empty lines are ignored.
52
 
 
53
 
    Input and output are full lines terminated by a '\n'.
54
 
 
55
 
    Input lines start with '<'.
56
 
    Output lines start with nothing.
57
 
    Error lines start with '2>'.
58
 
 
59
 
    :return: A sequence of ([args], input, output, errors), where the args are
60
 
        split in to words, and the input, output, and errors are just strings,
61
 
        typically containing newlines.
62
 
    """
63
 
 
64
 
    commands = []
65
 
 
66
 
    def add_command(cmd, input, output, error):
67
 
        if cmd is not None:
68
 
            if input is not None:
69
 
                input = ''.join(input)
70
 
            if output is not None:
71
 
                output = ''.join(output)
72
 
            if error is not None:
73
 
                error = ''.join(error)
74
 
            commands.append((cmd, input, output, error))
75
 
 
76
 
    cmd_cur = None
77
 
    cmd_line = 1
78
 
    lineno = 0
79
 
    input, output, error = None, None, None
80
 
    text = textwrap.dedent(text)
81
 
    lines = text.split('\n')
82
 
    # to make use of triple-quoted strings easier, we ignore a blank line
83
 
    # right at the start and right at the end; the rest are meaningful
84
 
    if lines and lines[0] == '':
85
 
        del lines[0]
86
 
    if lines and lines[-1] == '':
87
 
        del lines[-1]
88
 
    for line in lines:
89
 
        lineno += 1
90
 
        # Keep a copy for error reporting
91
 
        orig = line
92
 
        comment =  line.find('#')
93
 
        if comment >= 0:
94
 
            # Delete comments
95
 
            # NB: this syntax means comments are allowed inside output, which
96
 
            # may be confusing...
97
 
            line = line[0:comment]
98
 
            line = line.rstrip()
99
 
            if line == '':
100
 
                continue
101
 
        if line.startswith('$'):
102
 
            # Time to output the current command
103
 
            add_command(cmd_cur, input, output, error)
104
 
            # And start a new one
105
 
            cmd_cur = list(split(line[1:]))
106
 
            cmd_line = lineno
107
 
            input, output, error = None, None, None
108
 
        elif line.startswith('<'):
109
 
            if input is None:
110
 
                if cmd_cur is None:
111
 
                    raise SyntaxError('No command for that input',
112
 
                                      (file_name, lineno, 1, orig))
113
 
                input = []
114
 
            input.append(line[1:] + '\n')
115
 
        elif line.startswith('2>'):
116
 
            if error is None:
117
 
                if cmd_cur is None:
118
 
                    raise SyntaxError('No command for that error',
119
 
                                      (file_name, lineno, 1, orig))
120
 
                error = []
121
 
            error.append(line[2:] + '\n')
122
 
        else:
123
 
            # can happen if the first line is not recognized as a command, eg
124
 
            # if the prompt has leading whitespace
125
 
            if output is None:
126
 
                if cmd_cur is None:
127
 
                    raise SyntaxError('No command for line %r' % (line,),
128
 
                                      (file_name, lineno, 1, orig))
129
 
                output = []
130
 
            output.append(line + '\n')
131
 
    # Add the last seen command
132
 
    add_command(cmd_cur, input, output, error)
133
 
    return commands
134
 
 
135
 
 
136
 
def _scan_redirection_options(args):
137
 
    """Recognize and process input and output redirections.
138
 
 
139
 
    :param args: The command line arguments
140
 
 
141
 
    :return: A tuple containing: 
142
 
        - The file name redirected from or None
143
 
        - The file name redirected to or None
144
 
        - The mode to open the output file or None
145
 
        - The reamining arguments
146
 
    """
147
 
    def redirected_file_name(direction, name, args):
148
 
        if name == '':
149
 
            try:
150
 
                name = args.pop(0)
151
 
            except IndexError:
152
 
                # We leave the error handling to higher levels, an empty name
153
 
                # can't be legal.
154
 
                name = ''
155
 
        return name
156
 
 
157
 
    remaining = []
158
 
    in_name = None
159
 
    out_name, out_mode = None, None
160
 
    while args:
161
 
        arg = args.pop(0)
162
 
        if arg.startswith('<'):
163
 
            in_name = redirected_file_name('<', arg[1:], args)
164
 
        elif arg.startswith('>>'):
165
 
            out_name = redirected_file_name('>>', arg[2:], args)
166
 
            out_mode = 'ab+'
167
 
        elif arg.startswith('>',):
168
 
            out_name = redirected_file_name('>', arg[1:], args)
169
 
            out_mode = 'wb+'
170
 
        else:
171
 
            remaining.append(arg)
172
 
    return in_name, out_name, out_mode, remaining
173
 
 
174
 
 
175
 
class ScriptRunner(object):
176
 
    """Run a shell-like script from a test.
177
 
    
178
 
    Can be used as:
179
 
 
180
 
    from bzrlib.tests import script
181
 
 
182
 
    ...
183
 
 
184
 
        def test_bug_nnnnn(self):
185
 
            sr = script.ScriptRunner()
186
 
            sr.run_script(self, '''
187
 
            $ bzr init
188
 
            $ bzr do-this
189
 
            # Boom, error
190
 
            ''')
191
 
    """
192
 
 
193
 
    def __init__(self):
194
 
        self.output_checker = doctest.OutputChecker()
195
 
        self.check_options = doctest.ELLIPSIS
196
 
 
197
 
    def run_script(self, test_case, text, null_output_matches_anything=False):
198
 
        """Run a shell-like script as a test.
199
 
 
200
 
        :param test_case: A TestCase instance that should provide the fail(),
201
 
            assertEqualDiff and _run_bzr_core() methods as well as a 'test_dir'
202
 
            attribute used as a jail root.
203
 
 
204
 
        :param text: A shell-like script (see _script_to_commands for syntax).
205
 
 
206
 
        :param null_output_matches_anything: For commands with no specified
207
 
            output, ignore any output that does happen, including output on
208
 
            standard error.
209
 
        """
210
 
        self.null_output_matches_anything = null_output_matches_anything
211
 
        for cmd, input, output, error in _script_to_commands(text):
212
 
            self.run_command(test_case, cmd, input, output, error)
213
 
 
214
 
    def run_command(self, test_case, cmd, input, output, error):
215
 
        mname = 'do_' + cmd[0]
216
 
        method = getattr(self, mname, None)
217
 
        if method is None:
218
 
            raise SyntaxError('Command not found "%s"' % (cmd[0],),
219
 
                              (None, 1, 1, ' '.join(cmd)))
220
 
        if input is None:
221
 
            str_input = ''
222
 
        else:
223
 
            str_input = ''.join(input)
224
 
        args = list(self._pre_process_args(cmd[1:]))
225
 
        retcode, actual_output, actual_error = method(test_case,
226
 
                                                      str_input, args)
227
 
 
228
 
        try:
229
 
            self._check_output(output, actual_output, test_case)
230
 
        except AssertionError, e:
231
 
            raise AssertionError(str(e) + " in stdout of command %s" % cmd)
232
 
        try:
233
 
            self._check_output(error, actual_error, test_case)
234
 
        except AssertionError, e:
235
 
            raise AssertionError(str(e) +
236
 
                " in stderr of running command %s" % cmd)
237
 
        if retcode and not error and actual_error:
238
 
            test_case.fail('In \n\t%s\nUnexpected error: %s'
239
 
                           % (' '.join(cmd), actual_error))
240
 
        return retcode, actual_output, actual_error
241
 
 
242
 
    def _check_output(self, expected, actual, test_case):
243
 
        if not actual:
244
 
            if expected is None:
245
 
                return
246
 
            elif expected == '...\n':
247
 
                return
248
 
            else:
249
 
                test_case.fail('expected output: %r, but found nothing'
250
 
                            % (expected,))
251
 
 
252
 
        null_output_matches_anything = getattr(
253
 
            self, 'null_output_matches_anything', False)
254
 
        if null_output_matches_anything and expected is None:
255
 
            return
256
 
 
257
 
        expected = expected or ''
258
 
        matching = self.output_checker.check_output(
259
 
            expected, actual, self.check_options)
260
 
        if not matching:
261
 
            # Note that we can't use output_checker.output_difference() here
262
 
            # because... the API is broken ('expected' must be a doctest
263
 
            # specific object of which a 'want' attribute will be our
264
 
            # 'expected' parameter. So we just fallback to our good old
265
 
            # assertEqualDiff since we know there *are* differences and the
266
 
            # output should be decently readable.
267
 
            #
268
 
            # As a special case, we allow output that's missing a final
269
 
            # newline to match an expected string that does have one, so that
270
 
            # we can match a prompt printed on one line, then input given on
271
 
            # the next line.
272
 
            if expected == actual + '\n':
273
 
                pass
274
 
            else:
275
 
                test_case.assertEqualDiff(expected, actual)
276
 
 
277
 
    def _pre_process_args(self, args):
278
 
        new_args = []
279
 
        for arg in args:
280
 
            # Strip the simple and double quotes since we don't care about
281
 
            # them.  We leave the backquotes in place though since they have a
282
 
            # different semantic.
283
 
            if arg[0] in  ('"', "'") and arg[0] == arg[-1]:
284
 
                yield arg[1:-1]
285
 
            else:
286
 
                if glob.has_magic(arg):
287
 
                    matches = glob.glob(arg)
288
 
                    if matches:
289
 
                        # We care more about order stability than performance
290
 
                        # here
291
 
                        matches.sort()
292
 
                        for m in matches:
293
 
                            yield m
294
 
                else:
295
 
                    yield arg
296
 
 
297
 
    def _read_input(self, input, in_name):
298
 
        if in_name is not None:
299
 
            infile = open(in_name, 'rb')
300
 
            try:
301
 
                # Command redirection takes precedence over provided input
302
 
                input = infile.read()
303
 
            finally:
304
 
                infile.close()
305
 
        return input
306
 
 
307
 
    def _write_output(self, output, out_name, out_mode):
308
 
        if out_name is not None:
309
 
            outfile = open(out_name, out_mode)
310
 
            try:
311
 
                outfile.write(output)
312
 
            finally:
313
 
                outfile.close()
314
 
            output = None
315
 
        return output
316
 
 
317
 
    def do_bzr(self, test_case, input, args):
318
 
        retcode, out, err = test_case._run_bzr_core(
319
 
            args, retcode=None, encoding=None, stdin=input, working_dir=None)
320
 
        return retcode, out, err
321
 
 
322
 
    def do_cat(self, test_case, input, args):
323
 
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
324
 
        if args and in_name is not None:
325
 
            raise SyntaxError('Specify a file OR use redirection')
326
 
 
327
 
        inputs = []
328
 
        if input:
329
 
            inputs.append(input)
330
 
        input_names = args
331
 
        if in_name:
332
 
            args.append(in_name)
333
 
        for in_name in input_names:
334
 
            try:
335
 
                inputs.append(self._read_input(None, in_name))
336
 
            except IOError, e:
337
 
                # Some filenames are illegal on Windows and generate EINVAL
338
 
                # rather than just saying the filename doesn't exist
339
 
                if e.errno in (errno.ENOENT, errno.EINVAL):
340
 
                    return (1, None,
341
 
                            '%s: No such file or directory\n' % (in_name,))
342
 
                raise
343
 
        # Basically cat copy input to output
344
 
        output = ''.join(inputs)
345
 
        # Handle output redirections
346
 
        try:
347
 
            output = self._write_output(output, out_name, out_mode)
348
 
        except IOError, e:
349
 
            # If out_name cannot be created, we may get 'ENOENT', however if
350
 
            # out_name is something like '', we can get EINVAL
351
 
            if e.errno in (errno.ENOENT, errno.EINVAL):
352
 
                return 1, None, '%s: No such file or directory\n' % (out_name,)
353
 
            raise
354
 
        return 0, output, None
355
 
 
356
 
    def do_echo(self, test_case, input, args):
357
 
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
358
 
        if input or in_name:
359
 
            raise SyntaxError('echo doesn\'t read from stdin')
360
 
        if args:
361
 
            input = ' '.join(args)
362
 
        # Always append a \n'
363
 
        input += '\n'
364
 
        # Process output
365
 
        output = input
366
 
        # Handle output redirections
367
 
        try:
368
 
            output = self._write_output(output, out_name, out_mode)
369
 
        except IOError, e:
370
 
            if e.errno in (errno.ENOENT, errno.EINVAL):
371
 
                return 1, None, '%s: No such file or directory\n' % (out_name,)
372
 
            raise
373
 
        return 0, output, None
374
 
 
375
 
    def _get_jail_root(self, test_case):
376
 
        return test_case.test_dir
377
 
 
378
 
    def _ensure_in_jail(self, test_case, path):
379
 
        jail_root = self._get_jail_root(test_case)
380
 
        if not osutils.is_inside(jail_root, osutils.normalizepath(path)):
381
 
            raise ValueError('%s is not inside %s' % (path, jail_root))
382
 
 
383
 
    def do_cd(self, test_case, input, args):
384
 
        if len(args) > 1:
385
 
            raise SyntaxError('Usage: cd [dir]')
386
 
        if len(args) == 1:
387
 
            d = args[0]
388
 
            self._ensure_in_jail(test_case, d)
389
 
        else:
390
 
            # The test "home" directory is the root of its jail
391
 
            d = self._get_jail_root(test_case)
392
 
        os.chdir(d)
393
 
        return 0, None, None
394
 
 
395
 
    def do_mkdir(self, test_case, input, args):
396
 
        if not args or len(args) != 1:
397
 
            raise SyntaxError('Usage: mkdir dir')
398
 
        d = args[0]
399
 
        self._ensure_in_jail(test_case, d)
400
 
        os.mkdir(d)
401
 
        return 0, None, None
402
 
 
403
 
    def do_rm(self, test_case, input, args):
404
 
        err = None
405
 
 
406
 
        def error(msg, path):
407
 
            return  "rm: cannot remove '%s': %s\n" % (path, msg)
408
 
 
409
 
        force, recursive = False, False
410
 
        opts = None
411
 
        if args and args[0][0] == '-':
412
 
            opts = args.pop(0)[1:]
413
 
            if 'f' in opts:
414
 
                force = True
415
 
                opts = opts.replace('f', '', 1)
416
 
            if 'r' in opts:
417
 
                recursive = True
418
 
                opts = opts.replace('r', '', 1)
419
 
        if not args or opts:
420
 
            raise SyntaxError('Usage: rm [-fr] path+')
421
 
        for p in args:
422
 
            self._ensure_in_jail(test_case, p)
423
 
            # FIXME: Should we put that in osutils ?
424
 
            try:
425
 
                os.remove(p)
426
 
            except OSError, e:
427
 
                # Various OSes raises different exceptions (linux: EISDIR,
428
 
                #   win32: EACCES, OSX: EPERM) when invoked on a directory
429
 
                if e.errno in (errno.EISDIR, errno.EPERM, errno.EACCES):
430
 
                    if recursive:
431
 
                        osutils.rmtree(p)
432
 
                    else:
433
 
                        err = error('Is a directory', p)
434
 
                        break
435
 
                elif e.errno == errno.ENOENT:
436
 
                    if not force:
437
 
                        err =  error('No such file or directory', p)
438
 
                        break
439
 
                else:
440
 
                    raise
441
 
        if err:
442
 
            retcode = 1
443
 
        else:
444
 
            retcode = 0
445
 
        return retcode, None, err
446
 
 
447
 
    def do_mv(self, test_case, input, args):
448
 
        err = None
449
 
        def error(msg, src, dst):
450
 
            return "mv: cannot move %s to %s: %s\n" % (src, dst, msg)
451
 
 
452
 
        if not args or len(args) != 2:
453
 
            raise SyntaxError("Usage: mv path1 path2")
454
 
        src, dst = args
455
 
        try:
456
 
            real_dst = dst
457
 
            if os.path.isdir(dst):
458
 
                real_dst = os.path.join(dst, os.path.basename(src))
459
 
            os.rename(src, real_dst)
460
 
        except OSError, e:
461
 
            if e.errno == errno.ENOENT:
462
 
                err = error('No such file or directory', src, dst)
463
 
            else:
464
 
                raise
465
 
        if err:
466
 
            retcode = 1
467
 
        else:
468
 
            retcode = 0
469
 
        return retcode, None, err
470
 
 
471
 
 
472
 
 
473
 
class TestCaseWithMemoryTransportAndScript(tests.TestCaseWithMemoryTransport):
474
 
    """Helper class to experiment shell-like test and memory fs.
475
 
 
476
 
    This not intended to be used outside of experiments in implementing memoy
477
 
    based file systems and evolving bzr so that test can use only memory based
478
 
    resources.
479
 
    """
480
 
 
481
 
    def setUp(self):
482
 
        super(TestCaseWithMemoryTransportAndScript, self).setUp()
483
 
        self.script_runner = ScriptRunner()
484
 
 
485
 
    def run_script(self, script, null_output_matches_anything=False):
486
 
        return self.script_runner.run_script(self, script, 
487
 
                   null_output_matches_anything=null_output_matches_anything)
488
 
 
489
 
    def run_command(self, cmd, input, output, error):
490
 
        return self.script_runner.run_command(self, cmd, input, output, error)
491
 
 
492
 
 
493
 
class TestCaseWithTransportAndScript(tests.TestCaseWithTransport):
494
 
    """Helper class to quickly define shell-like tests.
495
 
 
496
 
    Can be used as:
497
 
 
498
 
    from bzrlib.tests import script
499
 
 
500
 
 
501
 
    class TestBug(script.TestCaseWithTransportAndScript):
502
 
 
503
 
        def test_bug_nnnnn(self):
504
 
            self.run_script('''
505
 
            $ bzr init
506
 
            $ bzr do-this
507
 
            # Boom, error
508
 
            ''')
509
 
    """
510
 
 
511
 
    def setUp(self):
512
 
        super(TestCaseWithTransportAndScript, self).setUp()
513
 
        self.script_runner = ScriptRunner()
514
 
 
515
 
    def run_script(self, script, null_output_matches_anything=False):
516
 
        return self.script_runner.run_script(self, script,
517
 
                   null_output_matches_anything=null_output_matches_anything)
518
 
 
519
 
    def run_command(self, cmd, input, output, error):
520
 
        return self.script_runner.run_command(self, cmd, input, output, error)
521
 
 
522
 
 
523
 
def run_script(test_case, script_string, null_output_matches_anything=False):
524
 
    """Run the given script within a testcase"""
525
 
    return ScriptRunner().run_script(test_case, script_string,
526
 
               null_output_matches_anything=null_output_matches_anything)
527