1
# Copyright (C) 2005-2012, 2015, 2016 Canonical Ltd
1
# Copyright (C) 2005-2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
288
288
self.req_handler = RequestHandler(None, None, None)
290
290
def assertRanges(self, ranges, header, file_size):
291
self.assertEqual(ranges,
291
self.assertEquals(ranges,
292
292
self.req_handler._parse_ranges(header, file_size))
294
294
def test_simple_range(self):
384
384
_transport = property(_get_pycurl_maybe)
387
class TestHttpUrls(tests.TestCase):
389
# TODO: This should be moved to authorization tests once they
392
def test_url_parsing(self):
394
url = http.extract_auth('http://example.com', f)
395
self.assertEqual('http://example.com', url)
396
self.assertEqual(0, len(f.credentials))
397
url = http.extract_auth(
398
'http://user:pass@example.com/bzr/bzr.dev', f)
399
self.assertEqual('http://example.com/bzr/bzr.dev', url)
400
self.assertEqual(1, len(f.credentials))
401
self.assertEqual([None, 'example.com', 'user', 'pass'],
387
405
class TestHttpTransportUrls(tests.TestCase):
388
406
"""Test the http urls."""
472
super(TestHTTPConnections, self).setUp()
490
http_utils.TestCaseWithWebserver.setUp(self)
473
491
self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
474
492
transport=self.get_transport())
657
675
_req_handler_class = BadStatusRequestHandler
660
super(TestBadStatusServer, self).setUp()
661
# See https://bugs.launchpad.net/bzr/+bug/1451448 for details.
662
# TD;LR: Running both a TCP client and server in the same process and
663
# thread uncovers a race in python. The fix is to run the server in a
664
# different process. Trying to fix yet another race here is not worth
665
# the effort. -- vila 2015-09-06
666
if 'HTTP/1.0' in self.id():
667
raise tests.TestSkipped(
668
'Client/Server in the same process and thread can hang')
670
677
def test_http_has(self):
671
678
t = self.get_readonly_transport()
672
self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
673
errors.InvalidHttpResponse),
679
self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
676
681
def test_http_get(self):
677
682
t = self.get_readonly_transport()
678
self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
679
errors.InvalidHttpResponse),
683
self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
683
686
class InvalidStatusRequestHandler(http_server.TestingHTTPRequestHandler):
1160
1163
protocol_version=self._protocol_version)
1162
1165
def setUp(self):
1163
super(TestLimitedRangeRequestServer, self).setUp()
1166
http_utils.TestCaseWithWebserver.setUp(self)
1164
1167
# We need to manipulate ranges that correspond to real chunks in the
1165
1168
# response, so we build a content appropriately.
1166
1169
filler = ''.join(['abcdefghij' for x in range(102)])
1201
1204
def assertEvaluateProxyBypass(self, expected, host, no_proxy):
1202
1205
handler = _urllib2_wrappers.ProxyHandler()
1203
self.assertEqual(expected,
1206
self.assertEquals(expected,
1204
1207
handler.evaluate_proxy_bypass(host, no_proxy))
1206
1209
def test_empty_user(self):
1354
1357
def setUp(self):
1355
super(TestRanges, self).setUp()
1358
http_utils.TestCaseWithWebserver.setUp(self)
1356
1359
self.build_tree_contents([('a', '0123456789')],)
1358
1361
def create_transport_readonly_server(self):
1602
self.assertEqual((user, password), got_pass)
1605
self.assertEquals((user, password), got_pass)
1605
1608
class TestAuth(http_utils.TestCaseWithWebserver):
1908
1911
server._url_protocol = self._url_protocol
1911
def test_open_controldir(self):
1914
def test_open_bzrdir(self):
1912
1915
branch = self.make_branch('relpath')
1913
1916
url = self.http_server.get_url() + 'relpath'
1914
bd = controldir.ControlDir.open(url)
1917
bd = bzrdir.BzrDir.open(url)
1915
1918
self.addCleanup(bd.transport.disconnect)
1916
1919
self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
2000
2003
self.assertIsInstance(r, type(t))
2001
2004
# Both transports share the some connection
2002
2005
self.assertEqual(t._get_connection(), r._get_connection())
2003
self.assertEqual('http://www.example.com/foo/subdir/', r.base)
2006
self.assertEquals('http://www.example.com/foo/subdir/', r.base)
2005
2008
def test_redirected_to_self_with_slash(self):
2006
2009
t = self._transport('http://www.example.com/foo')
2017
2020
r = t._redirected_to('http://www.example.com/foo',
2018
2021
'http://foo.example.com/foo/subdir')
2019
2022
self.assertIsInstance(r, type(t))
2020
self.assertEqual('http://foo.example.com/foo/subdir/',
2023
self.assertEquals('http://foo.example.com/foo/subdir/',
2021
2024
r.external_url())
2023
2026
def test_redirected_to_same_host_sibling_protocol(self):
2025
2028
r = t._redirected_to('http://www.example.com/foo',
2026
2029
'https://www.example.com/foo')
2027
2030
self.assertIsInstance(r, type(t))
2028
self.assertEqual('https://www.example.com/foo/',
2031
self.assertEquals('https://www.example.com/foo/',
2029
2032
r.external_url())
2031
2034
def test_redirected_to_same_host_different_protocol(self):
2032
2035
t = self._transport('http://www.example.com/foo')
2033
2036
r = t._redirected_to('http://www.example.com/foo',
2034
2037
'ftp://www.example.com/foo')
2035
self.assertNotEqual(type(r), type(t))
2036
self.assertEqual('ftp://www.example.com/foo/', r.external_url())
2038
self.assertNotEquals(type(r), type(t))
2039
self.assertEquals('ftp://www.example.com/foo/', r.external_url())
2038
2041
def test_redirected_to_same_host_specific_implementation(self):
2039
2042
t = self._transport('http://www.example.com/foo')
2040
2043
r = t._redirected_to('http://www.example.com/foo',
2041
2044
'https+urllib://www.example.com/foo')
2042
self.assertEqual('https://www.example.com/foo/', r.external_url())
2045
self.assertEquals('https://www.example.com/foo/', r.external_url())
2044
2047
def test_redirected_to_different_host_same_user(self):
2045
2048
t = self._transport('http://joe@www.example.com/foo')
2047
2050
'https://foo.example.com/foo')
2048
2051
self.assertIsInstance(r, type(t))
2049
2052
self.assertEqual(t._parsed_url.user, r._parsed_url.user)
2050
self.assertEqual('https://joe@foo.example.com/foo/', r.external_url())
2053
self.assertEquals('https://joe@foo.example.com/foo/', r.external_url())
2053
2056
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
2277
2280
_protocol_version = 'HTTP/1.1'
2279
2282
def setUp(self):
2280
super(TestNoReportActivity, self).setUp()
2281
2283
self._transport =_urllib.HttpTransport_urllib
2282
2284
TestActivityMixin.setUp(self)