82
78
transport_scenarios = [
83
79
('urllib', dict(_transport=_urllib.HttpTransport_urllib,
84
80
_server=http_server.HttpServer_urllib,
85
_qualified_prefix='http+urllib',)),
81
_url_protocol='http+urllib',)),
83
if features.pycurl.available():
88
84
transport_scenarios.append(
89
85
('pycurl', dict(_transport=PyCurlTransport,
90
86
_server=http_server.HttpServer_PyCurl,
91
_qualified_prefix='http+pycurl',)))
87
_url_protocol='http+pycurl',)))
92
88
tests.multiply_tests(t_tests, transport_scenarios, result)
90
protocol_scenarios = [
91
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
92
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
95
# some tests are parametrized by the protocol version only
96
p_tests, remaining_tests = tests.split_suite_by_condition(
97
remaining_tests, tests.condition_isinstance((
100
tests.multiply_tests(p_tests, protocol_scenarios, result)
94
102
# each implementation tested with each HTTP version
95
103
tp_tests, remaining_tests = tests.split_suite_by_condition(
96
104
remaining_tests, tests.condition_isinstance((
106
114
TestSpecificRequestHandler,
108
protocol_scenarios = [
109
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
110
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
112
116
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
113
117
protocol_scenarios)
114
118
tests.multiply_tests(tp_tests, tp_scenarios, result)
120
# proxy auth: each auth scheme on all http versions on all implementations.
121
tppa_tests, remaining_tests = tests.split_suite_by_condition(
122
remaining_tests, tests.condition_isinstance((
125
proxy_auth_scheme_scenarios = [
126
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
127
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
129
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
131
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
132
proxy_auth_scheme_scenarios)
133
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
116
135
# auth: each auth scheme on all http versions on all implementations.
117
136
tpa_tests, remaining_tests = tests.split_suite_by_condition(
118
137
remaining_tests, tests.condition_isinstance((
121
140
auth_scheme_scenarios = [
122
('basic', dict(_auth_scheme='basic')),
123
('digest', dict(_auth_scheme='digest')),
141
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
142
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
144
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
125
146
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
126
auth_scheme_scenarios)
147
auth_scheme_scenarios)
127
148
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
129
# activity: activity on all http versions on all implementations
150
# activity: on all http[s] versions on all implementations
130
151
tpact_tests, remaining_tests = tests.split_suite_by_condition(
131
152
remaining_tests, tests.condition_isinstance((
134
155
activity_scenarios = [
135
('http', dict(_activity_server=ActivityHTTPServer)),
156
('urllib,http', dict(_activity_server=ActivityHTTPServer,
157
_transport=_urllib.HttpTransport_urllib,)),
137
159
if tests.HTTPSServerFeature.available():
138
160
activity_scenarios.append(
139
('https', dict(_activity_server=ActivityHTTPSServer)))
140
tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
161
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
162
_transport=_urllib.HttpTransport_urllib,)),)
163
if features.pycurl.available():
164
activity_scenarios.append(
165
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
166
_transport=PyCurlTransport,)),)
167
if tests.HTTPSServerFeature.available():
168
from bzrlib.tests import (
171
# FIXME: Until we have a better way to handle self-signed
172
# certificates (like allowing them in a test specific
173
# authentication.conf for example), we need some specialized pycurl
174
# transport for tests.
175
class HTTPS_pycurl_transport(PyCurlTransport):
177
def __init__(self, base, _from_transport=None):
178
super(HTTPS_pycurl_transport, self).__init__(
179
base, _from_transport)
180
self.cabundle = str(ssl_certs.build_path('ca.crt'))
182
activity_scenarios.append(
183
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
184
_transport=HTTPS_pycurl_transport,)),)
186
tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
142
188
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
144
190
# No parametrization for the remaining tests
175
221
self.received_bytes = ''
225
return '%s://%s:%s/' % (self.scheme, self.host, self.port)
227
def start_server(self):
178
228
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
179
229
self._sock.bind(('127.0.0.1', 0))
180
230
self.host, self.port = self._sock.getsockname()
181
231
self._ready = threading.Event()
182
self._thread = threading.Thread(target=self._accept_read_and_reply)
183
self._thread.setDaemon(True)
232
self._thread = test_server.ThreadWithException(
233
event=self._ready, target=self._accept_read_and_reply)
184
234
self._thread.start()
235
if 'threads' in tests.selftest_debug_flags:
236
sys.stderr.write('Thread started: %s\n' % (self._thread.ident,))
187
239
def _accept_read_and_reply(self):
188
240
self._sock.listen(1)
189
241
self._ready.set()
190
self._sock.settimeout(5)
192
conn, address = self._sock.accept()
193
# On win32, the accepted connection will be non-blocking to start
194
# with because we're using settimeout.
195
conn.setblocking(True)
242
conn, address = self._sock.accept()
243
if self._expect_body_tail is not None:
196
244
while not self.received_bytes.endswith(self._expect_body_tail):
197
245
self.received_bytes += conn.recv(4096)
198
246
conn.sendall('HTTP/1.1 200 OK\r\n')
199
except socket.timeout:
200
# Make sure the client isn't stuck waiting for us to e.g. accept.
201
248
self._sock.close()
202
249
except socket.error:
203
250
# The client may have already closed the socket.
253
def stop_server(self):
255
# Issue a fake connection to wake up the server and allow it to
257
fake_conn = osutils.connect_socket((self.host, self.port))
209
259
except socket.error:
210
260
# We might have already closed it. We don't care.
265
if 'threads' in tests.selftest_debug_flags:
266
sys.stderr.write('Thread joined: %s\n' % (self._thread.ident,))
216
269
class TestAuthHeader(tests.TestCase):
218
def parse_header(self, header):
219
ah = _urllib2_wrappers.AbstractAuthHandler()
220
return ah._parse_auth_header(header)
271
def parse_header(self, header, auth_handler_class=None):
272
if auth_handler_class is None:
273
auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
274
self.auth_handler = auth_handler_class()
275
return self.auth_handler._parse_auth_header(header)
222
277
def test_empty_header(self):
223
278
scheme, remainder = self.parse_header('')
224
self.assertEquals('', scheme)
279
self.assertEqual('', scheme)
225
280
self.assertIs(None, remainder)
227
282
def test_negotiate_header(self):
228
283
scheme, remainder = self.parse_header('Negotiate')
229
self.assertEquals('negotiate', scheme)
284
self.assertEqual('negotiate', scheme)
230
285
self.assertIs(None, remainder)
232
287
def test_basic_header(self):
233
288
scheme, remainder = self.parse_header(
234
289
'Basic realm="Thou should not pass"')
235
self.assertEquals('basic', scheme)
236
self.assertEquals('realm="Thou should not pass"', remainder)
290
self.assertEqual('basic', scheme)
291
self.assertEqual('realm="Thou should not pass"', remainder)
293
def test_basic_extract_realm(self):
294
scheme, remainder = self.parse_header(
295
'Basic realm="Thou should not pass"',
296
_urllib2_wrappers.BasicAuthHandler)
297
match, realm = self.auth_handler.extract_realm(remainder)
298
self.assertTrue(match is not None)
299
self.assertEqual('Thou should not pass', realm)
238
301
def test_digest_header(self):
239
302
scheme, remainder = self.parse_header(
240
303
'Digest realm="Thou should not pass"')
241
self.assertEquals('digest', scheme)
242
self.assertEquals('realm="Thou should not pass"', remainder)
304
self.assertEqual('digest', scheme)
305
self.assertEqual('realm="Thou should not pass"', remainder)
245
308
class TestHTTPServer(tests.TestCase):
251
314
protocol_version = 'HTTP/0.1'
253
server = http_server.HttpServer(BogusRequestHandler)
255
self.assertRaises(httplib.UnknownProtocol,server.setUp)
258
self.fail('HTTP Server creation did not raise UnknownProtocol')
316
self.assertRaises(httplib.UnknownProtocol,
317
http_server.HttpServer, BogusRequestHandler)
260
319
def test_force_invalid_protocol(self):
261
server = http_server.HttpServer(protocol_version='HTTP/0.1')
263
self.assertRaises(httplib.UnknownProtocol,server.setUp)
266
self.fail('HTTP Server creation did not raise UnknownProtocol')
320
self.assertRaises(httplib.UnknownProtocol,
321
http_server.HttpServer, protocol_version='HTTP/0.1')
268
323
def test_server_start_and_stop(self):
269
324
server = http_server.HttpServer()
271
self.assertTrue(server._http_running)
273
self.assertFalse(server._http_running)
325
self.addCleanup(server.stop_server)
326
server.start_server()
327
self.assertTrue(server.server is not None)
328
self.assertTrue(server.server.serving is not None)
329
self.assertTrue(server.server.serving)
275
331
def test_create_http_server_one_zero(self):
276
332
class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
339
388
def test_url_parsing(self):
340
389
f = FakeManager()
341
390
url = http.extract_auth('http://example.com', f)
342
self.assertEquals('http://example.com', url)
343
self.assertEquals(0, len(f.credentials))
391
self.assertEqual('http://example.com', url)
392
self.assertEqual(0, len(f.credentials))
344
393
url = http.extract_auth(
345
'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
346
self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
347
self.assertEquals(1, len(f.credentials))
348
self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
394
'http://user:pass@example.com/bzr/bzr.dev', f)
395
self.assertEqual('http://example.com/bzr/bzr.dev', url)
396
self.assertEqual(1, len(f.credentials))
397
self.assertEqual([None, 'example.com', 'user', 'pass'],
352
401
class TestHttpTransportUrls(tests.TestCase):
399
448
https by supplying a fake version_info that do not
405
raise tests.TestSkipped('pycurl not present')
451
self.requireFeature(features.pycurl)
452
# Import the module locally now that we now it's available.
453
pycurl = features.pycurl.module
407
version_info_orig = pycurl.version_info
409
# Now that we have pycurl imported, we can fake its version_info
410
# This was taken from a windows pycurl without SSL
412
pycurl.version_info = lambda : (2,
420
('ftp', 'gopher', 'telnet',
421
'dict', 'ldap', 'http', 'file'),
425
self.assertRaises(errors.DependencyNotPresent, self._transport,
426
'https://launchpad.net')
428
# Restore the right function
429
pycurl.version_info = version_info_orig
455
self.overrideAttr(pycurl, 'version_info',
456
# Fake the pycurl version_info This was taken from
457
# a windows pycurl without SSL (thanks to bialix)
466
('ftp', 'gopher', 'telnet',
467
'dict', 'ldap', 'http', 'file'),
471
self.assertRaises(errors.DependencyNotPresent, self._transport,
472
'https://launchpad.net')
432
475
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
491
534
class TestPost(tests.TestCase):
493
536
def test_post_body_is_received(self):
494
server = RecordingServer(expect_body_tail='end-of-body')
496
self.addCleanup(server.tearDown)
497
scheme = self._qualified_prefix
498
url = '%s://%s:%s/' % (scheme, server.host, server.port)
499
http_transport = self._transport(url)
537
server = RecordingServer(expect_body_tail='end-of-body',
538
scheme=self._url_protocol)
539
self.start_server(server)
540
url = server.get_url()
541
# FIXME: needs a cleanup -- vila 20100611
542
http_transport = transport.get_transport(url)
500
543
code, response = http_transport._post('abc def end-of-body')
502
545
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
546
589
_req_handler_class = http_server.TestingHTTPRequestHandler
548
591
def create_transport_readonly_server(self):
549
return http_server.HttpServer(self._req_handler_class,
550
protocol_version=self._protocol_version)
592
server = http_server.HttpServer(self._req_handler_class,
593
protocol_version=self._protocol_version)
594
server._url_protocol = self._url_protocol
552
597
def _testing_pycurl(self):
553
return pycurl_present and self._transport == PyCurlTransport
598
# TODO: This is duplicated for lots of the classes in this file
599
return (features.pycurl.available()
600
and self._transport == PyCurlTransport)
556
603
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
557
604
"""Whatever request comes in, close the connection"""
559
def handle_one_request(self):
606
def _handle_one_request(self):
560
607
"""Handle a single HTTP request, by abruptly closing the connection"""
561
608
self.close_connection = 1
567
614
_req_handler_class = WallRequestHandler
569
616
def test_http_has(self):
570
server = self.get_readonly_server()
571
t = self._transport(server.get_url())
617
t = self.get_readonly_transport()
572
618
# Unfortunately httplib (see HTTPResponse._read_status
573
619
# for details) make no distinction between a closed
574
620
# socket and badly formatted status line, so we can't
575
621
# just test for ConnectionError, we have to test
576
# InvalidHttpResponse too.
577
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
622
# InvalidHttpResponse too. And pycurl may raise ConnectionReset
623
# instead of ConnectionError too.
624
self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
625
errors.InvalidHttpResponse),
578
626
t.has, 'foo/bar')
580
628
def test_http_get(self):
581
server = self.get_readonly_server()
582
t = self._transport(server.get_url())
583
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
629
t = self.get_readonly_transport()
630
self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
631
errors.InvalidHttpResponse),
584
632
t.get, 'foo/bar')
662
700
_req_handler_class = BadProtocolRequestHandler
665
if pycurl_present and self._transport == PyCurlTransport:
703
if self._testing_pycurl():
666
704
raise tests.TestNotApplicable(
667
705
"pycurl doesn't check the protocol version")
668
706
super(TestBadProtocolServer, self).setUp()
670
708
def test_http_has(self):
671
server = self.get_readonly_server()
672
t = self._transport(server.get_url())
709
t = self.get_readonly_transport()
673
710
self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
675
712
def test_http_get(self):
676
server = self.get_readonly_server()
677
t = self._transport(server.get_url())
713
t = self.get_readonly_transport()
678
714
self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
712
746
self.assertEqual(None, server.host)
713
747
self.assertEqual(None, server.port)
715
def test_setUp_and_tearDown(self):
749
def test_setUp_and_stop(self):
716
750
server = RecordingServer(expect_body_tail=None)
751
server.start_server()
719
753
self.assertNotEqual(None, server.host)
720
754
self.assertNotEqual(None, server.port)
723
757
self.assertEqual(None, server.host)
724
758
self.assertEqual(None, server.port)
726
760
def test_send_receive_bytes(self):
727
server = RecordingServer(expect_body_tail='c')
729
self.addCleanup(server.tearDown)
761
server = RecordingServer(expect_body_tail='c', scheme='http')
762
self.start_server(server)
730
763
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
731
764
sock.connect((server.host, server.port))
732
765
sock.sendall('abc')
1044
1071
self.build_tree_contents([('a', content)],)
1046
1073
def test_few_ranges(self):
1047
t = self.get_transport()
1074
t = self.get_readonly_transport()
1048
1075
l = list(t.readv('a', ((0, 4), (1024, 4), )))
1049
1076
self.assertEqual(l[0], (0, '0000'))
1050
1077
self.assertEqual(l[1], (1024, '0001'))
1051
1078
self.assertEqual(1, self.get_readonly_server().GET_request_nb)
1053
1080
def test_more_ranges(self):
1054
t = self.get_transport()
1081
t = self.get_readonly_transport()
1055
1082
l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
1056
1083
self.assertEqual(l[0], (0, '0000'))
1057
1084
self.assertEqual(l[1], (1024, '0001'))
1113
1137
# FIXME: We don't have an https server available, so we don't
1114
# test https connections.
1138
# test https connections. --vila toolongago
1116
1140
def setUp(self):
1117
1141
super(TestProxyHttpServer, self).setUp()
1142
self.transport_secondary_server = http_utils.ProxyServer
1118
1143
self.build_tree_contents([('foo', 'contents of foo\n'),
1119
1144
('foo-proxied', 'proxied contents of foo\n')])
1120
1145
# Let's setup some attributes for tests
1121
self.server = self.get_readonly_server()
1122
self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
1146
server = self.get_readonly_server()
1147
self.server_host_port = '%s:%d' % (server.host, server.port)
1123
1148
if self._testing_pycurl():
1124
1149
# Oh my ! pycurl does not check for the port as part of
1125
1150
# no_proxy :-( So we just test the host part
1126
self.no_proxy_host = 'localhost'
1151
self.no_proxy_host = server.host
1128
self.no_proxy_host = self.proxy_address
1153
self.no_proxy_host = self.server_host_port
1129
1154
# The secondary server is the proxy
1130
self.proxy = self.get_secondary_server()
1131
self.proxy_url = self.proxy.get_url()
1155
self.proxy_url = self.get_secondary_url()
1132
1156
self._old_env = {}
1134
1158
def _testing_pycurl(self):
1135
return pycurl_present and self._transport == PyCurlTransport
1137
def create_transport_secondary_server(self):
1138
"""Creates an http server that will serve files with
1139
'-proxied' appended to their names.
1141
return http_utils.ProxyServer(protocol_version=self._protocol_version)
1159
# TODO: This is duplicated for lots of the classes in this file
1160
return (features.pycurl.available()
1161
and self._transport == PyCurlTransport)
1143
1163
def _install_env(self, env):
1144
1164
for name, value in env.iteritems():
1221
1239
def setUp(self):
1222
1240
http_utils.TestCaseWithWebserver.setUp(self)
1223
1241
self.build_tree_contents([('a', '0123456789')],)
1224
server = self.get_readonly_server()
1225
self.transport = self._transport(server.get_url())
1227
1243
def create_transport_readonly_server(self):
1228
1244
return http_server.HttpServer(protocol_version=self._protocol_version)
1230
1246
def _file_contents(self, relpath, ranges):
1247
t = self.get_readonly_transport()
1231
1248
offsets = [ (start, end - start + 1) for start, end in ranges]
1232
coalesce = self.transport._coalesce_offsets
1249
coalesce = t._coalesce_offsets
1233
1250
coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
1234
code, data = self.transport._get(relpath, coalesced)
1251
code, data = t._get(relpath, coalesced)
1235
1252
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1236
1253
for start, end in ranges:
1237
1254
data.seek(start)
1238
1255
yield data.read(end - start + 1)
1240
1257
def _file_tail(self, relpath, tail_amount):
1241
code, data = self.transport._get(relpath, [], tail_amount)
1258
t = self.get_readonly_transport()
1259
code, data = t._get(relpath, [], tail_amount)
1242
1260
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1243
1261
data.seek(-tail_amount, 2)
1244
1262
return data.read(tail_amount)
1263
1281
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
1264
1282
"""Test redirection between http servers."""
1266
def create_transport_secondary_server(self):
1267
"""Create the secondary server redirecting to the primary server"""
1268
new = self.get_readonly_server()
1270
redirecting = http_utils.HTTPServerRedirecting(
1271
protocol_version=self._protocol_version)
1272
redirecting.redirect_to(new.host, new.port)
1275
1284
def setUp(self):
1276
1285
super(TestHTTPRedirections, self).setUp()
1277
1286
self.build_tree_contents([('a', '0123456789'),
1279
1288
'# Bazaar revision bundle v0.9\n#\n')
1281
# The requests to the old server will be redirected to the new server
1282
self.old_transport = self._transport(self.old_server.get_url())
1284
1291
def test_redirected(self):
1285
self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
1286
t = self._transport(self.new_server.get_url())
1287
self.assertEqual('0123456789', t.get('a').read())
1289
def test_read_redirected_bundle_from_url(self):
1290
from bzrlib.bundle import read_bundle_from_url
1291
url = self.old_transport.abspath('bundle')
1292
bundle = self.applyDeprecated(deprecated_in((1, 12, 0)),
1293
read_bundle_from_url, url)
1294
# If read_bundle_from_url was successful we get an empty bundle
1295
self.assertEqual([], bundle.revisions)
1292
self.assertRaises(errors.RedirectRequested,
1293
self.get_old_transport().get, 'a')
1294
self.assertEqual('0123456789', self.get_new_transport().get('a').read())
1298
1297
class RedirectedRequest(_urllib2_wrappers.Request):
1307
1306
# Since the tests using this class will replace
1308
1307
# _urllib2_wrappers.Request, we can't just call the base class __init__
1309
1308
# or we'll loop.
1310
RedirectedRequest.init_orig(self, method, url, args, kwargs)
1309
RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
1311
1310
self.follow_redirections = True
1313
def install_redirected_request(test):
1314
test.overrideAttr(_urllib2_wrappers, 'Request', RedirectedRequest)
1317
def cleanup_http_redirection_connections(test):
1318
# Some sockets are opened but never seen by _urllib, so we trap them at
1319
# the _urllib2_wrappers level to be able to clean them up.
1320
def socket_disconnect(sock):
1322
sock.shutdown(socket.SHUT_RDWR)
1324
except socket.error:
1326
def connect(connection):
1327
test.http_connect_orig(connection)
1328
test.addCleanup(socket_disconnect, connection.sock)
1329
test.http_connect_orig = test.overrideAttr(
1330
_urllib2_wrappers.HTTPConnection, 'connect', connect)
1331
def connect(connection):
1332
test.https_connect_orig(connection)
1333
test.addCleanup(socket_disconnect, connection.sock)
1334
test.https_connect_orig = test.overrideAttr(
1335
_urllib2_wrappers.HTTPSConnection, 'connect', connect)
1314
1338
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
1315
1339
"""Test redirections.
1328
1352
def setUp(self):
1329
if pycurl_present and self._transport == PyCurlTransport:
1353
if (features.pycurl.available()
1354
and self._transport == PyCurlTransport):
1330
1355
raise tests.TestNotApplicable(
1331
"pycurl doesn't redirect silently annymore")
1356
"pycurl doesn't redirect silently anymore")
1332
1357
super(TestHTTPSilentRedirections, self).setUp()
1333
self.setup_redirected_request()
1334
self.addCleanup(self.cleanup_redirected_request)
1358
install_redirected_request(self)
1359
cleanup_http_redirection_connections(self)
1335
1360
self.build_tree_contents([('a','a'),
1337
1362
('1/a', 'redirected once'),
1345
1370
('5/a', 'redirected 5 times'),
1348
self.old_transport = self._transport(self.old_server.get_url())
1350
def setup_redirected_request(self):
1351
self.original_class = _urllib2_wrappers.Request
1352
_urllib2_wrappers.Request = RedirectedRequest
1354
def cleanup_redirected_request(self):
1355
_urllib2_wrappers.Request = self.original_class
1357
def create_transport_secondary_server(self):
1358
"""Create the secondary server, redirections are defined in the tests"""
1359
return http_utils.HTTPServerRedirecting(
1360
protocol_version=self._protocol_version)
1362
1373
def test_one_redirection(self):
1363
t = self.old_transport
1365
req = RedirectedRequest('GET', t.abspath('a'))
1366
req.follow_redirections = True
1374
t = self.get_old_transport()
1375
req = RedirectedRequest('GET', t._remote_path('a'))
1367
1376
new_prefix = 'http://%s:%s' % (self.new_server.host,
1368
1377
self.new_server.port)
1369
1378
self.old_server.redirections = \
1370
1379
[('(.*)', r'%s/1\1' % (new_prefix), 301),]
1371
self.assertEquals('redirected once',t._perform(req).read())
1380
self.assertEqual('redirected once', t._perform(req).read())
1373
1382
def test_five_redirections(self):
1374
t = self.old_transport
1376
req = RedirectedRequest('GET', t.abspath('a'))
1377
req.follow_redirections = True
1383
t = self.get_old_transport()
1384
req = RedirectedRequest('GET', t._remote_path('a'))
1378
1385
old_prefix = 'http://%s:%s' % (self.old_server.host,
1379
1386
self.old_server.port)
1380
1387
new_prefix = 'http://%s:%s' % (self.new_server.host,
1395
1402
def setUp(self):
1396
1403
super(TestDoCatchRedirections, self).setUp()
1397
1404
self.build_tree_contents([('a', '0123456789'),],)
1399
self.old_transport = self._transport(self.old_server.get_url())
1401
def get_a(self, transport):
1402
return transport.get('a')
1405
cleanup_http_redirection_connections(self)
1407
self.old_transport = self.get_old_transport()
1404
1412
def test_no_redirection(self):
1405
t = self._transport(self.new_server.get_url())
1413
t = self.get_new_transport()
1407
1415
# We use None for redirected so that we fail if redirected
1408
self.assertEquals('0123456789',
1409
transport.do_catching_redirections(
1416
self.assertEqual('0123456789',
1417
transport.do_catching_redirections(
1410
1418
self.get_a, t, None).read())
1412
1420
def test_one_redirection(self):
1413
1421
self.redirections = 0
1415
def redirected(transport, exception, redirection_notice):
1423
def redirected(t, exception, redirection_notice):
1416
1424
self.redirections += 1
1417
dir, file = urlutils.split(exception.target)
1418
return self._transport(dir)
1425
redirected_t = t._redirected_to(exception.source, exception.target)
1420
self.assertEquals('0123456789',
1421
transport.do_catching_redirections(
1428
self.assertEqual('0123456789',
1429
transport.do_catching_redirections(
1422
1430
self.get_a, self.old_transport, redirected).read())
1423
self.assertEquals(1, self.redirections)
1431
self.assertEqual(1, self.redirections)
1425
1433
def test_redirection_loop(self):
1448
1459
('b', 'contents of b\n'),])
1450
1461
def create_transport_readonly_server(self):
1451
if self._auth_scheme == 'basic':
1452
server = http_utils.HTTPBasicAuthServer(
1453
protocol_version=self._protocol_version)
1455
if self._auth_scheme != 'digest':
1456
raise AssertionError('Unknown auth scheme: %r'
1457
% self._auth_scheme)
1458
server = http_utils.HTTPDigestAuthServer(
1459
protocol_version=self._protocol_version)
1462
server = self._auth_server(protocol_version=self._protocol_version)
1463
server._url_protocol = self._url_protocol
1462
1466
def _testing_pycurl(self):
1463
return pycurl_present and self._transport == PyCurlTransport
1467
# TODO: This is duplicated for lots of the classes in this file
1468
return (features.pycurl.available()
1469
and self._transport == PyCurlTransport)
1465
1471
def get_user_url(self, user, password):
1466
1472
"""Build an url embedding user and password"""
1514
1521
# initial 'who are you' and 'this is not you, who are you')
1515
1522
self.assertEqual(2, self.server.auth_required_errors)
1524
def test_prompt_for_username(self):
1525
if self._testing_pycurl():
1526
raise tests.TestNotApplicable(
1527
'pycurl cannot prompt, it handles auth by embedding'
1528
' user:pass in urls only')
1530
self.server.add_user('joe', 'foo')
1531
t = self.get_user_transport(None, None)
1532
stdout = tests.StringIOWrapper()
1533
stderr = tests.StringIOWrapper()
1534
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1535
stdout=stdout, stderr=stderr)
1536
self.assertEqual('contents of a\n',t.get('a').read())
1537
# stdin should be empty
1538
self.assertEqual('', ui.ui_factory.stdin.readline())
1540
expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1541
self.assertEqual(expected_prompt, stderr.read(len(expected_prompt)))
1542
self.assertEqual('', stdout.getvalue())
1543
self._check_password_prompt(t._unqualified_scheme, 'joe',
1517
1546
def test_prompt_for_password(self):
1518
1547
if self._testing_pycurl():
1519
1548
raise tests.TestNotApplicable(
1523
1552
self.server.add_user('joe', 'foo')
1524
1553
t = self.get_user_transport('joe', None)
1525
1554
stdout = tests.StringIOWrapper()
1526
ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1527
self.assertEqual('contents of a\n',t.get('a').read())
1555
stderr = tests.StringIOWrapper()
1556
ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1557
stdout=stdout, stderr=stderr)
1558
self.assertEqual('contents of a\n', t.get('a').read())
1528
1559
# stdin should be empty
1529
1560
self.assertEqual('', ui.ui_factory.stdin.readline())
1530
1561
self._check_password_prompt(t._unqualified_scheme, 'joe',
1563
self.assertEqual('', stdout.getvalue())
1532
1564
# And we shouldn't prompt again for a different request
1533
1565
# against the same transport.
1534
1566
self.assertEqual('contents of b\n',t.get('b').read())
1544
1576
% (scheme.upper(),
1545
1577
user, self.server.host, self.server.port,
1546
1578
self.server.auth_realm)))
1547
self.assertEquals(expected_prompt, actual_prompt)
1579
self.assertEqual(expected_prompt, actual_prompt)
1581
def _expected_username_prompt(self, scheme):
1582
return (self._username_prompt_prefix
1583
+ "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1584
self.server.host, self.server.port,
1585
self.server.auth_realm))
1549
1587
def test_no_prompt_for_password_when_using_auth_config(self):
1550
1588
if self._testing_pycurl():
1630
1670
('b-proxied', 'contents of b\n'),
1633
def create_transport_readonly_server(self):
1634
if self._auth_scheme == 'basic':
1635
server = http_utils.ProxyBasicAuthServer(
1636
protocol_version=self._protocol_version)
1638
if self._auth_scheme != 'digest':
1639
raise AssertionError('Unknown auth scheme: %r'
1640
% self._auth_scheme)
1641
server = http_utils.ProxyDigestAuthServer(
1642
protocol_version=self._protocol_version)
1645
1673
def get_user_transport(self, user, password):
1646
1674
self._install_env({'all_proxy': self.get_user_url(user, password)})
1647
return self._transport(self.server.get_url())
1675
return TestAuth.get_user_transport(self, user, password)
1649
1677
def _install_env(self, env):
1650
1678
for name, value in env.iteritems():
1692
1721
# We use the VFS layer as part of HTTP tunnelling tests.
1693
1722
self._captureVar('BZR_NO_SMART_VFS', None)
1694
1723
self.transport_readonly_server = http_utils.HTTPServerWithSmarts
1724
self.http_server = self.get_readonly_server()
1696
1726
def create_transport_readonly_server(self):
1697
return http_utils.HTTPServerWithSmarts(
1727
server = http_utils.HTTPServerWithSmarts(
1698
1728
protocol_version=self._protocol_version)
1729
server._url_protocol = self._url_protocol
1700
1732
def test_open_bzrdir(self):
1701
1733
branch = self.make_branch('relpath')
1702
http_server = self.get_readonly_server()
1703
url = http_server.get_url() + 'relpath'
1734
url = self.http_server.get_url() + 'relpath'
1704
1735
bd = bzrdir.BzrDir.open(url)
1736
self.addCleanup(bd.transport.disconnect)
1705
1737
self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
1707
1739
def test_bulk_data(self):
1906
1936
# We override at class level because constructors may propagate the
1907
1937
# bound method and render instance overriding ineffective (an
1908
# alternative would be be to define a specific ui factory instead...)
1909
self.orig_report_activity = self._transport._report_activity
1910
self._transport._report_activity = report_activity
1913
self._transport._report_activity = self.orig_report_activity
1914
self.server.tearDown()
1915
tests.TestCase.tearDown(self)
1938
# alternative would be to define a specific ui factory instead...)
1939
self.overrideAttr(self._transport, '_report_activity', report_activity)
1940
self.addCleanup(self.server.stop_server)
1917
1942
def get_transport(self):
1918
return self._transport(self.server.get_url())
1943
t = self._transport(self.server.get_url())
1944
# FIXME: Needs cleanup -- vila 20100611
1920
1947
def assertActivitiesMatch(self):
1921
1948
self.assertEqual(self.server.bytes_read,
2027
2054
t = self.get_transport()
2028
2055
# We must send a single line of body bytes, see
2029
# PredefinedRequestHandler.handle_one_request
2056
# PredefinedRequestHandler._handle_one_request
2030
2057
code, f = t._post('abc def end-of-body\n')
2031
2058
self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2032
2059
self.assertActivitiesMatch()
2062
class TestActivity(tests.TestCase, TestActivityMixin):
2065
TestActivityMixin.setUp(self)
2068
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2070
# Unlike TestActivity, we are really testing ReportingFileSocket and
2071
# ReportingSocket, so we don't need all the parametrization. Since
2072
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2073
# test them through their use by the transport than directly (that's a
2074
# bit less clean but far more simpler and effective).
2075
_activity_server = ActivityHTTPServer
2076
_protocol_version = 'HTTP/1.1'
2079
self._transport =_urllib.HttpTransport_urllib
2080
TestActivityMixin.setUp(self)
2082
def assertActivitiesMatch(self):
2083
# Nothing to check here
2087
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2088
"""Test authentication on the redirected http server."""
2090
_auth_header = 'Authorization'
2091
_password_prompt_prefix = ''
2092
_username_prompt_prefix = ''
2093
_auth_server = http_utils.HTTPBasicAuthServer
2094
_transport = _urllib.HttpTransport_urllib
2097
super(TestAuthOnRedirected, self).setUp()
2098
self.build_tree_contents([('a','a'),
2100
('1/a', 'redirected once'),
2102
new_prefix = 'http://%s:%s' % (self.new_server.host,
2103
self.new_server.port)
2104
self.old_server.redirections = [
2105
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2106
self.old_transport = self.get_old_transport()
2107
self.new_server.add_user('joe', 'foo')
2108
cleanup_http_redirection_connections(self)
2110
def create_transport_readonly_server(self):
2111
server = self._auth_server(protocol_version=self._protocol_version)
2112
server._url_protocol = self._url_protocol
2118
def test_auth_on_redirected_via_do_catching_redirections(self):
2119
self.redirections = 0
2121
def redirected(t, exception, redirection_notice):
2122
self.redirections += 1
2123
redirected_t = t._redirected_to(exception.source, exception.target)
2124
self.addCleanup(redirected_t.disconnect)
2127
stdout = tests.StringIOWrapper()
2128
stderr = tests.StringIOWrapper()
2129
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2130
stdout=stdout, stderr=stderr)
2131
self.assertEqual('redirected once',
2132
transport.do_catching_redirections(
2133
self.get_a, self.old_transport, redirected).read())
2134
self.assertEqual(1, self.redirections)
2135
# stdin should be empty
2136
self.assertEqual('', ui.ui_factory.stdin.readline())
2137
# stdout should be empty, stderr will contains the prompts
2138
self.assertEqual('', stdout.getvalue())
2140
def test_auth_on_redirected_via_following_redirections(self):
2141
self.new_server.add_user('joe', 'foo')
2142
stdout = tests.StringIOWrapper()
2143
stderr = tests.StringIOWrapper()
2144
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2145
stdout=stdout, stderr=stderr)
2146
t = self.old_transport
2147
req = RedirectedRequest('GET', t.abspath('a'))
2148
new_prefix = 'http://%s:%s' % (self.new_server.host,
2149
self.new_server.port)
2150
self.old_server.redirections = [
2151
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2152
self.assertEqual('redirected once', t._perform(req).read())
2153
# stdin should be empty
2154
self.assertEqual('', ui.ui_factory.stdin.readline())
2155
# stdout should be empty, stderr will contains the prompts
2156
self.assertEqual('', stdout.getvalue())