1
# Copyright (C) 2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
26
from bzrlib.tests import test_server
27
from bzrlib.tests.scenarios import load_tests_apply_scenarios
30
load_tests = load_tests_apply_scenarios
33
class TestThreadWithException(tests.TestCase):
35
def test_start_and_join_smoke_test(self):
39
tt = test_server.ThreadWithException(target=do_nothing)
43
def test_exception_is_re_raised(self):
44
class MyException(Exception):
47
def raise_my_exception():
50
tt = test_server.ThreadWithException(target=raise_my_exception)
52
self.assertRaises(MyException, tt.join)
54
def test_join_when_no_exception(self):
55
resume = threading.Event()
56
class MyException(Exception):
59
def raise_my_exception():
60
# Wait for the test to tell us to resume
65
tt = test_server.ThreadWithException(target=raise_my_exception)
68
self.assertIs(None, tt.exception)
70
self.assertRaises(MyException, tt.join)
73
class TCPClient(object):
78
def connect(self, addr):
79
if self.sock is not None:
80
raise AssertionError('Already connected to %r'
81
% (self.sock.getsockname(),))
82
self.sock = osutils.connect_socket(addr)
85
if self.sock is not None:
87
self.sock.shutdown(socket.SHUT_RDWR)
89
except socket.error, e:
90
if e[0] in (errno.EBADF, errno.ENOTCONN):
91
# Right, the socket is already down
98
return self.sock.sendall(s)
100
def read(self, bufsize=4096):
101
return self.sock.recv(bufsize)
104
class TCPConnectionHandler(SocketServer.StreamRequestHandler):
108
self.handle_connection()
110
self.handle_connection()
112
def handle_connection(self):
113
req = self.rfile.readline()
116
elif req == 'ping\n':
117
self.wfile.write('pong\n')
119
raise ValueError('[%s] not understood' % req)
122
class TestTCPServerInAThread(tests.TestCase):
125
(name, {'server_class': getattr(test_server, name)})
127
('TestingTCPServer', 'TestingThreadingTCPServer')]
129
# Set by load_tests()
132
def get_server(self, server_class=None, connection_handler_class=None):
133
if server_class is not None:
134
self.server_class = server_class
135
if connection_handler_class is None:
136
connection_handler_class = TCPConnectionHandler
137
server = test_server.TestingTCPServerInAThread(
138
('localhost', 0), self.server_class, connection_handler_class)
139
server.start_server()
140
self.addCleanup(server.stop_server)
143
def get_client(self):
145
self.addCleanup(client.disconnect)
148
def get_server_connection(self, server, conn_rank):
149
return server.server.clients[conn_rank]
151
def assertClientAddr(self, client, server, conn_rank):
152
conn = self.get_server_connection(server, conn_rank)
153
self.assertEquals(client.sock.getsockname(), conn[1])
155
def test_start_stop(self):
156
server = self.get_server()
157
client = self.get_client()
159
# since the server doesn't accept connections anymore attempting to
160
# connect should fail
161
client = self.get_client()
162
self.assertRaises(socket.error,
163
client.connect, (server.host, server.port))
165
def test_client_talks_server_respond(self):
166
server = self.get_server()
167
client = self.get_client()
168
client.connect((server.host, server.port))
169
self.assertIs(None, client.write('ping\n'))
171
self.assertClientAddr(client, server, 0)
172
self.assertEquals('pong\n', resp)
174
def test_server_fails_to_start(self):
175
class CantStart(Exception):
178
class CantStartServer(test_server.TestingTCPServer):
180
def server_bind(self):
183
# The exception is raised in the main thread
184
self.assertRaises(CantStart,
185
self.get_server, server_class=CantStartServer)
187
def test_server_fails_while_serving_or_stopping(self):
188
class CantConnect(Exception):
191
class FailingConnectionHandler(TCPConnectionHandler):
196
server = self.get_server(
197
connection_handler_class=FailingConnectionHandler)
198
# The server won't fail until a client connect
199
client = self.get_client()
200
client.connect((server.host, server.port))
202
# Now we must force the server to answer by sending the request and
203
# waiting for some answer. But since we don't control when the
204
# server thread will be given cycles, we don't control either
205
# whether our reads or writes may hang.
206
client.sock.settimeout(0.1)
207
client.write('ping\n')
211
# Now the server has raised the exception in its own thread
212
self.assertRaises(CantConnect, server.stop_server)
214
def test_server_crash_while_responding(self):
215
sync = threading.Event()
217
class FailToRespond(Exception):
220
class FailingDuringResponseHandler(TCPConnectionHandler):
222
def handle_connection(self):
223
req = self.rfile.readline()
224
threading.currentThread().set_ready_event(sync)
225
raise FailToRespond()
227
server = self.get_server(
228
connection_handler_class=FailingDuringResponseHandler)
229
client = self.get_client()
230
client.connect((server.host, server.port))
231
client.write('ping\n')
233
self.assertRaises(FailToRespond, server.pending_exception)
235
def test_exception_swallowed_while_serving(self):
236
sync = threading.Event()
238
class CantServe(Exception):
241
class FailingWhileServingConnectionHandler(TCPConnectionHandler):
244
# We want to sync with the thread that is serving the
246
threading.currentThread().set_ready_event(sync)
249
server = self.get_server(
250
connection_handler_class=FailingWhileServingConnectionHandler)
251
# Install the exception swallower
252
server.set_ignored_exceptions(CantServe)
253
client = self.get_client()
254
# Connect to the server so the exception is raised there
255
client.connect((server.host, server.port))
256
# Wait for the exception to propagate.
258
# The connection wasn't served properly but the exception should have
260
server.pending_exception()