# Copyright (C) 2005, 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

# FIXME: This test should be repeated for each available http client
# implementation; at the moment we have urllib and pycurl.

# TODO: Should be renamed to bzrlib.transport.http.tests?
28
import socket
import threading

import bzrlib
from bzrlib import osutils
from bzrlib.errors import DependencyNotPresent, UnsupportedProtocol
from bzrlib.tests import TestCase, TestSkipped
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
from bzrlib.transport import get_transport, Transport
from bzrlib.transport.http import extract_auth, HttpTransportBase
from bzrlib.transport.http._urllib import HttpTransport_urllib
37
class FakeManager(object):
    """Minimal stand-in for a password manager.

    Every ``add_password`` call is stored in ``self.credentials`` so that a
    test can inspect exactly what was registered.
    """

    def __init__(self):
        # Created per instance so that separate managers never share entries.
        self.credentials = []

    def add_password(self, realm, host, username, password):
        """Record one credential as ``[realm, host, username, password]``."""
        self.credentials.append([realm, host, username, password])
46
class RecordingServer(object):
    """A fake HTTP server.

    It records the bytes sent to it, and replies with a 200.
    """

    def __init__(self, expect_body_tail=None):
        """Constructor.

        :type expect_body_tail: str
        :param expect_body_tail: a reply won't be sent until this string is
            received.  The serving loop passes it to ``str.endswith``, so it
            has to be a string whenever a client actually connects.
        """
        self._expect_body_tail = expect_body_tail
        # host and port stay None until setUp() has bound the socket, and
        # are reset to None again by tearDown().
        self.host = None
        self.port = None
        self.received_bytes = ''

    def setUp(self):
        """Bind to a free port on 127.0.0.1 and start the serving thread."""
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.bind(('127.0.0.1', 0))
        self.host, self.port = self._sock.getsockname()
        self._ready = threading.Event()
        self._thread = threading.Thread(target=self._accept_read_and_reply)
        self._thread.setDaemon(True)
        self._thread.start()
        # Give the thread up to five seconds to start listening, so that a
        # client connecting right after setUp() is not refused.
        self._ready.wait(5)

    def _accept_read_and_reply(self):
        """Serve one connection: record what it sends, then reply 200."""
        self._sock.listen(1)
        self._ready.set()
        self._sock.settimeout(5)
        conn = None
        try:
            try:
                conn, address = self._sock.accept()
                # On win32, the accepted connection will be non-blocking to
                # start with because we're using settimeout.
                conn.setblocking(True)
                while not self.received_bytes.endswith(self._expect_body_tail):
                    chunk = conn.recv(4096)
                    if not chunk:
                        # The peer closed the connection before sending the
                        # expected tail; stop instead of spinning forever.
                        break
                    self.received_bytes += chunk
                else:
                    # Only reached when the expected tail has been seen.
                    conn.sendall('HTTP/1.1 200 OK\r\n')
            except socket.timeout:
                # Make sure the client isn't stuck waiting for us to e.g.
                # accept.
                self._sock.close()
        finally:
            # Close the accepted connection explicitly so the client sees
            # end-of-stream without relying on garbage collection.
            if conn is not None:
                conn.close()

    def tearDown(self):
        """Close the listening socket and forget the bound address."""
        try:
            self._sock.close()
        except socket.error:
            # We might have already closed it.  We don't care.
            pass
        self.host = None
        self.port = None
100
class TestHttpUrls(TestCase):
    """URL handling of the http transport classes."""

    def test_url_parsing(self):
        """extract_auth strips credentials from a URL and records them."""
        f = FakeManager()
        url = extract_auth('http://example.com', f)
        self.assertEquals('http://example.com', url)
        self.assertEquals(0, len(f.credentials))
        url = extract_auth('http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
        self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
        self.assertEquals(1, len(f.credentials))
        self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
                          f.credentials[0])

    def test_abs_url(self):
        """Construction of absolute http URLs"""
        t = HttpTransport_urllib('http://bazaar-vcs.org/bzr/bzr.dev/')
        eq = self.assertEqualDiff
        # NOTE(review): the relative path of this first check was lost from
        # the reviewed text; '.' is assumed because the expected value is
        # the transport base without its trailing slash -- confirm.
        eq(t.abspath('.'),
           'http://bazaar-vcs.org/bzr/bzr.dev')
        eq(t.abspath('foo/bar'),
           'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
        eq(t.abspath('.bzr'),
           'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
        eq(t.abspath('.bzr/1//2/./3'),
           'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')

    def test_invalid_http_urls(self):
        """Trap invalid construction of urls"""
        t = HttpTransport_urllib('http://bazaar-vcs.org/bzr/bzr.dev/')
        # NOTE(review): the callable and argument of this assertion were
        # lost from the reviewed text; abspath('.bzr/') is an assumption
        # to be confirmed against the upstream file.
        self.assertRaises(ValueError,
                          t.abspath,
                          '.bzr/')

    def test_http_root_urls(self):
        """Construction of URLs from server root"""
        t = HttpTransport_urllib('http://bzr.ozlabs.org/')
        eq = self.assertEqualDiff
        eq(t.abspath('.bzr/tree-version'),
           'http://bzr.ozlabs.org/.bzr/tree-version')

    def test_http_impl_urls(self):
        """There are servers which ask for particular clients to connect"""
        try:
            from bzrlib.transport.http._pycurl import HttpServer_PyCurl
            server = HttpServer_PyCurl()
            # NOTE(review): presumably the server has to be started before
            # get_url() is meaningful and stopped afterwards -- confirm.
            try:
                server.setUp()
                url = server.get_url()
                self.assertTrue(url.startswith('http+pycurl://'))
            finally:
                server.tearDown()
        except DependencyNotPresent:
            # The pycurl implementation is optional; skip rather than fail.
            raise TestSkipped('pycurl not present')
154
class TestHttpMixins(object):
    """Tests shared by the per-implementation http connection test cases.

    The class mixing this in has to provide ``_transport`` (called with a
    URL to build the transport under test) together with the
    ``build_tree``, ``get_transport`` and ``get_readonly_server`` helpers
    used below.
    """

    def _prep_tree(self):
        """Create the files and directory that the tests below request."""
        self.build_tree(['xxx', 'foo/', 'foo/bar'], line_endings='binary',
                        transport=self.get_transport())

    def test_http_has(self):
        """has() on an existing file is True and logs one HEAD request."""
        server = self.get_readonly_server()
        t = self._transport(server.get_url())
        self.assertEqual(t.has('foo/bar'), True)
        self.assertEqual(len(server.logs), 1)
        self.assertContainsRe(server.logs[0],
            r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "bzr/')

    def test_http_has_not_found(self):
        """has() on a missing file is False and logs a 404 HEAD request."""
        server = self.get_readonly_server()
        t = self._transport(server.get_url())
        self.assertEqual(t.has('not-found'), False)
        self.assertContainsRe(server.logs[1],
            r'"HEAD /not-found HTTP/1.." 404 - "-" "bzr/')

    def test_http_get(self):
        """get() returns the file content and logs one GET request."""
        server = self.get_readonly_server()
        t = self._transport(server.get_url())
        fp = t.get('foo/bar')
        self.assertEqualDiff(
            fp.read(),
            'contents of foo/bar\n')
        self.assertEqual(len(server.logs), 1)
        # The user agent recorded in the log carries the bzrlib version.
        self.assertTrue(server.logs[0].find(
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s' % bzrlib.__version__) > -1)

    def test_get_smart_medium(self):
        # For HTTP, get_smart_medium should return the transport object.
        server = self.get_readonly_server()
        http_transport = self._transport(server.get_url())
        medium = http_transport.get_smart_medium()
        self.assertIs(medium, http_transport)
194
class TestHttpConnections_urllib(TestCaseWithWebserver, TestHttpMixins):
    """Run the shared http tests against the urllib implementation."""

    _transport = HttpTransport_urllib

    def setUp(self):
        TestCaseWithWebserver.setUp(self)
        # The shared tests request 'foo/bar', so create the tree first.
        self._prep_tree()

    def test_has_on_bogus_host(self):
        """has() against a port with no http handler raises URLError."""
        import urllib2
        # Get a random address, so that we can be sure there is no
        # http handler there.
        s = socket.socket()
        try:
            s.bind(('localhost', 0))
            t = self._transport('http://%s:%s/' % s.getsockname())
            self.assertRaises(urllib2.URLError, t.has, 'foo/bar')
        finally:
            # Release the reserved port even when the assertion fails.
            s.close()
212
class TestHttpConnections_pycurl(TestCaseWithWebserver, TestHttpMixins):
    """Run the shared http tests against the pycurl implementation."""

    def _get_pycurl_maybe(self):
        """Return the pycurl transport class, or skip the test without it."""
        try:
            from bzrlib.transport.http._pycurl import PyCurlTransport
            return PyCurlTransport
        except DependencyNotPresent:
            raise TestSkipped('pycurl not present')

    # Resolved on each access, so a missing pycurl skips the individual
    # test instead of breaking the import of this module.
    _transport = property(_get_pycurl_maybe)

    def setUp(self):
        TestCaseWithWebserver.setUp(self)
        # The shared tests request 'foo/bar', so create the tree first.
        self._prep_tree()
228
class TestHttpTransportRegistration(TestCase):
    """Test registrations of various http implementations"""

    def test_http_registered(self):
        """An 'http+urllib://' URL yields the urllib transport class."""
        # This import also binds the name ``bzrlib`` locally for the
        # class lookup further down.
        import bzrlib.transport.http._urllib
        from bzrlib.transport import get_transport
        # urllib should always be present
        urllib_transport = get_transport('http+urllib://bzr.google.com/')
        self.assertIsInstance(urllib_transport, Transport)
        expected_class = bzrlib.transport.http._urllib.HttpTransport_urllib
        self.assertIsInstance(urllib_transport, expected_class)
240
class TestOffsets(TestCase):
    """Test offsets_to_ranges method"""

    def _assert_ranges(self, expected, offsets):
        """Assert that offsets_to_ranges(offsets) equals ``expected``."""
        actual = HttpTransportBase.offsets_to_ranges(offsets)
        self.assertEqual(expected, actual)

    def test_offsets_to_ranges_simple(self):
        """Single, adjacent and out-of-order adjacent entries."""
        self._assert_ranges([[10, 10]], [(10, 1)])

        # Two adjacent one-byte entries come back as a single range.
        self._assert_ranges([[0, 1]], [(0, 1), (1, 1)])

        # The same entries in reverse order give the same result.
        self._assert_ranges([[0, 1]], [(1, 1), (0, 1)])

    def test_offset_to_ranges_overlapped(self):
        """Entries that touch are merged; separated ones are kept apart."""
        self._assert_ranges([[10, 10], [20, 26]],
                            [(10, 1), (20, 2), (22, 5)])

        self._assert_ranges([[10, 12], [22, 26]],
                            [(10, 1), (11, 2), (22, 5)])
264
class TestPost(TestCase):
    """Check what each http implementation sends for a POST."""

    def _test_post_body_is_received(self, scheme):
        """POST through the ``scheme`` transport and inspect the raw bytes.

        :param scheme: URL scheme selecting the implementation, e.g.
            'http+urllib'.  The test is skipped when that scheme is not
            supported.
        """
        server = RecordingServer(expect_body_tail='end-of-body')
        server.setUp()
        self.addCleanup(server.tearDown)
        url = '%s://%s:%s/' % (scheme, server.host, server.port)
        try:
            http_transport = get_transport(url)
        except UnsupportedProtocol:
            raise TestSkipped('%s not available' % scheme)
        code, response = http_transport._post('abc def end-of-body')
        self.assertTrue(
            server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
        # 19 is len('abc def end-of-body').
        self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
        # The transport should not be assuming that the server can accept
        # chunked encoding the first time it connects, because HTTP/1.1, so we
        # check for the literal string.
        self.assertTrue(
            server.received_bytes.endswith('\r\n\r\nabc def end-of-body'))

    def test_post_body_is_received_urllib(self):
        self._test_post_body_is_received('http+urllib')

    def test_post_body_is_received_pycurl(self):
        self._test_post_body_is_received('http+pycurl')
292
class TestRangeHeader(TestCase):
    """Test range_header method"""

    def check_header(self, value, ranges=None, tail=0):
        """Assert that range_header(ranges, tail) equals ``value``.

        :param value: the expected header string.
        :param ranges: sequence of (start, end) pairs; None means no ranges.
        :param tail: tail length handed to range_header.
        """
        # None instead of a mutable ``[]`` default, which would be a single
        # list object shared by every call.
        if ranges is None:
            ranges = []
        range_header = HttpTransportBase.range_header
        self.assertEqual(value, range_header(ranges, tail))

    def test_range_header_single(self):
        self.check_header('0-9', ranges=[[0,9]])
        self.check_header('100-109', ranges=[[100,109]])

    def test_range_header_tail(self):
        self.check_header('-10', tail=10)
        self.check_header('-50', tail=50)

    def test_range_header_multi(self):
        self.check_header('0-9,100-200,300-5000',
                          ranges=[(0,9), (100, 200), (300,5000)])

    def test_range_header_mixed(self):
        # The trailing '-50' in the expected header comes from tail=50.
        self.check_header('0-9,300-5000,-50',
                          ranges=[(0,9), (300,5000)],
                          tail=50)
317
class TestRecordingServer(TestCase):
    """Tests for the RecordingServer helper defined in this module."""

    def test_create(self):
        """A new server has recorded nothing and has no address yet."""
        server = RecordingServer(expect_body_tail=None)
        self.assertEqual('', server.received_bytes)
        self.assertEqual(None, server.host)
        self.assertEqual(None, server.port)

    def test_setUp_and_tearDown(self):
        """host and port are set by setUp() and cleared by tearDown()."""
        server = RecordingServer(expect_body_tail=None)
        server.setUp()
        try:
            self.assertNotEqual(None, server.host)
            self.assertNotEqual(None, server.port)
        finally:
            # Always stop the server, even when an assertion above fails.
            server.tearDown()
        self.assertEqual(None, server.host)
        self.assertEqual(None, server.port)

    def test_send_receive_bytes(self):
        """Bytes sent by a client are recorded and answered with a 200."""
        server = RecordingServer(expect_body_tail='c')
        server.setUp()
        self.addCleanup(server.tearDown)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Do not leak the client socket when an assertion fails.
        self.addCleanup(sock.close)
        sock.connect((server.host, server.port))
        # 'abc' ends with the expected tail 'c', which triggers the reply.
        sock.sendall('abc')
        self.assertEqual('HTTP/1.1 200 OK\r\n',
                         osutils.recv_all(sock, 4096))
        self.assertEqual('abc', server.received_bytes)