~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to fai/arch/backends/forkexec.py

  • Committer: Aaron Bentley
  • Date: 2005-06-07 18:52:04 UTC
  • Revision ID: abentley@panoramicfeedback.com-20050607185204-5fc1f0e3d393b909
Added NEWS, obsoleted bzr-pull

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# arch-tag: 5b9da267-93df-418e-bf4a-47fb9ec7f6de
2
 
# Copyright (C) 2004 Canonical Ltd.
3
 
#               Author: David Allouche <david@canonical.com>
4
 
#
5
 
#    This program is free software; you can redistribute it and/or modify
6
 
#    it under the terms of the GNU General Public License as published by
7
 
#    the Free Software Foundation; either version 2 of the License, or
8
 
#    (at your option) any later version.
9
 
#
10
 
#    This program is distributed in the hope that it will be useful,
11
 
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
#    GNU General Public License for more details.
14
 
#
15
 
#    You should have received a copy of the GNU General Public License
16
 
#    along with this program; if not, write to the Free Software
17
 
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18
 
 
19
 
"""PyArch specific process spawning
20
 
"""
21
 
 
22
 
import sys
23
 
import os
24
 
import threading
25
 
from fai.arch.errors import ExecProblem
26
 
import commandline
27
 
 
28
 
### Spawning strategy
29
 
 
30
 
class PyArchSpawningStrategy(commandline.SpawningStrategy):
31
 
 
32
 
    def __init__(self, command, logger):
33
 
        self._command = command
34
 
        self._logger = logger
35
 
 
36
 
    def sequence_cmd(self, args, chdir=None, expected=(0,)):
37
 
        return _pyarch_sequence_cmd(
38
 
            self._command, args, chdir, expected, self._logger)
39
 
 
40
 
    def status_cmd(self, args, chdir, expected):
41
 
        return exec_safe(
42
 
            self._command, args, expected=expected, chdir=chdir,
43
 
            logger=self._logger)
44
 
 
45
 
    def status_text_cmd(self, args, chdir, expected):
46
 
        return exec_safe_status_stdout(
47
 
            self._command, args, expected=expected, chdir=chdir,
48
 
            logger=self._logger)
49
 
 
50
 
 
51
 
class _pyarch_sequence_cmd(object):
52
 
    def __init__(self, command, args, chdir, expected, logger):
53
 
        self._proc = exec_safe_iter_stdout(
54
 
            command, args, chdir=chdir, expected=expected, logger=logger)
55
 
    def __iter__(self):
56
 
        return self
57
 
    def next(self):
58
 
        line = self._proc.next()
59
 
        return line.rstrip('\n')
60
 
    def _get_finished(self):
61
 
        return self._proc.finished
62
 
    finished = property(_get_finished)
63
 
    def _get_status(self):
64
 
        return self._proc.status
65
 
    status = property(_get_status)
66
 
 
67
 
 
68
 
### Process handling commands
69
 
 
70
 
def exec_safe(program, args = [], stdout = None, stdin = None,
71
 
              stderr = None, expected = 0, chdir = None, logger = None):
72
 
    """Fork/exec a process and and raises an exception if the program
73
 
    died with a signal or returned an error code other than expected.
74
 
    This function will always wait."""
75
 
    proc = ChildProcess(program, args, expected, chdir, logger)
76
 
    proc.spawn(stdout, stdin, stderr)
77
 
    proc.wait()
78
 
    return proc.status
79
 
 
80
 
 
81
 
def exec_safe_status_stdout(program, args = [], expected = 0,
82
 
                            chdir = None, logger = None):
83
 
    """Run the specified program and return a tuple of two items:
84
 
    1. exit status of the program;
85
 
    2. output of the program as a single string.
86
 
    """
87
 
    read_end, write_end = os.pipe()
88
 
    proc = ChildProcess(program, args, expected, chdir, logger)
89
 
    proc.spawn(stdout=write_end)
90
 
    os.close(write_end)
91
 
    fd = os.fdopen(read_end, 'r')
92
 
    output = fd.read()
93
 
    proc.wait()
94
 
    return (proc.status, output)
95
 
 
96
 
 
97
 
class exec_safe_iter_stdout(object):
98
 
 
99
 
    """Iterator over the output of a child process.
100
 
 
101
 
    Fork/exec a process with its output piped. Each iteration will
102
 
    cause a iteration of the process output pipe. The pipe is properly
103
 
    closed whenever the output is exhausted or the iterator is
104
 
    deleted.
105
 
 
106
 
    If the output is exhausted, the process exit status is checked and
107
 
    an ExecError exception will be raised for abnormal or unexpected
108
 
    exit status.
109
 
    """
110
 
 
111
 
    def __init__(self, program, args=[], stdin=None, stderr=None,
112
 
                 expected=0, delsignal=None, chdir=None, logger=None):
113
 
        self.delsignal = delsignal
114
 
        self._read_file, self._pid = None, None
115
 
        read_end, write_end = os.pipe()
116
 
        self.proc = ChildProcess(program, args, expected, chdir, logger)
117
 
        try:
118
 
            self.proc.spawn(stdout=write_end, stdin=stdin, stderr=stderr,
119
 
                            closefds=[read_end])
120
 
            self._pid = self.proc.pid
121
 
        except:
122
 
            os.close(write_end)
123
 
            self.errlog = '<deleted>'
124
 
            os.close(read_end)
125
 
            raise
126
 
        self._read_file = os.fdopen(read_end, 'r', 0)
127
 
        os.close(write_end)
128
 
 
129
 
    def __del__(self):
130
 
        """Destructor. If needed, close the pipe and wait the child process.
131
 
 
132
 
        If child process has already been joined, it means the iterator was
133
 
        deleted before being exhausted. It is assumed the return status is not
134
 
        cared about.
135
 
        """
136
 
        if self._pid is not None:
137
 
            if self._read_file is not None:
138
 
                self._read_file.close()
139
 
            # Some buggy programs (e.g. ls -R) do not properly
140
 
            # terminate when their output pipe is broken. They
141
 
            # must be killed by an appropriate signal before
142
 
            # waiting. SIGINT should be okay most of the time.
143
 
            if self.delsignal:
144
 
                os.kill(self.proc.pid, self.delsignal)
145
 
            self.proc.wait_nocheck()
146
 
 
147
 
    def _get_finished(self):
148
 
        """Whether the underlying process has terminated."""
149
 
        return self.proc.status is not None or self.proc.signal is not None
150
 
    finished = property(_get_finished)
151
 
 
152
 
    def _get_status(self):
153
 
        """Exit status of the underlying process.
154
 
 
155
 
        Raises ValueError if the process has not yet finished. (Hm...
156
 
        should be AttributeError). Can be None if the process was
157
 
        killed by a signal.
158
 
        """
159
 
        if self.proc.status is None:
160
 
            raise ValueError, "no status, process has not finished"
161
 
        return self.proc.status
162
 
    status = property(_get_status)
163
 
 
164
 
    def close(self):
165
 
        """Close the pipe and wait the child process.
166
 
 
167
 
        If the output is not exhausted yet, you should be prepared to
168
 
        handle the error condition caused by breaking the pipe.
169
 
        """
170
 
        self._read_file.close()
171
 
        try:
172
 
            self.proc.wait()
173
 
            self._pid = None
174
 
            return self.proc.status
175
 
        except ExecProblem:
176
 
            self._pid = None
177
 
            raise
178
 
 
179
 
    def __iter__(self):
180
 
        """Iterator. Return self."""
181
 
        return self
182
 
 
183
 
    def next(self):
184
 
        """Iteration method. Iterate on the pipe file.
185
 
 
186
 
        Close the pipe and wait the child process once output is exhausted.
187
 
 
188
 
        Use `file.readline` instead of `file.next` because we want
189
 
        maximal responsiveness to incremental output. The pipe
190
 
        mechanism itself provides buffering.
191
 
        """
192
 
        try:
193
 
            line = self._read_file.readline()
194
 
        except ValueError:
195
 
            if self._pid is None: raise StopIteration
196
 
            else: raise
197
 
        if line:
198
 
            return line
199
 
        else:
200
 
            self.close()
201
 
            raise StopIteration
202
 
 
203
 
 
204
 
### Low-level process handling
205
 
 
206
 
class ChildProcess:
207
 
 
208
 
    """Description of a child process, for error handling."""
209
 
 
210
 
    def __init__(self, command, args, expected=0, chdir=None, logger=None):
211
 
        """Create a child process descriptor.
212
 
 
213
 
        The child process must have already been created. The
214
 
        `command`, `args` and `expected` parameters are used for error
215
 
        reporting.
216
 
 
217
 
        :param command: name of the child process executable
218
 
        :type command: str
219
 
        :param args: child process arguments (argv)
220
 
        :type args: sequence of str
221
 
        :param expected: valid exit status values
222
 
        :type expected: int or sequence of int
223
 
        """
224
 
        self.pid = None
225
 
        self.command = command
226
 
        self.args = tuple(args)
227
 
        if isinstance(expected, int): expected = (expected,)
228
 
        self.expected = expected
229
 
        self.signal = None
230
 
        self.status = None
231
 
        self.error = None
232
 
        self.chdir = chdir
233
 
        self._logger = logger
234
 
        self._outlog = None
235
 
        self._errlog = None
236
 
 
237
 
    def spawn(self, stdout=None, stdin=None, stderr=None, closefds=[]):
238
 
        """Fork, setup file descriptors, and exec child process.
239
 
 
240
 
        The `stdout`, `stdin` and `stderr` file-like objects can be
241
 
        raw file descriptors (ints) or file-like objects with a
242
 
        ``fileno`` method returning a file descriptor.
243
 
 
244
 
        When building a pipe, one side of a pipe is used as `stdout`,
245
 
        `stdin` or `stderr`, and the other is included in the
246
 
        `closefds` list, so the child process will be able to detect a
247
 
        broken pipe.
248
 
 
249
 
        :param stdin: use as standard input of child process, defaults
250
 
            to ``/dev/null``.
251
 
        :type stdin: file-like object with a ``fileno`` method or raw
252
 
            file descriptor (``int``).
253
 
        :param stdout: use as standard output of child process,
254
 
            defaults to internal pipe or ``/dev/null``.
255
 
        :type stdout: file-like object with a ``fileno`` method or raw
256
 
            file descriptor (``int``). If a logger was specified,
257
 
            defaults to a pipe for logging, if no logger was
258
 
            specified, defaults to ``/dev/null``.
259
 
        :param stderr: use as standard error of child process.
260
 
            Defaults to a pipe for error reporting (see `ExecProblem`)
261
 
            and logging.
262
 
        :type stderr: file-like object with a ``fileno`` method or raw
263
 
            file descriptor (``int``).
264
 
        :param closefds: file descriptors to close in the child process.
265
 
        :type closefds: iterable of int
266
 
        """
267
 
        __pychecker__ = 'no-moddefvalue'
268
 
        if self.pid is not None:
269
 
            raise ValueError, "child process was already spawned"
270
 
        if self._logger is not None:
271
 
            self._logger.log_command(self.command, self.args)
272
 
            if stdout is None:
273
 
                self._outlog = stdout = StringOutput()
274
 
        if stderr is None:
275
 
            self._errlog = stderr = StringOutput()
276
 
        self.pid = os.fork()
277
 
        if self.pid:
278
 
            return # the parent process, nothing more to do
279
 
        try:
280
 
            source_fds = [stdin, stdout, stderr]
281
 
            closefds = list(closefds)
282
 
            for i in range(3):
283
 
                if source_fds[i] is None:
284
 
                    source_fds[i] = getnull()
285
 
                if hasattr(source_fds[i], 'fileno'):
286
 
                    source_fds[i] = source_fds[i].fileno()
287
 
            # multiple dup2 can step their own toes if a source fd is smaller
288
 
            # than its target fd so start by duplicating low fds
289
 
            for i in range(1, 3):
290
 
                if source_fds[i] is not None:
291
 
                    while source_fds[i] < i:
292
 
                        closefds.append(source_fds[i])
293
 
                        source_fds[i] = os.dup(source_fds[i])
294
 
            # must close before dup2, because closefds may contain
295
 
            # values in the range 0..2
296
 
            for fd in closefds:
297
 
                os.close(fd)
298
 
            # finally, move the given fds to their final location
299
 
            for dest, source in enumerate(source_fds):
300
 
                if source is not None and source != dest:
301
 
                    os.dup2(source, dest)
302
 
                    if source not in source_fds[dest+1:]:
303
 
                        os.close(source)
304
 
 
305
 
            if self.chdir:
306
 
                os.chdir(self.chdir)
307
 
            os.execvp(self.command, (self.command,) + self.args)
308
 
        except:
309
 
            sys.excepthook(*sys.exc_info())
310
 
        sys.exit(255)
311
 
 
312
 
    def wait_nocheck(self):
313
 
        """Wait for process and return exit result.
314
 
 
315
 
        This (internally used) variant of `wait` is useful when you
316
 
        want to wait for a child process, but not raise an exception
317
 
        if it was terminated abnormally. Typically, that's what you
318
 
        want if you killed the child process.
319
 
 
320
 
        :return: second element of the tuple returned by os.wait()
321
 
        """
322
 
        if self._outlog is not None:
323
 
            self._logger.log_output(self._outlog.read())
324
 
        if self._errlog is not None:
325
 
            self.error = self._errlog.read()
326
 
            if self._logger is not None:
327
 
                self._logger.log_error(self.error)
328
 
        return os.waitpid(self.pid, 0)[1]
329
 
 
330
 
    def wait(self):
331
 
        """Wait for process and check its termination status.
332
 
 
333
 
        If the process exited, set ``self.status`` to its exit status.
334
 
 
335
 
        if the process was terminated by a signal, set ``self.signal``
336
 
        to the value of this signal.
337
 
 
338
 
        :raise ExecProblem: process was killed by a signal or exit
339
 
            status is not is ``self.expected``
340
 
        """
341
 
        result = self.wait_nocheck()
342
 
        if os.WIFSIGNALED(result):
343
 
            self.signal = os.WTERMSIG(result)
344
 
        if os.WIFEXITED(result):
345
 
            self.status = os.WEXITSTATUS(result)
346
 
        if not os.WIFEXITED(result):
347
 
            raise ExecProblem(self)
348
 
        if os.WEXITSTATUS(result) not in self.expected:
349
 
            raise ExecProblem(self)
350
 
 
351
 
 
352
 
nulldev = None
353
 
 
354
 
def getnull():
355
 
    """Return a file object of /dev/null/ opened  for writing."""
356
 
    global nulldev
357
 
    if not nulldev:
358
 
        nulldev = open("/dev/null", "w+")
359
 
    return nulldev
360
 
 
361
 
 
362
 
threadcount = 0
363
 
 
364
 
class StringOutput(object):
365
 
 
366
 
    """Memory buffer storing a pipe output asynchronously."""
367
 
 
368
 
    def __init__(self):
369
 
        read_end, self.write_end = os.pipe()
370
 
        self.readfile = os.fdopen(read_end, 'r', 0)
371
 
        self.thread = threading.Thread(target=self.__run)
372
 
        self.thread.setDaemon(True)
373
 
        self.thread.start()
374
 
 
375
 
    def _close_write_end(self):
376
 
        if self.write_end is not None:
377
 
            os.close(self.write_end)
378
 
            self.write_end = None
379
 
 
380
 
    def _join(self):
381
 
        #global threadcount
382
 
        if self.thread is not None:
383
 
            self.thread.join()
384
 
            self.thread = None
385
 
            #threadcount -= 1
386
 
            #print " %s  joined (%d)" % (self, threadcount)
387
 
 
388
 
    def __del__(self):
389
 
        #print "%s\tdeleting" % self
390
 
        self._close_write_end()
391
 
        self.readfile.close()
392
 
        self._join()
393
 
 
394
 
    def __run(self):
395
 
        #global threadcount
396
 
        #threadcount += 1
397
 
        self.data = self.readfile.read()
398
 
 
399
 
    def fileno(self):
400
 
        return self.write_end
401
 
 
402
 
    def read(self):
403
 
        #print "%s\treading" % self
404
 
        self._close_write_end()
405
 
        self._join()
406
 
        self.readfile.close()
407
 
        return self.data