~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/script.py

(jameinel) Allow 'bzr serve' to interpret SIGHUP as a graceful shutdown.
 (bug #795025) (John A Meinel)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009, 2010, 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Shell-like test scripts.
 
18
 
 
19
See developers/testing.html for more explanations.
 
20
"""
 
21
 
 
22
import doctest
 
23
import errno
 
24
import glob
 
25
import os
 
26
import shlex
 
27
import textwrap
 
28
 
 
29
from bzrlib import (
 
30
    osutils,
 
31
    tests,
 
32
    )
 
33
 
 
34
 
 
35
def split(s):
 
36
    """Split a command line respecting quotes."""
 
37
    scanner = shlex.shlex(s)
 
38
    scanner.quotes = '\'"`'
 
39
    scanner.whitespace_split = True
 
40
    for t in list(scanner):
 
41
        yield t
 
42
 
 
43
 
 
44
def _script_to_commands(text, file_name=None):
 
45
    """Turn a script into a list of commands with their associated IOs.
 
46
 
 
47
    Each command appears on a line by itself starting with '$ '. It can be
 
48
    associated with an input that will feed it and an expected output.
 
49
 
 
50
    Comments starts with '#' until the end of line.
 
51
    Empty lines are ignored.
 
52
 
 
53
    Input and output are full lines terminated by a '\n'.
 
54
 
 
55
    Input lines start with '<'.
 
56
    Output lines start with nothing.
 
57
    Error lines start with '2>'.
 
58
 
 
59
    :return: A sequence of ([args], input, output, errors), where the args are
 
60
        split in to words, and the input, output, and errors are just strings,
 
61
        typically containing newlines.
 
62
    """
 
63
 
 
64
    commands = []
 
65
 
 
66
    def add_command(cmd, input, output, error):
 
67
        if cmd is not None:
 
68
            if input is not None:
 
69
                input = ''.join(input)
 
70
            if output is not None:
 
71
                output = ''.join(output)
 
72
            if error is not None:
 
73
                error = ''.join(error)
 
74
            commands.append((cmd, input, output, error))
 
75
 
 
76
    cmd_cur = None
 
77
    cmd_line = 1
 
78
    lineno = 0
 
79
    input, output, error = None, None, None
 
80
    text = textwrap.dedent(text)
 
81
    lines = text.split('\n')
 
82
    # to make use of triple-quoted strings easier, we ignore a blank line
 
83
    # right at the start and right at the end; the rest are meaningful
 
84
    if lines and lines[0] == '':
 
85
        del lines[0]
 
86
    if lines and lines[-1] == '':
 
87
        del lines[-1]
 
88
    for line in lines:
 
89
        lineno += 1
 
90
        # Keep a copy for error reporting
 
91
        orig = line
 
92
        comment =  line.find('#')
 
93
        if comment >= 0:
 
94
            # Delete comments
 
95
            # NB: this syntax means comments are allowed inside output, which
 
96
            # may be confusing...
 
97
            line = line[0:comment]
 
98
            line = line.rstrip()
 
99
            if line == '':
 
100
                continue
 
101
        if line.startswith('$'):
 
102
            # Time to output the current command
 
103
            add_command(cmd_cur, input, output, error)
 
104
            # And start a new one
 
105
            cmd_cur = list(split(line[1:]))
 
106
            cmd_line = lineno
 
107
            input, output, error = None, None, None
 
108
        elif line.startswith('<'):
 
109
            if input is None:
 
110
                if cmd_cur is None:
 
111
                    raise SyntaxError('No command for that input',
 
112
                                      (file_name, lineno, 1, orig))
 
113
                input = []
 
114
            input.append(line[1:] + '\n')
 
115
        elif line.startswith('2>'):
 
116
            if error is None:
 
117
                if cmd_cur is None:
 
118
                    raise SyntaxError('No command for that error',
 
119
                                      (file_name, lineno, 1, orig))
 
120
                error = []
 
121
            error.append(line[2:] + '\n')
 
122
        else:
 
123
            # can happen if the first line is not recognized as a command, eg
 
124
            # if the prompt has leading whitespace
 
125
            if output is None:
 
126
                if cmd_cur is None:
 
127
                    raise SyntaxError('No command for line %r' % (line,),
 
128
                                      (file_name, lineno, 1, orig))
 
129
                output = []
 
130
            output.append(line + '\n')
 
131
    # Add the last seen command
 
132
    add_command(cmd_cur, input, output, error)
 
133
    return commands
 
134
 
 
135
 
 
136
def _scan_redirection_options(args):
 
137
    """Recognize and process input and output redirections.
 
138
 
 
139
    :param args: The command line arguments
 
140
 
 
141
    :return: A tuple containing: 
 
142
        - The file name redirected from or None
 
143
        - The file name redirected to or None
 
144
        - The mode to open the output file or None
 
145
        - The reamining arguments
 
146
    """
 
147
    def redirected_file_name(direction, name, args):
 
148
        if name == '':
 
149
            try:
 
150
                name = args.pop(0)
 
151
            except IndexError:
 
152
                # We leave the error handling to higher levels, an empty name
 
153
                # can't be legal.
 
154
                name = ''
 
155
        return name
 
156
 
 
157
    remaining = []
 
158
    in_name = None
 
159
    out_name, out_mode = None, None
 
160
    while args:
 
161
        arg = args.pop(0)
 
162
        if arg.startswith('<'):
 
163
            in_name = redirected_file_name('<', arg[1:], args)
 
164
        elif arg.startswith('>>'):
 
165
            out_name = redirected_file_name('>>', arg[2:], args)
 
166
            out_mode = 'ab+'
 
167
        elif arg.startswith('>',):
 
168
            out_name = redirected_file_name('>', arg[1:], args)
 
169
            out_mode = 'wb+'
 
170
        else:
 
171
            remaining.append(arg)
 
172
    return in_name, out_name, out_mode, remaining
 
173
 
 
174
 
 
175
class ScriptRunner(object):
 
176
    """Run a shell-like script from a test.
 
177
    
 
178
    Can be used as:
 
179
 
 
180
    from bzrlib.tests import script
 
181
 
 
182
    ...
 
183
 
 
184
        def test_bug_nnnnn(self):
 
185
            sr = script.ScriptRunner()
 
186
            sr.run_script(self, '''
 
187
            $ bzr init
 
188
            $ bzr do-this
 
189
            # Boom, error
 
190
            ''')
 
191
    """
 
192
 
 
193
    def __init__(self):
 
194
        self.output_checker = doctest.OutputChecker()
 
195
        self.check_options = doctest.ELLIPSIS
 
196
 
 
197
    def run_script(self, test_case, text, null_output_matches_anything=False):
 
198
        """Run a shell-like script as a test.
 
199
 
 
200
        :param test_case: A TestCase instance that should provide the fail(),
 
201
            assertEqualDiff and _run_bzr_core() methods as well as a 'test_dir'
 
202
            attribute used as a jail root.
 
203
 
 
204
        :param text: A shell-like script (see _script_to_commands for syntax).
 
205
 
 
206
        :param null_output_matches_anything: For commands with no specified
 
207
            output, ignore any output that does happen, including output on
 
208
            standard error.
 
209
        """
 
210
        self.null_output_matches_anything = null_output_matches_anything
 
211
        for cmd, input, output, error in _script_to_commands(text):
 
212
            self.run_command(test_case, cmd, input, output, error)
 
213
 
 
214
    def run_command(self, test_case, cmd, input, output, error):
 
215
        mname = 'do_' + cmd[0]
 
216
        method = getattr(self, mname, None)
 
217
        if method is None:
 
218
            raise SyntaxError('Command not found "%s"' % (cmd[0],),
 
219
                              (None, 1, 1, ' '.join(cmd)))
 
220
        if input is None:
 
221
            str_input = ''
 
222
        else:
 
223
            str_input = ''.join(input)
 
224
        args = list(self._pre_process_args(cmd[1:]))
 
225
        retcode, actual_output, actual_error = method(test_case,
 
226
                                                      str_input, args)
 
227
 
 
228
        try:
 
229
            self._check_output(output, actual_output, test_case)
 
230
        except AssertionError, e:
 
231
            raise AssertionError(str(e) + " in stdout of command %s" % cmd)
 
232
        try:
 
233
            self._check_output(error, actual_error, test_case)
 
234
        except AssertionError, e:
 
235
            raise AssertionError(str(e) +
 
236
                " in stderr of running command %s" % cmd)
 
237
        if retcode and not error and actual_error:
 
238
            test_case.fail('In \n\t%s\nUnexpected error: %s'
 
239
                           % (' '.join(cmd), actual_error))
 
240
        return retcode, actual_output, actual_error
 
241
 
 
242
    def _check_output(self, expected, actual, test_case):
 
243
        if not actual:
 
244
            if expected is None:
 
245
                return
 
246
            elif expected == '...\n':
 
247
                return
 
248
            else:
 
249
                test_case.fail('expected output: %r, but found nothing'
 
250
                            % (expected,))
 
251
 
 
252
        null_output_matches_anything = getattr(
 
253
            self, 'null_output_matches_anything', False)
 
254
        if null_output_matches_anything and expected is None:
 
255
            return
 
256
 
 
257
        expected = expected or ''
 
258
        matching = self.output_checker.check_output(
 
259
            expected, actual, self.check_options)
 
260
        if not matching:
 
261
            # Note that we can't use output_checker.output_difference() here
 
262
            # because... the API is broken ('expected' must be a doctest
 
263
            # specific object of which a 'want' attribute will be our
 
264
            # 'expected' parameter. So we just fallback to our good old
 
265
            # assertEqualDiff since we know there *are* differences and the
 
266
            # output should be decently readable.
 
267
            #
 
268
            # As a special case, we allow output that's missing a final
 
269
            # newline to match an expected string that does have one, so that
 
270
            # we can match a prompt printed on one line, then input given on
 
271
            # the next line.
 
272
            if expected == actual + '\n':
 
273
                pass
 
274
            else:
 
275
                test_case.assertEqualDiff(expected, actual)
 
276
 
 
277
    def _pre_process_args(self, args):
 
278
        new_args = []
 
279
        for arg in args:
 
280
            # Strip the simple and double quotes since we don't care about
 
281
            # them.  We leave the backquotes in place though since they have a
 
282
            # different semantic.
 
283
            if arg[0] in  ('"', "'") and arg[0] == arg[-1]:
 
284
                yield arg[1:-1]
 
285
            else:
 
286
                if glob.has_magic(arg):
 
287
                    matches = glob.glob(arg)
 
288
                    if matches:
 
289
                        # We care more about order stability than performance
 
290
                        # here
 
291
                        matches.sort()
 
292
                        for m in matches:
 
293
                            yield m
 
294
                else:
 
295
                    yield arg
 
296
 
 
297
    def _read_input(self, input, in_name):
 
298
        if in_name is not None:
 
299
            infile = open(in_name, 'rb')
 
300
            try:
 
301
                # Command redirection takes precedence over provided input
 
302
                input = infile.read()
 
303
            finally:
 
304
                infile.close()
 
305
        return input
 
306
 
 
307
    def _write_output(self, output, out_name, out_mode):
 
308
        if out_name is not None:
 
309
            outfile = open(out_name, out_mode)
 
310
            try:
 
311
                outfile.write(output)
 
312
            finally:
 
313
                outfile.close()
 
314
            output = None
 
315
        return output
 
316
 
 
317
    def do_bzr(self, test_case, input, args):
 
318
        retcode, out, err = test_case._run_bzr_core(
 
319
            args, retcode=None, encoding=None, stdin=input, working_dir=None)
 
320
        return retcode, out, err
 
321
 
 
322
    def do_cat(self, test_case, input, args):
 
323
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
324
        if args and in_name is not None:
 
325
            raise SyntaxError('Specify a file OR use redirection')
 
326
 
 
327
        inputs = []
 
328
        if input:
 
329
            inputs.append(input)
 
330
        input_names = args
 
331
        if in_name:
 
332
            args.append(in_name)
 
333
        for in_name in input_names:
 
334
            try:
 
335
                inputs.append(self._read_input(None, in_name))
 
336
            except IOError, e:
 
337
                # Some filenames are illegal on Windows and generate EINVAL
 
338
                # rather than just saying the filename doesn't exist
 
339
                if e.errno in (errno.ENOENT, errno.EINVAL):
 
340
                    return (1, None,
 
341
                            '%s: No such file or directory\n' % (in_name,))
 
342
                raise
 
343
        # Basically cat copy input to output
 
344
        output = ''.join(inputs)
 
345
        # Handle output redirections
 
346
        try:
 
347
            output = self._write_output(output, out_name, out_mode)
 
348
        except IOError, e:
 
349
            # If out_name cannot be created, we may get 'ENOENT', however if
 
350
            # out_name is something like '', we can get EINVAL
 
351
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
352
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
353
            raise
 
354
        return 0, output, None
 
355
 
 
356
    def do_echo(self, test_case, input, args):
 
357
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
358
        if input or in_name:
 
359
            raise SyntaxError('echo doesn\'t read from stdin')
 
360
        if args:
 
361
            input = ' '.join(args)
 
362
        # Always append a \n'
 
363
        input += '\n'
 
364
        # Process output
 
365
        output = input
 
366
        # Handle output redirections
 
367
        try:
 
368
            output = self._write_output(output, out_name, out_mode)
 
369
        except IOError, e:
 
370
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
371
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
372
            raise
 
373
        return 0, output, None
 
374
 
 
375
    def _get_jail_root(self, test_case):
 
376
        return test_case.test_dir
 
377
 
 
378
    def _ensure_in_jail(self, test_case, path):
 
379
        jail_root = self._get_jail_root(test_case)
 
380
        if not osutils.is_inside(jail_root, osutils.normalizepath(path)):
 
381
            raise ValueError('%s is not inside %s' % (path, jail_root))
 
382
 
 
383
    def do_cd(self, test_case, input, args):
 
384
        if len(args) > 1:
 
385
            raise SyntaxError('Usage: cd [dir]')
 
386
        if len(args) == 1:
 
387
            d = args[0]
 
388
            self._ensure_in_jail(test_case, d)
 
389
        else:
 
390
            # The test "home" directory is the root of its jail
 
391
            d = self._get_jail_root(test_case)
 
392
        os.chdir(d)
 
393
        return 0, None, None
 
394
 
 
395
    def do_mkdir(self, test_case, input, args):
 
396
        if not args or len(args) != 1:
 
397
            raise SyntaxError('Usage: mkdir dir')
 
398
        d = args[0]
 
399
        self._ensure_in_jail(test_case, d)
 
400
        os.mkdir(d)
 
401
        return 0, None, None
 
402
 
 
403
    def do_rm(self, test_case, input, args):
 
404
        err = None
 
405
 
 
406
        def error(msg, path):
 
407
            return  "rm: cannot remove '%s': %s\n" % (path, msg)
 
408
 
 
409
        force, recursive = False, False
 
410
        opts = None
 
411
        if args and args[0][0] == '-':
 
412
            opts = args.pop(0)[1:]
 
413
            if 'f' in opts:
 
414
                force = True
 
415
                opts = opts.replace('f', '', 1)
 
416
            if 'r' in opts:
 
417
                recursive = True
 
418
                opts = opts.replace('r', '', 1)
 
419
        if not args or opts:
 
420
            raise SyntaxError('Usage: rm [-fr] path+')
 
421
        for p in args:
 
422
            self._ensure_in_jail(test_case, p)
 
423
            # FIXME: Should we put that in osutils ?
 
424
            try:
 
425
                os.remove(p)
 
426
            except OSError, e:
 
427
                # Various OSes raises different exceptions (linux: EISDIR,
 
428
                #   win32: EACCES, OSX: EPERM) when invoked on a directory
 
429
                if e.errno in (errno.EISDIR, errno.EPERM, errno.EACCES):
 
430
                    if recursive:
 
431
                        osutils.rmtree(p)
 
432
                    else:
 
433
                        err = error('Is a directory', p)
 
434
                        break
 
435
                elif e.errno == errno.ENOENT:
 
436
                    if not force:
 
437
                        err =  error('No such file or directory', p)
 
438
                        break
 
439
                else:
 
440
                    raise
 
441
        if err:
 
442
            retcode = 1
 
443
        else:
 
444
            retcode = 0
 
445
        return retcode, None, err
 
446
 
 
447
    def do_mv(self, test_case, input, args):
 
448
        err = None
 
449
        def error(msg, src, dst):
 
450
            return "mv: cannot move %s to %s: %s\n" % (src, dst, msg)
 
451
 
 
452
        if not args or len(args) != 2:
 
453
            raise SyntaxError("Usage: mv path1 path2")
 
454
        src, dst = args
 
455
        try:
 
456
            real_dst = dst
 
457
            if os.path.isdir(dst):
 
458
                real_dst = os.path.join(dst, os.path.basename(src))
 
459
            os.rename(src, real_dst)
 
460
        except OSError, e:
 
461
            if e.errno == errno.ENOENT:
 
462
                err = error('No such file or directory', src, dst)
 
463
            else:
 
464
                raise
 
465
        if err:
 
466
            retcode = 1
 
467
        else:
 
468
            retcode = 0
 
469
        return retcode, None, err
 
470
 
 
471
 
 
472
 
 
473
class TestCaseWithMemoryTransportAndScript(tests.TestCaseWithMemoryTransport):
 
474
    """Helper class to experiment shell-like test and memory fs.
 
475
 
 
476
    This not intended to be used outside of experiments in implementing memoy
 
477
    based file systems and evolving bzr so that test can use only memory based
 
478
    resources.
 
479
    """
 
480
 
 
481
    def setUp(self):
 
482
        super(TestCaseWithMemoryTransportAndScript, self).setUp()
 
483
        self.script_runner = ScriptRunner()
 
484
 
 
485
    def run_script(self, script, null_output_matches_anything=False):
 
486
        return self.script_runner.run_script(self, script, 
 
487
                   null_output_matches_anything=null_output_matches_anything)
 
488
 
 
489
    def run_command(self, cmd, input, output, error):
 
490
        return self.script_runner.run_command(self, cmd, input, output, error)
 
491
 
 
492
 
 
493
class TestCaseWithTransportAndScript(tests.TestCaseWithTransport):
 
494
    """Helper class to quickly define shell-like tests.
 
495
 
 
496
    Can be used as:
 
497
 
 
498
    from bzrlib.tests import script
 
499
 
 
500
 
 
501
    class TestBug(script.TestCaseWithTransportAndScript):
 
502
 
 
503
        def test_bug_nnnnn(self):
 
504
            self.run_script('''
 
505
            $ bzr init
 
506
            $ bzr do-this
 
507
            # Boom, error
 
508
            ''')
 
509
    """
 
510
 
 
511
    def setUp(self):
 
512
        super(TestCaseWithTransportAndScript, self).setUp()
 
513
        self.script_runner = ScriptRunner()
 
514
 
 
515
    def run_script(self, script, null_output_matches_anything=False):
 
516
        return self.script_runner.run_script(self, script,
 
517
                   null_output_matches_anything=null_output_matches_anything)
 
518
 
 
519
    def run_command(self, cmd, input, output, error):
 
520
        return self.script_runner.run_command(self, cmd, input, output, error)
 
521
 
 
522
 
 
523
def run_script(test_case, script_string, null_output_matches_anything=False):
 
524
    """Run the given script within a testcase"""
 
525
    return ScriptRunner().run_script(test_case, script_string,
 
526
               null_output_matches_anything=null_output_matches_anything)
 
527