# Copyright (C) 2006, 2007, 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Blackbox tests for debugger breakin"""
38
class TestBreakin(tests.TestCase):
    # FIXME: If something is broken, these tests may just hang indefinitely in
    # wait() waiting for the child to exit when it's not going to.

super(TestBreakin, self).setUp()
44
self.requireFeature(tests.BreakinFeature)
46
def _send_signal_via_kill(self, pid, sig_type):
47
if sig_type == 'break':
48
sig_num = signal.SIGQUIT
49
elif sig_type == 'kill':
50
sig_num = signal.SIGKILL
52
raise ValueError("unknown signal type: %s" % (sig_type,))
56
if e.errno != errno.ESRCH:
59
def _send_signal_win32(self, pid, sig_type):
60
"""Send a 'signal' on Windows.
62
Windows doesn't really have signals in the same way. All it really
64
1) Sending SIGINT to the *current* process group (so self, and all
66
2) Sending SIGBREAK to a process that shares the current console,
67
which can be in its own process group.
68
So we have start_bzr_subprocess create a new process group for the
69
spawned process (via a flag to Popen), and then we map
70
SIGQUIT to GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT)
71
SIGKILL to TerminateProcess
73
if sig_type == 'break':
76
ret = ctypes.windll.kernel32.GenerateConsoleCtrlEvent(
77
CTRL_BREAK_EVENT, pid)
79
err = ctypes.FormatError()
80
raise RuntimeError('failed to send CTRL_BREAK: %s'
82
elif sig_type == 'kill':
83
# Does the exit code matter? For now we are just setting it to
84
# something other than 0
85
exit_code = breakin.determine_signal()
86
ctypes.windll.kernel32.TerminateProcess(pid, exit_code)
88
if sys.platform == 'win32':
89
_send_signal = _send_signal_win32
91
_send_signal = _send_signal_via_kill
93
def _popen(self, *args, **kwargs):
94
if sys.platform == 'win32':
95
CREATE_NEW_PROCESS_GROUP = 512
96
# This allows us to send a signal to the child, *without* also
97
# sending it to ourselves
98
kwargs['creationflags'] = CREATE_NEW_PROCESS_GROUP
99
return super(TestBreakin, self)._popen(*args, **kwargs)
101
def _dont_SIGQUIT_on_darwin(self):
102
if sys.platform == 'darwin':
103
# At least on Leopard and with python 2.6, this test will raise a
104
# popup window asking if the python failure should be reported to
105
# Apple... That's not the point of the test :) Marking the test as
106
# not applicable Until we find a way to disable that intrusive
107
# behavior... --vila20080611
108
raise tests.TestNotApplicable(
109
'%s raises a popup on OSX' % self.id())
111
def _wait_for_process(self, pid, sig=None, count=100):
112
# We don't know quite how long waiting for the process 'pid' will take,
113
# but if it's more than 10s then it's probably not going to work.
114
for i in range(count):
116
self._send_signal(pid, sig)
117
# Use WNOHANG to ensure we don't get blocked, doing so, we may
118
# leave the process continue after *we* die...
119
# Win32 doesn't support WNOHANG, so we just pass 0
120
opts = getattr(os, 'WNOHANG', 0)
122
# TODO: waitpid doesn't work well on windows, we might consider
123
# using WaitForSingleObject(proc._handle, TIMEOUT)
124
# instead. Most notably, the WNOHANG isn't allowed, so
125
# this can hang indefinitely.
126
pid_killed, returncode = os.waitpid(pid, opts)
127
if pid_killed != 0 and returncode != 0:
129
# high bit in low byte says if core was dumped; we
131
status, sig = (returncode >> 8, returncode & 0x7f)
134
if e.errno in (errno.ECHILD, errno.ESRCH):
135
# The process doesn't exist anymore
144
# port 0 means to allocate any port
145
_test_process_args = ['serve', '--port', 'localhost:0']
147
def test_breakin(self):
148
# Break in to a debugger while bzr is running
149
# we need to test against a command that will wait for
150
# a while -- bzr serve should do
151
proc = self.start_bzr_subprocess(self._test_process_args,
152
env_changes=dict(BZR_SIGQUIT_PDB=None))
153
# wait for it to get started, and print the 'listening' line
154
proc.stderr.readline()
155
# first sigquit pops into debugger
156
self._send_signal(proc.pid, 'break')
157
# Wait for the debugger to acknowledge the signal reception
158
# Note that it is possible for this to deadlock if the child doesn't
159
# acknowlege the signal and write to stderr. Perhaps we should try
160
# os.read(proc.stderr.fileno())?
161
err = proc.stderr.readline()
162
self.assertContainsRe(err, r'entering debugger')
163
# Try to shutdown cleanly;
164
# Now that the debugger is entered, we can ask him to quit
165
proc.stdin.write("q\n")
166
# But we don't really care if it doesn't.
167
dead, sig = self._wait_for_process(proc.pid, count=3)
169
# The process didn't finish, let's kill it.
170
dead, sig = self._wait_for_process(proc.pid, 'kill', count=10)
172
# process isn't gone, user will have to hunt it down and kill
174
self.fail("subprocess %d wasn't terminated by repeated SIGKILL" %
177
def test_breakin_harder(self):
178
"""SIGQUITting twice ends the process."""
179
self._dont_SIGQUIT_on_darwin()
180
proc = self.start_bzr_subprocess(self._test_process_args,
181
env_changes=dict(BZR_SIGQUIT_PDB=None))
182
# wait for it to get started, and print the 'listening' line
183
proc.stderr.readline()
184
# break into the debugger
185
self._send_signal(proc.pid, 'break')
186
# Wait for the debugger to acknowledge the signal reception (since we
187
# want to send a second signal, we ensure it doesn't get lost by
188
# validating the first get received and produce its effect).
189
err = proc.stderr.readline()
190
self.assertContainsRe(err, r'entering debugger')
191
dead, sig = self._wait_for_process(proc.pid, 'break')
192
self.assertTrue(dead)
193
# Either the child was dead before we could read its status, or the
194
# child was dead from the signal we sent it.
195
self.assertTrue(sig in (None, breakin.determine_signal()))
197
def test_breakin_disabled(self):
198
self._dont_SIGQUIT_on_darwin()
199
proc = self.start_bzr_subprocess(self._test_process_args,
200
env_changes=dict(BZR_SIGQUIT_PDB='0'))
201
# wait for it to get started, and print the 'listening' line
202
proc.stderr.readline()
203
# first hit should just kill it
204
self._send_signal(proc.pid, 'break')