1
# arch-tag: 0abcff18-da5d-49f0-a03e-1d866a86b5cd
2
# Copyright (C) 2004 Canonical Ltd.
3
# Author: David Allouche <david@canonical.com>
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License as published by
7
# the Free Software Foundation; either version 2 of the License, or
8
# (at your option) any later version.
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
# GNU General Public License for more details.
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
"""Twisted process handling
21
The name of this module, "knotted", was suggested by Andrew Bennetts
22
as a way to avoid the name clash with the "twisted" top-level package::
24
<spiv> I'd say "knotted" would be good.
25
<spiv> And then you can tell people to "get knotted" ;)
27
I disclaim all responsibility for the bad taste of Twisted developers.
"""
31
from twisted.internet import protocol
32
from arch import errors
36
def make_exec_problem(data, command, args, expected, chdir):
    """Build the exception that reports a failed command.

    The exception is returned, not raised; callers in this module write
    ``raise make_exec_problem(...)``.

    :param data: ``(status, signal, error)`` triple. Callers in this module
        pass the tail of a ``'terminate'`` message, so ``error`` is the list
        of lines the process wrote on standard error.
    :param command: the program that was run.
    :param args: the arguments it was run with.
    :param expected: the exit statuses the caller considered acceptable.
    :param chdir: the directory the command was run in.
    :return: an `errors.ExecProblem` wrapping a `DummyProcess` that carries
        the values above.
    """
    status, signal, error = data
    proc = DummyProcess(command, args, expected, chdir, status, signal, error)
    return errors.ExecProblem(proc)
42
class TwistedSpawningStrategy(commandline.SpawningStrategy):
44
def __init__(self, command, logger):
    """Remember the program to run and the logger to report to.

    :param command: the executable that every ``*_cmd`` method spawns.
    :param logger: object whose ``log_command``, ``log_output`` and
        ``log_error`` methods are called while a command runs.
    """
    self._command = command
    # The command methods read self._logger, so it must be stored here.
    self._logger = logger
48
def _spawn(self, args, chdir):
    """Ask the Twisted reactor to start the command and return its queue.

    Must be called from a thread other than the reactor thread (asserted
    below); the callers then block on the returned queue.

    :param args: arguments appended after the command name in argv.
    :param chdir: passed to the reactor as the ``path`` of the new process.
    :return: the queue on which the `ProcessProtocol` posts its
        ``'start'``, ``'output'``, ``'error'`` and ``'terminate'`` messages.
    """
    from twisted.python import threadable
    assert not threadable.isInIOThread()
    from twisted.internet import reactor
    # The queue size must be unlimited, otherwise the blocking
    # put() in the reactor could cause a deadlock.
    queue = Queue.Queue(0)  # unlimited queue
    process_protocol = ProcessProtocol(queue)
    # argv[0] is the command itself, followed by the caller's arguments.
    argv = [self._command] + list(args)
    # callFromThread needs the callable first; spawnProcess is the reactor
    # call whose signature matches (protocol, executable, args, env, path).
    reactor.callFromThread(
        reactor.spawnProcess,
        process_protocol, self._command, args=argv, env=None, path=chdir)
    return queue
64
def status_cmd(self, args, chdir, expected):
    """Run the command to completion and return its exit status.

    Messages are read from the spawn queue until the ``'terminate'``
    message arrives; output and error text is forwarded to the logger.

    :param args: arguments for the command.
    :param chdir: directory to run the command in.
    :param expected: collection of acceptable exit statuses.
    :return: the exit status, when it is in ``expected`` and the process
        was not killed by a signal.
    :raise errors.ExecProblem: (built by `make_exec_problem`) when the
        status is unexpected or a signal was reported.
    :raise AssertionError: when an unknown message kind is received.
    """
    queue = self._spawn(args, chdir)
    while True:
        msg = queue.get(block=True)
        kind = msg[0]
        if kind == 'start':
            self._logger.log_command(self._command, args)
        elif kind == 'output':
            self._logger.log_output(''.join(msg[1]))
        elif kind == 'error':
            self._logger.log_error(''.join(msg[1]))
        elif kind == 'terminate':
            # msg is ('terminate', status, signal, error_lines).
            if msg[1] in expected and msg[2] is None:
                return msg[1]
            raise make_exec_problem(
                msg[1:], self._command, args, expected, chdir)
        else:
            # Interpolate explicitly: AssertionError does not apply '%'
            # formatting to its arguments.
            raise AssertionError('bad message: %r' % (msg,))
85
def sequence_cmd(self, args, chdir, expected):
    """Start the command and return a `SequenceCmd` reading from its queue.

    :param args: arguments for the command.
    :param chdir: directory to run the command in.
    :param expected: collection of acceptable exit statuses.
    :return: a `SequenceCmd` bound to the spawn queue, the command, its
        arguments and this strategy's logger.
    """
    queue = self._spawn(args, chdir)
    return SequenceCmd(
        queue, self._command, args, chdir, expected, self._logger)
90
# Spawn the command and return the pair ``(seq.status, text)``.
# NOTE(review): this listing is an extraction that lost its indentation,
# gained bare source-line numbers between statements, and dropped some
# statements. The code is left untouched; only comments are added.
def status_text_cmd(self, args, chdir, expected):
91
# Start the child process; presumably the result is the message queue
# created in _spawn -- confirm, its return statement is not visible.
queue = self._spawn(args, chdir)
93
# NOTE(review): dangling argument list. The opening of this call is missing;
# the arguments match SequenceCmd.__init__, so it is presumably
# `seq = SequenceCmd(` -- confirm against the upstream file.
queue, self._command, args, chdir, expected, self._logger)
96
# NOTE(review): the statement(s) that bind `text` are not visible here, so
# how the text is assembled cannot be told from this listing.
return seq.status, text
99
class SequenceCmd(object):
101
def __init__(self, queue, command, args, chdir, expected, logger):
    """Set up the state used while consuming a spawned command's messages.

    :param queue: queue the process messages are read from.
    :param command: the program being run (used for logging and errors).
    :param args: its arguments (used for logging and errors).
    :param chdir: its working directory (used for error reporting).
    :param expected: collection of acceptable exit statuses.
    :param logger: object receiving ``log_command`` / ``log_error`` calls.
    """
    self._queue = queue
    self._command = command
    self._logger = logger
    self._args = args
    self._chdir = chdir
    self._expected = expected
    # Output lines received but not yet handed out.
    self._buffer = []
    # Exit status; stays None until a successful 'terminate' message.
    self._status = None
    self.finished = False
121
# NOTE(review): fragment of SequenceCmd. The enclosing method header(s) and
# roughly ten preceding source lines are missing from this listing, as is the
# indentation; bare numbers are residual source-line numbers. The code is
# left untouched and only annotated.
# Take the oldest buffered output line ...
line = self._buffer.pop(0)
123
# ... and return it without its trailing newline(s).
return line.rstrip('\n')
126
# Block until the next message tuple is available; msg[0] is its kind.
msg = self._queue.get(block=True)
127
if msg[0] == 'start':
128
# Process started: log the command line.
self._logger.log_command(self._command, self._args)
129
elif msg[0] == 'output':
130
# msg[1] is a list of output lines; it is placed in front of whatever is
# already buffered.
self._buffer = msg[1] + self._buffer
131
elif msg[0] == 'error':
132
# msg[1] is a list of standard-error lines; log them as one string.
text = ''.join(msg[1])
133
self._logger.log_error(text)
134
elif msg[0] == 'terminate':
136
# msg is ('terminate', status, signal, error_lines): success means an
# expected status and no signal.
if msg[1] in self._expected and msg[2] is None:
137
self._status = msg[1]
139
# NOTE(review): an `else:` presumably preceded this raise -- as written it
# would follow the success branch unconditionally.
raise make_exec_problem(
140
msg[1:], self._command, self._args,
141
self._expected, self._chdir)
143
# NOTE(review): AssertionError is given the format string and msg as two
# separate arguments, so '%r' is never interpolated. An `else:` for unknown
# message kinds presumably preceded this line.
raise AssertionError('bad message: %r', msg)
145
def _get_status(self):
    """Return the exit status recorded when the process terminated.

    :raise AttributeError: while no status has been recorded yet, so the
        ``status`` attribute looks absent until the process has finished.
    """
    if self._status is None:
        # Call form of raise: valid in both Python 2 and Python 3.
        raise AttributeError("no status, process has not finished")
    return self._status

status = property(_get_status)
152
class DummyProcess(object):
153
"""Dummy object providing the same interface as `forkexec.ChildProcess`."""
155
# Store the description of a finished process on the instance.
# NOTE(review): this listing lost its indentation, has bare source-line
# numbers interleaved, and skips source lines 157-158 and 161 onwards.
# Presumably those lines stored args, chdir, status and signal, which are
# otherwise unused here -- confirm against the upstream file.
def __init__(self, command, args, expected, chdir, status, signal, error):
156
self.command = command
159
self.expected = expected
160
# `error` is an iterable of strings (make_exec_problem passes the list of
# standard-error lines); it is stored as one concatenated string.
self.error = ''.join(error)
165
class ProcessProtocol(protocol.ProcessProtocol):
167
def __init__(self, queue):
    """Prepare the line buffers and remember where to post messages.

    :param queue: queue that receives the ``'start'``, ``'output'``,
        ``'error'`` and ``'terminate'`` message tuples.
    """
    # Every callback posts to this queue, so it must be stored here.
    self.__queue = queue
    # Trailing partial lines of stdout / stderr not yet terminated by '\n'.
    self.__out_buffer = str()
    self.__err_buffer = str()
    # All stderr lines seen so far; sent along with 'terminate'.
    self.error_lines = []
174
def connectionMade(self):
    """The process has started.

    Posts the one-element message ``('start',)`` on the queue.
    """
    self.__queue.put(('start',), block=True)
178
def __send_output(self, lines):
    """Post ``('output', lines)`` on the queue.

    :param lines: list of standard-output lines to deliver.
    """
    self.__queue.put(('output', lines), block=True)
181
def __send_error(self, lines):
    """Post ``('error', lines)`` on the queue.

    :param lines: list of standard-error lines to deliver.
    """
    self.__queue.put(('error', lines), block=True)
184
def outReceived(self, data):
    """The process has produced data on standard output.

    The data is appended to any partial line kept from the previous call
    and split into lines. Lines ending in a newline are sent as an
    ``'output'`` message; an unterminated last line is kept for later.

    :param data: chunk of text read from the process's standard output.
    """
    data = self.__out_buffer + data
    lines = data.splitlines(True)
    if not lines:
        # Nothing buffered and nothing received: lines[-1] would fail.
        return
    if lines[-1].endswith('\n'):
        self.__out_buffer = str()
        complete_lines = lines
    else:
        # Last line is still incomplete: hold it back for the next chunk.
        self.__out_buffer = lines[-1]
        complete_lines = lines[:-1]
    self.__send_output(complete_lines)
196
def errReceived(self, data):
    """The process has produced data on standard error.

    The data is appended to any partial line kept from the previous call
    and split into lines. Lines ending in a newline are sent as an
    ``'error'`` message and recorded in ``error_lines``; an unterminated
    last line is kept for later.

    :param data: chunk of text read from the process's standard error.
    """
    data = self.__err_buffer + data
    lines = data.splitlines(True)
    if not lines:
        # Nothing buffered and nothing received: lines[-1] would fail.
        return
    if lines[-1].endswith('\n'):
        self.__err_buffer = str()
        complete_lines = lines
    else:
        # Last line is still incomplete: hold it back for the next chunk.
        self.__err_buffer = lines[-1]
        complete_lines = lines[:-1]
    self.__send_error(complete_lines)
    self.error_lines.extend(complete_lines)
209
def outConnectionLost(self):
    """The process has closed standard output.

    Any unterminated last line still held in the buffer is sent as a
    final ``'output'`` message and the buffer is cleared.
    """
    if self.__out_buffer:
        self.__send_output([self.__out_buffer])
        self.__out_buffer = str()
215
def errConnectionLost(self):
    """The process has closed standard error.

    Any unterminated last line still held in the buffer is sent as a
    final ``'error'`` message, recorded in ``error_lines``, and the buffer
    is cleared.
    """
    if self.__err_buffer:
        self.__send_error([self.__err_buffer])
        self.error_lines.append(self.__err_buffer)
        self.__err_buffer = str()
222
def processEnded(self, reason):
    """The process has terminated.

    Posts ``('terminate', status, signal, error_lines)`` on the queue.

    :param reason: object whose ``value`` carries the ``exitCode`` and
        ``signal`` of the finished process.
    """
    signal = reason.value.signal
    status = reason.value.exitCode
    msg = ('terminate', status, signal, self.error_lines)
    self.__queue.put(msg, block=True)