141
143
auth_scheme_scenarios)
142
144
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
144
# activity: on all http[s] versions on all implementations
146
# activity: activity on all http versions on all implementations
145
147
tpact_tests, remaining_tests = tests.split_suite_by_condition(
146
148
remaining_tests, tests.condition_isinstance((
149
151
activity_scenarios = [
150
('urllib,http', dict(_activity_server=ActivityHTTPServer,
151
_transport=_urllib.HttpTransport_urllib,)),
152
('http', dict(_activity_server=ActivityHTTPServer)),
153
154
if tests.HTTPSServerFeature.available():
154
155
activity_scenarios.append(
155
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
156
_transport=_urllib.HttpTransport_urllib,)),)
157
if features.pycurl.available():
158
activity_scenarios.append(
159
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
160
_transport=PyCurlTransport,)),)
161
if tests.HTTPSServerFeature.available():
162
from bzrlib.tests import (
165
# FIXME: Until we have a better way to handle self-signed
166
# certificates (like allowing them in a test specific
167
# authentication.conf for example), we need some specialized pycurl
168
# transport for tests.
169
class HTTPS_pycurl_transport(PyCurlTransport):
171
def __init__(self, base, _from_transport=None):
172
super(HTTPS_pycurl_transport, self).__init__(
173
base, _from_transport)
174
self.cabundle = str(ssl_certs.build_path('ca.crt'))
176
activity_scenarios.append(
177
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
178
_transport=HTTPS_pycurl_transport,)),)
180
tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
156
('https', dict(_activity_server=ActivityHTTPSServer)))
157
tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
182
159
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
184
161
# No parametrization for the remaining tests
268
241
def test_empty_header(self):
269
242
scheme, remainder = self.parse_header('')
270
self.assertEqual('', scheme)
243
self.assertEquals('', scheme)
271
244
self.assertIs(None, remainder)
273
246
def test_negotiate_header(self):
274
247
scheme, remainder = self.parse_header('Negotiate')
275
self.assertEqual('negotiate', scheme)
248
self.assertEquals('negotiate', scheme)
276
249
self.assertIs(None, remainder)
278
251
def test_basic_header(self):
279
252
scheme, remainder = self.parse_header(
280
253
'Basic realm="Thou should not pass"')
281
self.assertEqual('basic', scheme)
282
self.assertEqual('realm="Thou should not pass"', remainder)
254
self.assertEquals('basic', scheme)
255
self.assertEquals('realm="Thou should not pass"', remainder)
284
257
def test_basic_extract_realm(self):
285
258
scheme, remainder = self.parse_header(
388
366
def test_url_parsing(self):
389
367
f = FakeManager()
390
368
url = http.extract_auth('http://example.com', f)
391
self.assertEqual('http://example.com', url)
392
self.assertEqual(0, len(f.credentials))
369
self.assertEquals('http://example.com', url)
370
self.assertEquals(0, len(f.credentials))
393
371
url = http.extract_auth(
394
'http://user:pass@example.com/bzr/bzr.dev', f)
395
self.assertEqual('http://example.com/bzr/bzr.dev', url)
396
self.assertEqual(1, len(f.credentials))
397
self.assertEqual([None, 'example.com', 'user', 'pass'],
372
'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
373
self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
374
self.assertEquals(1, len(f.credentials))
375
self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
401
379
class TestHttpTransportUrls(tests.TestCase):
448
426
https by supplying a fake version_info that do not
451
self.requireFeature(features.pycurl)
452
# Import the module locally now that we know it's available.
453
pycurl = features.pycurl.module
432
raise tests.TestSkipped('pycurl not present')
455
self.overrideAttr(pycurl, 'version_info',
456
# Fake the pycurl version_info. This was taken from
457
# a windows pycurl without SSL (thanks to bialix)
466
('ftp', 'gopher', 'telnet',
467
'dict', 'ldap', 'http', 'file'),
471
self.assertRaises(errors.DependencyNotPresent, self._transport,
472
'https://launchpad.net')
434
version_info_orig = pycurl.version_info
436
# Now that we have pycurl imported, we can fake its version_info
437
# This was taken from a windows pycurl without SSL
439
pycurl.version_info = lambda : (2,
447
('ftp', 'gopher', 'telnet',
448
'dict', 'ldap', 'http', 'file'),
452
self.assertRaises(errors.DependencyNotPresent, self._transport,
453
'https://launchpad.net')
455
# Restore the right function
456
pycurl.version_info = version_info_orig
475
459
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
617
600
# for details) make no distinction between a closed
618
601
# socket and badly formatted status line, so we can't
619
602
# just test for ConnectionError, we have to test
620
# InvalidHttpResponse too. And pycurl may raise ConnectionReset
621
# instead of ConnectionError too.
622
self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
623
errors.InvalidHttpResponse),
603
# InvalidHttpResponse too.
604
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
624
605
t.has, 'foo/bar')
626
607
def test_http_get(self):
627
608
server = self.get_readonly_server()
628
609
t = self._transport(server.get_url())
629
self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
630
errors.InvalidHttpResponse),
610
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
631
611
t.get, 'foo/bar')
759
739
self.assertEqual(None, server.host)
760
740
self.assertEqual(None, server.port)
762
def test_setUp_and_stop(self):
742
def test_setUp_and_tearDown(self):
763
743
server = RecordingServer(expect_body_tail=None)
764
server.start_server()
766
746
self.assertNotEqual(None, server.host)
767
747
self.assertNotEqual(None, server.port)
770
750
self.assertEqual(None, server.host)
771
751
self.assertEqual(None, server.port)
773
753
def test_send_receive_bytes(self):
774
server = RecordingServer(expect_body_tail='c', scheme='http')
775
self.start_server(server)
754
server = RecordingServer(expect_body_tail='c')
756
self.addCleanup(server.tearDown)
776
757
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
777
758
sock.connect((server.host, server.port))
778
759
sock.sendall('abc')
2090
2072
code, f = t._post('abc def end-of-body\n')
2091
2073
self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2092
2074
self.assertActivitiesMatch()
2095
class TestActivity(tests.TestCase, TestActivityMixin):
2098
tests.TestCase.setUp(self)
2099
self.server = self._activity_server(self._protocol_version)
2100
self.server.start_server()
2101
self.activities = {}
2102
def report_activity(t, bytes, direction):
2103
count = self.activities.get(direction, 0)
2105
self.activities[direction] = count
2107
# We override at class level because constructors may propagate the
2108
# bound method and render instance overriding ineffective (an
2109
# alternative would be to define a specific ui factory instead...)
2110
self.orig_report_activity = self._transport._report_activity
2111
self._transport._report_activity = report_activity
2114
self._transport._report_activity = self.orig_report_activity
2115
self.server.stop_server()
2116
tests.TestCase.tearDown(self)
2119
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2122
tests.TestCase.setUp(self)
2123
# Unlike TestActivity, we are really testing ReportingFileSocket and
2124
# ReportingSocket, so we don't need all the parametrization. Since
2125
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2126
# test them through their use by the transport than directly (that's a
2127
# bit less clean but far simpler and more effective).
2128
self.server = ActivityHTTPServer('HTTP/1.1')
2129
self._transport=_urllib.HttpTransport_urllib
2131
self.server.start_server()
2133
# We override at class level because constructors may propagate the
2134
# bound method and render instance overriding ineffective (an
2135
# alternative would be to define a specific ui factory instead...)
2136
self.orig_report_activity = self._transport._report_activity
2137
self._transport._report_activity = None
2140
self._transport._report_activity = self.orig_report_activity
2141
self.server.stop_server()
2142
tests.TestCase.tearDown(self)
2144
def assertActivitiesMatch(self):
2145
# Nothing to check here
2149
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2150
"""Test authentication on the redirected http server."""
2152
_auth_header = 'Authorization'
2153
_password_prompt_prefix = ''
2154
_username_prompt_prefix = ''
2155
_auth_server = http_utils.HTTPBasicAuthServer
2156
_transport = _urllib.HttpTransport_urllib
2158
def create_transport_readonly_server(self):
    """Build the read-only server by instantiating ``self._auth_server``."""
    server_factory = self._auth_server
    return server_factory()
2161
def create_transport_secondary_server(self):
2162
"""Create the secondary server redirecting to the primary server"""
2163
new = self.get_readonly_server()
2165
redirecting = http_utils.HTTPServerRedirecting()
2166
redirecting.redirect_to(new.host, new.port)
2170
super(TestAuthOnRedirected, self).setUp()
2171
self.build_tree_contents([('a','a'),
2173
('1/a', 'redirected once'),
2175
new_prefix = 'http://%s:%s' % (self.new_server.host,
2176
self.new_server.port)
2177
self.old_server.redirections = [
2178
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2179
self.old_transport = self._transport(self.old_server.get_url())
2180
self.new_server.add_user('joe', 'foo')
2182
def get_a(self, transport):
    """Fetch the file named 'a' through the given transport."""
    fetched = transport.get('a')
    return fetched
2185
def test_auth_on_redirected_via_do_catching_redirections(self):
    """Fetch 'a' via do_catching_redirections with credentials on stdin.

    The user name and password are supplied through the test UI factory's
    stdin; the content read back must be the redirected file and exactly
    one redirection must have been reported to the callback.
    """
    self.redirections = 0

    def redirected(transport, exception, redirection_notice):
        # Count the redirection, then continue with a transport built from
        # the directory part of the redirection target.
        self.redirections += 1
        target_dir, _target_file = urlutils.split(exception.target)
        return self._transport(target_dir)

    out = tests.StringIOWrapper()
    err = tests.StringIOWrapper()
    ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
                                        stdout=out, stderr=err)
    redirected_file = transport.do_catching_redirections(
        self.get_a, self.old_transport, redirected)
    self.assertEqual('redirected once', redirected_file.read())
    self.assertEqual(1, self.redirections)
    # Everything provided on stdin should have been consumed
    self.assertEqual('', ui.ui_factory.stdin.readline())
    # stdout should be empty, stderr will contain the prompts
    self.assertEqual('', out.getvalue())
2206
def test_auth_on_redirected_via_following_redirections(self):
    """Perform a RedirectedRequest for 'a' with credentials on stdin.

    The request is issued directly through the old transport's
    ``_perform``; the body read back must be the redirected file.
    """
    self.new_server.add_user('joe', 'foo')
    out = tests.StringIOWrapper()
    err = tests.StringIOWrapper()
    ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
                                        stdout=out, stderr=err)
    old_transport = self.old_transport
    request = RedirectedRequest('GET', old_transport.abspath('a'))
    new_base = 'http://%s:%s' % (self.new_server.host,
                                 self.new_server.port)
    # Redirect every path on the old server below '/1' on the new server.
    self.old_server.redirections = [
        ('(.*)', r'%s/1\1' % new_base, 301),
    ]
    response = old_transport._perform(request)
    self.assertEqual('redirected once', response.read())
    # Everything provided on stdin should have been consumed
    self.assertEqual('', ui.ui_factory.stdin.readline())
    # stdout should be empty, stderr will contain the prompts
    self.assertEqual('', out.getvalue())