34
40
# wait() waiting for the child to exit when it's not going to.
37
if sys.platform == 'win32':
38
raise tests.TestSkipped('breakin signal not tested on win32')
39
43
super(TestBreakin, self).setUp()
44
if breakin.determine_signal() is None:
45
raise tests.TestSkipped('this platform is missing SIGQUIT'
47
if sys.platform == 'win32':
48
# Windows doesn't have os.kill, and we catch the SIGBREAK signal.
49
# We trigger SIGBREAK via a Console api so we need ctypes to access
52
raise tests.UnavailableFeature('ctypes')
53
self._send_signal = self._send_signal_win32
55
self._send_signal = self._send_signal_via_kill
57
def _send_signal_via_kill(self, pid, sig_type):
59
sig_num = signal.SIGQUIT
61
sig_num = signal.SIGKILL
63
raise ValueError("unknown signal type: %s" % (sig_type,))
66
def _send_signal_win32(self, pid, sig_type):
67
"""Send a 'signal' on Windows.
69
Windows doesn't really have signals in the same way. All it really
71
1) Sending SIGINT to the *current* process group (so self, and all
73
2) Sending SIGBREAK to a process that shares the current console,
74
which can be in its own process group.
75
So we have start_bzr_subprocess to create a new process group for the
76
spawned process, and then we map
77
SIGQUIT to GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT)
78
SIGKILL to TerminateProcess
80
if sig_type == 'break':
83
ret = ctypes.windll.kernel32.GenerateConsoleCtrlEvent(
84
CTRL_BREAK_EVENT, pid)
86
err = ctypes.FormatError()
87
raise RuntimeError('failed to send CTRL_BREAK: %s'
89
elif sig_type == 'kill':
90
# Does the exit code matter? For now we are just setting it to
91
# something other than 0
92
exit_code = breakin.determine_signal()
93
ctypes.windll.kernel32.TerminateProcess(pid, exit_code)
95
def _popen(self, *args, **kwargs):
    """Spawn the child process, in its own process group on Windows.

    On win32 the CREATE_NEW_PROCESS_GROUP creation flag is added to the
    ``creationflags`` keyword argument before delegating to the parent
    class's ``_popen``.  Any creation flags already supplied by the caller
    are preserved (OR-ed with the new flag) rather than overwritten.  On
    other platforms the arguments are passed through untouched.

    :param args: positional arguments forwarded to the parent ``_popen``.
    :param kwargs: keyword arguments forwarded to the parent ``_popen``.
    :return: whatever the parent class's ``_popen`` returns.
    """
    if sys.platform == 'win32':
        # Numeric value of the Win32 CREATE_NEW_PROCESS_GROUP flag.
        CREATE_NEW_PROCESS_GROUP = 512
        # This allows us to send a signal to the child, *without* also
        # sending it to ourselves
        # Treat a missing or None 'creationflags' as "no flags" so the
        # bitwise OR below is always well-defined.
        caller_flags = kwargs.get('creationflags') or 0
        kwargs['creationflags'] = caller_flags | CREATE_NEW_PROCESS_GROUP
    return super(TestBreakin, self)._popen(*args, **kwargs)
41
103
def _dont_SIGQUIT_on_darwin(self):
42
104
if sys.platform == 'darwin':
54
116
for i in range(100):
56
118
if sig is not None:
119
self._send_signal(pid, sig)
58
120
# Use WNOHANG to ensure we don't get blocked, doing so, we may
59
121
# let the process continue after *we* die...
122
# Win32 doesn't support WNOHANG, so we just pass 0
123
opts = getattr(os, 'WNOHANG', 0)
61
125
# note: waitpid is different on win32, but this test only runs
63
pid_killed, returncode = os.waitpid(pid, os.WNOHANG)
127
# TODO: waitpid doesn't work well on windows, we might consider
128
# using WaitForSingleObject(proc._handle, TIMEOUT)
129
# instead. Most notably, the WNOHANG isn't allowed, so
130
# this can hang indefinitely.
131
pid_killed, returncode = os.waitpid(pid, opts)
64
132
if (pid_killed, returncode) != (0, 0):
65
133
if sig is not None:
66
134
# high bit in low byte says if core was dumped; we
88
156
# wait for it to get started, and print the 'listening' line
89
157
proc.stderr.readline()
90
158
# first sigquit pops into debugger
91
os.kill(proc.pid, signal.SIGQUIT)
159
self._send_signal(proc.pid, 'break')
92
160
# Wait for the debugger to acknowledge the signal reception
161
# Note that it is possible for this to deadlock if the child doesn't
162
# acknowledge the signal and write to stderr. Perhaps we should try
163
# os.read(proc.stderr.fileno())?
93
164
err = proc.stderr.readline()
94
165
self.assertContainsRe(err, r'entering debugger')
95
166
# Now that the debugger is entered, we can ask him to quit
99
170
dead, sig = self._wait_for_process(proc.pid)
101
172
# The process didn't finish, let's kill it before reporting failure
102
dead, sig = self._wait_for_process(proc.pid, signal.SIGKILL)
173
dead, sig = self._wait_for_process(proc.pid, 'kill')
104
175
raise tests.KnownFailure(
105
176
"subprocess wasn't terminated, it had to be killed")
115
186
# wait for it to get started, and print the 'listening' line
116
187
proc.stderr.readline()
117
188
# break into the debugger
118
os.kill(proc.pid, signal.SIGQUIT)
189
self._send_signal(proc.pid, 'break')
119
190
# Wait for the debugger to acknowledge the signal reception (since we
120
191
# want to send a second signal, we ensure it doesn't get lost by
121
192
# validating that the first one was received and produced its effect).
122
193
err = proc.stderr.readline()
123
194
self.assertContainsRe(err, r'entering debugger')
124
dead, sig = self._wait_for_process(proc.pid, signal.SIGQUIT)
125
self.assertTrue((dead and sig == signal.SIGQUIT),
126
msg="subprocess wasn't terminated by repeated SIGQUIT")
195
dead, sig = self._wait_for_process(proc.pid, 'break')
196
self.assertTrue(dead)
197
# Either the child was dead before we could read its status, or the
198
# child was dead from the signal we sent it.
199
self.assertTrue(sig in (None, breakin.determine_signal()))
128
201
def test_breakin_disabled(self):
129
202
self._dont_SIGQUIT_on_darwin()