60
62
self.credentials.append([realm, host, username, password])
65
class RecordingServer(object):
66
"""A fake HTTP server.
68
It records the bytes sent to it, and replies with a 200.
71
def __init__(self, expect_body_tail=None):
74
:type expect_body_tail: str
75
:param expect_body_tail: a reply won't be sent until this string is
78
self._expect_body_tail = expect_body_tail
81
self.received_bytes = ''
84
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
85
self._sock.bind(('127.0.0.1', 0))
86
self.host, self.port = self._sock.getsockname()
87
self._ready = threading.Event()
88
self._thread = threading.Thread(target=self._accept_read_and_reply)
89
self._thread.setDaemon(True)
93
def _accept_read_and_reply(self):
96
self._sock.settimeout(5)
98
conn, address = self._sock.accept()
99
# On win32, the accepted connection will be non-blocking to start
100
# with because we're using settimeout.
101
conn.setblocking(True)
102
while not self.received_bytes.endswith(self._expect_body_tail):
103
self.received_bytes += conn.recv(4096)
104
conn.sendall('HTTP/1.1 200 OK\r\n')
105
except socket.timeout:
106
# Make sure the client isn't stuck waiting for us to e.g. accept.
113
# We might have already closed it. We don't care.
63
119
class TestHttpUrls(TestCase):
65
121
def test_url_parsing(self):
154
210
'"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
155
211
% bzrlib.__version__) > -1)
213
def test_get_smart_medium(self):
214
# For HTTP, get_smart_medium should return the transport object.
215
server = self.get_readonly_server()
216
http_transport = self._transport(server.get_url())
217
medium = http_transport.get_smart_medium()
218
self.assertIs(medium, http_transport)
157
220
def test_has_on_bogus_host(self):
158
221
# Get a free address and don't 'accept' on it, so that we
159
222
# can be sure there is no http handler there, but set a
228
291
self.assertEqual([[10, 12], [22, 26]], ranges)
294
class TestPost(TestCase):
296
def _test_post_body_is_received(self, scheme):
297
server = RecordingServer(expect_body_tail='end-of-body')
299
self.addCleanup(server.tearDown)
300
url = '%s://%s:%s/' % (scheme, server.host, server.port)
302
http_transport = get_transport(url)
303
except UnsupportedProtocol:
304
raise TestSkipped('%s not available' % scheme)
305
code, response = http_transport._post('abc def end-of-body')
307
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
308
self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
309
# The transport should not be assuming that the server can accept
310
# chunked encoding the first time it connects, because HTTP/1.1, so we
311
# check for the literal string.
313
server.received_bytes.endswith('\r\n\r\nabc def end-of-body'))
315
def test_post_body_is_received_urllib(self):
316
self._test_post_body_is_received('http+urllib')
318
def test_post_body_is_received_pycurl(self):
319
self._test_post_body_is_received('http+pycurl')
231
322
class TestRangeHeader(TestCase):
232
323
"""Test range_header method"""
392
483
TestCaseWithWebserver):
393
484
"""Tests forbidden server for pycurl implementation"""
487
class TestRecordingServer(TestCase):
489
def test_create(self):
490
server = RecordingServer(expect_body_tail=None)
491
self.assertEqual('', server.received_bytes)
492
self.assertEqual(None, server.host)
493
self.assertEqual(None, server.port)
495
def test_setUp_and_tearDown(self):
496
server = RecordingServer(expect_body_tail=None)
499
self.assertNotEqual(None, server.host)
500
self.assertNotEqual(None, server.port)
503
self.assertEqual(None, server.host)
504
self.assertEqual(None, server.port)
506
def test_send_receive_bytes(self):
507
server = RecordingServer(expect_body_tail='c')
509
self.addCleanup(server.tearDown)
510
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
511
sock.connect((server.host, server.port))
513
self.assertEqual('HTTP/1.1 200 OK\r\n',
514
sock.recv(4096, socket.MSG_WAITALL))
515
self.assertEqual('abc', server.received_bytes)