1
# Copyright (C) 2010, 2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
26
from bzrlib.tests import test_server
27
from bzrlib.tests.scenarios import load_tests_apply_scenarios
30
load_tests = load_tests_apply_scenarios
33
class TCPClient(object):
38
def connect(self, addr):
39
if self.sock is not None:
40
raise AssertionError('Already connected to %r'
41
% (self.sock.getsockname(),))
42
self.sock = osutils.connect_socket(addr)
45
if self.sock is not None:
47
self.sock.shutdown(socket.SHUT_RDWR)
49
except socket.error, e:
50
if e[0] in (errno.EBADF, errno.ENOTCONN):
51
# Right, the socket is already down
58
return self.sock.sendall(s)
60
def read(self, bufsize=4096):
61
return self.sock.recv(bufsize)
64
class TCPConnectionHandler(SocketServer.StreamRequestHandler):
68
self.handle_connection()
70
self.handle_connection()
72
def handle_connection(self):
73
req = self.rfile.readline()
77
self.wfile.write('pong\n')
79
raise ValueError('[%s] not understood' % req)
82
class TestTCPServerInAThread(tests.TestCase):
85
(name, {'server_class': getattr(test_server, name)})
87
('TestingTCPServer', 'TestingThreadingTCPServer')]
89
def get_server(self, server_class=None, connection_handler_class=None):
90
if server_class is not None:
91
self.server_class = server_class
92
if connection_handler_class is None:
93
connection_handler_class = TCPConnectionHandler
94
server = test_server.TestingTCPServerInAThread(
95
('localhost', 0), self.server_class, connection_handler_class)
97
self.addCleanup(server.stop_server)
100
def get_client(self):
102
self.addCleanup(client.disconnect)
105
def get_server_connection(self, server, conn_rank):
106
return server.server.clients[conn_rank]
108
def assertClientAddr(self, client, server, conn_rank):
109
conn = self.get_server_connection(server, conn_rank)
110
self.assertEquals(client.sock.getsockname(), conn[1])
112
def test_start_stop(self):
113
server = self.get_server()
114
client = self.get_client()
116
# since the server doesn't accept connections anymore attempting to
117
# connect should fail
118
client = self.get_client()
119
self.assertRaises(socket.error,
120
client.connect, (server.host, server.port))
122
def test_client_talks_server_respond(self):
123
server = self.get_server()
124
client = self.get_client()
125
client.connect((server.host, server.port))
126
self.assertIs(None, client.write('ping\n'))
128
self.assertClientAddr(client, server, 0)
129
self.assertEquals('pong\n', resp)
131
def test_server_fails_to_start(self):
132
class CantStart(Exception):
135
class CantStartServer(test_server.TestingTCPServer):
137
def server_bind(self):
140
# The exception is raised in the main thread
141
self.assertRaises(CantStart,
142
self.get_server, server_class=CantStartServer)
144
def test_server_fails_while_serving_or_stopping(self):
145
class CantConnect(Exception):
148
class FailingConnectionHandler(TCPConnectionHandler):
153
server = self.get_server(
154
connection_handler_class=FailingConnectionHandler)
155
# The server won't fail until a client connect
156
client = self.get_client()
157
client.connect((server.host, server.port))
159
# Now we must force the server to answer by sending the request and
160
# waiting for some answer. But since we don't control when the
161
# server thread will be given cycles, we don't control either
162
# whether our reads or writes may hang.
163
client.sock.settimeout(0.1)
164
client.write('ping\n')
168
# Now the server has raised the exception in its own thread
169
self.assertRaises(CantConnect, server.stop_server)
171
def test_server_crash_while_responding(self):
172
# We want to ensure the exception has been caught
173
caught = threading.Event()
175
# The thread that will serve the client, this needs to be an attribute
176
# so the handler below can modify it when it's executed (it's
177
# instantiated when the request is processed)
178
self.connection_thread = None
180
class FailToRespond(Exception):
183
class FailingDuringResponseHandler(TCPConnectionHandler):
185
def handle_connection(request):
186
req = request.rfile.readline()
187
# Capture the thread and make it use 'caught' so we can wait on
188
# the even that will be set when the exception is caught. We
189
# also capture the thread to know where to look.
190
self.connection_thread = threading.currentThread()
191
self.connection_thread.set_sync_event(caught)
192
raise FailToRespond()
194
server = self.get_server(
195
connection_handler_class=FailingDuringResponseHandler)
196
client = self.get_client()
197
client.connect((server.host, server.port))
198
client.write('ping\n')
199
# Wait for the exception to be caught
201
# Check that the connection thread did catch the exception,
202
# http://pad.lv/869366 was wrongly checking the server thread which
203
# works for TestingTCPServer where the connection is handled in the
204
# same thread than the server one but is racy for
205
# TestingThreadingTCPServer where the server thread may be in a
206
# blocking accept() call (or not).
208
self.connection_thread.pending_exception()
209
except FailToRespond:
210
# Great, the test succeeded
213
# If the exception is not in the connection thread anymore, it's in
215
server.server.stopped.wait()
216
# The exception is available now
217
self.assertRaises(FailToRespond, server.pending_exception)
219
def test_exception_swallowed_while_serving(self):
220
# We need to ensure the exception has been caught
221
caught = threading.Event()
223
# The thread that will serve the client, this needs to be an attribute
224
# so the handler below can access it when it's executed (it's
225
# instantiated when the request is processed)
226
self.connection_thread = None
227
class CantServe(Exception):
230
class FailingWhileServingConnectionHandler(TCPConnectionHandler):
233
# Capture the thread and make it use 'caught' so we can wait on
234
# the even that will be set when the exception is caught. We
235
# also capture the thread to know where to look.
236
self.connection_thread = threading.currentThread()
237
self.connection_thread.set_sync_event(caught)
240
server = self.get_server(
241
connection_handler_class=FailingWhileServingConnectionHandler)
242
# Install the exception swallower
243
server.set_ignored_exceptions(CantServe)
244
client = self.get_client()
245
# Connect to the server so the exception is raised there
246
client.connect((server.host, server.port))
247
# Wait for the exception to be caught
249
# The connection wasn't served properly but the exception should have
250
# been swallowed (see test_server_crash_while_responding remark about
251
# http://pad.lv/869366 explaining why we can't check the server thread
252
# here). More precisely, the exception *has* been caught and captured
253
# but it is cleared when joining the thread (or trying to acquire the
254
# exception) and as such won't propagate to the server thread.
255
self.connection_thread.pending_exception()
256
server.pending_exception()