82
84
_server=http_server.HttpServer_urllib,
83
85
_qualified_prefix='http+urllib',)),
85
if features.pycurl.available():
86
88
transport_scenarios.append(
87
89
('pycurl', dict(_transport=PyCurlTransport,
88
90
_server=http_server.HttpServer_PyCurl,
89
91
_qualified_prefix='http+pycurl',)))
90
92
tests.multiply_tests(t_tests, transport_scenarios, result)
92
protocol_scenarios = [
93
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
94
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
97
# some tests are parametrized by the protocol version only
98
p_tests, remaining_tests = tests.split_suite_by_condition(
99
remaining_tests, tests.condition_isinstance((
100
TestAuthOnRedirected,
102
tests.multiply_tests(p_tests, protocol_scenarios, result)
104
94
# each implementation tested with each HTTP version
105
95
tp_tests, remaining_tests = tests.split_suite_by_condition(
106
96
remaining_tests, tests.condition_isinstance((
116
106
TestSpecificRequestHandler,
108
protocol_scenarios = [
109
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
110
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
118
112
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
119
113
protocol_scenarios)
120
114
tests.multiply_tests(tp_tests, tp_scenarios, result)
122
# proxy auth: each auth scheme on all http versions on all implementations.
123
tppa_tests, remaining_tests = tests.split_suite_by_condition(
124
remaining_tests, tests.condition_isinstance((
127
proxy_auth_scheme_scenarios = [
128
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
129
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
131
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
133
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
134
proxy_auth_scheme_scenarios)
135
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
137
116
# auth: each auth scheme on all http versions on all implementations.
138
117
tpa_tests, remaining_tests = tests.split_suite_by_condition(
139
118
remaining_tests, tests.condition_isinstance((
142
121
auth_scheme_scenarios = [
143
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
144
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
146
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
122
('basic', dict(_auth_scheme='basic')),
123
('digest', dict(_auth_scheme='digest')),
148
125
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
149
auth_scheme_scenarios)
126
auth_scheme_scenarios)
150
127
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
152
# activity: on all http[s] versions on all implementations
129
# activity: activity on all http versions on all implementations
153
130
tpact_tests, remaining_tests = tests.split_suite_by_condition(
154
131
remaining_tests, tests.condition_isinstance((
157
134
activity_scenarios = [
158
('urllib,http', dict(_activity_server=ActivityHTTPServer,
159
_transport=_urllib.HttpTransport_urllib,)),
135
('http', dict(_activity_server=ActivityHTTPServer)),
161
137
if tests.HTTPSServerFeature.available():
162
138
activity_scenarios.append(
163
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
164
_transport=_urllib.HttpTransport_urllib,)),)
165
if features.pycurl.available():
166
activity_scenarios.append(
167
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
168
_transport=PyCurlTransport,)),)
169
if tests.HTTPSServerFeature.available():
170
from bzrlib.tests import (
173
# FIXME: Until we have a better way to handle self-signed
174
# certificates (like allowing them in a test specific
175
# authentication.conf for example), we need some specialized pycurl
176
# transport for tests.
177
class HTTPS_pycurl_transport(PyCurlTransport):
179
def __init__(self, base, _from_transport=None):
180
super(HTTPS_pycurl_transport, self).__init__(
181
base, _from_transport)
182
self.cabundle = str(ssl_certs.build_path('ca.crt'))
184
activity_scenarios.append(
185
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
186
_transport=HTTPS_pycurl_transport,)),)
188
tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
139
('https', dict(_activity_server=ActivityHTTPSServer)))
140
tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
190
142
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
192
144
# No parametrization for the remaining tests
268
216
class TestAuthHeader(tests.TestCase):
270
def parse_header(self, header, auth_handler_class=None):
271
if auth_handler_class is None:
272
auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
273
self.auth_handler = auth_handler_class()
274
return self.auth_handler._parse_auth_header(header)
218
def parse_header(self, header):
219
ah = _urllib2_wrappers.AbstractAuthHandler()
220
return ah._parse_auth_header(header)
276
222
def test_empty_header(self):
277
223
scheme, remainder = self.parse_header('')
278
self.assertEqual('', scheme)
224
self.assertEquals('', scheme)
279
225
self.assertIs(None, remainder)
281
227
def test_negotiate_header(self):
282
228
scheme, remainder = self.parse_header('Negotiate')
283
self.assertEqual('negotiate', scheme)
229
self.assertEquals('negotiate', scheme)
284
230
self.assertIs(None, remainder)
286
232
def test_basic_header(self):
287
233
scheme, remainder = self.parse_header(
288
234
'Basic realm="Thou should not pass"')
289
self.assertEqual('basic', scheme)
290
self.assertEqual('realm="Thou should not pass"', remainder)
292
def test_basic_extract_realm(self):
293
scheme, remainder = self.parse_header(
294
'Basic realm="Thou should not pass"',
295
_urllib2_wrappers.BasicAuthHandler)
296
match, realm = self.auth_handler.extract_realm(remainder)
297
self.assertTrue(match is not None)
298
self.assertEqual('Thou should not pass', realm)
235
self.assertEquals('basic', scheme)
236
self.assertEquals('realm="Thou should not pass"', remainder)
300
238
def test_digest_header(self):
301
239
scheme, remainder = self.parse_header(
302
240
'Digest realm="Thou should not pass"')
303
self.assertEqual('digest', scheme)
304
self.assertEqual('realm="Thou should not pass"', remainder)
241
self.assertEquals('digest', scheme)
242
self.assertEquals('realm="Thou should not pass"', remainder)
307
245
class TestHTTPServer(tests.TestCase):
396
339
def test_url_parsing(self):
397
340
f = FakeManager()
398
341
url = http.extract_auth('http://example.com', f)
399
self.assertEqual('http://example.com', url)
400
self.assertEqual(0, len(f.credentials))
342
self.assertEquals('http://example.com', url)
343
self.assertEquals(0, len(f.credentials))
401
344
url = http.extract_auth(
402
'http://user:pass@example.com/bzr/bzr.dev', f)
403
self.assertEqual('http://example.com/bzr/bzr.dev', url)
404
self.assertEqual(1, len(f.credentials))
405
self.assertEqual([None, 'example.com', 'user', 'pass'],
345
'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
346
self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
347
self.assertEquals(1, len(f.credentials))
348
self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
409
352
class TestHttpTransportUrls(tests.TestCase):
456
399
https by supplying a fake version_info that do not
459
self.requireFeature(features.pycurl)
460
# Import the module locally now that we now it's available.
461
pycurl = features.pycurl.module
405
raise tests.TestSkipped('pycurl not present')
463
self.overrideAttr(pycurl, 'version_info',
464
# Fake the pycurl version_info This was taken from
465
# a windows pycurl without SSL (thanks to bialix)
474
('ftp', 'gopher', 'telnet',
475
'dict', 'ldap', 'http', 'file'),
479
self.assertRaises(errors.DependencyNotPresent, self._transport,
480
'https://launchpad.net')
407
version_info_orig = pycurl.version_info
409
# Now that we have pycurl imported, we can fake its version_info
410
# This was taken from a windows pycurl without SSL
412
pycurl.version_info = lambda : (2,
420
('ftp', 'gopher', 'telnet',
421
'dict', 'ldap', 'http', 'file'),
425
self.assertRaises(errors.DependencyNotPresent, self._transport,
426
'https://launchpad.net')
428
# Restore the right function
429
pycurl.version_info = version_info_orig
483
432
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
625
573
# for details) make no distinction between a closed
626
574
# socket and badly formatted status line, so we can't
627
575
# just test for ConnectionError, we have to test
628
# InvalidHttpResponse too. And pycurl may raise ConnectionReset
629
# instead of ConnectionError too.
630
self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
631
errors.InvalidHttpResponse),
576
# InvalidHttpResponse too.
577
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
632
578
t.has, 'foo/bar')
634
580
def test_http_get(self):
635
581
server = self.get_readonly_server()
636
582
t = self._transport(server.get_url())
637
self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
638
errors.InvalidHttpResponse),
583
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
639
584
t.get, 'foo/bar')
767
712
self.assertEqual(None, server.host)
768
713
self.assertEqual(None, server.port)
770
def test_setUp_and_stop(self):
715
def test_setUp_and_tearDown(self):
771
716
server = RecordingServer(expect_body_tail=None)
772
server.start_server()
774
719
self.assertNotEqual(None, server.host)
775
720
self.assertNotEqual(None, server.port)
778
723
self.assertEqual(None, server.host)
779
724
self.assertEqual(None, server.port)
781
726
def test_send_receive_bytes(self):
782
server = RecordingServer(expect_body_tail='c', scheme='http')
783
self.start_server(server)
727
server = RecordingServer(expect_body_tail='c')
729
self.addCleanup(server.tearDown)
784
730
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
785
731
sock.connect((server.host, server.port))
786
732
sock.sendall('abc')
1561
1514
# initial 'who are you' and 'this is not you, who are you')
1562
1515
self.assertEqual(2, self.server.auth_required_errors)
1564
def test_prompt_for_username(self):
1565
if self._testing_pycurl():
1566
raise tests.TestNotApplicable(
1567
'pycurl cannot prompt, it handles auth by embedding'
1568
' user:pass in urls only')
1570
self.server.add_user('joe', 'foo')
1571
t = self.get_user_transport(None, None)
1572
stdout = tests.StringIOWrapper()
1573
stderr = tests.StringIOWrapper()
1574
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1575
stdout=stdout, stderr=stderr)
1576
self.assertEqual('contents of a\n',t.get('a').read())
1577
# stdin should be empty
1578
self.assertEqual('', ui.ui_factory.stdin.readline())
1580
expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1581
self.assertEqual(expected_prompt, stderr.read(len(expected_prompt)))
1582
self.assertEqual('', stdout.getvalue())
1583
self._check_password_prompt(t._unqualified_scheme, 'joe',
1586
1517
def test_prompt_for_password(self):
1587
1518
if self._testing_pycurl():
1588
1519
raise tests.TestNotApplicable(
1592
1523
self.server.add_user('joe', 'foo')
1593
1524
t = self.get_user_transport('joe', None)
1594
1525
stdout = tests.StringIOWrapper()
1595
stderr = tests.StringIOWrapper()
1596
ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1597
stdout=stdout, stderr=stderr)
1598
self.assertEqual('contents of a\n', t.get('a').read())
1526
ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1527
self.assertEqual('contents of a\n',t.get('a').read())
1599
1528
# stdin should be empty
1600
1529
self.assertEqual('', ui.ui_factory.stdin.readline())
1601
1530
self._check_password_prompt(t._unqualified_scheme, 'joe',
1603
self.assertEqual('', stdout.getvalue())
1604
1532
# And we shouldn't prompt again for a different request
1605
1533
# against the same transport.
1606
1534
self.assertEqual('contents of b\n',t.get('b').read())
2098
2030
code, f = t._post('abc def end-of-body\n')
2099
2031
self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2100
2032
self.assertActivitiesMatch()
2103
class TestActivity(tests.TestCase, TestActivityMixin):
2106
tests.TestCase.setUp(self)
2107
self.server = self._activity_server(self._protocol_version)
2108
self.server.start_server()
2109
self.activities = {}
2110
def report_activity(t, bytes, direction):
2111
count = self.activities.get(direction, 0)
2113
self.activities[direction] = count
2115
# We override at class level because constructors may propagate the
2116
# bound method and render instance overriding ineffective (an
2117
# alternative would be to define a specific ui factory instead...)
2118
self.orig_report_activity = self._transport._report_activity
2119
self._transport._report_activity = report_activity
2122
self._transport._report_activity = self.orig_report_activity
2123
self.server.stop_server()
2124
tests.TestCase.tearDown(self)
2127
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2130
tests.TestCase.setUp(self)
2131
# Unlike TestActivity, we are really testing ReportingFileSocket and
2132
# ReportingSocket, so we don't need all the parametrization. Since
2133
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2134
# test them through their use by the transport than directly (that's a
2135
# bit less clean but far more simpler and effective).
2136
self.server = ActivityHTTPServer('HTTP/1.1')
2137
self._transport=_urllib.HttpTransport_urllib
2139
self.server.start_server()
2141
# We override at class level because constructors may propagate the
2142
# bound method and render instance overriding ineffective (an
2143
# alternative would be to define a specific ui factory instead...)
2144
self.orig_report_activity = self._transport._report_activity
2145
self._transport._report_activity = None
2148
self._transport._report_activity = self.orig_report_activity
2149
self.server.stop_server()
2150
tests.TestCase.tearDown(self)
2152
def assertActivitiesMatch(self):
2153
# Nothing to check here
2157
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2158
"""Test authentication on the redirected http server."""
2160
_auth_header = 'Authorization'
2161
_password_prompt_prefix = ''
2162
_username_prompt_prefix = ''
2163
_auth_server = http_utils.HTTPBasicAuthServer
2164
_transport = _urllib.HttpTransport_urllib
2166
def create_transport_readonly_server(self):
2167
return self._auth_server(protocol_version=self._protocol_version)
2169
def create_transport_secondary_server(self):
2170
"""Create the secondary server redirecting to the primary server"""
2171
new = self.get_readonly_server()
2173
redirecting = http_utils.HTTPServerRedirecting(
2174
protocol_version=self._protocol_version)
2175
redirecting.redirect_to(new.host, new.port)
2179
super(TestAuthOnRedirected, self).setUp()
2180
self.build_tree_contents([('a','a'),
2182
('1/a', 'redirected once'),
2184
new_prefix = 'http://%s:%s' % (self.new_server.host,
2185
self.new_server.port)
2186
self.old_server.redirections = [
2187
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2188
self.old_transport = self._transport(self.old_server.get_url())
2189
self.new_server.add_user('joe', 'foo')
2191
def get_a(self, transport):
2192
return transport.get('a')
2194
def test_auth_on_redirected_via_do_catching_redirections(self):
2195
self.redirections = 0
2197
def redirected(transport, exception, redirection_notice):
2198
self.redirections += 1
2199
dir, file = urlutils.split(exception.target)
2200
return self._transport(dir)
2202
stdout = tests.StringIOWrapper()
2203
stderr = tests.StringIOWrapper()
2204
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2205
stdout=stdout, stderr=stderr)
2206
self.assertEqual('redirected once',
2207
transport.do_catching_redirections(
2208
self.get_a, self.old_transport, redirected).read())
2209
self.assertEqual(1, self.redirections)
2210
# stdin should be empty
2211
self.assertEqual('', ui.ui_factory.stdin.readline())
2212
# stdout should be empty, stderr will contains the prompts
2213
self.assertEqual('', stdout.getvalue())
2215
def test_auth_on_redirected_via_following_redirections(self):
2216
self.new_server.add_user('joe', 'foo')
2217
stdout = tests.StringIOWrapper()
2218
stderr = tests.StringIOWrapper()
2219
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2220
stdout=stdout, stderr=stderr)
2221
t = self.old_transport
2222
req = RedirectedRequest('GET', t.abspath('a'))
2223
new_prefix = 'http://%s:%s' % (self.new_server.host,
2224
self.new_server.port)
2225
self.old_server.redirections = [
2226
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2227
self.assertEqual('redirected once',t._perform(req).read())
2228
# stdin should be empty
2229
self.assertEqual('', ui.ui_factory.stdin.readline())
2230
# stdout should be empty, stderr will contains the prompts
2231
self.assertEqual('', stdout.getvalue())