85
81
('pycurl', dict(_transport=PyCurlTransport,
86
82
_server=http_server.HttpServer_PyCurl,
87
83
_url_protocol='http+pycurl',)))
88
tests.multiply_tests(t_tests, transport_scenarios, result)
90
protocol_scenarios = [
91
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
92
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
95
# some tests are parametrized by the protocol version only
96
p_tests, remaining_tests = tests.split_suite_by_condition(
97
remaining_tests, tests.condition_isinstance((
100
tests.multiply_tests(p_tests, protocol_scenarios, result)
102
# each implementation tested with each HTTP version
103
tp_tests, remaining_tests = tests.split_suite_by_condition(
104
remaining_tests, tests.condition_isinstance((
105
SmartHTTPTunnellingTest,
106
TestDoCatchRedirections,
108
TestHTTPRedirections,
109
TestHTTPSilentRedirections,
110
TestLimitedRangeRequestServer,
114
TestSpecificRequestHandler,
116
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
118
tests.multiply_tests(tp_tests, tp_scenarios, result)
120
# proxy auth: each auth scheme on all http versions on all implementations.
121
tppa_tests, remaining_tests = tests.split_suite_by_condition(
122
remaining_tests, tests.condition_isinstance((
125
proxy_auth_scheme_scenarios = [
126
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
127
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
129
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
84
return transport_scenarios
87
def vary_by_http_protocol_version():
88
"""Test on http/1.0 and 1.1"""
90
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
91
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
131
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
132
proxy_auth_scheme_scenarios)
133
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
135
# auth: each auth scheme on all http versions on all implementations.
136
tpa_tests, remaining_tests = tests.split_suite_by_condition(
137
remaining_tests, tests.condition_isinstance((
140
auth_scheme_scenarios = [
95
def vary_by_http_auth_scheme():
141
97
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
142
98
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
144
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
146
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
147
auth_scheme_scenarios)
148
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
150
# activity: on all http[s] versions on all implementations
151
tpact_tests, remaining_tests = tests.split_suite_by_condition(
152
remaining_tests, tests.condition_isinstance((
100
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
102
# Add some attributes common to all scenarios
103
for scenario_id, scenario_dict in scenarios:
104
scenario_dict.update(_auth_header='Authorization',
105
_username_prompt_prefix='',
106
_password_prompt_prefix='')
110
def vary_by_http_proxy_auth_scheme():
112
('proxy-basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
113
('proxy-digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
114
('proxy-basicdigest',
115
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
117
# Add some attributes common to all scenarios
118
for scenario_id, scenario_dict in scenarios:
119
scenario_dict.update(_auth_header='Proxy-Authorization',
120
_username_prompt_prefix='Proxy ',
121
_password_prompt_prefix='Proxy ')
125
def vary_by_http_activity():
155
126
activity_scenarios = [
156
127
('urllib,http', dict(_activity_server=ActivityHTTPServer,
157
_transport=_urllib.HttpTransport_urllib,)),
128
_transport=_urllib.HttpTransport_urllib,)),
159
if tests.HTTPSServerFeature.available():
130
if features.pycurl.available():
131
activity_scenarios.append(
132
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
133
_transport=PyCurlTransport,)),)
134
if features.HTTPSServerFeature.available():
135
# FIXME: Until we have a better way to handle self-signed certificates
136
# (like allowing them in a test specific authentication.conf for
137
# example), we need some specialized pycurl/urllib transport for tests.
139
from bzrlib.tests import (
142
class HTTPS_urllib_transport(_urllib.HttpTransport_urllib):
144
def __init__(self, base, _from_transport=None):
145
super(HTTPS_urllib_transport, self).__init__(
146
base, _from_transport=_from_transport,
147
ca_certs=ssl_certs.build_path('ca.crt'))
160
149
activity_scenarios.append(
161
150
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
162
_transport=_urllib.HttpTransport_urllib,)),)
163
if features.pycurl.available():
164
activity_scenarios.append(
165
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
166
_transport=PyCurlTransport,)),)
167
if tests.HTTPSServerFeature.available():
168
from bzrlib.tests import (
171
# FIXME: Until we have a better way to handle self-signed
172
# certificates (like allowing them in a test specific
173
# authentication.conf for example), we need some specialized pycurl
174
# transport for tests.
151
_transport=HTTPS_urllib_transport,)),)
152
if features.pycurl.available():
175
153
class HTTPS_pycurl_transport(PyCurlTransport):
177
155
def __init__(self, base, _from_transport=None):
305
275
self.assertEqual('realm="Thou should not pass"', remainder)
278
class TestHTTPRangeParsing(tests.TestCase):
281
super(TestHTTPRangeParsing, self).setUp()
282
# We focus on range parsing here and ignore everything else
283
class RequestHandler(http_server.TestingHTTPRequestHandler):
284
def setup(self): pass
285
def handle(self): pass
286
def finish(self): pass
288
self.req_handler = RequestHandler(None, None, None)
290
def assertRanges(self, ranges, header, file_size):
291
self.assertEquals(ranges,
292
self.req_handler._parse_ranges(header, file_size))
294
def test_simple_range(self):
295
self.assertRanges([(0,2)], 'bytes=0-2', 12)
298
self.assertRanges([(8, 11)], 'bytes=-4', 12)
300
def test_tail_bigger_than_file(self):
301
self.assertRanges([(0, 11)], 'bytes=-99', 12)
303
def test_range_without_end(self):
304
self.assertRanges([(4, 11)], 'bytes=4-', 12)
306
def test_invalid_ranges(self):
307
self.assertRanges(None, 'bytes=12-22', 12)
308
self.assertRanges(None, 'bytes=1-3,12-22', 12)
309
self.assertRanges(None, 'bytes=-', 12)
308
312
class TestHTTPServer(tests.TestCase):
309
313
"""Test the HTTP servers implementations."""
380
384
_transport = property(_get_pycurl_maybe)
383
class TestHttpUrls(tests.TestCase):
385
# TODO: This should be moved to authorization tests once they
388
def test_url_parsing(self):
390
url = http.extract_auth('http://example.com', f)
391
self.assertEqual('http://example.com', url)
392
self.assertEqual(0, len(f.credentials))
393
url = http.extract_auth(
394
'http://user:pass@example.com/bzr/bzr.dev', f)
395
self.assertEqual('http://example.com/bzr/bzr.dev', url)
396
self.assertEqual(1, len(f.credentials))
397
self.assertEqual([None, 'example.com', 'user', 'pass'],
401
387
class TestHttpTransportUrls(tests.TestCase):
402
388
"""Test the http urls."""
390
scenarios = vary_by_http_client_implementation()
404
392
def test_abs_url(self):
405
393
"""Construction of absolute http URLs"""
406
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
394
t = self._transport('http://example.com/bzr/bzr.dev/')
407
395
eq = self.assertEqualDiff
408
eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
409
eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
410
eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
396
eq(t.abspath('.'), 'http://example.com/bzr/bzr.dev')
397
eq(t.abspath('foo/bar'), 'http://example.com/bzr/bzr.dev/foo/bar')
398
eq(t.abspath('.bzr'), 'http://example.com/bzr/bzr.dev/.bzr')
411
399
eq(t.abspath('.bzr/1//2/./3'),
412
'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
400
'http://example.com/bzr/bzr.dev/.bzr/1/2/3')
414
402
def test_invalid_http_urls(self):
415
403
"""Trap invalid construction of urls"""
416
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
404
self._transport('http://example.com/bzr/bzr.dev/')
417
405
self.assertRaises(errors.InvalidURL,
419
'http://http://bazaar-vcs.org/bzr/bzr.dev/')
407
'http://http://example.com/bzr/bzr.dev/')
421
409
def test_http_root_urls(self):
422
410
"""Construction of URLs from server root"""
423
t = self._transport('http://bzr.ozlabs.org/')
411
t = self._transport('http://example.com/')
424
412
eq = self.assertEqualDiff
425
413
eq(t.abspath('.bzr/tree-version'),
426
'http://bzr.ozlabs.org/.bzr/tree-version')
414
'http://example.com/.bzr/tree-version')
428
416
def test_http_impl_urls(self):
429
417
"""There are servers which ask for particular clients to connect"""
525
518
class TestHttpTransportRegistration(tests.TestCase):
526
519
"""Test registrations of various http implementations"""
521
scenarios = vary_by_http_client_implementation()
528
523
def test_http_registered(self):
529
t = transport.get_transport('%s://foo.com/' % self._url_protocol)
524
t = transport.get_transport_from_url(
525
'%s://foo.com/' % self._url_protocol)
530
526
self.assertIsInstance(t, transport.Transport)
531
527
self.assertIsInstance(t, self._transport)
534
530
class TestPost(tests.TestCase):
532
scenarios = multiply_scenarios(
533
vary_by_http_client_implementation(),
534
vary_by_http_protocol_version(),
536
537
def test_post_body_is_received(self):
537
538
server = RecordingServer(expect_body_tail='end-of-body',
538
539
scheme=self._url_protocol)
539
540
self.start_server(server)
540
541
url = server.get_url()
541
542
# FIXME: needs a cleanup -- vila 20100611
542
http_transport = transport.get_transport(url)
543
http_transport = transport.get_transport_from_url(url)
543
544
code, response = http_transport._post('abc def end-of-body')
545
546
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
546
547
self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
548
self.assertTrue('content-type: application/octet-stream\r'
549
in server.received_bytes.lower())
547
550
# The transport should not be assuming that the server can accept
548
551
# chunked encoding the first time it connects, because HTTP/1.1, so we
549
552
# check for the literal string.
1027
1037
self.assertEqual('single', t._range_hint)
1040
class TruncatedBeforeBoundaryRequestHandler(
1041
http_server.TestingHTTPRequestHandler):
1042
"""Truncation before a boundary, like in bug 198646"""
1044
_truncated_ranges = 1
1046
def get_multiple_ranges(self, file, file_size, ranges):
1047
self.send_response(206)
1048
self.send_header('Accept-Ranges', 'bytes')
1050
self.send_header('Content-Type',
1051
'multipart/byteranges; boundary=%s' % boundary)
1052
boundary_line = '--%s\r\n' % boundary
1053
# Calculate the Content-Length
1055
for (start, end) in ranges:
1056
content_length += len(boundary_line)
1057
content_length += self._header_line_length(
1058
'Content-type', 'application/octet-stream')
1059
content_length += self._header_line_length(
1060
'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
1061
content_length += len('\r\n') # end headers
1062
content_length += end - start # + 1
1063
content_length += len(boundary_line)
1064
self.send_header('Content-length', content_length)
1067
# Send the multipart body
1069
for (start, end) in ranges:
1070
if cur + self._truncated_ranges >= len(ranges):
1071
# Abruptly ends the response and close the connection
1072
self.close_connection = 1
1074
self.wfile.write(boundary_line)
1075
self.send_header('Content-type', 'application/octet-stream')
1076
self.send_header('Content-Range', 'bytes %d-%d/%d'
1077
% (start, end, file_size))
1079
self.send_range_content(file, start, end - start + 1)
1082
self.wfile.write(boundary_line)
1085
class TestTruncatedBeforeBoundary(TestSpecificRequestHandler):
1086
"""Tests the case of bug 198646, disconnecting before a boundary."""
1088
_req_handler_class = TruncatedBeforeBoundaryRequestHandler
1091
super(TestTruncatedBeforeBoundary, self).setUp()
1092
self.build_tree_contents([('a', '0123456789')],)
1094
def test_readv_with_short_reads(self):
1095
server = self.get_readonly_server()
1096
t = self.get_readonly_transport()
1097
# Force separate ranges for each offset
1098
t._bytes_to_read_before_seek = 0
1099
ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
1100
self.assertEqual((0, '0'), ireadv.next())
1101
self.assertEqual((2, '2'), ireadv.next())
1102
self.assertEqual((4, '45'), ireadv.next())
1103
self.assertEqual((9, '9'), ireadv.next())
1029
1106
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
1030
1107
"""Errors out when range specifiers exceed the limit"""
1095
1177
Only the urllib implementation is tested here.
1099
tests.TestCase.setUp(self)
1101
self.addCleanup(self._restore_env)
1103
def _install_env(self, env):
1104
for name, value in env.iteritems():
1105
self._old_env[name] = osutils.set_or_unset_env(name, value)
1107
def _restore_env(self):
1108
for name, value in self._old_env.iteritems():
1109
osutils.set_or_unset_env(name, value)
1111
1180
def _proxied_request(self):
1112
1181
handler = _urllib2_wrappers.ProxyHandler()
1113
request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
1182
request = _urllib2_wrappers.Request('GET', 'http://baz/buzzle')
1114
1183
handler.set_proxy(request, 'http')
1186
def assertEvaluateProxyBypass(self, expected, host, no_proxy):
1187
handler = _urllib2_wrappers.ProxyHandler()
1188
self.assertEquals(expected,
1189
handler.evaluate_proxy_bypass(host, no_proxy))
1117
1191
def test_empty_user(self):
1118
self._install_env({'http_proxy': 'http://bar.com'})
1192
self.overrideEnv('http_proxy', 'http://bar.com')
1193
request = self._proxied_request()
1194
self.assertFalse(request.headers.has_key('Proxy-authorization'))
1196
def test_user_with_at(self):
1197
self.overrideEnv('http_proxy',
1198
'http://username@domain:password@proxy_host:1234')
1119
1199
request = self._proxied_request()
1120
1200
self.assertFalse(request.headers.has_key('Proxy-authorization'))
1122
1202
def test_invalid_proxy(self):
1123
1203
"""A proxy env variable without scheme"""
1124
self._install_env({'http_proxy': 'host:1234'})
1204
self.overrideEnv('http_proxy', 'host:1234')
1125
1205
self.assertRaises(errors.InvalidURL, self._proxied_request)
1207
def test_evaluate_proxy_bypass_true(self):
1208
"""The host is not proxied"""
1209
self.assertEvaluateProxyBypass(True, 'example.com', 'example.com')
1210
self.assertEvaluateProxyBypass(True, 'bzr.example.com', '*example.com')
1212
def test_evaluate_proxy_bypass_false(self):
1213
"""The host is proxied"""
1214
self.assertEvaluateProxyBypass(False, 'bzr.example.com', None)
1216
def test_evaluate_proxy_bypass_unknown(self):
1217
"""The host is not explicitly proxied"""
1218
self.assertEvaluateProxyBypass(None, 'example.com', 'not.example.com')
1219
self.assertEvaluateProxyBypass(None, 'bzr.example.com', 'example.com')
1221
def test_evaluate_proxy_bypass_empty_entries(self):
1222
"""Ignore empty entries"""
1223
self.assertEvaluateProxyBypass(None, 'example.com', '')
1224
self.assertEvaluateProxyBypass(None, 'example.com', ',')
1225
self.assertEvaluateProxyBypass(None, 'example.com', 'foo,,bar')
1128
1228
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
1129
1229
"""Tests proxy server.
1153
1258
self.no_proxy_host = self.server_host_port
1154
1259
# The secondary server is the proxy
1155
1260
self.proxy_url = self.get_secondary_url()
1158
1262
def _testing_pycurl(self):
1159
1263
# TODO: This is duplicated for lots of the classes in this file
1160
1264
return (features.pycurl.available()
1161
1265
and self._transport == PyCurlTransport)
1163
def _install_env(self, env):
1164
for name, value in env.iteritems():
1165
self._old_env[name] = osutils.set_or_unset_env(name, value)
1167
def _restore_env(self):
1168
for name, value in self._old_env.iteritems():
1169
osutils.set_or_unset_env(name, value)
1171
def proxied_in_env(self, env):
1172
self._install_env(env)
1173
t = self.get_readonly_transport()
1175
self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1179
def not_proxied_in_env(self, env):
1180
self._install_env(env)
1181
t = self.get_readonly_transport()
1183
self.assertEqual('contents of foo\n', t.get('foo').read())
1267
def assertProxied(self):
1268
t = self.get_readonly_transport()
1269
self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1271
def assertNotProxied(self):
1272
t = self.get_readonly_transport()
1273
self.assertEqual('contents of foo\n', t.get('foo').read())
1187
1275
def test_http_proxy(self):
1188
self.proxied_in_env({'http_proxy': self.proxy_url})
1276
self.overrideEnv('http_proxy', self.proxy_url)
1277
self.assertProxied()
1190
1279
def test_HTTP_PROXY(self):
1191
1280
if self._testing_pycurl():
1194
1283
# about. Should we ?)
1195
1284
raise tests.TestNotApplicable(
1196
1285
'pycurl does not check HTTP_PROXY for security reasons')
1197
self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
1286
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1287
self.assertProxied()
1199
1289
def test_all_proxy(self):
1200
self.proxied_in_env({'all_proxy': self.proxy_url})
1290
self.overrideEnv('all_proxy', self.proxy_url)
1291
self.assertProxied()
1202
1293
def test_ALL_PROXY(self):
1203
self.proxied_in_env({'ALL_PROXY': self.proxy_url})
1294
self.overrideEnv('ALL_PROXY', self.proxy_url)
1295
self.assertProxied()
1205
1297
def test_http_proxy_with_no_proxy(self):
1206
self.not_proxied_in_env({'http_proxy': self.proxy_url,
1207
'no_proxy': self.no_proxy_host})
1298
self.overrideEnv('no_proxy', self.no_proxy_host)
1299
self.overrideEnv('http_proxy', self.proxy_url)
1300
self.assertNotProxied()
1209
1302
def test_HTTP_PROXY_with_NO_PROXY(self):
1210
1303
if self._testing_pycurl():
1211
1304
raise tests.TestNotApplicable(
1212
1305
'pycurl does not check HTTP_PROXY for security reasons')
1213
self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
1214
'NO_PROXY': self.no_proxy_host})
1306
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1307
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1308
self.assertNotProxied()
1216
1310
def test_all_proxy_with_no_proxy(self):
1217
self.not_proxied_in_env({'all_proxy': self.proxy_url,
1218
'no_proxy': self.no_proxy_host})
1311
self.overrideEnv('no_proxy', self.no_proxy_host)
1312
self.overrideEnv('all_proxy', self.proxy_url)
1313
self.assertNotProxied()
1220
1315
def test_ALL_PROXY_with_NO_PROXY(self):
1221
self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
1222
'NO_PROXY': self.no_proxy_host})
1316
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1317
self.overrideEnv('ALL_PROXY', self.proxy_url)
1318
self.assertNotProxied()
1224
1320
def test_http_proxy_without_scheme(self):
1321
self.overrideEnv('http_proxy', self.server_host_port)
1225
1322
if self._testing_pycurl():
1226
1323
# pycurl *ignores* invalid proxy env variables. If that ever change
1227
1324
# in the future, this test will fail indicating that pycurl do not
1228
1325
# ignore anymore such variables.
1229
self.not_proxied_in_env({'http_proxy': self.server_host_port})
1326
self.assertNotProxied()
1231
self.assertRaises(errors.InvalidURL,
1232
self.proxied_in_env,
1233
{'http_proxy': self.server_host_port})
1328
self.assertRaises(errors.InvalidURL, self.assertProxied)
1236
1331
class TestRanges(http_utils.TestCaseWithWebserver):
1237
1332
"""Test the Range header in GET methods."""
1334
scenarios = multiply_scenarios(
1335
vary_by_http_client_implementation(),
1336
vary_by_http_protocol_version(),
1239
1339
def setUp(self):
1240
http_utils.TestCaseWithWebserver.setUp(self)
1340
super(TestRanges, self).setUp()
1241
1341
self.build_tree_contents([('a', '0123456789')],)
1243
1343
def create_transport_readonly_server(self):
1443
1558
self.get_a, self.old_transport, redirected)
1561
def _setup_authentication_config(**kwargs):
1562
conf = config.AuthenticationConfig()
1563
conf._get_config().update({'httptest': kwargs})
1567
class TestUrllib2AuthHandler(tests.TestCaseWithTransport):
1568
"""Unit tests for glue by which urllib2 asks us for authentication"""
1570
def test_get_user_password_without_port(self):
1571
"""We cope if urllib2 doesn't tell us the port.
1573
See https://bugs.launchpad.net/bzr/+bug/654684
1577
_setup_authentication_config(scheme='http', host='localhost',
1578
user=user, password=password)
1579
handler = _urllib2_wrappers.HTTPAuthHandler()
1580
got_pass = handler.get_user_password(dict(
1587
self.assertEquals((user, password), got_pass)
1446
1590
class TestAuth(http_utils.TestCaseWithWebserver):
1447
1591
"""Test authentication scheme"""
1449
_auth_header = 'Authorization'
1450
_password_prompt_prefix = ''
1451
_username_prompt_prefix = ''
1593
scenarios = multiply_scenarios(
1594
vary_by_http_client_implementation(),
1595
vary_by_http_protocol_version(),
1596
vary_by_http_auth_scheme(),
1455
1599
def setUp(self):
1456
1600
super(TestAuth, self).setUp()
1610
1752
# Only one 'Authentication Required' error should occur
1611
1753
self.assertEqual(1, self.server.auth_required_errors)
1613
def test_user_from_auth_conf(self):
1614
if self._testing_pycurl():
1615
raise tests.TestNotApplicable(
1616
'pycurl does not support authentication.conf')
1619
self.server.add_user(user, password)
1620
# Create a minimal config file with the right password
1621
conf = config.AuthenticationConfig()
1622
conf._get_config().update(
1623
{'httptest': {'scheme': 'http', 'port': self.server.port,
1624
'user': user, 'password': password}})
1626
t = self.get_user_transport(None, None)
1627
# Issue a request to the server to connect
1628
self.assertEqual('contents of a\n', t.get('a').read())
1629
# Only one 'Authentication Required' error should occur
1630
self.assertEqual(1, self.server.auth_required_errors)
1632
1755
def test_changing_nonce(self):
1633
1756
if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1634
1757
http_utils.ProxyDigestAuthServer):
1635
1758
raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1636
1759
if self._testing_pycurl():
1637
raise tests.KnownFailure(
1638
1761
'pycurl does not handle a nonce change')
1639
1762
self.server.add_user('joe', 'foo')
1640
1763
t = self.get_user_transport('joe', 'foo')
1650
1773
# initial 'who are you' and a second 'who are you' with the new nonce)
1651
1774
self.assertEqual(2, self.server.auth_required_errors)
1776
def test_user_from_auth_conf(self):
1777
if self._testing_pycurl():
1778
raise tests.TestNotApplicable(
1779
'pycurl does not support authentication.conf')
1782
self.server.add_user(user, password)
1783
_setup_authentication_config(scheme='http', port=self.server.port,
1784
user=user, password=password)
1785
t = self.get_user_transport(None, None)
1786
# Issue a request to the server to connect
1787
self.assertEqual('contents of a\n', t.get('a').read())
1788
# Only one 'Authentication Required' error should occur
1789
self.assertEqual(1, self.server.auth_required_errors)
1791
def test_no_credential_leaks_in_log(self):
1792
self.overrideAttr(debug, 'debug_flags', set(['http']))
1794
password = 'very-sensitive-password'
1795
self.server.add_user(user, password)
1796
t = self.get_user_transport(user, password)
1797
# Capture the debug calls to mutter
1800
lines = args[0] % args[1:]
1801
# Some calls output multiple lines, just split them now since we
1802
# care about a single one later.
1803
self.mutters.extend(lines.splitlines())
1804
self.overrideAttr(trace, 'mutter', mutter)
1805
# Issue a request to the server to connect
1806
self.assertEqual(True, t.has('a'))
1807
# Only one 'Authentication Required' error should occur
1808
self.assertEqual(1, self.server.auth_required_errors)
1809
# Since the authentification succeeded, there should be a corresponding
1811
sent_auth_headers = [line for line in self.mutters
1812
if line.startswith('> %s' % (self._auth_header,))]
1813
self.assertLength(1, sent_auth_headers)
1814
self.assertStartsWith(sent_auth_headers[0],
1815
'> %s: <masked>' % (self._auth_header,))
1655
1818
class TestProxyAuth(TestAuth):
1656
"""Test proxy authentication schemes."""
1658
_auth_header = 'Proxy-authorization'
1659
_password_prompt_prefix = 'Proxy '
1660
_username_prompt_prefix = 'Proxy '
1819
"""Test proxy authentication schemes.
1821
This inherits from TestAuth to tweak the setUp and filter some failing
1825
scenarios = multiply_scenarios(
1826
vary_by_http_client_implementation(),
1827
vary_by_http_protocol_version(),
1828
vary_by_http_proxy_auth_scheme(),
1662
1831
def setUp(self):
1663
1832
super(TestProxyAuth, self).setUp()
1665
self.addCleanup(self._restore_env)
1666
1833
# Override the contents to avoid false positives
1667
1834
self.build_tree_contents([('a', 'not proxied contents of a\n'),
1668
1835
('b', 'not proxied contents of b\n'),
1833
2002
r = t._redirected_to('http://www.example.com/foo',
1834
2003
'http://foo.example.com/foo/subdir')
1835
2004
self.assertIsInstance(r, type(t))
2005
self.assertEquals('http://foo.example.com/foo/subdir/',
1837
2008
def test_redirected_to_same_host_sibling_protocol(self):
1838
2009
t = self._transport('http://www.example.com/foo')
1839
2010
r = t._redirected_to('http://www.example.com/foo',
1840
2011
'https://www.example.com/foo')
1841
2012
self.assertIsInstance(r, type(t))
2013
self.assertEquals('https://www.example.com/foo/',
1843
2016
def test_redirected_to_same_host_different_protocol(self):
1844
2017
t = self._transport('http://www.example.com/foo')
1845
2018
r = t._redirected_to('http://www.example.com/foo',
1846
2019
'ftp://www.example.com/foo')
1847
2020
self.assertNotEquals(type(r), type(t))
2021
self.assertEquals('ftp://www.example.com/foo/', r.external_url())
2023
def test_redirected_to_same_host_specific_implementation(self):
2024
t = self._transport('http://www.example.com/foo')
2025
r = t._redirected_to('http://www.example.com/foo',
2026
'https+urllib://www.example.com/foo')
2027
self.assertEquals('https://www.example.com/foo/', r.external_url())
1849
2029
def test_redirected_to_different_host_same_user(self):
1850
2030
t = self._transport('http://joe@www.example.com/foo')
1851
2031
r = t._redirected_to('http://www.example.com/foo',
1852
2032
'https://foo.example.com/foo')
1853
2033
self.assertIsInstance(r, type(t))
1854
self.assertEqual(t._user, r._user)
2034
self.assertEqual(t._parsed_url.user, r._parsed_url.user)
2035
self.assertEquals('https://joe@foo.example.com/foo/', r.external_url())
1857
2038
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):