66
if features.pycurl.available():
67
65
from bzrlib.transport.http._pycurl import PyCurlTransport
70
load_tests = load_tests_apply_scenarios
73
def vary_by_http_client_implementation():
74
"""Test the two libraries we can use, pycurl and urllib."""
67
except errors.DependencyNotPresent:
68
pycurl_present = False
71
def load_tests(standard_tests, module, loader):
72
"""Multiply tests for http clients and protocol versions."""
73
result = loader.suiteClass()
75
# one for each transport implementation
76
t_tests, remaining_tests = tests.split_suite_by_condition(
77
standard_tests, tests.condition_isinstance((
78
TestHttpTransportRegistration,
79
TestHttpTransportUrls,
75
82
transport_scenarios = [
76
83
('urllib', dict(_transport=_urllib.HttpTransport_urllib,
77
84
_server=http_server.HttpServer_urllib,
78
_url_protocol='http+urllib',)),
85
_qualified_prefix='http+urllib',)),
80
if features.pycurl.available():
81
88
transport_scenarios.append(
82
89
('pycurl', dict(_transport=PyCurlTransport,
83
90
_server=http_server.HttpServer_PyCurl,
84
_url_protocol='http+pycurl',)))
85
return transport_scenarios
88
def vary_by_http_protocol_version():
89
"""Test on http/1.0 and 1.1"""
91
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
92
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
91
_qualified_prefix='http+pycurl',)))
92
tests.multiply_tests(t_tests, transport_scenarios, result)
94
# each implementation tested with each HTTP version
95
tp_tests, remaining_tests = tests.split_suite_by_condition(
96
remaining_tests, tests.condition_isinstance((
97
SmartHTTPTunnellingTest,
98
TestDoCatchRedirections,
100
TestHTTPRedirections,
101
TestHTTPSilentRedirections,
102
TestLimitedRangeRequestServer,
106
TestSpecificRequestHandler,
108
protocol_scenarios = [
109
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
110
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
112
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
114
tests.multiply_tests(tp_tests, tp_scenarios, result)
116
# proxy auth: each auth scheme on all http versions on all implementations.
117
tppa_tests, remaining_tests = tests.split_suite_by_condition(
118
remaining_tests, tests.condition_isinstance((
121
proxy_auth_scheme_scenarios = [
122
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
123
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
125
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
96
def vary_by_http_auth_scheme():
127
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
128
proxy_auth_scheme_scenarios)
129
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
131
# auth: each auth scheme on all http versions on all implementations.
132
tpa_tests, remaining_tests = tests.split_suite_by_condition(
133
remaining_tests, tests.condition_isinstance((
136
auth_scheme_scenarios = [
98
137
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
99
138
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
101
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
103
# Add some attributes common to all scenarios
104
for scenario_id, scenario_dict in scenarios:
105
scenario_dict.update(_auth_header='Authorization',
106
_username_prompt_prefix='',
107
_password_prompt_prefix='')
111
def vary_by_http_proxy_auth_scheme():
113
('proxy-basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
114
('proxy-digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
115
('proxy-basicdigest',
116
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
118
# Add some attributes common to all scenarios
119
for scenario_id, scenario_dict in scenarios:
120
scenario_dict.update(_auth_header='Proxy-Authorization',
121
_username_prompt_prefix='Proxy ',
122
_password_prompt_prefix='Proxy ')
126
def vary_by_http_activity():
140
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
142
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
143
auth_scheme_scenarios)
144
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
146
# activity: activity on all http versions on all implementations
147
tpact_tests, remaining_tests = tests.split_suite_by_condition(
148
remaining_tests, tests.condition_isinstance((
127
151
activity_scenarios = [
128
('urllib,http', dict(_activity_server=ActivityHTTPServer,
129
_transport=_urllib.HttpTransport_urllib,)),
152
('http', dict(_activity_server=ActivityHTTPServer)),
131
154
if tests.HTTPSServerFeature.available():
132
155
activity_scenarios.append(
133
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
134
_transport=_urllib.HttpTransport_urllib,)),)
135
if features.pycurl.available():
136
activity_scenarios.append(
137
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
138
_transport=PyCurlTransport,)),)
139
if tests.HTTPSServerFeature.available():
140
from bzrlib.tests import (
143
# FIXME: Until we have a better way to handle self-signed
144
# certificates (like allowing them in a test specific
145
# authentication.conf for example), we need some specialized pycurl
146
# transport for tests.
147
class HTTPS_pycurl_transport(PyCurlTransport):
149
def __init__(self, base, _from_transport=None):
150
super(HTTPS_pycurl_transport, self).__init__(
151
base, _from_transport)
152
self.cabundle = str(ssl_certs.build_path('ca.crt'))
154
activity_scenarios.append(
155
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
156
_transport=HTTPS_pycurl_transport,)),)
157
return activity_scenarios
156
('https', dict(_activity_server=ActivityHTTPSServer)))
157
tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
159
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
161
# No parametrization for the remaining tests
162
result.addTests(remaining_tests)
160
167
class FakeManager(object):
185
192
self.received_bytes = ''
189
return '%s://%s:%s/' % (self.scheme, self.host, self.port)
191
def start_server(self):
192
195
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
193
196
self._sock.bind(('127.0.0.1', 0))
194
197
self.host, self.port = self._sock.getsockname()
195
198
self._ready = threading.Event()
196
self._thread = test_server.TestThread(
197
sync_event=self._ready, target=self._accept_read_and_reply)
199
self._thread = threading.Thread(target=self._accept_read_and_reply)
200
self._thread.setDaemon(True)
198
201
self._thread.start()
199
if 'threads' in tests.selftest_debug_flags:
200
sys.stderr.write('Thread started: %s\n' % (self._thread.ident,))
203
204
def _accept_read_and_reply(self):
204
205
self._sock.listen(1)
205
206
self._ready.set()
206
conn, address = self._sock.accept()
207
if self._expect_body_tail is not None:
207
self._sock.settimeout(5)
209
conn, address = self._sock.accept()
210
# On win32, the accepted connection will be non-blocking to start
211
# with because we're using settimeout.
212
conn.setblocking(True)
208
213
while not self.received_bytes.endswith(self._expect_body_tail):
209
214
self.received_bytes += conn.recv(4096)
210
215
conn.sendall('HTTP/1.1 200 OK\r\n')
216
except socket.timeout:
217
# Make sure the client isn't stuck waiting for us to e.g. accept.
212
218
self._sock.close()
213
219
except socket.error:
214
220
# The client may have already closed the socket.
217
def stop_server(self):
219
# Issue a fake connection to wake up the server and allow it to
221
fake_conn = osutils.connect_socket((self.host, self.port))
223
226
except socket.error:
224
227
# We might have already closed it. We don't care.
229
if 'threads' in tests.selftest_debug_flags:
230
sys.stderr.write('Thread joined: %s\n' % (self._thread.ident,))
233
233
class TestAuthHeader(tests.TestCase):
260
260
_urllib2_wrappers.BasicAuthHandler)
261
261
match, realm = self.auth_handler.extract_realm(remainder)
262
262
self.assertTrue(match is not None)
263
self.assertEqual('Thou should not pass', realm)
263
self.assertEquals('Thou should not pass', realm)
265
265
def test_digest_header(self):
266
266
scheme, remainder = self.parse_header(
267
267
'Digest realm="Thou should not pass"')
268
self.assertEqual('digest', scheme)
269
self.assertEqual('realm="Thou should not pass"', remainder)
272
class TestHTTPRangeParsing(tests.TestCase):
275
super(TestHTTPRangeParsing, self).setUp()
276
# We focus on range parsing here and ignore everything else
277
class RequestHandler(http_server.TestingHTTPRequestHandler):
278
def setup(self): pass
279
def handle(self): pass
280
def finish(self): pass
282
self.req_handler = RequestHandler(None, None, None)
284
def assertRanges(self, ranges, header, file_size):
285
self.assertEquals(ranges,
286
self.req_handler._parse_ranges(header, file_size))
288
def test_simple_range(self):
289
self.assertRanges([(0,2)], 'bytes=0-2', 12)
292
self.assertRanges([(8, 11)], 'bytes=-4', 12)
294
def test_tail_bigger_than_file(self):
295
self.assertRanges([(0, 11)], 'bytes=-99', 12)
297
def test_range_without_end(self):
298
self.assertRanges([(4, 11)], 'bytes=4-', 12)
300
def test_invalid_ranges(self):
301
self.assertRanges(None, 'bytes=12-22', 12)
302
self.assertRanges(None, 'bytes=1-3,12-22', 12)
303
self.assertRanges(None, 'bytes=-', 12)
268
self.assertEquals('digest', scheme)
269
self.assertEquals('realm="Thou should not pass"', remainder)
306
272
class TestHTTPServer(tests.TestCase):
386
366
def test_url_parsing(self):
387
367
f = FakeManager()
388
368
url = http.extract_auth('http://example.com', f)
389
self.assertEqual('http://example.com', url)
390
self.assertEqual(0, len(f.credentials))
369
self.assertEquals('http://example.com', url)
370
self.assertEquals(0, len(f.credentials))
391
371
url = http.extract_auth(
392
'http://user:pass@example.com/bzr/bzr.dev', f)
393
self.assertEqual('http://example.com/bzr/bzr.dev', url)
394
self.assertEqual(1, len(f.credentials))
395
self.assertEqual([None, 'example.com', 'user', 'pass'],
372
'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
373
self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
374
self.assertEquals(1, len(f.credentials))
375
self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
399
379
class TestHttpTransportUrls(tests.TestCase):
400
380
"""Test the http urls."""
402
scenarios = vary_by_http_client_implementation()
404
382
def test_abs_url(self):
405
383
"""Construction of absolute http URLs"""
406
t = self._transport('http://example.com/bzr/bzr.dev/')
384
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
407
385
eq = self.assertEqualDiff
408
eq(t.abspath('.'), 'http://example.com/bzr/bzr.dev')
409
eq(t.abspath('foo/bar'), 'http://example.com/bzr/bzr.dev/foo/bar')
410
eq(t.abspath('.bzr'), 'http://example.com/bzr/bzr.dev/.bzr')
386
eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
387
eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
388
eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
411
389
eq(t.abspath('.bzr/1//2/./3'),
412
'http://example.com/bzr/bzr.dev/.bzr/1/2/3')
390
'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
414
392
def test_invalid_http_urls(self):
415
393
"""Trap invalid construction of urls"""
416
self._transport('http://example.com/bzr/bzr.dev/')
394
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
417
395
self.assertRaises(errors.InvalidURL,
419
'http://http://example.com/bzr/bzr.dev/')
397
'http://http://bazaar-vcs.org/bzr/bzr.dev/')
421
399
def test_http_root_urls(self):
422
400
"""Construction of URLs from server root"""
423
t = self._transport('http://example.com/')
401
t = self._transport('http://bzr.ozlabs.org/')
424
402
eq = self.assertEqualDiff
425
403
eq(t.abspath('.bzr/tree-version'),
426
'http://example.com/.bzr/tree-version')
404
'http://bzr.ozlabs.org/.bzr/tree-version')
428
406
def test_http_impl_urls(self):
429
407
"""There are servers which ask for particular clients to connect"""
430
408
server = self._server()
431
server.start_server()
433
411
url = server.get_url()
434
self.assertTrue(url.startswith('%s://' % self._url_protocol))
412
self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
439
417
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
448
426
https by supplying a fake version_info that do not
451
self.requireFeature(features.pycurl)
452
# Import the module locally now that we now it's available.
453
pycurl = features.pycurl.module
432
raise tests.TestSkipped('pycurl not present')
455
self.overrideAttr(pycurl, 'version_info',
456
# Fake the pycurl version_info This was taken from
457
# a windows pycurl without SSL (thanks to bialix)
466
('ftp', 'gopher', 'telnet',
467
'dict', 'ldap', 'http', 'file'),
471
self.assertRaises(errors.DependencyNotPresent, self._transport,
472
'https://launchpad.net')
434
version_info_orig = pycurl.version_info
436
# Now that we have pycurl imported, we can fake its version_info
437
# This was taken from a windows pycurl without SSL
439
pycurl.version_info = lambda : (2,
447
('ftp', 'gopher', 'telnet',
448
'dict', 'ldap', 'http', 'file'),
452
self.assertRaises(errors.DependencyNotPresent, self._transport,
453
'https://launchpad.net')
455
# Restore the right function
456
pycurl.version_info = version_info_orig
475
459
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
476
460
"""Test the http connections."""
478
scenarios = multiply_scenarios(
479
vary_by_http_client_implementation(),
480
vary_by_http_protocol_version(),
484
463
http_utils.TestCaseWithWebserver.setUp(self)
485
464
self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
530
509
class TestHttpTransportRegistration(tests.TestCase):
531
510
"""Test registrations of various http implementations"""
533
scenarios = vary_by_http_client_implementation()
535
512
def test_http_registered(self):
536
t = transport.get_transport('%s://foo.com/' % self._url_protocol)
513
t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
537
514
self.assertIsInstance(t, transport.Transport)
538
515
self.assertIsInstance(t, self._transport)
541
518
class TestPost(tests.TestCase):
543
scenarios = multiply_scenarios(
544
vary_by_http_client_implementation(),
545
vary_by_http_protocol_version(),
548
520
def test_post_body_is_received(self):
549
server = RecordingServer(expect_body_tail='end-of-body',
550
scheme=self._url_protocol)
551
self.start_server(server)
552
url = server.get_url()
553
# FIXME: needs a cleanup -- vila 20100611
554
http_transport = transport.get_transport(url)
521
server = RecordingServer(expect_body_tail='end-of-body')
523
self.addCleanup(server.tearDown)
524
scheme = self._qualified_prefix
525
url = '%s://%s:%s/' % (scheme, server.host, server.port)
526
http_transport = self._transport(url)
555
527
code, response = http_transport._post('abc def end-of-body')
557
529
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
558
530
self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
559
self.assertTrue('content-type: application/octet-stream\r'
560
in server.received_bytes.lower())
561
531
# The transport should not be assuming that the server can accept
562
532
# chunked encoding the first time it connects, because HTTP/1.1, so we
563
533
# check for the literal string.
1122
1095
Only the urllib implementation is tested here.
1099
tests.TestCase.setUp(self)
1104
tests.TestCase.tearDown(self)
1106
def _install_env(self, env):
1107
for name, value in env.iteritems():
1108
self._old_env[name] = osutils.set_or_unset_env(name, value)
1110
def _restore_env(self):
1111
for name, value in self._old_env.iteritems():
1112
osutils.set_or_unset_env(name, value)
1125
1114
def _proxied_request(self):
1126
1115
handler = _urllib2_wrappers.ProxyHandler()
1127
request = _urllib2_wrappers.Request('GET', 'http://baz/buzzle')
1116
request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
1128
1117
handler.set_proxy(request, 'http')
1131
def assertEvaluateProxyBypass(self, expected, host, no_proxy):
1132
handler = _urllib2_wrappers.ProxyHandler()
1133
self.assertEquals(expected,
1134
handler.evaluate_proxy_bypass(host, no_proxy))
1136
1120
def test_empty_user(self):
1137
self.overrideEnv('http_proxy', 'http://bar.com')
1138
request = self._proxied_request()
1139
self.assertFalse(request.headers.has_key('Proxy-authorization'))
1141
def test_user_with_at(self):
1142
self.overrideEnv('http_proxy',
1143
'http://username@domain:password@proxy_host:1234')
1121
self._install_env({'http_proxy': 'http://bar.com'})
1144
1122
request = self._proxied_request()
1145
1123
self.assertFalse(request.headers.has_key('Proxy-authorization'))
1147
1125
def test_invalid_proxy(self):
1148
1126
"""A proxy env variable without scheme"""
1149
self.overrideEnv('http_proxy', 'host:1234')
1127
self._install_env({'http_proxy': 'host:1234'})
1150
1128
self.assertRaises(errors.InvalidURL, self._proxied_request)
1152
def test_evaluate_proxy_bypass_true(self):
1153
"""The host is not proxied"""
1154
self.assertEvaluateProxyBypass(True, 'example.com', 'example.com')
1155
self.assertEvaluateProxyBypass(True, 'bzr.example.com', '*example.com')
1157
def test_evaluate_proxy_bypass_false(self):
1158
"""The host is proxied"""
1159
self.assertEvaluateProxyBypass(False, 'bzr.example.com', None)
1161
def test_evaluate_proxy_bypass_unknown(self):
1162
"""The host is not explicitly proxied"""
1163
self.assertEvaluateProxyBypass(None, 'example.com', 'not.example.com')
1164
self.assertEvaluateProxyBypass(None, 'bzr.example.com', 'example.com')
1166
def test_evaluate_proxy_bypass_empty_entries(self):
1167
"""Ignore empty entries"""
1168
self.assertEvaluateProxyBypass(None, 'example.com', '')
1169
self.assertEvaluateProxyBypass(None, 'example.com', ',')
1170
self.assertEvaluateProxyBypass(None, 'example.com', 'foo,,bar')
1173
1131
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
1174
1132
"""Tests proxy server.
1179
1137
to the file names).
1182
scenarios = multiply_scenarios(
1183
vary_by_http_client_implementation(),
1184
vary_by_http_protocol_version(),
1187
1140
# FIXME: We don't have an https server available, so we don't
1188
# test https connections. --vila toolongago
1141
# test https connections.
1190
1143
def setUp(self):
1191
1144
super(TestProxyHttpServer, self).setUp()
1192
self.transport_secondary_server = http_utils.ProxyServer
1193
1145
self.build_tree_contents([('foo', 'contents of foo\n'),
1194
1146
('foo-proxied', 'proxied contents of foo\n')])
1195
1147
# Let's setup some attributes for tests
1196
server = self.get_readonly_server()
1197
self.server_host_port = '%s:%d' % (server.host, server.port)
1148
self.server = self.get_readonly_server()
1149
self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
1198
1150
if self._testing_pycurl():
1199
1151
# Oh my ! pycurl does not check for the port as part of
1200
1152
# no_proxy :-( So we just test the host part
1201
self.no_proxy_host = server.host
1153
self.no_proxy_host = 'localhost'
1203
self.no_proxy_host = self.server_host_port
1155
self.no_proxy_host = self.proxy_address
1204
1156
# The secondary server is the proxy
1205
self.proxy_url = self.get_secondary_url()
1157
self.proxy = self.get_secondary_server()
1158
self.proxy_url = self.proxy.get_url()
1207
1161
def _testing_pycurl(self):
1208
# TODO: This is duplicated for lots of the classes in this file
1209
return (features.pycurl.available()
1210
and self._transport == PyCurlTransport)
1212
def assertProxied(self):
1213
t = self.get_readonly_transport()
1214
self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1216
def assertNotProxied(self):
1217
t = self.get_readonly_transport()
1218
self.assertEqual('contents of foo\n', t.get('foo').read())
1162
return pycurl_present and self._transport == PyCurlTransport
1164
def create_transport_secondary_server(self):
1165
"""Creates an http server that will serve files with
1166
'-proxied' appended to their names.
1168
return http_utils.ProxyServer(protocol_version=self._protocol_version)
1170
def _install_env(self, env):
1171
for name, value in env.iteritems():
1172
self._old_env[name] = osutils.set_or_unset_env(name, value)
1174
def _restore_env(self):
1175
for name, value in self._old_env.iteritems():
1176
osutils.set_or_unset_env(name, value)
1178
def proxied_in_env(self, env):
1179
self._install_env(env)
1180
url = self.server.get_url()
1181
t = self._transport(url)
1183
self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1187
def not_proxied_in_env(self, env):
1188
self._install_env(env)
1189
url = self.server.get_url()
1190
t = self._transport(url)
1192
self.assertEqual('contents of foo\n', t.get('foo').read())
1220
1196
def test_http_proxy(self):
1221
self.overrideEnv('http_proxy', self.proxy_url)
1222
self.assertProxied()
1197
self.proxied_in_env({'http_proxy': self.proxy_url})
1224
1199
def test_HTTP_PROXY(self):
1225
1200
if self._testing_pycurl():
1228
1203
# about. Should we ?)
1229
1204
raise tests.TestNotApplicable(
1230
1205
'pycurl does not check HTTP_PROXY for security reasons')
1231
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1232
self.assertProxied()
1206
self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
1234
1208
def test_all_proxy(self):
1235
self.overrideEnv('all_proxy', self.proxy_url)
1236
self.assertProxied()
1209
self.proxied_in_env({'all_proxy': self.proxy_url})
1238
1211
def test_ALL_PROXY(self):
1239
self.overrideEnv('ALL_PROXY', self.proxy_url)
1240
self.assertProxied()
1212
self.proxied_in_env({'ALL_PROXY': self.proxy_url})
1242
1214
def test_http_proxy_with_no_proxy(self):
1243
self.overrideEnv('no_proxy', self.no_proxy_host)
1244
self.overrideEnv('http_proxy', self.proxy_url)
1245
self.assertNotProxied()
1215
self.not_proxied_in_env({'http_proxy': self.proxy_url,
1216
'no_proxy': self.no_proxy_host})
1247
1218
def test_HTTP_PROXY_with_NO_PROXY(self):
1248
1219
if self._testing_pycurl():
1249
1220
raise tests.TestNotApplicable(
1250
1221
'pycurl does not check HTTP_PROXY for security reasons')
1251
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1252
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1253
self.assertNotProxied()
1222
self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
1223
'NO_PROXY': self.no_proxy_host})
1255
1225
def test_all_proxy_with_no_proxy(self):
1256
self.overrideEnv('no_proxy', self.no_proxy_host)
1257
self.overrideEnv('all_proxy', self.proxy_url)
1258
self.assertNotProxied()
1226
self.not_proxied_in_env({'all_proxy': self.proxy_url,
1227
'no_proxy': self.no_proxy_host})
1260
1229
def test_ALL_PROXY_with_NO_PROXY(self):
1261
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1262
self.overrideEnv('ALL_PROXY', self.proxy_url)
1263
self.assertNotProxied()
1230
self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
1231
'NO_PROXY': self.no_proxy_host})
1265
1233
def test_http_proxy_without_scheme(self):
1266
self.overrideEnv('http_proxy', self.server_host_port)
1267
1234
if self._testing_pycurl():
1268
1235
# pycurl *ignores* invalid proxy env variables. If that ever change
1269
1236
# in the future, this test will fail indicating that pycurl do not
1270
1237
# ignore anymore such variables.
1271
self.assertNotProxied()
1238
self.not_proxied_in_env({'http_proxy': self.proxy_address})
1273
self.assertRaises(errors.InvalidURL, self.assertProxied)
1240
self.assertRaises(errors.InvalidURL,
1241
self.proxied_in_env,
1242
{'http_proxy': self.proxy_address})
1276
1245
class TestRanges(http_utils.TestCaseWithWebserver):
1277
1246
"""Test the Range header in GET methods."""
1279
scenarios = multiply_scenarios(
1280
vary_by_http_client_implementation(),
1281
vary_by_http_protocol_version(),
1284
1248
def setUp(self):
1285
1249
http_utils.TestCaseWithWebserver.setUp(self)
1286
1250
self.build_tree_contents([('a', '0123456789')],)
1251
server = self.get_readonly_server()
1252
self.transport = self._transport(server.get_url())
1288
1254
def create_transport_readonly_server(self):
1289
1255
return http_server.HttpServer(protocol_version=self._protocol_version)
1291
1257
def _file_contents(self, relpath, ranges):
1292
t = self.get_readonly_transport()
1293
1258
offsets = [ (start, end - start + 1) for start, end in ranges]
1294
coalesce = t._coalesce_offsets
1259
coalesce = self.transport._coalesce_offsets
1295
1260
coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
1296
code, data = t._get(relpath, coalesced)
1261
code, data = self.transport._get(relpath, coalesced)
1297
1262
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1298
1263
for start, end in ranges:
1299
1264
data.seek(start)
1300
1265
yield data.read(end - start + 1)
1302
1267
def _file_tail(self, relpath, tail_amount):
1303
t = self.get_readonly_transport()
1304
code, data = t._get(relpath, [], tail_amount)
1268
code, data = self.transport._get(relpath, [], tail_amount)
1305
1269
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1306
1270
data.seek(-tail_amount, 2)
1307
1271
return data.read(tail_amount)
1425
1372
('5/a', 'redirected 5 times'),
1375
self.old_transport = self._transport(self.old_server.get_url())
1377
def setup_redirected_request(self):
1378
self.original_class = _urllib2_wrappers.Request
1379
_urllib2_wrappers.Request = RedirectedRequest
1381
def cleanup_redirected_request(self):
1382
_urllib2_wrappers.Request = self.original_class
1384
def create_transport_secondary_server(self):
1385
"""Create the secondary server, redirections are defined in the tests"""
1386
return http_utils.HTTPServerRedirecting(
1387
protocol_version=self._protocol_version)
1428
1389
def test_one_redirection(self):
1429
t = self.get_old_transport()
1430
req = RedirectedRequest('GET', t._remote_path('a'))
1390
t = self.old_transport
1392
req = RedirectedRequest('GET', t.abspath('a'))
1393
req.follow_redirections = True
1431
1394
new_prefix = 'http://%s:%s' % (self.new_server.host,
1432
1395
self.new_server.port)
1433
1396
self.old_server.redirections = \
1434
1397
[('(.*)', r'%s/1\1' % (new_prefix), 301),]
1435
self.assertEqual('redirected once', t._perform(req).read())
1398
self.assertEquals('redirected once',t._perform(req).read())
1437
1400
def test_five_redirections(self):
1438
t = self.get_old_transport()
1439
req = RedirectedRequest('GET', t._remote_path('a'))
1401
t = self.old_transport
1403
req = RedirectedRequest('GET', t.abspath('a'))
1404
req.follow_redirections = True
1440
1405
old_prefix = 'http://%s:%s' % (self.old_server.host,
1441
1406
self.old_server.port)
1442
1407
new_prefix = 'http://%s:%s' % (self.new_server.host,
1448
1413
('/4(.*)', r'%s/5\1' % (new_prefix), 301),
1449
1414
('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
1451
self.assertEqual('redirected 5 times', t._perform(req).read())
1416
self.assertEquals('redirected 5 times',t._perform(req).read())
1454
1419
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
1455
1420
"""Test transport.do_catching_redirections."""
1457
scenarios = multiply_scenarios(
1458
vary_by_http_client_implementation(),
1459
vary_by_http_protocol_version(),
1462
1422
def setUp(self):
1463
1423
super(TestDoCatchRedirections, self).setUp()
1464
1424
self.build_tree_contents([('a', '0123456789'),],)
1465
cleanup_http_redirection_connections(self)
1467
self.old_transport = self.get_old_transport()
1426
self.old_transport = self._transport(self.old_server.get_url())
1428
def get_a(self, transport):
1429
return transport.get('a')
1472
1431
def test_no_redirection(self):
1473
t = self.get_new_transport()
1432
t = self._transport(self.new_server.get_url())
1475
1434
# We use None for redirected so that we fail if redirected
1476
self.assertEqual('0123456789',
1477
transport.do_catching_redirections(
1435
self.assertEquals('0123456789',
1436
transport.do_catching_redirections(
1478
1437
self.get_a, t, None).read())
1480
1439
def test_one_redirection(self):
1481
1440
self.redirections = 0
1483
def redirected(t, exception, redirection_notice):
1442
def redirected(transport, exception, redirection_notice):
1484
1443
self.redirections += 1
1485
redirected_t = t._redirected_to(exception.source, exception.target)
1444
dir, file = urlutils.split(exception.target)
1445
return self._transport(dir)
1488
self.assertEqual('0123456789',
1489
transport.do_catching_redirections(
1447
self.assertEquals('0123456789',
1448
transport.do_catching_redirections(
1490
1449
self.get_a, self.old_transport, redirected).read())
1491
self.assertEqual(1, self.redirections)
1450
self.assertEquals(1, self.redirections)
1493
1452
def test_redirection_loop(self):
1503
1462
self.get_a, self.old_transport, redirected)
1506
def _setup_authentication_config(**kwargs):
1507
conf = config.AuthenticationConfig()
1508
conf._get_config().update({'httptest': kwargs})
1512
class TestUrllib2AuthHandler(tests.TestCaseWithTransport):
1513
"""Unit tests for glue by which urllib2 asks us for authentication"""
1515
def test_get_user_password_without_port(self):
1516
"""We cope if urllib2 doesn't tell us the port.
1518
See https://bugs.launchpad.net/bzr/+bug/654684
1522
_setup_authentication_config(scheme='http', host='localhost',
1523
user=user, password=password)
1524
handler = _urllib2_wrappers.HTTPAuthHandler()
1525
got_pass = handler.get_user_password(dict(
1532
self.assertEquals((user, password), got_pass)
1535
1465
class TestAuth(http_utils.TestCaseWithWebserver):
1536
1466
"""Test authentication scheme"""
1538
scenarios = multiply_scenarios(
1539
vary_by_http_client_implementation(),
1540
vary_by_http_protocol_version(),
1541
vary_by_http_auth_scheme(),
1468
_auth_header = 'Authorization'
1469
_password_prompt_prefix = ''
1470
_username_prompt_prefix = ''
1544
1474
def setUp(self):
1545
1475
super(TestAuth, self).setUp()
1717
1658
# initial 'who are you' and a second 'who are you' with the new nonce)
1718
1659
self.assertEqual(2, self.server.auth_required_errors)
1720
def test_user_from_auth_conf(self):
1721
if self._testing_pycurl():
1722
raise tests.TestNotApplicable(
1723
'pycurl does not support authentication.conf')
1726
self.server.add_user(user, password)
1727
_setup_authentication_config(scheme='http', port=self.server.port,
1728
user=user, password=password)
1729
t = self.get_user_transport(None, None)
1730
# Issue a request to the server to connect
1731
self.assertEqual('contents of a\n', t.get('a').read())
1732
# Only one 'Authentication Required' error should occur
1733
self.assertEqual(1, self.server.auth_required_errors)
1735
def test_no_credential_leaks_in_log(self):
1736
self.overrideAttr(debug, 'debug_flags', set(['http']))
1738
password = 'very-sensitive-password'
1739
self.server.add_user(user, password)
1740
t = self.get_user_transport(user, password)
1741
# Capture the debug calls to mutter
1744
lines = args[0] % args[1:]
1745
# Some calls output multiple lines, just split them now since we
1746
# care about a single one later.
1747
self.mutters.extend(lines.splitlines())
1748
self.overrideAttr(trace, 'mutter', mutter)
1749
# Issue a request to the server to connect
1750
self.assertEqual(True, t.has('a'))
1751
# Only one 'Authentication Required' error should occur
1752
self.assertEqual(1, self.server.auth_required_errors)
1753
# Since the authentification succeeded, there should be a corresponding
1755
sent_auth_headers = [line for line in self.mutters
1756
if line.startswith('> %s' % (self._auth_header,))]
1757
self.assertLength(1, sent_auth_headers)
1758
self.assertStartsWith(sent_auth_headers[0],
1759
'> %s: <masked>' % (self._auth_header,))
1762
1663
class TestProxyAuth(TestAuth):
1763
"""Test proxy authentication schemes.
1765
This inherits from TestAuth to tweak the setUp and filter some failing
1769
scenarios = multiply_scenarios(
1770
vary_by_http_client_implementation(),
1771
vary_by_http_protocol_version(),
1772
vary_by_http_proxy_auth_scheme(),
1664
"""Test proxy authentication schemes."""
1666
_auth_header = 'Proxy-authorization'
1667
_password_prompt_prefix = 'Proxy '
1668
_username_prompt_prefix = 'Proxy '
1775
1670
def setUp(self):
1776
1671
super(TestProxyAuth, self).setUp()
1673
self.addCleanup(self._restore_env)
1777
1674
# Override the contents to avoid false positives
1778
1675
self.build_tree_contents([('a', 'not proxied contents of a\n'),
1779
1676
('b', 'not proxied contents of b\n'),
2036
1932
def setUp(self):
2037
1933
tests.TestCase.setUp(self)
2038
1934
self.server = self._activity_server(self._protocol_version)
2039
self.server.start_server()
2040
_activities = {} # Don't close over self and create a cycle
1936
self.activities = {}
2041
1937
def report_activity(t, bytes, direction):
2042
count = _activities.get(direction, 0)
1938
count = self.activities.get(direction, 0)
2044
_activities[direction] = count
2045
self.activities = _activities
1940
self.activities[direction] = count
2047
1942
# We override at class level because constructors may propagate the
2048
1943
# bound method and render instance overriding ineffective (an
2049
1944
# alternative would be to define a specific ui factory instead...)
2050
self.overrideAttr(self._transport, '_report_activity', report_activity)
2051
self.addCleanup(self.server.stop_server)
1945
self.orig_report_activity = self._transport._report_activity
1946
self._transport._report_activity = report_activity
1949
self._transport._report_activity = self.orig_report_activity
1950
self.server.tearDown()
1951
tests.TestCase.tearDown(self)
2053
1953
def get_transport(self):
2054
t = self._transport(self.server.get_url())
2055
# FIXME: Needs cleanup -- vila 20100611
1954
return self._transport(self.server.get_url())
2058
1956
def assertActivitiesMatch(self):
2059
1957
self.assertEqual(self.server.bytes_read,
2165
2063
t = self.get_transport()
2166
2064
# We must send a single line of body bytes, see
2167
# PredefinedRequestHandler._handle_one_request
2065
# PredefinedRequestHandler.handle_one_request
2168
2066
code, f = t._post('abc def end-of-body\n')
2169
2067
self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2170
2068
self.assertActivitiesMatch()
2173
class TestActivity(tests.TestCase, TestActivityMixin):
2175
scenarios = multiply_scenarios(
2176
vary_by_http_activity(),
2177
vary_by_http_protocol_version(),
2181
TestActivityMixin.setUp(self)
2184
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2186
# Unlike TestActivity, we are really testing ReportingFileSocket and
2187
# ReportingSocket, so we don't need all the parametrization. Since
2188
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2189
# test them through their use by the transport than directly (that's a
2190
# bit less clean but far simpler and more effective).
2191
_activity_server = ActivityHTTPServer
2192
_protocol_version = 'HTTP/1.1'
2195
self._transport =_urllib.HttpTransport_urllib
2196
TestActivityMixin.setUp(self)
2198
def assertActivitiesMatch(self):
2199
# Nothing to check here
2203
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2204
"""Test authentication on the redirected http server."""
2206
scenarios = vary_by_http_protocol_version()
2208
_auth_header = 'Authorization'
2209
_password_prompt_prefix = ''
2210
_username_prompt_prefix = ''
2211
_auth_server = http_utils.HTTPBasicAuthServer
2212
_transport = _urllib.HttpTransport_urllib
2215
super(TestAuthOnRedirected, self).setUp()
2216
self.build_tree_contents([('a','a'),
2218
('1/a', 'redirected once'),
2220
new_prefix = 'http://%s:%s' % (self.new_server.host,
2221
self.new_server.port)
2222
self.old_server.redirections = [
2223
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2224
self.old_transport = self.get_old_transport()
2225
self.new_server.add_user('joe', 'foo')
2226
cleanup_http_redirection_connections(self)
2228
def create_transport_readonly_server(self):
2229
server = self._auth_server(protocol_version=self._protocol_version)
2230
server._url_protocol = self._url_protocol
2236
def test_auth_on_redirected_via_do_catching_redirections(self):
2237
self.redirections = 0
2239
def redirected(t, exception, redirection_notice):
2240
self.redirections += 1
2241
redirected_t = t._redirected_to(exception.source, exception.target)
2242
self.addCleanup(redirected_t.disconnect)
2245
stdout = tests.StringIOWrapper()
2246
stderr = tests.StringIOWrapper()
2247
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2248
stdout=stdout, stderr=stderr)
2249
self.assertEqual('redirected once',
2250
transport.do_catching_redirections(
2251
self.get_a, self.old_transport, redirected).read())
2252
self.assertEqual(1, self.redirections)
2253
# stdin should be empty
2254
self.assertEqual('', ui.ui_factory.stdin.readline())
2255
# stdout should be empty, stderr will contain the prompts
2256
self.assertEqual('', stdout.getvalue())
2258
def test_auth_on_redirected_via_following_redirections(self):
    """Check a GET issued through the old server after authenticating.

    A RedirectedRequest for 'a' is performed on the old transport while
    the old server redirects every path below '1' on the new server; the
    user name and password are fed to the test UI factory through stdin.
    """
    self.new_server.add_user('joe', 'foo')
    # Canned answers for the user name and password prompts
    captured_out = tests.StringIOWrapper()
    captured_err = tests.StringIOWrapper()
    ui.ui_factory = tests.TestUIFactory(
        stdin='joe\nfoo\n', stdout=captured_out, stderr=captured_err)
    old_t = self.old_transport
    request = RedirectedRequest('GET', old_t.abspath('a'))
    # Map any path on the old server to the same path under '1' on the
    # new server, using a 301 code.
    target_prefix = 'http://%s:%s' % (
        self.new_server.host, self.new_server.port)
    redirect_rule = ('(.*)', r'%s/1\1' % (target_prefix), 301)
    self.old_server.redirections = [redirect_rule]
    response = old_t._perform(request)
    self.assertEqual('redirected once', response.read())
    # stdin should be empty
    self.assertEqual('', ui.ui_factory.stdin.readline())
    # stdout should be empty, stderr will contain the prompts
    self.assertEqual('', captured_out.getvalue())