63
66
from bzrlib.transport.http._pycurl import PyCurlTransport
66
def load_tests(standard_tests, module, loader):
67
"""Multiply tests for http clients and protocol versions."""
68
result = loader.suiteClass()
70
# one for each transport implementation
71
t_tests, remaining_tests = tests.split_suite_by_condition(
72
standard_tests, tests.condition_isinstance((
73
TestHttpTransportRegistration,
74
TestHttpTransportUrls,
69
load_tests = load_tests_apply_scenarios
72
def vary_by_http_client_implementation():
73
"""Test the two libraries we can use, pycurl and urllib."""
77
74
transport_scenarios = [
78
75
('urllib', dict(_transport=_urllib.HttpTransport_urllib,
79
76
_server=http_server.HttpServer_urllib,
80
_qualified_prefix='http+urllib',)),
77
_url_protocol='http+urllib',)),
82
79
if features.pycurl.available():
83
80
transport_scenarios.append(
84
81
('pycurl', dict(_transport=PyCurlTransport,
85
82
_server=http_server.HttpServer_PyCurl,
86
_qualified_prefix='http+pycurl',)))
87
tests.multiply_tests(t_tests, transport_scenarios, result)
89
protocol_scenarios = [
90
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
91
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
94
# some tests are parametrized by the protocol version only
95
p_tests, remaining_tests = tests.split_suite_by_condition(
96
remaining_tests, tests.condition_isinstance((
99
tests.multiply_tests(p_tests, protocol_scenarios, result)
101
# each implementation tested with each HTTP version
102
tp_tests, remaining_tests = tests.split_suite_by_condition(
103
remaining_tests, tests.condition_isinstance((
104
SmartHTTPTunnellingTest,
105
TestDoCatchRedirections,
107
TestHTTPRedirections,
108
TestHTTPSilentRedirections,
109
TestLimitedRangeRequestServer,
113
TestSpecificRequestHandler,
115
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
117
tests.multiply_tests(tp_tests, tp_scenarios, result)
119
# proxy auth: each auth scheme on all http versions on all implementations.
120
tppa_tests, remaining_tests = tests.split_suite_by_condition(
121
remaining_tests, tests.condition_isinstance((
124
proxy_auth_scheme_scenarios = [
125
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
126
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
128
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
83
_url_protocol='http+pycurl',)))
84
return transport_scenarios
87
def vary_by_http_protocol_version():
88
"""Test on http/1.0 and 1.1"""
90
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
91
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
130
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
131
proxy_auth_scheme_scenarios)
132
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
134
# auth: each auth scheme on all http versions on all implementations.
135
tpa_tests, remaining_tests = tests.split_suite_by_condition(
136
remaining_tests, tests.condition_isinstance((
139
auth_scheme_scenarios = [
95
def vary_by_http_auth_scheme():
140
97
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
141
98
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
143
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
145
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
146
auth_scheme_scenarios)
147
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
149
# activity: on all http[s] versions on all implementations
150
tpact_tests, remaining_tests = tests.split_suite_by_condition(
151
remaining_tests, tests.condition_isinstance((
100
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
102
# Add some attributes common to all scenarios
103
for scenario_id, scenario_dict in scenarios:
104
scenario_dict.update(_auth_header='Authorization',
105
_username_prompt_prefix='',
106
_password_prompt_prefix='')
110
def vary_by_http_proxy_auth_scheme():
112
('proxy-basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
113
('proxy-digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
114
('proxy-basicdigest',
115
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
117
# Add some attributes common to all scenarios
118
for scenario_id, scenario_dict in scenarios:
119
scenario_dict.update(_auth_header='Proxy-Authorization',
120
_username_prompt_prefix='Proxy ',
121
_password_prompt_prefix='Proxy ')
125
def vary_by_http_activity():
154
126
activity_scenarios = [
155
127
('urllib,http', dict(_activity_server=ActivityHTTPServer,
156
_transport=_urllib.HttpTransport_urllib,)),
128
_transport=_urllib.HttpTransport_urllib,)),
158
if tests.HTTPSServerFeature.available():
130
if features.pycurl.available():
131
activity_scenarios.append(
132
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
133
_transport=PyCurlTransport,)),)
134
if features.HTTPSServerFeature.available():
135
# FIXME: Until we have a better way to handle self-signed certificates
136
# (like allowing them in a test specific authentication.conf for
137
# example), we need some specialized pycurl/urllib transport for tests.
139
from bzrlib.tests import (
142
class HTTPS_urllib_transport(_urllib.HttpTransport_urllib):
144
def __init__(self, base, _from_transport=None):
145
super(HTTPS_urllib_transport, self).__init__(
146
base, _from_transport=_from_transport,
147
ca_certs=ssl_certs.build_path('ca.crt'))
159
149
activity_scenarios.append(
160
150
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
161
_transport=_urllib.HttpTransport_urllib,)),)
162
if features.pycurl.available():
163
activity_scenarios.append(
164
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
165
_transport=PyCurlTransport,)),)
166
if tests.HTTPSServerFeature.available():
167
from bzrlib.tests import (
170
# FIXME: Until we have a better way to handle self-signed
171
# certificates (like allowing them in a test specific
172
# authentication.conf for example), we need some specialized pycurl
173
# transport for tests.
151
_transport=HTTPS_urllib_transport,)),)
152
if features.pycurl.available():
174
153
class HTTPS_pycurl_transport(PyCurlTransport):
176
155
def __init__(self, base, _from_transport=None):
301
275
self.assertEqual('realm="Thou should not pass"', remainder)
278
class TestHTTPRangeParsing(tests.TestCase):
281
super(TestHTTPRangeParsing, self).setUp()
282
# We focus on range parsing here and ignore everything else
283
class RequestHandler(http_server.TestingHTTPRequestHandler):
284
def setup(self): pass
285
def handle(self): pass
286
def finish(self): pass
288
self.req_handler = RequestHandler(None, None, None)
290
def assertRanges(self, ranges, header, file_size):
291
self.assertEquals(ranges,
292
self.req_handler._parse_ranges(header, file_size))
294
def test_simple_range(self):
295
self.assertRanges([(0,2)], 'bytes=0-2', 12)
298
self.assertRanges([(8, 11)], 'bytes=-4', 12)
300
def test_tail_bigger_than_file(self):
301
self.assertRanges([(0, 11)], 'bytes=-99', 12)
303
def test_range_without_end(self):
304
self.assertRanges([(4, 11)], 'bytes=4-', 12)
306
def test_invalid_ranges(self):
307
self.assertRanges(None, 'bytes=12-22', 12)
308
self.assertRanges(None, 'bytes=1-3,12-22', 12)
309
self.assertRanges(None, 'bytes=-', 12)
304
312
class TestHTTPServer(tests.TestCase):
305
313
"""Test the HTTP servers implementations."""
385
384
_transport = property(_get_pycurl_maybe)
388
class TestHttpUrls(tests.TestCase):
390
# TODO: This should be moved to authorization tests once they
393
def test_url_parsing(self):
395
url = http.extract_auth('http://example.com', f)
396
self.assertEqual('http://example.com', url)
397
self.assertEqual(0, len(f.credentials))
398
url = http.extract_auth(
399
'http://user:pass@example.com/bzr/bzr.dev', f)
400
self.assertEqual('http://example.com/bzr/bzr.dev', url)
401
self.assertEqual(1, len(f.credentials))
402
self.assertEqual([None, 'example.com', 'user', 'pass'],
406
387
class TestHttpTransportUrls(tests.TestCase):
407
388
"""Test the http urls."""
390
scenarios = vary_by_http_client_implementation()
409
392
def test_abs_url(self):
410
393
"""Construction of absolute http URLs"""
411
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
394
t = self._transport('http://example.com/bzr/bzr.dev/')
412
395
eq = self.assertEqualDiff
413
eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
414
eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
415
eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
396
eq(t.abspath('.'), 'http://example.com/bzr/bzr.dev')
397
eq(t.abspath('foo/bar'), 'http://example.com/bzr/bzr.dev/foo/bar')
398
eq(t.abspath('.bzr'), 'http://example.com/bzr/bzr.dev/.bzr')
416
399
eq(t.abspath('.bzr/1//2/./3'),
417
'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
400
'http://example.com/bzr/bzr.dev/.bzr/1/2/3')
419
402
def test_invalid_http_urls(self):
420
403
"""Trap invalid construction of urls"""
421
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
404
self._transport('http://example.com/bzr/bzr.dev/')
422
405
self.assertRaises(errors.InvalidURL,
424
'http://http://bazaar-vcs.org/bzr/bzr.dev/')
407
'http://http://example.com/bzr/bzr.dev/')
426
409
def test_http_root_urls(self):
427
410
"""Construction of URLs from server root"""
428
t = self._transport('http://bzr.ozlabs.org/')
411
t = self._transport('http://example.com/')
429
412
eq = self.assertEqualDiff
430
413
eq(t.abspath('.bzr/tree-version'),
431
'http://bzr.ozlabs.org/.bzr/tree-version')
414
'http://example.com/.bzr/tree-version')
433
416
def test_http_impl_urls(self):
434
417
"""There are servers which ask for particular clients to connect"""
530
518
class TestHttpTransportRegistration(tests.TestCase):
531
519
"""Test registrations of various http implementations"""
521
scenarios = vary_by_http_client_implementation()
533
523
def test_http_registered(self):
534
t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
524
t = transport.get_transport_from_url(
525
'%s://foo.com/' % self._url_protocol)
535
526
self.assertIsInstance(t, transport.Transport)
536
527
self.assertIsInstance(t, self._transport)
539
530
class TestPost(tests.TestCase):
532
scenarios = multiply_scenarios(
533
vary_by_http_client_implementation(),
534
vary_by_http_protocol_version(),
541
537
def test_post_body_is_received(self):
542
538
server = RecordingServer(expect_body_tail='end-of-body',
543
scheme=self._qualified_prefix)
539
scheme=self._url_protocol)
544
540
self.start_server(server)
545
541
url = server.get_url()
546
http_transport = self._transport(url)
542
# FIXME: needs a cleanup -- vila 20100611
543
http_transport = transport.get_transport_from_url(url)
547
544
code, response = http_transport._post('abc def end-of-body')
549
546
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
550
547
self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
548
self.assertTrue('content-type: application/octet-stream\r'
549
in server.received_bytes.lower())
551
550
# The transport should not be assuming that the server can accept
552
551
# chunked encoding the first time it connects, because HTTP/1.1, so we
553
552
# check for the literal string.
1048
1052
self.assertEqual('single', t._range_hint)
1055
class TruncatedBeforeBoundaryRequestHandler(
1056
http_server.TestingHTTPRequestHandler):
1057
"""Truncation before a boundary, like in bug 198646"""
1059
_truncated_ranges = 1
1061
def get_multiple_ranges(self, file, file_size, ranges):
1062
self.send_response(206)
1063
self.send_header('Accept-Ranges', 'bytes')
1065
self.send_header('Content-Type',
1066
'multipart/byteranges; boundary=%s' % boundary)
1067
boundary_line = '--%s\r\n' % boundary
1068
# Calculate the Content-Length
1070
for (start, end) in ranges:
1071
content_length += len(boundary_line)
1072
content_length += self._header_line_length(
1073
'Content-type', 'application/octet-stream')
1074
content_length += self._header_line_length(
1075
'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
1076
content_length += len('\r\n') # end headers
1077
content_length += end - start # + 1
1078
content_length += len(boundary_line)
1079
self.send_header('Content-length', content_length)
1082
# Send the multipart body
1084
for (start, end) in ranges:
1085
if cur + self._truncated_ranges >= len(ranges):
1086
# Abruptly ends the response and close the connection
1087
self.close_connection = 1
1089
self.wfile.write(boundary_line)
1090
self.send_header('Content-type', 'application/octet-stream')
1091
self.send_header('Content-Range', 'bytes %d-%d/%d'
1092
% (start, end, file_size))
1094
self.send_range_content(file, start, end - start + 1)
1097
self.wfile.write(boundary_line)
1100
class TestTruncatedBeforeBoundary(TestSpecificRequestHandler):
1101
"""Tests the case of bug 198646, disconnecting before a boundary."""
1103
_req_handler_class = TruncatedBeforeBoundaryRequestHandler
1106
super(TestTruncatedBeforeBoundary, self).setUp()
1107
self.build_tree_contents([('a', '0123456789')],)
1109
def test_readv_with_short_reads(self):
1110
server = self.get_readonly_server()
1111
t = self.get_readonly_transport()
1112
# Force separate ranges for each offset
1113
t._bytes_to_read_before_seek = 0
1114
ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
1115
self.assertEqual((0, '0'), ireadv.next())
1116
self.assertEqual((2, '2'), ireadv.next())
1117
self.assertEqual((4, '45'), ireadv.next())
1118
self.assertEqual((9, '9'), ireadv.next())
1050
1121
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
1051
1122
"""Errors out when range specifiers exceed the limit"""
1119
1192
Only the urllib implementation is tested here.
1123
tests.TestCase.setUp(self)
1125
self.addCleanup(self._restore_env)
1127
def _install_env(self, env):
1128
for name, value in env.iteritems():
1129
self._old_env[name] = osutils.set_or_unset_env(name, value)
1131
def _restore_env(self):
1132
for name, value in self._old_env.iteritems():
1133
osutils.set_or_unset_env(name, value)
1135
1195
def _proxied_request(self):
1136
1196
handler = _urllib2_wrappers.ProxyHandler()
1137
request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
1197
request = _urllib2_wrappers.Request('GET', 'http://baz/buzzle')
1138
1198
handler.set_proxy(request, 'http')
1201
def assertEvaluateProxyBypass(self, expected, host, no_proxy):
1202
handler = _urllib2_wrappers.ProxyHandler()
1203
self.assertEquals(expected,
1204
handler.evaluate_proxy_bypass(host, no_proxy))
1141
1206
def test_empty_user(self):
1142
self._install_env({'http_proxy': 'http://bar.com'})
1207
self.overrideEnv('http_proxy', 'http://bar.com')
1208
request = self._proxied_request()
1209
self.assertFalse(request.headers.has_key('Proxy-authorization'))
1211
def test_user_with_at(self):
1212
self.overrideEnv('http_proxy',
1213
'http://username@domain:password@proxy_host:1234')
1143
1214
request = self._proxied_request()
1144
1215
self.assertFalse(request.headers.has_key('Proxy-authorization'))
1146
1217
def test_invalid_proxy(self):
1147
1218
"""A proxy env variable without scheme"""
1148
self._install_env({'http_proxy': 'host:1234'})
1219
self.overrideEnv('http_proxy', 'host:1234')
1149
1220
self.assertRaises(errors.InvalidURL, self._proxied_request)
1222
def test_evaluate_proxy_bypass_true(self):
1223
"""The host is not proxied"""
1224
self.assertEvaluateProxyBypass(True, 'example.com', 'example.com')
1225
self.assertEvaluateProxyBypass(True, 'bzr.example.com', '*example.com')
1227
def test_evaluate_proxy_bypass_false(self):
1228
"""The host is proxied"""
1229
self.assertEvaluateProxyBypass(False, 'bzr.example.com', None)
1231
def test_evaluate_proxy_bypass_unknown(self):
1232
"""The host is not explicitly proxied"""
1233
self.assertEvaluateProxyBypass(None, 'example.com', 'not.example.com')
1234
self.assertEvaluateProxyBypass(None, 'bzr.example.com', 'example.com')
1236
def test_evaluate_proxy_bypass_empty_entries(self):
1237
"""Ignore empty entries"""
1238
self.assertEvaluateProxyBypass(None, 'example.com', '')
1239
self.assertEvaluateProxyBypass(None, 'example.com', ',')
1240
self.assertEvaluateProxyBypass(None, 'example.com', 'foo,,bar')
1152
1243
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
1153
1244
"""Tests proxy server.
1158
1249
to the file names).
1252
scenarios = multiply_scenarios(
1253
vary_by_http_client_implementation(),
1254
vary_by_http_protocol_version(),
1161
1257
# FIXME: We don't have an https server available, so we don't
1162
# test https connections.
1258
# test https connections. --vila toolongago
1164
1260
def setUp(self):
1165
1261
super(TestProxyHttpServer, self).setUp()
1262
self.transport_secondary_server = http_utils.ProxyServer
1166
1263
self.build_tree_contents([('foo', 'contents of foo\n'),
1167
1264
('foo-proxied', 'proxied contents of foo\n')])
1168
1265
# Let's setup some attributes for tests
1169
self.server = self.get_readonly_server()
1170
self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
1266
server = self.get_readonly_server()
1267
self.server_host_port = '%s:%d' % (server.host, server.port)
1171
1268
if self._testing_pycurl():
1172
1269
# Oh my ! pycurl does not check for the port as part of
1173
1270
# no_proxy :-( So we just test the host part
1174
self.no_proxy_host = self.server.host
1271
self.no_proxy_host = server.host
1176
self.no_proxy_host = self.proxy_address
1273
self.no_proxy_host = self.server_host_port
1177
1274
# The secondary server is the proxy
1178
self.proxy = self.get_secondary_server()
1179
self.proxy_url = self.proxy.get_url()
1275
self.proxy_url = self.get_secondary_url()
1182
1277
def _testing_pycurl(self):
1183
1278
# TODO: This is duplicated for lots of the classes in this file
1184
1279
return (features.pycurl.available()
1185
1280
and self._transport == PyCurlTransport)
1187
def create_transport_secondary_server(self):
1188
"""Creates an http server that will serve files with
1189
'-proxied' appended to their names.
1191
return http_utils.ProxyServer(protocol_version=self._protocol_version)
1193
def _install_env(self, env):
1194
for name, value in env.iteritems():
1195
self._old_env[name] = osutils.set_or_unset_env(name, value)
1197
def _restore_env(self):
1198
for name, value in self._old_env.iteritems():
1199
osutils.set_or_unset_env(name, value)
1201
def proxied_in_env(self, env):
1202
self._install_env(env)
1203
url = self.server.get_url()
1204
t = self._transport(url)
1206
self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1210
def not_proxied_in_env(self, env):
1211
self._install_env(env)
1212
url = self.server.get_url()
1213
t = self._transport(url)
1215
self.assertEqual('contents of foo\n', t.get('foo').read())
1282
def assertProxied(self):
1283
t = self.get_readonly_transport()
1284
self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1286
def assertNotProxied(self):
1287
t = self.get_readonly_transport()
1288
self.assertEqual('contents of foo\n', t.get('foo').read())
1219
1290
def test_http_proxy(self):
1220
self.proxied_in_env({'http_proxy': self.proxy_url})
1291
self.overrideEnv('http_proxy', self.proxy_url)
1292
self.assertProxied()
1222
1294
def test_HTTP_PROXY(self):
1223
1295
if self._testing_pycurl():
1226
1298
# about. Should we ?)
1227
1299
raise tests.TestNotApplicable(
1228
1300
'pycurl does not check HTTP_PROXY for security reasons')
1229
self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
1301
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1302
self.assertProxied()
1231
1304
def test_all_proxy(self):
1232
self.proxied_in_env({'all_proxy': self.proxy_url})
1305
self.overrideEnv('all_proxy', self.proxy_url)
1306
self.assertProxied()
1234
1308
def test_ALL_PROXY(self):
1235
self.proxied_in_env({'ALL_PROXY': self.proxy_url})
1309
self.overrideEnv('ALL_PROXY', self.proxy_url)
1310
self.assertProxied()
1237
1312
def test_http_proxy_with_no_proxy(self):
1238
self.not_proxied_in_env({'http_proxy': self.proxy_url,
1239
'no_proxy': self.no_proxy_host})
1313
self.overrideEnv('no_proxy', self.no_proxy_host)
1314
self.overrideEnv('http_proxy', self.proxy_url)
1315
self.assertNotProxied()
1241
1317
def test_HTTP_PROXY_with_NO_PROXY(self):
1242
1318
if self._testing_pycurl():
1243
1319
raise tests.TestNotApplicable(
1244
1320
'pycurl does not check HTTP_PROXY for security reasons')
1245
self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
1246
'NO_PROXY': self.no_proxy_host})
1321
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1322
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1323
self.assertNotProxied()
1248
1325
def test_all_proxy_with_no_proxy(self):
1249
self.not_proxied_in_env({'all_proxy': self.proxy_url,
1250
'no_proxy': self.no_proxy_host})
1326
self.overrideEnv('no_proxy', self.no_proxy_host)
1327
self.overrideEnv('all_proxy', self.proxy_url)
1328
self.assertNotProxied()
1252
1330
def test_ALL_PROXY_with_NO_PROXY(self):
1253
self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
1254
'NO_PROXY': self.no_proxy_host})
1331
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1332
self.overrideEnv('ALL_PROXY', self.proxy_url)
1333
self.assertNotProxied()
1256
1335
def test_http_proxy_without_scheme(self):
1336
self.overrideEnv('http_proxy', self.server_host_port)
1257
1337
if self._testing_pycurl():
1258
1338
# pycurl *ignores* invalid proxy env variables. If that ever change
1259
1339
# in the future, this test will fail indicating that pycurl do not
1260
1340
# ignore anymore such variables.
1261
self.not_proxied_in_env({'http_proxy': self.proxy_address})
1341
self.assertNotProxied()
1263
self.assertRaises(errors.InvalidURL,
1264
self.proxied_in_env,
1265
{'http_proxy': self.proxy_address})
1343
self.assertRaises(errors.InvalidURL, self.assertProxied)
1268
1346
class TestRanges(http_utils.TestCaseWithWebserver):
1269
1347
"""Test the Range header in GET methods."""
1349
scenarios = multiply_scenarios(
1350
vary_by_http_client_implementation(),
1351
vary_by_http_protocol_version(),
1271
1354
def setUp(self):
1272
http_utils.TestCaseWithWebserver.setUp(self)
1355
super(TestRanges, self).setUp()
1273
1356
self.build_tree_contents([('a', '0123456789')],)
1274
server = self.get_readonly_server()
1275
self.transport = self._transport(server.get_url())
1277
1358
def create_transport_readonly_server(self):
1278
1359
return http_server.HttpServer(protocol_version=self._protocol_version)
1280
1361
def _file_contents(self, relpath, ranges):
1362
t = self.get_readonly_transport()
1281
1363
offsets = [ (start, end - start + 1) for start, end in ranges]
1282
coalesce = self.transport._coalesce_offsets
1364
coalesce = t._coalesce_offsets
1283
1365
coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
1284
code, data = self.transport._get(relpath, coalesced)
1366
code, data = t._get(relpath, coalesced)
1285
1367
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1286
1368
for start, end in ranges:
1287
1369
data.seek(start)
1288
1370
yield data.read(end - start + 1)
1290
1372
def _file_tail(self, relpath, tail_amount):
1291
code, data = self.transport._get(relpath, [], tail_amount)
1373
t = self.get_readonly_transport()
1374
code, data = t._get(relpath, [], tail_amount)
1292
1375
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1293
1376
data.seek(-tail_amount, 2)
1294
1377
return data.read(tail_amount)
1636
1767
# Only one 'Authentication Required' error should occur
1637
1768
self.assertEqual(1, self.server.auth_required_errors)
1639
def test_user_from_auth_conf(self):
1640
if self._testing_pycurl():
1641
raise tests.TestNotApplicable(
1642
'pycurl does not support authentication.conf')
1645
self.server.add_user(user, password)
1646
# Create a minimal config file with the right password
1647
conf = config.AuthenticationConfig()
1648
conf._get_config().update(
1649
{'httptest': {'scheme': 'http', 'port': self.server.port,
1650
'user': user, 'password': password}})
1652
t = self.get_user_transport(None, None)
1653
# Issue a request to the server to connect
1654
self.assertEqual('contents of a\n', t.get('a').read())
1655
# Only one 'Authentication Required' error should occur
1656
self.assertEqual(1, self.server.auth_required_errors)
1658
1770
def test_changing_nonce(self):
1659
1771
if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1660
1772
http_utils.ProxyDigestAuthServer):
1661
1773
raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1662
1774
if self._testing_pycurl():
1663
raise tests.KnownFailure(
1664
1776
'pycurl does not handle a nonce change')
1665
1777
self.server.add_user('joe', 'foo')
1666
1778
t = self.get_user_transport('joe', 'foo')
1676
1788
# initial 'who are you' and a second 'who are you' with the new nonce)
1677
1789
self.assertEqual(2, self.server.auth_required_errors)
1791
def test_user_from_auth_conf(self):
1792
if self._testing_pycurl():
1793
raise tests.TestNotApplicable(
1794
'pycurl does not support authentication.conf')
1797
self.server.add_user(user, password)
1798
_setup_authentication_config(scheme='http', port=self.server.port,
1799
user=user, password=password)
1800
t = self.get_user_transport(None, None)
1801
# Issue a request to the server to connect
1802
self.assertEqual('contents of a\n', t.get('a').read())
1803
# Only one 'Authentication Required' error should occur
1804
self.assertEqual(1, self.server.auth_required_errors)
1806
def test_no_credential_leaks_in_log(self):
1807
self.overrideAttr(debug, 'debug_flags', set(['http']))
1809
password = 'very-sensitive-password'
1810
self.server.add_user(user, password)
1811
t = self.get_user_transport(user, password)
1812
# Capture the debug calls to mutter
1815
lines = args[0] % args[1:]
1816
# Some calls output multiple lines, just split them now since we
1817
# care about a single one later.
1818
self.mutters.extend(lines.splitlines())
1819
self.overrideAttr(trace, 'mutter', mutter)
1820
# Issue a request to the server to connect
1821
self.assertEqual(True, t.has('a'))
1822
# Only one 'Authentication Required' error should occur
1823
self.assertEqual(1, self.server.auth_required_errors)
1824
# Since the authentification succeeded, there should be a corresponding
1826
sent_auth_headers = [line for line in self.mutters
1827
if line.startswith('> %s' % (self._auth_header,))]
1828
self.assertLength(1, sent_auth_headers)
1829
self.assertStartsWith(sent_auth_headers[0],
1830
'> %s: <masked>' % (self._auth_header,))
1681
1833
class TestProxyAuth(TestAuth):
1682
"""Test proxy authentication schemes."""
1684
_auth_header = 'Proxy-authorization'
1685
_password_prompt_prefix = 'Proxy '
1686
_username_prompt_prefix = 'Proxy '
1834
"""Test proxy authentication schemes.
1836
This inherits from TestAuth to tweak the setUp and filter some failing
1840
scenarios = multiply_scenarios(
1841
vary_by_http_client_implementation(),
1842
vary_by_http_protocol_version(),
1843
vary_by_http_proxy_auth_scheme(),
1688
1846
def setUp(self):
1689
1847
super(TestProxyAuth, self).setUp()
1691
self.addCleanup(self._restore_env)
1692
1848
# Override the contents to avoid false positives
1693
1849
self.build_tree_contents([('a', 'not proxied contents of a\n'),
1694
1850
('b', 'not proxied contents of b\n'),
1857
2017
r = t._redirected_to('http://www.example.com/foo',
1858
2018
'http://foo.example.com/foo/subdir')
1859
2019
self.assertIsInstance(r, type(t))
2020
self.assertEquals('http://foo.example.com/foo/subdir/',
1861
2023
def test_redirected_to_same_host_sibling_protocol(self):
1862
2024
t = self._transport('http://www.example.com/foo')
1863
2025
r = t._redirected_to('http://www.example.com/foo',
1864
2026
'https://www.example.com/foo')
1865
2027
self.assertIsInstance(r, type(t))
2028
self.assertEquals('https://www.example.com/foo/',
1867
2031
def test_redirected_to_same_host_different_protocol(self):
1868
2032
t = self._transport('http://www.example.com/foo')
1869
2033
r = t._redirected_to('http://www.example.com/foo',
1870
2034
'ftp://www.example.com/foo')
1871
2035
self.assertNotEquals(type(r), type(t))
2036
self.assertEquals('ftp://www.example.com/foo/', r.external_url())
2038
def test_redirected_to_same_host_specific_implementation(self):
2039
t = self._transport('http://www.example.com/foo')
2040
r = t._redirected_to('http://www.example.com/foo',
2041
'https+urllib://www.example.com/foo')
2042
self.assertEquals('https://www.example.com/foo/', r.external_url())
1873
2044
def test_redirected_to_different_host_same_user(self):
1874
2045
t = self._transport('http://joe@www.example.com/foo')
1875
2046
r = t._redirected_to('http://www.example.com/foo',
1876
2047
'https://foo.example.com/foo')
1877
2048
self.assertIsInstance(r, type(t))
1878
self.assertEqual(t._user, r._user)
2049
self.assertEqual(t._parsed_url.user, r._parsed_url.user)
2050
self.assertEquals('https://joe@foo.example.com/foo/', r.external_url())
1881
2053
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):