177
253
client_medium._accept_bytes('abc')
178
254
self.assertEqual('abc', output.getvalue())
256
def test_simple_pipes__accept_bytes_subprocess_closed(self):
257
# It is unfortunate that we have to use Popen for this. However,
258
# os.pipe() does not behave the same as subprocess.Popen().
259
# On Windows, if you use os.pipe() and close the write side,
260
# read.read() hangs. On Linux, read.read() returns the empty string.
261
p = subprocess.Popen([sys.executable, '-c',
263
'sys.stdout.write(sys.stdin.read(4))\n'
264
'sys.stdout.close()\n'],
265
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
266
client_medium = medium.SmartSimplePipesClientMedium(
267
p.stdout, p.stdin, 'base')
268
client_medium._accept_bytes('abc\n')
269
self.assertEqual('abc', client_medium._read_bytes(3))
271
# While writing to the underlying pipe,
272
# Windows py2.6.6 we get IOError(EINVAL)
273
# Lucid py2.6.5, we get IOError(EPIPE)
274
# In both cases, it should be wrapped to ConnectionReset
275
self.assertRaises(errors.ConnectionReset,
276
client_medium._accept_bytes, 'more')
278
def test_simple_pipes__accept_bytes_pipe_closed(self):
279
child_read, client_write = create_file_pipes()
280
client_medium = medium.SmartSimplePipesClientMedium(
281
None, client_write, 'base')
282
client_medium._accept_bytes('abc\n')
283
self.assertEqual('abc\n', child_read.read(4))
284
# While writing to the underlying pipe,
285
# Windows py2.6.6 we get IOError(EINVAL)
286
# Lucid py2.6.5, we get IOError(EPIPE)
287
# In both cases, it should be wrapped to ConnectionReset
289
self.assertRaises(errors.ConnectionReset,
290
client_medium._accept_bytes, 'more')
292
def test_simple_pipes__flush_pipe_closed(self):
293
child_read, client_write = create_file_pipes()
294
client_medium = medium.SmartSimplePipesClientMedium(
295
None, client_write, 'base')
296
client_medium._accept_bytes('abc\n')
298
# Even though the pipe is closed, flush on the write side seems to be a
299
# no-op, rather than a failure.
300
client_medium._flush()
302
def test_simple_pipes__flush_subprocess_closed(self):
303
p = subprocess.Popen([sys.executable, '-c',
305
'sys.stdout.write(sys.stdin.read(4))\n'
306
'sys.stdout.close()\n'],
307
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
308
client_medium = medium.SmartSimplePipesClientMedium(
309
p.stdout, p.stdin, 'base')
310
client_medium._accept_bytes('abc\n')
312
# Even though the child process is dead, flush seems to be a no-op.
313
client_medium._flush()
315
def test_simple_pipes__read_bytes_pipe_closed(self):
316
child_read, client_write = create_file_pipes()
317
client_medium = medium.SmartSimplePipesClientMedium(
318
child_read, client_write, 'base')
319
client_medium._accept_bytes('abc\n')
321
self.assertEqual('abc\n', client_medium._read_bytes(4))
322
self.assertEqual('', client_medium._read_bytes(4))
324
def test_simple_pipes__read_bytes_subprocess_closed(self):
325
p = subprocess.Popen([sys.executable, '-c',
327
'if sys.platform == "win32":\n'
328
' import msvcrt, os\n'
329
' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
330
' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
331
'sys.stdout.write(sys.stdin.read(4))\n'
332
'sys.stdout.close()\n'],
333
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
334
client_medium = medium.SmartSimplePipesClientMedium(
335
p.stdout, p.stdin, 'base')
336
client_medium._accept_bytes('abc\n')
338
self.assertEqual('abc\n', client_medium._read_bytes(4))
339
self.assertEqual('', client_medium._read_bytes(4))
180
341
def test_simple_pipes_client_disconnect_does_nothing(self):
181
342
# calling disconnect does nothing.
182
343
input = StringIO()
617
821
super(TestSmartServerStreamMedium, self).setUp()
618
self._captureVar('BZR_NO_SMART_VFS', None)
620
def portable_socket_pair(self):
621
"""Return a pair of TCP sockets connected to each other.
623
Unlike socket.socketpair, this should work on Windows.
625
listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
626
listen_sock.bind(('127.0.0.1', 0))
627
listen_sock.listen(1)
628
client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
629
client_sock.connect(listen_sock.getsockname())
630
server_sock, addr = listen_sock.accept()
632
return server_sock, client_sock
822
self.overrideEnv('BZR_NO_SMART_VFS', None)
824
def create_pipe_medium(self, to_server, from_server, transport,
826
"""Create a new SmartServerPipeStreamMedium."""
827
return medium.SmartServerPipeStreamMedium(to_server, from_server,
828
transport, timeout=timeout)
830
def create_pipe_context(self, to_server_bytes, transport):
831
"""Create a SmartServerSocketStreamMedium.
833
This differes from create_pipe_medium, in that we initialize the
834
request that is sent to the server, and return the StringIO class that
835
will hold the response.
837
to_server = StringIO(to_server_bytes)
838
from_server = StringIO()
839
m = self.create_pipe_medium(to_server, from_server, transport)
840
return m, from_server
842
def create_socket_medium(self, server_sock, transport, timeout=4.0):
843
"""Initialize a new medium.SmartServerSocketStreamMedium."""
844
return medium.SmartServerSocketStreamMedium(server_sock, transport,
847
def create_socket_context(self, transport, timeout=4.0):
848
"""Create a new SmartServerSocketStreamMedium with default context.
850
This will call portable_socket_pair and pass the server side to
851
create_socket_medium along with transport.
852
It then returns the client_sock and the server.
854
server_sock, client_sock = portable_socket_pair()
855
server = self.create_socket_medium(server_sock, transport,
857
return server, client_sock
634
859
def test_smart_query_version(self):
635
860
"""Feed a canned query version to a server"""
636
861
# wire-to-wire, using the whole stack
637
to_server = StringIO('hello\n')
638
from_server = StringIO()
639
862
transport = local.LocalTransport(urlutils.local_path_to_url('/'))
640
server = medium.SmartServerPipeStreamMedium(
641
to_server, from_server, transport)
863
server, from_server = self.create_pipe_context('hello\n', transport)
642
864
smart_protocol = protocol.SmartServerRequestProtocolOne(transport,
643
865
from_server.write)
644
866
server._serve_one_request(smart_protocol)
933
1124
server_protocol = self.build_protocol_socket('bzr request 2\n')
934
1125
self.assertProtocolTwo(server_protocol)
1127
def test__build_protocol_returns_if_stopping(self):
1128
# _build_protocol should notice that we are stopping, and return
1129
# without waiting for bytes from the client.
1130
server, client_sock = self.create_socket_context(None)
1131
server._stop_gracefully()
1132
self.assertIs(None, server._build_protocol())
1134
def test_socket_set_timeout(self):
1135
server, _ = self.create_socket_context(None, timeout=1.23)
1136
self.assertEqual(1.23, server._client_timeout)
1138
def test_pipe_set_timeout(self):
1139
server = self.create_pipe_medium(None, None, None,
1141
self.assertEqual(1.23, server._client_timeout)
1143
def test_socket_wait_for_bytes_with_timeout_with_data(self):
1144
server, client_sock = self.create_socket_context(None)
1145
client_sock.sendall('data\n')
1146
# This should not block or consume any actual content
1147
self.assertFalse(server._wait_for_bytes_with_timeout(0.1))
1148
data = server.read_bytes(5)
1149
self.assertEqual('data\n', data)
1151
def test_socket_wait_for_bytes_with_timeout_no_data(self):
1152
server, client_sock = self.create_socket_context(None)
1153
# This should timeout quickly, reporting that there wasn't any data
1154
self.assertRaises(errors.ConnectionTimeout,
1155
server._wait_for_bytes_with_timeout, 0.01)
1157
data = server.read_bytes(1)
1158
self.assertEqual('', data)
1160
def test_socket_wait_for_bytes_with_timeout_closed(self):
1161
server, client_sock = self.create_socket_context(None)
1162
# With the socket closed, this should return right away.
1163
# It seems select.select() returns that you *can* read on the socket,
1164
# even though it closed. Presumably as a way to tell it is closed?
1165
# Testing shows that without sock.close() this times-out failing the
1166
# test, but with it, it returns False immediately.
1168
self.assertFalse(server._wait_for_bytes_with_timeout(10))
1169
data = server.read_bytes(1)
1170
self.assertEqual('', data)
1172
def test_socket_wait_for_bytes_with_shutdown(self):
1173
server, client_sock = self.create_socket_context(None)
1175
# Override the _timer functionality, so that time never increments,
1176
# this way, we can be sure we stopped because of the flag, and not
1177
# because of a timeout, etc.
1178
server._timer = lambda: t
1179
server._client_poll_timeout = 0.1
1180
server._stop_gracefully()
1181
server._wait_for_bytes_with_timeout(1.0)
1183
def test_socket_serve_timeout_closes_socket(self):
1184
server, client_sock = self.create_socket_context(None, timeout=0.1)
1185
# This should timeout quickly, and then close the connection so that
1186
# client_sock recv doesn't block.
1188
self.assertEqual('', client_sock.recv(1))
1190
def test_pipe_wait_for_bytes_with_timeout_with_data(self):
1191
# We intentionally use a real pipe here, so that we can 'select' on it.
1192
# You can't select() on a StringIO
1193
(r_server, w_client) = os.pipe()
1194
self.addCleanup(os.close, w_client)
1195
with os.fdopen(r_server, 'rb') as rf_server:
1196
server = self.create_pipe_medium(
1197
rf_server, None, None)
1198
os.write(w_client, 'data\n')
1199
# This should not block or consume any actual content
1200
server._wait_for_bytes_with_timeout(0.1)
1201
data = server.read_bytes(5)
1202
self.assertEqual('data\n', data)
1204
def test_pipe_wait_for_bytes_with_timeout_no_data(self):
1205
# We intentionally use a real pipe here, so that we can 'select' on it.
1206
# You can't select() on a StringIO
1207
(r_server, w_client) = os.pipe()
1208
# We can't add an os.close cleanup here, because we need to control
1209
# when the file handle gets closed ourselves.
1210
with os.fdopen(r_server, 'rb') as rf_server:
1211
server = self.create_pipe_medium(
1212
rf_server, None, None)
1213
if sys.platform == 'win32':
1214
# Windows cannot select() on a pipe, so we just always return
1215
server._wait_for_bytes_with_timeout(0.01)
1217
self.assertRaises(errors.ConnectionTimeout,
1218
server._wait_for_bytes_with_timeout, 0.01)
1220
data = server.read_bytes(5)
1221
self.assertEqual('', data)
1223
def test_pipe_wait_for_bytes_no_fileno(self):
1224
server, _ = self.create_pipe_context('', None)
1225
# Our file doesn't support polling, so we should always just return
1226
# 'you have data to consume.
1227
server._wait_for_bytes_with_timeout(0.01)
937
1230
class TestGetProtocolFactoryForBytes(tests.TestCase):
938
1231
"""_get_protocol_factory_for_bytes identifies the protocol factory a server
969
1262
class TestSmartTCPServer(tests.TestCase):
1264
def make_server(self):
1265
"""Create a SmartTCPServer that we can exercise.
1267
Note: we don't use SmartTCPServer_for_testing because the testing
1268
version overrides lots of functionality like 'serve', and we want to
1269
test the raw service.
1271
This will start the server in another thread, and wait for it to
1272
indicate it has finished starting up.
1274
:return: (server, server_thread)
1276
t = _mod_transport.get_transport_from_url('memory:///')
1277
server = _mod_server.SmartTCPServer(t, client_timeout=4.0)
1278
server._ACCEPT_TIMEOUT = 0.1
1279
# We don't use 'localhost' because that might be an IPv6 address.
1280
server.start_server('127.0.0.1', 0)
1281
server_thread = threading.Thread(target=server.serve,
1283
server_thread.start()
1284
# Ensure this gets called at some point
1285
self.addCleanup(server._stop_gracefully)
1286
server._started.wait()
1287
return server, server_thread
1289
def ensure_client_disconnected(self, client_sock):
1290
"""Ensure that a socket is closed, discarding all errors."""
1296
def connect_to_server(self, server):
1297
"""Create a client socket that can talk to the server."""
1298
client_sock = socket.socket()
1299
server_info = server._server_socket.getsockname()
1300
client_sock.connect(server_info)
1301
self.addCleanup(self.ensure_client_disconnected, client_sock)
1304
def connect_to_server_and_hangup(self, server):
1305
"""Connect to the server, and then hang up.
1306
That way it doesn't sit waiting for 'accept()' to timeout.
1308
# If the server has already signaled that the socket is closed, we
1309
# don't need to try to connect to it. Not being set, though, the server
1310
# might still close the socket while we try to connect to it. So we
1311
# still have to catch the exception.
1312
if server._stopped.isSet():
1315
client_sock = self.connect_to_server(server)
1317
except socket.error, e:
1318
# If the server has hung up already, that is fine.
1321
def say_hello(self, client_sock):
1322
"""Send the 'hello' smart RPC, and expect the response."""
1323
client_sock.send('hello\n')
1324
self.assertEqual('ok\x012\n', client_sock.recv(5))
1326
def shutdown_server_cleanly(self, server, server_thread):
1327
server._stop_gracefully()
1328
self.connect_to_server_and_hangup(server)
1329
server._stopped.wait()
1330
server._fully_stopped.wait()
1331
server_thread.join()
971
1333
def test_get_error_unexpected(self):
972
1334
"""Error reported by server with no specific representation"""
973
self._captureVar('BZR_NO_SMART_VFS', None)
1335
self.overrideEnv('BZR_NO_SMART_VFS', None)
974
1336
class FlakyTransport(object):
976
1338
def external_url(self):
991
1353
t.get, 'something')
992
1354
self.assertContainsRe(str(err), 'some random exception')
1356
def test_propagates_timeout(self):
1357
server = _mod_server.SmartTCPServer(None, client_timeout=1.23)
1358
server_sock, client_sock = portable_socket_pair()
1359
handler = server._make_handler(server_sock)
1360
self.assertEqual(1.23, handler._client_timeout)
1362
def test_serve_conn_tracks_connections(self):
1363
server = _mod_server.SmartTCPServer(None, client_timeout=4.0)
1364
server_sock, client_sock = portable_socket_pair()
1365
server.serve_conn(server_sock, '-%s' % (self.id(),))
1366
self.assertEqual(1, len(server._active_connections))
1367
# We still want to talk on the connection. Polling should indicate it
1369
server._poll_active_connections()
1370
self.assertEqual(1, len(server._active_connections))
1371
# Closing the socket will end the active thread, and polling will
1372
# notice and remove it from the active set.
1374
server._poll_active_connections(0.1)
1375
self.assertEqual(0, len(server._active_connections))
1377
def test_serve_closes_out_finished_connections(self):
1378
server, server_thread = self.make_server()
1379
# The server is started, connect to it.
1380
client_sock = self.connect_to_server(server)
1381
# We send and receive on the connection, so that we know the
1382
# server-side has seen the connect, and started handling the
1384
self.say_hello(client_sock)
1385
self.assertEqual(1, len(server._active_connections))
1386
# Grab a handle to the thread that is processing our request
1387
_, server_side_thread = server._active_connections[0]
1388
# Close the connection, ask the server to stop, and wait for the
1389
# server to stop, as well as the thread that was servicing the
1392
# Wait for the server-side request thread to notice we are closed.
1393
server_side_thread.join()
1394
# Stop the server, it should notice the connection has finished.
1395
self.shutdown_server_cleanly(server, server_thread)
1396
# The server should have noticed that all clients are gone before
1398
self.assertEqual(0, len(server._active_connections))
1400
def test_serve_reaps_finished_connections(self):
1401
server, server_thread = self.make_server()
1402
client_sock1 = self.connect_to_server(server)
1403
# We send and receive on the connection, so that we know the
1404
# server-side has seen the connect, and started handling the
1406
self.say_hello(client_sock1)
1407
server_handler1, server_side_thread1 = server._active_connections[0]
1408
client_sock1.close()
1409
server_side_thread1.join()
1410
# By waiting until the first connection is fully done, the server
1411
# should notice after another connection that the first has finished.
1412
client_sock2 = self.connect_to_server(server)
1413
self.say_hello(client_sock2)
1414
server_handler2, server_side_thread2 = server._active_connections[-1]
1415
# There is a race condition. We know that client_sock2 has been
1416
# registered, but not that _poll_active_connections has been called. We
1417
# know that it will be called before the server will accept a new
1418
# connection, however. So connect one more time, and assert that we
1419
# either have 1 or 2 active connections (never 3), and that the 'first'
1420
# connection is not connection 1
1421
client_sock3 = self.connect_to_server(server)
1422
self.say_hello(client_sock3)
1423
# Copy the list, so we don't have it mutating behind our back
1424
conns = list(server._active_connections)
1425
self.assertEqual(2, len(conns))
1426
self.assertNotEqual((server_handler1, server_side_thread1), conns[0])
1427
self.assertEqual((server_handler2, server_side_thread2), conns[0])
1428
client_sock2.close()
1429
client_sock3.close()
1430
self.shutdown_server_cleanly(server, server_thread)
1432
def test_graceful_shutdown_waits_for_clients_to_stop(self):
1433
server, server_thread = self.make_server()
1434
# We need something big enough that it won't fit in a single recv. So
1435
# the server thread gets blocked writing content to the client until we
1436
# finish reading on the client.
1437
server.backing_transport.put_bytes('bigfile',
1439
client_sock = self.connect_to_server(server)
1440
self.say_hello(client_sock)
1441
_, server_side_thread = server._active_connections[0]
1442
# Start the RPC, but don't finish reading the response
1443
client_medium = medium.SmartClientAlreadyConnectedSocketMedium(
1444
'base', client_sock)
1445
client_client = client._SmartClient(client_medium)
1446
resp, response_handler = client_client.call_expecting_body('get',
1448
self.assertEqual(('ok',), resp)
1449
# Ask the server to stop gracefully, and wait for it.
1450
server._stop_gracefully()
1451
self.connect_to_server_and_hangup(server)
1452
server._stopped.wait()
1453
# It should not be accepting another connection.
1454
self.assertRaises(socket.error, self.connect_to_server, server)
1455
# It should also not be fully stopped
1456
server._fully_stopped.wait(0.01)
1457
self.assertFalse(server._fully_stopped.isSet())
1458
response_handler.read_body_bytes()
1460
server_side_thread.join()
1461
server_thread.join()
1462
self.assertTrue(server._fully_stopped.isSet())
1463
log = self.get_log()
1464
self.assertThat(log, DocTestMatches("""\
1465
INFO Requested to stop gracefully
1466
... Stopping SmartServerSocketStreamMedium(client=('127.0.0.1', ...
1467
INFO Waiting for 1 client(s) to finish
1468
""", flags=doctest.ELLIPSIS|doctest.REPORT_UDIFF))
1470
def test_stop_gracefully_tells_handlers_to_stop(self):
1471
server, server_thread = self.make_server()
1472
client_sock = self.connect_to_server(server)
1473
self.say_hello(client_sock)
1474
server_handler, server_side_thread = server._active_connections[0]
1475
self.assertFalse(server_handler.finished)
1476
server._stop_gracefully()
1477
self.assertTrue(server_handler.finished)
1479
self.connect_to_server_and_hangup(server)
1480
server_thread.join()
995
1483
class SmartTCPTests(tests.TestCase):
996
1484
"""Tests for connection/end to end behaviour using the TCP server.
3739
class Test_SmartClientRequest(tests.TestCase):
3741
def make_client_with_failing_medium(self, fail_at_write=True, response=''):
3742
response_io = StringIO(response)
3744
vendor = FirstRejectedStringIOSSHVendor(response_io, output,
3745
fail_at_write=fail_at_write)
3746
ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
3747
client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
3748
smart_client = client._SmartClient(client_medium, headers={})
3749
return output, vendor, smart_client
3751
def make_response(self, args, body=None, body_stream=None):
3752
response_io = StringIO()
3753
response = _mod_request.SuccessfulSmartServerResponse(args, body=body,
3754
body_stream=body_stream)
3755
responder = protocol.ProtocolThreeResponder(response_io.write)
3756
responder.send_response(response)
3757
return response_io.getvalue()
3759
def test__call_doesnt_retry_append(self):
3760
response = self.make_response(('appended', '8'))
3761
output, vendor, smart_client = self.make_client_with_failing_medium(
3762
fail_at_write=False, response=response)
3763
smart_request = client._SmartClientRequest(smart_client, 'append',
3764
('foo', ''), body='content\n')
3765
self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
3767
def test__call_retries_get_bytes(self):
3768
response = self.make_response(('ok',), 'content\n')
3769
output, vendor, smart_client = self.make_client_with_failing_medium(
3770
fail_at_write=False, response=response)
3771
smart_request = client._SmartClientRequest(smart_client, 'get',
3773
response, response_handler = smart_request._call(3)
3774
self.assertEqual(('ok',), response)
3775
self.assertEqual('content\n', response_handler.read_body_bytes())
3777
def test__call_noretry_get_bytes(self):
3778
debug.debug_flags.add('noretry')
3779
response = self.make_response(('ok',), 'content\n')
3780
output, vendor, smart_client = self.make_client_with_failing_medium(
3781
fail_at_write=False, response=response)
3782
smart_request = client._SmartClientRequest(smart_client, 'get',
3784
self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
3786
def test__send_no_retry_pipes(self):
3787
client_read, server_write = create_file_pipes()
3788
server_read, client_write = create_file_pipes()
3789
client_medium = medium.SmartSimplePipesClientMedium(client_read,
3790
client_write, base='/')
3791
smart_client = client._SmartClient(client_medium)
3792
smart_request = client._SmartClientRequest(smart_client,
3794
# Close the server side
3796
encoder, response_handler = smart_request._construct_protocol(3)
3797
self.assertRaises(errors.ConnectionReset,
3798
smart_request._send_no_retry, encoder)
3800
def test__send_read_response_sockets(self):
3801
listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3802
listen_sock.bind(('127.0.0.1', 0))
3803
listen_sock.listen(1)
3804
host, port = listen_sock.getsockname()
3805
client_medium = medium.SmartTCPClientMedium(host, port, '/')
3806
client_medium._ensure_connection()
3807
smart_client = client._SmartClient(client_medium)
3808
smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3809
# Accept the connection, but don't actually talk to the client.
3810
server_sock, _ = listen_sock.accept()
3812
# Sockets buffer and don't really notice that the server has closed the
3813
# connection until we try to read again.
3814
handler = smart_request._send(3)
3815
self.assertRaises(errors.ConnectionReset,
3816
handler.read_response_tuple, expect_body=False)
3818
def test__send_retries_on_write(self):
3819
output, vendor, smart_client = self.make_client_with_failing_medium()
3820
smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3821
handler = smart_request._send(3)
3822
self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3823
'\x00\x00\x00\x02de' # empty headers
3824
's\x00\x00\x00\tl5:helloee',
3827
[('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3828
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3830
('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3831
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3835
def test__send_doesnt_retry_read_failure(self):
3836
output, vendor, smart_client = self.make_client_with_failing_medium(
3837
fail_at_write=False)
3838
smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3839
handler = smart_request._send(3)
3840
self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3841
'\x00\x00\x00\x02de' # empty headers
3842
's\x00\x00\x00\tl5:helloee',
3845
[('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3846
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3849
self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
3851
def test__send_request_retries_body_stream_if_not_started(self):
3852
output, vendor, smart_client = self.make_client_with_failing_medium()
3853
smart_request = client._SmartClientRequest(smart_client, 'hello', (),
3854
body_stream=['a', 'b'])
3855
response_handler = smart_request._send(3)
3856
# We connect, get disconnected, and notice before consuming the stream,
3857
# so we try again one time and succeed.
3859
[('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3860
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3862
('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3863
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3866
self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3867
'\x00\x00\x00\x02de' # empty headers
3868
's\x00\x00\x00\tl5:helloe'
3869
'b\x00\x00\x00\x01a'
3870
'b\x00\x00\x00\x01b'
3874
def test__send_request_stops_if_body_started(self):
3875
# We intentionally use the python StringIO so that we can subclass it.
3876
from StringIO import StringIO
3877
response = StringIO()
3879
class FailAfterFirstWrite(StringIO):
3880
"""Allow one 'write' call to pass, fail the rest"""
3882
StringIO.__init__(self)
3888
return StringIO.write(self, s)
3889
raise IOError(errno.EINVAL, 'invalid file handle')
3890
output = FailAfterFirstWrite()
3892
vendor = FirstRejectedStringIOSSHVendor(response, output,
3893
fail_at_write=False)
3894
ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
3895
client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
3896
smart_client = client._SmartClient(client_medium, headers={})
3897
smart_request = client._SmartClientRequest(smart_client, 'hello', (),
3898
body_stream=['a', 'b'])
3899
self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
3900
# We connect, and manage to get to the point that we start consuming
3901
# the body stream. The next write fails, so we just stop.
3903
[('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3904
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3908
self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3909
'\x00\x00\x00\x02de' # empty headers
3910
's\x00\x00\x00\tl5:helloe',
3913
def test__send_disabled_retry(self):
3914
debug.debug_flags.add('noretry')
3915
output, vendor, smart_client = self.make_client_with_failing_medium()
3916
smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3917
self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
3919
[('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3920
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3222
3926
class LengthPrefixedBodyDecoder(tests.TestCase):
3224
3928
# XXX: TODO: make accept_reading_trailer invoke translate_response or