178
254
client_medium._accept_bytes('abc')
179
255
self.assertEqual('abc', output.getvalue())
257
def test_simple_pipes__accept_bytes_subprocess_closed(self):
258
# It is unfortunate that we have to use Popen for this. However,
259
# os.pipe() does not behave the same as subprocess.Popen().
260
# On Windows, if you use os.pipe() and close the write side,
261
# read.read() hangs. On Linux, read.read() returns the empty string.
262
p = subprocess.Popen([sys.executable, '-c',
264
'sys.stdout.write(sys.stdin.read(4))\n'
265
'sys.stdout.close()\n'],
266
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
267
client_medium = medium.SmartSimplePipesClientMedium(
268
p.stdout, p.stdin, 'base')
269
client_medium._accept_bytes('abc\n')
270
self.assertEqual('abc', client_medium._read_bytes(3))
272
# While writing to the underlying pipe,
273
# Windows py2.6.6 we get IOError(EINVAL)
274
# Lucid py2.6.5, we get IOError(EPIPE)
275
# In both cases, it should be wrapped to ConnectionReset
276
self.assertRaises(errors.ConnectionReset,
277
client_medium._accept_bytes, 'more')
279
def test_simple_pipes__accept_bytes_pipe_closed(self):
280
child_read, client_write = create_file_pipes()
281
client_medium = medium.SmartSimplePipesClientMedium(
282
None, client_write, 'base')
283
client_medium._accept_bytes('abc\n')
284
self.assertEqual('abc\n', child_read.read(4))
285
# While writing to the underlying pipe,
286
# Windows py2.6.6 we get IOError(EINVAL)
287
# Lucid py2.6.5, we get IOError(EPIPE)
288
# In both cases, it should be wrapped to ConnectionReset
290
self.assertRaises(errors.ConnectionReset,
291
client_medium._accept_bytes, 'more')
293
def test_simple_pipes__flush_pipe_closed(self):
294
child_read, client_write = create_file_pipes()
295
client_medium = medium.SmartSimplePipesClientMedium(
296
None, client_write, 'base')
297
client_medium._accept_bytes('abc\n')
299
# Even though the pipe is closed, flush on the write side seems to be a
300
# no-op, rather than a failure.
301
client_medium._flush()
303
def test_simple_pipes__flush_subprocess_closed(self):
304
p = subprocess.Popen([sys.executable, '-c',
306
'sys.stdout.write(sys.stdin.read(4))\n'
307
'sys.stdout.close()\n'],
308
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
309
client_medium = medium.SmartSimplePipesClientMedium(
310
p.stdout, p.stdin, 'base')
311
client_medium._accept_bytes('abc\n')
313
# Even though the child process is dead, flush seems to be a no-op.
314
client_medium._flush()
316
def test_simple_pipes__read_bytes_pipe_closed(self):
317
child_read, client_write = create_file_pipes()
318
client_medium = medium.SmartSimplePipesClientMedium(
319
child_read, client_write, 'base')
320
client_medium._accept_bytes('abc\n')
322
self.assertEqual('abc\n', client_medium._read_bytes(4))
323
self.assertEqual('', client_medium._read_bytes(4))
325
def test_simple_pipes__read_bytes_subprocess_closed(self):
326
p = subprocess.Popen([sys.executable, '-c',
328
'if sys.platform == "win32":\n'
329
' import msvcrt, os\n'
330
' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
331
' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
332
'sys.stdout.write(sys.stdin.read(4))\n'
333
'sys.stdout.close()\n'],
334
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
335
client_medium = medium.SmartSimplePipesClientMedium(
336
p.stdout, p.stdin, 'base')
337
client_medium._accept_bytes('abc\n')
339
self.assertEqual('abc\n', client_medium._read_bytes(4))
340
self.assertEqual('', client_medium._read_bytes(4))
181
342
def test_simple_pipes_client_disconnect_does_nothing(self):
182
343
# calling disconnect does nothing.
183
344
input = StringIO()
618
822
super(TestSmartServerStreamMedium, self).setUp()
619
823
self.overrideEnv('BZR_NO_SMART_VFS', None)
621
def portable_socket_pair(self):
622
"""Return a pair of TCP sockets connected to each other.
624
Unlike socket.socketpair, this should work on Windows.
626
listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
627
listen_sock.bind(('127.0.0.1', 0))
628
listen_sock.listen(1)
629
client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
630
client_sock.connect(listen_sock.getsockname())
631
server_sock, addr = listen_sock.accept()
633
return server_sock, client_sock
825
def create_pipe_medium(self, to_server, from_server, transport,
827
"""Create a new SmartServerPipeStreamMedium."""
828
return medium.SmartServerPipeStreamMedium(to_server, from_server,
829
transport, timeout=timeout)
831
def create_pipe_context(self, to_server_bytes, transport):
832
"""Create a SmartServerSocketStreamMedium.
834
This differes from create_pipe_medium, in that we initialize the
835
request that is sent to the server, and return the StringIO class that
836
will hold the response.
838
to_server = StringIO(to_server_bytes)
839
from_server = StringIO()
840
m = self.create_pipe_medium(to_server, from_server, transport)
841
return m, from_server
843
def create_socket_medium(self, server_sock, transport, timeout=4.0):
844
"""Initialize a new medium.SmartServerSocketStreamMedium."""
845
return medium.SmartServerSocketStreamMedium(server_sock, transport,
848
def create_socket_context(self, transport, timeout=4.0):
849
"""Create a new SmartServerSocketStreamMedium with default context.
851
This will call portable_socket_pair and pass the server side to
852
create_socket_medium along with transport.
853
It then returns the client_sock and the server.
855
server_sock, client_sock = portable_socket_pair()
856
server = self.create_socket_medium(server_sock, transport,
858
return server, client_sock
635
860
def test_smart_query_version(self):
636
861
"""Feed a canned query version to a server"""
637
862
# wire-to-wire, using the whole stack
638
to_server = StringIO('hello\n')
639
from_server = StringIO()
640
863
transport = local.LocalTransport(urlutils.local_path_to_url('/'))
641
server = medium.SmartServerPipeStreamMedium(
642
to_server, from_server, transport)
864
server, from_server = self.create_pipe_context('hello\n', transport)
643
865
smart_protocol = protocol.SmartServerRequestProtocolOne(transport,
644
866
from_server.write)
645
867
server._serve_one_request(smart_protocol)
934
1125
server_protocol = self.build_protocol_socket('bzr request 2\n')
935
1126
self.assertProtocolTwo(server_protocol)
1128
def test__build_protocol_returns_if_stopping(self):
1129
# _build_protocol should notice that we are stopping, and return
1130
# without waiting for bytes from the client.
1131
server, client_sock = self.create_socket_context(None)
1132
server._stop_gracefully()
1133
self.assertIs(None, server._build_protocol())
1135
def test_socket_set_timeout(self):
1136
server, _ = self.create_socket_context(None, timeout=1.23)
1137
self.assertEqual(1.23, server._client_timeout)
1139
def test_pipe_set_timeout(self):
1140
server = self.create_pipe_medium(None, None, None,
1142
self.assertEqual(1.23, server._client_timeout)
1144
def test_socket_wait_for_bytes_with_timeout_with_data(self):
1145
server, client_sock = self.create_socket_context(None)
1146
client_sock.sendall('data\n')
1147
# This should not block or consume any actual content
1148
self.assertFalse(server._wait_for_bytes_with_timeout(0.1))
1149
data = server.read_bytes(5)
1150
self.assertEqual('data\n', data)
1152
def test_socket_wait_for_bytes_with_timeout_no_data(self):
1153
server, client_sock = self.create_socket_context(None)
1154
# This should timeout quickly, reporting that there wasn't any data
1155
self.assertRaises(errors.ConnectionTimeout,
1156
server._wait_for_bytes_with_timeout, 0.01)
1158
data = server.read_bytes(1)
1159
self.assertEqual('', data)
1161
def test_socket_wait_for_bytes_with_timeout_closed(self):
1162
server, client_sock = self.create_socket_context(None)
1163
# With the socket closed, this should return right away.
1164
# It seems select.select() returns that you *can* read on the socket,
1165
# even though it closed. Presumably as a way to tell it is closed?
1166
# Testing shows that without sock.close() this times-out failing the
1167
# test, but with it, it returns False immediately.
1169
self.assertFalse(server._wait_for_bytes_with_timeout(10))
1170
data = server.read_bytes(1)
1171
self.assertEqual('', data)
1173
def test_socket_wait_for_bytes_with_shutdown(self):
1174
server, client_sock = self.create_socket_context(None)
1176
# Override the _timer functionality, so that time never increments,
1177
# this way, we can be sure we stopped because of the flag, and not
1178
# because of a timeout, etc.
1179
server._timer = lambda: t
1180
server._client_poll_timeout = 0.1
1181
server._stop_gracefully()
1182
server._wait_for_bytes_with_timeout(1.0)
1184
def test_socket_serve_timeout_closes_socket(self):
1185
server, client_sock = self.create_socket_context(None, timeout=0.1)
1186
# This should timeout quickly, and then close the connection so that
1187
# client_sock recv doesn't block.
1189
self.assertEqual('', client_sock.recv(1))
1191
def test_pipe_wait_for_bytes_with_timeout_with_data(self):
1192
# We intentionally use a real pipe here, so that we can 'select' on it.
1193
# You can't select() on a StringIO
1194
(r_server, w_client) = os.pipe()
1195
self.addCleanup(os.close, w_client)
1196
with os.fdopen(r_server, 'rb') as rf_server:
1197
server = self.create_pipe_medium(
1198
rf_server, None, None)
1199
os.write(w_client, 'data\n')
1200
# This should not block or consume any actual content
1201
server._wait_for_bytes_with_timeout(0.1)
1202
data = server.read_bytes(5)
1203
self.assertEqual('data\n', data)
1205
def test_pipe_wait_for_bytes_with_timeout_no_data(self):
1206
# We intentionally use a real pipe here, so that we can 'select' on it.
1207
# You can't select() on a StringIO
1208
(r_server, w_client) = os.pipe()
1209
# We can't add an os.close cleanup here, because we need to control
1210
# when the file handle gets closed ourselves.
1211
with os.fdopen(r_server, 'rb') as rf_server:
1212
server = self.create_pipe_medium(
1213
rf_server, None, None)
1214
if sys.platform == 'win32':
1215
# Windows cannot select() on a pipe, so we just always return
1216
server._wait_for_bytes_with_timeout(0.01)
1218
self.assertRaises(errors.ConnectionTimeout,
1219
server._wait_for_bytes_with_timeout, 0.01)
1221
data = server.read_bytes(5)
1222
self.assertEqual('', data)
1224
def test_pipe_wait_for_bytes_no_fileno(self):
1225
server, _ = self.create_pipe_context('', None)
1226
# Our file doesn't support polling, so we should always just return
1227
# 'you have data to consume.
1228
server._wait_for_bytes_with_timeout(0.01)
938
1231
class TestGetProtocolFactoryForBytes(tests.TestCase):
939
1232
"""_get_protocol_factory_for_bytes identifies the protocol factory a server
970
1263
class TestSmartTCPServer(tests.TestCase):
1265
def make_server(self):
1266
"""Create a SmartTCPServer that we can exercise.
1268
Note: we don't use SmartTCPServer_for_testing because the testing
1269
version overrides lots of functionality like 'serve', and we want to
1270
test the raw service.
1272
This will start the server in another thread, and wait for it to
1273
indicate it has finished starting up.
1275
:return: (server, server_thread)
1277
t = _mod_transport.get_transport_from_url('memory:///')
1278
server = _mod_server.SmartTCPServer(t, client_timeout=4.0)
1279
server._ACCEPT_TIMEOUT = 0.1
1280
# We don't use 'localhost' because that might be an IPv6 address.
1281
server.start_server('127.0.0.1', 0)
1282
server_thread = threading.Thread(target=server.serve,
1284
server_thread.start()
1285
# Ensure this gets called at some point
1286
self.addCleanup(server._stop_gracefully)
1287
server._started.wait()
1288
return server, server_thread
1290
def ensure_client_disconnected(self, client_sock):
1291
"""Ensure that a socket is closed, discarding all errors."""
1297
def connect_to_server(self, server):
1298
"""Create a client socket that can talk to the server."""
1299
client_sock = socket.socket()
1300
server_info = server._server_socket.getsockname()
1301
client_sock.connect(server_info)
1302
self.addCleanup(self.ensure_client_disconnected, client_sock)
1305
def connect_to_server_and_hangup(self, server):
1306
"""Connect to the server, and then hang up.
1307
That way it doesn't sit waiting for 'accept()' to timeout.
1309
# If the server has already signaled that the socket is closed, we
1310
# don't need to try to connect to it. Not being set, though, the server
1311
# might still close the socket while we try to connect to it. So we
1312
# still have to catch the exception.
1313
if server._stopped.isSet():
1316
client_sock = self.connect_to_server(server)
1318
except socket.error, e:
1319
# If the server has hung up already, that is fine.
1322
def say_hello(self, client_sock):
1323
"""Send the 'hello' smart RPC, and expect the response."""
1324
client_sock.send('hello\n')
1325
self.assertEqual('ok\x012\n', client_sock.recv(5))
1327
def shutdown_server_cleanly(self, server, server_thread):
1328
server._stop_gracefully()
1329
self.connect_to_server_and_hangup(server)
1330
server._stopped.wait()
1331
server._fully_stopped.wait()
1332
server_thread.join()
972
1334
def test_get_error_unexpected(self):
973
1335
"""Error reported by server with no specific representation"""
974
1336
self.overrideEnv('BZR_NO_SMART_VFS', None)
992
1354
t.get, 'something')
993
1355
self.assertContainsRe(str(err), 'some random exception')
1357
def test_propagates_timeout(self):
1358
server = _mod_server.SmartTCPServer(None, client_timeout=1.23)
1359
server_sock, client_sock = portable_socket_pair()
1360
handler = server._make_handler(server_sock)
1361
self.assertEqual(1.23, handler._client_timeout)
1363
def test_serve_conn_tracks_connections(self):
1364
server = _mod_server.SmartTCPServer(None, client_timeout=4.0)
1365
server_sock, client_sock = portable_socket_pair()
1366
server.serve_conn(server_sock, '-%s' % (self.id(),))
1367
self.assertEqual(1, len(server._active_connections))
1368
# We still want to talk on the connection. Polling should indicate it
1370
server._poll_active_connections()
1371
self.assertEqual(1, len(server._active_connections))
1372
# Closing the socket will end the active thread, and polling will
1373
# notice and remove it from the active set.
1375
server._poll_active_connections(0.1)
1376
self.assertEqual(0, len(server._active_connections))
1378
def test_serve_closes_out_finished_connections(self):
1379
server, server_thread = self.make_server()
1380
# The server is started, connect to it.
1381
client_sock = self.connect_to_server(server)
1382
# We send and receive on the connection, so that we know the
1383
# server-side has seen the connect, and started handling the
1385
self.say_hello(client_sock)
1386
self.assertEqual(1, len(server._active_connections))
1387
# Grab a handle to the thread that is processing our request
1388
_, server_side_thread = server._active_connections[0]
1389
# Close the connection, ask the server to stop, and wait for the
1390
# server to stop, as well as the thread that was servicing the
1393
# Wait for the server-side request thread to notice we are closed.
1394
server_side_thread.join()
1395
# Stop the server, it should notice the connection has finished.
1396
self.shutdown_server_cleanly(server, server_thread)
1397
# The server should have noticed that all clients are gone before
1399
self.assertEqual(0, len(server._active_connections))
1401
def test_serve_reaps_finished_connections(self):
1402
server, server_thread = self.make_server()
1403
client_sock1 = self.connect_to_server(server)
1404
# We send and receive on the connection, so that we know the
1405
# server-side has seen the connect, and started handling the
1407
self.say_hello(client_sock1)
1408
server_handler1, server_side_thread1 = server._active_connections[0]
1409
client_sock1.close()
1410
server_side_thread1.join()
1411
# By waiting until the first connection is fully done, the server
1412
# should notice after another connection that the first has finished.
1413
client_sock2 = self.connect_to_server(server)
1414
self.say_hello(client_sock2)
1415
server_handler2, server_side_thread2 = server._active_connections[-1]
1416
# There is a race condition. We know that client_sock2 has been
1417
# registered, but not that _poll_active_connections has been called. We
1418
# know that it will be called before the server will accept a new
1419
# connection, however. So connect one more time, and assert that we
1420
# either have 1 or 2 active connections (never 3), and that the 'first'
1421
# connection is not connection 1
1422
client_sock3 = self.connect_to_server(server)
1423
self.say_hello(client_sock3)
1424
# Copy the list, so we don't have it mutating behind our back
1425
conns = list(server._active_connections)
1426
self.assertEqual(2, len(conns))
1427
self.assertNotEqual((server_handler1, server_side_thread1), conns[0])
1428
self.assertEqual((server_handler2, server_side_thread2), conns[0])
1429
client_sock2.close()
1430
client_sock3.close()
1431
self.shutdown_server_cleanly(server, server_thread)
1433
def test_graceful_shutdown_waits_for_clients_to_stop(self):
1434
server, server_thread = self.make_server()
1435
# We need something big enough that it won't fit in a single recv. So
1436
# the server thread gets blocked writing content to the client until we
1437
# finish reading on the client.
1438
server.backing_transport.put_bytes('bigfile',
1440
client_sock = self.connect_to_server(server)
1441
self.say_hello(client_sock)
1442
_, server_side_thread = server._active_connections[0]
1443
# Start the RPC, but don't finish reading the response
1444
client_medium = medium.SmartClientAlreadyConnectedSocketMedium(
1445
'base', client_sock)
1446
client_client = client._SmartClient(client_medium)
1447
resp, response_handler = client_client.call_expecting_body('get',
1449
self.assertEqual(('ok',), resp)
1450
# Ask the server to stop gracefully, and wait for it.
1451
server._stop_gracefully()
1452
self.connect_to_server_and_hangup(server)
1453
server._stopped.wait()
1454
# It should not be accepting another connection.
1455
self.assertRaises(socket.error, self.connect_to_server, server)
1456
response_handler.read_body_bytes()
1458
server_side_thread.join()
1459
server_thread.join()
1460
self.assertTrue(server._fully_stopped.isSet())
1461
log = self.get_log()
1462
self.assertThat(log, DocTestMatches("""\
1463
INFO Requested to stop gracefully
1464
... Stopping SmartServerSocketStreamMedium(client=('127.0.0.1', ...
1465
INFO Waiting for 1 client(s) to finish
1466
""", flags=doctest.ELLIPSIS|doctest.REPORT_UDIFF))
1468
def test_stop_gracefully_tells_handlers_to_stop(self):
1469
server, server_thread = self.make_server()
1470
client_sock = self.connect_to_server(server)
1471
self.say_hello(client_sock)
1472
server_handler, server_side_thread = server._active_connections[0]
1473
self.assertFalse(server_handler.finished)
1474
server._stop_gracefully()
1475
self.assertTrue(server_handler.finished)
1477
self.connect_to_server_and_hangup(server)
1478
server_thread.join()
996
1481
class SmartTCPTests(tests.TestCase):
997
1482
"""Tests for connection/end to end behaviour using the TCP server.
3753
class Test_SmartClientRequest(tests.TestCase):
3755
def make_client_with_failing_medium(self, fail_at_write=True, response=''):
3756
response_io = StringIO(response)
3758
vendor = FirstRejectedStringIOSSHVendor(response_io, output,
3759
fail_at_write=fail_at_write)
3760
ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
3761
client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
3762
smart_client = client._SmartClient(client_medium, headers={})
3763
return output, vendor, smart_client
3765
def make_response(self, args, body=None, body_stream=None):
3766
response_io = StringIO()
3767
response = _mod_request.SuccessfulSmartServerResponse(args, body=body,
3768
body_stream=body_stream)
3769
responder = protocol.ProtocolThreeResponder(response_io.write)
3770
responder.send_response(response)
3771
return response_io.getvalue()
3773
def test__call_doesnt_retry_append(self):
3774
response = self.make_response(('appended', '8'))
3775
output, vendor, smart_client = self.make_client_with_failing_medium(
3776
fail_at_write=False, response=response)
3777
smart_request = client._SmartClientRequest(smart_client, 'append',
3778
('foo', ''), body='content\n')
3779
self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
3781
def test__call_retries_get_bytes(self):
3782
response = self.make_response(('ok',), 'content\n')
3783
output, vendor, smart_client = self.make_client_with_failing_medium(
3784
fail_at_write=False, response=response)
3785
smart_request = client._SmartClientRequest(smart_client, 'get',
3787
response, response_handler = smart_request._call(3)
3788
self.assertEqual(('ok',), response)
3789
self.assertEqual('content\n', response_handler.read_body_bytes())
3791
def test__call_noretry_get_bytes(self):
3792
debug.debug_flags.add('noretry')
3793
response = self.make_response(('ok',), 'content\n')
3794
output, vendor, smart_client = self.make_client_with_failing_medium(
3795
fail_at_write=False, response=response)
3796
smart_request = client._SmartClientRequest(smart_client, 'get',
3798
self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
3800
def test__send_no_retry_pipes(self):
3801
client_read, server_write = create_file_pipes()
3802
server_read, client_write = create_file_pipes()
3803
client_medium = medium.SmartSimplePipesClientMedium(client_read,
3804
client_write, base='/')
3805
smart_client = client._SmartClient(client_medium)
3806
smart_request = client._SmartClientRequest(smart_client,
3808
# Close the server side
3810
encoder, response_handler = smart_request._construct_protocol(3)
3811
self.assertRaises(errors.ConnectionReset,
3812
smart_request._send_no_retry, encoder)
3814
def test__send_read_response_sockets(self):
3815
listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3816
listen_sock.bind(('127.0.0.1', 0))
3817
listen_sock.listen(1)
3818
host, port = listen_sock.getsockname()
3819
client_medium = medium.SmartTCPClientMedium(host, port, '/')
3820
client_medium._ensure_connection()
3821
smart_client = client._SmartClient(client_medium)
3822
smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3823
# Accept the connection, but don't actually talk to the client.
3824
server_sock, _ = listen_sock.accept()
3826
# Sockets buffer and don't really notice that the server has closed the
3827
# connection until we try to read again.
3828
handler = smart_request._send(3)
3829
self.assertRaises(errors.ConnectionReset,
3830
handler.read_response_tuple, expect_body=False)
3832
def test__send_retries_on_write(self):
3833
output, vendor, smart_client = self.make_client_with_failing_medium()
3834
smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3835
handler = smart_request._send(3)
3836
self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3837
'\x00\x00\x00\x02de' # empty headers
3838
's\x00\x00\x00\tl5:helloee',
3841
[('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3842
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3844
('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3845
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3849
def test__send_doesnt_retry_read_failure(self):
3850
output, vendor, smart_client = self.make_client_with_failing_medium(
3851
fail_at_write=False)
3852
smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3853
handler = smart_request._send(3)
3854
self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3855
'\x00\x00\x00\x02de' # empty headers
3856
's\x00\x00\x00\tl5:helloee',
3859
[('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3860
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3863
self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
3865
def test__send_request_retries_body_stream_if_not_started(self):
3866
output, vendor, smart_client = self.make_client_with_failing_medium()
3867
smart_request = client._SmartClientRequest(smart_client, 'hello', (),
3868
body_stream=['a', 'b'])
3869
response_handler = smart_request._send(3)
3870
# We connect, get disconnected, and notice before consuming the stream,
3871
# so we try again one time and succeed.
3873
[('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3874
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3876
('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3877
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3880
self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3881
'\x00\x00\x00\x02de' # empty headers
3882
's\x00\x00\x00\tl5:helloe'
3883
'b\x00\x00\x00\x01a'
3884
'b\x00\x00\x00\x01b'
3888
def test__send_request_stops_if_body_started(self):
3889
# We intentionally use the python StringIO so that we can subclass it.
3890
from StringIO import StringIO
3891
response = StringIO()
3893
class FailAfterFirstWrite(StringIO):
3894
"""Allow one 'write' call to pass, fail the rest"""
3896
StringIO.__init__(self)
3902
return StringIO.write(self, s)
3903
raise IOError(errno.EINVAL, 'invalid file handle')
3904
output = FailAfterFirstWrite()
3906
vendor = FirstRejectedStringIOSSHVendor(response, output,
3907
fail_at_write=False)
3908
ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
3909
client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
3910
smart_client = client._SmartClient(client_medium, headers={})
3911
smart_request = client._SmartClientRequest(smart_client, 'hello', (),
3912
body_stream=['a', 'b'])
3913
self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
3914
# We connect, and manage to get to the point that we start consuming
3915
# the body stream. The next write fails, so we just stop.
3917
[('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3918
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3922
self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
3923
'\x00\x00\x00\x02de' # empty headers
3924
's\x00\x00\x00\tl5:helloe',
3927
def test__send_disabled_retry(self):
3928
debug.debug_flags.add('noretry')
3929
output, vendor, smart_client = self.make_client_with_failing_medium()
3930
smart_request = client._SmartClientRequest(smart_client, 'hello', ())
3931
self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
3933
[('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
3934
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
3222
3940
class LengthPrefixedBodyDecoder(tests.TestCase):
3224
3942
# XXX: TODO: make accept_reading_trailer invoke translate_response or