~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to fai/arch/backends/knotted.py

  • Committer: Aaron Bentley
  • Date: 2005-11-11 17:43:12 UTC
  • Revision ID: aaron.bentley@utoronto.ca-20051111174312-1c627d82a07bf8fd
Added patch for tab-in-patch-filename support

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# arch-tag: 0abcff18-da5d-49f0-a03e-1d866a86b5cd
2
 
# Copyright (C) 2004 Canonical Ltd.
3
 
#               Author: David Allouche <david@canonical.com>
4
 
#
5
 
#    This program is free software; you can redistribute it and/or modify
6
 
#    it under the terms of the GNU General Public License as published by
7
 
#    the Free Software Foundation; either version 2 of the License, or
8
 
#    (at your option) any later version.
9
 
#
10
 
#    This program is distributed in the hope that it will be useful,
11
 
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
#    GNU General Public License for more details.
14
 
#
15
 
#    You should have received a copy of the GNU General Public License
16
 
#    along with this program; if not, write to the Free Software
17
 
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18
 
 
19
 
"""Twisted process handling
20
 
 
21
 
The name of this module, "knotted", was suggested by Andrew Bennetts
22
 
as a way to avoid the name clash with the"twisted" top-level package::
23
 
 
24
 
    <spiv> I'd say "knotted" would be good.
25
 
    <spiv> And then you can tell people to "get knotted" ;)
26
 
 
27
 
I disclaim all responsibility for the bad taste of Twisted developers.
28
 
"""
29
 
 
30
 
import Queue
31
 
from twisted.internet import protocol
32
 
from arch import errors
33
 
import commandline
34
 
 
35
 
 
36
 
def make_exec_problem(data, command, args, expected, chdir):
37
 
    status, signal, error = data
38
 
    proc = DummyProcess(command, args, expected, chdir, status, signal, error)
39
 
    return errors.ExecProblem(proc)
40
 
 
41
 
 
42
 
class TwistedSpawningStrategy(commandline.SpawningStrategy):
43
 
 
44
 
    def __init__(self, command, logger):
45
 
        self._command = command
46
 
        self._logger = logger
47
 
 
48
 
    def _spawn(self, args, chdir):
49
 
        if __debug__:
50
 
            from twisted.python import threadable
51
 
            assert not threadable.isInIOThread()
52
 
        from twisted.internet import reactor
53
 
        # The queue size must be unlimited, otherwise the blocking
54
 
        # put() in the reactor could cause a deadlock.
55
 
        queue = Queue.Queue(0) # unlimited queue
56
 
        process_protocol = ProcessProtocol(queue)
57
 
        argv = [self._command]
58
 
        argv.extend(args)
59
 
        reactor.callFromThread(
60
 
            reactor.spawnProcess,
61
 
            process_protocol, self._command, args=argv, env=None, path=chdir)
62
 
        return queue
63
 
 
64
 
    def status_cmd(self, args, chdir, expected):
65
 
        queue = self._spawn(args, chdir)
66
 
        while True:
67
 
            msg = queue.get(block=True)
68
 
            if msg[0] == 'start':
69
 
                self._logger.log_command(self._command, args)
70
 
            elif msg[0] == 'output':
71
 
                text = ''.join(msg[1])
72
 
                self._logger.log_output(text)
73
 
            elif msg[0] == 'error':
74
 
                text = ''.join(msg[1])
75
 
                self._logger.log_error(text)
76
 
            elif msg[0] == 'terminate':
77
 
                if msg[1] in expected and msg[2] is None:
78
 
                    return msg[1]
79
 
                else:
80
 
                    raise make_exec_problem(
81
 
                        msg[1:], self._command, args, expected, chdir)
82
 
            else:
83
 
                raise AssertionError('bad message: %r', msg)
84
 
 
85
 
    def sequence_cmd(self, args, chdir, expected):
86
 
        queue = self._spawn(args, chdir)
87
 
        return SequenceCmd(
88
 
            queue, self._command, args, chdir, expected, self._logger)
89
 
 
90
 
    def status_text_cmd(self, args, chdir, expected):
91
 
        queue = self._spawn(args, chdir)
92
 
        seq = SequenceCmd(
93
 
            queue, self._command, args, chdir, expected, self._logger)
94
 
        seq.strip = False
95
 
        text = ''.join(seq)
96
 
        return seq.status, text
97
 
 
98
 
 
99
 
class SequenceCmd(object):
100
 
 
101
 
    def __init__(self, queue, command, args, chdir, expected, logger):
102
 
        self._queue = queue
103
 
        self._command = command
104
 
        self._logger = logger
105
 
        self._args = args
106
 
        self._chdir = chdir
107
 
        self._expected = expected
108
 
        self._buffer = []
109
 
        self._status = None
110
 
        self.finished = False
111
 
        self.strip = True
112
 
 
113
 
    def __iter__(self):
114
 
        return self
115
 
 
116
 
    def next(self):
117
 
        while True:
118
 
            if self.finished:
119
 
                raise StopIteration
120
 
            if self._buffer:
121
 
                line = self._buffer.pop(0)
122
 
                if self.strip:
123
 
                    return line.rstrip('\n')
124
 
                else:
125
 
                    return line
126
 
            msg = self._queue.get(block=True)
127
 
            if msg[0] == 'start':
128
 
                self._logger.log_command(self._command, self._args)
129
 
            elif msg[0] == 'output':
130
 
                self._buffer = msg[1] + self._buffer
131
 
            elif msg[0] == 'error':
132
 
                text = ''.join(msg[1])
133
 
                self._logger.log_error(text)
134
 
            elif msg[0] == 'terminate':
135
 
                self.finished = True
136
 
                if msg[1] in self._expected and msg[2] is None:
137
 
                    self._status = msg[1]
138
 
                else:
139
 
                    raise make_exec_problem(
140
 
                        msg[1:], self._command, self._args,
141
 
                        self._expected, self._chdir)
142
 
            else:
143
 
                raise AssertionError('bad message: %r', msg)
144
 
 
145
 
    def _get_status(self):
146
 
        if self._status is None:
147
 
            raise AttributeError, "no status, process has not finished"
148
 
        return self._status
149
 
    status = property(_get_status)
150
 
 
151
 
 
152
 
class DummyProcess(object):
153
 
    """Dummy object providing the same interface as `forkexec.ChildProcess`."""
154
 
 
155
 
    def __init__(self, command, args, expected, chdir, status, signal, error):
156
 
        self.command = command
157
 
        self.args = args
158
 
        self.chdir = chdir
159
 
        self.expected = expected
160
 
        self.error = ''.join(error)
161
 
        self.status = status
162
 
        self.signal = signal
163
 
 
164
 
 
165
 
class ProcessProtocol(protocol.ProcessProtocol):
166
 
 
167
 
    def __init__(self, queue):
168
 
        self.__queue = queue
169
 
        self.__out_buffer = str()
170
 
        self.__err_buffer = str()
171
 
        self.error_lines = []
172
 
        self.status = None
173
 
 
174
 
    def connectionMade(self):
175
 
        """The process has started."""
176
 
        self.__queue.put(('start',), block=True)
177
 
 
178
 
    def __send_output(self, lines):
179
 
        self.__queue.put(('output', lines), block=True)
180
 
 
181
 
    def __send_error(self, lines):
182
 
        self.__queue.put(('error', lines), block=True)
183
 
 
184
 
    def outReceived(self, data):
185
 
        """The process has produced data on standard output."""
186
 
        data = self.__out_buffer + data
187
 
        lines = data.splitlines(True)
188
 
        if lines[-1].endswith('\n'):
189
 
            self.__out_buffer = str()
190
 
            complete_lines = lines
191
 
        else:
192
 
            self.__out_buffer = lines[-1]
193
 
            complete_lines = lines[:-1]
194
 
        self.__send_output(complete_lines)
195
 
 
196
 
    def errReceived(self, data):
197
 
        """The process has produced data on standard error."""
198
 
        data = self.__err_buffer + data
199
 
        lines = data.splitlines(True)
200
 
        if lines[-1].endswith('\n'):
201
 
            self.__err_buffer = str()
202
 
            complete_lines = lines
203
 
        else:
204
 
            self.__err_buffer = lines[-1]
205
 
            complete_lines = lines[:-1]
206
 
        self.__send_error(complete_lines)
207
 
        self.error_lines.extend(complete_lines)
208
 
 
209
 
    def outConnectionLost(self):
210
 
        """The process has closed standard output."""
211
 
        if self.__out_buffer:
212
 
            self.__send_output([self.__out_buffer])
213
 
            self.__out_buffer = str()
214
 
 
215
 
    def errConnectionLost(self):
216
 
        """The process has closed standard error."""
217
 
        if self.__err_buffer:
218
 
            self.__send_error([self.__err_buffer])
219
 
            self.error_lines.append(self.__err_buffer)
220
 
            self.__err_buffer = str()
221
 
 
222
 
    def processEnded(self, reason):
223
 
        """The process has terminated."""
224
 
        signal = reason.value.signal
225
 
        status = reason.value.exitCode
226
 
        msg = ('terminate', status, signal, self.error_lines)
227
 
        self.__queue.put(msg, block=True)