78
82
transport_scenarios = [
79
83
('urllib', dict(_transport=_urllib.HttpTransport_urllib,
80
84
_server=http_server.HttpServer_urllib,
81
_url_protocol='http+urllib',)),
85
_qualified_prefix='http+urllib',)),
83
if features.pycurl.available():
84
88
transport_scenarios.append(
85
89
('pycurl', dict(_transport=PyCurlTransport,
86
90
_server=http_server.HttpServer_PyCurl,
87
_url_protocol='http+pycurl',)))
91
_qualified_prefix='http+pycurl',)))
88
92
tests.multiply_tests(t_tests, transport_scenarios, result)
90
protocol_scenarios = [
91
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
92
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
95
# some tests are parametrized by the protocol version only
96
p_tests, remaining_tests = tests.split_suite_by_condition(
97
remaining_tests, tests.condition_isinstance((
100
tests.multiply_tests(p_tests, protocol_scenarios, result)
102
94
# each implementation tested with each HTTP version
103
95
tp_tests, remaining_tests = tests.split_suite_by_condition(
104
96
remaining_tests, tests.condition_isinstance((
114
106
TestSpecificRequestHandler,
108
protocol_scenarios = [
109
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
110
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
116
112
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
117
113
protocol_scenarios)
118
114
tests.multiply_tests(tp_tests, tp_scenarios, result)
120
# proxy auth: each auth scheme on all http versions on all implementations.
121
tppa_tests, remaining_tests = tests.split_suite_by_condition(
122
remaining_tests, tests.condition_isinstance((
125
proxy_auth_scheme_scenarios = [
126
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
127
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
129
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
131
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
132
proxy_auth_scheme_scenarios)
133
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
135
116
# auth: each auth scheme on all http versions on all implementations.
136
117
tpa_tests, remaining_tests = tests.split_suite_by_condition(
137
118
remaining_tests, tests.condition_isinstance((
140
121
auth_scheme_scenarios = [
141
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
142
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
144
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
122
('basic', dict(_auth_scheme='basic')),
123
('digest', dict(_auth_scheme='digest')),
146
125
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
147
auth_scheme_scenarios)
126
auth_scheme_scenarios)
148
127
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
150
# activity: on all http[s] versions on all implementations
129
# activity: activity on all http versions on all implementations
151
130
tpact_tests, remaining_tests = tests.split_suite_by_condition(
152
131
remaining_tests, tests.condition_isinstance((
155
134
activity_scenarios = [
156
('urllib,http', dict(_activity_server=ActivityHTTPServer,
157
_transport=_urllib.HttpTransport_urllib,)),
135
('http', dict(_activity_server=ActivityHTTPServer)),
159
137
if tests.HTTPSServerFeature.available():
160
138
activity_scenarios.append(
161
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
162
_transport=_urllib.HttpTransport_urllib,)),)
163
if features.pycurl.available():
164
activity_scenarios.append(
165
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
166
_transport=PyCurlTransport,)),)
167
if tests.HTTPSServerFeature.available():
168
from bzrlib.tests import (
171
# FIXME: Until we have a better way to handle self-signed
172
# certificates (like allowing them in a test specific
173
# authentication.conf for example), we need some specialized pycurl
174
# transport for tests.
175
class HTTPS_pycurl_transport(PyCurlTransport):
177
def __init__(self, base, _from_transport=None):
178
super(HTTPS_pycurl_transport, self).__init__(
179
base, _from_transport)
180
self.cabundle = str(ssl_certs.build_path('ca.crt'))
182
activity_scenarios.append(
183
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
184
_transport=HTTPS_pycurl_transport,)),)
186
tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
139
('https', dict(_activity_server=ActivityHTTPSServer)))
140
tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
188
142
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
190
144
# No parametrization for the remaining tests
221
175
self.received_bytes = ''
225
return '%s://%s:%s/' % (self.scheme, self.host, self.port)
227
def start_server(self):
228
178
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
229
179
self._sock.bind(('127.0.0.1', 0))
230
180
self.host, self.port = self._sock.getsockname()
231
181
self._ready = threading.Event()
232
self._thread = test_server.ThreadWithException(
233
event=self._ready, target=self._accept_read_and_reply)
182
self._thread = threading.Thread(target=self._accept_read_and_reply)
183
self._thread.setDaemon(True)
234
184
self._thread.start()
235
if 'threads' in tests.selftest_debug_flags:
236
sys.stderr.write('Thread started: %s\n' % (self._thread.ident,))
239
187
def _accept_read_and_reply(self):
240
188
self._sock.listen(1)
241
189
self._ready.set()
242
conn, address = self._sock.accept()
243
if self._expect_body_tail is not None:
190
self._sock.settimeout(5)
192
conn, address = self._sock.accept()
193
# On win32, the accepted connection will be non-blocking to start
194
# with because we're using settimeout.
195
conn.setblocking(True)
244
196
while not self.received_bytes.endswith(self._expect_body_tail):
245
197
self.received_bytes += conn.recv(4096)
246
198
conn.sendall('HTTP/1.1 200 OK\r\n')
199
except socket.timeout:
200
# Make sure the client isn't stuck waiting for us to e.g. accept.
248
201
self._sock.close()
249
202
except socket.error:
250
203
# The client may have already closed the socket.
253
def stop_server(self):
255
# Issue a fake connection to wake up the server and allow it to
257
fake_conn = osutils.connect_socket((self.host, self.port))
259
209
except socket.error:
260
210
# We might have already closed it. We don't care.
265
if 'threads' in tests.selftest_debug_flags:
266
sys.stderr.write('Thread joined: %s\n' % (self._thread.ident,))
269
216
class TestAuthHeader(tests.TestCase):
277
224
def test_empty_header(self):
278
225
scheme, remainder = self.parse_header('')
279
self.assertEqual('', scheme)
226
self.assertEquals('', scheme)
280
227
self.assertIs(None, remainder)
282
229
def test_negotiate_header(self):
283
230
scheme, remainder = self.parse_header('Negotiate')
284
self.assertEqual('negotiate', scheme)
231
self.assertEquals('negotiate', scheme)
285
232
self.assertIs(None, remainder)
287
234
def test_basic_header(self):
288
235
scheme, remainder = self.parse_header(
289
236
'Basic realm="Thou should not pass"')
290
self.assertEqual('basic', scheme)
291
self.assertEqual('realm="Thou should not pass"', remainder)
237
self.assertEquals('basic', scheme)
238
self.assertEquals('realm="Thou should not pass"', remainder)
293
240
def test_basic_extract_realm(self):
294
241
scheme, remainder = self.parse_header(
296
243
_urllib2_wrappers.BasicAuthHandler)
297
244
match, realm = self.auth_handler.extract_realm(remainder)
298
245
self.assertTrue(match is not None)
299
self.assertEqual('Thou should not pass', realm)
246
self.assertEquals('Thou should not pass', realm)
301
248
def test_digest_header(self):
302
249
scheme, remainder = self.parse_header(
303
250
'Digest realm="Thou should not pass"')
304
self.assertEqual('digest', scheme)
305
self.assertEqual('realm="Thou should not pass"', remainder)
251
self.assertEquals('digest', scheme)
252
self.assertEquals('realm="Thou should not pass"', remainder)
308
255
class TestHTTPServer(tests.TestCase):
314
261
protocol_version = 'HTTP/0.1'
316
self.assertRaises(httplib.UnknownProtocol,
317
http_server.HttpServer, BogusRequestHandler)
263
server = http_server.HttpServer(BogusRequestHandler)
265
self.assertRaises(httplib.UnknownProtocol,server.setUp)
268
self.fail('HTTP Server creation did not raise UnknownProtocol')
319
270
def test_force_invalid_protocol(self):
320
self.assertRaises(httplib.UnknownProtocol,
321
http_server.HttpServer, protocol_version='HTTP/0.1')
271
server = http_server.HttpServer(protocol_version='HTTP/0.1')
273
self.assertRaises(httplib.UnknownProtocol,server.setUp)
276
self.fail('HTTP Server creation did not raise UnknownProtocol')
323
278
def test_server_start_and_stop(self):
324
279
server = http_server.HttpServer()
325
self.addCleanup(server.stop_server)
326
server.start_server()
327
self.assertTrue(server.server is not None)
328
self.assertTrue(server.server.serving is not None)
329
self.assertTrue(server.server.serving)
281
self.assertTrue(server._http_running)
283
self.assertFalse(server._http_running)
331
285
def test_create_http_server_one_zero(self):
332
286
class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
388
349
def test_url_parsing(self):
389
350
f = FakeManager()
390
351
url = http.extract_auth('http://example.com', f)
391
self.assertEqual('http://example.com', url)
392
self.assertEqual(0, len(f.credentials))
352
self.assertEquals('http://example.com', url)
353
self.assertEquals(0, len(f.credentials))
393
354
url = http.extract_auth(
394
'http://user:pass@example.com/bzr/bzr.dev', f)
395
self.assertEqual('http://example.com/bzr/bzr.dev', url)
396
self.assertEqual(1, len(f.credentials))
397
self.assertEqual([None, 'example.com', 'user', 'pass'],
355
'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
356
self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
357
self.assertEquals(1, len(f.credentials))
358
self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
401
362
class TestHttpTransportUrls(tests.TestCase):
448
409
https by supplying a fake version_info that do not
451
self.requireFeature(features.pycurl)
452
# Import the module locally now that we now it's available.
453
pycurl = features.pycurl.module
415
raise tests.TestSkipped('pycurl not present')
455
self.overrideAttr(pycurl, 'version_info',
456
# Fake the pycurl version_info This was taken from
457
# a windows pycurl without SSL (thanks to bialix)
466
('ftp', 'gopher', 'telnet',
467
'dict', 'ldap', 'http', 'file'),
471
self.assertRaises(errors.DependencyNotPresent, self._transport,
472
'https://launchpad.net')
417
version_info_orig = pycurl.version_info
419
# Now that we have pycurl imported, we can fake its version_info
420
# This was taken from a windows pycurl without SSL
422
pycurl.version_info = lambda : (2,
430
('ftp', 'gopher', 'telnet',
431
'dict', 'ldap', 'http', 'file'),
435
self.assertRaises(errors.DependencyNotPresent, self._transport,
436
'https://launchpad.net')
438
# Restore the right function
439
pycurl.version_info = version_info_orig
475
442
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
534
501
class TestPost(tests.TestCase):
536
503
def test_post_body_is_received(self):
537
server = RecordingServer(expect_body_tail='end-of-body',
538
scheme=self._url_protocol)
539
self.start_server(server)
540
url = server.get_url()
541
# FIXME: needs a cleanup -- vila 20100611
542
http_transport = transport.get_transport(url)
504
server = RecordingServer(expect_body_tail='end-of-body')
506
self.addCleanup(server.tearDown)
507
scheme = self._qualified_prefix
508
url = '%s://%s:%s/' % (scheme, server.host, server.port)
509
http_transport = self._transport(url)
543
510
code, response = http_transport._post('abc def end-of-body')
545
512
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
589
556
_req_handler_class = http_server.TestingHTTPRequestHandler
591
558
def create_transport_readonly_server(self):
592
server = http_server.HttpServer(self._req_handler_class,
593
protocol_version=self._protocol_version)
594
server._url_protocol = self._url_protocol
559
return http_server.HttpServer(self._req_handler_class,
560
protocol_version=self._protocol_version)
597
562
def _testing_pycurl(self):
598
# TODO: This is duplicated for lots of the classes in this file
599
return (features.pycurl.available()
600
and self._transport == PyCurlTransport)
563
return pycurl_present and self._transport == PyCurlTransport
603
566
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
604
567
"""Whatever request comes in, close the connection"""
606
def _handle_one_request(self):
569
def handle_one_request(self):
607
570
"""Handle a single HTTP request, by abruptly closing the connection"""
608
571
self.close_connection = 1
614
577
_req_handler_class = WallRequestHandler
616
579
def test_http_has(self):
617
t = self.get_readonly_transport()
580
server = self.get_readonly_server()
581
t = self._transport(server.get_url())
618
582
# Unfortunately httplib (see HTTPResponse._read_status
619
583
# for details) make no distinction between a closed
620
584
# socket and badly formatted status line, so we can't
621
585
# just test for ConnectionError, we have to test
622
# InvalidHttpResponse too. And pycurl may raise ConnectionReset
623
# instead of ConnectionError too.
624
self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
625
errors.InvalidHttpResponse),
586
# InvalidHttpResponse too.
587
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
626
588
t.has, 'foo/bar')
628
590
def test_http_get(self):
629
t = self.get_readonly_transport()
630
self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
631
errors.InvalidHttpResponse),
591
server = self.get_readonly_server()
592
t = self._transport(server.get_url())
593
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
632
594
t.get, 'foo/bar')
700
672
_req_handler_class = BadProtocolRequestHandler
703
if self._testing_pycurl():
675
if pycurl_present and self._transport == PyCurlTransport:
704
676
raise tests.TestNotApplicable(
705
677
"pycurl doesn't check the protocol version")
706
678
super(TestBadProtocolServer, self).setUp()
708
680
def test_http_has(self):
709
t = self.get_readonly_transport()
681
server = self.get_readonly_server()
682
t = self._transport(server.get_url())
710
683
self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
712
685
def test_http_get(self):
713
t = self.get_readonly_transport()
686
server = self.get_readonly_server()
687
t = self._transport(server.get_url())
714
688
self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
746
722
self.assertEqual(None, server.host)
747
723
self.assertEqual(None, server.port)
749
def test_setUp_and_stop(self):
725
def test_setUp_and_tearDown(self):
750
726
server = RecordingServer(expect_body_tail=None)
751
server.start_server()
753
729
self.assertNotEqual(None, server.host)
754
730
self.assertNotEqual(None, server.port)
757
733
self.assertEqual(None, server.host)
758
734
self.assertEqual(None, server.port)
760
736
def test_send_receive_bytes(self):
761
server = RecordingServer(expect_body_tail='c', scheme='http')
762
self.start_server(server)
737
server = RecordingServer(expect_body_tail='c')
739
self.addCleanup(server.tearDown)
763
740
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
764
741
sock.connect((server.host, server.port))
765
742
sock.sendall('abc')
1071
1054
self.build_tree_contents([('a', content)],)
1073
1056
def test_few_ranges(self):
1074
t = self.get_readonly_transport()
1057
t = self.get_transport()
1075
1058
l = list(t.readv('a', ((0, 4), (1024, 4), )))
1076
1059
self.assertEqual(l[0], (0, '0000'))
1077
1060
self.assertEqual(l[1], (1024, '0001'))
1078
1061
self.assertEqual(1, self.get_readonly_server().GET_request_nb)
1080
1063
def test_more_ranges(self):
1081
t = self.get_readonly_transport()
1064
t = self.get_transport()
1082
1065
l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
1083
1066
self.assertEqual(l[0], (0, '0000'))
1084
1067
self.assertEqual(l[1], (1024, '0001'))
1137
1123
# FIXME: We don't have an https server available, so we don't
1138
# test https connections. --vila toolongago
1124
# test https connections.
1140
1126
def setUp(self):
1141
1127
super(TestProxyHttpServer, self).setUp()
1142
self.transport_secondary_server = http_utils.ProxyServer
1143
1128
self.build_tree_contents([('foo', 'contents of foo\n'),
1144
1129
('foo-proxied', 'proxied contents of foo\n')])
1145
1130
# Let's setup some attributes for tests
1146
server = self.get_readonly_server()
1147
self.server_host_port = '%s:%d' % (server.host, server.port)
1131
self.server = self.get_readonly_server()
1132
self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
1148
1133
if self._testing_pycurl():
1149
1134
# Oh my ! pycurl does not check for the port as part of
1150
1135
# no_proxy :-( So we just test the host part
1151
self.no_proxy_host = server.host
1136
self.no_proxy_host = 'localhost'
1153
self.no_proxy_host = self.server_host_port
1138
self.no_proxy_host = self.proxy_address
1154
1139
# The secondary server is the proxy
1155
self.proxy_url = self.get_secondary_url()
1140
self.proxy = self.get_secondary_server()
1141
self.proxy_url = self.proxy.get_url()
1156
1142
self._old_env = {}
1158
1144
def _testing_pycurl(self):
1159
# TODO: This is duplicated for lots of the classes in this file
1160
return (features.pycurl.available()
1161
and self._transport == PyCurlTransport)
1145
return pycurl_present and self._transport == PyCurlTransport
1147
def create_transport_secondary_server(self):
1148
"""Creates an http server that will serve files with
1149
'-proxied' appended to their names.
1151
return http_utils.ProxyServer(protocol_version=self._protocol_version)
1163
1153
def _install_env(self, env):
1164
1154
for name, value in env.iteritems():
1239
1231
def setUp(self):
1240
1232
http_utils.TestCaseWithWebserver.setUp(self)
1241
1233
self.build_tree_contents([('a', '0123456789')],)
1234
server = self.get_readonly_server()
1235
self.transport = self._transport(server.get_url())
1243
1237
def create_transport_readonly_server(self):
1244
1238
return http_server.HttpServer(protocol_version=self._protocol_version)
1246
1240
def _file_contents(self, relpath, ranges):
1247
t = self.get_readonly_transport()
1248
1241
offsets = [ (start, end - start + 1) for start, end in ranges]
1249
coalesce = t._coalesce_offsets
1242
coalesce = self.transport._coalesce_offsets
1250
1243
coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
1251
code, data = t._get(relpath, coalesced)
1244
code, data = self.transport._get(relpath, coalesced)
1252
1245
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1253
1246
for start, end in ranges:
1254
1247
data.seek(start)
1255
1248
yield data.read(end - start + 1)
1257
1250
def _file_tail(self, relpath, tail_amount):
1258
t = self.get_readonly_transport()
1259
code, data = t._get(relpath, [], tail_amount)
1251
code, data = self.transport._get(relpath, [], tail_amount)
1260
1252
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1261
1253
data.seek(-tail_amount, 2)
1262
1254
return data.read(tail_amount)
1281
1273
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
1282
1274
"""Test redirection between http servers."""
1276
def create_transport_secondary_server(self):
1277
"""Create the secondary server redirecting to the primary server"""
1278
new = self.get_readonly_server()
1280
redirecting = http_utils.HTTPServerRedirecting(
1281
protocol_version=self._protocol_version)
1282
redirecting.redirect_to(new.host, new.port)
1284
1285
def setUp(self):
1285
1286
super(TestHTTPRedirections, self).setUp()
1286
1287
self.build_tree_contents([('a', '0123456789'),
1288
1289
'# Bazaar revision bundle v0.9\n#\n')
1291
# The requests to the old server will be redirected to the new server
1292
self.old_transport = self._transport(self.old_server.get_url())
1291
1294
def test_redirected(self):
1292
self.assertRaises(errors.RedirectRequested,
1293
self.get_old_transport().get, 'a')
1294
self.assertEqual('0123456789', self.get_new_transport().get('a').read())
1295
self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
1296
t = self._transport(self.new_server.get_url())
1297
self.assertEqual('0123456789', t.get('a').read())
1299
def test_read_redirected_bundle_from_url(self):
1300
from bzrlib.bundle import read_bundle_from_url
1301
url = self.old_transport.abspath('bundle')
1302
bundle = self.applyDeprecated(deprecated_in((1, 12, 0)),
1303
read_bundle_from_url, url)
1304
# If read_bundle_from_url was successful we get an empty bundle
1305
self.assertEqual([], bundle.revisions)
1297
1308
class RedirectedRequest(_urllib2_wrappers.Request):
1310
1321
self.follow_redirections = True
1313
def install_redirected_request(test):
1314
test.overrideAttr(_urllib2_wrappers, 'Request', RedirectedRequest)
1317
def cleanup_http_redirection_connections(test):
1318
# Some sockets are opened but never seen by _urllib, so we trap them at
1319
# the _urllib2_wrappers level to be able to clean them up.
1320
def socket_disconnect(sock):
1322
sock.shutdown(socket.SHUT_RDWR)
1324
except socket.error:
1326
def connect(connection):
1327
test.http_connect_orig(connection)
1328
test.addCleanup(socket_disconnect, connection.sock)
1329
test.http_connect_orig = test.overrideAttr(
1330
_urllib2_wrappers.HTTPConnection, 'connect', connect)
1331
def connect(connection):
1332
test.https_connect_orig(connection)
1333
test.addCleanup(socket_disconnect, connection.sock)
1334
test.https_connect_orig = test.overrideAttr(
1335
_urllib2_wrappers.HTTPSConnection, 'connect', connect)
1338
1324
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
1339
1325
"""Test redirections.
1352
1338
def setUp(self):
1353
if (features.pycurl.available()
1354
and self._transport == PyCurlTransport):
1339
if pycurl_present and self._transport == PyCurlTransport:
1355
1340
raise tests.TestNotApplicable(
1356
"pycurl doesn't redirect silently anymore")
1341
"pycurl doesn't redirect silently annymore")
1357
1342
super(TestHTTPSilentRedirections, self).setUp()
1358
install_redirected_request(self)
1359
cleanup_http_redirection_connections(self)
1343
self.setup_redirected_request()
1344
self.addCleanup(self.cleanup_redirected_request)
1360
1345
self.build_tree_contents([('a','a'),
1362
1347
('1/a', 'redirected once'),
1370
1355
('5/a', 'redirected 5 times'),
1358
self.old_transport = self._transport(self.old_server.get_url())
1360
def setup_redirected_request(self):
1361
self.original_class = _urllib2_wrappers.Request
1362
_urllib2_wrappers.Request = RedirectedRequest
1364
def cleanup_redirected_request(self):
1365
_urllib2_wrappers.Request = self.original_class
1367
def create_transport_secondary_server(self):
1368
"""Create the secondary server, redirections are defined in the tests"""
1369
return http_utils.HTTPServerRedirecting(
1370
protocol_version=self._protocol_version)
1373
1372
def test_one_redirection(self):
1374
t = self.get_old_transport()
1375
req = RedirectedRequest('GET', t._remote_path('a'))
1373
t = self.old_transport
1375
req = RedirectedRequest('GET', t.abspath('a'))
1376
req.follow_redirections = True
1376
1377
new_prefix = 'http://%s:%s' % (self.new_server.host,
1377
1378
self.new_server.port)
1378
1379
self.old_server.redirections = \
1379
1380
[('(.*)', r'%s/1\1' % (new_prefix), 301),]
1380
self.assertEqual('redirected once', t._perform(req).read())
1381
self.assertEquals('redirected once',t._perform(req).read())
1382
1383
def test_five_redirections(self):
1383
t = self.get_old_transport()
1384
req = RedirectedRequest('GET', t._remote_path('a'))
1384
t = self.old_transport
1386
req = RedirectedRequest('GET', t.abspath('a'))
1387
req.follow_redirections = True
1385
1388
old_prefix = 'http://%s:%s' % (self.old_server.host,
1386
1389
self.old_server.port)
1387
1390
new_prefix = 'http://%s:%s' % (self.new_server.host,
1402
1405
def setUp(self):
1403
1406
super(TestDoCatchRedirections, self).setUp()
1404
1407
self.build_tree_contents([('a', '0123456789'),],)
1405
cleanup_http_redirection_connections(self)
1407
self.old_transport = self.get_old_transport()
1409
self.old_transport = self._transport(self.old_server.get_url())
1411
def get_a(self, transport):
1412
return transport.get('a')
1412
1414
def test_no_redirection(self):
1413
t = self.get_new_transport()
1415
t = self._transport(self.new_server.get_url())
1415
1417
# We use None for redirected so that we fail if redirected
1416
self.assertEqual('0123456789',
1417
transport.do_catching_redirections(
1418
self.assertEquals('0123456789',
1419
transport.do_catching_redirections(
1418
1420
self.get_a, t, None).read())
1420
1422
def test_one_redirection(self):
1421
1423
self.redirections = 0
1423
def redirected(t, exception, redirection_notice):
1425
def redirected(transport, exception, redirection_notice):
1424
1426
self.redirections += 1
1425
redirected_t = t._redirected_to(exception.source, exception.target)
1427
dir, file = urlutils.split(exception.target)
1428
return self._transport(dir)
1428
self.assertEqual('0123456789',
1429
transport.do_catching_redirections(
1430
self.assertEquals('0123456789',
1431
transport.do_catching_redirections(
1430
1432
self.get_a, self.old_transport, redirected).read())
1431
self.assertEqual(1, self.redirections)
1433
self.assertEquals(1, self.redirections)
1433
1435
def test_redirection_loop(self):
1459
1459
('b', 'contents of b\n'),])
1461
1461
def create_transport_readonly_server(self):
1462
server = self._auth_server(protocol_version=self._protocol_version)
1463
server._url_protocol = self._url_protocol
1462
if self._auth_scheme == 'basic':
1463
server = http_utils.HTTPBasicAuthServer(
1464
protocol_version=self._protocol_version)
1466
if self._auth_scheme != 'digest':
1467
raise AssertionError('Unknown auth scheme: %r'
1468
% self._auth_scheme)
1469
server = http_utils.HTTPDigestAuthServer(
1470
protocol_version=self._protocol_version)
1466
1473
def _testing_pycurl(self):
1467
# TODO: This is duplicated for lots of the classes in this file
1468
return (features.pycurl.available()
1469
and self._transport == PyCurlTransport)
1474
return pycurl_present and self._transport == PyCurlTransport
1471
1476
def get_user_url(self, user, password):
1472
1477
"""Build an url embedding user and password"""
1530
1534
self.server.add_user('joe', 'foo')
1531
1535
t = self.get_user_transport(None, None)
1532
1536
stdout = tests.StringIOWrapper()
1533
stderr = tests.StringIOWrapper()
1534
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1535
stdout=stdout, stderr=stderr)
1537
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=stdout)
1536
1538
self.assertEqual('contents of a\n',t.get('a').read())
1537
1539
# stdin should be empty
1538
1540
self.assertEqual('', ui.ui_factory.stdin.readline())
1540
1542
expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1541
self.assertEqual(expected_prompt, stderr.read(len(expected_prompt)))
1542
self.assertEqual('', stdout.getvalue())
1543
self.assertEquals(expected_prompt, stdout.read(len(expected_prompt)))
1543
1544
self._check_password_prompt(t._unqualified_scheme, 'joe',
1546
1547
def test_prompt_for_password(self):
1547
1548
if self._testing_pycurl():
1552
1553
self.server.add_user('joe', 'foo')
1553
1554
t = self.get_user_transport('joe', None)
1554
1555
stdout = tests.StringIOWrapper()
1555
stderr = tests.StringIOWrapper()
1556
ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1557
stdout=stdout, stderr=stderr)
1558
self.assertEqual('contents of a\n', t.get('a').read())
1556
ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1557
self.assertEqual('contents of a\n',t.get('a').read())
1559
1558
# stdin should be empty
1560
1559
self.assertEqual('', ui.ui_factory.stdin.readline())
1561
1560
self._check_password_prompt(t._unqualified_scheme, 'joe',
1563
self.assertEqual('', stdout.getvalue())
1564
1562
# And we shouldn't prompt again for a different request
1565
1563
# against the same transport.
1566
1564
self.assertEqual('contents of b\n',t.get('b').read())
1670
1667
('b-proxied', 'contents of b\n'),
1670
def create_transport_readonly_server(self):
1671
if self._auth_scheme == 'basic':
1672
server = http_utils.ProxyBasicAuthServer(
1673
protocol_version=self._protocol_version)
1675
if self._auth_scheme != 'digest':
1676
raise AssertionError('Unknown auth scheme: %r'
1677
% self._auth_scheme)
1678
server = http_utils.ProxyDigestAuthServer(
1679
protocol_version=self._protocol_version)
1673
1682
def get_user_transport(self, user, password):
1674
1683
self._install_env({'all_proxy': self.get_user_url(user, password)})
1675
return TestAuth.get_user_transport(self, user, password)
1684
return self._transport(self.server.get_url())
1677
1686
def _install_env(self, env):
1678
1687
for name, value in env.iteritems():
1721
1729
# We use the VFS layer as part of HTTP tunnelling tests.
1722
1730
self._captureVar('BZR_NO_SMART_VFS', None)
1723
1731
self.transport_readonly_server = http_utils.HTTPServerWithSmarts
1724
self.http_server = self.get_readonly_server()
1726
1733
def create_transport_readonly_server(self):
1727
server = http_utils.HTTPServerWithSmarts(
1734
return http_utils.HTTPServerWithSmarts(
1728
1735
protocol_version=self._protocol_version)
1729
server._url_protocol = self._url_protocol
1732
1737
def test_open_bzrdir(self):
1733
1738
branch = self.make_branch('relpath')
1734
url = self.http_server.get_url() + 'relpath'
1739
http_server = self.get_readonly_server()
1740
url = http_server.get_url() + 'relpath'
1735
1741
bd = bzrdir.BzrDir.open(url)
1736
self.addCleanup(bd.transport.disconnect)
1737
1742
self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
1739
1744
def test_bulk_data(self):
1936
1943
# We override at class level because constructors may propagate the
1937
1944
# bound method and render instance overriding ineffective (an
1938
1945
# alternative would be to define a specific ui factory instead...)
1939
self.overrideAttr(self._transport, '_report_activity', report_activity)
1940
self.addCleanup(self.server.stop_server)
1946
self.orig_report_activity = self._transport._report_activity
1947
self._transport._report_activity = report_activity
1950
self._transport._report_activity = self.orig_report_activity
1951
self.server.tearDown()
1952
tests.TestCase.tearDown(self)
1942
1954
def get_transport(self):
1943
t = self._transport(self.server.get_url())
1944
# FIXME: Needs cleanup -- vila 20100611
1955
return self._transport(self.server.get_url())
1947
1957
def assertActivitiesMatch(self):
1948
1958
self.assertEqual(self.server.bytes_read,
2054
2064
t = self.get_transport()
2055
2065
# We must send a single line of body bytes, see
2056
# PredefinedRequestHandler._handle_one_request
2066
# PredefinedRequestHandler.handle_one_request
2057
2067
code, f = t._post('abc def end-of-body\n')
2058
2068
self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2059
2069
self.assertActivitiesMatch()
2062
class TestActivity(tests.TestCase, TestActivityMixin):
2065
TestActivityMixin.setUp(self)
2068
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2070
# Unlike TestActivity, we are really testing ReportingFileSocket and
2071
# ReportingSocket, so we don't need all the parametrization. Since
2072
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2073
# test them through their use by the transport than directly (that's a
2074
# bit less clean but far more simpler and effective).
2075
_activity_server = ActivityHTTPServer
2076
_protocol_version = 'HTTP/1.1'
2079
self._transport =_urllib.HttpTransport_urllib
2080
TestActivityMixin.setUp(self)
2082
def assertActivitiesMatch(self):
2083
# Nothing to check here
2087
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2088
"""Test authentication on the redirected http server."""
2090
_auth_header = 'Authorization'
2091
_password_prompt_prefix = ''
2092
_username_prompt_prefix = ''
2093
_auth_server = http_utils.HTTPBasicAuthServer
2094
_transport = _urllib.HttpTransport_urllib
2097
super(TestAuthOnRedirected, self).setUp()
2098
self.build_tree_contents([('a','a'),
2100
('1/a', 'redirected once'),
2102
new_prefix = 'http://%s:%s' % (self.new_server.host,
2103
self.new_server.port)
2104
self.old_server.redirections = [
2105
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2106
self.old_transport = self.get_old_transport()
2107
self.new_server.add_user('joe', 'foo')
2108
cleanup_http_redirection_connections(self)
2110
def create_transport_readonly_server(self):
2111
server = self._auth_server(protocol_version=self._protocol_version)
2112
server._url_protocol = self._url_protocol
2118
def test_auth_on_redirected_via_do_catching_redirections(self):
2119
self.redirections = 0
2121
def redirected(t, exception, redirection_notice):
2122
self.redirections += 1
2123
redirected_t = t._redirected_to(exception.source, exception.target)
2124
self.addCleanup(redirected_t.disconnect)
2127
stdout = tests.StringIOWrapper()
2128
stderr = tests.StringIOWrapper()
2129
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2130
stdout=stdout, stderr=stderr)
2131
self.assertEqual('redirected once',
2132
transport.do_catching_redirections(
2133
self.get_a, self.old_transport, redirected).read())
2134
self.assertEqual(1, self.redirections)
2135
# stdin should be empty
2136
self.assertEqual('', ui.ui_factory.stdin.readline())
2137
# stdout should be empty, stderr will contains the prompts
2138
self.assertEqual('', stdout.getvalue())
2140
def test_auth_on_redirected_via_following_redirections(self):
2141
self.new_server.add_user('joe', 'foo')
2142
stdout = tests.StringIOWrapper()
2143
stderr = tests.StringIOWrapper()
2144
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2145
stdout=stdout, stderr=stderr)
2146
t = self.old_transport
2147
req = RedirectedRequest('GET', t.abspath('a'))
2148
new_prefix = 'http://%s:%s' % (self.new_server.host,
2149
self.new_server.port)
2150
self.old_server.redirections = [
2151
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2152
self.assertEqual('redirected once', t._perform(req).read())
2153
# stdin should be empty
2154
self.assertEqual('', ui.ui_factory.stdin.readline())
2155
# stdout should be empty, stderr will contains the prompts
2156
self.assertEqual('', stdout.getvalue())