13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for HTTP implementations.
19
This module defines a load_tests() method that parametrize tests classes for
20
transport implementation, http protocol versions and authentication schemes.
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
# FIXME: This test should be repeated for each available http client
18
# implementation; at the moment we have urllib and pycurl.
23
20
# TODO: Should be renamed to bzrlib.transport.http.tests?
24
21
# TODO: What about renaming to bzrlib.tests.transport.http ?
26
23
from cStringIO import StringIO
30
import SimpleHTTPServer
36
31
from bzrlib import (
41
remote as _mod_remote,
47
from bzrlib.symbol_versioning import (
50
37
from bzrlib.tests import (
43
from bzrlib.tests.HttpServer import (
48
from bzrlib.tests.HTTPTestUtil import (
49
BadProtocolRequestHandler,
50
BadStatusRequestHandler,
51
ForbiddenRequestHandler,
54
HTTPServerRedirecting,
55
InvalidStatusRequestHandler,
56
LimitedRangeHTTPServer,
57
NoRangeRequestHandler,
59
ProxyDigestAuthServer,
61
SingleRangeRequestHandler,
62
SingleOnlyRangeRequestHandler,
63
TestCaseWithRedirectedWebserver,
64
TestCaseWithTwoWebservers,
65
TestCaseWithWebserver,
55
68
from bzrlib.transport import (
70
do_catching_redirections,
59
74
from bzrlib.transport.http import (
65
if features.pycurl.available():
66
from bzrlib.transport.http._pycurl import PyCurlTransport
69
def load_tests(standard_tests, module, loader):
70
"""Multiply tests for http clients and protocol versions."""
71
result = loader.suiteClass()
73
# one for each transport implementation
74
t_tests, remaining_tests = tests.split_suite_by_condition(
75
standard_tests, tests.condition_isinstance((
76
TestHttpTransportRegistration,
77
TestHttpTransportUrls,
80
transport_scenarios = [
81
('urllib', dict(_transport=_urllib.HttpTransport_urllib,
82
_server=http_server.HttpServer_urllib,
83
_qualified_prefix='http+urllib',)),
85
if features.pycurl.available():
86
transport_scenarios.append(
87
('pycurl', dict(_transport=PyCurlTransport,
88
_server=http_server.HttpServer_PyCurl,
89
_qualified_prefix='http+pycurl',)))
90
tests.multiply_tests(t_tests, transport_scenarios, result)
92
# each implementation tested with each HTTP version
93
tp_tests, remaining_tests = tests.split_suite_by_condition(
94
remaining_tests, tests.condition_isinstance((
95
SmartHTTPTunnellingTest,
96
TestDoCatchRedirections,
99
TestHTTPSilentRedirections,
100
TestLimitedRangeRequestServer,
104
TestSpecificRequestHandler,
106
protocol_scenarios = [
107
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
108
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
110
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
112
tests.multiply_tests(tp_tests, tp_scenarios, result)
114
# proxy auth: each auth scheme on all http versions on all implementations.
115
tppa_tests, remaining_tests = tests.split_suite_by_condition(
116
remaining_tests, tests.condition_isinstance((
119
proxy_auth_scheme_scenarios = [
120
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
121
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
123
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
125
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
126
proxy_auth_scheme_scenarios)
127
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
129
# auth: each auth scheme on all http versions on all implementations.
130
tpa_tests, remaining_tests = tests.split_suite_by_condition(
131
remaining_tests, tests.condition_isinstance((
134
auth_scheme_scenarios = [
135
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
136
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
138
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
140
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
141
auth_scheme_scenarios)
142
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
144
# activity: on all http[s] versions on all implementations
145
tpact_tests, remaining_tests = tests.split_suite_by_condition(
146
remaining_tests, tests.condition_isinstance((
149
activity_scenarios = [
150
('urllib,http', dict(_activity_server=ActivityHTTPServer,
151
_transport=_urllib.HttpTransport_urllib,)),
153
if tests.HTTPSServerFeature.available():
154
activity_scenarios.append(
155
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
156
_transport=_urllib.HttpTransport_urllib,)),)
157
if features.pycurl.available():
158
activity_scenarios.append(
159
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
160
_transport=PyCurlTransport,)),)
161
if tests.HTTPSServerFeature.available():
162
from bzrlib.tests import (
165
# FIXME: Until we have a better way to handle self-signed
166
# certificates (like allowing them in a test specific
167
# authentication.conf for example), we need some specialized pycurl
168
# transport for tests.
169
class HTTPS_pycurl_transport(PyCurlTransport):
171
def __init__(self, base, _from_transport=None):
172
super(HTTPS_pycurl_transport, self).__init__(
173
base, _from_transport)
174
self.cabundle = str(ssl_certs.build_path('ca.crt'))
176
activity_scenarios.append(
177
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
178
_transport=HTTPS_pycurl_transport,)),)
180
tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
182
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
184
# No parametrization for the remaining tests
185
result.addTests(remaining_tests)
79
from bzrlib.transport.http._urllib import HttpTransport_urllib
80
from bzrlib.transport.http._urllib2_wrappers import (
190
87
class FakeManager(object):
260
class TestAuthHeader(tests.TestCase):
262
def parse_header(self, header, auth_handler_class=None):
263
if auth_handler_class is None:
264
auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
265
self.auth_handler = auth_handler_class()
266
return self.auth_handler._parse_auth_header(header)
268
def test_empty_header(self):
269
scheme, remainder = self.parse_header('')
270
self.assertEqual('', scheme)
271
self.assertIs(None, remainder)
273
def test_negotiate_header(self):
274
scheme, remainder = self.parse_header('Negotiate')
275
self.assertEqual('negotiate', scheme)
276
self.assertIs(None, remainder)
278
def test_basic_header(self):
279
scheme, remainder = self.parse_header(
280
'Basic realm="Thou should not pass"')
281
self.assertEqual('basic', scheme)
282
self.assertEqual('realm="Thou should not pass"', remainder)
284
def test_basic_extract_realm(self):
285
scheme, remainder = self.parse_header(
286
'Basic realm="Thou should not pass"',
287
_urllib2_wrappers.BasicAuthHandler)
288
match, realm = self.auth_handler.extract_realm(remainder)
289
self.assertTrue(match is not None)
290
self.assertEqual('Thou should not pass', realm)
292
def test_digest_header(self):
293
scheme, remainder = self.parse_header(
294
'Digest realm="Thou should not pass"')
295
self.assertEqual('digest', scheme)
296
self.assertEqual('realm="Thou should not pass"', remainder)
299
class TestHTTPServer(tests.TestCase):
300
"""Test the HTTP servers implementations."""
302
def test_invalid_protocol(self):
303
class BogusRequestHandler(http_server.TestingHTTPRequestHandler):
305
protocol_version = 'HTTP/0.1'
307
server = http_server.HttpServer(BogusRequestHandler)
309
self.assertRaises(httplib.UnknownProtocol, server.start_server)
312
self.fail('HTTP Server creation did not raise UnknownProtocol')
314
def test_force_invalid_protocol(self):
315
server = http_server.HttpServer(protocol_version='HTTP/0.1')
317
self.assertRaises(httplib.UnknownProtocol, server.start_server)
320
self.fail('HTTP Server creation did not raise UnknownProtocol')
322
def test_server_start_and_stop(self):
323
server = http_server.HttpServer()
324
server.start_server()
326
self.assertTrue(server._http_running)
329
self.assertFalse(server._http_running)
331
def test_create_http_server_one_zero(self):
332
class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
334
protocol_version = 'HTTP/1.0'
336
server = http_server.HttpServer(RequestHandlerOneZero)
337
self.start_server(server)
338
self.assertIsInstance(server._httpd, http_server.TestingHTTPServer)
340
def test_create_http_server_one_one(self):
341
class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
343
protocol_version = 'HTTP/1.1'
345
server = http_server.HttpServer(RequestHandlerOneOne)
346
self.start_server(server)
347
self.assertIsInstance(server._httpd,
348
http_server.TestingThreadingHTTPServer)
350
def test_create_http_server_force_one_one(self):
351
class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
353
protocol_version = 'HTTP/1.0'
355
server = http_server.HttpServer(RequestHandlerOneZero,
356
protocol_version='HTTP/1.1')
357
self.start_server(server)
358
self.assertIsInstance(server._httpd,
359
http_server.TestingThreadingHTTPServer)
361
def test_create_http_server_force_one_zero(self):
362
class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
364
protocol_version = 'HTTP/1.1'
366
server = http_server.HttpServer(RequestHandlerOneOne,
367
protocol_version='HTTP/1.0')
368
self.start_server(server)
369
self.assertIsInstance(server._httpd,
370
http_server.TestingHTTPServer)
373
153
class TestWithTransport_pycurl(object):
374
154
"""Test case to inherit from if pycurl is present"""
376
156
def _get_pycurl_maybe(self):
377
self.requireFeature(features.pycurl)
378
return PyCurlTransport
158
from bzrlib.transport.http._pycurl import PyCurlTransport
159
return PyCurlTransport
160
except errors.DependencyNotPresent:
161
raise TestSkipped('pycurl not present')
380
163
_transport = property(_get_pycurl_maybe)
383
class TestHttpUrls(tests.TestCase):
166
class TestHttpUrls(TestCase):
385
168
# TODO: This should be moved to authorization tests once they
388
171
def test_url_parsing(self):
389
172
f = FakeManager()
390
url = http.extract_auth('http://example.com', f)
391
self.assertEqual('http://example.com', url)
392
self.assertEqual(0, len(f.credentials))
393
url = http.extract_auth(
394
'http://user:pass@example.com/bzr/bzr.dev', f)
395
self.assertEqual('http://example.com/bzr/bzr.dev', url)
396
self.assertEqual(1, len(f.credentials))
397
self.assertEqual([None, 'example.com', 'user', 'pass'],
401
class TestHttpTransportUrls(tests.TestCase):
402
"""Test the http urls."""
173
url = extract_auth('http://example.com', f)
174
self.assertEquals('http://example.com', url)
175
self.assertEquals(0, len(f.credentials))
176
url = extract_auth('http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
177
self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
178
self.assertEquals(1, len(f.credentials))
179
self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
183
class TestHttpTransportUrls(object):
184
"""Test the http urls.
186
This MUST be used by daughter classes that also inherit from
189
We can't inherit directly from TestCase or the
190
test framework will try to create an instance which cannot
191
run, its implementation being incomplete.
404
194
def test_abs_url(self):
405
195
"""Construction of absolute http URLs"""
823
653
self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
824
654
t.readv, 'a', [(12,2)])
826
def test_readv_multiple_get_requests(self):
827
server = self.get_readonly_server()
828
t = self._transport(server.get_url())
829
# force transport to issue multiple requests
830
t._max_readv_combine = 1
831
t._max_get_ranges = 1
832
l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
833
self.assertEqual(l[0], (0, '0'))
834
self.assertEqual(l[1], (1, '1'))
835
self.assertEqual(l[2], (3, '34'))
836
self.assertEqual(l[3], (9, '9'))
837
# The server should have issued 4 requests
838
self.assertEqual(4, server.GET_request_nb)
840
def test_readv_get_max_size(self):
841
server = self.get_readonly_server()
842
t = self._transport(server.get_url())
843
# force transport to issue multiple requests by limiting the number of
844
# bytes by request. Note that this apply to coalesced offsets only, a
845
# single range will keep its size even if bigger than the limit.
847
l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
848
self.assertEqual(l[0], (0, '0'))
849
self.assertEqual(l[1], (1, '1'))
850
self.assertEqual(l[2], (2, '2345'))
851
self.assertEqual(l[3], (6, '6789'))
852
# The server should have issued 3 requests
853
self.assertEqual(3, server.GET_request_nb)
855
def test_complete_readv_leave_pipe_clean(self):
856
server = self.get_readonly_server()
857
t = self._transport(server.get_url())
858
# force transport to issue multiple requests
860
l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
861
# The server should have issued 3 requests
862
self.assertEqual(3, server.GET_request_nb)
863
self.assertEqual('0123456789', t.get_bytes('a'))
864
self.assertEqual(4, server.GET_request_nb)
866
def test_incomplete_readv_leave_pipe_clean(self):
867
server = self.get_readonly_server()
868
t = self._transport(server.get_url())
869
# force transport to issue multiple requests
871
# Don't collapse readv results into a list so that we leave unread
872
# bytes on the socket
873
ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
874
self.assertEqual((0, '0'), ireadv.next())
875
# The server should have issued one request so far
876
self.assertEqual(1, server.GET_request_nb)
877
self.assertEqual('0123456789', t.get_bytes('a'))
878
# get_bytes issued an additional request, the readv pending ones are
880
self.assertEqual(2, server.GET_request_nb)
883
class SingleRangeRequestHandler(http_server.TestingHTTPRequestHandler):
884
"""Always reply to range request as if they were single.
886
Don't be explicit about it, just to annoy the clients.
889
def get_multiple_ranges(self, file, file_size, ranges):
890
"""Answer as if it was a single range request and ignores the rest"""
891
(start, end) = ranges[0]
892
return self.get_single_range(file, file_size, start, end)
895
657
class TestSingleRangeRequestServer(TestRangeRequestServer):
896
658
"""Test readv against a server which accept only single range requests"""
898
_req_handler_class = SingleRangeRequestHandler
901
class SingleOnlyRangeRequestHandler(http_server.TestingHTTPRequestHandler):
902
"""Only reply to simple range requests, errors out on multiple"""
904
def get_multiple_ranges(self, file, file_size, ranges):
905
"""Refuses the multiple ranges request"""
908
self.send_error(416, "Requested range not satisfiable")
910
(start, end) = ranges[0]
911
return self.get_single_range(file, file_size, start, end)
660
def create_transport_readonly_server(self):
661
return HttpServer(SingleRangeRequestHandler)
664
class TestSingleRangeRequestServer_urllib(TestSingleRangeRequestServer,
665
TestCaseWithWebserver):
666
"""Tests single range requests accepting server for urllib implementation"""
668
_transport = HttpTransport_urllib
671
class TestSingleRangeRequestServer_pycurl(TestWithTransport_pycurl,
672
TestSingleRangeRequestServer,
673
TestCaseWithWebserver):
674
"""Tests single range requests accepting server for pycurl implementation"""
914
677
class TestSingleOnlyRangeRequestServer(TestRangeRequestServer):
915
678
"""Test readv against a server which only accept single range requests"""
917
_req_handler_class = SingleOnlyRangeRequestHandler
920
class NoRangeRequestHandler(http_server.TestingHTTPRequestHandler):
921
"""Ignore range requests without notice"""
924
# Update the statistics
925
self.server.test_case_server.GET_request_nb += 1
926
# Just bypass the range handling done by TestingHTTPRequestHandler
927
return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
680
def create_transport_readonly_server(self):
681
return HttpServer(SingleOnlyRangeRequestHandler)
684
class TestSingleOnlyRangeRequestServer_urllib(TestSingleOnlyRangeRequestServer,
685
TestCaseWithWebserver):
686
"""Tests single range requests accepting server for urllib implementation"""
688
_transport = HttpTransport_urllib
691
class TestSingleOnlyRangeRequestServer_pycurl(TestWithTransport_pycurl,
692
TestSingleOnlyRangeRequestServer,
693
TestCaseWithWebserver):
694
"""Tests single range requests accepting server for pycurl implementation"""
930
697
class TestNoRangeRequestServer(TestRangeRequestServer):
931
698
"""Test readv against a server which do not accept range requests"""
933
_req_handler_class = NoRangeRequestHandler
936
class MultipleRangeWithoutContentLengthRequestHandler(
937
http_server.TestingHTTPRequestHandler):
938
"""Reply to multiple range requests without content length header."""
940
def get_multiple_ranges(self, file, file_size, ranges):
941
self.send_response(206)
942
self.send_header('Accept-Ranges', 'bytes')
943
boundary = "%d" % random.randint(0,0x7FFFFFFF)
944
self.send_header("Content-Type",
945
"multipart/byteranges; boundary=%s" % boundary)
947
for (start, end) in ranges:
948
self.wfile.write("--%s\r\n" % boundary)
949
self.send_header("Content-type", 'application/octet-stream')
950
self.send_header("Content-Range", "bytes %d-%d/%d" % (start,
954
self.send_range_content(file, start, end - start + 1)
956
self.wfile.write("--%s\r\n" % boundary)
959
class TestMultipleRangeWithoutContentLengthServer(TestRangeRequestServer):
961
_req_handler_class = MultipleRangeWithoutContentLengthRequestHandler
964
class TruncatedMultipleRangeRequestHandler(
965
http_server.TestingHTTPRequestHandler):
966
"""Reply to multiple range requests truncating the last ones.
968
This server generates responses whose Content-Length describes all the
969
ranges, but fail to include the last ones leading to client short reads.
970
This has been observed randomly with lighttpd (bug #179368).
700
def create_transport_readonly_server(self):
701
return HttpServer(NoRangeRequestHandler)
704
class TestNoRangeRequestServer_urllib(TestNoRangeRequestServer,
705
TestCaseWithWebserver):
706
"""Tests range requests refusing server for urllib implementation"""
708
_transport = HttpTransport_urllib
711
class TestNoRangeRequestServer_pycurl(TestWithTransport_pycurl,
712
TestNoRangeRequestServer,
713
TestCaseWithWebserver):
714
"""Tests range requests refusing server for pycurl implementation"""
717
class TestLimitedRangeRequestServer(object):
718
"""Tests readv requests against server that errors out on too much ranges.
720
This MUST be used by daughter classes that also inherit from
721
TestCaseWithWebserver.
723
We can't inherit directly from TestCaseWithWebserver or the
724
test framework will try to create an instance which cannot
725
run, its implementation being incomplete.
973
_truncated_ranges = 2
975
def get_multiple_ranges(self, file, file_size, ranges):
976
self.send_response(206)
977
self.send_header('Accept-Ranges', 'bytes')
979
self.send_header('Content-Type',
980
'multipart/byteranges; boundary=%s' % boundary)
981
boundary_line = '--%s\r\n' % boundary
982
# Calculate the Content-Length
984
for (start, end) in ranges:
985
content_length += len(boundary_line)
986
content_length += self._header_line_length(
987
'Content-type', 'application/octet-stream')
988
content_length += self._header_line_length(
989
'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
990
content_length += len('\r\n') # end headers
991
content_length += end - start # + 1
992
content_length += len(boundary_line)
993
self.send_header('Content-length', content_length)
996
# Send the multipart body
998
for (start, end) in ranges:
999
self.wfile.write(boundary_line)
1000
self.send_header('Content-type', 'application/octet-stream')
1001
self.send_header('Content-Range', 'bytes %d-%d/%d'
1002
% (start, end, file_size))
1004
if cur + self._truncated_ranges >= len(ranges):
1005
# Abruptly ends the response and close the connection
1006
self.close_connection = 1
1008
self.send_range_content(file, start, end - start + 1)
1011
self.wfile.write(boundary_line)
1014
class TestTruncatedMultipleRangeServer(TestSpecificRequestHandler):
1016
_req_handler_class = TruncatedMultipleRangeRequestHandler
1019
super(TestTruncatedMultipleRangeServer, self).setUp()
1020
self.build_tree_contents([('a', '0123456789')],)
1022
def test_readv_with_short_reads(self):
1023
server = self.get_readonly_server()
1024
t = self._transport(server.get_url())
1025
# Force separate ranges for each offset
1026
t._bytes_to_read_before_seek = 0
1027
ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
1028
self.assertEqual((0, '0'), ireadv.next())
1029
self.assertEqual((2, '2'), ireadv.next())
1030
if not self._testing_pycurl():
1031
# Only one request have been issued so far (except for pycurl that
1032
# try to read the whole response at once)
1033
self.assertEqual(1, server.GET_request_nb)
1034
self.assertEqual((4, '45'), ireadv.next())
1035
self.assertEqual((9, '9'), ireadv.next())
1036
# Both implementations issue 3 requests but:
1037
# - urllib does two multiple (4 ranges, then 2 ranges) then a single
1039
# - pycurl does two multiple (4 ranges, 4 ranges) then a single range
1040
self.assertEqual(3, server.GET_request_nb)
1041
# Finally the client have tried a single range request and stays in
1043
self.assertEqual('single', t._range_hint)
1045
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
1046
"""Errors out when range specifiers exceed the limit"""
1048
def get_multiple_ranges(self, file, file_size, ranges):
1049
"""Refuses the multiple ranges request"""
1050
tcs = self.server.test_case_server
1051
if tcs.range_limit is not None and len(ranges) > tcs.range_limit:
1053
# Emulate apache behavior
1054
self.send_error(400, "Bad Request")
1056
return http_server.TestingHTTPRequestHandler.get_multiple_ranges(
1057
self, file, file_size, ranges)
1060
class LimitedRangeHTTPServer(http_server.HttpServer):
1061
"""An HttpServer erroring out on requests with too much range specifiers"""
1063
def __init__(self, request_handler=LimitedRangeRequestHandler,
1064
protocol_version=None,
1066
http_server.HttpServer.__init__(self, request_handler,
1067
protocol_version=protocol_version)
1068
self.range_limit = range_limit
1071
class TestLimitedRangeRequestServer(http_utils.TestCaseWithWebserver):
1072
"""Tests readv requests against a server erroring out on too much ranges."""
1074
# Requests with more range specifiers will error out
1077
730
def create_transport_readonly_server(self):
1078
return LimitedRangeHTTPServer(range_limit=self.range_limit,
1079
protocol_version=self._protocol_version)
731
# Requests with more range specifiers will error out
732
return LimitedRangeHTTPServer(range_limit=self.range_limit)
1081
734
def get_transport(self):
1082
735
return self._transport(self.get_readonly_server().get_url())
1084
737
def setUp(self):
1085
http_utils.TestCaseWithWebserver.setUp(self)
738
TestCaseWithWebserver.setUp(self)
1086
739
# We need to manipulate ranges that correspond to real chunks in the
1087
740
# response, so we build a content appropriately.
1088
filler = ''.join(['abcdefghij' for x in range(102)])
741
filler = ''.join(['abcdefghij' for _ in range(102)])
1089
742
content = ''.join(['%04d' % v + filler for v in range(16)])
1090
743
self.build_tree_contents([('a', content)],)
1602
1297
# Only one 'Authentication Required' error should occur
1603
1298
self.assertEqual(1, self.server.auth_required_errors)
1605
def _check_password_prompt(self, scheme, user, actual_prompt):
1606
expected_prompt = (self._password_prompt_prefix
1607
+ ("%s %s@%s:%d, Realm: '%s' password: "
1609
user, self.server.host, self.server.port,
1610
self.server.auth_realm)))
1611
self.assertEqual(expected_prompt, actual_prompt)
1613
def _expected_username_prompt(self, scheme):
1614
return (self._username_prompt_prefix
1615
+ "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1616
self.server.host, self.server.port,
1617
self.server.auth_realm))
1619
def test_no_prompt_for_password_when_using_auth_config(self):
1620
if self._testing_pycurl():
1621
raise tests.TestNotApplicable(
1622
'pycurl does not support authentication.conf'
1623
' since it cannot prompt')
1627
stdin_content = 'bar\n' # Not the right password
1628
self.server.add_user(user, password)
1629
t = self.get_user_transport(user, None)
1630
ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1631
stderr=tests.StringIOWrapper())
1632
# Create a minimal config file with the right password
1633
conf = config.AuthenticationConfig()
1634
conf._get_config().update(
1635
{'httptest': {'scheme': 'http', 'port': self.server.port,
1636
'user': user, 'password': password}})
1638
# Issue a request to the server to connect
1639
self.assertEqual('contents of a\n',t.get('a').read())
1640
# stdin should have been left untouched
1641
self.assertEqual(stdin_content, ui.ui_factory.stdin.readline())
1642
# Only one 'Authentication Required' error should occur
1643
self.assertEqual(1, self.server.auth_required_errors)
1645
def test_user_from_auth_conf(self):
1646
if self._testing_pycurl():
1647
raise tests.TestNotApplicable(
1648
'pycurl does not support authentication.conf')
1651
self.server.add_user(user, password)
1652
# Create a minimal config file with the right password
1653
conf = config.AuthenticationConfig()
1654
conf._get_config().update(
1655
{'httptest': {'scheme': 'http', 'port': self.server.port,
1656
'user': user, 'password': password}})
1658
t = self.get_user_transport(None, None)
1659
# Issue a request to the server to connect
1660
self.assertEqual('contents of a\n', t.get('a').read())
1661
# Only one 'Authentication Required' error should occur
1662
self.assertEqual(1, self.server.auth_required_errors)
1664
def test_changing_nonce(self):
1665
if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1666
http_utils.ProxyDigestAuthServer):
1667
raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1668
if self._testing_pycurl():
1669
raise tests.KnownFailure(
1670
'pycurl does not handle a nonce change')
1671
self.server.add_user('joe', 'foo')
1672
t = self.get_user_transport('joe', 'foo')
1673
self.assertEqual('contents of a\n', t.get('a').read())
1674
self.assertEqual('contents of b\n', t.get('b').read())
1675
# Only one 'Authentication Required' error should have
1677
self.assertEqual(1, self.server.auth_required_errors)
1678
# The server invalidates the current nonce
1679
self.server.auth_nonce = self.server.auth_nonce + '. No, now!'
1680
self.assertEqual('contents of a\n', t.get('a').read())
1681
# Two 'Authentication Required' errors should occur (the
1682
# initial 'who are you' and a second 'who are you' with the new nonce)
1683
self.assertEqual(2, self.server.auth_required_errors)
1301
class TestHTTPAuth(TestAuth):
1302
"""Test HTTP authentication schemes.
1304
Daughter classes MUST inherit from TestCaseWithWebserver too.
1307
_auth_header = 'Authorization'
1310
TestCaseWithWebserver.setUp(self)
1311
self.server = self.get_readonly_server()
1312
TestAuth.setUp(self)
1314
def get_user_transport(self, user=None, password=None):
1315
return self._transport(self.get_user_url(user, password))
1687
1318
class TestProxyAuth(TestAuth):
1688
"""Test proxy authentication schemes."""
1319
"""Test proxy authentication schemes.
1321
Daughter classes MUST also inherit from TestCaseWithWebserver.
1690
1323
_auth_header = 'Proxy-authorization'
1691
_password_prompt_prefix = 'Proxy '
1692
_username_prompt_prefix = 'Proxy '
1694
1325
def setUp(self):
1695
super(TestProxyAuth, self).setUp()
1326
TestCaseWithWebserver.setUp(self)
1327
self.server = self.get_readonly_server()
1696
1328
self._old_env = {}
1697
1329
self.addCleanup(self._restore_env)
1330
TestAuth.setUp(self)
1698
1331
# Override the contents to avoid false positives
1699
1332
self.build_tree_contents([('a', 'not proxied contents of a\n'),
1700
1333
('b', 'not proxied contents of b\n'),
1714
1347
for name, value in self._old_env.iteritems():
1715
1348
osutils.set_or_unset_env(name, value)
1717
def test_empty_pass(self):
1718
if self._testing_pycurl():
1720
if pycurl.version_info()[1] < '7.16.0':
1721
raise tests.KnownFailure(
1722
'pycurl < 7.16.0 does not handle empty proxy passwords')
1723
super(TestProxyAuth, self).test_empty_pass()
1726
class SampleSocket(object):
1727
"""A socket-like object for use in testing the HTTP request handler."""
1729
def __init__(self, socket_read_content):
1730
"""Constructs a sample socket.
1732
:param socket_read_content: a byte sequence
1734
# Use plain python StringIO so we can monkey-patch the close method to
1735
# not discard the contents.
1736
from StringIO import StringIO
1737
self.readfile = StringIO(socket_read_content)
1738
self.writefile = StringIO()
1739
self.writefile.close = lambda: None
1741
def makefile(self, mode='r', bufsize=None):
1743
return self.readfile
1745
return self.writefile
1748
class SmartHTTPTunnellingTest(tests.TestCaseWithTransport):
1751
super(SmartHTTPTunnellingTest, self).setUp()
1752
# We use the VFS layer as part of HTTP tunnelling tests.
1753
self._captureVar('BZR_NO_SMART_VFS', None)
1754
self.transport_readonly_server = http_utils.HTTPServerWithSmarts
1756
def create_transport_readonly_server(self):
1757
return http_utils.HTTPServerWithSmarts(
1758
protocol_version=self._protocol_version)
1760
def test_open_bzrdir(self):
1761
branch = self.make_branch('relpath')
1762
http_server = self.get_readonly_server()
1763
url = http_server.get_url() + 'relpath'
1764
bd = bzrdir.BzrDir.open(url)
1765
self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
1767
def test_bulk_data(self):
1768
# We should be able to send and receive bulk data in a single message.
1769
# The 'readv' command in the smart protocol both sends and receives
1770
# bulk data, so we use that.
1771
self.build_tree(['data-file'])
1772
http_server = self.get_readonly_server()
1773
http_transport = self._transport(http_server.get_url())
1774
medium = http_transport.get_smart_medium()
1775
# Since we provide the medium, the url below will be mostly ignored
1776
# during the test, as long as the path is '/'.
1777
remote_transport = remote.RemoteTransport('bzr://fake_host/',
1780
[(0, "c")], list(remote_transport.readv("data-file", [(0,1)])))
1782
def test_http_send_smart_request(self):
1784
post_body = 'hello\n'
1785
expected_reply_body = 'ok\x012\n'
1787
http_server = self.get_readonly_server()
1788
http_transport = self._transport(http_server.get_url())
1789
medium = http_transport.get_smart_medium()
1790
response = medium.send_http_smart_request(post_body)
1791
reply_body = response.read()
1792
self.assertEqual(expected_reply_body, reply_body)
1794
def test_smart_http_server_post_request_handler(self):
1795
httpd = self.get_readonly_server()._get_httpd()
1797
socket = SampleSocket(
1798
'POST /.bzr/smart %s \r\n' % self._protocol_version
1799
# HTTP/1.1 posts must have a Content-Length (but it doesn't hurt
1801
+ 'Content-Length: 6\r\n'
1804
# Beware: the ('localhost', 80) below is the
1805
# client_address parameter, but we don't have one because
1806
# we have defined a socket which is not bound to an
1807
# address. The test framework never uses this client
1808
# address, so far...
1809
request_handler = http_utils.SmartRequestHandler(socket,
1812
response = socket.writefile.getvalue()
1813
self.assertStartsWith(response, '%s 200 ' % self._protocol_version)
1814
# This includes the end of the HTTP headers, and all the body.
1815
expected_end_of_response = '\r\n\r\nok\x012\n'
1816
self.assertEndsWith(response, expected_end_of_response)
1819
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
1820
"""No smart server here request handler."""
1823
self.send_error(403, "Forbidden")
1826
class SmartClientAgainstNotSmartServer(TestSpecificRequestHandler):
1827
"""Test smart client behaviour against an http server without smarts."""
1829
_req_handler_class = ForbiddenRequestHandler
1831
def test_probe_smart_server(self):
1832
"""Test error handling against server refusing smart requests."""
1833
server = self.get_readonly_server()
1834
t = self._transport(server.get_url())
1835
# No need to build a valid smart request here, the server will not even
1836
# try to interpret it.
1837
self.assertRaises(errors.SmartProtocolError,
1838
t.get_smart_medium().send_http_smart_request,
1841
class Test_redirected_to(tests.TestCase):
1843
def test_redirected_to_subdir(self):
1844
t = self._transport('http://www.example.com/foo')
1845
r = t._redirected_to('http://www.example.com/foo',
1846
'http://www.example.com/foo/subdir')
1847
self.assertIsInstance(r, type(t))
1848
# Both transports share the some connection
1849
self.assertEqual(t._get_connection(), r._get_connection())
1851
def test_redirected_to_self_with_slash(self):
1852
t = self._transport('http://www.example.com/foo')
1853
r = t._redirected_to('http://www.example.com/foo',
1854
'http://www.example.com/foo/')
1855
self.assertIsInstance(r, type(t))
1856
# Both transports share the some connection (one can argue that we
1857
# should return the exact same transport here, but that seems
1859
self.assertEqual(t._get_connection(), r._get_connection())
1861
def test_redirected_to_host(self):
1862
t = self._transport('http://www.example.com/foo')
1863
r = t._redirected_to('http://www.example.com/foo',
1864
'http://foo.example.com/foo/subdir')
1865
self.assertIsInstance(r, type(t))
1867
def test_redirected_to_same_host_sibling_protocol(self):
1868
t = self._transport('http://www.example.com/foo')
1869
r = t._redirected_to('http://www.example.com/foo',
1870
'https://www.example.com/foo')
1871
self.assertIsInstance(r, type(t))
1873
def test_redirected_to_same_host_different_protocol(self):
1874
t = self._transport('http://www.example.com/foo')
1875
r = t._redirected_to('http://www.example.com/foo',
1876
'ftp://www.example.com/foo')
1877
self.assertNotEquals(type(r), type(t))
1879
def test_redirected_to_different_host_same_user(self):
1880
t = self._transport('http://joe@www.example.com/foo')
1881
r = t._redirected_to('http://www.example.com/foo',
1882
'https://foo.example.com/foo')
1883
self.assertIsInstance(r, type(t))
1884
self.assertEqual(t._user, r._user)
1887
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
1888
"""Request handler for a unique and pre-defined request.
1890
The only thing we care about here is how many bytes travel on the wire. But
1891
since we want to measure it for a real http client, we have to send it
1894
We expect to receive a *single* request nothing more (and we won't even
1895
check what request it is, we just measure the bytes read until an empty
1899
def handle_one_request(self):
1900
tcs = self.server.test_case_server
1901
requestline = self.rfile.readline()
1902
headers = self.MessageClass(self.rfile, 0)
1903
# We just read: the request, the headers, an empty line indicating the
1904
# end of the headers.
1905
bytes_read = len(requestline)
1906
for line in headers.headers:
1907
bytes_read += len(line)
1908
bytes_read += len('\r\n')
1909
if requestline.startswith('POST'):
1910
# The body should be a single line (or we don't know where it ends
1911
# and we don't want to issue a blocking read)
1912
body = self.rfile.readline()
1913
bytes_read += len(body)
1914
tcs.bytes_read = bytes_read
1916
# We set the bytes written *before* issuing the write, the client is
1917
# supposed to consume every produced byte *before* checking that value.
1919
# Doing the oppposite may lead to test failure: we may be interrupted
1920
# after the write but before updating the value. The client can then
1921
# continue and read the value *before* we can update it. And yes,
1922
# this has been observed -- vila 20090129
1923
tcs.bytes_written = len(tcs.canned_response)
1924
self.wfile.write(tcs.canned_response)
1927
class ActivityServerMixin(object):
1929
def __init__(self, protocol_version):
1930
super(ActivityServerMixin, self).__init__(
1931
request_handler=PredefinedRequestHandler,
1932
protocol_version=protocol_version)
1933
# Bytes read and written by the server
1935
self.bytes_written = 0
1936
self.canned_response = None
1939
class ActivityHTTPServer(ActivityServerMixin, http_server.HttpServer):
1943
if tests.HTTPSServerFeature.available():
1944
from bzrlib.tests import https_server
1945
class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
1949
class TestActivityMixin(object):
1950
"""Test socket activity reporting.
1952
We use a special purpose server to control the bytes sent and received and
1953
be able to predict the activity on the client socket.
1957
tests.TestCase.setUp(self)
1958
self.server = self._activity_server(self._protocol_version)
1959
self.server.start_server()
1960
self.activities = {}
1961
def report_activity(t, bytes, direction):
1962
count = self.activities.get(direction, 0)
1964
self.activities[direction] = count
1966
# We override at class level because constructors may propagate the
1967
# bound method and render instance overriding ineffective (an
1968
# alternative would be to define a specific ui factory instead...)
1969
self.orig_report_activity = self._transport._report_activity
1970
self._transport._report_activity = report_activity
1973
self._transport._report_activity = self.orig_report_activity
1974
self.server.stop_server()
1975
tests.TestCase.tearDown(self)
1977
def get_transport(self):
1978
return self._transport(self.server.get_url())
1980
def assertActivitiesMatch(self):
1981
self.assertEqual(self.server.bytes_read,
1982
self.activities.get('write', 0), 'written bytes')
1983
self.assertEqual(self.server.bytes_written,
1984
self.activities.get('read', 0), 'read bytes')
1987
self.server.canned_response = '''HTTP/1.1 200 OK\r
1988
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
1989
Server: Apache/2.0.54 (Fedora)\r
1990
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
1991
ETag: "56691-23-38e9ae00"\r
1992
Accept-Ranges: bytes\r
1993
Content-Length: 35\r
1995
Content-Type: text/plain; charset=UTF-8\r
1997
Bazaar-NG meta directory, format 1
1999
t = self.get_transport()
2000
self.assertEqual('Bazaar-NG meta directory, format 1\n',
2001
t.get('foo/bar').read())
2002
self.assertActivitiesMatch()
2005
self.server.canned_response = '''HTTP/1.1 200 OK\r
2006
Server: SimpleHTTP/0.6 Python/2.5.2\r
2007
Date: Thu, 29 Jan 2009 20:21:47 GMT\r
2008
Content-type: application/octet-stream\r
2009
Content-Length: 20\r
2010
Last-Modified: Thu, 29 Jan 2009 20:21:47 GMT\r
2013
t = self.get_transport()
2014
self.assertTrue(t.has('foo/bar'))
2015
self.assertActivitiesMatch()
2017
def test_readv(self):
2018
self.server.canned_response = '''HTTP/1.1 206 Partial Content\r
2019
Date: Tue, 11 Jul 2006 04:49:48 GMT\r
2020
Server: Apache/2.0.54 (Fedora)\r
2021
Last-Modified: Thu, 06 Jul 2006 20:22:05 GMT\r
2022
ETag: "238a3c-16ec2-805c5540"\r
2023
Accept-Ranges: bytes\r
2024
Content-Length: 1534\r
2026
Content-Type: multipart/byteranges; boundary=418470f848b63279b\r
2029
--418470f848b63279b\r
2030
Content-type: text/plain; charset=UTF-8\r
2031
Content-range: bytes 0-254/93890\r
2033
mbp@sourcefrog.net-20050309040815-13242001617e4a06
2034
mbp@sourcefrog.net-20050309040929-eee0eb3e6d1e7627
2035
mbp@sourcefrog.net-20050309040957-6cad07f466bb0bb8
2036
mbp@sourcefrog.net-20050309041501-c840e09071de3b67
2037
mbp@sourcefrog.net-20050309044615-c24a3250be83220a
2039
--418470f848b63279b\r
2040
Content-type: text/plain; charset=UTF-8\r
2041
Content-range: bytes 1000-2049/93890\r
2044
mbp@sourcefrog.net-20050311063625-07858525021f270b
2045
mbp@sourcefrog.net-20050311231934-aa3776aff5200bb9
2046
mbp@sourcefrog.net-20050311231953-73aeb3a131c3699a
2047
mbp@sourcefrog.net-20050311232353-f5e33da490872c6a
2048
mbp@sourcefrog.net-20050312071639-0a8f59a34a024ff0
2049
mbp@sourcefrog.net-20050312073432-b2c16a55e0d6e9fb
2050
mbp@sourcefrog.net-20050312073831-a47c3335ece1920f
2051
mbp@sourcefrog.net-20050312085412-13373aa129ccbad3
2052
mbp@sourcefrog.net-20050313052251-2bf004cb96b39933
2053
mbp@sourcefrog.net-20050313052856-3edd84094687cb11
2054
mbp@sourcefrog.net-20050313053233-e30a4f28aef48f9d
2055
mbp@sourcefrog.net-20050313053853-7c64085594ff3072
2056
mbp@sourcefrog.net-20050313054757-a86c3f5871069e22
2057
mbp@sourcefrog.net-20050313061422-418f1f73b94879b9
2058
mbp@sourcefrog.net-20050313120651-497bd231b19df600
2059
mbp@sourcefrog.net-20050314024931-eae0170ef25a5d1a
2060
mbp@sourcefrog.net-20050314025438-d52099f915fe65fc
2061
mbp@sourcefrog.net-20050314025539-637a636692c055cf
2062
mbp@sourcefrog.net-20050314025737-55eb441f430ab4ba
2063
mbp@sourcefrog.net-20050314025901-d74aa93bb7ee8f62
2065
--418470f848b63279b--\r
2067
t = self.get_transport()
2068
# Remember that the request is ignored and that the ranges below
2069
# doesn't have to match the canned response.
2070
l = list(t.readv('/foo/bar', ((0, 255), (1000, 1050))))
2071
self.assertEqual(2, len(l))
2072
self.assertActivitiesMatch()
2074
def test_post(self):
2075
self.server.canned_response = '''HTTP/1.1 200 OK\r
2076
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
2077
Server: Apache/2.0.54 (Fedora)\r
2078
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
2079
ETag: "56691-23-38e9ae00"\r
2080
Accept-Ranges: bytes\r
2081
Content-Length: 35\r
2083
Content-Type: text/plain; charset=UTF-8\r
2085
lalala whatever as long as itsssss
2087
t = self.get_transport()
2088
# We must send a single line of body bytes, see
2089
# PredefinedRequestHandler.handle_one_request
2090
code, f = t._post('abc def end-of-body\n')
2091
self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2092
self.assertActivitiesMatch()
2095
class TestActivity(tests.TestCase, TestActivityMixin):
2098
tests.TestCase.setUp(self)
2099
self.server = self._activity_server(self._protocol_version)
2100
self.server.start_server()
2101
self.activities = {}
2102
def report_activity(t, bytes, direction):
2103
count = self.activities.get(direction, 0)
2105
self.activities[direction] = count
2107
# We override at class level because constructors may propagate the
2108
# bound method and render instance overriding ineffective (an
2109
# alternative would be to define a specific ui factory instead...)
2110
self.orig_report_activity = self._transport._report_activity
2111
self._transport._report_activity = report_activity
2114
self._transport._report_activity = self.orig_report_activity
2115
self.server.stop_server()
2116
tests.TestCase.tearDown(self)
2119
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2122
tests.TestCase.setUp(self)
2123
# Unlike TestActivity, we are really testing ReportingFileSocket and
2124
# ReportingSocket, so we don't need all the parametrization. Since
2125
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2126
# test them through their use by the transport than directly (that's a
2127
# bit less clean but far more simpler and effective).
2128
self.server = ActivityHTTPServer('HTTP/1.1')
2129
self._transport=_urllib.HttpTransport_urllib
2131
self.server.start_server()
2133
# We override at class level because constructors may propagate the
2134
# bound method and render instance overriding ineffective (an
2135
# alternative would be to define a specific ui factory instead...)
2136
self.orig_report_activity = self._transport._report_activity
2137
self._transport._report_activity = None
2140
self._transport._report_activity = self.orig_report_activity
2141
self.server.stop_server()
2142
tests.TestCase.tearDown(self)
2144
def assertActivitiesMatch(self):
2145
# Nothing to check here
2149
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2150
"""Test authentication on the redirected http server."""
2152
_auth_header = 'Authorization'
2153
_password_prompt_prefix = ''
2154
_username_prompt_prefix = ''
2155
_auth_server = http_utils.HTTPBasicAuthServer
2156
_transport = _urllib.HttpTransport_urllib
2158
def create_transport_readonly_server(self):
2159
return self._auth_server()
2161
def create_transport_secondary_server(self):
2162
"""Create the secondary server redirecting to the primary server"""
2163
new = self.get_readonly_server()
2165
redirecting = http_utils.HTTPServerRedirecting()
2166
redirecting.redirect_to(new.host, new.port)
2170
super(TestAuthOnRedirected, self).setUp()
2171
self.build_tree_contents([('a','a'),
2173
('1/a', 'redirected once'),
2175
new_prefix = 'http://%s:%s' % (self.new_server.host,
2176
self.new_server.port)
2177
self.old_server.redirections = [
2178
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2179
self.old_transport = self._transport(self.old_server.get_url())
2180
self.new_server.add_user('joe', 'foo')
2182
def get_a(self, transport):
2183
return transport.get('a')
2185
def test_auth_on_redirected_via_do_catching_redirections(self):
2186
self.redirections = 0
2188
def redirected(transport, exception, redirection_notice):
2189
self.redirections += 1
2190
dir, file = urlutils.split(exception.target)
2191
return self._transport(dir)
2193
stdout = tests.StringIOWrapper()
2194
stderr = tests.StringIOWrapper()
2195
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2196
stdout=stdout, stderr=stderr)
2197
self.assertEqual('redirected once',
2198
transport.do_catching_redirections(
2199
self.get_a, self.old_transport, redirected).read())
2200
self.assertEqual(1, self.redirections)
2201
# stdin should be empty
2202
self.assertEqual('', ui.ui_factory.stdin.readline())
2203
# stdout should be empty, stderr will contains the prompts
2204
self.assertEqual('', stdout.getvalue())
2206
def test_auth_on_redirected_via_following_redirections(self):
2207
self.new_server.add_user('joe', 'foo')
2208
stdout = tests.StringIOWrapper()
2209
stderr = tests.StringIOWrapper()
2210
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2211
stdout=stdout, stderr=stderr)
2212
t = self.old_transport
2213
req = RedirectedRequest('GET', t.abspath('a'))
2214
new_prefix = 'http://%s:%s' % (self.new_server.host,
2215
self.new_server.port)
2216
self.old_server.redirections = [
2217
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2218
self.assertEqual('redirected once',t._perform(req).read())
2219
# stdin should be empty
2220
self.assertEqual('', ui.ui_factory.stdin.readline())
2221
# stdout should be empty, stderr will contains the prompts
2222
self.assertEqual('', stdout.getvalue())
1351
class TestHTTPBasicAuth(TestHTTPAuth, TestCaseWithWebserver):
1352
"""Test http basic authentication scheme"""
1354
_transport = HttpTransport_urllib
1356
def create_transport_readonly_server(self):
1357
return HTTPBasicAuthServer()
1360
class TestHTTPProxyBasicAuth(TestProxyAuth, TestCaseWithWebserver):
1361
"""Test proxy basic authentication scheme"""
1363
_transport = HttpTransport_urllib
1365
def create_transport_readonly_server(self):
1366
return ProxyBasicAuthServer()
1369
class TestDigestAuth(object):
1370
"""Digest Authentication specific tests"""
1372
def test_changing_nonce(self):
1373
self.server.add_user('joe', 'foo')
1374
t = self.get_user_transport('joe', 'foo')
1375
self.assertEqual('contents of a\n', t.get('a').read())
1376
self.assertEqual('contents of b\n', t.get('b').read())
1377
# Only one 'Authentication Required' error should have
1379
self.assertEqual(1, self.server.auth_required_errors)
1380
# The server invalidates the current nonce
1381
self.server.auth_nonce = self.server.auth_nonce + '. No, now!'
1382
self.assertEqual('contents of a\n', t.get('a').read())
1383
# Two 'Authentication Required' errors should occur (the
1384
# initial 'who are you' and a second 'who are you' with the new nonce)
1385
self.assertEqual(2, self.server.auth_required_errors)
1388
class TestHTTPDigestAuth(TestHTTPAuth, TestDigestAuth, TestCaseWithWebserver):
1389
"""Test http digest authentication scheme"""
1391
_transport = HttpTransport_urllib
1393
def create_transport_readonly_server(self):
1394
return HTTPDigestAuthServer()
1397
class TestHTTPProxyDigestAuth(TestProxyAuth, TestDigestAuth,
1398
TestCaseWithWebserver):
1399
"""Test proxy digest authentication scheme"""
1401
_transport = HttpTransport_urllib
1403
def create_transport_readonly_server(self):
1404
return ProxyDigestAuthServer()