~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/blackbox/test_breakin.py

  • Committer: Martin Pool
  • Date: 2009-07-10 07:14:02 UTC
  • mto: (4525.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4526.
  • Revision ID: mbp@sourcefrog.net-20090710071402-dga5lhvdup5jeere
Rename remaining *_implementations tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Blackbox tests for debugger breakin"""
 
18
 
 
19
import errno
 
20
import os
 
21
import signal
 
22
import subprocess
 
23
import sys
 
24
import time
 
25
 
 
26
from bzrlib import (
 
27
    errors,
 
28
    tests,
 
29
    )
 
30
 
 
31
 
 
32
class TestBreakin(tests.TestCase):
 
33
    # FIXME: If something is broken, these tests may just hang indefinitely in
 
34
    # wait() waiting for the child to exit when it's not going to.
 
35
 
 
36
    def setUp(self):
 
37
        if sys.platform == 'win32':
 
38
            raise tests.TestSkipped('breakin signal not tested on win32')
 
39
        super(TestBreakin, self).setUp()
 
40
 
 
41
    def _dont_SIGQUIT_on_darwin(self):
 
42
        if sys.platform == 'darwin':
 
43
            # At least on Leopard and with python 2.6, this test will raise a
 
44
            # popup window asking if the python failure should be reported to
 
45
            # Apple... That's not the point of the test :) Marking the test as
 
46
            # not applicable Until we find a way to disable that intrusive
 
47
            # behavior... --vila20080611
 
48
            raise tests.TestNotApplicable(
 
49
                '%s raises a popup on OSX' % self.id())
 
50
 
 
51
    def _wait_for_process(self, pid, sig=None):
 
52
        # We don't know quite how long waiting for the process 'pid' will take,
 
53
        # but if it's more than 10s then it's probably not going to work.
 
54
        for i in range(100):
 
55
            time.sleep(0.1)
 
56
            if sig is not None:
 
57
                os.kill(pid, sig)
 
58
            # Use WNOHANG to ensure we don't get blocked, doing so, we may
 
59
            # leave the process continue after *we* die...
 
60
            try:
 
61
                # note: waitpid is different on win32, but this test only runs
 
62
                # on unix
 
63
                pid_killed, returncode = os.waitpid(pid, os.WNOHANG)
 
64
                if (pid_killed, returncode) != (0, 0):
 
65
                    if sig is not None:
 
66
                        # high bit in low byte says if core was dumped; we
 
67
                        # don't care
 
68
                        status, sig = (returncode >> 8, returncode & 0x7f)
 
69
                        return True, sig
 
70
            except OSError, e:
 
71
                if e.errno in (errno.ECHILD, errno.ESRCH):
 
72
                    # The process doesn't exist anymore
 
73
                    return True, None
 
74
                else:
 
75
                    raise
 
76
 
 
77
        return False, None
 
78
 
 
79
    # port 0 means to allocate any port
 
80
    _test_process_args = ['serve', '--port', 'localhost:0']
 
81
 
 
82
    def test_breakin(self):
 
83
        # Break in to a debugger while bzr is running
 
84
        # we need to test against a command that will wait for
 
85
        # a while -- bzr serve should do
 
86
        proc = self.start_bzr_subprocess(self._test_process_args,
 
87
                env_changes=dict(BZR_SIGQUIT_PDB=None))
 
88
        # wait for it to get started, and print the 'listening' line
 
89
        proc.stderr.readline()
 
90
        # first sigquit pops into debugger
 
91
        os.kill(proc.pid, signal.SIGQUIT)
 
92
        # Wait for the debugger to acknowledge the signal reception
 
93
        err = proc.stderr.readline()
 
94
        self.assertContainsRe(err, r'entering debugger')
 
95
        # Now that the debugger is entered, we can ask him to quit
 
96
        proc.stdin.write("q\n")
 
97
        # We wait a bit to let the child process handles our query and avoid
 
98
        # triggering deadlocks leading to hangs on multi-core hosts...
 
99
        dead, sig = self._wait_for_process(proc.pid)
 
100
        if not dead:
 
101
            # The process didn't finish, let's kill it before reporting failure
 
102
            dead, sig = self._wait_for_process(proc.pid, signal.SIGKILL)
 
103
            if dead:
 
104
                raise tests.KnownFailure(
 
105
                    "subprocess wasn't terminated, it had to be killed")
 
106
            else:
 
107
                self.fail("subprocess %d wasn't terminated by repeated SIGKILL",
 
108
                          proc.pid)
 
109
 
 
110
    def test_breakin_harder(self):
 
111
        """SIGQUITting twice ends the process."""
 
112
        self._dont_SIGQUIT_on_darwin()
 
113
        proc = self.start_bzr_subprocess(self._test_process_args,
 
114
                env_changes=dict(BZR_SIGQUIT_PDB=None))
 
115
        # wait for it to get started, and print the 'listening' line
 
116
        proc.stderr.readline()
 
117
        # break into the debugger
 
118
        os.kill(proc.pid, signal.SIGQUIT)
 
119
        # Wait for the debugger to acknowledge the signal reception (since we
 
120
        # want to send a second signal, we ensure it doesn't get lost by
 
121
        # validating the first get received and produce its effect).
 
122
        err = proc.stderr.readline()
 
123
        self.assertContainsRe(err, r'entering debugger')
 
124
        dead, sig = self._wait_for_process(proc.pid, signal.SIGQUIT)
 
125
        self.assertTrue((dead and sig == signal.SIGQUIT),
 
126
                        msg="subprocess wasn't terminated by repeated SIGQUIT")
 
127
 
 
128
    def test_breakin_disabled(self):
 
129
        self._dont_SIGQUIT_on_darwin()
 
130
        proc = self.start_bzr_subprocess(self._test_process_args,
 
131
                env_changes=dict(BZR_SIGQUIT_PDB='0'))
 
132
        # wait for it to get started, and print the 'listening' line
 
133
        proc.stderr.readline()
 
134
        # first hit should just kill it
 
135
        os.kill(proc.pid, signal.SIGQUIT)
 
136
        proc.wait()