~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smart_signals.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-12-02 14:58:47 UTC
  • mfrom: (5554.1.3 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20101202145847-fw822sd3nyhvrwmi
(vila) Merge 2.2 into trunk including fix for bug #583667 and bug
        #681885 (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
import os
19
 
import signal
20
 
import threading
21
 
import weakref
22
 
 
23
 
from bzrlib import tests, transport
24
 
from bzrlib.smart import client, medium, server, signals
25
 
 
26
 
# The 'signal' module has no SIGHUP attribute on Windows. Rather than skipping
# the tests there, fall back to a placeholder signal number: the tests often do
# not depend on real interaction with 'signal', so they can still be run for
# code coverage.
SIGHUP = signal.SIGHUP if hasattr(signal, 'SIGHUP') else 1
30
 
 
31
 
 
32
 
class TestSignalHandlers(tests.TestCase):
    """Tests for the hangup-callback registry in bzrlib.smart.signals.

    Most tests call signals._sighup_handler() directly instead of delivering
    a real signal, so they also run on platforms without SIGHUP.
    """

    def setUp(self):
        super(TestSignalHandlers, self).setUp()
        # This allows us to mutate the signal handler callbacks, but leave it
        # 'pristine' after the test case.
        # TODO: Arguably, this could be put into the base test.TestCase, along
        #       with a tearDown that asserts that all the entries have been
        #       removed properly. Global state is always a bit messy. A shame
        #       that we need it for signal handling.
        orig = signals._setup_on_hangup_dict()
        # Nothing should have been installed before the test started.
        self.assertIs(None, orig)
        def cleanup():
            signals._on_sighup = None
        self.addCleanup(cleanup)

    def test_registered_callback_gets_called(self):
        """A registered callback is invoked when the handler runs."""
        calls = []
        def call_me():
            calls.append('called')
        signals.register_on_hangup('myid', call_me)
        signals._sighup_handler(SIGHUP, None)
        self.assertEqual(['called'], calls)
        signals.unregister_on_hangup('myid')

    def test_unregister_not_present(self):
        """Unregistering an unknown id is logged rather than raised."""
        # We don't want unregister to fail, since it is generally run at times
        # that shouldn't interrupt other flow.
        signals.unregister_on_hangup('no-such-id')
        log = self.get_log()
        self.assertContainsRe(log, 'Error occurred during unregister_on_hangup:')
        self.assertContainsRe(log, '(?s)Traceback.*KeyError')

    def test_failing_callback(self):
        """One callback raising does not stop the others from running."""
        calls = []
        def call_me():
            calls.append('called')
        def fail_me():
            raise RuntimeError('something bad happened')
        signals.register_on_hangup('myid', call_me)
        signals.register_on_hangup('otherid', fail_me)
        # _sighup_handler should call both, even though it got an exception
        signals._sighup_handler(SIGHUP, None)
        signals.unregister_on_hangup('myid')
        signals.unregister_on_hangup('otherid')
        log = self.get_log()
        self.assertContainsRe(log, '(?s)Traceback.*RuntimeError')
        self.assertEqual(['called'], calls)

    def test_unregister_during_call(self):
        """A callback may unregister itself while the handler is running."""
        # _sighup_handler should handle if some callbacks actually remove
        # themselves while running.
        calls = []
        def call_me_and_unregister():
            signals.unregister_on_hangup('myid')
            calls.append('called_and_unregistered')
        def call_me():
            calls.append('called')
        signals.register_on_hangup('myid', call_me_and_unregister)
        signals.register_on_hangup('other', call_me)
        signals._sighup_handler(SIGHUP, None)
        # Both callbacks must still have run; the order in which the handler
        # visits them is not specified, so compare order-independently.
        self.assertEqual(['called', 'called_and_unregistered'], sorted(calls))

    def test_keyboard_interrupt_propagated(self):
        """KeyboardInterrupt raised by a callback escapes the handler."""
        # In case we get 'stuck' while running a hangup function, we should
        # not suppress KeyboardInterrupt
        def call_me_and_raise():
            raise KeyboardInterrupt()
        signals.register_on_hangup('myid', call_me_and_raise)
        self.assertRaises(KeyboardInterrupt,
                          signals._sighup_handler, SIGHUP, None)
        signals.unregister_on_hangup('myid')

    def test_weak_references(self):
        """Registering a callback does not keep it alive."""
        # TODO: This is probably a very-CPython-specific test
        # Adding yourself to the callback should not make you immortal
        # setUp installs a fresh registry for the duration of the test, so we
        # don't pollute any original dict. However, we can test that what was
        # installed has the weak-valued type this test relies on.
        self.assertIsInstance(signals._on_sighup,
                              weakref.WeakValueDictionary)
        calls = []
        def call_me():
            calls.append('called')
        signals.register_on_hangup('myid', call_me)
        # Drop the only strong reference held by this test.
        del call_me
        # Non-CPython might want to do a gc.collect() here
        signals._sighup_handler(SIGHUP, None)
        self.assertEqual([], calls)

    def test_not_installed(self):
        """With no registry installed, every entry point is a silent no-op."""
        # If you haven't called bzrlib.smart.signals.install_sighup_handler,
        # then _on_sighup should be None, and all the calls become no-ops.
        signals._on_sighup = None
        calls = []
        def call_me():
            calls.append('called')
        # Register the callback itself (not the 'calls' list), so the test
        # exercises the same path a real caller would.
        signals.register_on_hangup('myid', call_me)
        signals._sighup_handler(SIGHUP, None)
        signals.unregister_on_hangup('myid')
        log = self.get_log()
        self.assertEqual('', log)
        # A no-op handler must not have invoked the callback.
        self.assertEqual([], calls)

    def test_install_sighup_handler(self):
        """install/restore_sighup_handler set up and tear down global state."""
        # install_sighup_handler should set up a signal handler for SIGHUP, as
        # well as the signals._on_sighup dict.
        signals._on_sighup = None
        orig = signals.install_sighup_handler()
        try:
            # Only check the installed handler where the platform really
            # defines SIGHUP; elsewhere SIGHUP is just a placeholder number.
            if getattr(signal, 'SIGHUP', None) is not None:
                cur = signal.getsignal(SIGHUP)
                self.assertEqual(signals._sighup_handler, cur)
            self.assertIsNot(None, signals._on_sighup)
        finally:
            # Always put the previous handler back, even if an assertion
            # above failed, so the process-wide handler does not leak into
            # later tests.
            signals.restore_sighup_handler(orig)
        self.assertIs(None, signals._on_sighup)
145
 
 
146
 
 
147
 
class TestInetServer(tests.TestCase):
    """Check how an inet-mode smart server reacts to the hangup handler."""

    def create_file_pipes(self):
        """Create an OS pipe and return it as (read_file, write_file).

        Both ends are wrapped as binary file objects.
        """
        read_fd, write_fd = os.pipe()
        return os.fdopen(read_fd, 'rb'), os.fdopen(write_fd, 'wb')

    def test_inet_server_responds_to_sighup(self):
        """The hangup handler marks the server finished mid-request.

        The request that is already in flight must still be answered in full,
        and the serving thread must then stop.
        """
        mem_transport = transport.get_transport('memory:///')
        expected = 'a' * (1024 * 1024)
        mem_transport.put_bytes('bigfile', expected)
        server_factory = server.BzrServerFactory()
        # Override stdin/stdout so that we can inject our own handles
        client_read, server_write = self.create_file_pipes()
        server_read, client_write = self.create_file_pipes()
        def fake_stdin_stdout():
            return (server_read, server_write)
        server_factory._get_stdin_stdout = fake_stdin_stdout
        server_factory.set_up(mem_transport, None, None, inet=True,
                              timeout=4.0)
        self.addCleanup(server_factory.tear_down)
        serving_started = threading.Event()
        serving_stopped = threading.Event()
        def run_server():
            # Signal on both sides of serve() so the main thread can
            # synchronise with the serving loop.
            serving_started.set()
            server_factory.smart_server.serve()
            serving_stopped.set()
        server_thread = threading.Thread(target=run_server)
        server_thread.start()
        serving_started.wait()
        client_medium = medium.SmartSimplePipesClientMedium(
            client_read, client_write, 'base')
        smart_client = client._SmartClient(client_medium)
        resp, response_handler = smart_client.call_expecting_body(
            'get', 'bigfile')
        # Invoke the handler directly rather than delivering a real signal.
        signals._sighup_handler(SIGHUP, None)
        self.assertTrue(server_factory.smart_server.finished)
        # We can still finish reading the file content, but more than that, and
        # the file is closed.
        body = response_handler.read_body_bytes()
        # Deliberately not assertEqual: a mismatch would dump 1MB into the
        # failure message.
        if body != expected:
            self.fail('Got the wrong content back, expected 1M "a"')
        serving_stopped.wait()
        server_thread.join()
189