~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/script.py

  • Committer: Vincent Ladeuil
  • Date: 2010-10-26 08:08:23 UTC
  • mfrom: (5514.1.1 665100-content-type)
  • mto: This revision was merged to the branch mainline in revision 5516.
  • Revision ID: v.ladeuil+lp@free.fr-20101026080823-3wggo03b7cpn9908
Correctly set the Content-Type header when POSTing http requests

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009, 2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Shell-like test scripts.
 
18
 
 
19
See developers/testing.html for more explanations.
 
20
"""
 
21
 
 
22
import doctest
 
23
import errno
 
24
import glob
 
25
import os
 
26
import shlex
 
27
import textwrap
 
28
from cStringIO import StringIO
 
29
 
 
30
from bzrlib import (
 
31
    errors,
 
32
    osutils,
 
33
    tests,
 
34
    )
 
35
 
 
36
 
 
37
def split(s):
 
38
    """Split a command line respecting quotes."""
 
39
    scanner = shlex.shlex(s)
 
40
    scanner.quotes = '\'"`'
 
41
    scanner.whitespace_split = True
 
42
    for t in list(scanner):
 
43
        yield t
 
44
 
 
45
 
 
46
def _script_to_commands(text, file_name=None):
 
47
    """Turn a script into a list of commands with their associated IOs.
 
48
 
 
49
    Each command appears on a line by itself starting with '$ '. It can be
 
50
    associated with an input that will feed it and an expected output.
 
51
 
 
52
    Comments starts with '#' until the end of line.
 
53
    Empty lines are ignored.
 
54
 
 
55
    Input and output are full lines terminated by a '\n'.
 
56
 
 
57
    Input lines start with '<'.
 
58
    Output lines start with nothing.
 
59
    Error lines start with '2>'.
 
60
 
 
61
    :return: A sequence of ([args], input, output, errors), where the args are
 
62
        split in to words, and the input, output, and errors are just strings,
 
63
        typically containing newlines.
 
64
    """
 
65
 
 
66
    commands = []
 
67
 
 
68
    def add_command(cmd, input, output, error):
 
69
        if cmd is not None:
 
70
            if input is not None:
 
71
                input = ''.join(input)
 
72
            if output is not None:
 
73
                output = ''.join(output)
 
74
            if error is not None:
 
75
                error = ''.join(error)
 
76
            commands.append((cmd, input, output, error))
 
77
 
 
78
    cmd_cur = None
 
79
    cmd_line = 1
 
80
    lineno = 0
 
81
    input, output, error = None, None, None
 
82
    text = textwrap.dedent(text)
 
83
    lines = text.split('\n')
 
84
    # to make use of triple-quoted strings easier, we ignore a blank line
 
85
    # right at the start and right at the end; the rest are meaningful
 
86
    if lines and lines[0] == '':
 
87
        del lines[0]
 
88
    if lines and lines[-1] == '':
 
89
        del lines[-1]
 
90
    for line in lines:
 
91
        lineno += 1
 
92
        # Keep a copy for error reporting
 
93
        orig = line
 
94
        comment =  line.find('#')
 
95
        if comment >= 0:
 
96
            # Delete comments
 
97
            # NB: this syntax means comments are allowed inside output, which
 
98
            # may be confusing...
 
99
            line = line[0:comment]
 
100
            line = line.rstrip()
 
101
            if line == '':
 
102
                continue
 
103
        if line.startswith('$'):
 
104
            # Time to output the current command
 
105
            add_command(cmd_cur, input, output, error)
 
106
            # And start a new one
 
107
            cmd_cur = list(split(line[1:]))
 
108
            cmd_line = lineno
 
109
            input, output, error = None, None, None
 
110
        elif line.startswith('<'):
 
111
            if input is None:
 
112
                if cmd_cur is None:
 
113
                    raise SyntaxError('No command for that input',
 
114
                                      (file_name, lineno, 1, orig))
 
115
                input = []
 
116
            input.append(line[1:] + '\n')
 
117
        elif line.startswith('2>'):
 
118
            if error is None:
 
119
                if cmd_cur is None:
 
120
                    raise SyntaxError('No command for that error',
 
121
                                      (file_name, lineno, 1, orig))
 
122
                error = []
 
123
            error.append(line[2:] + '\n')
 
124
        else:
 
125
            # can happen if the first line is not recognized as a command, eg
 
126
            # if the prompt has leading whitespace
 
127
            if output is None:
 
128
                if cmd_cur is None:
 
129
                    raise SyntaxError('No command for line %r' % (line,),
 
130
                                      (file_name, lineno, 1, orig))
 
131
                output = []
 
132
            output.append(line + '\n')
 
133
    # Add the last seen command
 
134
    add_command(cmd_cur, input, output, error)
 
135
    return commands
 
136
 
 
137
 
 
138
def _scan_redirection_options(args):
 
139
    """Recognize and process input and output redirections.
 
140
 
 
141
    :param args: The command line arguments
 
142
 
 
143
    :return: A tuple containing: 
 
144
        - The file name redirected from or None
 
145
        - The file name redirected to or None
 
146
        - The mode to open the output file or None
 
147
        - The reamining arguments
 
148
    """
 
149
    def redirected_file_name(direction, name, args):
 
150
        if name == '':
 
151
            try:
 
152
                name = args.pop(0)
 
153
            except IndexError:
 
154
                # We leave the error handling to higher levels, an empty name
 
155
                # can't be legal.
 
156
                name = ''
 
157
        return name
 
158
 
 
159
    remaining = []
 
160
    in_name = None
 
161
    out_name, out_mode = None, None
 
162
    while args:
 
163
        arg = args.pop(0)
 
164
        if arg.startswith('<'):
 
165
            in_name = redirected_file_name('<', arg[1:], args)
 
166
        elif arg.startswith('>>'):
 
167
            out_name = redirected_file_name('>>', arg[2:], args)
 
168
            out_mode = 'ab+'
 
169
        elif arg.startswith('>',):
 
170
            out_name = redirected_file_name('>', arg[1:], args)
 
171
            out_mode = 'wb+'
 
172
        else:
 
173
            remaining.append(arg)
 
174
    return in_name, out_name, out_mode, remaining
 
175
 
 
176
 
 
177
class ScriptRunner(object):
 
178
    """Run a shell-like script from a test.
 
179
    
 
180
    Can be used as:
 
181
 
 
182
    from bzrlib.tests import script
 
183
 
 
184
    ...
 
185
 
 
186
        def test_bug_nnnnn(self):
 
187
            sr = script.ScriptRunner()
 
188
            sr.run_script(self, '''
 
189
            $ bzr init
 
190
            $ bzr do-this
 
191
            # Boom, error
 
192
            ''')
 
193
    """
 
194
 
 
195
    def __init__(self):
 
196
        self.output_checker = doctest.OutputChecker()
 
197
        self.check_options = doctest.ELLIPSIS
 
198
 
 
199
    def run_script(self, test_case, text):
 
200
        """Run a shell-like script as a test.
 
201
 
 
202
        :param test_case: A TestCase instance that should provide the fail(),
 
203
            assertEqualDiff and _run_bzr_core() methods as well as a 'test_dir'
 
204
            attribute used as a jail root.
 
205
 
 
206
        :param text: A shell-like script (see _script_to_commands for syntax).
 
207
        """
 
208
        for cmd, input, output, error in _script_to_commands(text):
 
209
            self.run_command(test_case, cmd, input, output, error)
 
210
 
 
211
    def run_command(self, test_case, cmd, input, output, error):
 
212
        mname = 'do_' + cmd[0]
 
213
        method = getattr(self, mname, None)
 
214
        if method is None:
 
215
            raise SyntaxError('Command not found "%s"' % (cmd[0],),
 
216
                              None, 1, ' '.join(cmd))
 
217
        if input is None:
 
218
            str_input = ''
 
219
        else:
 
220
            str_input = ''.join(input)
 
221
        args = list(self._pre_process_args(cmd[1:]))
 
222
        retcode, actual_output, actual_error = method(test_case,
 
223
                                                      str_input, args)
 
224
 
 
225
        try:
 
226
            self._check_output(output, actual_output, test_case)
 
227
        except AssertionError, e:
 
228
            raise AssertionError(str(e) + " in stdout of command %s" % cmd)
 
229
        try:
 
230
            self._check_output(error, actual_error, test_case)
 
231
        except AssertionError, e:
 
232
            raise AssertionError(str(e) +
 
233
                " in stderr of running command %s" % cmd)
 
234
        if retcode and not error and actual_error:
 
235
            test_case.fail('In \n\t%s\nUnexpected error: %s'
 
236
                           % (' '.join(cmd), actual_error))
 
237
        return retcode, actual_output, actual_error
 
238
 
 
239
    def _check_output(self, expected, actual, test_case):
 
240
        if not actual:
 
241
            if expected is None:
 
242
                return
 
243
            elif expected == '...\n':
 
244
                return
 
245
            else:
 
246
                test_case.fail('expected output: %r, but found nothing'
 
247
                            % (expected,))
 
248
        expected = expected or ''
 
249
        matching = self.output_checker.check_output(
 
250
            expected, actual, self.check_options)
 
251
        if not matching:
 
252
            # Note that we can't use output_checker.output_difference() here
 
253
            # because... the API is broken ('expected' must be a doctest
 
254
            # specific object of which a 'want' attribute will be our
 
255
            # 'expected' parameter. So we just fallback to our good old
 
256
            # assertEqualDiff since we know there *are* differences and the
 
257
            # output should be decently readable.
 
258
            #
 
259
            # As a special case, we allow output that's missing a final
 
260
            # newline to match an expected string that does have one, so that
 
261
            # we can match a prompt printed on one line, then input given on
 
262
            # the next line.
 
263
            if expected == actual + '\n':
 
264
                pass
 
265
            else:
 
266
                test_case.assertEqualDiff(expected, actual)
 
267
 
 
268
    def _pre_process_args(self, args):
 
269
        new_args = []
 
270
        for arg in args:
 
271
            # Strip the simple and double quotes since we don't care about
 
272
            # them.  We leave the backquotes in place though since they have a
 
273
            # different semantic.
 
274
            if arg[0] in  ('"', "'") and arg[0] == arg[-1]:
 
275
                yield arg[1:-1]
 
276
            else:
 
277
                if glob.has_magic(arg):
 
278
                    matches = glob.glob(arg)
 
279
                    if matches:
 
280
                        # We care more about order stability than performance
 
281
                        # here
 
282
                        matches.sort()
 
283
                        for m in matches:
 
284
                            yield m
 
285
                else:
 
286
                    yield arg
 
287
 
 
288
    def _read_input(self, input, in_name):
 
289
        if in_name is not None:
 
290
            infile = open(in_name, 'rb')
 
291
            try:
 
292
                # Command redirection takes precedence over provided input
 
293
                input = infile.read()
 
294
            finally:
 
295
                infile.close()
 
296
        return input
 
297
 
 
298
    def _write_output(self, output, out_name, out_mode):
 
299
        if out_name is not None:
 
300
            outfile = open(out_name, out_mode)
 
301
            try:
 
302
                outfile.write(output)
 
303
            finally:
 
304
                outfile.close()
 
305
            output = None
 
306
        return output
 
307
 
 
308
    def do_bzr(self, test_case, input, args):
 
309
        retcode, out, err = test_case._run_bzr_core(
 
310
            args, retcode=None, encoding=None, stdin=input, working_dir=None)
 
311
        return retcode, out, err
 
312
 
 
313
    def do_cat(self, test_case, input, args):
 
314
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
315
        if args and in_name is not None:
 
316
            raise SyntaxError('Specify a file OR use redirection')
 
317
 
 
318
        inputs = []
 
319
        if input:
 
320
            inputs.append(input)
 
321
        input_names = args
 
322
        if in_name:
 
323
            args.append(in_name)
 
324
        for in_name in input_names:
 
325
            try:
 
326
                inputs.append(self._read_input(None, in_name))
 
327
            except IOError, e:
 
328
                # Some filenames are illegal on Windows and generate EINVAL
 
329
                # rather than just saying the filename doesn't exist
 
330
                if e.errno in (errno.ENOENT, errno.EINVAL):
 
331
                    return (1, None,
 
332
                            '%s: No such file or directory\n' % (in_name,))
 
333
                raise
 
334
        # Basically cat copy input to output
 
335
        output = ''.join(inputs)
 
336
        # Handle output redirections
 
337
        try:
 
338
            output = self._write_output(output, out_name, out_mode)
 
339
        except IOError, e:
 
340
            # If out_name cannot be created, we may get 'ENOENT', however if
 
341
            # out_name is something like '', we can get EINVAL
 
342
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
343
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
344
            raise
 
345
        return 0, output, None
 
346
 
 
347
    def do_echo(self, test_case, input, args):
 
348
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
349
        if input or in_name:
 
350
            raise SyntaxError('echo doesn\'t read from stdin')
 
351
        if args:
 
352
            input = ' '.join(args)
 
353
        # Always append a \n'
 
354
        input += '\n'
 
355
        # Process output
 
356
        output = input
 
357
        # Handle output redirections
 
358
        try:
 
359
            output = self._write_output(output, out_name, out_mode)
 
360
        except IOError, e:
 
361
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
362
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
363
            raise
 
364
        return 0, output, None
 
365
 
 
366
    def _get_jail_root(self, test_case):
 
367
        return test_case.test_dir
 
368
 
 
369
    def _ensure_in_jail(self, test_case, path):
 
370
        jail_root = self._get_jail_root(test_case)
 
371
        if not osutils.is_inside(jail_root, osutils.normalizepath(path)):
 
372
            raise ValueError('%s is not inside %s' % (path, jail_root))
 
373
 
 
374
    def do_cd(self, test_case, input, args):
 
375
        if len(args) > 1:
 
376
            raise SyntaxError('Usage: cd [dir]')
 
377
        if len(args) == 1:
 
378
            d = args[0]
 
379
            self._ensure_in_jail(test_case, d)
 
380
        else:
 
381
            # The test "home" directory is the root of its jail
 
382
            d = self._get_jail_root(test_case)
 
383
        os.chdir(d)
 
384
        return 0, None, None
 
385
 
 
386
    def do_mkdir(self, test_case, input, args):
 
387
        if not args or len(args) != 1:
 
388
            raise SyntaxError('Usage: mkdir dir')
 
389
        d = args[0]
 
390
        self._ensure_in_jail(test_case, d)
 
391
        os.mkdir(d)
 
392
        return 0, None, None
 
393
 
 
394
    def do_rm(self, test_case, input, args):
 
395
        err = None
 
396
 
 
397
        def error(msg, path):
 
398
            return  "rm: cannot remove '%s': %s\n" % (path, msg)
 
399
 
 
400
        force, recursive = False, False
 
401
        opts = None
 
402
        if args and args[0][0] == '-':
 
403
            opts = args.pop(0)[1:]
 
404
            if 'f' in opts:
 
405
                force = True
 
406
                opts = opts.replace('f', '', 1)
 
407
            if 'r' in opts:
 
408
                recursive = True
 
409
                opts = opts.replace('r', '', 1)
 
410
        if not args or opts:
 
411
            raise SyntaxError('Usage: rm [-fr] path+')
 
412
        for p in args:
 
413
            self._ensure_in_jail(test_case, p)
 
414
            # FIXME: Should we put that in osutils ?
 
415
            try:
 
416
                os.remove(p)
 
417
            except OSError, e:
 
418
                # Various OSes raises different exceptions (linux: EISDIR,
 
419
                #   win32: EACCES, OSX: EPERM) when invoked on a directory
 
420
                if e.errno in (errno.EISDIR, errno.EPERM, errno.EACCES):
 
421
                    if recursive:
 
422
                        osutils.rmtree(p)
 
423
                    else:
 
424
                        err = error('Is a directory', p)
 
425
                        break
 
426
                elif e.errno == errno.ENOENT:
 
427
                    if not force:
 
428
                        err =  error('No such file or directory', p)
 
429
                        break
 
430
                else:
 
431
                    raise
 
432
        if err:
 
433
            retcode = 1
 
434
        else:
 
435
            retcode = 0
 
436
        return retcode, None, err
 
437
 
 
438
    def do_mv(self, test_case, input, args):
 
439
        err = None
 
440
        def error(msg, src, dst):
 
441
            return "mv: cannot move %s to %s: %s\n" % (src, dst, msg)
 
442
 
 
443
        if not args or len(args) != 2:
 
444
            raise SyntaxError("Usage: mv path1 path2")
 
445
        src, dst = args
 
446
        try:
 
447
            real_dst = dst
 
448
            if os.path.isdir(dst):
 
449
                real_dst = os.path.join(dst, os.path.basename(src))
 
450
            os.rename(src, real_dst)
 
451
        except OSError, e:
 
452
            if e.errno == errno.ENOENT:
 
453
                err = error('No such file or directory', src, dst)
 
454
            else:
 
455
                raise
 
456
        if err:
 
457
            retcode = 1
 
458
        else:
 
459
            retcode = 0
 
460
        return retcode, None, err
 
461
 
 
462
 
 
463
 
 
464
class TestCaseWithMemoryTransportAndScript(tests.TestCaseWithMemoryTransport):
 
465
    """Helper class to experiment shell-like test and memory fs.
 
466
 
 
467
    This not intended to be used outside of experiments in implementing memoy
 
468
    based file systems and evolving bzr so that test can use only memory based
 
469
    resources.
 
470
    """
 
471
 
 
472
    def setUp(self):
 
473
        super(TestCaseWithMemoryTransportAndScript, self).setUp()
 
474
        self.script_runner = ScriptRunner()
 
475
 
 
476
    def run_script(self, script):
 
477
        return self.script_runner.run_script(self, script)
 
478
 
 
479
    def run_command(self, cmd, input, output, error):
 
480
        return self.script_runner.run_command(self, cmd, input, output, error)
 
481
 
 
482
 
 
483
class TestCaseWithTransportAndScript(tests.TestCaseWithTransport):
 
484
    """Helper class to quickly define shell-like tests.
 
485
 
 
486
    Can be used as:
 
487
 
 
488
    from bzrlib.tests import script
 
489
 
 
490
 
 
491
    class TestBug(script.TestCaseWithTransportAndScript):
 
492
 
 
493
        def test_bug_nnnnn(self):
 
494
            self.run_script('''
 
495
            $ bzr init
 
496
            $ bzr do-this
 
497
            # Boom, error
 
498
            ''')
 
499
    """
 
500
 
 
501
    def setUp(self):
 
502
        super(TestCaseWithTransportAndScript, self).setUp()
 
503
        self.script_runner = ScriptRunner()
 
504
 
 
505
    def run_script(self, script):
 
506
        return self.script_runner.run_script(self, script)
 
507
 
 
508
    def run_command(self, cmd, input, output, error):
 
509
        return self.script_runner.run_command(self, cmd, input, output, error)
 
510
 
 
511
 
 
512
def run_script(test_case, script_string):
 
513
    """Run the given script within a testcase"""
 
514
    return ScriptRunner().run_script(test_case, script_string)
 
515