78
82
transport_scenarios = [
79
83
('urllib', dict(_transport=_urllib.HttpTransport_urllib,
80
84
_server=http_server.HttpServer_urllib,
81
_url_protocol='http+urllib',)),
85
_qualified_prefix='http+urllib',)),
83
if features.pycurl.available():
84
88
transport_scenarios.append(
85
89
('pycurl', dict(_transport=PyCurlTransport,
86
90
_server=http_server.HttpServer_PyCurl,
87
_url_protocol='http+pycurl',)))
91
_qualified_prefix='http+pycurl',)))
88
92
tests.multiply_tests(t_tests, transport_scenarios, result)
90
protocol_scenarios = [
91
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
92
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
95
# some tests are parametrized by the protocol version only
96
p_tests, remaining_tests = tests.split_suite_by_condition(
97
remaining_tests, tests.condition_isinstance((
100
tests.multiply_tests(p_tests, protocol_scenarios, result)
102
94
# each implementation tested with each HTTP version
103
95
tp_tests, remaining_tests = tests.split_suite_by_condition(
104
96
remaining_tests, tests.condition_isinstance((
114
106
TestSpecificRequestHandler,
108
protocol_scenarios = [
109
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
110
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
116
112
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
117
113
protocol_scenarios)
118
114
tests.multiply_tests(tp_tests, tp_scenarios, result)
120
# proxy auth: each auth scheme on all http versions on all implementations.
121
tppa_tests, remaining_tests = tests.split_suite_by_condition(
122
remaining_tests, tests.condition_isinstance((
125
proxy_auth_scheme_scenarios = [
126
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
127
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
129
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
131
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
132
proxy_auth_scheme_scenarios)
133
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
135
116
# auth: each auth scheme on all http versions on all implementations.
136
117
tpa_tests, remaining_tests = tests.split_suite_by_condition(
137
118
remaining_tests, tests.condition_isinstance((
140
121
auth_scheme_scenarios = [
141
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
142
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
144
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
122
('basic', dict(_auth_scheme='basic')),
123
('digest', dict(_auth_scheme='digest')),
146
125
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
147
auth_scheme_scenarios)
126
auth_scheme_scenarios)
148
127
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
150
# activity: on all http[s] versions on all implementations
129
# activity: activity on all http versions on all implementations
151
130
tpact_tests, remaining_tests = tests.split_suite_by_condition(
152
131
remaining_tests, tests.condition_isinstance((
155
134
activity_scenarios = [
156
('urllib,http', dict(_activity_server=ActivityHTTPServer,
157
_transport=_urllib.HttpTransport_urllib,)),
135
('http', dict(_activity_server=ActivityHTTPServer)),
159
137
if tests.HTTPSServerFeature.available():
160
138
activity_scenarios.append(
161
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
162
_transport=_urllib.HttpTransport_urllib,)),)
163
if features.pycurl.available():
164
activity_scenarios.append(
165
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
166
_transport=PyCurlTransport,)),)
167
if tests.HTTPSServerFeature.available():
168
from bzrlib.tests import (
171
# FIXME: Until we have a better way to handle self-signed
172
# certificates (like allowing them in a test specific
173
# authentication.conf for example), we need some specialized pycurl
174
# transport for tests.
175
class HTTPS_pycurl_transport(PyCurlTransport):
177
def __init__(self, base, _from_transport=None):
178
super(HTTPS_pycurl_transport, self).__init__(
179
base, _from_transport)
180
self.cabundle = str(ssl_certs.build_path('ca.crt'))
182
activity_scenarios.append(
183
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
184
_transport=HTTPS_pycurl_transport,)),)
186
tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
139
('https', dict(_activity_server=ActivityHTTPSServer)))
140
tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
188
142
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
190
144
# No parametrization for the remaining tests
221
175
self.received_bytes = ''
225
return '%s://%s:%s/' % (self.scheme, self.host, self.port)
227
def start_server(self):
228
178
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
229
179
self._sock.bind(('127.0.0.1', 0))
230
180
self.host, self.port = self._sock.getsockname()
231
181
self._ready = threading.Event()
232
self._thread = test_server.ThreadWithException(
233
event=self._ready, target=self._accept_read_and_reply)
182
self._thread = threading.Thread(target=self._accept_read_and_reply)
183
self._thread.setDaemon(True)
234
184
self._thread.start()
235
if 'threads' in tests.selftest_debug_flags:
236
sys.stderr.write('Thread started: %s\n' % (self._thread.ident,))
239
187
def _accept_read_and_reply(self):
240
188
self._sock.listen(1)
241
189
self._ready.set()
242
conn, address = self._sock.accept()
243
if self._expect_body_tail is not None:
190
self._sock.settimeout(5)
192
conn, address = self._sock.accept()
193
# On win32, the accepted connection will be non-blocking to start
194
# with because we're using settimeout.
195
conn.setblocking(True)
244
196
while not self.received_bytes.endswith(self._expect_body_tail):
245
197
self.received_bytes += conn.recv(4096)
246
198
conn.sendall('HTTP/1.1 200 OK\r\n')
199
except socket.timeout:
200
# Make sure the client isn't stuck waiting for us to e.g. accept.
248
201
self._sock.close()
249
202
except socket.error:
250
203
# The client may have already closed the socket.
253
def stop_server(self):
255
# Issue a fake connection to wake up the server and allow it to
257
fake_conn = osutils.connect_socket((self.host, self.port))
259
209
except socket.error:
260
210
# We might have already closed it. We don't care.
265
if 'threads' in tests.selftest_debug_flags:
266
sys.stderr.write('Thread joined: %s\n' % (self._thread.ident,))
269
216
class TestAuthHeader(tests.TestCase):
271
def parse_header(self, header, auth_handler_class=None):
272
if auth_handler_class is None:
273
auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
274
self.auth_handler = auth_handler_class()
275
return self.auth_handler._parse_auth_header(header)
218
def parse_header(self, header):
219
ah = _urllib2_wrappers.AbstractAuthHandler()
220
return ah._parse_auth_header(header)
277
222
def test_empty_header(self):
278
223
scheme, remainder = self.parse_header('')
279
self.assertEqual('', scheme)
224
self.assertEquals('', scheme)
280
225
self.assertIs(None, remainder)
282
227
def test_negotiate_header(self):
283
228
scheme, remainder = self.parse_header('Negotiate')
284
self.assertEqual('negotiate', scheme)
229
self.assertEquals('negotiate', scheme)
285
230
self.assertIs(None, remainder)
287
232
def test_basic_header(self):
288
233
scheme, remainder = self.parse_header(
289
234
'Basic realm="Thou should not pass"')
290
self.assertEqual('basic', scheme)
291
self.assertEqual('realm="Thou should not pass"', remainder)
293
def test_basic_extract_realm(self):
294
scheme, remainder = self.parse_header(
295
'Basic realm="Thou should not pass"',
296
_urllib2_wrappers.BasicAuthHandler)
297
match, realm = self.auth_handler.extract_realm(remainder)
298
self.assertTrue(match is not None)
299
self.assertEqual('Thou should not pass', realm)
235
self.assertEquals('basic', scheme)
236
self.assertEquals('realm="Thou should not pass"', remainder)
301
238
def test_digest_header(self):
302
239
scheme, remainder = self.parse_header(
303
240
'Digest realm="Thou should not pass"')
304
self.assertEqual('digest', scheme)
305
self.assertEqual('realm="Thou should not pass"', remainder)
241
self.assertEquals('digest', scheme)
242
self.assertEquals('realm="Thou should not pass"', remainder)
308
245
class TestHTTPServer(tests.TestCase):
314
251
protocol_version = 'HTTP/0.1'
316
self.assertRaises(httplib.UnknownProtocol,
317
http_server.HttpServer, BogusRequestHandler)
253
server = http_server.HttpServer(BogusRequestHandler)
255
self.assertRaises(httplib.UnknownProtocol,server.setUp)
258
self.fail('HTTP Server creation did not raise UnknownProtocol')
319
260
def test_force_invalid_protocol(self):
320
self.assertRaises(httplib.UnknownProtocol,
321
http_server.HttpServer, protocol_version='HTTP/0.1')
261
server = http_server.HttpServer(protocol_version='HTTP/0.1')
263
self.assertRaises(httplib.UnknownProtocol,server.setUp)
266
self.fail('HTTP Server creation did not raise UnknownProtocol')
323
268
def test_server_start_and_stop(self):
324
269
server = http_server.HttpServer()
325
self.addCleanup(server.stop_server)
326
server.start_server()
327
self.assertTrue(server.server is not None)
328
self.assertTrue(server.server.serving is not None)
329
self.assertTrue(server.server.serving)
271
self.assertTrue(server._http_running)
273
self.assertFalse(server._http_running)
331
275
def test_create_http_server_one_zero(self):
332
276
class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
388
339
def test_url_parsing(self):
389
340
f = FakeManager()
390
341
url = http.extract_auth('http://example.com', f)
391
self.assertEqual('http://example.com', url)
392
self.assertEqual(0, len(f.credentials))
342
self.assertEquals('http://example.com', url)
343
self.assertEquals(0, len(f.credentials))
393
344
url = http.extract_auth(
394
'http://user:pass@example.com/bzr/bzr.dev', f)
395
self.assertEqual('http://example.com/bzr/bzr.dev', url)
396
self.assertEqual(1, len(f.credentials))
397
self.assertEqual([None, 'example.com', 'user', 'pass'],
345
'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
346
self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
347
self.assertEquals(1, len(f.credentials))
348
self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
401
352
class TestHttpTransportUrls(tests.TestCase):
448
399
https by supplying a fake version_info that do not
451
self.requireFeature(features.pycurl)
452
# Import the module locally now that we now it's available.
453
pycurl = features.pycurl.module
405
raise tests.TestSkipped('pycurl not present')
455
self.overrideAttr(pycurl, 'version_info',
456
# Fake the pycurl version_info This was taken from
457
# a windows pycurl without SSL (thanks to bialix)
466
('ftp', 'gopher', 'telnet',
467
'dict', 'ldap', 'http', 'file'),
471
self.assertRaises(errors.DependencyNotPresent, self._transport,
472
'https://launchpad.net')
407
version_info_orig = pycurl.version_info
409
# Now that we have pycurl imported, we can fake its version_info
410
# This was taken from a windows pycurl without SSL
412
pycurl.version_info = lambda : (2,
420
('ftp', 'gopher', 'telnet',
421
'dict', 'ldap', 'http', 'file'),
425
self.assertRaises(errors.DependencyNotPresent, self._transport,
426
'https://launchpad.net')
428
# Restore the right function
429
pycurl.version_info = version_info_orig
475
432
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
534
491
class TestPost(tests.TestCase):
536
493
def test_post_body_is_received(self):
537
server = RecordingServer(expect_body_tail='end-of-body',
538
scheme=self._url_protocol)
539
self.start_server(server)
540
url = server.get_url()
541
# FIXME: needs a cleanup -- vila 20100611
542
http_transport = transport.get_transport(url)
494
server = RecordingServer(expect_body_tail='end-of-body')
496
self.addCleanup(server.tearDown)
497
scheme = self._qualified_prefix
498
url = '%s://%s:%s/' % (scheme, server.host, server.port)
499
http_transport = self._transport(url)
543
500
code, response = http_transport._post('abc def end-of-body')
545
502
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
589
546
_req_handler_class = http_server.TestingHTTPRequestHandler
591
548
def create_transport_readonly_server(self):
592
server = http_server.HttpServer(self._req_handler_class,
593
protocol_version=self._protocol_version)
594
server._url_protocol = self._url_protocol
549
return http_server.HttpServer(self._req_handler_class,
550
protocol_version=self._protocol_version)
597
552
def _testing_pycurl(self):
598
# TODO: This is duplicated for lots of the classes in this file
599
return (features.pycurl.available()
600
and self._transport == PyCurlTransport)
553
return pycurl_present and self._transport == PyCurlTransport
603
556
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
604
557
"""Whatever request comes in, close the connection"""
606
def _handle_one_request(self):
559
def handle_one_request(self):
607
560
"""Handle a single HTTP request, by abruptly closing the connection"""
608
561
self.close_connection = 1
614
567
_req_handler_class = WallRequestHandler
616
569
def test_http_has(self):
617
t = self.get_readonly_transport()
570
server = self.get_readonly_server()
571
t = self._transport(server.get_url())
618
572
# Unfortunately httplib (see HTTPResponse._read_status
619
573
# for details) make no distinction between a closed
620
574
# socket and badly formatted status line, so we can't
621
575
# just test for ConnectionError, we have to test
622
# InvalidHttpResponse too. And pycurl may raise ConnectionReset
623
# instead of ConnectionError too.
624
self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
625
errors.InvalidHttpResponse),
576
# InvalidHttpResponse too.
577
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
626
578
t.has, 'foo/bar')
628
580
def test_http_get(self):
629
t = self.get_readonly_transport()
630
self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
631
errors.InvalidHttpResponse),
581
server = self.get_readonly_server()
582
t = self._transport(server.get_url())
583
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
632
584
t.get, 'foo/bar')
700
662
_req_handler_class = BadProtocolRequestHandler
703
if self._testing_pycurl():
665
if pycurl_present and self._transport == PyCurlTransport:
704
666
raise tests.TestNotApplicable(
705
667
"pycurl doesn't check the protocol version")
706
668
super(TestBadProtocolServer, self).setUp()
708
670
def test_http_has(self):
709
t = self.get_readonly_transport()
671
server = self.get_readonly_server()
672
t = self._transport(server.get_url())
710
673
self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
712
675
def test_http_get(self):
713
t = self.get_readonly_transport()
676
server = self.get_readonly_server()
677
t = self._transport(server.get_url())
714
678
self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
746
712
self.assertEqual(None, server.host)
747
713
self.assertEqual(None, server.port)
749
def test_setUp_and_stop(self):
715
def test_setUp_and_tearDown(self):
750
716
server = RecordingServer(expect_body_tail=None)
751
server.start_server()
753
719
self.assertNotEqual(None, server.host)
754
720
self.assertNotEqual(None, server.port)
757
723
self.assertEqual(None, server.host)
758
724
self.assertEqual(None, server.port)
760
726
def test_send_receive_bytes(self):
761
server = RecordingServer(expect_body_tail='c', scheme='http')
762
self.start_server(server)
727
server = RecordingServer(expect_body_tail='c')
729
self.addCleanup(server.tearDown)
763
730
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
764
731
sock.connect((server.host, server.port))
765
732
sock.sendall('abc')
1071
1044
self.build_tree_contents([('a', content)],)
1073
1046
def test_few_ranges(self):
1074
t = self.get_readonly_transport()
1047
t = self.get_transport()
1075
1048
l = list(t.readv('a', ((0, 4), (1024, 4), )))
1076
1049
self.assertEqual(l[0], (0, '0000'))
1077
1050
self.assertEqual(l[1], (1024, '0001'))
1078
1051
self.assertEqual(1, self.get_readonly_server().GET_request_nb)
1080
1053
def test_more_ranges(self):
1081
t = self.get_readonly_transport()
1054
t = self.get_transport()
1082
1055
l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
1083
1056
self.assertEqual(l[0], (0, '0000'))
1084
1057
self.assertEqual(l[1], (1024, '0001'))
1137
1113
# FIXME: We don't have an https server available, so we don't
1138
# test https connections. --vila toolongago
1114
# test https connections.
1140
1116
def setUp(self):
1141
1117
super(TestProxyHttpServer, self).setUp()
1142
self.transport_secondary_server = http_utils.ProxyServer
1143
1118
self.build_tree_contents([('foo', 'contents of foo\n'),
1144
1119
('foo-proxied', 'proxied contents of foo\n')])
1145
1120
# Let's setup some attributes for tests
1146
server = self.get_readonly_server()
1147
self.server_host_port = '%s:%d' % (server.host, server.port)
1121
self.server = self.get_readonly_server()
1122
self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
1148
1123
if self._testing_pycurl():
1149
1124
# Oh my ! pycurl does not check for the port as part of
1150
1125
# no_proxy :-( So we just test the host part
1151
self.no_proxy_host = server.host
1126
self.no_proxy_host = 'localhost'
1153
self.no_proxy_host = self.server_host_port
1128
self.no_proxy_host = self.proxy_address
1154
1129
# The secondary server is the proxy
1155
self.proxy_url = self.get_secondary_url()
1130
self.proxy = self.get_secondary_server()
1131
self.proxy_url = self.proxy.get_url()
1156
1132
self._old_env = {}
1158
1134
def _testing_pycurl(self):
1159
# TODO: This is duplicated for lots of the classes in this file
1160
return (features.pycurl.available()
1161
and self._transport == PyCurlTransport)
1135
return pycurl_present and self._transport == PyCurlTransport
1137
def create_transport_secondary_server(self):
1138
"""Creates an http server that will serve files with
1139
'-proxied' appended to their names.
1141
return http_utils.ProxyServer(protocol_version=self._protocol_version)
1163
1143
def _install_env(self, env):
1164
1144
for name, value in env.iteritems():
1239
1221
def setUp(self):
1240
1222
http_utils.TestCaseWithWebserver.setUp(self)
1241
1223
self.build_tree_contents([('a', '0123456789')],)
1224
server = self.get_readonly_server()
1225
self.transport = self._transport(server.get_url())
1243
1227
def create_transport_readonly_server(self):
1244
1228
return http_server.HttpServer(protocol_version=self._protocol_version)
1246
1230
def _file_contents(self, relpath, ranges):
1247
t = self.get_readonly_transport()
1248
1231
offsets = [ (start, end - start + 1) for start, end in ranges]
1249
coalesce = t._coalesce_offsets
1232
coalesce = self.transport._coalesce_offsets
1250
1233
coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
1251
code, data = t._get(relpath, coalesced)
1234
code, data = self.transport._get(relpath, coalesced)
1252
1235
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1253
1236
for start, end in ranges:
1254
1237
data.seek(start)
1255
1238
yield data.read(end - start + 1)
1257
1240
def _file_tail(self, relpath, tail_amount):
1258
t = self.get_readonly_transport()
1259
code, data = t._get(relpath, [], tail_amount)
1241
code, data = self.transport._get(relpath, [], tail_amount)
1260
1242
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1261
1243
data.seek(-tail_amount, 2)
1262
1244
return data.read(tail_amount)
1281
1263
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
1282
1264
"""Test redirection between http servers."""
1266
def create_transport_secondary_server(self):
1267
"""Create the secondary server redirecting to the primary server"""
1268
new = self.get_readonly_server()
1270
redirecting = http_utils.HTTPServerRedirecting(
1271
protocol_version=self._protocol_version)
1272
redirecting.redirect_to(new.host, new.port)
1284
1275
def setUp(self):
1285
1276
super(TestHTTPRedirections, self).setUp()
1286
1277
self.build_tree_contents([('a', '0123456789'),
1288
1279
'# Bazaar revision bundle v0.9\n#\n')
1281
# The requests to the old server will be redirected to the new server
1282
self.old_transport = self._transport(self.old_server.get_url())
1291
1284
def test_redirected(self):
1292
self.assertRaises(errors.RedirectRequested,
1293
self.get_old_transport().get, 'a')
1294
self.assertEqual('0123456789', self.get_new_transport().get('a').read())
1285
self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
1286
t = self._transport(self.new_server.get_url())
1287
self.assertEqual('0123456789', t.get('a').read())
1289
def test_read_redirected_bundle_from_url(self):
1290
from bzrlib.bundle import read_bundle_from_url
1291
url = self.old_transport.abspath('bundle')
1292
bundle = self.applyDeprecated(deprecated_in((1, 12, 0)),
1293
read_bundle_from_url, url)
1294
# If read_bundle_from_url was successful we get an empty bundle
1295
self.assertEqual([], bundle.revisions)
1297
1298
class RedirectedRequest(_urllib2_wrappers.Request):
1306
1307
# Since the tests using this class will replace
1307
1308
# _urllib2_wrappers.Request, we can't just call the base class __init__
1308
1309
# or we'll loop.
1309
RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
1310
RedirectedRequest.init_orig(self, method, url, args, kwargs)
1310
1311
self.follow_redirections = True
1313
def install_redirected_request(test):
1314
test.overrideAttr(_urllib2_wrappers, 'Request', RedirectedRequest)
1317
def cleanup_http_redirection_connections(test):
1318
# Some sockets are opened but never seen by _urllib, so we trap them at
1319
# the _urllib2_wrappers level to be able to clean them up.
1320
def socket_disconnect(sock):
1322
sock.shutdown(socket.SHUT_RDWR)
1324
except socket.error:
1326
def connect(connection):
1327
test.http_connect_orig(connection)
1328
test.addCleanup(socket_disconnect, connection.sock)
1329
test.http_connect_orig = test.overrideAttr(
1330
_urllib2_wrappers.HTTPConnection, 'connect', connect)
1331
def connect(connection):
1332
test.https_connect_orig(connection)
1333
test.addCleanup(socket_disconnect, connection.sock)
1334
test.https_connect_orig = test.overrideAttr(
1335
_urllib2_wrappers.HTTPSConnection, 'connect', connect)
1338
1314
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
1339
1315
"""Test redirections.
1352
1328
def setUp(self):
1353
if (features.pycurl.available()
1354
and self._transport == PyCurlTransport):
1329
if pycurl_present and self._transport == PyCurlTransport:
1355
1330
raise tests.TestNotApplicable(
1356
"pycurl doesn't redirect silently anymore")
1331
"pycurl doesn't redirect silently annymore")
1357
1332
super(TestHTTPSilentRedirections, self).setUp()
1358
install_redirected_request(self)
1359
cleanup_http_redirection_connections(self)
1333
self.setup_redirected_request()
1334
self.addCleanup(self.cleanup_redirected_request)
1360
1335
self.build_tree_contents([('a','a'),
1362
1337
('1/a', 'redirected once'),
1370
1345
('5/a', 'redirected 5 times'),
1348
self.old_transport = self._transport(self.old_server.get_url())
1350
def setup_redirected_request(self):
1351
self.original_class = _urllib2_wrappers.Request
1352
_urllib2_wrappers.Request = RedirectedRequest
1354
def cleanup_redirected_request(self):
1355
_urllib2_wrappers.Request = self.original_class
1357
def create_transport_secondary_server(self):
1358
"""Create the secondary server, redirections are defined in the tests"""
1359
return http_utils.HTTPServerRedirecting(
1360
protocol_version=self._protocol_version)
1373
1362
def test_one_redirection(self):
1374
t = self.get_old_transport()
1375
req = RedirectedRequest('GET', t._remote_path('a'))
1363
t = self.old_transport
1365
req = RedirectedRequest('GET', t.abspath('a'))
1366
req.follow_redirections = True
1376
1367
new_prefix = 'http://%s:%s' % (self.new_server.host,
1377
1368
self.new_server.port)
1378
1369
self.old_server.redirections = \
1379
1370
[('(.*)', r'%s/1\1' % (new_prefix), 301),]
1380
self.assertEqual('redirected once', t._perform(req).read())
1371
self.assertEquals('redirected once',t._perform(req).read())
1382
1373
def test_five_redirections(self):
1383
t = self.get_old_transport()
1384
req = RedirectedRequest('GET', t._remote_path('a'))
1374
t = self.old_transport
1376
req = RedirectedRequest('GET', t.abspath('a'))
1377
req.follow_redirections = True
1385
1378
old_prefix = 'http://%s:%s' % (self.old_server.host,
1386
1379
self.old_server.port)
1387
1380
new_prefix = 'http://%s:%s' % (self.new_server.host,
1402
1395
def setUp(self):
1403
1396
super(TestDoCatchRedirections, self).setUp()
1404
1397
self.build_tree_contents([('a', '0123456789'),],)
1405
cleanup_http_redirection_connections(self)
1407
self.old_transport = self.get_old_transport()
1399
self.old_transport = self._transport(self.old_server.get_url())
1401
def get_a(self, transport):
1402
return transport.get('a')
1412
1404
def test_no_redirection(self):
1413
t = self.get_new_transport()
1405
t = self._transport(self.new_server.get_url())
1415
1407
# We use None for redirected so that we fail if redirected
1416
self.assertEqual('0123456789',
1417
transport.do_catching_redirections(
1408
self.assertEquals('0123456789',
1409
transport.do_catching_redirections(
1418
1410
self.get_a, t, None).read())
1420
1412
def test_one_redirection(self):
1421
1413
self.redirections = 0
1423
def redirected(t, exception, redirection_notice):
1415
def redirected(transport, exception, redirection_notice):
1424
1416
self.redirections += 1
1425
redirected_t = t._redirected_to(exception.source, exception.target)
1417
dir, file = urlutils.split(exception.target)
1418
return self._transport(dir)
1428
self.assertEqual('0123456789',
1429
transport.do_catching_redirections(
1420
self.assertEquals('0123456789',
1421
transport.do_catching_redirections(
1430
1422
self.get_a, self.old_transport, redirected).read())
1431
self.assertEqual(1, self.redirections)
1423
self.assertEquals(1, self.redirections)
1433
1425
def test_redirection_loop(self):
1459
1448
('b', 'contents of b\n'),])
1461
1450
def create_transport_readonly_server(self):
1462
server = self._auth_server(protocol_version=self._protocol_version)
1463
server._url_protocol = self._url_protocol
1451
if self._auth_scheme == 'basic':
1452
server = http_utils.HTTPBasicAuthServer(
1453
protocol_version=self._protocol_version)
1455
if self._auth_scheme != 'digest':
1456
raise AssertionError('Unknown auth scheme: %r'
1457
% self._auth_scheme)
1458
server = http_utils.HTTPDigestAuthServer(
1459
protocol_version=self._protocol_version)
1466
1462
def _testing_pycurl(self):
1467
# TODO: This is duplicated for lots of the classes in this file
1468
return (features.pycurl.available()
1469
and self._transport == PyCurlTransport)
1463
return pycurl_present and self._transport == PyCurlTransport
1471
1465
def get_user_url(self, user, password):
1472
1466
"""Build an url embedding user and password"""
1521
1514
# initial 'who are you' and 'this is not you, who are you')
1522
1515
self.assertEqual(2, self.server.auth_required_errors)
1524
def test_prompt_for_username(self):
1525
if self._testing_pycurl():
1526
raise tests.TestNotApplicable(
1527
'pycurl cannot prompt, it handles auth by embedding'
1528
' user:pass in urls only')
1530
self.server.add_user('joe', 'foo')
1531
t = self.get_user_transport(None, None)
1532
stdout = tests.StringIOWrapper()
1533
stderr = tests.StringIOWrapper()
1534
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1535
stdout=stdout, stderr=stderr)
1536
self.assertEqual('contents of a\n',t.get('a').read())
1537
# stdin should be empty
1538
self.assertEqual('', ui.ui_factory.stdin.readline())
1540
expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1541
self.assertEqual(expected_prompt, stderr.read(len(expected_prompt)))
1542
self.assertEqual('', stdout.getvalue())
1543
self._check_password_prompt(t._unqualified_scheme, 'joe',
1546
1517
def test_prompt_for_password(self):
1547
1518
if self._testing_pycurl():
1548
1519
raise tests.TestNotApplicable(
1552
1523
self.server.add_user('joe', 'foo')
1553
1524
t = self.get_user_transport('joe', None)
1554
1525
stdout = tests.StringIOWrapper()
1555
stderr = tests.StringIOWrapper()
1556
ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1557
stdout=stdout, stderr=stderr)
1558
self.assertEqual('contents of a\n', t.get('a').read())
1526
ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1527
self.assertEqual('contents of a\n',t.get('a').read())
1559
1528
# stdin should be empty
1560
1529
self.assertEqual('', ui.ui_factory.stdin.readline())
1561
1530
self._check_password_prompt(t._unqualified_scheme, 'joe',
1563
self.assertEqual('', stdout.getvalue())
1564
1532
# And we shouldn't prompt again for a different request
1565
1533
# against the same transport.
1566
1534
self.assertEqual('contents of b\n',t.get('b').read())
1576
1544
% (scheme.upper(),
1577
1545
user, self.server.host, self.server.port,
1578
1546
self.server.auth_realm)))
1579
self.assertEqual(expected_prompt, actual_prompt)
1581
def _expected_username_prompt(self, scheme):
1582
return (self._username_prompt_prefix
1583
+ "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1584
self.server.host, self.server.port,
1585
self.server.auth_realm))
1547
self.assertEquals(expected_prompt, actual_prompt)
1587
1549
def test_no_prompt_for_password_when_using_auth_config(self):
1588
1550
if self._testing_pycurl():
1670
1630
('b-proxied', 'contents of b\n'),
1633
def create_transport_readonly_server(self):
1634
if self._auth_scheme == 'basic':
1635
server = http_utils.ProxyBasicAuthServer(
1636
protocol_version=self._protocol_version)
1638
if self._auth_scheme != 'digest':
1639
raise AssertionError('Unknown auth scheme: %r'
1640
% self._auth_scheme)
1641
server = http_utils.ProxyDigestAuthServer(
1642
protocol_version=self._protocol_version)
1673
1645
def get_user_transport(self, user, password):
1674
1646
self._install_env({'all_proxy': self.get_user_url(user, password)})
1675
return TestAuth.get_user_transport(self, user, password)
1647
return self._transport(self.server.get_url())
1677
1649
def _install_env(self, env):
1678
1650
for name, value in env.iteritems():
1721
1692
# We use the VFS layer as part of HTTP tunnelling tests.
1722
1693
self._captureVar('BZR_NO_SMART_VFS', None)
1723
1694
self.transport_readonly_server = http_utils.HTTPServerWithSmarts
1724
self.http_server = self.get_readonly_server()
1726
1696
def create_transport_readonly_server(self):
1727
server = http_utils.HTTPServerWithSmarts(
1697
return http_utils.HTTPServerWithSmarts(
1728
1698
protocol_version=self._protocol_version)
1729
server._url_protocol = self._url_protocol
1732
1700
def test_open_bzrdir(self):
1733
1701
branch = self.make_branch('relpath')
1734
url = self.http_server.get_url() + 'relpath'
1702
http_server = self.get_readonly_server()
1703
url = http_server.get_url() + 'relpath'
1735
1704
bd = bzrdir.BzrDir.open(url)
1736
self.addCleanup(bd.transport.disconnect)
1737
1705
self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
1739
1707
def test_bulk_data(self):
1936
1906
# We override at class level because constructors may propagate the
1937
1907
# bound method and render instance overriding ineffective (an
1938
# alternative would be to define a specific ui factory instead...)
1939
self.overrideAttr(self._transport, '_report_activity', report_activity)
1940
self.addCleanup(self.server.stop_server)
1908
# alternative would be be to define a specific ui factory instead...)
1909
self.orig_report_activity = self._transport._report_activity
1910
self._transport._report_activity = report_activity
1913
self._transport._report_activity = self.orig_report_activity
1914
self.server.tearDown()
1915
tests.TestCase.tearDown(self)
1942
1917
def get_transport(self):
1943
t = self._transport(self.server.get_url())
1944
# FIXME: Needs cleanup -- vila 20100611
1918
return self._transport(self.server.get_url())
1947
1920
def assertActivitiesMatch(self):
1948
1921
self.assertEqual(self.server.bytes_read,
2054
2027
t = self.get_transport()
2055
2028
# We must send a single line of body bytes, see
2056
# PredefinedRequestHandler._handle_one_request
2029
# PredefinedRequestHandler.handle_one_request
2057
2030
code, f = t._post('abc def end-of-body\n')
2058
2031
self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2059
2032
self.assertActivitiesMatch()
2062
class TestActivity(tests.TestCase, TestActivityMixin):
2065
TestActivityMixin.setUp(self)
2068
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2070
# Unlike TestActivity, we are really testing ReportingFileSocket and
2071
# ReportingSocket, so we don't need all the parametrization. Since
2072
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2073
# test them through their use by the transport than directly (that's a
2074
# bit less clean but far more simpler and effective).
2075
_activity_server = ActivityHTTPServer
2076
_protocol_version = 'HTTP/1.1'
2079
self._transport =_urllib.HttpTransport_urllib
2080
TestActivityMixin.setUp(self)
2082
def assertActivitiesMatch(self):
2083
# Nothing to check here
2087
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2088
"""Test authentication on the redirected http server."""
2090
_auth_header = 'Authorization'
2091
_password_prompt_prefix = ''
2092
_username_prompt_prefix = ''
2093
_auth_server = http_utils.HTTPBasicAuthServer
2094
_transport = _urllib.HttpTransport_urllib
2097
super(TestAuthOnRedirected, self).setUp()
2098
self.build_tree_contents([('a','a'),
2100
('1/a', 'redirected once'),
2102
new_prefix = 'http://%s:%s' % (self.new_server.host,
2103
self.new_server.port)
2104
self.old_server.redirections = [
2105
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2106
self.old_transport = self.get_old_transport()
2107
self.new_server.add_user('joe', 'foo')
2108
cleanup_http_redirection_connections(self)
2110
def create_transport_readonly_server(self):
2111
server = self._auth_server(protocol_version=self._protocol_version)
2112
server._url_protocol = self._url_protocol
2118
def test_auth_on_redirected_via_do_catching_redirections(self):
2119
self.redirections = 0
2121
def redirected(t, exception, redirection_notice):
2122
self.redirections += 1
2123
redirected_t = t._redirected_to(exception.source, exception.target)
2124
self.addCleanup(redirected_t.disconnect)
2127
stdout = tests.StringIOWrapper()
2128
stderr = tests.StringIOWrapper()
2129
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2130
stdout=stdout, stderr=stderr)
2131
self.assertEqual('redirected once',
2132
transport.do_catching_redirections(
2133
self.get_a, self.old_transport, redirected).read())
2134
self.assertEqual(1, self.redirections)
2135
# stdin should be empty
2136
self.assertEqual('', ui.ui_factory.stdin.readline())
2137
# stdout should be empty, stderr will contains the prompts
2138
self.assertEqual('', stdout.getvalue())
2140
def test_auth_on_redirected_via_following_redirections(self):
2141
self.new_server.add_user('joe', 'foo')
2142
stdout = tests.StringIOWrapper()
2143
stderr = tests.StringIOWrapper()
2144
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2145
stdout=stdout, stderr=stderr)
2146
t = self.old_transport
2147
req = RedirectedRequest('GET', t.abspath('a'))
2148
new_prefix = 'http://%s:%s' % (self.new_server.host,
2149
self.new_server.port)
2150
self.old_server.redirections = [
2151
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2152
self.assertEqual('redirected once', t._perform(req).read())
2153
# stdin should be empty
2154
self.assertEqual('', ui.ui_factory.stdin.readline())
2155
# stdout should be empty, stderr will contains the prompts
2156
self.assertEqual('', stdout.getvalue())