~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to fai/arch/backends/forkexec.py

  • Committer: Robert Collins
  • Date: 2005-09-13 12:39:26 UTC
  • mto: (147.2.6) (364.1.3 bzrtools)
  • mto: This revision was merged to the branch mainline in revision 324.
  • Revision ID: robertc@robertcollins.net-20050913123926-b72242bdacc1ae52
create the output directory

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# arch-tag: 5b9da267-93df-418e-bf4a-47fb9ec7f6de
 
2
# Copyright (C) 2004 Canonical Ltd.
 
3
#               Author: David Allouche <david@canonical.com>
 
4
#
 
5
#    This program is free software; you can redistribute it and/or modify
 
6
#    it under the terms of the GNU General Public License as published by
 
7
#    the Free Software Foundation; either version 2 of the License, or
 
8
#    (at your option) any later version.
 
9
#
 
10
#    This program is distributed in the hope that it will be useful,
 
11
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
#    GNU General Public License for more details.
 
14
#
 
15
#    You should have received a copy of the GNU General Public License
 
16
#    along with this program; if not, write to the Free Software
 
17
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
18
 
 
19
"""PyArch specific process spawning
 
20
"""
 
21
 
 
22
import sys
 
23
import os
 
24
import threading
 
25
from fai.arch.errors import ExecProblem
 
26
import commandline
 
27
 
 
28
### Spawning strategy
 
29
 
 
30
class PyArchSpawningStrategy(commandline.SpawningStrategy):
 
31
 
 
32
    def __init__(self, command, logger):
 
33
        self._command = command
 
34
        self._logger = logger
 
35
 
 
36
    def sequence_cmd(self, args, chdir=None, expected=(0,)):
 
37
        return _pyarch_sequence_cmd(
 
38
            self._command, args, chdir, expected, self._logger)
 
39
 
 
40
    def status_cmd(self, args, chdir, expected):
 
41
        return exec_safe(
 
42
            self._command, args, expected=expected, chdir=chdir,
 
43
            logger=self._logger)
 
44
 
 
45
    def status_text_cmd(self, args, chdir, expected):
 
46
        return exec_safe_status_stdout(
 
47
            self._command, args, expected=expected, chdir=chdir,
 
48
            logger=self._logger)
 
49
 
 
50
 
 
51
class _pyarch_sequence_cmd(object):
 
52
    def __init__(self, command, args, chdir, expected, logger):
 
53
        self._proc = exec_safe_iter_stdout(
 
54
            command, args, chdir=chdir, expected=expected, logger=logger)
 
55
    def __iter__(self):
 
56
        return self
 
57
    def next(self):
 
58
        line = self._proc.next()
 
59
        return line.rstrip('\n')
 
60
    def _get_finished(self):
 
61
        return self._proc.finished
 
62
    finished = property(_get_finished)
 
63
    def _get_status(self):
 
64
        return self._proc.status
 
65
    status = property(_get_status)
 
66
 
 
67
 
 
68
### Process handling commands
 
69
 
 
70
def exec_safe(program, args = [], stdout = None, stdin = None,
 
71
              stderr = None, expected = 0, chdir = None, logger = None):
 
72
    """Fork/exec a process and and raises an exception if the program
 
73
    died with a signal or returned an error code other than expected.
 
74
    This function will always wait."""
 
75
    proc = ChildProcess(program, args, expected, chdir, logger)
 
76
    proc.spawn(stdout, stdin, stderr)
 
77
    proc.wait()
 
78
    return proc.status
 
79
 
 
80
 
 
81
def exec_safe_status_stdout(program, args = [], expected = 0,
 
82
                            chdir = None, logger = None):
 
83
    """Run the specified program and return a tuple of two items:
 
84
    1. exit status of the program;
 
85
    2. output of the program as a single string.
 
86
    """
 
87
    read_end, write_end = os.pipe()
 
88
    proc = ChildProcess(program, args, expected, chdir, logger)
 
89
    proc.spawn(stdout=write_end)
 
90
    os.close(write_end)
 
91
    fd = os.fdopen(read_end, 'r')
 
92
    output = fd.read()
 
93
    proc.wait()
 
94
    return (proc.status, output)
 
95
 
 
96
 
 
97
class exec_safe_iter_stdout(object):
 
98
 
 
99
    """Iterator over the output of a child process.
 
100
 
 
101
    Fork/exec a process with its output piped. Each iteration will
 
102
    cause a iteration of the process output pipe. The pipe is properly
 
103
    closed whenever the output is exhausted or the iterator is
 
104
    deleted.
 
105
 
 
106
    If the output is exhausted, the process exit status is checked and
 
107
    an ExecError exception will be raised for abnormal or unexpected
 
108
    exit status.
 
109
    """
 
110
 
 
111
    def __init__(self, program, args=[], stdin=None, stderr=None,
 
112
                 expected=0, delsignal=None, chdir=None, logger=None):
 
113
        self.delsignal = delsignal
 
114
        self._read_file, self._pid = None, None
 
115
        read_end, write_end = os.pipe()
 
116
        self.proc = ChildProcess(program, args, expected, chdir, logger)
 
117
        try:
 
118
            self.proc.spawn(stdout=write_end, stdin=stdin, stderr=stderr,
 
119
                            closefds=[read_end])
 
120
            self._pid = self.proc.pid
 
121
        except:
 
122
            os.close(write_end)
 
123
            self.errlog = '<deleted>'
 
124
            os.close(read_end)
 
125
            raise
 
126
        self._read_file = os.fdopen(read_end, 'r', 0)
 
127
        os.close(write_end)
 
128
 
 
129
    def __del__(self):
 
130
        """Destructor. If needed, close the pipe and wait the child process.
 
131
 
 
132
        If child process has already been joined, it means the iterator was
 
133
        deleted before being exhausted. It is assumed the return status is not
 
134
        cared about.
 
135
        """
 
136
        if self._pid is not None:
 
137
            if self._read_file is not None:
 
138
                self._read_file.close()
 
139
            # Some buggy programs (e.g. ls -R) do not properly
 
140
            # terminate when their output pipe is broken. They
 
141
            # must be killed by an appropriate signal before
 
142
            # waiting. SIGINT should be okay most of the time.
 
143
            if self.delsignal:
 
144
                os.kill(self.proc.pid, self.delsignal)
 
145
            self.proc.wait_nocheck()
 
146
 
 
147
    def _get_finished(self):
 
148
        """Whether the underlying process has terminated."""
 
149
        return self.proc.status is not None or self.proc.signal is not None
 
150
    finished = property(_get_finished)
 
151
 
 
152
    def _get_status(self):
 
153
        """Exit status of the underlying process.
 
154
 
 
155
        Raises ValueError if the process has not yet finished. (Hm...
 
156
        should be AttributeError). Can be None if the process was
 
157
        killed by a signal.
 
158
        """
 
159
        if self.proc.status is None:
 
160
            raise ValueError, "no status, process has not finished"
 
161
        return self.proc.status
 
162
    status = property(_get_status)
 
163
 
 
164
    def close(self):
 
165
        """Close the pipe and wait the child process.
 
166
 
 
167
        If the output is not exhausted yet, you should be prepared to
 
168
        handle the error condition caused by breaking the pipe.
 
169
        """
 
170
        self._read_file.close()
 
171
        try:
 
172
            self.proc.wait()
 
173
            self._pid = None
 
174
            return self.proc.status
 
175
        except ExecProblem:
 
176
            self._pid = None
 
177
            raise
 
178
 
 
179
    def __iter__(self):
 
180
        """Iterator. Return self."""
 
181
        return self
 
182
 
 
183
    def next(self):
 
184
        """Iteration method. Iterate on the pipe file.
 
185
 
 
186
        Close the pipe and wait the child process once output is exhausted.
 
187
 
 
188
        Use `file.readline` instead of `file.next` because we want
 
189
        maximal responsiveness to incremental output. The pipe
 
190
        mechanism itself provides buffering.
 
191
        """
 
192
        try:
 
193
            line = self._read_file.readline()
 
194
        except ValueError:
 
195
            if self._pid is None: raise StopIteration
 
196
            else: raise
 
197
        if line:
 
198
            return line
 
199
        else:
 
200
            self.close()
 
201
            raise StopIteration
 
202
 
 
203
 
 
204
### Low-level process handling
 
205
 
 
206
class ChildProcess:
 
207
 
 
208
    """Description of a child process, for error handling."""
 
209
 
 
210
    def __init__(self, command, args, expected=0, chdir=None, logger=None):
 
211
        """Create a child process descriptor.
 
212
 
 
213
        The child process must have already been created. The
 
214
        `command`, `args` and `expected` parameters are used for error
 
215
        reporting.
 
216
 
 
217
        :param command: name of the child process executable
 
218
        :type command: str
 
219
        :param args: child process arguments (argv)
 
220
        :type args: sequence of str
 
221
        :param expected: valid exit status values
 
222
        :type expected: int or sequence of int
 
223
        """
 
224
        self.pid = None
 
225
        self.command = command
 
226
        self.args = tuple(args)
 
227
        if isinstance(expected, int): expected = (expected,)
 
228
        self.expected = expected
 
229
        self.signal = None
 
230
        self.status = None
 
231
        self.error = None
 
232
        self.chdir = chdir
 
233
        self._logger = logger
 
234
        self._outlog = None
 
235
        self._errlog = None
 
236
 
 
237
    def spawn(self, stdout=None, stdin=None, stderr=None, closefds=[]):
 
238
        """Fork, setup file descriptors, and exec child process.
 
239
 
 
240
        The `stdout`, `stdin` and `stderr` file-like objects can be
 
241
        raw file descriptors (ints) or file-like objects with a
 
242
        ``fileno`` method returning a file descriptor.
 
243
 
 
244
        When building a pipe, one side of a pipe is used as `stdout`,
 
245
        `stdin` or `stderr`, and the other is included in the
 
246
        `closefds` list, so the child process will be able to detect a
 
247
        broken pipe.
 
248
 
 
249
        :param stdin: use as standard input of child process, defaults
 
250
            to ``/dev/null``.
 
251
        :type stdin: file-like object with a ``fileno`` method or raw
 
252
            file descriptor (``int``).
 
253
        :param stdout: use as standard output of child process,
 
254
            defaults to internal pipe or ``/dev/null``.
 
255
        :type stdout: file-like object with a ``fileno`` method or raw
 
256
            file descriptor (``int``). If a logger was specified,
 
257
            defaults to a pipe for logging, if no logger was
 
258
            specified, defaults to ``/dev/null``.
 
259
        :param stderr: use as standard error of child process.
 
260
            Defaults to a pipe for error reporting (see `ExecProblem`)
 
261
            and logging.
 
262
        :type stderr: file-like object with a ``fileno`` method or raw
 
263
            file descriptor (``int``).
 
264
        :param closefds: file descriptors to close in the child process.
 
265
        :type closefds: iterable of int
 
266
        """
 
267
        __pychecker__ = 'no-moddefvalue'
 
268
        if self.pid is not None:
 
269
            raise ValueError, "child process was already spawned"
 
270
        if self._logger is not None:
 
271
            self._logger.log_command(self.command, self.args)
 
272
            if stdout is None:
 
273
                self._outlog = stdout = StringOutput()
 
274
        if stderr is None:
 
275
            self._errlog = stderr = StringOutput()
 
276
        self.pid = os.fork()
 
277
        if self.pid:
 
278
            return # the parent process, nothing more to do
 
279
        try:
 
280
            source_fds = [stdin, stdout, stderr]
 
281
            closefds = list(closefds)
 
282
            for i in range(3):
 
283
                if source_fds[i] is None:
 
284
                    source_fds[i] = getnull()
 
285
                if hasattr(source_fds[i], 'fileno'):
 
286
                    source_fds[i] = source_fds[i].fileno()
 
287
            # multiple dup2 can step their own toes if a source fd is smaller
 
288
            # than its target fd so start by duplicating low fds
 
289
            for i in range(1, 3):
 
290
                if source_fds[i] is not None:
 
291
                    while source_fds[i] < i:
 
292
                        closefds.append(source_fds[i])
 
293
                        source_fds[i] = os.dup(source_fds[i])
 
294
            # must close before dup2, because closefds may contain
 
295
            # values in the range 0..2
 
296
            for fd in closefds:
 
297
                os.close(fd)
 
298
            # finally, move the given fds to their final location
 
299
            for dest, source in enumerate(source_fds):
 
300
                if source is not None and source != dest:
 
301
                    os.dup2(source, dest)
 
302
                    if source not in source_fds[dest+1:]:
 
303
                        os.close(source)
 
304
 
 
305
            if self.chdir:
 
306
                os.chdir(self.chdir)
 
307
            os.execvp(self.command, (self.command,) + self.args)
 
308
        except:
 
309
            sys.excepthook(*sys.exc_info())
 
310
        sys.exit(255)
 
311
 
 
312
    def wait_nocheck(self):
 
313
        """Wait for process and return exit result.
 
314
 
 
315
        This (internally used) variant of `wait` is useful when you
 
316
        want to wait for a child process, but not raise an exception
 
317
        if it was terminated abnormally. Typically, that's what you
 
318
        want if you killed the child process.
 
319
 
 
320
        :return: second element of the tuple returned by os.wait()
 
321
        """
 
322
        if self._outlog is not None:
 
323
            self._logger.log_output(self._outlog.read())
 
324
        if self._errlog is not None:
 
325
            self.error = self._errlog.read()
 
326
            if self._logger is not None:
 
327
                self._logger.log_error(self.error)
 
328
        return os.waitpid(self.pid, 0)[1]
 
329
 
 
330
    def wait(self):
 
331
        """Wait for process and check its termination status.
 
332
 
 
333
        If the process exited, set ``self.status`` to its exit status.
 
334
 
 
335
        if the process was terminated by a signal, set ``self.signal``
 
336
        to the value of this signal.
 
337
 
 
338
        :raise ExecProblem: process was killed by a signal or exit
 
339
            status is not is ``self.expected``
 
340
        """
 
341
        result = self.wait_nocheck()
 
342
        if os.WIFSIGNALED(result):
 
343
            self.signal = os.WTERMSIG(result)
 
344
        if os.WIFEXITED(result):
 
345
            self.status = os.WEXITSTATUS(result)
 
346
        if not os.WIFEXITED(result):
 
347
            raise ExecProblem(self)
 
348
        if os.WEXITSTATUS(result) not in self.expected:
 
349
            raise ExecProblem(self)
 
350
 
 
351
 
 
352
nulldev = None
 
353
 
 
354
def getnull():
 
355
    """Return a file object of /dev/null/ opened  for writing."""
 
356
    global nulldev
 
357
    if not nulldev:
 
358
        nulldev = open("/dev/null", "w+")
 
359
    return nulldev
 
360
 
 
361
 
 
362
threadcount = 0
 
363
 
 
364
class StringOutput(object):
 
365
 
 
366
    """Memory buffer storing a pipe output asynchronously."""
 
367
 
 
368
    def __init__(self):
 
369
        read_end, self.write_end = os.pipe()
 
370
        self.readfile = os.fdopen(read_end, 'r', 0)
 
371
        self.thread = threading.Thread(target=self.__run)
 
372
        self.thread.setDaemon(True)
 
373
        self.thread.start()
 
374
 
 
375
    def _close_write_end(self):
 
376
        if self.write_end is not None:
 
377
            os.close(self.write_end)
 
378
            self.write_end = None
 
379
 
 
380
    def _join(self):
 
381
        #global threadcount
 
382
        if self.thread is not None:
 
383
            self.thread.join()
 
384
            self.thread = None
 
385
            #threadcount -= 1
 
386
            #print " %s  joined (%d)" % (self, threadcount)
 
387
 
 
388
    def __del__(self):
 
389
        #print "%s\tdeleting" % self
 
390
        self._close_write_end()
 
391
        self.readfile.close()
 
392
        self._join()
 
393
 
 
394
    def __run(self):
 
395
        #global threadcount
 
396
        #threadcount += 1
 
397
        self.data = self.readfile.read()
 
398
 
 
399
    def fileno(self):
 
400
        return self.write_end
 
401
 
 
402
    def read(self):
 
403
        #print "%s\treading" % self
 
404
        self._close_write_end()
 
405
        self._join()
 
406
        self.readfile.close()
 
407
        return self.data