~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to fai/arch/backends/knotted.py

  • Committer: Robert Collins
  • Date: 2005-09-13 13:07:03 UTC
  • mto: (147.2.6) (364.1.3 bzrtools)
  • mto: This revision was merged to the branch mainline in revision 324.
  • Revision ID: robertc@robertcollins.net-20050913130702-f471ae9f2833a484
create the output directory

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# arch-tag: 0abcff18-da5d-49f0-a03e-1d866a86b5cd
 
2
# Copyright (C) 2004 Canonical Ltd.
 
3
#               Author: David Allouche <david@canonical.com>
 
4
#
 
5
#    This program is free software; you can redistribute it and/or modify
 
6
#    it under the terms of the GNU General Public License as published by
 
7
#    the Free Software Foundation; either version 2 of the License, or
 
8
#    (at your option) any later version.
 
9
#
 
10
#    This program is distributed in the hope that it will be useful,
 
11
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
#    GNU General Public License for more details.
 
14
#
 
15
#    You should have received a copy of the GNU General Public License
 
16
#    along with this program; if not, write to the Free Software
 
17
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
18
 
 
19
"""Twisted process handling
 
20
 
 
21
The name of this module, "knotted", was suggested by Andrew Bennetts
 
22
as a way to avoid the name clash with the"twisted" top-level package::
 
23
 
 
24
    <spiv> I'd say "knotted" would be good.
 
25
    <spiv> And then you can tell people to "get knotted" ;)
 
26
 
 
27
I disclaim all responsibility for the bad taste of Twisted developers.
 
28
"""
 
29
 
 
30
import Queue
 
31
from twisted.internet import protocol
 
32
from arch import errors
 
33
import commandline
 
34
 
 
35
 
 
36
def make_exec_problem(data, command, args, expected, chdir):
 
37
    status, signal, error = data
 
38
    proc = DummyProcess(command, args, expected, chdir, status, signal, error)
 
39
    return errors.ExecProblem(proc)
 
40
 
 
41
 
 
42
class TwistedSpawningStrategy(commandline.SpawningStrategy):
 
43
 
 
44
    def __init__(self, command, logger):
 
45
        self._command = command
 
46
        self._logger = logger
 
47
 
 
48
    def _spawn(self, args, chdir):
 
49
        if __debug__:
 
50
            from twisted.python import threadable
 
51
            assert not threadable.isInIOThread()
 
52
        from twisted.internet import reactor
 
53
        # The queue size must be unlimited, otherwise the blocking
 
54
        # put() in the reactor could cause a deadlock.
 
55
        queue = Queue.Queue(0) # unlimited queue
 
56
        process_protocol = ProcessProtocol(queue)
 
57
        argv = [self._command]
 
58
        argv.extend(args)
 
59
        reactor.callFromThread(
 
60
            reactor.spawnProcess,
 
61
            process_protocol, self._command, args=argv, env=None, path=chdir)
 
62
        return queue
 
63
 
 
64
    def status_cmd(self, args, chdir, expected):
 
65
        queue = self._spawn(args, chdir)
 
66
        while True:
 
67
            msg = queue.get(block=True)
 
68
            if msg[0] == 'start':
 
69
                self._logger.log_command(self._command, args)
 
70
            elif msg[0] == 'output':
 
71
                text = ''.join(msg[1])
 
72
                self._logger.log_output(text)
 
73
            elif msg[0] == 'error':
 
74
                text = ''.join(msg[1])
 
75
                self._logger.log_error(text)
 
76
            elif msg[0] == 'terminate':
 
77
                if msg[1] in expected and msg[2] is None:
 
78
                    return msg[1]
 
79
                else:
 
80
                    raise make_exec_problem(
 
81
                        msg[1:], self._command, args, expected, chdir)
 
82
            else:
 
83
                raise AssertionError('bad message: %r', msg)
 
84
 
 
85
    def sequence_cmd(self, args, chdir, expected):
 
86
        queue = self._spawn(args, chdir)
 
87
        return SequenceCmd(
 
88
            queue, self._command, args, chdir, expected, self._logger)
 
89
 
 
90
    def status_text_cmd(self, args, chdir, expected):
 
91
        queue = self._spawn(args, chdir)
 
92
        seq = SequenceCmd(
 
93
            queue, self._command, args, chdir, expected, self._logger)
 
94
        seq.strip = False
 
95
        text = ''.join(seq)
 
96
        return seq.status, text
 
97
 
 
98
 
 
99
class SequenceCmd(object):
 
100
 
 
101
    def __init__(self, queue, command, args, chdir, expected, logger):
 
102
        self._queue = queue
 
103
        self._command = command
 
104
        self._logger = logger
 
105
        self._args = args
 
106
        self._chdir = chdir
 
107
        self._expected = expected
 
108
        self._buffer = []
 
109
        self._status = None
 
110
        self.finished = False
 
111
        self.strip = True
 
112
 
 
113
    def __iter__(self):
 
114
        return self
 
115
 
 
116
    def next(self):
 
117
        while True:
 
118
            if self.finished:
 
119
                raise StopIteration
 
120
            if self._buffer:
 
121
                line = self._buffer.pop(0)
 
122
                if self.strip:
 
123
                    return line.rstrip('\n')
 
124
                else:
 
125
                    return line
 
126
            msg = self._queue.get(block=True)
 
127
            if msg[0] == 'start':
 
128
                self._logger.log_command(self._command, self._args)
 
129
            elif msg[0] == 'output':
 
130
                self._buffer = msg[1] + self._buffer
 
131
            elif msg[0] == 'error':
 
132
                text = ''.join(msg[1])
 
133
                self._logger.log_error(text)
 
134
            elif msg[0] == 'terminate':
 
135
                self.finished = True
 
136
                if msg[1] in self._expected and msg[2] is None:
 
137
                    self._status = msg[1]
 
138
                else:
 
139
                    raise make_exec_problem(
 
140
                        msg[1:], self._command, self._args,
 
141
                        self._expected, self._chdir)
 
142
            else:
 
143
                raise AssertionError('bad message: %r', msg)
 
144
 
 
145
    def _get_status(self):
 
146
        if self._status is None:
 
147
            raise AttributeError, "no status, process has not finished"
 
148
        return self._status
 
149
    status = property(_get_status)
 
150
 
 
151
 
 
152
class DummyProcess(object):
 
153
    """Dummy object providing the same interface as `forkexec.ChildProcess`."""
 
154
 
 
155
    def __init__(self, command, args, expected, chdir, status, signal, error):
 
156
        self.command = command
 
157
        self.args = args
 
158
        self.chdir = chdir
 
159
        self.expected = expected
 
160
        self.error = ''.join(error)
 
161
        self.status = status
 
162
        self.signal = signal
 
163
 
 
164
 
 
165
class ProcessProtocol(protocol.ProcessProtocol):
 
166
 
 
167
    def __init__(self, queue):
 
168
        self.__queue = queue
 
169
        self.__out_buffer = str()
 
170
        self.__err_buffer = str()
 
171
        self.error_lines = []
 
172
        self.status = None
 
173
 
 
174
    def connectionMade(self):
 
175
        """The process has started."""
 
176
        self.__queue.put(('start',), block=True)
 
177
 
 
178
    def __send_output(self, lines):
 
179
        self.__queue.put(('output', lines), block=True)
 
180
 
 
181
    def __send_error(self, lines):
 
182
        self.__queue.put(('error', lines), block=True)
 
183
 
 
184
    def outReceived(self, data):
 
185
        """The process has produced data on standard output."""
 
186
        data = self.__out_buffer + data
 
187
        lines = data.splitlines(True)
 
188
        if lines[-1].endswith('\n'):
 
189
            self.__out_buffer = str()
 
190
            complete_lines = lines
 
191
        else:
 
192
            self.__out_buffer = lines[-1]
 
193
            complete_lines = lines[:-1]
 
194
        self.__send_output(complete_lines)
 
195
 
 
196
    def errReceived(self, data):
 
197
        """The process has produced data on standard error."""
 
198
        data = self.__err_buffer + data
 
199
        lines = data.splitlines(True)
 
200
        if lines[-1].endswith('\n'):
 
201
            self.__err_buffer = str()
 
202
            complete_lines = lines
 
203
        else:
 
204
            self.__err_buffer = lines[-1]
 
205
            complete_lines = lines[:-1]
 
206
        self.__send_error(complete_lines)
 
207
        self.error_lines.extend(complete_lines)
 
208
 
 
209
    def outConnectionLost(self):
 
210
        """The process has closed standard output."""
 
211
        if self.__out_buffer:
 
212
            self.__send_output([self.__out_buffer])
 
213
            self.__out_buffer = str()
 
214
 
 
215
    def errConnectionLost(self):
 
216
        """The process has closed standard error."""
 
217
        if self.__err_buffer:
 
218
            self.__send_error([self.__err_buffer])
 
219
            self.error_lines.append(self.__err_buffer)
 
220
            self.__err_buffer = str()
 
221
 
 
222
    def processEnded(self, reason):
 
223
        """The process has terminated."""
 
224
        signal = reason.value.signal
 
225
        status = reason.value.exitCode
 
226
        msg = ('terminate', status, signal, self.error_lines)
 
227
        self.__queue.put(msg, block=True)