128
128
('urllib,http', dict(_activity_server=ActivityHTTPServer,
129
129
_transport=_urllib.HttpTransport_urllib,)),
131
if tests.HTTPSServerFeature.available():
131
if features.HTTPSServerFeature.available():
132
132
activity_scenarios.append(
133
133
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
134
134
_transport=_urllib.HttpTransport_urllib,)),)
136
136
activity_scenarios.append(
137
137
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
138
138
_transport=PyCurlTransport,)),)
139
if tests.HTTPSServerFeature.available():
139
if features.HTTPSServerFeature.available():
140
140
from bzrlib.tests import (
533
533
scenarios = vary_by_http_client_implementation()
535
535
def test_http_registered(self):
# Looks up a transport for '<protocol>://foo.com/', with the protocol taken
# from self._url_protocol, and asserts that the result is both a
# transport.Transport and an instance of self._transport.
# NOTE(review): this text looks like a flattened side-by-side diff: the bare
# numbers appear to be diff line numbers, and both the get_transport() and
# get_transport_from_url() forms of the lookup are present. Presumably only
# the get_transport_from_url() form is meant to remain -- confirm against
# the real file.
536
t = transport.get_transport('%s://foo.com/' % self._url_protocol)
536
t = transport.get_transport_from_url(
537
'%s://foo.com/' % self._url_protocol)
537
538
self.assertIsInstance(t, transport.Transport)
538
539
self.assertIsInstance(t, self._transport)
551
552
self.start_server(server)
552
553
url = server.get_url()
553
554
# FIXME: needs a cleanup -- vila 20100611
554
http_transport = transport.get_transport(url)
555
http_transport = transport.get_transport_from_url(url)
555
556
code, response = http_transport._post('abc def end-of-body')
557
558
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
1637
1638
def get_user_transport(self, user, password):
# Creates a transport for the URL produced by
# self.get_user_url(user, password) and binds it to `t`.
# NOTE(review): as shown, `t` is assigned but no return statement is visible;
# a trailing `return t` may have been lost when this text was extracted --
# confirm against the real file. Both the get_transport() and
# get_transport_from_url() forms of the call appear here (old/new diff sides).
1638
t = transport.get_transport(self.get_user_url(user, password))
1639
t = transport.get_transport_from_url(
1640
self.get_user_url(user, password))
1641
1643
def test_no_user(self):
1767
1769
http_utils.ProxyDigestAuthServer):
1768
1770
raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1769
1771
if self._testing_pycurl():
1770
raise tests.KnownFailure(
1771
1773
'pycurl does not handle a nonce change')
1772
1774
self.server.add_user('joe', 'foo')
1773
1775
t = self.get_user_transport('joe', 'foo')
1855
1857
if self._testing_pycurl():
1857
1859
if pycurl.version_info()[1] < '7.16.0':
1858
raise tests.KnownFailure(
1859
1861
'pycurl < 7.16.0 does not handle empty proxy passwords')
1860
1862
super(TestProxyAuth, self).test_empty_pass()
1915
1917
# The 'readv' command in the smart protocol both sends and receives
1916
1918
# bulk data, so we use that.
1917
1919
self.build_tree(['data-file'])
1918
http_transport = transport.get_transport(self.http_server.get_url())
1920
http_transport = transport.get_transport_from_url(
1921
self.http_server.get_url())
1919
1922
medium = http_transport.get_smart_medium()
1920
1923
# Since we provide the medium, the url below will be mostly ignored
1921
1924
# during the test, as long as the path is '/'.
1929
1932
post_body = 'hello\n'
1930
1933
expected_reply_body = 'ok\x012\n'
1932
http_transport = transport.get_transport(self.http_server.get_url())
1935
http_transport = transport.get_transport_from_url(
1936
self.http_server.get_url())
1933
1937
medium = http_transport.get_smart_medium()
1934
1938
response = medium.send_http_smart_request(post_body)
1935
1939
reply_body = response.read()
1993
1997
self.assertIsInstance(r, type(t))
1994
1998
# Both transports share the same connection
1995
1999
self.assertEqual(t._get_connection(), r._get_connection())
2000
self.assertEquals('http://www.example.com/foo/subdir/', r.base)
1997
2002
def test_redirected_to_self_with_slash(self):
1998
2003
t = self._transport('http://www.example.com/foo')
2009
2014
r = t._redirected_to('http://www.example.com/foo',
2010
2015
'http://foo.example.com/foo/subdir')
2011
2016
self.assertIsInstance(r, type(t))
2017
self.assertEquals('http://foo.example.com/foo/subdir/',
2013
2020
def test_redirected_to_same_host_sibling_protocol(self):
# Redirects a transport for http://www.example.com/foo to the same host and
# path over https, and asserts that the transport returned by
# _redirected_to() is an instance of the original transport's type.
# NOTE(review): the final assertEquals() call is cut off in this text -- its
# second argument and closing parenthesis are not visible, so what is compared
# against 'https://www.example.com/foo/' cannot be confirmed from here.
2014
2021
t = self._transport('http://www.example.com/foo')
2015
2022
r = t._redirected_to('http://www.example.com/foo',
2016
2023
'https://www.example.com/foo')
2017
2024
self.assertIsInstance(r, type(t))
2025
self.assertEquals('https://www.example.com/foo/',
2019
2028
def test_redirected_to_same_host_different_protocol(self):
# Redirects a transport for http://www.example.com/foo to the same host and
# path under the ftp scheme. Asserts that the transport returned by
# _redirected_to() has a different type from the original, and that its
# external_url() is 'ftp://www.example.com/foo/' (with a trailing slash).
2020
2029
t = self._transport('http://www.example.com/foo')
2021
2030
r = t._redirected_to('http://www.example.com/foo',
2022
2031
'ftp://www.example.com/foo')
2023
2032
self.assertNotEquals(type(r), type(t))
2033
self.assertEquals('ftp://www.example.com/foo/', r.external_url())
2035
def test_redirected_to_same_host_specific_implementation(self):
# Redirects a transport for http://www.example.com/foo to a target whose
# scheme names a specific implementation ('https+urllib'). Asserts that the
# resulting transport's external_url() is the plain
# 'https://www.example.com/foo/', i.e. without the '+urllib' qualifier.
2036
t = self._transport('http://www.example.com/foo')
2037
r = t._redirected_to('http://www.example.com/foo',
2038
'https+urllib://www.example.com/foo')
2039
self.assertEquals('https://www.example.com/foo/', r.external_url())
2025
2041
def test_redirected_to_different_host_same_user(self):
# Starts from a transport whose URL carries the user 'joe', redirects it to
# https://foo.example.com/foo, and asserts that the returned transport is an
# instance of the original's type, has the same user as the original, and
# reports 'https://joe@foo.example.com/foo/' from external_url().
# NOTE(review): the user comparison appears twice, once via `_user` and once
# via `_parsed_url.user`; these look like the old and new sides of a diff, so
# presumably only the `_parsed_url.user` form is meant to remain -- confirm.
2026
2042
t = self._transport('http://joe@www.example.com/foo')
2027
2043
r = t._redirected_to('http://www.example.com/foo',
2028
2044
'https://foo.example.com/foo')
2029
2045
self.assertIsInstance(r, type(t))
2030
self.assertEqual(t._user, r._user)
2046
self.assertEqual(t._parsed_url.user, r._parsed_url.user)
2047
self.assertEquals('https://joe@foo.example.com/foo/', r.external_url())
2033
2050
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
2089
if tests.HTTPSServerFeature.available():
2106
if features.HTTPSServerFeature.available():
2090
2107
from bzrlib.tests import https_server
2091
2108
class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):