112
109
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
113
110
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
115
tp_scenarios = tests.multiply_scenarios(adapter.scenarios,
112
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
116
113
protocol_scenarios)
117
adapter.scenarios = tp_scenarios
118
tests.adapt_tests(tp_tests, adapter, result)
120
# multiplied by one for each authentication scheme
114
tests.multiply_tests(tp_tests, tp_scenarios, result)
116
# proxy auth: each auth scheme on all http versions on all implementations.
117
tppa_tests, remaining_tests = tests.split_suite_by_condition(
118
remaining_tests, tests.condition_isinstance((
121
proxy_auth_scheme_scenarios = [
122
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
123
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
125
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
127
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
128
proxy_auth_scheme_scenarios)
129
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
131
# auth: each auth scheme on all http versions on all implementations.
121
132
tpa_tests, remaining_tests = tests.split_suite_by_condition(
122
133
remaining_tests, tests.condition_isinstance((
125
136
auth_scheme_scenarios = [
126
('basic', dict(_auth_scheme='basic')),
127
('digest', dict(_auth_scheme='digest')),
137
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
138
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
140
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
129
adapter.scenarios = tests.multiply_scenarios(adapter.scenarios,
130
auth_scheme_scenarios)
131
tests.adapt_tests(tpa_tests, adapter, result)
142
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
143
auth_scheme_scenarios)
144
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
146
# activity: on all http[s] versions on all implementations
133
147
tpact_tests, remaining_tests = tests.split_suite_by_condition(
134
148
remaining_tests, tests.condition_isinstance((
137
151
activity_scenarios = [
138
('http', dict(_activity_server=ActivityHTTPServer)),
152
('urllib,http', dict(_activity_server=ActivityHTTPServer,
153
_transport=_urllib.HttpTransport_urllib,)),
140
155
if tests.HTTPSServerFeature.available():
141
156
activity_scenarios.append(
142
('https', dict(_activity_server=ActivityHTTPSServer,)))
143
adapter.scenarios = tests.multiply_scenarios(tp_scenarios,
145
tests.adapt_tests(tpact_tests, adapter, result)
157
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
158
_transport=_urllib.HttpTransport_urllib,)),)
160
activity_scenarios.append(
161
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
162
_transport=PyCurlTransport,)),)
163
if tests.HTTPSServerFeature.available():
164
from bzrlib.tests import (
167
# FIXME: Until we have a better way to handle self-signed
168
# certificates (like allowing them in a test specific
169
# authentication.conf for example), we need some specialized pycurl
170
# transport for tests.
171
class HTTPS_pycurl_transport(PyCurlTransport):
173
def __init__(self, base, _from_transport=None):
174
super(HTTPS_pycurl_transport, self).__init__(
175
base, _from_transport)
176
self.cabundle = str(ssl_certs.build_path('ca.crt'))
178
activity_scenarios.append(
179
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
180
_transport=HTTPS_pycurl_transport,)),)
182
tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
184
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
147
186
# No parametrization for the remaining tests
148
187
result.addTests(remaining_tests)
258
class TestAuthHeader(tests.TestCase):
260
def parse_header(self, header, auth_handler_class=None):
261
if auth_handler_class is None:
262
auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
263
self.auth_handler = auth_handler_class()
264
return self.auth_handler._parse_auth_header(header)
266
def test_empty_header(self):
267
scheme, remainder = self.parse_header('')
268
self.assertEquals('', scheme)
269
self.assertIs(None, remainder)
271
def test_negotiate_header(self):
272
scheme, remainder = self.parse_header('Negotiate')
273
self.assertEquals('negotiate', scheme)
274
self.assertIs(None, remainder)
276
def test_basic_header(self):
277
scheme, remainder = self.parse_header(
278
'Basic realm="Thou should not pass"')
279
self.assertEquals('basic', scheme)
280
self.assertEquals('realm="Thou should not pass"', remainder)
282
def test_basic_extract_realm(self):
283
scheme, remainder = self.parse_header(
284
'Basic realm="Thou should not pass"',
285
_urllib2_wrappers.BasicAuthHandler)
286
match, realm = self.auth_handler.extract_realm(remainder)
287
self.assertTrue(match is not None)
288
self.assertEquals('Thou should not pass', realm)
290
def test_digest_header(self):
291
scheme, remainder = self.parse_header(
292
'Digest realm="Thou should not pass"')
293
self.assertEquals('digest', scheme)
294
self.assertEquals('realm="Thou should not pass"', remainder)
219
297
class TestHTTPServer(tests.TestCase):
220
298
"""Test the HTTP servers implementations."""
1488
1560
# initial 'who are you' and 'this is not you, who are you')
1489
1561
self.assertEqual(2, self.server.auth_required_errors)
1563
def test_prompt_for_username(self):
1564
if self._testing_pycurl():
1565
raise tests.TestNotApplicable(
1566
'pycurl cannot prompt, it handles auth by embedding'
1567
' user:pass in urls only')
1569
self.server.add_user('joe', 'foo')
1570
t = self.get_user_transport(None, None)
1571
stdout = tests.StringIOWrapper()
1572
stderr = tests.StringIOWrapper()
1573
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1574
stdout=stdout, stderr=stderr)
1575
self.assertEqual('contents of a\n',t.get('a').read())
1576
# stdin should be empty
1577
self.assertEqual('', ui.ui_factory.stdin.readline())
1579
expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1580
self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
1581
self.assertEquals('', stdout.getvalue())
1582
self._check_password_prompt(t._unqualified_scheme, 'joe',
1491
1585
def test_prompt_for_password(self):
1492
1586
if self._testing_pycurl():
1493
1587
raise tests.TestNotApplicable(
1497
1591
self.server.add_user('joe', 'foo')
1498
1592
t = self.get_user_transport('joe', None)
1499
1593
stdout = tests.StringIOWrapper()
1500
ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1501
self.assertEqual('contents of a\n',t.get('a').read())
1594
stderr = tests.StringIOWrapper()
1595
ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1596
stdout=stdout, stderr=stderr)
1597
self.assertEqual('contents of a\n', t.get('a').read())
1502
1598
# stdin should be empty
1503
1599
self.assertEqual('', ui.ui_factory.stdin.readline())
1504
1600
self._check_password_prompt(t._unqualified_scheme, 'joe',
1602
self.assertEquals('', stdout.getvalue())
1506
1603
# And we shouldn't prompt again for a different request
1507
1604
# against the same transport.
1508
1605
self.assertEqual('contents of b\n',t.get('b').read())