53
from bzrlib.transport.http._pycurl import PyCurlTransport
55
except errors.DependencyNotPresent:
56
pycurl_present = False
59
class TransportAdapter(tests.TestScenarioApplier):
60
"""Generate the same test for each transport implementation."""
63
transport_scenarios = [
64
('urllib', dict(_transport=_urllib.HttpTransport_urllib,
65
_server=http_server.HttpServer_urllib,
66
_qualified_prefix='http+urllib',)),
69
transport_scenarios.append(
70
('pycurl', dict(_transport=PyCurlTransport,
71
_server=http_server.HttpServer_PyCurl,
72
_qualified_prefix='http+pycurl',)))
73
self.scenarios = transport_scenarios
76
class TransportProtocolAdapter(TransportAdapter):
77
"""Generate the same test for each protocol implementation.
79
In addition to the transport adaptatation that we inherit from.
83
super(TransportProtocolAdapter, self).__init__()
84
protocol_scenarios = [
85
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
86
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
88
self.scenarios = tests.multiply_scenarios(self.scenarios,
92
class TransportProtocolAuthenticationAdapter(TransportProtocolAdapter):
93
"""Generate the same test for each authentication scheme implementation.
95
In addition to the protocol adaptatation that we inherit from.
99
super(TransportProtocolAuthenticationAdapter, self).__init__()
100
auth_scheme_scenarios = [
101
('basic', dict(_auth_scheme='basic')),
102
('digest', dict(_auth_scheme='digest')),
105
self.scenarios = tests.multiply_scenarios(self.scenarios,
106
auth_scheme_scenarios)
108
def load_tests(standard_tests, module, loader):
109
"""Multiply tests for http clients and protocol versions."""
110
# one for each transport
111
t_adapter = TransportAdapter()
112
t_classes= (TestHttpTransportRegistration,
113
TestHttpTransportUrls,
115
is_testing_for_transports = tests.condition_isinstance(t_classes)
117
# multiplied by one for each protocol version
118
tp_adapter = TransportProtocolAdapter()
119
tp_classes= (TestDoCatchRedirections,
121
TestHTTPRedirections,
122
TestHTTPSilentRedirections,
123
TestLimitedRangeRequestServer,
127
TestSpecificRequestHandler,
129
is_also_testing_for_protocols = tests.condition_isinstance(tp_classes)
131
# multiplied by one for each authentication scheme
132
tpa_adapter = TransportProtocolAuthenticationAdapter()
133
tpa_classes = (TestAuth,
135
is_also_testing_for_authentication = tests.condition_isinstance(tpa_classes)
137
result = loader.suiteClass()
138
for test in tests.iter_suite_tests(standard_tests):
139
# Each test is either standalone or testing for some combination of
140
# transport, protocol version, authentication scheme. Use the right
141
# adpater (or none) depending on the class.
142
if is_testing_for_transports(test):
143
result.addTests(t_adapter.adapt(test))
144
elif is_also_testing_for_protocols(test):
145
result.addTests(tp_adapter.adapt(test))
146
elif is_also_testing_for_authentication(test):
147
result.addTests(tpa_adapter.adapt(test))
149
result.addTests(test)
153
class TestHttpTransportUrls(tests.TestCase):
154
"""Test the http urls."""
156
def test_abs_url(self):
157
"""Construction of absolute http URLs"""
158
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
159
eq = self.assertEqualDiff
160
eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
161
eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
162
eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
163
eq(t.abspath('.bzr/1//2/./3'),
164
'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
166
def test_invalid_http_urls(self):
167
"""Trap invalid construction of urls"""
168
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
169
self.assertRaises(errors.InvalidURL,
171
'http://http://bazaar-vcs.org/bzr/bzr.dev/')
173
def test_http_root_urls(self):
174
"""Construction of URLs from server root"""
175
t = self._transport('http://bzr.ozlabs.org/')
176
eq = self.assertEqualDiff
177
eq(t.abspath('.bzr/tree-version'),
178
'http://bzr.ozlabs.org/.bzr/tree-version')
180
def test_http_impl_urls(self):
181
"""There are servers which ask for particular clients to connect"""
182
server = self._server()
185
url = server.get_url()
186
self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
191
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
192
"""Test the http connections."""
195
http_utils.TestCaseWithWebserver.setUp(self)
196
self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
197
transport=self.get_transport())
199
def test_http_has(self):
200
server = self.get_readonly_server()
201
t = self._transport(server.get_url())
202
self.assertEqual(t.has('foo/bar'), True)
203
self.assertEqual(len(server.logs), 1)
204
self.assertContainsRe(server.logs[0],
205
r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "bzr/')
207
def test_http_has_not_found(self):
208
server = self.get_readonly_server()
209
t = self._transport(server.get_url())
210
self.assertEqual(t.has('not-found'), False)
211
self.assertContainsRe(server.logs[1],
212
r'"HEAD /not-found HTTP/1.." 404 - "-" "bzr/')
214
def test_http_get(self):
215
server = self.get_readonly_server()
216
t = self._transport(server.get_url())
217
fp = t.get('foo/bar')
218
self.assertEqualDiff(
220
'contents of foo/bar\n')
221
self.assertEqual(len(server.logs), 1)
222
self.assertTrue(server.logs[0].find(
223
'"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
224
% bzrlib.__version__) > -1)
226
def test_get_smart_medium(self):
227
# For HTTP, get_smart_medium should return the transport object.
228
server = self.get_readonly_server()
229
http_transport = self._transport(server.get_url())
230
medium = http_transport.get_smart_medium()
231
self.assertIs(medium, http_transport)
233
def test_has_on_bogus_host(self):
234
# Get a free address and don't 'accept' on it, so that we
235
# can be sure there is no http handler there, but set a
236
# reasonable timeout to not slow down tests too much.
237
default_timeout = socket.getdefaulttimeout()
239
socket.setdefaulttimeout(2)
241
s.bind(('localhost', 0))
242
t = self._transport('http://%s:%s/' % s.getsockname())
243
self.assertRaises(errors.ConnectionError, t.has, 'foo/bar')
245
socket.setdefaulttimeout(default_timeout)
248
class TestPost(tests.TestCase):
250
def test_post_body_is_received(self):
251
server = http_utils.RecordingServer(expect_body_tail='end-of-body')
253
self.addCleanup(server.tearDown)
254
scheme = self._qualified_prefix
255
url = '%s://%s:%s/' % (scheme, server.host, server.port)
256
http_transport = self._transport(url)
257
code, response = http_transport._post('abc def end-of-body')
259
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
260
self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
261
# The transport should not be assuming that the server can accept
262
# chunked encoding the first time it connects, because HTTP/1.1, so we
263
# check for the literal string.
265
server.received_bytes.endswith('\r\n\r\nabc def end-of-body'))
268
class TestHttpTransportRegistration(tests.TestCase):
269
"""Test registrations of various http implementations"""
271
def test_http_registered(self):
272
t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
273
self.assertIsInstance(t, transport.Transport)
274
self.assertIsInstance(t, self._transport)
277
class TestSpecificRequestHandler(http_utils.TestCaseWithWebserver):
278
"""Tests a specific request handler.
281
Daughter class are expected to override _req_handler_class
284
# Provide a useful default
285
_req_handler_class = http_server.TestingHTTPRequestHandler
287
def create_transport_readonly_server(self):
288
return http_server.HttpServer(self._req_handler_class,
289
protocol_version=self._protocol_version)
292
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
293
"""Whatever request comes in, close the connection"""
295
def handle_one_request(self):
296
"""Handle a single HTTP request, by abruptly closing the connection"""
297
self.close_connection = 1
300
class TestWallServer(TestSpecificRequestHandler):
301
"""Tests exceptions during the connection phase"""
303
_req_handler_class = WallRequestHandler
305
def test_http_has(self):
306
server = self.get_readonly_server()
307
t = self._transport(server.get_url())
308
# Unfortunately httplib (see HTTPResponse._read_status
309
# for details) make no distinction between a closed
310
# socket and badly formatted status line, so we can't
311
# just test for ConnectionError, we have to test
312
# InvalidHttpResponse too.
313
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
316
def test_http_get(self):
317
server = self.get_readonly_server()
318
t = self._transport(server.get_url())
319
self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
323
class BadStatusRequestHandler(http_server.TestingHTTPRequestHandler):
324
"""Whatever request comes in, returns a bad status"""
326
def parse_request(self):
327
"""Fakes handling a single HTTP request, returns a bad status"""
328
ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
330
self.send_response(0, "Bad status")
332
except socket.error, e:
333
# We don't want to pollute the test results with
334
# spurious server errors while test succeed. In our
335
# case, it may occur that the test has already read
336
# the 'Bad Status' and closed the socket while we are
337
# still trying to send some headers... So the test is
338
# ok, but if we raise the exception, the output is
339
# dirty. So we don't raise, but we close the
340
# connection, just to be safe :)
341
spurious = [errno.EPIPE,
345
if (len(e.args) > 0) and (e.args[0] in spurious):
346
self.close_connection = 1
353
class TestBadStatusServer(TestSpecificRequestHandler):
354
"""Tests bad status from server."""
356
_req_handler_class = BadStatusRequestHandler
358
def test_http_has(self):
359
server = self.get_readonly_server()
360
t = self._transport(server.get_url())
361
self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
363
def test_http_get(self):
364
server = self.get_readonly_server()
365
t = self._transport(server.get_url())
366
self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
369
class InvalidStatusRequestHandler(http_server.TestingHTTPRequestHandler):
370
"""Whatever request comes in, returns am invalid status"""
372
def parse_request(self):
373
"""Fakes handling a single HTTP request, returns a bad status"""
374
ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
375
self.wfile.write("Invalid status line\r\n")
379
class TestInvalidStatusServer(TestBadStatusServer):
380
"""Tests invalid status from server.
382
Both implementations raises the same error as for a bad status.
385
_req_handler_class = InvalidStatusRequestHandler
388
class BadProtocolRequestHandler(http_server.TestingHTTPRequestHandler):
389
"""Whatever request comes in, returns a bad protocol version"""
391
def parse_request(self):
392
"""Fakes handling a single HTTP request, returns a bad status"""
393
ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
394
# Returns an invalid protocol version, but curl just
395
# ignores it and those cannot be tested.
396
self.wfile.write("%s %d %s\r\n" % ('HTTP/0.0',
398
'Look at my protocol version'))
402
class TestBadProtocolServer(TestSpecificRequestHandler):
403
"""Tests bad protocol from server."""
405
_req_handler_class = BadProtocolRequestHandler
408
if pycurl_present and self._transport == PyCurlTransport:
409
raise tests.TestNotApplicable(
410
"pycurl doesn't check the protocol version")
411
super(TestBadProtocolServer, self).setUp()
413
def test_http_has(self):
414
server = self.get_readonly_server()
415
t = self._transport(server.get_url())
416
self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
418
def test_http_get(self):
419
server = self.get_readonly_server()
420
t = self._transport(server.get_url())
421
self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
424
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
425
"""Whatever request comes in, returns a 403 code"""
427
def parse_request(self):
428
"""Handle a single HTTP request, by replying we cannot handle it"""
429
ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
434
class TestForbiddenServer(TestSpecificRequestHandler):
435
"""Tests forbidden server"""
437
_req_handler_class = ForbiddenRequestHandler
439
def test_http_has(self):
440
server = self.get_readonly_server()
441
t = self._transport(server.get_url())
442
self.assertRaises(errors.TransportError, t.has, 'foo/bar')
444
def test_http_get(self):
445
server = self.get_readonly_server()
446
t = self._transport(server.get_url())
447
self.assertRaises(errors.TransportError, t.get, 'foo/bar')
450
class TestRanges(http_utils.TestCaseWithWebserver):
451
"""Test the Range header in GET methods."""
454
http_utils.TestCaseWithWebserver.setUp(self)
455
self.build_tree_contents([('a', '0123456789')],)
456
server = self.get_readonly_server()
457
self.transport = self._transport(server.get_url())
459
def _file_contents(self, relpath, ranges):
460
offsets = [ (start, end - start + 1) for start, end in ranges]
461
coalesce = self.transport._coalesce_offsets
462
coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
463
code, data = self.transport._get(relpath, coalesced)
464
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
465
for start, end in ranges:
467
yield data.read(end - start + 1)
469
def _file_tail(self, relpath, tail_amount):
470
code, data = self.transport._get(relpath, [], tail_amount)
471
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
472
data.seek(-tail_amount, 2)
473
return data.read(tail_amount)
475
def test_range_header(self):
477
map(self.assertEqual,['0', '234'],
478
list(self._file_contents('a', [(0,0), (2,4)])),)
480
def test_range_header_tail(self):
481
self.assertEqual('789', self._file_tail('a', 3))
483
def test_syntactically_invalid_range_header(self):
484
self.assertListRaises(errors.InvalidHttpRange,
485
self._file_contents, 'a', [(4, 3)])
487
def test_semantically_invalid_range_header(self):
488
self.assertListRaises(errors.InvalidHttpRange,
489
self._file_contents, 'a', [(42, 128)])
492
class TestRangeRequestServer(TestSpecificRequestHandler):
493
"""Tests readv requests against server.
495
We test against default "normal" server.
499
super(TestRangeRequestServer, self).setUp()
500
self.build_tree_contents([('a', '0123456789')],)
502
def test_readv(self):
503
server = self.get_readonly_server()
504
t = self._transport(server.get_url())
505
l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
506
self.assertEqual(l[0], (0, '0'))
507
self.assertEqual(l[1], (1, '1'))
508
self.assertEqual(l[2], (3, '34'))
509
self.assertEqual(l[3], (9, '9'))
511
def test_readv_out_of_order(self):
512
server = self.get_readonly_server()
513
t = self._transport(server.get_url())
514
l = list(t.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
515
self.assertEqual(l[0], (1, '1'))
516
self.assertEqual(l[1], (9, '9'))
517
self.assertEqual(l[2], (0, '0'))
518
self.assertEqual(l[3], (3, '34'))
520
def test_readv_invalid_ranges(self):
521
server = self.get_readonly_server()
522
t = self._transport(server.get_url())
524
# This is intentionally reading off the end of the file
525
# since we are sure that it cannot get there
526
self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
527
t.readv, 'a', [(1,1), (8,10)])
529
# This is trying to seek past the end of the file, it should
530
# also raise a special error
531
self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
532
t.readv, 'a', [(12,2)])
534
def test_readv_multiple_get_requests(self):
535
server = self.get_readonly_server()
536
t = self._transport(server.get_url())
537
# force transport to issue multiple requests
538
t._max_readv_combine = 1
539
t._max_get_ranges = 1
540
l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
541
self.assertEqual(l[0], (0, '0'))
542
self.assertEqual(l[1], (1, '1'))
543
self.assertEqual(l[2], (3, '34'))
544
self.assertEqual(l[3], (9, '9'))
545
# The server should have issued 4 requests
546
self.assertEqual(4, server.GET_request_nb)
548
def test_readv_get_max_size(self):
549
server = self.get_readonly_server()
550
t = self._transport(server.get_url())
551
# force transport to issue multiple requests by limiting the number of
552
# bytes by request. Note that this apply to coalesced offsets only, a
553
# single range ill keep its size even if bigger than the limit.
555
l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
556
self.assertEqual(l[0], (0, '0'))
557
self.assertEqual(l[1], (1, '1'))
558
self.assertEqual(l[2], (2, '2345'))
559
self.assertEqual(l[3], (6, '6789'))
560
# The server should have issued 3 requests
561
self.assertEqual(3, server.GET_request_nb)
564
class SingleRangeRequestHandler(http_server.TestingHTTPRequestHandler):
565
"""Always reply to range request as if they were single.
567
Don't be explicit about it, just to annoy the clients.
570
def get_multiple_ranges(self, file, file_size, ranges):
571
"""Answer as if it was a single range request and ignores the rest"""
572
(start, end) = ranges[0]
573
return self.get_single_range(file, file_size, start, end)
576
class TestSingleRangeRequestServer(TestRangeRequestServer):
577
"""Test readv against a server which accept only single range requests"""
579
_req_handler_class = SingleRangeRequestHandler
582
class SingleOnlyRangeRequestHandler(http_server.TestingHTTPRequestHandler):
583
"""Only reply to simple range requests, errors out on multiple"""
585
def get_multiple_ranges(self, file, file_size, ranges):
586
"""Refuses the multiple ranges request"""
589
self.send_error(416, "Requested range not satisfiable")
591
(start, end) = ranges[0]
592
return self.get_single_range(file, file_size, start, end)
595
class TestSingleOnlyRangeRequestServer(TestRangeRequestServer):
596
"""Test readv against a server which only accept single range requests"""
598
_req_handler_class = SingleOnlyRangeRequestHandler
601
class NoRangeRequestHandler(http_server.TestingHTTPRequestHandler):
602
"""Ignore range requests without notice"""
605
# Update the statistics
606
self.server.test_case_server.GET_request_nb += 1
607
# Just bypass the range handling done by TestingHTTPRequestHandler
608
return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
611
class TestNoRangeRequestServer(TestRangeRequestServer):
612
"""Test readv against a server which do not accept range requests"""
614
_req_handler_class = NoRangeRequestHandler
617
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
618
"""Errors out when range specifiers exceed the limit"""
620
def get_multiple_ranges(self, file, file_size, ranges):
621
"""Refuses the multiple ranges request"""
622
tcs = self.server.test_case_server
623
if tcs.range_limit is not None and len(ranges) > tcs.range_limit:
625
# Emulate apache behavior
626
self.send_error(400, "Bad Request")
628
return http_server.TestingHTTPRequestHandler.get_multiple_ranges(
629
self, file, file_size, ranges)
632
class LimitedRangeHTTPServer(http_server.HttpServer):
633
"""An HttpServer erroring out on requests with too much range specifiers"""
635
def __init__(self, request_handler=LimitedRangeRequestHandler,
636
protocol_version=None,
638
http_server.HttpServer.__init__(self, request_handler,
639
protocol_version=protocol_version)
640
self.range_limit = range_limit
643
class TestLimitedRangeRequestServer(http_utils.TestCaseWithWebserver):
644
"""Tests readv requests against a server erroring out on too much ranges."""
648
def create_transport_readonly_server(self):
649
# Requests with more range specifiers will error out
650
return LimitedRangeHTTPServer(range_limit=self.range_limit,
651
protocol_version=self._protocol_version)
653
def get_transport(self):
654
return self._transport(self.get_readonly_server().get_url())
657
http_utils.TestCaseWithWebserver.setUp(self)
658
# We need to manipulate ranges that correspond to real chunks in the
659
# response, so we build a content appropriately.
660
filler = ''.join(['abcdefghij' for x in range(102)])
661
content = ''.join(['%04d' % v + filler for v in range(16)])
662
self.build_tree_contents([('a', content)],)
664
def test_few_ranges(self):
665
t = self.get_transport()
666
l = list(t.readv('a', ((0, 4), (1024, 4), )))
667
self.assertEqual(l[0], (0, '0000'))
668
self.assertEqual(l[1], (1024, '0001'))
669
self.assertEqual(1, self.get_readonly_server().GET_request_nb)
671
def test_more_ranges(self):
672
t = self.get_transport()
673
l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
674
self.assertEqual(l[0], (0, '0000'))
675
self.assertEqual(l[1], (1024, '0001'))
676
self.assertEqual(l[2], (4096, '0004'))
677
self.assertEqual(l[3], (8192, '0008'))
678
# The server will refuse to serve the first request (too much ranges),
679
# a second request will succeeds.
680
self.assertEqual(2, self.get_readonly_server().GET_request_nb)
683
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
684
"""Tests proxy server.
686
Be aware that we do not setup a real proxy here. Instead, we
687
check that the *connection* goes through the proxy by serving
688
different content (the faked proxy server append '-proxied'
692
# FIXME: We don't have an https server available, so we don't
693
# test https connections.
696
super(TestProxyHttpServer, self).setUp()
697
self.build_tree_contents([('foo', 'contents of foo\n'),
698
('foo-proxied', 'proxied contents of foo\n')])
699
# Let's setup some attributes for tests
700
self.server = self.get_readonly_server()
701
self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
702
if self._testing_pycurl():
703
# Oh my ! pycurl does not check for the port as part of
704
# no_proxy :-( So we just test the host part
705
self.no_proxy_host = 'localhost'
707
self.no_proxy_host = self.proxy_address
708
# The secondary server is the proxy
709
self.proxy = self.get_secondary_server()
710
self.proxy_url = self.proxy.get_url()
713
def _testing_pycurl(self):
714
return pycurl_present and self._transport == PyCurlTransport
716
def create_transport_secondary_server(self):
717
"""Creates an http server that will serve files with
718
'-proxied' appended to their names.
720
return http_utils.ProxyServer(protocol_version=self._protocol_version)
722
def _install_env(self, env):
723
for name, value in env.iteritems():
724
self._old_env[name] = osutils.set_or_unset_env(name, value)
726
def _restore_env(self):
727
for name, value in self._old_env.iteritems():
728
osutils.set_or_unset_env(name, value)
730
def proxied_in_env(self, env):
731
self._install_env(env)
732
url = self.server.get_url()
733
t = self._transport(url)
735
self.assertEqual(t.get('foo').read(), 'proxied contents of foo\n')
739
def not_proxied_in_env(self, env):
740
self._install_env(env)
741
url = self.server.get_url()
742
t = self._transport(url)
744
self.assertEqual(t.get('foo').read(), 'contents of foo\n')
748
def test_http_proxy(self):
749
self.proxied_in_env({'http_proxy': self.proxy_url})
751
def test_HTTP_PROXY(self):
752
if self._testing_pycurl():
753
# pycurl does not check HTTP_PROXY for security reasons
754
# (for use in a CGI context that we do not care
755
# about. Should we ?)
756
raise tests.TestNotApplicable(
757
'pycurl does not check HTTP_PROXY for security reasons')
758
self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
760
def test_all_proxy(self):
761
self.proxied_in_env({'all_proxy': self.proxy_url})
763
def test_ALL_PROXY(self):
764
self.proxied_in_env({'ALL_PROXY': self.proxy_url})
766
def test_http_proxy_with_no_proxy(self):
767
self.not_proxied_in_env({'http_proxy': self.proxy_url,
768
'no_proxy': self.no_proxy_host})
770
def test_HTTP_PROXY_with_NO_PROXY(self):
771
if self._testing_pycurl():
772
raise tests.TestNotApplicable(
773
'pycurl does not check HTTP_PROXY for security reasons')
774
self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
775
'NO_PROXY': self.no_proxy_host})
777
def test_all_proxy_with_no_proxy(self):
778
self.not_proxied_in_env({'all_proxy': self.proxy_url,
779
'no_proxy': self.no_proxy_host})
781
def test_ALL_PROXY_with_NO_PROXY(self):
782
self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
783
'NO_PROXY': self.no_proxy_host})
785
def test_http_proxy_without_scheme(self):
786
if self._testing_pycurl():
787
# pycurl *ignores* invalid proxy env variables. If that ever change
788
# in the future, this test will fail indicating that pycurl do not
789
# ignore anymore such variables.
790
self.not_proxied_in_env({'http_proxy': self.proxy_address})
792
self.assertRaises(errors.InvalidURL,
794
{'http_proxy': self.proxy_address})
797
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
798
"""Test redirection between http servers."""
800
def create_transport_secondary_server(self):
801
"""Create the secondary server redirecting to the primary server"""
802
new = self.get_readonly_server()
804
redirecting = http_utils.HTTPServerRedirecting(
805
protocol_version=self._protocol_version)
806
redirecting.redirect_to(new.host, new.port)
810
super(TestHTTPRedirections, self).setUp()
811
self.build_tree_contents([('a', '0123456789'),
813
'# Bazaar revision bundle v0.9\n#\n')
816
self.old_transport = self._transport(self.old_server.get_url())
818
def test_redirected(self):
819
self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
820
t = self._transport(self.new_server.get_url())
821
self.assertEqual('0123456789', t.get('a').read())
823
def test_read_redirected_bundle_from_url(self):
824
from bzrlib.bundle import read_bundle_from_url
825
url = self.old_transport.abspath('bundle')
826
bundle = read_bundle_from_url(url)
827
# If read_bundle_from_url was successful we get an empty bundle
828
self.assertEqual([], bundle.revisions)
831
class RedirectedRequest(_urllib2_wrappers.Request):
832
"""Request following redirections. """
834
init_orig = _urllib2_wrappers.Request.__init__
836
def __init__(self, method, url, *args, **kwargs):
840
# Since the tests using this class will replace
841
# _urllib2_wrappers.Request, we can't just call the base class __init__
843
RedirectedRequest.init_orig(self, method, url, args, kwargs)
844
self.follow_redirections = True
847
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
848
"""Test redirections.
850
http implementations do not redirect silently anymore (they
851
do not redirect at all in fact). The mechanism is still in
852
place at the _urllib2_wrappers.Request level and these tests
855
For the pycurl implementation
856
the redirection have been deleted as we may deprecate pycurl
857
and I have no place to keep a working implementation.
862
if pycurl_present and self._transport == PyCurlTransport:
863
raise tests.TestNotApplicable(
864
"pycurl doesn't redirect silently annymore")
865
super(TestHTTPSilentRedirections, self).setUp()
866
self.setup_redirected_request()
867
self.addCleanup(self.cleanup_redirected_request)
868
self.build_tree_contents([('a','a'),
870
('1/a', 'redirected once'),
872
('2/a', 'redirected twice'),
874
('3/a', 'redirected thrice'),
876
('4/a', 'redirected 4 times'),
878
('5/a', 'redirected 5 times'),
881
self.old_transport = self._transport(self.old_server.get_url())
883
def setup_redirected_request(self):
884
self.original_class = _urllib2_wrappers.Request
885
_urllib2_wrappers.Request = RedirectedRequest
887
def cleanup_redirected_request(self):
888
_urllib2_wrappers.Request = self.original_class
890
def create_transport_secondary_server(self):
891
"""Create the secondary server, redirections are defined in the tests"""
892
return http_utils.HTTPServerRedirecting(
893
protocol_version=self._protocol_version)
895
def test_one_redirection(self):
896
t = self.old_transport
898
req = RedirectedRequest('GET', t.abspath('a'))
899
req.follow_redirections = True
900
new_prefix = 'http://%s:%s' % (self.new_server.host,
901
self.new_server.port)
902
self.old_server.redirections = \
903
[('(.*)', r'%s/1\1' % (new_prefix), 301),]
904
self.assertEquals('redirected once',t._perform(req).read())
906
def test_five_redirections(self):
907
t = self.old_transport
909
req = RedirectedRequest('GET', t.abspath('a'))
910
req.follow_redirections = True
911
old_prefix = 'http://%s:%s' % (self.old_server.host,
912
self.old_server.port)
913
new_prefix = 'http://%s:%s' % (self.new_server.host,
914
self.new_server.port)
915
self.old_server.redirections = \
916
[('/1(.*)', r'%s/2\1' % (old_prefix), 302),
917
('/2(.*)', r'%s/3\1' % (old_prefix), 303),
918
('/3(.*)', r'%s/4\1' % (old_prefix), 307),
919
('/4(.*)', r'%s/5\1' % (new_prefix), 301),
920
('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
922
self.assertEquals('redirected 5 times',t._perform(req).read())
925
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
926
"""Test transport.do_catching_redirections."""
929
super(TestDoCatchRedirections, self).setUp()
930
self.build_tree_contents([('a', '0123456789'),],)
932
self.old_transport = self._transport(self.old_server.get_url())
934
def get_a(self, transport):
935
return transport.get('a')
937
def test_no_redirection(self):
938
t = self._transport(self.new_server.get_url())
940
# We use None for redirected so that we fail if redirected
941
self.assertEquals('0123456789',
942
transport.do_catching_redirections(
943
self.get_a, t, None).read())
945
def test_one_redirection(self):
946
self.redirections = 0
948
def redirected(transport, exception, redirection_notice):
949
self.redirections += 1
950
dir, file = urlutils.split(exception.target)
951
return self._transport(dir)
953
self.assertEquals('0123456789',
954
transport.do_catching_redirections(
955
self.get_a, self.old_transport, redirected).read())
956
self.assertEquals(1, self.redirections)
958
def test_redirection_loop(self):
960
def redirected(transport, exception, redirection_notice):
961
# By using the redirected url as a base dir for the
962
# *old* transport, we create a loop: a => a/a =>
964
return self.old_transport.clone(exception.target)
966
self.assertRaises(errors.TooManyRedirections,
967
transport.do_catching_redirections,
968
self.get_a, self.old_transport, redirected)
971
class TestAuth(http_utils.TestCaseWithWebserver):
972
"""Test authentication scheme"""
974
_auth_header = 'Authorization'
975
_password_prompt_prefix = ''
978
super(TestAuth, self).setUp()
979
self.server = self.get_readonly_server()
980
self.build_tree_contents([('a', 'contents of a\n'),
981
('b', 'contents of b\n'),])
983
def create_transport_readonly_server(self):
984
if self._auth_scheme == 'basic':
985
server = http_utils.HTTPBasicAuthServer(
986
protocol_version=self._protocol_version)
988
if self._auth_scheme != 'digest':
989
raise AssertionError('Unknown auth scheme: %r'
991
server = http_utils.HTTPDigestAuthServer(
992
protocol_version=self._protocol_version)
995
def _testing_pycurl(self):
996
return pycurl_present and self._transport == PyCurlTransport
998
def get_user_url(self, user=None, password=None):
999
"""Build an url embedding user and password"""
1000
url = '%s://' % self.server._url_protocol
1001
if user is not None:
1003
if password is not None:
1004
url += ':' + password
1006
url += '%s:%s/' % (self.server.host, self.server.port)
1009
def get_user_transport(self, user=None, password=None):
1010
return self._transport(self.get_user_url(user, password))
1012
def test_no_user(self):
1013
self.server.add_user('joe', 'foo')
1014
t = self.get_user_transport()
1015
self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1016
# Only one 'Authentication Required' error should occur
1017
self.assertEqual(1, self.server.auth_required_errors)
1019
def test_empty_pass(self):
1020
self.server.add_user('joe', '')
1021
t = self.get_user_transport('joe', '')
1022
self.assertEqual('contents of a\n', t.get('a').read())
1023
# Only one 'Authentication Required' error should occur
1024
self.assertEqual(1, self.server.auth_required_errors)
1026
def test_user_pass(self):
1027
self.server.add_user('joe', 'foo')
1028
t = self.get_user_transport('joe', 'foo')
1029
self.assertEqual('contents of a\n', t.get('a').read())
1030
# Only one 'Authentication Required' error should occur
1031
self.assertEqual(1, self.server.auth_required_errors)
1033
def test_unknown_user(self):
1034
self.server.add_user('joe', 'foo')
1035
t = self.get_user_transport('bill', 'foo')
1036
self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1037
# Two 'Authentication Required' errors should occur (the
1038
# initial 'who are you' and 'I don't know you, who are
1040
self.assertEqual(2, self.server.auth_required_errors)
1042
def test_wrong_pass(self):
1043
self.server.add_user('joe', 'foo')
1044
t = self.get_user_transport('joe', 'bar')
1045
self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1046
# Two 'Authentication Required' errors should occur (the
1047
# initial 'who are you' and 'this is not you, who are you')
1048
self.assertEqual(2, self.server.auth_required_errors)
1050
def test_prompt_for_password(self):
1051
if self._testing_pycurl():
1052
raise tests.TestNotApplicable(
1053
'pycurl cannot prompt, it handles auth by embedding'
1054
' user:pass in urls only')
1056
self.server.add_user('joe', 'foo')
1057
t = self.get_user_transport('joe', None)
1058
stdout = tests.StringIOWrapper()
1059
ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1060
self.assertEqual('contents of a\n',t.get('a').read())
1061
# stdin should be empty
1062
self.assertEqual('', ui.ui_factory.stdin.readline())
1063
self._check_password_prompt(t._unqualified_scheme, 'joe',
1065
# And we shouldn't prompt again for a different request
1066
# against the same transport.
1067
self.assertEqual('contents of b\n',t.get('b').read())
1069
# And neither against a clone
1070
self.assertEqual('contents of b\n',t2.get('b').read())
1071
# Only one 'Authentication Required' error should occur
1072
self.assertEqual(1, self.server.auth_required_errors)
1074
def _check_password_prompt(self, scheme, user, actual_prompt):
1075
expected_prompt = (self._password_prompt_prefix
1076
+ ("%s %s@%s:%d, Realm: '%s' password: "
1078
user, self.server.host, self.server.port,
1079
self.server.auth_realm)))
1080
self.assertEquals(expected_prompt, actual_prompt)
1082
def test_no_prompt_for_password_when_using_auth_config(self):
1083
if self._testing_pycurl():
1084
raise tests.TestNotApplicable(
1085
'pycurl does not support authentication.conf'
1086
' since it cannot prompt')
1090
stdin_content = 'bar\n' # Not the right password
1091
self.server.add_user(user, password)
1092
t = self.get_user_transport(user, None)
1093
ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1094
stdout=tests.StringIOWrapper())
1095
# Create a minimal config file with the right password
1096
conf = config.AuthenticationConfig()
1097
conf._get_config().update(
1098
{'httptest': {'scheme': 'http', 'port': self.server.port,
1099
'user': user, 'password': password}})
1101
# Issue a request to the server to connect
1102
self.assertEqual('contents of a\n',t.get('a').read())
1103
# stdin should have been left untouched
1104
self.assertEqual(stdin_content, ui.ui_factory.stdin.readline())
1105
# Only one 'Authentication Required' error should occur
1106
self.assertEqual(1, self.server.auth_required_errors)
1110
class TestProxyAuth(TestAuth):
1111
"""Test proxy authentication schemes."""
1113
_auth_header = 'Proxy-authorization'
1114
_password_prompt_prefix='Proxy '
1117
super(TestProxyAuth, self).setUp()
1119
self.addCleanup(self._restore_env)
1120
# Override the contents to avoid false positives
1121
self.build_tree_contents([('a', 'not proxied contents of a\n'),
1122
('b', 'not proxied contents of b\n'),
1123
('a-proxied', 'contents of a\n'),
1124
('b-proxied', 'contents of b\n'),
1127
def create_transport_readonly_server(self):
1128
if self._auth_scheme == 'basic':
1129
server = http_utils.ProxyBasicAuthServer(
1130
protocol_version=self._protocol_version)
1132
if self._auth_scheme != 'digest':
1133
raise AssertionError('Unknown auth scheme: %r'
1134
% self._auth_scheme)
1135
server = http_utils.ProxyDigestAuthServer(
1136
protocol_version=self._protocol_version)
1139
def get_user_transport(self, user=None, password=None):
1140
self._install_env({'all_proxy': self.get_user_url(user, password)})
1141
return self._transport(self.server.get_url())
1143
def _install_env(self, env):
1144
for name, value in env.iteritems():
1145
self._old_env[name] = osutils.set_or_unset_env(name, value)
1147
def _restore_env(self):
1148
for name, value in self._old_env.iteritems():
1149
osutils.set_or_unset_env(name, value)
1151
def test_empty_pass(self):
1152
if self._testing_pycurl():
1154
if pycurl.version_info()[1] < '7.16.0':
1155
raise tests.KnownFailure(
1156
'pycurl < 7.16.0 does not handle empty proxy passwords')
1157
super(TestProxyAuth, self).test_empty_pass()