113
113
protocol_scenarios)
114
114
tests.multiply_tests(tp_tests, tp_scenarios, result)
116
# proxy auth: each auth scheme on all http versions on all implementations.
117
tppa_tests, remaining_tests = tests.split_suite_by_condition(
118
remaining_tests, tests.condition_isinstance((
121
proxy_auth_scheme_scenarios = [
122
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
123
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
125
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
127
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
128
proxy_auth_scheme_scenarios)
129
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
116
131
# auth: each auth scheme on all http versions on all implementations.
117
132
tpa_tests, remaining_tests = tests.split_suite_by_condition(
118
133
remaining_tests, tests.condition_isinstance((
121
136
auth_scheme_scenarios = [
122
('basic', dict(_auth_scheme='basic')),
123
('digest', dict(_auth_scheme='digest')),
137
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
138
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
140
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
125
142
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
126
auth_scheme_scenarios)
143
auth_scheme_scenarios)
127
144
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
129
# activity: activity on all http versions on all implementations
146
# activity: on all http[s] versions on all implementations
130
147
tpact_tests, remaining_tests = tests.split_suite_by_condition(
131
148
remaining_tests, tests.condition_isinstance((
134
151
activity_scenarios = [
135
('http', dict(_activity_server=ActivityHTTPServer)),
152
('urllib,http', dict(_activity_server=ActivityHTTPServer,
153
_transport=_urllib.HttpTransport_urllib,)),
137
155
if tests.HTTPSServerFeature.available():
138
156
activity_scenarios.append(
139
('https', dict(_activity_server=ActivityHTTPSServer)))
140
tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
157
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
158
_transport=_urllib.HttpTransport_urllib,)),)
160
activity_scenarios.append(
161
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
162
_transport=PyCurlTransport,)),)
163
if tests.HTTPSServerFeature.available():
164
from bzrlib.tests import (
167
# FIXME: Until we have a better way to handle self-signed
168
# certificates (like allowing them in a test specific
169
# authentication.conf for example), we need some specialized pycurl
170
# transport for tests.
171
class HTTPS_pycurl_transport(PyCurlTransport):
173
def __init__(self, base, _from_transport=None):
174
super(HTTPS_pycurl_transport, self).__init__(
175
base, _from_transport)
176
self.cabundle = str(ssl_certs.build_path('ca.crt'))
178
activity_scenarios.append(
179
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
180
_transport=HTTPS_pycurl_transport,)),)
182
tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
142
184
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
144
186
# No parametrization for the remaining tests
583
625
# for details) make no distinction between a closed
584
626
# socket and badly formatted status line, so we can't
585
627
# just test for ConnectionError, we have to test
586
# InvalidHttpResponse too.
587
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
628
# InvalidHttpResponse too. And pycurl may raise ConnectionReset
629
# instead of ConnectionError too.
630
self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
631
errors.InvalidHttpResponse),
588
632
t.has, 'foo/bar')
590
634
def test_http_get(self):
591
635
server = self.get_readonly_server()
592
636
t = self._transport(server.get_url())
593
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
637
self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
638
errors.InvalidHttpResponse),
594
639
t.get, 'foo/bar')
1459
1506
('b', 'contents of b\n'),])
1461
1508
def create_transport_readonly_server(self):
1462
if self._auth_scheme == 'basic':
1463
server = http_utils.HTTPBasicAuthServer(
1464
protocol_version=self._protocol_version)
1466
if self._auth_scheme != 'digest':
1467
raise AssertionError('Unknown auth scheme: %r'
1468
% self._auth_scheme)
1469
server = http_utils.HTTPDigestAuthServer(
1470
protocol_version=self._protocol_version)
1509
return self._auth_server(protocol_version=self._protocol_version)
1473
1511
def _testing_pycurl(self):
1474
1512
return pycurl_present and self._transport == PyCurlTransport
1534
1572
self.server.add_user('joe', 'foo')
1535
1573
t = self.get_user_transport(None, None)
1536
1574
stdout = tests.StringIOWrapper()
1537
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=stdout)
1575
stderr = tests.StringIOWrapper()
1576
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1577
stdout=stdout, stderr=stderr)
1538
1578
self.assertEqual('contents of a\n',t.get('a').read())
1539
1579
# stdin should be empty
1540
1580
self.assertEqual('', ui.ui_factory.stdin.readline())
1542
1582
expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1543
self.assertEquals(expected_prompt, stdout.read(len(expected_prompt)))
1583
self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
1584
self.assertEquals('', stdout.getvalue())
1544
1585
self._check_password_prompt(t._unqualified_scheme, 'joe',
1547
1588
def test_prompt_for_password(self):
1548
1589
if self._testing_pycurl():
1553
1594
self.server.add_user('joe', 'foo')
1554
1595
t = self.get_user_transport('joe', None)
1555
1596
stdout = tests.StringIOWrapper()
1556
ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1557
self.assertEqual('contents of a\n',t.get('a').read())
1597
stderr = tests.StringIOWrapper()
1598
ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1599
stdout=stdout, stderr=stderr)
1600
self.assertEqual('contents of a\n', t.get('a').read())
1558
1601
# stdin should be empty
1559
1602
self.assertEqual('', ui.ui_factory.stdin.readline())
1560
1603
self._check_password_prompt(t._unqualified_scheme, 'joe',
1605
self.assertEquals('', stdout.getvalue())
1562
1606
# And we shouldn't prompt again for a different request
1563
1607
# against the same transport.
1564
1608
self.assertEqual('contents of b\n',t.get('b').read())
1667
1712
('b-proxied', 'contents of b\n'),
1670
def create_transport_readonly_server(self):
1671
if self._auth_scheme == 'basic':
1672
server = http_utils.ProxyBasicAuthServer(
1673
protocol_version=self._protocol_version)
1675
if self._auth_scheme != 'digest':
1676
raise AssertionError('Unknown auth scheme: %r'
1677
% self._auth_scheme)
1678
server = http_utils.ProxyDigestAuthServer(
1679
protocol_version=self._protocol_version)
1682
1715
def get_user_transport(self, user, password):
1683
1716
self._install_env({'all_proxy': self.get_user_url(user, password)})
1684
1717
return self._transport(self.server.get_url())