113
111
protocol_scenarios)
114
112
tests.multiply_tests(tp_tests, tp_scenarios, result)
114
# proxy auth: each auth scheme on all http versions on all implementations.
115
tppa_tests, remaining_tests = tests.split_suite_by_condition(
116
remaining_tests, tests.condition_isinstance((
119
proxy_auth_scheme_scenarios = [
120
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
121
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
123
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
125
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
126
proxy_auth_scheme_scenarios)
127
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
116
129
# auth: each auth scheme on all http versions on all implementations.
117
130
tpa_tests, remaining_tests = tests.split_suite_by_condition(
118
131
remaining_tests, tests.condition_isinstance((
121
134
auth_scheme_scenarios = [
122
('basic', dict(_auth_scheme='basic')),
123
('digest', dict(_auth_scheme='digest')),
135
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
136
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
138
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
125
140
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
126
auth_scheme_scenarios)
141
auth_scheme_scenarios)
127
142
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
129
# activity: activity on all http versions on all implementations
144
# activity: on all http[s] versions on all implementations
130
145
tpact_tests, remaining_tests = tests.split_suite_by_condition(
131
146
remaining_tests, tests.condition_isinstance((
134
149
activity_scenarios = [
135
('http', dict(_activity_server=ActivityHTTPServer)),
150
('urllib,http', dict(_activity_server=ActivityHTTPServer,
151
_transport=_urllib.HttpTransport_urllib,)),
137
153
if tests.HTTPSServerFeature.available():
138
154
activity_scenarios.append(
139
('https', dict(_activity_server=ActivityHTTPSServer)))
140
tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
155
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
156
_transport=_urllib.HttpTransport_urllib,)),)
157
if features.pycurl.available():
158
activity_scenarios.append(
159
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
160
_transport=PyCurlTransport,)),)
161
if tests.HTTPSServerFeature.available():
162
from bzrlib.tests import (
165
# FIXME: Until we have a better way to handle self-signed
166
# certificates (like allowing them in a test specific
167
# authentication.conf for example), we need some specialized pycurl
168
# transport for tests.
169
class HTTPS_pycurl_transport(PyCurlTransport):
171
def __init__(self, base, _from_transport=None):
172
super(HTTPS_pycurl_transport, self).__init__(
173
base, _from_transport)
174
self.cabundle = str(ssl_certs.build_path('ca.crt'))
176
activity_scenarios.append(
177
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
178
_transport=HTTPS_pycurl_transport,)),)
180
tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
142
182
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
144
184
# No parametrization for the remaining tests
224
268
def test_empty_header(self):
225
269
scheme, remainder = self.parse_header('')
226
self.assertEquals('', scheme)
270
self.assertEqual('', scheme)
227
271
self.assertIs(None, remainder)
229
273
def test_negotiate_header(self):
230
274
scheme, remainder = self.parse_header('Negotiate')
231
self.assertEquals('negotiate', scheme)
275
self.assertEqual('negotiate', scheme)
232
276
self.assertIs(None, remainder)
234
278
def test_basic_header(self):
235
279
scheme, remainder = self.parse_header(
236
280
'Basic realm="Thou should not pass"')
237
self.assertEquals('basic', scheme)
238
self.assertEquals('realm="Thou should not pass"', remainder)
281
self.assertEqual('basic', scheme)
282
self.assertEqual('realm="Thou should not pass"', remainder)
240
284
def test_basic_extract_realm(self):
241
285
scheme, remainder = self.parse_header(
349
388
def test_url_parsing(self):
350
389
f = FakeManager()
351
390
url = http.extract_auth('http://example.com', f)
352
self.assertEquals('http://example.com', url)
353
self.assertEquals(0, len(f.credentials))
391
self.assertEqual('http://example.com', url)
392
self.assertEqual(0, len(f.credentials))
354
393
url = http.extract_auth(
355
394
'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
356
self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
357
self.assertEquals(1, len(f.credentials))
358
self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
395
self.assertEqual('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
396
self.assertEqual(1, len(f.credentials))
397
self.assertEqual([None, 'www.bazaar-vcs.org', 'user', 'pass'],
362
401
class TestHttpTransportUrls(tests.TestCase):
409
448
https by supplying a fake version_info that does not
415
raise tests.TestSkipped('pycurl not present')
451
self.requireFeature(features.pycurl)
452
# Import the module locally now that we know it's available.
453
pycurl = features.pycurl.module
417
455
version_info_orig = pycurl.version_info
419
# Now that we have pycurl imported, we can fake its version_info
420
# This was taken from a windows pycurl without SSL
422
pycurl.version_info = lambda : (2,
430
('ftp', 'gopher', 'telnet',
431
'dict', 'ldap', 'http', 'file'),
435
self.assertRaises(errors.DependencyNotPresent, self._transport,
436
'https://launchpad.net')
438
# Restore the right function
439
457
pycurl.version_info = version_info_orig
458
self.addCleanup(restore)
460
# Fake the pycurl version_info. This was taken from a windows pycurl
461
# without SSL (thanks to bialix)
462
pycurl.version_info = lambda : (2,
470
('ftp', 'gopher', 'telnet',
471
'dict', 'ldap', 'http', 'file'),
475
self.assertRaises(errors.DependencyNotPresent, self._transport,
476
'https://launchpad.net')
442
479
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
583
621
# for details) make no distinction between a closed
584
622
# socket and badly formatted status line, so we can't
585
623
# just test for ConnectionError, we have to test
586
# InvalidHttpResponse too.
587
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
624
# InvalidHttpResponse too. And pycurl may raise ConnectionReset
625
# instead of ConnectionError too.
626
self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
627
errors.InvalidHttpResponse),
588
628
t.has, 'foo/bar')
590
630
def test_http_get(self):
591
631
server = self.get_readonly_server()
592
632
t = self._transport(server.get_url())
593
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
633
self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
634
errors.InvalidHttpResponse),
594
635
t.get, 'foo/bar')
722
763
self.assertEqual(None, server.host)
723
764
self.assertEqual(None, server.port)
725
def test_setUp_and_tearDown(self):
766
def test_setUp_and_stop(self):
726
767
server = RecordingServer(expect_body_tail=None)
768
server.start_server()
729
770
self.assertNotEqual(None, server.host)
730
771
self.assertNotEqual(None, server.port)
733
774
self.assertEqual(None, server.host)
734
775
self.assertEqual(None, server.port)
736
777
def test_send_receive_bytes(self):
737
server = RecordingServer(expect_body_tail='c')
739
self.addCleanup(server.tearDown)
778
server = RecordingServer(expect_body_tail='c', scheme='http')
779
self.start_server(server)
740
780
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
741
781
sock.connect((server.host, server.port))
742
782
sock.sendall('abc')
1534
1570
self.server.add_user('joe', 'foo')
1535
1571
t = self.get_user_transport(None, None)
1536
1572
stdout = tests.StringIOWrapper()
1537
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=stdout)
1573
stderr = tests.StringIOWrapper()
1574
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1575
stdout=stdout, stderr=stderr)
1538
1576
self.assertEqual('contents of a\n',t.get('a').read())
1539
1577
# stdin should be empty
1540
1578
self.assertEqual('', ui.ui_factory.stdin.readline())
1542
1580
expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1543
self.assertEquals(expected_prompt, stdout.read(len(expected_prompt)))
1581
self.assertEqual(expected_prompt, stderr.read(len(expected_prompt)))
1582
self.assertEqual('', stdout.getvalue())
1544
1583
self._check_password_prompt(t._unqualified_scheme, 'joe',
1547
1586
def test_prompt_for_password(self):
1548
1587
if self._testing_pycurl():
1553
1592
self.server.add_user('joe', 'foo')
1554
1593
t = self.get_user_transport('joe', None)
1555
1594
stdout = tests.StringIOWrapper()
1556
ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1557
self.assertEqual('contents of a\n',t.get('a').read())
1595
stderr = tests.StringIOWrapper()
1596
ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1597
stdout=stdout, stderr=stderr)
1598
self.assertEqual('contents of a\n', t.get('a').read())
1558
1599
# stdin should be empty
1559
1600
self.assertEqual('', ui.ui_factory.stdin.readline())
1560
1601
self._check_password_prompt(t._unqualified_scheme, 'joe',
1603
self.assertEqual('', stdout.getvalue())
1562
1604
# And we shouldn't prompt again for a different request
1563
1605
# against the same transport.
1564
1606
self.assertEqual('contents of b\n',t.get('b').read())
2067
2098
code, f = t._post('abc def end-of-body\n')
2068
2099
self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2069
2100
self.assertActivitiesMatch()
2103
class TestActivity(tests.TestCase, TestActivityMixin):
2106
tests.TestCase.setUp(self)
2107
self.server = self._activity_server(self._protocol_version)
2108
self.server.start_server()
2109
self.activities = {}
2110
def report_activity(t, bytes, direction):
2111
count = self.activities.get(direction, 0)
2113
self.activities[direction] = count
2115
# We override at class level because constructors may propagate the
2116
# bound method and render instance overriding ineffective (an
2117
# alternative would be to define a specific ui factory instead...)
2118
self.orig_report_activity = self._transport._report_activity
2119
self._transport._report_activity = report_activity
2122
self._transport._report_activity = self.orig_report_activity
2123
self.server.stop_server()
2124
tests.TestCase.tearDown(self)
2127
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2130
tests.TestCase.setUp(self)
2131
# Unlike TestActivity, we are really testing ReportingFileSocket and
2132
# ReportingSocket, so we don't need all the parametrization. Since
2133
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2134
# test them through their use by the transport than directly (that's a
2135
# bit less clean but far simpler and more effective).
2136
self.server = ActivityHTTPServer('HTTP/1.1')
2137
self._transport=_urllib.HttpTransport_urllib
2139
self.server.start_server()
2141
# We override at class level because constructors may propagate the
2142
# bound method and render instance overriding ineffective (an
2143
# alternative would be to define a specific ui factory instead...)
2144
self.orig_report_activity = self._transport._report_activity
2145
self._transport._report_activity = None
2148
self._transport._report_activity = self.orig_report_activity
2149
self.server.stop_server()
2150
tests.TestCase.tearDown(self)
2152
def assertActivitiesMatch(self):
2153
# Nothing to check here
2157
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2158
"""Test authentication on the redirected http server."""
2160
_auth_header = 'Authorization'
2161
_password_prompt_prefix = ''
2162
_username_prompt_prefix = ''
2163
_auth_server = http_utils.HTTPBasicAuthServer
2164
_transport = _urllib.HttpTransport_urllib
2166
def create_transport_readonly_server(self):
2167
return self._auth_server()
2169
def create_transport_secondary_server(self):
2170
"""Create the secondary server redirecting to the primary server"""
2171
new = self.get_readonly_server()
2173
redirecting = http_utils.HTTPServerRedirecting()
2174
redirecting.redirect_to(new.host, new.port)
2178
super(TestAuthOnRedirected, self).setUp()
2179
self.build_tree_contents([('a','a'),
2181
('1/a', 'redirected once'),
2183
new_prefix = 'http://%s:%s' % (self.new_server.host,
2184
self.new_server.port)
2185
self.old_server.redirections = [
2186
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2187
self.old_transport = self._transport(self.old_server.get_url())
2188
self.new_server.add_user('joe', 'foo')
2190
def get_a(self, transport):
2191
return transport.get('a')
2193
def test_auth_on_redirected_via_do_catching_redirections(self):
2194
self.redirections = 0
2196
def redirected(transport, exception, redirection_notice):
2197
self.redirections += 1
2198
dir, file = urlutils.split(exception.target)
2199
return self._transport(dir)
2201
stdout = tests.StringIOWrapper()
2202
stderr = tests.StringIOWrapper()
2203
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2204
stdout=stdout, stderr=stderr)
2205
self.assertEqual('redirected once',
2206
transport.do_catching_redirections(
2207
self.get_a, self.old_transport, redirected).read())
2208
self.assertEqual(1, self.redirections)
2209
# stdin should be empty
2210
self.assertEqual('', ui.ui_factory.stdin.readline())
2211
# stdout should be empty, stderr will contain the prompts
2212
self.assertEqual('', stdout.getvalue())
2214
def test_auth_on_redirected_via_following_redirections(self):
2215
self.new_server.add_user('joe', 'foo')
2216
stdout = tests.StringIOWrapper()
2217
stderr = tests.StringIOWrapper()
2218
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2219
stdout=stdout, stderr=stderr)
2220
t = self.old_transport
2221
req = RedirectedRequest('GET', t.abspath('a'))
2222
new_prefix = 'http://%s:%s' % (self.new_server.host,
2223
self.new_server.port)
2224
self.old_server.redirections = [
2225
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2226
self.assertEqual('redirected once',t._perform(req).read())
2227
# stdin should be empty
2228
self.assertEqual('', ui.ui_factory.stdin.readline())
2229
# stdout should be empty, stderr will contain the prompts
2230
self.assertEqual('', stdout.getvalue())