19
19
# all of this deals with byte strings so this is safe
20
20
from cStringIO import StringIO
25
26
from bzrlib import (
33
from bzrlib.tests.HTTPTestUtil import (
31
37
from bzrlib.transport import (
39
class SmartClientTests(tests.TestCase):
41
def test_construct_smart_stream_client(self):
42
# make a new client; this really wants a connector function that returns
43
# two fifos or sockets but the constructor should not do any IO
44
client = smart.SmartStreamClient(None)
47
class TCPClientTests(tests.TestCaseWithTransport):
43
from bzrlib.transport.http import SmartClientHTTPMediumRequest
46
class StringIOSSHVendor(object):
47
"""A SSH vendor that uses StringIO to buffer writes and answer reads."""
49
def __init__(self, read_from, write_to):
50
self.read_from = read_from
51
self.write_to = write_to
54
def connect_ssh(self, username, password, host, port, command):
55
self.calls.append(('connect_ssh', username, password, host, port,
57
return StringIOSSHConnection(self)
60
class StringIOSSHConnection(object):
61
"""A SSH connection that uses StringIO to buffer writes and answer reads."""
63
def __init__(self, vendor):
67
self.vendor.calls.append(('close', ))
69
def get_filelike_channels(self):
70
return self.vendor.read_from, self.vendor.write_to
74
class SmartClientMediumTests(tests.TestCase):
75
"""Tests for SmartClientMedium.
77
We should create a test scenario for this: we need a server module that
78
construct the test-servers (like make_loopsocket_and_medium), and the list
79
of SmartClientMedium classes to test.
82
def make_loopsocket_and_medium(self):
83
"""Create a loopback socket for testing, and a medium aimed at it."""
84
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
85
sock.bind(('127.0.0.1', 0))
87
port = sock.getsockname()[1]
88
medium = smart.SmartTCPClientMedium('127.0.0.1', port)
91
def receive_bytes_on_server(self, sock, bytes):
92
"""Accept a connection on sock and read 3 bytes.
94
The bytes are appended to the list bytes.
96
:return: a Thread which is running to do the accept and recv.
98
def _receive_bytes_on_server():
99
connection, address = sock.accept()
100
bytes.append(osutils.recv_all(connection, 3))
102
t = threading.Thread(target=_receive_bytes_on_server)
106
def test_construct_smart_stream_medium_client(self):
107
# make a new instance of the common base for Stream-like Mediums.
108
# this just ensures that the constructor stays parameter-free which
109
# is important for reuse : some subclasses will dynamically connect,
110
# others are always on, etc.
111
medium = smart.SmartClientStreamMedium()
113
def test_construct_smart_client_medium(self):
114
# the base client medium takes no parameters
115
medium = smart.SmartClientMedium()
117
def test_construct_smart_simple_pipes_client_medium(self):
118
# the SimplePipes client medium takes two pipes:
119
# readable pipe, writeable pipe.
120
# Constructing one should just save these and do nothing.
121
# We test this by passing in None.
122
medium = smart.SmartSimplePipesClientMedium(None, None)
124
def test_simple_pipes_client_request_type(self):
125
# SimplePipesClient should use SmartClientStreamMediumRequest's.
126
medium = smart.SmartSimplePipesClientMedium(None, None)
127
request = medium.get_request()
128
self.assertIsInstance(request, smart.SmartClientStreamMediumRequest)
130
def test_simple_pipes_client_get_concurrent_requests(self):
131
# the simple_pipes client does not support pipelined requests:
132
# but it does support serial requests: we construct one after
133
# another is finished. This is a smoke test testing the integration
134
# of the SmartClientStreamMediumRequest and the SmartClientStreamMedium
135
# classes - as the sibling classes share this logic, they do not have
136
# explicit tests for this.
138
medium = smart.SmartSimplePipesClientMedium(None, output)
139
request = medium.get_request()
140
request.finished_writing()
141
request.finished_reading()
142
request2 = medium.get_request()
143
request2.finished_writing()
144
request2.finished_reading()
146
def test_simple_pipes_client__accept_bytes_writes_to_writable(self):
147
# accept_bytes writes to the writeable pipe.
149
medium = smart.SmartSimplePipesClientMedium(None, output)
150
medium._accept_bytes('abc')
151
self.assertEqual('abc', output.getvalue())
153
def test_simple_pipes_client_disconnect_does_nothing(self):
154
# calling disconnect does nothing.
157
medium = smart.SmartSimplePipesClientMedium(input, output)
158
# send some bytes to ensure disconnecting after activity still does not
160
medium._accept_bytes('abc')
162
self.assertFalse(input.closed)
163
self.assertFalse(output.closed)
165
def test_simple_pipes_client_accept_bytes_after_disconnect(self):
166
# calling disconnect on the client does not alter the pipe that
167
# accept_bytes writes to.
170
medium = smart.SmartSimplePipesClientMedium(input, output)
171
medium._accept_bytes('abc')
173
medium._accept_bytes('abc')
174
self.assertFalse(input.closed)
175
self.assertFalse(output.closed)
176
self.assertEqual('abcabc', output.getvalue())
178
def test_simple_pipes_client_ignores_disconnect_when_not_connected(self):
179
# Doing a disconnect on a new (and thus unconnected) SimplePipes medium
181
medium = smart.SmartSimplePipesClientMedium(None, None)
184
def test_simple_pipes_client_can_always_read(self):
185
# SmartSimplePipesClientMedium is never disconnected, so read_bytes
186
# always tries to read from the underlying pipe.
187
input = StringIO('abcdef')
188
medium = smart.SmartSimplePipesClientMedium(input, None)
189
self.assertEqual('abc', medium.read_bytes(3))
191
self.assertEqual('def', medium.read_bytes(3))
193
def test_simple_pipes_client_supports__flush(self):
194
# invoking _flush on a SimplePipesClient should flush the output
195
# pipe. We test this by creating an output pipe that records
196
# flush calls made to it.
197
from StringIO import StringIO # get regular StringIO
201
def logging_flush(): flush_calls.append('flush')
202
output.flush = logging_flush
203
medium = smart.SmartSimplePipesClientMedium(input, output)
204
# this call is here to ensure we only flush once, not on every
205
# _accept_bytes call.
206
medium._accept_bytes('abc')
209
self.assertEqual(['flush'], flush_calls)
211
def test_construct_smart_ssh_client_medium(self):
212
# the SSH client medium takes:
213
# host, port, username, password, vendor
214
# Constructing one should just save these and do nothing.
215
# we test this by creating a empty bound socket and constructing
217
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
218
sock.bind(('127.0.0.1', 0))
219
unopened_port = sock.getsockname()[1]
220
# having vendor be invalid means that if it tries to connect via the
221
# vendor it will blow up.
222
medium = smart.SmartSSHClientMedium('127.0.0.1', unopened_port,
223
username=None, password=None, vendor="not a vendor")
226
def test_ssh_client_connects_on_first_use(self):
227
# The only thing that initiates a connection from the medium is giving
230
vendor = StringIOSSHVendor(StringIO(), output)
231
medium = smart.SmartSSHClientMedium('a hostname', 'a port', 'a username',
232
'a password', vendor)
233
medium._accept_bytes('abc')
234
self.assertEqual('abc', output.getvalue())
235
self.assertEqual([('connect_ssh', 'a username', 'a password',
236
'a hostname', 'a port',
237
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes'])],
240
def test_ssh_client_changes_command_when_BZR_REMOTE_PATH_is_set(self):
241
# The only thing that initiates a connection from the medium is giving
244
vendor = StringIOSSHVendor(StringIO(), output)
245
orig_bzr_remote_path = os.environ.get('BZR_REMOTE_PATH')
246
def cleanup_environ():
247
osutils.set_or_unset_env('BZR_REMOTE_PATH', orig_bzr_remote_path)
248
self.addCleanup(cleanup_environ)
249
os.environ['BZR_REMOTE_PATH'] = 'fugly'
250
medium = smart.SmartSSHClientMedium('a hostname', 'a port', 'a username',
251
'a password', vendor)
252
medium._accept_bytes('abc')
253
self.assertEqual('abc', output.getvalue())
254
self.assertEqual([('connect_ssh', 'a username', 'a password',
255
'a hostname', 'a port',
256
['fugly', 'serve', '--inet', '--directory=/', '--allow-writes'])],
259
def test_ssh_client_disconnect_does_so(self):
260
# calling disconnect should disconnect both the read_from and write_to
261
# file-like object it from the ssh connection.
264
vendor = StringIOSSHVendor(input, output)
265
medium = smart.SmartSSHClientMedium('a hostname', vendor=vendor)
266
medium._accept_bytes('abc')
268
self.assertTrue(input.closed)
269
self.assertTrue(output.closed)
271
('connect_ssh', None, None, 'a hostname', None,
272
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
277
def test_ssh_client_disconnect_allows_reconnection(self):
278
# calling disconnect on the client terminates the connection, but should
279
# not prevent additional connections occuring.
280
# we test this by initiating a second connection after doing a
284
vendor = StringIOSSHVendor(input, output)
285
medium = smart.SmartSSHClientMedium('a hostname', vendor=vendor)
286
medium._accept_bytes('abc')
288
# the disconnect has closed output, so we need a new output for the
289
# new connection to write to.
292
vendor.read_from = input2
293
vendor.write_to = output2
294
medium._accept_bytes('abc')
296
self.assertTrue(input.closed)
297
self.assertTrue(output.closed)
298
self.assertTrue(input2.closed)
299
self.assertTrue(output2.closed)
301
('connect_ssh', None, None, 'a hostname', None,
302
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
304
('connect_ssh', None, None, 'a hostname', None,
305
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
310
def test_ssh_client_ignores_disconnect_when_not_connected(self):
311
# Doing a disconnect on a new (and thus unconnected) SSH medium
312
# does not fail. It's ok to disconnect an unconnected medium.
313
medium = smart.SmartSSHClientMedium(None)
316
def test_ssh_client_raises_on_read_when_not_connected(self):
317
# Doing a read on a new (and thus unconnected) SSH medium raises
318
# MediumNotConnected.
319
medium = smart.SmartSSHClientMedium(None)
320
self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 0)
321
self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 1)
323
def test_ssh_client_supports__flush(self):
324
# invoking _flush on a SSHClientMedium should flush the output
325
# pipe. We test this by creating an output pipe that records
326
# flush calls made to it.
327
from StringIO import StringIO # get regular StringIO
331
def logging_flush(): flush_calls.append('flush')
332
output.flush = logging_flush
333
vendor = StringIOSSHVendor(input, output)
334
medium = smart.SmartSSHClientMedium('a hostname', vendor=vendor)
335
# this call is here to ensure we only flush once, not on every
336
# _accept_bytes call.
337
medium._accept_bytes('abc')
340
self.assertEqual(['flush'], flush_calls)
342
def test_construct_smart_tcp_client_medium(self):
343
# the TCP client medium takes a host and a port. Constructing it won't
344
# connect to anything.
345
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
346
sock.bind(('127.0.0.1', 0))
347
unopened_port = sock.getsockname()[1]
348
medium = smart.SmartTCPClientMedium('127.0.0.1', unopened_port)
351
def test_tcp_client_connects_on_first_use(self):
352
# The only thing that initiates a connection from the medium is giving
354
sock, medium = self.make_loopsocket_and_medium()
356
t = self.receive_bytes_on_server(sock, bytes)
357
medium.accept_bytes('abc')
360
self.assertEqual(['abc'], bytes)
362
def test_tcp_client_disconnect_does_so(self):
363
# calling disconnect on the client terminates the connection.
364
# we test this by forcing a short read during a socket.MSG_WAITALL
365
# call: write 2 bytes, try to read 3, and then the client disconnects.
366
sock, medium = self.make_loopsocket_and_medium()
368
t = self.receive_bytes_on_server(sock, bytes)
369
medium.accept_bytes('ab')
373
self.assertEqual(['ab'], bytes)
374
# now disconnect again: this should not do anything, if disconnection
375
# really did disconnect.
378
def test_tcp_client_ignores_disconnect_when_not_connected(self):
379
# Doing a disconnect on a new (and thus unconnected) TCP medium
380
# does not fail. It's ok to disconnect an unconnected medium.
381
medium = smart.SmartTCPClientMedium(None, None)
384
def test_tcp_client_raises_on_read_when_not_connected(self):
385
# Doing a read on a new (and thus unconnected) TCP medium raises
386
# MediumNotConnected.
387
medium = smart.SmartTCPClientMedium(None, None)
388
self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 0)
389
self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 1)
391
def test_tcp_client_supports__flush(self):
392
# invoking _flush on a TCPClientMedium should do something useful.
393
# RBC 20060922 not sure how to test/tell in this case.
394
sock, medium = self.make_loopsocket_and_medium()
396
t = self.receive_bytes_on_server(sock, bytes)
397
# try with nothing buffered
399
medium._accept_bytes('ab')
400
# and with something sent.
405
self.assertEqual(['ab'], bytes)
406
# now disconnect again : this should not do anything, if disconnection
407
# really did disconnect.
411
class TestSmartClientStreamMediumRequest(tests.TestCase):
412
"""Tests the for SmartClientStreamMediumRequest.
414
SmartClientStreamMediumRequest is a helper for the three stream based
415
mediums: TCP, SSH, SimplePipes, so we only test it once, and then test that
416
those three mediums implement the interface it expects.
419
def test_accept_bytes_after_finished_writing_errors(self):
420
# calling accept_bytes after calling finished_writing raises
421
# WritingCompleted to prevent bad assumptions on stream environments
422
# breaking the needs of message-based environments.
424
medium = smart.SmartSimplePipesClientMedium(None, output)
425
request = smart.SmartClientStreamMediumRequest(medium)
426
request.finished_writing()
427
self.assertRaises(errors.WritingCompleted, request.accept_bytes, None)
429
def test_accept_bytes(self):
430
# accept bytes should invoke _accept_bytes on the stream medium.
431
# we test this by using the SimplePipes medium - the most trivial one
432
# and checking that the pipes get the data.
435
medium = smart.SmartSimplePipesClientMedium(input, output)
436
request = smart.SmartClientStreamMediumRequest(medium)
437
request.accept_bytes('123')
438
request.finished_writing()
439
request.finished_reading()
440
self.assertEqual('', input.getvalue())
441
self.assertEqual('123', output.getvalue())
443
def test_construct_sets_stream_request(self):
444
# constructing a SmartClientStreamMediumRequest on a StreamMedium sets
445
# the current request to the new SmartClientStreamMediumRequest
447
medium = smart.SmartSimplePipesClientMedium(None, output)
448
request = smart.SmartClientStreamMediumRequest(medium)
449
self.assertIs(medium._current_request, request)
451
def test_construct_while_another_request_active_throws(self):
452
# constructing a SmartClientStreamMediumRequest on a StreamMedium with
453
# a non-None _current_request raises TooManyConcurrentRequests.
455
medium = smart.SmartSimplePipesClientMedium(None, output)
456
medium._current_request = "a"
457
self.assertRaises(errors.TooManyConcurrentRequests,
458
smart.SmartClientStreamMediumRequest, medium)
460
def test_finished_read_clears_current_request(self):
461
# calling finished_reading clears the current request from the requests
464
medium = smart.SmartSimplePipesClientMedium(None, output)
465
request = smart.SmartClientStreamMediumRequest(medium)
466
request.finished_writing()
467
request.finished_reading()
468
self.assertEqual(None, medium._current_request)
470
def test_finished_read_before_finished_write_errors(self):
471
# calling finished_reading before calling finished_writing triggers a
472
# WritingNotComplete error.
473
medium = smart.SmartSimplePipesClientMedium(None, None)
474
request = smart.SmartClientStreamMediumRequest(medium)
475
self.assertRaises(errors.WritingNotComplete, request.finished_reading)
477
def test_read_bytes(self):
478
# read bytes should invoke _read_bytes on the stream medium.
479
# we test this by using the SimplePipes medium - the most trivial one
480
# and checking that the data is supplied. Its possible that a
481
# faulty implementation could poke at the pipe variables them selves,
482
# but we trust that this will be caught as it will break the integration
484
input = StringIO('321')
486
medium = smart.SmartSimplePipesClientMedium(input, output)
487
request = smart.SmartClientStreamMediumRequest(medium)
488
request.finished_writing()
489
self.assertEqual('321', request.read_bytes(3))
490
request.finished_reading()
491
self.assertEqual('', input.read())
492
self.assertEqual('', output.getvalue())
494
def test_read_bytes_before_finished_write_errors(self):
495
# calling read_bytes before calling finished_writing triggers a
496
# WritingNotComplete error because the Smart protocol is designed to be
497
# compatible with strict message based protocols like HTTP where the
498
# request cannot be submitted until the writing has completed.
499
medium = smart.SmartSimplePipesClientMedium(None, None)
500
request = smart.SmartClientStreamMediumRequest(medium)
501
self.assertRaises(errors.WritingNotComplete, request.read_bytes, None)
503
def test_read_bytes_after_finished_reading_errors(self):
504
# calling read_bytes after calling finished_reading raises
505
# ReadingCompleted to prevent bad assumptions on stream environments
506
# breaking the needs of message-based environments.
508
medium = smart.SmartSimplePipesClientMedium(None, output)
509
request = smart.SmartClientStreamMediumRequest(medium)
510
request.finished_writing()
511
request.finished_reading()
512
self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
515
class RemoteTransportTests(tests.TestCaseWithTransport):
50
super(TCPClientTests, self).setUp()
518
super(RemoteTransportTests, self).setUp()
51
519
# We're allowed to set the transport class here, so that we don't use
52
520
# the default or a parameterized class, but rather use the
53
521
# TestCaseWithTransport infrastructure to set up a smart server and
61
529
t = self.get_transport()
62
530
self.assertIsInstance(t, smart.SmartTransport)
64
def test_get_client_from_transport(self):
532
def test_get_medium_from_transport(self):
533
"""Remote transport has a medium always, which it can return."""
65
534
t = self.get_transport()
66
client = t.get_smart_client()
67
self.assertIsInstance(client, smart.SmartStreamClient)
70
class BasicSmartTests(tests.TestCase):
535
medium = t.get_smart_medium()
536
self.assertIsInstance(medium, smart.SmartClientMedium)
539
class ErrorRaisingProtocol(object):
541
def __init__(self, exception):
542
self.exception = exception
544
def next_read_size(self):
548
class SampleRequest(object):
550
def __init__(self, expected_bytes):
551
self.accepted_bytes = ''
552
self._finished_reading = False
553
self.expected_bytes = expected_bytes
554
self.excess_buffer = ''
556
def accept_bytes(self, bytes):
557
self.accepted_bytes += bytes
558
if self.accepted_bytes.startswith(self.expected_bytes):
559
self._finished_reading = True
560
self.excess_buffer = self.accepted_bytes[len(self.expected_bytes):]
562
def next_read_size(self):
563
if self._finished_reading:
569
class TestSmartServerStreamMedium(tests.TestCase):
571
def portable_socket_pair(self):
572
"""Return a pair of TCP sockets connected to each other.
574
Unlike socket.socketpair, this should work on Windows.
576
listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
577
listen_sock.bind(('127.0.0.1', 0))
578
listen_sock.listen(1)
579
client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
580
client_sock.connect(listen_sock.getsockname())
581
server_sock, addr = listen_sock.accept()
583
return server_sock, client_sock
72
585
def test_smart_query_version(self):
73
586
"""Feed a canned query version to a server"""
587
# wire-to-wire, using the whole stack
74
588
to_server = StringIO('hello\n')
75
589
from_server = StringIO()
76
server = smart.SmartStreamServer(to_server, from_server,
77
local.LocalTransport(urlutils.local_path_to_url('/')))
78
server._serve_one_request()
590
transport = local.LocalTransport(urlutils.local_path_to_url('/'))
591
server = smart.SmartServerPipeStreamMedium(
592
to_server, from_server, transport)
593
protocol = smart.SmartServerRequestProtocolOne(transport,
595
server._serve_one_request(protocol)
79
596
self.assertEqual('ok\0011\n',
80
597
from_server.getvalue())
82
def test_canned_get_response(self):
599
def test_response_to_canned_get(self):
83
600
transport = memory.MemoryTransport('memory:///')
84
601
transport.put_bytes('testfile', 'contents\nof\nfile\n')
85
602
to_server = StringIO('get\001./testfile\n')
86
603
from_server = StringIO()
87
server = smart.SmartStreamServer(to_server, from_server, transport)
88
server._serve_one_request()
89
self.assertEqual('ok\n'
91
'contents\nof\nfile\n'
93
from_server.getvalue())
604
server = smart.SmartServerPipeStreamMedium(
605
to_server, from_server, transport)
606
protocol = smart.SmartServerRequestProtocolOne(transport,
608
server._serve_one_request(protocol)
609
self.assertEqual('ok\n'
611
'contents\nof\nfile\n'
613
from_server.getvalue())
615
def test_response_to_canned_get_of_utf8(self):
616
# wire-to-wire, using the whole stack, with a UTF-8 filename.
617
transport = memory.MemoryTransport('memory:///')
618
utf8_filename = u'testfile\N{INTERROBANG}'.encode('utf-8')
619
transport.put_bytes(utf8_filename, 'contents\nof\nfile\n')
620
to_server = StringIO('get\001' + utf8_filename + '\n')
621
from_server = StringIO()
622
server = smart.SmartServerPipeStreamMedium(
623
to_server, from_server, transport)
624
protocol = smart.SmartServerRequestProtocolOne(transport,
626
server._serve_one_request(protocol)
627
self.assertEqual('ok\n'
629
'contents\nof\nfile\n'
631
from_server.getvalue())
633
def test_pipe_like_stream_with_bulk_data(self):
634
sample_request_bytes = 'command\n9\nbulk datadone\n'
635
to_server = StringIO(sample_request_bytes)
636
from_server = StringIO()
637
server = smart.SmartServerPipeStreamMedium(to_server, from_server, None)
638
sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
639
server._serve_one_request(sample_protocol)
640
self.assertEqual('', from_server.getvalue())
641
self.assertEqual(sample_request_bytes, sample_protocol.accepted_bytes)
642
self.assertFalse(server.finished)
644
def test_socket_stream_with_bulk_data(self):
645
sample_request_bytes = 'command\n9\nbulk datadone\n'
646
server_sock, client_sock = self.portable_socket_pair()
647
server = smart.SmartServerSocketStreamMedium(
649
sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
650
client_sock.sendall(sample_request_bytes)
651
server._serve_one_request(sample_protocol)
653
self.assertEqual('', client_sock.recv(1))
654
self.assertEqual(sample_request_bytes, sample_protocol.accepted_bytes)
655
self.assertFalse(server.finished)
657
def test_pipe_like_stream_shutdown_detection(self):
658
to_server = StringIO('')
659
from_server = StringIO()
660
server = smart.SmartServerPipeStreamMedium(to_server, from_server, None)
661
server._serve_one_request(SampleRequest('x'))
662
self.assertTrue(server.finished)
664
def test_socket_stream_shutdown_detection(self):
665
server_sock, client_sock = self.portable_socket_pair()
667
server = smart.SmartServerSocketStreamMedium(
669
server._serve_one_request(SampleRequest('x'))
670
self.assertTrue(server.finished)
672
def test_pipe_like_stream_with_two_requests(self):
673
# If two requests are read in one go, then two calls to
674
# _serve_one_request should still process both of them as if they had
675
# been received seperately.
676
sample_request_bytes = 'command\n'
677
to_server = StringIO(sample_request_bytes * 2)
678
from_server = StringIO()
679
server = smart.SmartServerPipeStreamMedium(to_server, from_server, None)
680
first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
681
server._serve_one_request(first_protocol)
682
self.assertEqual(0, first_protocol.next_read_size())
683
self.assertEqual('', from_server.getvalue())
684
self.assertFalse(server.finished)
685
# Make a new protocol, call _serve_one_request with it to collect the
687
second_protocol = SampleRequest(expected_bytes=sample_request_bytes)
688
server._serve_one_request(second_protocol)
689
self.assertEqual('', from_server.getvalue())
690
self.assertEqual(sample_request_bytes, second_protocol.accepted_bytes)
691
self.assertFalse(server.finished)
693
def test_socket_stream_with_two_requests(self):
694
# If two requests are read in one go, then two calls to
695
# _serve_one_request should still process both of them as if they had
696
# been received seperately.
697
sample_request_bytes = 'command\n'
698
server_sock, client_sock = self.portable_socket_pair()
699
server = smart.SmartServerSocketStreamMedium(
701
first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
702
# Put two whole requests on the wire.
703
client_sock.sendall(sample_request_bytes * 2)
704
server._serve_one_request(first_protocol)
705
self.assertEqual(0, first_protocol.next_read_size())
706
self.assertFalse(server.finished)
707
# Make a new protocol, call _serve_one_request with it to collect the
709
second_protocol = SampleRequest(expected_bytes=sample_request_bytes)
710
stream_still_open = server._serve_one_request(second_protocol)
711
self.assertEqual(sample_request_bytes, second_protocol.accepted_bytes)
712
self.assertFalse(server.finished)
714
self.assertEqual('', client_sock.recv(1))
716
def test_pipe_like_stream_error_handling(self):
717
# Use plain python StringIO so we can monkey-patch the close method to
718
# not discard the contents.
719
from StringIO import StringIO
720
to_server = StringIO('')
721
from_server = StringIO()
725
from_server.close = close
726
server = smart.SmartServerPipeStreamMedium(to_server, from_server, None)
727
fake_protocol = ErrorRaisingProtocol(Exception('boom'))
728
server._serve_one_request(fake_protocol)
729
self.assertEqual('', from_server.getvalue())
730
self.assertTrue(self.closed)
731
self.assertTrue(server.finished)
733
def test_socket_stream_error_handling(self):
734
# Use plain python StringIO so we can monkey-patch the close method to
735
# not discard the contents.
736
from StringIO import StringIO
737
server_sock, client_sock = self.portable_socket_pair()
738
server = smart.SmartServerSocketStreamMedium(
740
fake_protocol = ErrorRaisingProtocol(Exception('boom'))
741
server._serve_one_request(fake_protocol)
742
# recv should not block, because the other end of the socket has been
744
self.assertEqual('', client_sock.recv(1))
745
self.assertTrue(server.finished)
747
def test_pipe_like_stream_keyboard_interrupt_handling(self):
748
# Use plain python StringIO so we can monkey-patch the close method to
749
# not discard the contents.
750
to_server = StringIO('')
751
from_server = StringIO()
752
server = smart.SmartServerPipeStreamMedium(to_server, from_server, None)
753
fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
755
KeyboardInterrupt, server._serve_one_request, fake_protocol)
756
self.assertEqual('', from_server.getvalue())
758
def test_socket_stream_keyboard_interrupt_handling(self):
759
server_sock, client_sock = self.portable_socket_pair()
760
server = smart.SmartServerSocketStreamMedium(
762
fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
764
KeyboardInterrupt, server._serve_one_request, fake_protocol)
766
self.assertEqual('', client_sock.recv(1))
769
class TestSmartTCPServer(tests.TestCase):
95
771
def test_get_error_unexpected(self):
96
772
"""Error reported by server with no specific representation"""
398
1120
one that should coalesce.
400
1122
self.assertOffsetSerialisation([], '',
401
self.smart_client, self.smart_server)
1123
self.client_protocol, self.smart_server_request)
402
1124
self.assertOffsetSerialisation([(1,2)], '1,2',
403
self.smart_client, self.smart_server)
1125
self.client_protocol, self.smart_server_request)
404
1126
self.assertOffsetSerialisation([(10,40), (0,5)], '10,40\n0,5',
405
self.smart_client, self.smart_server)
1127
self.client_protocol, self.smart_server_request)
406
1128
self.assertOffsetSerialisation([(1,2), (3,4), (100, 200)],
407
'1,2\n3,4\n100,200', self.smart_client, self.smart_server)
1129
'1,2\n3,4\n100,200', self.client_protocol, self.smart_server_request)
1131
def test_accept_bytes_of_bad_request_to_protocol(self):
1132
out_stream = StringIO()
1133
protocol = smart.SmartServerRequestProtocolOne(None, out_stream.write)
1134
protocol.accept_bytes('abc')
1135
self.assertEqual('abc', protocol.in_buffer)
1136
protocol.accept_bytes('\n')
1137
self.assertEqual("error\x01Generic bzr smart protocol error: bad request"
1138
" 'abc'\n", out_stream.getvalue())
1139
self.assertTrue(protocol.has_dispatched)
1140
self.assertEqual(0, protocol.next_read_size())
1142
def test_accept_body_bytes_to_protocol(self):
1143
protocol = self.build_protocol_waiting_for_body()
1144
self.assertEqual(6, protocol.next_read_size())
1145
protocol.accept_bytes('7\nabc')
1146
self.assertEqual(9, protocol.next_read_size())
1147
protocol.accept_bytes('defgd')
1148
protocol.accept_bytes('one\n')
1149
self.assertEqual(0, protocol.next_read_size())
1150
self.assertTrue(self.end_received)
1152
def test_accept_request_and_body_all_at_once(self):
1153
mem_transport = memory.MemoryTransport()
1154
mem_transport.put_bytes('foo', 'abcdefghij')
1155
out_stream = StringIO()
1156
protocol = smart.SmartServerRequestProtocolOne(mem_transport,
1158
protocol.accept_bytes('readv\x01foo\n3\n3,3done\n')
1159
self.assertEqual(0, protocol.next_read_size())
1160
self.assertEqual('readv\n3\ndefdone\n', out_stream.getvalue())
1161
self.assertEqual('', protocol.excess_buffer)
1162
self.assertEqual('', protocol.in_buffer)
1164
def test_accept_excess_bytes_are_preserved(self):
1165
out_stream = StringIO()
1166
protocol = smart.SmartServerRequestProtocolOne(None, out_stream.write)
1167
protocol.accept_bytes('hello\nhello\n')
1168
self.assertEqual("ok\x011\n", out_stream.getvalue())
1169
self.assertEqual("hello\n", protocol.excess_buffer)
1170
self.assertEqual("", protocol.in_buffer)
1172
def test_accept_excess_bytes_after_body(self):
1173
protocol = self.build_protocol_waiting_for_body()
1174
protocol.accept_bytes('7\nabcdefgdone\nX')
1175
self.assertTrue(self.end_received)
1176
self.assertEqual("X", protocol.excess_buffer)
1177
self.assertEqual("", protocol.in_buffer)
1178
protocol.accept_bytes('Y')
1179
self.assertEqual("XY", protocol.excess_buffer)
1180
self.assertEqual("", protocol.in_buffer)
1182
def test_accept_excess_bytes_after_dispatch(self):
1183
out_stream = StringIO()
1184
protocol = smart.SmartServerRequestProtocolOne(None, out_stream.write)
1185
protocol.accept_bytes('hello\n')
1186
self.assertEqual("ok\x011\n", out_stream.getvalue())
1187
protocol.accept_bytes('hel')
1188
self.assertEqual("hel", protocol.excess_buffer)
1189
protocol.accept_bytes('lo\n')
1190
self.assertEqual("hello\n", protocol.excess_buffer)
1191
self.assertEqual("", protocol.in_buffer)
1193
def test__send_response_sets_finished_reading(self):
1194
protocol = smart.SmartServerRequestProtocolOne(None, lambda x: None)
1195
self.assertEqual(1, protocol.next_read_size())
1196
protocol._send_response(('x',))
1197
self.assertEqual(0, protocol.next_read_size())
1199
def test_query_version(self):
1200
"""query_version on a SmartClientProtocolOne should return a number.
1202
The protocol provides the query_version because the domain level clients
1203
may all need to be able to probe for capabilities.
1205
# What we really want to test here is that SmartClientProtocolOne calls
1206
# accept_bytes(tuple_based_encoding_of_hello) and reads and parses the
1207
# response of tuple-encoded (ok, 1). Also, seperately we should test
1208
# the error if the response is a non-understood version.
1209
input = StringIO('ok\x011\n')
1211
medium = smart.SmartSimplePipesClientMedium(input, output)
1212
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1213
self.assertEqual(1, protocol.query_version())
1215
def assertServerToClientEncoding(self, expected_bytes, expected_tuple,
1217
"""Assert that each input_tuple serialises as expected_bytes, and the
1218
bytes deserialise as expected_tuple.
1220
# check the encoding of the server for all input_tuples matches
1222
for input_tuple in input_tuples:
1223
server_output = StringIO()
1224
server_protocol = smart.SmartServerRequestProtocolOne(
1225
None, server_output.write)
1226
server_protocol._send_response(input_tuple)
1227
self.assertEqual(expected_bytes, server_output.getvalue())
1228
# check the decoding of the client protocol from expected_bytes:
1229
input = StringIO(expected_bytes)
1231
medium = smart.SmartSimplePipesClientMedium(input, output)
1232
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1233
protocol.call('foo')
1234
self.assertEqual(expected_tuple, protocol.read_response_tuple())
1236
def test_client_call_empty_response(self):
1237
# protocol.call() can get back an empty tuple as a response. This occurs
1238
# when the parsed line is an empty line, and results in a tuple with
1239
# one element - an empty string.
1240
self.assertServerToClientEncoding('\n', ('', ), [(), ('', )])
1242
def test_client_call_three_element_response(self):
1243
# protocol.call() can get back tuples of other lengths. A three element
1244
# tuple should be unpacked as three strings.
1245
self.assertServerToClientEncoding('a\x01b\x0134\n', ('a', 'b', '34'),
1248
def test_client_call_with_body_bytes_uploads(self):
1249
# protocol.call_with_upload should length-prefix the bytes onto the
1251
expected_bytes = "foo\n7\nabcdefgdone\n"
1252
input = StringIO("\n")
1254
medium = smart.SmartSimplePipesClientMedium(input, output)
1255
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1256
protocol.call_with_body_bytes(('foo', ), "abcdefg")
1257
self.assertEqual(expected_bytes, output.getvalue())
1259
def test_client_call_with_body_readv_array(self):
1260
# protocol.call_with_upload should encode the readv array and then
1261
# length-prefix the bytes onto the wire.
1262
expected_bytes = "foo\n7\n1,2\n5,6done\n"
1263
input = StringIO("\n")
1265
medium = smart.SmartSimplePipesClientMedium(input, output)
1266
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1267
protocol.call_with_body_readv_array(('foo', ), [(1,2),(5,6)])
1268
self.assertEqual(expected_bytes, output.getvalue())
1270
def test_client_read_body_bytes_all(self):
1271
# read_body_bytes should decode the body bytes from the wire into
1273
expected_bytes = "1234567"
1274
server_bytes = "ok\n7\n1234567done\n"
1275
input = StringIO(server_bytes)
1277
medium = smart.SmartSimplePipesClientMedium(input, output)
1278
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1279
protocol.call('foo')
1280
protocol.read_response_tuple(True)
1281
self.assertEqual(expected_bytes, protocol.read_body_bytes())
1283
def test_client_read_body_bytes_incremental(self):
1284
# test reading a few bytes at a time from the body
1285
# XXX: possibly we should test dribbling the bytes into the stringio
1286
# to make the state machine work harder: however, as we use the
1287
# LengthPrefixedBodyDecoder that is already well tested - we can skip
1289
expected_bytes = "1234567"
1290
server_bytes = "ok\n7\n1234567done\n"
1291
input = StringIO(server_bytes)
1293
medium = smart.SmartSimplePipesClientMedium(input, output)
1294
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1295
protocol.call('foo')
1296
protocol.read_response_tuple(True)
1297
self.assertEqual(expected_bytes[0:2], protocol.read_body_bytes(2))
1298
self.assertEqual(expected_bytes[2:4], protocol.read_body_bytes(2))
1299
self.assertEqual(expected_bytes[4:6], protocol.read_body_bytes(2))
1300
self.assertEqual(expected_bytes[6], protocol.read_body_bytes())
1302
def test_client_cancel_read_body_does_not_eat_body_bytes(self):
1303
# cancelling the expected body needs to finish the request, but not
1304
# read any more bytes.
1305
expected_bytes = "1234567"
1306
server_bytes = "ok\n7\n1234567done\n"
1307
input = StringIO(server_bytes)
1309
medium = smart.SmartSimplePipesClientMedium(input, output)
1310
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1311
protocol.call('foo')
1312
protocol.read_response_tuple(True)
1313
protocol.cancel_read_body()
1314
self.assertEqual(3, input.tell())
1315
self.assertRaises(errors.ReadingCompleted, protocol.read_body_bytes)
1318
class LengthPrefixedBodyDecoder(tests.TestCase):
1320
# XXX: TODO: make accept_reading_trailer invoke translate_response or
1321
# something similar to the ProtocolBase method.
1323
def test_construct(self):
1324
decoder = smart.LengthPrefixedBodyDecoder()
1325
self.assertFalse(decoder.finished_reading)
1326
self.assertEqual(6, decoder.next_read_size())
1327
self.assertEqual('', decoder.read_pending_data())
1328
self.assertEqual('', decoder.unused_data)
1330
def test_accept_bytes(self):
1331
decoder = smart.LengthPrefixedBodyDecoder()
1332
decoder.accept_bytes('')
1333
self.assertFalse(decoder.finished_reading)
1334
self.assertEqual(6, decoder.next_read_size())
1335
self.assertEqual('', decoder.read_pending_data())
1336
self.assertEqual('', decoder.unused_data)
1337
decoder.accept_bytes('7')
1338
self.assertFalse(decoder.finished_reading)
1339
self.assertEqual(6, decoder.next_read_size())
1340
self.assertEqual('', decoder.read_pending_data())
1341
self.assertEqual('', decoder.unused_data)
1342
decoder.accept_bytes('\na')
1343
self.assertFalse(decoder.finished_reading)
1344
self.assertEqual(11, decoder.next_read_size())
1345
self.assertEqual('a', decoder.read_pending_data())
1346
self.assertEqual('', decoder.unused_data)
1347
decoder.accept_bytes('bcdefgd')
1348
self.assertFalse(decoder.finished_reading)
1349
self.assertEqual(4, decoder.next_read_size())
1350
self.assertEqual('bcdefg', decoder.read_pending_data())
1351
self.assertEqual('', decoder.unused_data)
1352
decoder.accept_bytes('one')
1353
self.assertFalse(decoder.finished_reading)
1354
self.assertEqual(1, decoder.next_read_size())
1355
self.assertEqual('', decoder.read_pending_data())
1356
self.assertEqual('', decoder.unused_data)
1357
decoder.accept_bytes('\nblarg')
1358
self.assertTrue(decoder.finished_reading)
1359
self.assertEqual(1, decoder.next_read_size())
1360
self.assertEqual('', decoder.read_pending_data())
1361
self.assertEqual('blarg', decoder.unused_data)
1363
def test_accept_bytes_all_at_once_with_excess(self):
1364
decoder = smart.LengthPrefixedBodyDecoder()
1365
decoder.accept_bytes('1\nadone\nunused')
1366
self.assertTrue(decoder.finished_reading)
1367
self.assertEqual(1, decoder.next_read_size())
1368
self.assertEqual('a', decoder.read_pending_data())
1369
self.assertEqual('unused', decoder.unused_data)
1371
def test_accept_bytes_exact_end_of_body(self):
1372
decoder = smart.LengthPrefixedBodyDecoder()
1373
decoder.accept_bytes('1\na')
1374
self.assertFalse(decoder.finished_reading)
1375
self.assertEqual(5, decoder.next_read_size())
1376
self.assertEqual('a', decoder.read_pending_data())
1377
self.assertEqual('', decoder.unused_data)
1378
decoder.accept_bytes('done\n')
1379
self.assertTrue(decoder.finished_reading)
1380
self.assertEqual(1, decoder.next_read_size())
1381
self.assertEqual('', decoder.read_pending_data())
1382
self.assertEqual('', decoder.unused_data)
1385
class FakeHTTPMedium(object):
1387
self.written_request = None
1388
self._current_request = None
1389
def send_http_smart_request(self, bytes):
1390
self.written_request = bytes
1394
class HTTPTunnellingSmokeTest(tests.TestCaseWithTransport):
1396
def _test_bulk_data(self, url_protocol):
1397
# We should be able to send and receive bulk data in a single message.
1398
# The 'readv' command in the smart protocol both sends and receives bulk
1399
# data, so we use that.
1400
self.build_tree(['data-file'])
1401
http_server = HTTPServerWithSmarts()
1402
http_server._url_protocol = url_protocol
1404
self.addCleanup(http_server.tearDown)
1406
http_transport = get_transport(http_server.get_url())
1408
medium = http_transport.get_smart_medium()
1409
#remote_transport = RemoteTransport('fake_url', medium)
1410
remote_transport = smart.SmartTransport('/', medium=medium)
1412
[(0, "c")], list(remote_transport.readv("data-file", [(0,1)])))
1414
def test_bulk_data_pycurl(self):
1416
self._test_bulk_data('http+pycurl')
1417
except errors.UnsupportedProtocol, e:
1418
raise tests.TestSkipped(str(e))
1420
def test_bulk_data_urllib(self):
1421
self._test_bulk_data('http+urllib')
1423
def test_smart_http_medium_request_accept_bytes(self):
1424
medium = FakeHTTPMedium()
1425
request = SmartClientHTTPMediumRequest(medium)
1426
request.accept_bytes('abc')
1427
request.accept_bytes('def')
1428
self.assertEqual(None, medium.written_request)
1429
request.finished_writing()
1430
self.assertEqual('abcdef', medium.written_request)
1432
def _test_http_send_smart_request(self, url_protocol):
1433
http_server = HTTPServerWithSmarts()
1434
http_server._url_protocol = url_protocol
1436
self.addCleanup(http_server.tearDown)
1438
post_body = 'hello\n'
1439
expected_reply_body = 'ok\x011\n'
1441
http_transport = get_transport(http_server.get_url())
1442
medium = http_transport.get_smart_medium()
1443
response = medium.send_http_smart_request(post_body)
1444
reply_body = response.read()
1445
self.assertEqual(expected_reply_body, reply_body)
1447
def test_http_send_smart_request_pycurl(self):
1449
self._test_http_send_smart_request('http+pycurl')
1450
except errors.UnsupportedProtocol, e:
1451
raise tests.TestSkipped(str(e))
1453
def test_http_send_smart_request_urllib(self):
1454
self._test_http_send_smart_request('http+urllib')
1456
def test_http_server_with_smarts(self):
1457
http_server = HTTPServerWithSmarts()
1459
self.addCleanup(http_server.tearDown)
1461
post_body = 'hello\n'
1462
expected_reply_body = 'ok\x011\n'
1464
smart_server_url = http_server.get_url() + '.bzr/smart'
1465
reply = urllib2.urlopen(smart_server_url, post_body).read()
1467
self.assertEqual(expected_reply_body, reply)
1469
def test_smart_http_server_post_request_handler(self):
1470
http_server = HTTPServerWithSmarts()
1472
self.addCleanup(http_server.tearDown)
1473
httpd = http_server._get_httpd()
1475
socket = SampleSocket(
1476
'POST /.bzr/smart HTTP/1.0\r\n'
1477
# HTTP/1.0 posts must have a Content-Length.
1478
'Content-Length: 6\r\n'
1481
request_handler = SmartRequestHandler(
1482
socket, ('localhost', 80), httpd)
1483
response = socket.writefile.getvalue()
1484
self.assertStartsWith(response, 'HTTP/1.0 200 ')
1485
# This includes the end of the HTTP headers, and all the body.
1486
expected_end_of_response = '\r\n\r\nok\x011\n'
1487
self.assertEndsWith(response, expected_end_of_response)
1490
class SampleSocket(object):
1491
"""A socket-like object for use in testing the HTTP request handler."""
1493
def __init__(self, socket_read_content):
1494
"""Constructs a sample socket.
1496
:param socket_read_content: a byte sequence
1498
# Use plain python StringIO so we can monkey-patch the close method to
1499
# not discard the contents.
1500
from StringIO import StringIO
1501
self.readfile = StringIO(socket_read_content)
1502
self.writefile = StringIO()
1503
self.writefile.close = lambda: None
1505
def makefile(self, mode='r', bufsize=None):
1507
return self.readfile
1509
return self.writefile
410
1512
# TODO: Client feature that does get_bundle and then installs that into a
411
1513
# branch; this can be used in place of the regular pull/fetch operation when
412
1514
# coming from a smart server.