~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

(gz) Remove bzrlib/util/elementtree/ package (Martin Packman)

Show diffs side-by-side

added added

removed removed

Lines of Context:
34
34
    bzrdir,
35
35
    cethread,
36
36
    config,
 
37
    debug,
37
38
    errors,
38
39
    osutils,
39
40
    remote as _mod_remote,
40
41
    tests,
 
42
    trace,
41
43
    transport,
42
44
    ui,
43
45
    )
91
93
        ]
92
94
 
93
95
 
94
 
def vary_by_http_proxy_auth_scheme():
95
 
    return [
96
 
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
97
 
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
98
 
        ('basicdigest',
99
 
            dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
100
 
        ]
101
 
 
102
 
 
103
96
def vary_by_http_auth_scheme():
104
 
    return [
 
97
    scenarios = [
105
98
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
106
99
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
107
100
        ('basicdigest',
108
101
            dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
109
102
        ]
 
103
    # Add some attributes common to all scenarios
 
104
    for scenario_id, scenario_dict in scenarios:
 
105
        scenario_dict.update(_auth_header='Authorization',
 
106
                             _username_prompt_prefix='',
 
107
                             _password_prompt_prefix='')
 
108
    return scenarios
 
109
 
 
110
 
 
111
def vary_by_http_proxy_auth_scheme():
 
112
    scenarios = [
 
113
        ('proxy-basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
 
114
        ('proxy-digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
 
115
        ('proxy-basicdigest',
 
116
            dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
 
117
        ]
 
118
    # Add some attributes common to all scenarios
 
119
    for scenario_id, scenario_dict in scenarios:
 
120
        scenario_dict.update(_auth_header='Proxy-Authorization',
 
121
                             _username_prompt_prefix='Proxy ',
 
122
                             _password_prompt_prefix='Proxy ')
 
123
    return scenarios
110
124
 
111
125
 
112
126
def vary_by_http_activity():
114
128
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
115
129
                            _transport=_urllib.HttpTransport_urllib,)),
116
130
        ]
117
 
    if tests.HTTPSServerFeature.available():
 
131
    if features.HTTPSServerFeature.available():
118
132
        activity_scenarios.append(
119
133
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
120
134
                                _transport=_urllib.HttpTransport_urllib,)),)
122
136
        activity_scenarios.append(
123
137
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
124
138
                                _transport=PyCurlTransport,)),)
125
 
        if tests.HTTPSServerFeature.available():
 
139
        if features.HTTPSServerFeature.available():
126
140
            from bzrlib.tests import (
127
141
                ssl_certs,
128
142
                )
519
533
    scenarios = vary_by_http_client_implementation()
520
534
 
521
535
    def test_http_registered(self):
522
 
        t = transport.get_transport('%s://foo.com/' % self._url_protocol)
 
536
        t = transport.get_transport_from_url(
 
537
            '%s://foo.com/' % self._url_protocol)
523
538
        self.assertIsInstance(t, transport.Transport)
524
539
        self.assertIsInstance(t, self._transport)
525
540
 
537
552
        self.start_server(server)
538
553
        url = server.get_url()
539
554
        # FIXME: needs a cleanup -- vila 20100611
540
 
        http_transport = transport.get_transport(url)
 
555
        http_transport = transport.get_transport_from_url(url)
541
556
        code, response = http_transport._post('abc def end-of-body')
542
557
        self.assertTrue(
543
558
            server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
1034
1049
        self.assertEqual('single', t._range_hint)
1035
1050
 
1036
1051
 
 
1052
class TruncatedBeforeBoundaryRequestHandler(
 
1053
    http_server.TestingHTTPRequestHandler):
 
1054
    """Truncation before a boundary, like in bug 198646"""
 
1055
 
 
1056
    _truncated_ranges = 1
 
1057
 
 
1058
    def get_multiple_ranges(self, file, file_size, ranges):
 
1059
        self.send_response(206)
 
1060
        self.send_header('Accept-Ranges', 'bytes')
 
1061
        boundary = 'tagada'
 
1062
        self.send_header('Content-Type',
 
1063
                         'multipart/byteranges; boundary=%s' % boundary)
 
1064
        boundary_line = '--%s\r\n' % boundary
 
1065
        # Calculate the Content-Length
 
1066
        content_length = 0
 
1067
        for (start, end) in ranges:
 
1068
            content_length += len(boundary_line)
 
1069
            content_length += self._header_line_length(
 
1070
                'Content-type', 'application/octet-stream')
 
1071
            content_length += self._header_line_length(
 
1072
                'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
 
1073
            content_length += len('\r\n') # end headers
 
1074
            content_length += end - start # + 1
 
1075
        content_length += len(boundary_line)
 
1076
        self.send_header('Content-length', content_length)
 
1077
        self.end_headers()
 
1078
 
 
1079
        # Send the multipart body
 
1080
        cur = 0
 
1081
        for (start, end) in ranges:
 
1082
            if cur + self._truncated_ranges >= len(ranges):
 
1083
                # Abruptly ends the response and close the connection
 
1084
                self.close_connection = 1
 
1085
                return
 
1086
            self.wfile.write(boundary_line)
 
1087
            self.send_header('Content-type', 'application/octet-stream')
 
1088
            self.send_header('Content-Range', 'bytes %d-%d/%d'
 
1089
                             % (start, end, file_size))
 
1090
            self.end_headers()
 
1091
            self.send_range_content(file, start, end - start + 1)
 
1092
            cur += 1
 
1093
        # Final boundary
 
1094
        self.wfile.write(boundary_line)
 
1095
 
 
1096
 
 
1097
class TestTruncatedBeforeBoundary(TestSpecificRequestHandler):
 
1098
    """Tests the case of bug 198646, disconnecting before a boundary."""
 
1099
 
 
1100
    _req_handler_class = TruncatedBeforeBoundaryRequestHandler
 
1101
 
 
1102
    def setUp(self):
 
1103
        super(TestTruncatedBeforeBoundary, self).setUp()
 
1104
        self.build_tree_contents([('a', '0123456789')],)
 
1105
 
 
1106
    def test_readv_with_short_reads(self):
 
1107
        server = self.get_readonly_server()
 
1108
        t = self.get_readonly_transport()
 
1109
        # Force separate ranges for each offset
 
1110
        t._bytes_to_read_before_seek = 0
 
1111
        ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
 
1112
        self.assertEqual((0, '0'), ireadv.next())
 
1113
        self.assertEqual((2, '2'), ireadv.next())
 
1114
        self.assertEqual((4, '45'), ireadv.next())
 
1115
        self.assertEqual((9, '9'), ireadv.next())
 
1116
 
 
1117
 
1037
1118
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
1038
1119
    """Errors out when range specifiers exceed the limit"""
1039
1120
 
1489
1570
                          self.get_a, self.old_transport, redirected)
1490
1571
 
1491
1572
 
 
1573
def _setup_authentication_config(**kwargs):
 
1574
    conf = config.AuthenticationConfig()
 
1575
    conf._get_config().update({'httptest': kwargs})
 
1576
    conf._save()
 
1577
 
 
1578
 
 
1579
class TestUrllib2AuthHandler(tests.TestCaseWithTransport):
 
1580
    """Unit tests for glue by which urllib2 asks us for authentication"""
 
1581
 
 
1582
    def test_get_user_password_without_port(self):
 
1583
        """We cope if urllib2 doesn't tell us the port.
 
1584
 
 
1585
        See https://bugs.launchpad.net/bzr/+bug/654684
 
1586
        """
 
1587
        user = 'joe'
 
1588
        password = 'foo'
 
1589
        _setup_authentication_config(scheme='http', host='localhost',
 
1590
                                     user=user, password=password)
 
1591
        handler = _urllib2_wrappers.HTTPAuthHandler()
 
1592
        got_pass = handler.get_user_password(dict(
 
1593
            user='joe',
 
1594
            protocol='http',
 
1595
            host='localhost',
 
1596
            path='/',
 
1597
            realm='Realm',
 
1598
            ))
 
1599
        self.assertEquals((user, password), got_pass)
 
1600
 
 
1601
 
1492
1602
class TestAuth(http_utils.TestCaseWithWebserver):
1493
1603
    """Test authentication scheme"""
1494
1604
 
1498
1608
        vary_by_http_auth_scheme(),
1499
1609
        )
1500
1610
 
1501
 
    _auth_header = 'Authorization'
1502
 
    _password_prompt_prefix = ''
1503
 
    _username_prompt_prefix = ''
1504
 
    # Set by load_tests
1505
 
    _auth_server = None
1506
 
 
1507
1611
    def setUp(self):
1508
1612
        super(TestAuth, self).setUp()
1509
1613
        self.server = self.get_readonly_server()
1532
1636
        return url
1533
1637
 
1534
1638
    def get_user_transport(self, user, password):
1535
 
        t = transport.get_transport(self.get_user_url(user, password))
 
1639
        t = transport.get_transport_from_url(
 
1640
            self.get_user_url(user, password))
1536
1641
        return t
1537
1642
 
1538
1643
    def test_no_user(self):
1650
1755
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1651
1756
                                            stderr=tests.StringIOWrapper())
1652
1757
        # Create a minimal config file with the right password
1653
 
        _setup_authentication_config(
1654
 
            scheme='http', 
1655
 
            port=self.server.port,
1656
 
            user=user,
1657
 
            password=password)
 
1758
        _setup_authentication_config(scheme='http', port=self.server.port,
 
1759
                                     user=user, password=password)
1658
1760
        # Issue a request to the server to connect
1659
1761
        self.assertEqual('contents of a\n',t.get('a').read())
1660
1762
        # stdin should have  been left untouched
1667
1769
                                     http_utils.ProxyDigestAuthServer):
1668
1770
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1669
1771
        if self._testing_pycurl():
1670
 
            raise tests.KnownFailure(
 
1772
            self.knownFailure(
1671
1773
                'pycurl does not handle a nonce change')
1672
1774
        self.server.add_user('joe', 'foo')
1673
1775
        t = self.get_user_transport('joe', 'foo')
1690
1792
        user = 'joe'
1691
1793
        password = 'foo'
1692
1794
        self.server.add_user(user, password)
1693
 
        _setup_authentication_config(
1694
 
            scheme='http', 
1695
 
            port=self.server.port,
1696
 
            user=user,
1697
 
            password=password)
 
1795
        _setup_authentication_config(scheme='http', port=self.server.port,
 
1796
                                     user=user, password=password)
1698
1797
        t = self.get_user_transport(None, None)
1699
1798
        # Issue a request to the server to connect
1700
1799
        self.assertEqual('contents of a\n', t.get('a').read())
1701
1800
        # Only one 'Authentication Required' error should occur
1702
1801
        self.assertEqual(1, self.server.auth_required_errors)
1703
1802
 
1704
 
 
1705
 
def _setup_authentication_config(**kwargs):
1706
 
    conf = config.AuthenticationConfig()
1707
 
    conf._get_config().update({'httptest': kwargs})
1708
 
    conf._save()
1709
 
 
1710
 
 
1711
 
 
1712
 
class TestUrllib2AuthHandler(tests.TestCaseWithTransport):
1713
 
    """Unit tests for glue by which urllib2 asks us for authentication"""
1714
 
 
1715
 
    def test_get_user_password_without_port(self):
1716
 
        """We cope if urllib2 doesn't tell us the port.
1717
 
 
1718
 
        See https://bugs.launchpad.net/bzr/+bug/654684
1719
 
        """
 
1803
    def test_no_credential_leaks_in_log(self):
 
1804
        self.overrideAttr(debug, 'debug_flags', set(['http']))
1720
1805
        user = 'joe'
1721
 
        password = 'foo'
1722
 
        _setup_authentication_config(
1723
 
            scheme='http', 
1724
 
            host='localhost',
1725
 
            user=user,
1726
 
            password=password)
1727
 
        handler = _urllib2_wrappers.HTTPAuthHandler()
1728
 
        got_pass = handler.get_user_password(dict(
1729
 
            user='joe',
1730
 
            protocol='http',
1731
 
            host='localhost',
1732
 
            path='/',
1733
 
            realm='Realm',
1734
 
            ))
1735
 
        self.assertEquals((user, password), got_pass)
 
1806
        password = 'very-sensitive-password'
 
1807
        self.server.add_user(user, password)
 
1808
        t = self.get_user_transport(user, password)
 
1809
        # Capture the debug calls to mutter
 
1810
        self.mutters = []
 
1811
        def mutter(*args):
 
1812
            lines = args[0] % args[1:]
 
1813
            # Some calls output multiple lines, just split them now since we
 
1814
            # care about a single one later.
 
1815
            self.mutters.extend(lines.splitlines())
 
1816
        self.overrideAttr(trace, 'mutter', mutter)
 
1817
        # Issue a request to the server to connect
 
1818
        self.assertEqual(True, t.has('a'))
 
1819
        # Only one 'Authentication Required' error should occur
 
1820
        self.assertEqual(1, self.server.auth_required_errors)
 
1821
        # Since the authentification succeeded, there should be a corresponding
 
1822
        # debug line
 
1823
        sent_auth_headers = [line for line in self.mutters
 
1824
                             if line.startswith('> %s' % (self._auth_header,))]
 
1825
        self.assertLength(1, sent_auth_headers)
 
1826
        self.assertStartsWith(sent_auth_headers[0],
 
1827
                              '> %s: <masked>' % (self._auth_header,))
1736
1828
 
1737
1829
 
1738
1830
class TestProxyAuth(TestAuth):
1739
 
    """Test proxy authentication schemes."""
 
1831
    """Test proxy authentication schemes.
 
1832
 
 
1833
    This inherits from TestAuth to tweak the setUp and filter some failing
 
1834
    tests.
 
1835
    """
1740
1836
 
1741
1837
    scenarios = multiply_scenarios(
1742
1838
        vary_by_http_client_implementation(),
1744
1840
        vary_by_http_proxy_auth_scheme(),
1745
1841
        )
1746
1842
 
1747
 
    _auth_header = 'Proxy-authorization'
1748
 
    _password_prompt_prefix = 'Proxy '
1749
 
    _username_prompt_prefix = 'Proxy '
1750
 
 
1751
1843
    def setUp(self):
1752
1844
        super(TestProxyAuth, self).setUp()
1753
1845
        # Override the contents to avoid false positives
1765
1857
        if self._testing_pycurl():
1766
1858
            import pycurl
1767
1859
            if pycurl.version_info()[1] < '7.16.0':
1768
 
                raise tests.KnownFailure(
 
1860
                self.knownFailure(
1769
1861
                    'pycurl < 7.16.0 does not handle empty proxy passwords')
1770
1862
        super(TestProxyAuth, self).test_empty_pass()
1771
1863
 
1825
1917
        # The 'readv' command in the smart protocol both sends and receives
1826
1918
        # bulk data, so we use that.
1827
1919
        self.build_tree(['data-file'])
1828
 
        http_transport = transport.get_transport(self.http_server.get_url())
 
1920
        http_transport = transport.get_transport_from_url(
 
1921
            self.http_server.get_url())
1829
1922
        medium = http_transport.get_smart_medium()
1830
1923
        # Since we provide the medium, the url below will be mostly ignored
1831
1924
        # during the test, as long as the path is '/'.
1839
1932
        post_body = 'hello\n'
1840
1933
        expected_reply_body = 'ok\x012\n'
1841
1934
 
1842
 
        http_transport = transport.get_transport(self.http_server.get_url())
 
1935
        http_transport = transport.get_transport_from_url(
 
1936
            self.http_server.get_url())
1843
1937
        medium = http_transport.get_smart_medium()
1844
1938
        response = medium.send_http_smart_request(post_body)
1845
1939
        reply_body = response.read()
1903
1997
        self.assertIsInstance(r, type(t))
1904
1998
        # Both transports share the some connection
1905
1999
        self.assertEqual(t._get_connection(), r._get_connection())
 
2000
        self.assertEquals('http://www.example.com/foo/subdir/', r.base)
1906
2001
 
1907
2002
    def test_redirected_to_self_with_slash(self):
1908
2003
        t = self._transport('http://www.example.com/foo')
1919
2014
        r = t._redirected_to('http://www.example.com/foo',
1920
2015
                             'http://foo.example.com/foo/subdir')
1921
2016
        self.assertIsInstance(r, type(t))
 
2017
        self.assertEquals('http://foo.example.com/foo/subdir/',
 
2018
            r.external_url())
1922
2019
 
1923
2020
    def test_redirected_to_same_host_sibling_protocol(self):
1924
2021
        t = self._transport('http://www.example.com/foo')
1925
2022
        r = t._redirected_to('http://www.example.com/foo',
1926
2023
                             'https://www.example.com/foo')
1927
2024
        self.assertIsInstance(r, type(t))
 
2025
        self.assertEquals('https://www.example.com/foo/',
 
2026
            r.external_url())
1928
2027
 
1929
2028
    def test_redirected_to_same_host_different_protocol(self):
1930
2029
        t = self._transport('http://www.example.com/foo')
1931
2030
        r = t._redirected_to('http://www.example.com/foo',
1932
2031
                             'ftp://www.example.com/foo')
1933
2032
        self.assertNotEquals(type(r), type(t))
 
2033
        self.assertEquals('ftp://www.example.com/foo/', r.external_url())
 
2034
 
 
2035
    def test_redirected_to_same_host_specific_implementation(self):
 
2036
        t = self._transport('http://www.example.com/foo')
 
2037
        r = t._redirected_to('http://www.example.com/foo',
 
2038
                             'https+urllib://www.example.com/foo')
 
2039
        self.assertEquals('https://www.example.com/foo/', r.external_url())
1934
2040
 
1935
2041
    def test_redirected_to_different_host_same_user(self):
1936
2042
        t = self._transport('http://joe@www.example.com/foo')
1937
2043
        r = t._redirected_to('http://www.example.com/foo',
1938
2044
                             'https://foo.example.com/foo')
1939
2045
        self.assertIsInstance(r, type(t))
1940
 
        self.assertEqual(t._user, r._user)
 
2046
        self.assertEqual(t._parsed_url.user, r._parsed_url.user)
 
2047
        self.assertEquals('https://joe@foo.example.com/foo/', r.external_url())
1941
2048
 
1942
2049
 
1943
2050
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
1996
2103
    pass
1997
2104
 
1998
2105
 
1999
 
if tests.HTTPSServerFeature.available():
 
2106
if features.HTTPSServerFeature.available():
2000
2107
    from bzrlib.tests import https_server
2001
2108
    class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
2002
2109
        pass
2249
2356
        # stdout should be empty, stderr will contains the prompts
2250
2357
        self.assertEqual('', stdout.getvalue())
2251
2358
 
2252