65
if features.pycurl.available():
66
65
from bzrlib.transport.http._pycurl import PyCurlTransport
69
load_tests = load_tests_apply_scenarios
72
def vary_by_http_client_implementation():
73
"""Test the two libraries we can use, pycurl and urllib."""
67
except errors.DependencyNotPresent:
68
pycurl_present = False
71
def load_tests(standard_tests, module, loader):
72
"""Multiply tests for http clients and protocol versions."""
73
result = loader.suiteClass()
75
# one for each transport implementation
76
t_tests, remaining_tests = tests.split_suite_by_condition(
77
standard_tests, tests.condition_isinstance((
78
TestHttpTransportRegistration,
79
TestHttpTransportUrls,
74
82
transport_scenarios = [
75
83
('urllib', dict(_transport=_urllib.HttpTransport_urllib,
76
84
_server=http_server.HttpServer_urllib,
77
_url_protocol='http+urllib',)),
85
_qualified_prefix='http+urllib',)),
79
if features.pycurl.available():
80
88
transport_scenarios.append(
81
89
('pycurl', dict(_transport=PyCurlTransport,
82
90
_server=http_server.HttpServer_PyCurl,
83
_url_protocol='http+pycurl',)))
84
return transport_scenarios
87
def vary_by_http_protocol_version():
88
"""Test on http/1.0 and 1.1"""
90
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
91
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
91
_qualified_prefix='http+pycurl',)))
92
tests.multiply_tests(t_tests, transport_scenarios, result)
94
# each implementation tested with each HTTP version
95
tp_tests, remaining_tests = tests.split_suite_by_condition(
96
remaining_tests, tests.condition_isinstance((
97
SmartHTTPTunnellingTest,
98
TestDoCatchRedirections,
100
TestHTTPRedirections,
101
TestHTTPSilentRedirections,
102
TestLimitedRangeRequestServer,
106
TestSpecificRequestHandler,
108
protocol_scenarios = [
109
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
110
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
112
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
114
tests.multiply_tests(tp_tests, tp_scenarios, result)
116
# proxy auth: each auth scheme on all http versions on all implementations.
117
tppa_tests, remaining_tests = tests.split_suite_by_condition(
118
remaining_tests, tests.condition_isinstance((
121
proxy_auth_scheme_scenarios = [
122
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
123
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
125
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
95
def vary_by_http_auth_scheme():
127
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
128
proxy_auth_scheme_scenarios)
129
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
131
# auth: each auth scheme on all http versions on all implementations.
132
tpa_tests, remaining_tests = tests.split_suite_by_condition(
133
remaining_tests, tests.condition_isinstance((
136
auth_scheme_scenarios = [
97
137
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
98
138
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
100
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
102
# Add some attributes common to all scenarios
103
for scenario_id, scenario_dict in scenarios:
104
scenario_dict.update(_auth_header='Authorization',
105
_username_prompt_prefix='',
106
_password_prompt_prefix='')
110
def vary_by_http_proxy_auth_scheme():
112
('proxy-basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
113
('proxy-digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
114
('proxy-basicdigest',
115
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
117
# Add some attributes common to all scenarios
118
for scenario_id, scenario_dict in scenarios:
119
scenario_dict.update(_auth_header='Proxy-Authorization',
120
_username_prompt_prefix='Proxy ',
121
_password_prompt_prefix='Proxy ')
125
def vary_by_http_activity():
140
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
142
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
143
auth_scheme_scenarios)
144
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
146
# activity: on all http[s] versions on all implementations
147
tpact_tests, remaining_tests = tests.split_suite_by_condition(
148
remaining_tests, tests.condition_isinstance((
126
151
activity_scenarios = [
127
152
('urllib,http', dict(_activity_server=ActivityHTTPServer,
128
_transport=_urllib.HttpTransport_urllib,)),
153
_transport=_urllib.HttpTransport_urllib,)),
130
if features.pycurl.available():
155
if tests.HTTPSServerFeature.available():
156
activity_scenarios.append(
157
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
158
_transport=_urllib.HttpTransport_urllib,)),)
131
160
activity_scenarios.append(
132
161
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
133
_transport=PyCurlTransport,)),)
134
if features.HTTPSServerFeature.available():
135
# FIXME: Until we have a better way to handle self-signed certificates
136
# (like allowing them in a test specific authentication.conf for
137
# example), we need some specialized pycurl/urllib transport for tests.
139
from bzrlib.tests import (
142
class HTTPS_urllib_transport(_urllib.HttpTransport_urllib):
144
def __init__(self, base, _from_transport=None):
145
super(HTTPS_urllib_transport, self).__init__(
146
base, _from_transport=_from_transport,
147
ca_certs=ssl_certs.build_path('ca.crt'))
149
activity_scenarios.append(
150
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
151
_transport=HTTPS_urllib_transport,)),)
152
if features.pycurl.available():
162
_transport=PyCurlTransport,)),)
163
if tests.HTTPSServerFeature.available():
164
from bzrlib.tests import (
167
# FIXME: Until we have a better way to handle self-signed
168
# certificates (like allowing them in a test specific
169
# authentication.conf for example), we need some specialized pycurl
170
# transport for tests.
153
171
class HTTPS_pycurl_transport(PyCurlTransport):
155
173
def __init__(self, base, _from_transport=None):
191
217
self.received_bytes = ''
195
return '%s://%s:%s/' % (self.scheme, self.host, self.port)
197
def start_server(self):
198
220
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
199
221
self._sock.bind(('127.0.0.1', 0))
200
222
self.host, self.port = self._sock.getsockname()
201
223
self._ready = threading.Event()
202
self._thread = test_server.TestThread(
203
sync_event=self._ready, target=self._accept_read_and_reply)
224
self._thread = threading.Thread(target=self._accept_read_and_reply)
225
self._thread.setDaemon(True)
204
226
self._thread.start()
205
if 'threads' in tests.selftest_debug_flags:
206
sys.stderr.write('Thread started: %s\n' % (self._thread.ident,))
209
229
def _accept_read_and_reply(self):
210
230
self._sock.listen(1)
211
231
self._ready.set()
212
conn, address = self._sock.accept()
213
if self._expect_body_tail is not None:
232
self._sock.settimeout(5)
234
conn, address = self._sock.accept()
235
# On win32, the accepted connection will be non-blocking to start
236
# with because we're using settimeout.
237
conn.setblocking(True)
214
238
while not self.received_bytes.endswith(self._expect_body_tail):
215
239
self.received_bytes += conn.recv(4096)
216
240
conn.sendall('HTTP/1.1 200 OK\r\n')
241
except socket.timeout:
242
# Make sure the client isn't stuck waiting for us to e.g. accept.
218
243
self._sock.close()
219
244
except socket.error:
220
245
# The client may have already closed the socket.
223
def stop_server(self):
225
# Issue a fake connection to wake up the server and allow it to
227
fake_conn = osutils.connect_socket((self.host, self.port))
229
251
except socket.error:
230
252
# We might have already closed it. We don't care.
235
if 'threads' in tests.selftest_debug_flags:
236
sys.stderr.write('Thread joined: %s\n' % (self._thread.ident,))
239
258
class TestAuthHeader(tests.TestCase):
266
285
_urllib2_wrappers.BasicAuthHandler)
267
286
match, realm = self.auth_handler.extract_realm(remainder)
268
287
self.assertTrue(match is not None)
269
self.assertEqual('Thou should not pass', realm)
288
self.assertEquals('Thou should not pass', realm)
271
290
def test_digest_header(self):
272
291
scheme, remainder = self.parse_header(
273
292
'Digest realm="Thou should not pass"')
274
self.assertEqual('digest', scheme)
275
self.assertEqual('realm="Thou should not pass"', remainder)
278
class TestHTTPRangeParsing(tests.TestCase):
281
super(TestHTTPRangeParsing, self).setUp()
282
# We focus on range parsing here and ignore everything else
283
class RequestHandler(http_server.TestingHTTPRequestHandler):
284
def setup(self): pass
285
def handle(self): pass
286
def finish(self): pass
288
self.req_handler = RequestHandler(None, None, None)
290
def assertRanges(self, ranges, header, file_size):
291
self.assertEqual(ranges,
292
self.req_handler._parse_ranges(header, file_size))
294
def test_simple_range(self):
295
self.assertRanges([(0,2)], 'bytes=0-2', 12)
298
self.assertRanges([(8, 11)], 'bytes=-4', 12)
300
def test_tail_bigger_than_file(self):
301
self.assertRanges([(0, 11)], 'bytes=-99', 12)
303
def test_range_without_end(self):
304
self.assertRanges([(4, 11)], 'bytes=4-', 12)
306
def test_invalid_ranges(self):
307
self.assertRanges(None, 'bytes=12-22', 12)
308
self.assertRanges(None, 'bytes=1-3,12-22', 12)
309
self.assertRanges(None, 'bytes=-', 12)
293
self.assertEquals('digest', scheme)
294
self.assertEquals('realm="Thou should not pass"', remainder)
312
297
class TestHTTPServer(tests.TestCase):
378
374
"""Test case to inherit from if pycurl is present"""
380
376
def _get_pycurl_maybe(self):
381
self.requireFeature(features.pycurl)
382
return PyCurlTransport
378
from bzrlib.transport.http._pycurl import PyCurlTransport
379
return PyCurlTransport
380
except errors.DependencyNotPresent:
381
raise tests.TestSkipped('pycurl not present')
384
383
_transport = property(_get_pycurl_maybe)
386
class TestHttpUrls(tests.TestCase):
388
# TODO: This should be moved to authorization tests once they
391
def test_url_parsing(self):
393
url = http.extract_auth('http://example.com', f)
394
self.assertEquals('http://example.com', url)
395
self.assertEquals(0, len(f.credentials))
396
url = http.extract_auth(
397
'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
398
self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
399
self.assertEquals(1, len(f.credentials))
400
self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
387
404
class TestHttpTransportUrls(tests.TestCase):
388
405
"""Test the http urls."""
390
scenarios = vary_by_http_client_implementation()
392
407
def test_abs_url(self):
393
408
"""Construction of absolute http URLs"""
394
t = self._transport('http://example.com/bzr/bzr.dev/')
409
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
395
410
eq = self.assertEqualDiff
396
eq(t.abspath('.'), 'http://example.com/bzr/bzr.dev')
397
eq(t.abspath('foo/bar'), 'http://example.com/bzr/bzr.dev/foo/bar')
398
eq(t.abspath('.bzr'), 'http://example.com/bzr/bzr.dev/.bzr')
411
eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
412
eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
413
eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
399
414
eq(t.abspath('.bzr/1//2/./3'),
400
'http://example.com/bzr/bzr.dev/.bzr/1/2/3')
415
'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
402
417
def test_invalid_http_urls(self):
403
418
"""Trap invalid construction of urls"""
404
self._transport('http://example.com/bzr/bzr.dev/')
419
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
405
420
self.assertRaises(errors.InvalidURL,
407
'http://http://example.com/bzr/bzr.dev/')
422
'http://http://bazaar-vcs.org/bzr/bzr.dev/')
409
424
def test_http_root_urls(self):
410
425
"""Construction of URLs from server root"""
411
t = self._transport('http://example.com/')
426
t = self._transport('http://bzr.ozlabs.org/')
412
427
eq = self.assertEqualDiff
413
428
eq(t.abspath('.bzr/tree-version'),
414
'http://example.com/.bzr/tree-version')
429
'http://bzr.ozlabs.org/.bzr/tree-version')
416
431
def test_http_impl_urls(self):
417
432
"""There are servers which ask for particular clients to connect"""
418
433
server = self._server()
419
server.start_server()
421
436
url = server.get_url()
422
self.assertTrue(url.startswith('%s://' % self._url_protocol))
437
self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
427
442
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
436
451
https by supplying a fake version_info that do not
439
self.requireFeature(features.pycurl)
440
# Import the module locally now that we now it's available.
441
pycurl = features.pycurl.module
457
raise tests.TestSkipped('pycurl not present')
443
self.overrideAttr(pycurl, 'version_info',
444
# Fake the pycurl version_info This was taken from
445
# a windows pycurl without SSL (thanks to bialix)
454
('ftp', 'gopher', 'telnet',
455
'dict', 'ldap', 'http', 'file'),
459
self.assertRaises(errors.DependencyNotPresent, self._transport,
460
'https://launchpad.net')
459
version_info_orig = pycurl.version_info
461
# Now that we have pycurl imported, we can fake its version_info
462
# This was taken from a windows pycurl without SSL
464
pycurl.version_info = lambda : (2,
472
('ftp', 'gopher', 'telnet',
473
'dict', 'ldap', 'http', 'file'),
477
self.assertRaises(errors.DependencyNotPresent, self._transport,
478
'https://launchpad.net')
480
# Restore the right function
481
pycurl.version_info = version_info_orig
463
484
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
464
485
"""Test the http connections."""
466
scenarios = multiply_scenarios(
467
vary_by_http_client_implementation(),
468
vary_by_http_protocol_version(),
472
super(TestHTTPConnections, self).setUp()
488
http_utils.TestCaseWithWebserver.setUp(self)
473
489
self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
474
490
transport=self.get_transport())
476
492
def test_http_has(self):
477
493
server = self.get_readonly_server()
478
t = self.get_readonly_transport()
494
t = self._transport(server.get_url())
479
495
self.assertEqual(t.has('foo/bar'), True)
480
496
self.assertEqual(len(server.logs), 1)
481
497
self.assertContainsRe(server.logs[0],
518
534
class TestHttpTransportRegistration(tests.TestCase):
519
535
"""Test registrations of various http implementations"""
521
scenarios = vary_by_http_client_implementation()
523
537
def test_http_registered(self):
524
t = transport.get_transport_from_url(
525
'%s://foo.com/' % self._url_protocol)
538
t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
526
539
self.assertIsInstance(t, transport.Transport)
527
540
self.assertIsInstance(t, self._transport)
530
543
class TestPost(tests.TestCase):
532
scenarios = multiply_scenarios(
533
vary_by_http_client_implementation(),
534
vary_by_http_protocol_version(),
537
545
def test_post_body_is_received(self):
538
server = RecordingServer(expect_body_tail='end-of-body',
539
scheme=self._url_protocol)
540
self.start_server(server)
541
url = server.get_url()
542
# FIXME: needs a cleanup -- vila 20100611
543
http_transport = transport.get_transport_from_url(url)
546
server = RecordingServer(expect_body_tail='end-of-body')
548
self.addCleanup(server.tearDown)
549
scheme = self._qualified_prefix
550
url = '%s://%s:%s/' % (scheme, server.host, server.port)
551
http_transport = self._transport(url)
544
552
code, response = http_transport._post('abc def end-of-body')
546
554
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
547
555
self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
548
self.assertTrue('content-type: application/octet-stream\r'
549
in server.received_bytes.lower())
550
556
# The transport should not be assuming that the server can accept
551
557
# chunked encoding the first time it connects, because HTTP/1.1, so we
552
558
# check for the literal string.
1052
1052
self.assertEqual('single', t._range_hint)
1055
class TruncatedBeforeBoundaryRequestHandler(
1056
http_server.TestingHTTPRequestHandler):
1057
"""Truncation before a boundary, like in bug 198646"""
1059
_truncated_ranges = 1
1061
def get_multiple_ranges(self, file, file_size, ranges):
1062
self.send_response(206)
1063
self.send_header('Accept-Ranges', 'bytes')
1065
self.send_header('Content-Type',
1066
'multipart/byteranges; boundary=%s' % boundary)
1067
boundary_line = '--%s\r\n' % boundary
1068
# Calculate the Content-Length
1070
for (start, end) in ranges:
1071
content_length += len(boundary_line)
1072
content_length += self._header_line_length(
1073
'Content-type', 'application/octet-stream')
1074
content_length += self._header_line_length(
1075
'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
1076
content_length += len('\r\n') # end headers
1077
content_length += end - start # + 1
1078
content_length += len(boundary_line)
1079
self.send_header('Content-length', content_length)
1082
# Send the multipart body
1084
for (start, end) in ranges:
1085
if cur + self._truncated_ranges >= len(ranges):
1086
# Abruptly ends the response and close the connection
1087
self.close_connection = 1
1089
self.wfile.write(boundary_line)
1090
self.send_header('Content-type', 'application/octet-stream')
1091
self.send_header('Content-Range', 'bytes %d-%d/%d'
1092
% (start, end, file_size))
1094
self.send_range_content(file, start, end - start + 1)
1097
self.wfile.write(boundary_line)
1100
class TestTruncatedBeforeBoundary(TestSpecificRequestHandler):
1101
"""Tests the case of bug 198646, disconnecting before a boundary."""
1103
_req_handler_class = TruncatedBeforeBoundaryRequestHandler
1106
super(TestTruncatedBeforeBoundary, self).setUp()
1107
self.build_tree_contents([('a', '0123456789')],)
1109
def test_readv_with_short_reads(self):
1110
server = self.get_readonly_server()
1111
t = self.get_readonly_transport()
1112
# Force separate ranges for each offset
1113
t._bytes_to_read_before_seek = 0
1114
ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
1115
self.assertEqual((0, '0'), ireadv.next())
1116
self.assertEqual((2, '2'), ireadv.next())
1117
self.assertEqual((4, '45'), ireadv.next())
1118
self.assertEqual((9, '9'), ireadv.next())
1121
1054
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
1122
1055
"""Errors out when range specifiers exceed the limit"""
1192
1123
Only the urllib implementation is tested here.
1127
tests.TestCase.setUp(self)
1132
tests.TestCase.tearDown(self)
1134
def _install_env(self, env):
1135
for name, value in env.iteritems():
1136
self._old_env[name] = osutils.set_or_unset_env(name, value)
1138
def _restore_env(self):
1139
for name, value in self._old_env.iteritems():
1140
osutils.set_or_unset_env(name, value)
1195
1142
def _proxied_request(self):
1196
1143
handler = _urllib2_wrappers.ProxyHandler()
1197
request = _urllib2_wrappers.Request('GET', 'http://baz/buzzle')
1144
request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
1198
1145
handler.set_proxy(request, 'http')
1201
def assertEvaluateProxyBypass(self, expected, host, no_proxy):
1202
handler = _urllib2_wrappers.ProxyHandler()
1203
self.assertEqual(expected,
1204
handler.evaluate_proxy_bypass(host, no_proxy))
1206
1148
def test_empty_user(self):
1207
self.overrideEnv('http_proxy', 'http://bar.com')
1208
request = self._proxied_request()
1209
self.assertFalse(request.headers.has_key('Proxy-authorization'))
1211
def test_user_with_at(self):
1212
self.overrideEnv('http_proxy',
1213
'http://username@domain:password@proxy_host:1234')
1149
self._install_env({'http_proxy': 'http://bar.com'})
1214
1150
request = self._proxied_request()
1215
1151
self.assertFalse(request.headers.has_key('Proxy-authorization'))
1217
1153
def test_invalid_proxy(self):
1218
1154
"""A proxy env variable without scheme"""
1219
self.overrideEnv('http_proxy', 'host:1234')
1155
self._install_env({'http_proxy': 'host:1234'})
1220
1156
self.assertRaises(errors.InvalidURL, self._proxied_request)
1222
def test_evaluate_proxy_bypass_true(self):
1223
"""The host is not proxied"""
1224
self.assertEvaluateProxyBypass(True, 'example.com', 'example.com')
1225
self.assertEvaluateProxyBypass(True, 'bzr.example.com', '*example.com')
1227
def test_evaluate_proxy_bypass_false(self):
1228
"""The host is proxied"""
1229
self.assertEvaluateProxyBypass(False, 'bzr.example.com', None)
1231
def test_evaluate_proxy_bypass_unknown(self):
1232
"""The host is not explicitly proxied"""
1233
self.assertEvaluateProxyBypass(None, 'example.com', 'not.example.com')
1234
self.assertEvaluateProxyBypass(None, 'bzr.example.com', 'example.com')
1236
def test_evaluate_proxy_bypass_empty_entries(self):
1237
"""Ignore empty entries"""
1238
self.assertEvaluateProxyBypass(None, 'example.com', '')
1239
self.assertEvaluateProxyBypass(None, 'example.com', ',')
1240
self.assertEvaluateProxyBypass(None, 'example.com', 'foo,,bar')
1243
1159
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
1244
1160
"""Tests proxy server.
1249
1165
to the file names).
1252
scenarios = multiply_scenarios(
1253
vary_by_http_client_implementation(),
1254
vary_by_http_protocol_version(),
1257
1168
# FIXME: We don't have an https server available, so we don't
1258
# test https connections. --vila toolongago
1169
# test https connections.
1260
1171
def setUp(self):
1261
1172
super(TestProxyHttpServer, self).setUp()
1262
self.transport_secondary_server = http_utils.ProxyServer
1263
1173
self.build_tree_contents([('foo', 'contents of foo\n'),
1264
1174
('foo-proxied', 'proxied contents of foo\n')])
1265
1175
# Let's setup some attributes for tests
1266
server = self.get_readonly_server()
1267
self.server_host_port = '%s:%d' % (server.host, server.port)
1176
self.server = self.get_readonly_server()
1177
self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
1268
1178
if self._testing_pycurl():
1269
1179
# Oh my ! pycurl does not check for the port as part of
1270
1180
# no_proxy :-( So we just test the host part
1271
self.no_proxy_host = server.host
1181
self.no_proxy_host = 'localhost'
1273
self.no_proxy_host = self.server_host_port
1183
self.no_proxy_host = self.proxy_address
1274
1184
# The secondary server is the proxy
1275
self.proxy_url = self.get_secondary_url()
1185
self.proxy = self.get_secondary_server()
1186
self.proxy_url = self.proxy.get_url()
1277
1189
def _testing_pycurl(self):
1278
# TODO: This is duplicated for lots of the classes in this file
1279
return (features.pycurl.available()
1280
and self._transport == PyCurlTransport)
1282
def assertProxied(self):
1283
t = self.get_readonly_transport()
1284
self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1286
def assertNotProxied(self):
1287
t = self.get_readonly_transport()
1288
self.assertEqual('contents of foo\n', t.get('foo').read())
1190
return pycurl_present and self._transport == PyCurlTransport
1192
def create_transport_secondary_server(self):
1193
"""Creates an http server that will serve files with
1194
'-proxied' appended to their names.
1196
return http_utils.ProxyServer(protocol_version=self._protocol_version)
1198
def _install_env(self, env):
1199
for name, value in env.iteritems():
1200
self._old_env[name] = osutils.set_or_unset_env(name, value)
1202
def _restore_env(self):
1203
for name, value in self._old_env.iteritems():
1204
osutils.set_or_unset_env(name, value)
1206
def proxied_in_env(self, env):
1207
self._install_env(env)
1208
url = self.server.get_url()
1209
t = self._transport(url)
1211
self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1215
def not_proxied_in_env(self, env):
1216
self._install_env(env)
1217
url = self.server.get_url()
1218
t = self._transport(url)
1220
self.assertEqual('contents of foo\n', t.get('foo').read())
1290
1224
def test_http_proxy(self):
1291
self.overrideEnv('http_proxy', self.proxy_url)
1292
self.assertProxied()
1225
self.proxied_in_env({'http_proxy': self.proxy_url})
1294
1227
def test_HTTP_PROXY(self):
1295
1228
if self._testing_pycurl():
1298
1231
# about. Should we ?)
1299
1232
raise tests.TestNotApplicable(
1300
1233
'pycurl does not check HTTP_PROXY for security reasons')
1301
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1302
self.assertProxied()
1234
self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
1304
1236
def test_all_proxy(self):
1305
self.overrideEnv('all_proxy', self.proxy_url)
1306
self.assertProxied()
1237
self.proxied_in_env({'all_proxy': self.proxy_url})
1308
1239
def test_ALL_PROXY(self):
1309
self.overrideEnv('ALL_PROXY', self.proxy_url)
1310
self.assertProxied()
1240
self.proxied_in_env({'ALL_PROXY': self.proxy_url})
1312
1242
def test_http_proxy_with_no_proxy(self):
1313
self.overrideEnv('no_proxy', self.no_proxy_host)
1314
self.overrideEnv('http_proxy', self.proxy_url)
1315
self.assertNotProxied()
1243
self.not_proxied_in_env({'http_proxy': self.proxy_url,
1244
'no_proxy': self.no_proxy_host})
1317
1246
def test_HTTP_PROXY_with_NO_PROXY(self):
1318
1247
if self._testing_pycurl():
1319
1248
raise tests.TestNotApplicable(
1320
1249
'pycurl does not check HTTP_PROXY for security reasons')
1321
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1322
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1323
self.assertNotProxied()
1250
self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
1251
'NO_PROXY': self.no_proxy_host})
1325
1253
def test_all_proxy_with_no_proxy(self):
1326
self.overrideEnv('no_proxy', self.no_proxy_host)
1327
self.overrideEnv('all_proxy', self.proxy_url)
1328
self.assertNotProxied()
1254
self.not_proxied_in_env({'all_proxy': self.proxy_url,
1255
'no_proxy': self.no_proxy_host})
1330
1257
def test_ALL_PROXY_with_NO_PROXY(self):
1331
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1332
self.overrideEnv('ALL_PROXY', self.proxy_url)
1333
self.assertNotProxied()
1258
self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
1259
'NO_PROXY': self.no_proxy_host})
1335
1261
def test_http_proxy_without_scheme(self):
1336
self.overrideEnv('http_proxy', self.server_host_port)
1337
1262
if self._testing_pycurl():
1338
1263
# pycurl *ignores* invalid proxy env variables. If that ever change
1339
1264
# in the future, this test will fail indicating that pycurl do not
1340
1265
# ignore anymore such variables.
1341
self.assertNotProxied()
1266
self.not_proxied_in_env({'http_proxy': self.proxy_address})
1343
self.assertRaises(errors.InvalidURL, self.assertProxied)
1268
self.assertRaises(errors.InvalidURL,
1269
self.proxied_in_env,
1270
{'http_proxy': self.proxy_address})
1346
1273
class TestRanges(http_utils.TestCaseWithWebserver):
1347
1274
"""Test the Range header in GET methods."""
1349
scenarios = multiply_scenarios(
1350
vary_by_http_client_implementation(),
1351
vary_by_http_protocol_version(),
1354
1276
def setUp(self):
1355
super(TestRanges, self).setUp()
1277
http_utils.TestCaseWithWebserver.setUp(self)
1356
1278
self.build_tree_contents([('a', '0123456789')],)
1279
server = self.get_readonly_server()
1280
self.transport = self._transport(server.get_url())
1358
1282
def create_transport_readonly_server(self):
1359
1283
return http_server.HttpServer(protocol_version=self._protocol_version)
1361
1285
def _file_contents(self, relpath, ranges):
1362
t = self.get_readonly_transport()
1363
1286
offsets = [ (start, end - start + 1) for start, end in ranges]
1364
coalesce = t._coalesce_offsets
1287
coalesce = self.transport._coalesce_offsets
1365
1288
coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
1366
code, data = t._get(relpath, coalesced)
1289
code, data = self.transport._get(relpath, coalesced)
1367
1290
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1368
1291
for start, end in ranges:
1369
1292
data.seek(start)
1370
1293
yield data.read(end - start + 1)
1372
1295
def _file_tail(self, relpath, tail_amount):
1373
t = self.get_readonly_transport()
1374
code, data = t._get(relpath, [], tail_amount)
1296
code, data = self.transport._get(relpath, [], tail_amount)
1375
1297
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1376
1298
data.seek(-tail_amount, 2)
1377
1299
return data.read(tail_amount)
1495
1400
('5/a', 'redirected 5 times'),
1403
self.old_transport = self._transport(self.old_server.get_url())
1405
def setup_redirected_request(self):
1406
self.original_class = _urllib2_wrappers.Request
1407
_urllib2_wrappers.Request = RedirectedRequest
1409
def cleanup_redirected_request(self):
1410
_urllib2_wrappers.Request = self.original_class
1412
def create_transport_secondary_server(self):
1413
"""Create the secondary server, redirections are defined in the tests"""
1414
return http_utils.HTTPServerRedirecting(
1415
protocol_version=self._protocol_version)
1498
1417
def test_one_redirection(self):
1499
t = self.get_old_transport()
1500
req = RedirectedRequest('GET', t._remote_path('a'))
1418
t = self.old_transport
1420
req = RedirectedRequest('GET', t.abspath('a'))
1421
req.follow_redirections = True
1501
1422
new_prefix = 'http://%s:%s' % (self.new_server.host,
1502
1423
self.new_server.port)
1503
1424
self.old_server.redirections = \
1504
1425
[('(.*)', r'%s/1\1' % (new_prefix), 301),]
1505
self.assertEqual('redirected once', t._perform(req).read())
1426
self.assertEquals('redirected once',t._perform(req).read())
1507
1428
def test_five_redirections(self):
1508
t = self.get_old_transport()
1509
req = RedirectedRequest('GET', t._remote_path('a'))
1429
t = self.old_transport
1431
req = RedirectedRequest('GET', t.abspath('a'))
1432
req.follow_redirections = True
1510
1433
old_prefix = 'http://%s:%s' % (self.old_server.host,
1511
1434
self.old_server.port)
1512
1435
new_prefix = 'http://%s:%s' % (self.new_server.host,
1518
1441
('/4(.*)', r'%s/5\1' % (new_prefix), 301),
1519
1442
('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
1521
self.assertEqual('redirected 5 times', t._perform(req).read())
1444
self.assertEquals('redirected 5 times',t._perform(req).read())
1524
1447
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
1525
1448
"""Test transport.do_catching_redirections."""
1527
scenarios = multiply_scenarios(
1528
vary_by_http_client_implementation(),
1529
vary_by_http_protocol_version(),
1532
1450
def setUp(self):
1533
1451
super(TestDoCatchRedirections, self).setUp()
1534
1452
self.build_tree_contents([('a', '0123456789'),],)
1535
cleanup_http_redirection_connections(self)
1537
self.old_transport = self.get_old_transport()
1454
self.old_transport = self._transport(self.old_server.get_url())
1456
def get_a(self, transport):
1457
return transport.get('a')
1542
1459
def test_no_redirection(self):
1543
t = self.get_new_transport()
1460
t = self._transport(self.new_server.get_url())
1545
1462
# We use None for redirected so that we fail if redirected
1546
self.assertEqual('0123456789',
1547
transport.do_catching_redirections(
1463
self.assertEquals('0123456789',
1464
transport.do_catching_redirections(
1548
1465
self.get_a, t, None).read())
1550
1467
def test_one_redirection(self):
1551
1468
self.redirections = 0
1553
def redirected(t, exception, redirection_notice):
1470
def redirected(transport, exception, redirection_notice):
1554
1471
self.redirections += 1
1555
redirected_t = t._redirected_to(exception.source, exception.target)
1472
dir, file = urlutils.split(exception.target)
1473
return self._transport(dir)
1558
self.assertEqual('0123456789',
1559
transport.do_catching_redirections(
1475
self.assertEquals('0123456789',
1476
transport.do_catching_redirections(
1560
1477
self.get_a, self.old_transport, redirected).read())
1561
self.assertEqual(1, self.redirections)
1478
self.assertEquals(1, self.redirections)
1563
1480
def test_redirection_loop(self):
1573
1490
self.get_a, self.old_transport, redirected)
1576
def _setup_authentication_config(**kwargs):
1577
conf = config.AuthenticationConfig()
1578
conf._get_config().update({'httptest': kwargs})
1582
class TestUrllib2AuthHandler(tests.TestCaseWithTransport):
# Exercises _urllib2_wrappers.HTTPAuthHandler.get_user_password() against
# credentials registered through _setup_authentication_config() and expects
# the (user, password) pair back.
# NOTE(review): `user` and `password` are used below without a visible
# assignment, and the method docstring and the dict(...) call are never
# closed -- lines appear to be missing; confirm against the upstream file.
1583
"""Unit tests for glue by which urllib2 asks us for authentication"""
1585
def test_get_user_password_without_port(self):
1586
"""We cope if urllib2 doesn't tell us the port.
1588
See https://bugs.launchpad.net/bzr/+bug/654684
1592
_setup_authentication_config(scheme='http', host='localhost',
1593
user=user, password=password)
1594
handler = _urllib2_wrappers.HTTPAuthHandler()
1595
got_pass = handler.get_user_password(dict(
1602
self.assertEqual((user, password), got_pass)
1605
1493
class TestAuth(http_utils.TestCaseWithWebserver):
# Authentication tests run against self.server: the visible tests register
# a user with self.server.add_user(), issue a request through
# self.get_user_transport(...) and then check
# self.server.auth_required_errors.
# NOTE(review): this span reads like two revisions of the file interleaved
# with their line numbers (the bare integers), with some lines missing --
# e.g. `user`/`password` are used without a visible assignment, the
# scenarios call is never closed and test_user_from_auth_conf is defined
# twice. Confirm against the upstream file before relying on it.
1606
1494
"""Test authentication scheme"""
1608
scenarios = multiply_scenarios(
1609
vary_by_http_client_implementation(),
1610
vary_by_http_protocol_version(),
1611
vary_by_http_auth_scheme(),
1496
# Header name and prompt prefixes used by the tests below; the visible
# log-masking test matches captured lines against _auth_header.
_auth_header = 'Authorization'
1497
_password_prompt_prefix = ''
1498
_username_prompt_prefix = ''
1614
1502
def setUp(self):
1615
1503
super(TestAuth, self).setUp()
1767
1652
# Only one 'Authentication Required' error should occur
1768
1653
self.assertEqual(1, self.server.auth_required_errors)
1655
# Variant that writes the 'httptest' credentials straight into a
# config.AuthenticationConfig(); not applicable when
# self._testing_pycurl() is true.
def test_user_from_auth_conf(self):
1656
if self._testing_pycurl():
1657
raise tests.TestNotApplicable(
1658
'pycurl does not support authentication.conf')
1661
self.server.add_user(user, password)
1662
# Create a minimal config file with the right password
1663
conf = config.AuthenticationConfig()
1664
conf._get_config().update(
1665
{'httptest': {'scheme': 'http', 'port': self.server.port,
1666
'user': user, 'password': password}})
1668
t = self.get_user_transport(None, None)
1669
# Issue a request to the server to connect
1670
self.assertEqual('contents of a\n', t.get('a').read())
1671
# Only one 'Authentication Required' error should occur
1672
self.assertEqual(1, self.server.auth_required_errors)
1770
1674
# Digest-only test: raises TestNotApplicable unless self._auth_server is
# one of the two digest servers named below, and KnownFailure when
# self._testing_pycurl() is true. Expects two 'Authentication Required'
# errors at the end.
def test_changing_nonce(self):
1771
1675
if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1772
1676
http_utils.ProxyDigestAuthServer):
1773
1677
raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1774
1678
if self._testing_pycurl():
1679
raise tests.KnownFailure(
1776
1680
'pycurl does not handle a nonce change')
1777
1681
self.server.add_user('joe', 'foo')
1778
1682
t = self.get_user_transport('joe', 'foo')
1788
1692
# initial 'who are you' and a second 'who are you' with the new nonce)
1789
1693
self.assertEqual(2, self.server.auth_required_errors)
1791
# Variant that registers the credentials through
# _setup_authentication_config() instead of building the config inline.
# NOTE(review): same name as the earlier method in this class -- presumably
# the replacement for it; confirm that only one should remain.
def test_user_from_auth_conf(self):
1792
if self._testing_pycurl():
1793
raise tests.TestNotApplicable(
1794
'pycurl does not support authentication.conf')
1797
self.server.add_user(user, password)
1798
_setup_authentication_config(scheme='http', port=self.server.port,
1799
user=user, password=password)
1800
t = self.get_user_transport(None, None)
1801
# Issue a request to the server to connect
1802
self.assertEqual('contents of a\n', t.get('a').read())
1803
# Only one 'Authentication Required' error should occur
1804
self.assertEqual(1, self.server.auth_required_errors)
1806
# With the 'http' debug flag set and trace.mutter overridden, expects
# exactly one captured line starting with '> <auth header>' and that this
# line starts with '> <auth header>: <masked>'.
# NOTE(review): the definition of the `mutter` replacement and the
# initialisation of self.mutters are not visible here.
def test_no_credential_leaks_in_log(self):
1807
self.overrideAttr(debug, 'debug_flags', set(['http']))
1809
password = 'very-sensitive-password'
1810
self.server.add_user(user, password)
1811
t = self.get_user_transport(user, password)
1812
# Capture the debug calls to mutter
1815
lines = args[0] % args[1:]
1816
# Some calls output multiple lines, just split them now since we
1817
# care about a single one later.
1818
self.mutters.extend(lines.splitlines())
1819
self.overrideAttr(trace, 'mutter', mutter)
1820
# Issue a request to the server to connect
1821
self.assertEqual(True, t.has('a'))
1822
# Only one 'Authentication Required' error should occur
1823
self.assertEqual(1, self.server.auth_required_errors)
1824
# Since the authentication succeeded, there should be a corresponding
1826
sent_auth_headers = [line for line in self.mutters
1827
if line.startswith('> %s' % (self._auth_header,))]
1828
self.assertLength(1, sent_auth_headers)
1829
self.assertStartsWith(sent_auth_headers[0],
1830
'> %s: <masked>' % (self._auth_header,))
1833
1697
class TestProxyAuth(TestAuth):
1834
"""Test proxy authentication schemes.
1836
This inherits from TestAuth to tweak the setUp and filter some failing
1840
scenarios = multiply_scenarios(
1841
vary_by_http_client_implementation(),
1842
vary_by_http_protocol_version(),
1843
vary_by_http_proxy_auth_scheme(),
1698
"""Test proxy authentication schemes."""
1700
_auth_header = 'Proxy-authorization'
1701
_password_prompt_prefix = 'Proxy '
1702
_username_prompt_prefix = 'Proxy '
1846
1704
def setUp(self):
1847
1705
super(TestProxyAuth, self).setUp()
1707
self.addCleanup(self._restore_env)
1848
1708
# Override the contents to avoid false positives
1849
1709
self.build_tree_contents([('a', 'not proxied contents of a\n'),
1850
1710
('b', 'not proxied contents of b\n'),
1891
1758
class SmartHTTPTunnellingTest(tests.TestCaseWithTransport):
1893
scenarios = multiply_scenarios(
1894
vary_by_http_client_implementation(),
1895
vary_by_http_protocol_version(),
1898
1760
def setUp(self):
# Run the parent setUp, set BZR_NO_SMART_VFS to None for the test, select
# http_utils.HTTPServerWithSmarts as the read-only server class and keep
# the server returned by get_readonly_server() in self.http_server.
# NOTE(review): both self.overrideEnv(...) and self._captureVar(...) are
# present for the same variable -- looks like two revisions interleaved;
# confirm which call should remain.
1899
1761
super(SmartHTTPTunnellingTest, self).setUp()
1900
1762
# We use the VFS layer as part of HTTP tunnelling tests.
1901
self.overrideEnv('BZR_NO_SMART_VFS', None)
1763
self._captureVar('BZR_NO_SMART_VFS', None)
1902
1764
self.transport_readonly_server = http_utils.HTTPServerWithSmarts
1903
self.http_server = self.get_readonly_server()
1905
1766
def create_transport_readonly_server(self):
# Build an http_utils.HTTPServerWithSmarts for self._protocol_version; one
# visible form returns it directly, the other binds it to `server` and
# copies self._url_protocol onto it.
# NOTE(review): no `return server` is visible for the second form --
# confirm against the upstream file.
1906
server = http_utils.HTTPServerWithSmarts(
1767
return http_utils.HTTPServerWithSmarts(
1907
1768
protocol_version=self._protocol_version)
1908
server._url_protocol = self._url_protocol
1911
def test_open_controldir(self):
# Make a branch at 'relpath', open '<server url>relpath' and assert that
# the opened object is a _mod_remote.RemoteBzrDir.
# NOTE(review): two method names (test_open_controldir / test_open_bzrdir)
# and two ways of opening the URL (controldir.ControlDir.open vs
# bzrdir.BzrDir.open) appear here -- presumably old and new revisions of
# one test; confirm which should remain.
1770
def test_open_bzrdir(self):
1912
1771
branch = self.make_branch('relpath')
1913
url = self.http_server.get_url() + 'relpath'
1914
bd = controldir.ControlDir.open(url)
1915
self.addCleanup(bd.transport.disconnect)
1772
http_server = self.get_readonly_server()
1773
url = http_server.get_url() + 'relpath'
1774
bd = bzrdir.BzrDir.open(url)
1916
1775
self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
1918
1777
def test_bulk_data(self):
2010
1866
# Both transports share the same connection (one can argue that we
2011
1867
# should return the exact same transport here, but that seems
2013
self.assertEqual(t._get_connection(), r._get_connection())
1869
self.assertEquals(t._get_connection(), r._get_connection())
2015
1871
def test_redirected_to_host(self):
# Redirect from www.example.com to foo.example.com (same http scheme) and
# check that the transport returned by _redirected_to() is an instance of
# the original transport's type.
# NOTE(review): the final assertEqual call is not closed in this copy --
# its second argument appears to be missing.
2016
1872
t = self._transport('http://www.example.com/foo')
2017
1873
r = t._redirected_to('http://www.example.com/foo',
2018
1874
'http://foo.example.com/foo/subdir')
2019
1875
self.assertIsInstance(r, type(t))
2020
self.assertEqual('http://foo.example.com/foo/subdir/',
2023
1877
def test_redirected_to_same_host_sibling_protocol(self):
# Redirect from http to https on the same host and check that the
# transport returned by _redirected_to() is an instance of the original
# transport's type.
# NOTE(review): the final assertEqual call is not closed in this copy --
# its second argument appears to be missing.
2024
1878
t = self._transport('http://www.example.com/foo')
2025
1879
r = t._redirected_to('http://www.example.com/foo',
2026
1880
'https://www.example.com/foo')
2027
1881
self.assertIsInstance(r, type(t))
2028
self.assertEqual('https://www.example.com/foo/',
2031
1883
def test_redirected_to_same_host_different_protocol(self):
# Redirect from http to ftp on the same host: the returned transport must
# be of a different type than the original and report
# 'ftp://www.example.com/foo/' as its external URL.
2032
1884
t = self._transport('http://www.example.com/foo')
2033
1885
r = t._redirected_to('http://www.example.com/foo',
2034
1886
'ftp://www.example.com/foo')
2035
self.assertNotEqual(type(r), type(t))
2036
self.assertEqual('ftp://www.example.com/foo/', r.external_url())
2038
def test_redirected_to_same_host_specific_implementation(self):
# Redirect to a target written with an implementation-qualified scheme
# ('https+urllib://...') and check the returned transport reports the
# plain 'https://www.example.com/foo/' external URL.
# NOTE(review): the trailing assertNotEquals(type(r), type(t)) line carries
# a much lower line number than its neighbours -- presumably a leftover
# from an older revision of the preceding test; confirm before keeping it.
2039
t = self._transport('http://www.example.com/foo')
2040
r = t._redirected_to('http://www.example.com/foo',
2041
'https+urllib://www.example.com/foo')
2042
self.assertEqual('https://www.example.com/foo/', r.external_url())
1887
self.assertNotEquals(type(r), type(t))
2044
1889
def test_redirected_to_different_host_same_user(self):
# Start from a URL carrying the user 'joe' and redirect to another host
# over https: the returned transport must be an instance of the original
# type, keep the same user and report
# 'https://joe@foo.example.com/foo/' as its external URL.
# NOTE(review): the user is compared twice, through _parsed_url.user and
# through _user -- presumably two revisions of the same check; confirm
# which attribute the transport actually exposes.
2045
1890
t = self._transport('http://joe@www.example.com/foo')
2046
1891
r = t._redirected_to('http://www.example.com/foo',
2047
1892
'https://foo.example.com/foo')
2048
1893
self.assertIsInstance(r, type(t))
2049
self.assertEqual(t._parsed_url.user, r._parsed_url.user)
2050
self.assertEqual('https://joe@foo.example.com/foo/', r.external_url())
1894
self.assertEquals(t._user, r._user)
2053
1897
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
2249
2097
t = self.get_transport()
2250
2098
# We must send a single line of body bytes, see
2251
# PredefinedRequestHandler._handle_one_request
2099
# PredefinedRequestHandler.handle_one_request
2252
2100
code, f = t._post('abc def end-of-body\n')
2253
2101
self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2254
2102
self.assertActivitiesMatch()
2257
class TestActivity(tests.TestCase, TestActivityMixin):
# Parametrized by multiplying vary_by_http_activity() with
# vary_by_http_protocol_version(); the visible setUp statements call the
# tests.TestCase chain first and then TestActivityMixin.setUp explicitly.
# NOTE(review): the closing of the scenarios call and the `def setUp` line
# are not visible in this copy.
2259
scenarios = multiply_scenarios(
2260
vary_by_http_activity(),
2261
vary_by_http_protocol_version(),
2265
super(TestActivity, self).setUp()
2266
TestActivityMixin.setUp(self)
2269
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2271
# Unlike TestActivity, we are really testing ReportingFileSocket and
2272
# ReportingSocket, so we don't need all the parametrization. Since
2273
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2274
# test them through their use by the transport than directly (that's a
2275
# bit less clean but far simpler and more effective).
2276
# Fixed server class and protocol version, in place of scenarios.
_activity_server = ActivityHTTPServer
2277
_protocol_version = 'HTTP/1.1'
2280
# NOTE(review): the `def setUp` line for the three statements below is not
# visible in this copy. They pin the transport to the urllib implementation
# before running TestActivityMixin.setUp.
super(TestNoReportActivity, self).setUp()
2281
self._transport =_urllib.HttpTransport_urllib
2282
TestActivityMixin.setUp(self)
2284
def assertActivitiesMatch(self):
2285
# Nothing to check here
2289
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2290
"""Test authentication on the redirected http server."""
2292
# One scenario per HTTP protocol version; the auth server and transport
# implementation are fixed by the class attributes below.
scenarios = vary_by_http_protocol_version()
2294
_auth_header = 'Authorization'
2295
_password_prompt_prefix = ''
2296
_username_prompt_prefix = ''
2297
_auth_server = http_utils.HTTPBasicAuthServer
2298
_transport = _urllib.HttpTransport_urllib
2301
# NOTE(review): the `def setUp` line for the statements below is not
# visible in this copy. They build the tree contents, make the old server
# redirect every path to '<new server>/1<path>' with a 301, and register
# the user 'joe' / 'foo' on the new server.
super(TestAuthOnRedirected, self).setUp()
2302
self.build_tree_contents([('a','a'),
2304
('1/a', 'redirected once'),
2306
new_prefix = 'http://%s:%s' % (self.new_server.host,
2307
self.new_server.port)
2308
self.old_server.redirections = [
2309
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2310
self.old_transport = self.get_old_transport()
2311
self.new_server.add_user('joe', 'foo')
2312
cleanup_http_redirection_connections(self)
2314
# Instantiate self._auth_server for self._protocol_version and copy
# self._url_protocol onto it.
# NOTE(review): no return statement is visible for this method.
def create_transport_readonly_server(self):
2315
server = self._auth_server(protocol_version=self._protocol_version)
2316
server._url_protocol = self._url_protocol
2322
# Follow the redirection through transport.do_catching_redirections() with
# a callback that counts redirections; 'joe' and 'foo' are supplied on the
# test UI's stdin. Expects the content 'redirected once', exactly one
# redirection, stdin fully consumed and nothing written to stdout.
def test_auth_on_redirected_via_do_catching_redirections(self):
2323
self.redirections = 0
2325
def redirected(t, exception, redirection_notice):
2326
self.redirections += 1
2327
redirected_t = t._redirected_to(exception.source, exception.target)
2328
self.addCleanup(redirected_t.disconnect)
2331
stdout = tests.StringIOWrapper()
2332
stderr = tests.StringIOWrapper()
2333
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2334
stdout=stdout, stderr=stderr)
2335
self.assertEqual('redirected once',
2336
transport.do_catching_redirections(
2337
self.get_a, self.old_transport, redirected).read())
2338
self.assertEqual(1, self.redirections)
2339
# stdin should be empty
2340
self.assertEqual('', ui.ui_factory.stdin.readline())
2341
# stdout should be empty, stderr will contain the prompts
2342
self.assertEqual('', stdout.getvalue())
2344
# Same scenario, but the request is issued as a RedirectedRequest through
# the old transport's _perform(), with the redirection rule installed on
# the old server inside the test itself.
def test_auth_on_redirected_via_following_redirections(self):
2345
self.new_server.add_user('joe', 'foo')
2346
stdout = tests.StringIOWrapper()
2347
stderr = tests.StringIOWrapper()
2348
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2349
stdout=stdout, stderr=stderr)
2350
t = self.old_transport
2351
req = RedirectedRequest('GET', t.abspath('a'))
2352
new_prefix = 'http://%s:%s' % (self.new_server.host,
2353
self.new_server.port)
2354
self.old_server.redirections = [
2355
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2356
self.assertEqual('redirected once', t._perform(req).read())
2357
# stdin should be empty
2358
self.assertEqual('', ui.ui_factory.stdin.readline())
2359
# stdout should be empty, stderr will contain the prompts
2360
self.assertEqual('', stdout.getvalue())