~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Martin
  • Date: 2011-05-21 16:29:38 UTC
  • mto: This revision was merged to the branch mainline in revision 5907.
  • Revision ID: gzlist@googlemail.com-20110521162938-1vrw3hp0197l3vrl
Add tests for non-ascii conflict serialisation

Show diffs side-by-side

added added

removed removed

Lines of Context:
34
34
    bzrdir,
35
35
    cethread,
36
36
    config,
37
 
    debug,
38
37
    errors,
39
38
    osutils,
40
39
    remote as _mod_remote,
41
40
    tests,
42
 
    trace,
43
41
    transport,
44
42
    ui,
45
43
    )
93
91
        ]
94
92
 
95
93
 
 
94
def vary_by_http_proxy_auth_scheme():
 
95
    return [
 
96
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
 
97
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
 
98
        ('basicdigest',
 
99
            dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
 
100
        ]
 
101
 
 
102
 
96
103
def vary_by_http_auth_scheme():
97
 
    scenarios = [
 
104
    return [
98
105
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
99
106
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
100
107
        ('basicdigest',
101
108
            dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
102
109
        ]
103
 
    # Add some attributes common to all scenarios
104
 
    for scenario_id, scenario_dict in scenarios:
105
 
        scenario_dict.update(_auth_header='Authorization',
106
 
                             _username_prompt_prefix='',
107
 
                             _password_prompt_prefix='')
108
 
    return scenarios
109
 
 
110
 
 
111
 
def vary_by_http_proxy_auth_scheme():
112
 
    scenarios = [
113
 
        ('proxy-basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
114
 
        ('proxy-digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
115
 
        ('proxy-basicdigest',
116
 
            dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
117
 
        ]
118
 
    # Add some attributes common to all scenarios
119
 
    for scenario_id, scenario_dict in scenarios:
120
 
        scenario_dict.update(_auth_header='Proxy-Authorization',
121
 
                             _username_prompt_prefix='Proxy ',
122
 
                             _password_prompt_prefix='Proxy ')
123
 
    return scenarios
124
110
 
125
111
 
126
112
def vary_by_http_activity():
128
114
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
129
115
                            _transport=_urllib.HttpTransport_urllib,)),
130
116
        ]
131
 
    if features.HTTPSServerFeature.available():
 
117
    if tests.HTTPSServerFeature.available():
132
118
        activity_scenarios.append(
133
119
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
134
120
                                _transport=_urllib.HttpTransport_urllib,)),)
136
122
        activity_scenarios.append(
137
123
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
138
124
                                _transport=PyCurlTransport,)),)
139
 
        if features.HTTPSServerFeature.available():
 
125
        if tests.HTTPSServerFeature.available():
140
126
            from bzrlib.tests import (
141
127
                ssl_certs,
142
128
                )
1048
1034
        self.assertEqual('single', t._range_hint)
1049
1035
 
1050
1036
 
1051
 
class TruncatedBeforeBoundaryRequestHandler(
1052
 
    http_server.TestingHTTPRequestHandler):
1053
 
    """Truncation before a boundary, like in bug 198646"""
1054
 
 
1055
 
    _truncated_ranges = 1
1056
 
 
1057
 
    def get_multiple_ranges(self, file, file_size, ranges):
1058
 
        self.send_response(206)
1059
 
        self.send_header('Accept-Ranges', 'bytes')
1060
 
        boundary = 'tagada'
1061
 
        self.send_header('Content-Type',
1062
 
                         'multipart/byteranges; boundary=%s' % boundary)
1063
 
        boundary_line = '--%s\r\n' % boundary
1064
 
        # Calculate the Content-Length
1065
 
        content_length = 0
1066
 
        for (start, end) in ranges:
1067
 
            content_length += len(boundary_line)
1068
 
            content_length += self._header_line_length(
1069
 
                'Content-type', 'application/octet-stream')
1070
 
            content_length += self._header_line_length(
1071
 
                'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
1072
 
            content_length += len('\r\n') # end headers
1073
 
            content_length += end - start # + 1
1074
 
        content_length += len(boundary_line)
1075
 
        self.send_header('Content-length', content_length)
1076
 
        self.end_headers()
1077
 
 
1078
 
        # Send the multipart body
1079
 
        cur = 0
1080
 
        for (start, end) in ranges:
1081
 
            if cur + self._truncated_ranges >= len(ranges):
1082
 
                # Abruptly end the response and close the connection
1083
 
                self.close_connection = 1
1084
 
                return
1085
 
            self.wfile.write(boundary_line)
1086
 
            self.send_header('Content-type', 'application/octet-stream')
1087
 
            self.send_header('Content-Range', 'bytes %d-%d/%d'
1088
 
                             % (start, end, file_size))
1089
 
            self.end_headers()
1090
 
            self.send_range_content(file, start, end - start + 1)
1091
 
            cur += 1
1092
 
        # Final boundary
1093
 
        self.wfile.write(boundary_line)
1094
 
 
1095
 
 
1096
 
class TestTruncatedBeforeBoundary(TestSpecificRequestHandler):
1097
 
    """Tests the case of bug 198646, disconnecting before a boundary."""
1098
 
 
1099
 
    _req_handler_class = TruncatedBeforeBoundaryRequestHandler
1100
 
 
1101
 
    def setUp(self):
1102
 
        super(TestTruncatedBeforeBoundary, self).setUp()
1103
 
        self.build_tree_contents([('a', '0123456789')],)
1104
 
 
1105
 
    def test_readv_with_short_reads(self):
1106
 
        server = self.get_readonly_server()
1107
 
        t = self.get_readonly_transport()
1108
 
        # Force separate ranges for each offset
1109
 
        t._bytes_to_read_before_seek = 0
1110
 
        ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
1111
 
        self.assertEqual((0, '0'), ireadv.next())
1112
 
        self.assertEqual((2, '2'), ireadv.next())
1113
 
        self.assertEqual((4, '45'), ireadv.next())
1114
 
        self.assertEqual((9, '9'), ireadv.next())
1115
 
 
1116
 
 
1117
1037
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
1118
1038
    """Errors out when range specifiers exceed the limit"""
1119
1039
 
1569
1489
                          self.get_a, self.old_transport, redirected)
1570
1490
 
1571
1491
 
1572
 
def _setup_authentication_config(**kwargs):
1573
 
    conf = config.AuthenticationConfig()
1574
 
    conf._get_config().update({'httptest': kwargs})
1575
 
    conf._save()
1576
 
 
1577
 
 
1578
 
class TestUrllib2AuthHandler(tests.TestCaseWithTransport):
1579
 
    """Unit tests for glue by which urllib2 asks us for authentication"""
1580
 
 
1581
 
    def test_get_user_password_without_port(self):
1582
 
        """We cope if urllib2 doesn't tell us the port.
1583
 
 
1584
 
        See https://bugs.launchpad.net/bzr/+bug/654684
1585
 
        """
1586
 
        user = 'joe'
1587
 
        password = 'foo'
1588
 
        _setup_authentication_config(scheme='http', host='localhost',
1589
 
                                     user=user, password=password)
1590
 
        handler = _urllib2_wrappers.HTTPAuthHandler()
1591
 
        got_pass = handler.get_user_password(dict(
1592
 
            user='joe',
1593
 
            protocol='http',
1594
 
            host='localhost',
1595
 
            path='/',
1596
 
            realm='Realm',
1597
 
            ))
1598
 
        self.assertEquals((user, password), got_pass)
1599
 
 
1600
 
 
1601
1492
class TestAuth(http_utils.TestCaseWithWebserver):
1602
1493
    """Test authentication scheme"""
1603
1494
 
1607
1498
        vary_by_http_auth_scheme(),
1608
1499
        )
1609
1500
 
 
1501
    _auth_header = 'Authorization'
 
1502
    _password_prompt_prefix = ''
 
1503
    _username_prompt_prefix = ''
 
1504
    # Set by load_tests
 
1505
    _auth_server = None
 
1506
 
1610
1507
    def setUp(self):
1611
1508
        super(TestAuth, self).setUp()
1612
1509
        self.server = self.get_readonly_server()
1753
1650
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1754
1651
                                            stderr=tests.StringIOWrapper())
1755
1652
        # Create a minimal config file with the right password
1756
 
        _setup_authentication_config(scheme='http', port=self.server.port,
1757
 
                                     user=user, password=password)
 
1653
        _setup_authentication_config(
 
1654
            scheme='http', 
 
1655
            port=self.server.port,
 
1656
            user=user,
 
1657
            password=password)
1758
1658
        # Issue a request to the server to connect
1759
1659
        self.assertEqual('contents of a\n',t.get('a').read())
1760
1660
        # stdin should have been left untouched
1767
1667
                                     http_utils.ProxyDigestAuthServer):
1768
1668
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1769
1669
        if self._testing_pycurl():
1770
 
            self.knownFailure(
 
1670
            raise tests.KnownFailure(
1771
1671
                'pycurl does not handle a nonce change')
1772
1672
        self.server.add_user('joe', 'foo')
1773
1673
        t = self.get_user_transport('joe', 'foo')
1790
1690
        user = 'joe'
1791
1691
        password = 'foo'
1792
1692
        self.server.add_user(user, password)
1793
 
        _setup_authentication_config(scheme='http', port=self.server.port,
1794
 
                                     user=user, password=password)
 
1693
        _setup_authentication_config(
 
1694
            scheme='http', 
 
1695
            port=self.server.port,
 
1696
            user=user,
 
1697
            password=password)
1795
1698
        t = self.get_user_transport(None, None)
1796
1699
        # Issue a request to the server to connect
1797
1700
        self.assertEqual('contents of a\n', t.get('a').read())
1798
1701
        # Only one 'Authentication Required' error should occur
1799
1702
        self.assertEqual(1, self.server.auth_required_errors)
1800
1703
 
1801
 
    def test_no_credential_leaks_in_log(self):
1802
 
        self.overrideAttr(debug, 'debug_flags', set(['http']))
 
1704
 
 
1705
def _setup_authentication_config(**kwargs):
 
1706
    conf = config.AuthenticationConfig()
 
1707
    conf._get_config().update({'httptest': kwargs})
 
1708
    conf._save()
 
1709
 
 
1710
 
 
1711
 
 
1712
class TestUrllib2AuthHandler(tests.TestCaseWithTransport):
 
1713
    """Unit tests for glue by which urllib2 asks us for authentication"""
 
1714
 
 
1715
    def test_get_user_password_without_port(self):
 
1716
        """We cope if urllib2 doesn't tell us the port.
 
1717
 
 
1718
        See https://bugs.launchpad.net/bzr/+bug/654684
 
1719
        """
1803
1720
        user = 'joe'
1804
 
        password = 'very-sensitive-password'
1805
 
        self.server.add_user(user, password)
1806
 
        t = self.get_user_transport(user, password)
1807
 
        # Capture the debug calls to mutter
1808
 
        self.mutters = []
1809
 
        def mutter(*args):
1810
 
            lines = args[0] % args[1:]
1811
 
            # Some calls output multiple lines, just split them now since we
1812
 
            # care about a single one later.
1813
 
            self.mutters.extend(lines.splitlines())
1814
 
        self.overrideAttr(trace, 'mutter', mutter)
1815
 
        # Issue a request to the server to connect
1816
 
        self.assertEqual(True, t.has('a'))
1817
 
        # Only one 'Authentication Required' error should occur
1818
 
        self.assertEqual(1, self.server.auth_required_errors)
1819
 
        # Since the authentication succeeded, there should be a corresponding
1820
 
        # debug line
1821
 
        sent_auth_headers = [line for line in self.mutters
1822
 
                             if line.startswith('> %s' % (self._auth_header,))]
1823
 
        self.assertLength(1, sent_auth_headers)
1824
 
        self.assertStartsWith(sent_auth_headers[0],
1825
 
                              '> %s: <masked>' % (self._auth_header,))
 
1721
        password = 'foo'
 
1722
        _setup_authentication_config(
 
1723
            scheme='http', 
 
1724
            host='localhost',
 
1725
            user=user,
 
1726
            password=password)
 
1727
        handler = _urllib2_wrappers.HTTPAuthHandler()
 
1728
        got_pass = handler.get_user_password(dict(
 
1729
            user='joe',
 
1730
            protocol='http',
 
1731
            host='localhost',
 
1732
            path='/',
 
1733
            realm='Realm',
 
1734
            ))
 
1735
        self.assertEquals((user, password), got_pass)
1826
1736
 
1827
1737
 
1828
1738
class TestProxyAuth(TestAuth):
1829
 
    """Test proxy authentication schemes.
1830
 
 
1831
 
    This inherits from TestAuth to tweak the setUp and filter some failing
1832
 
    tests.
1833
 
    """
 
1739
    """Test proxy authentication schemes."""
1834
1740
 
1835
1741
    scenarios = multiply_scenarios(
1836
1742
        vary_by_http_client_implementation(),
1838
1744
        vary_by_http_proxy_auth_scheme(),
1839
1745
        )
1840
1746
 
 
1747
    _auth_header = 'Proxy-authorization'
 
1748
    _password_prompt_prefix = 'Proxy '
 
1749
    _username_prompt_prefix = 'Proxy '
 
1750
 
1841
1751
    def setUp(self):
1842
1752
        super(TestProxyAuth, self).setUp()
1843
1753
        # Override the contents to avoid false positives
1855
1765
        if self._testing_pycurl():
1856
1766
            import pycurl
1857
1767
            if pycurl.version_info()[1] < '7.16.0':
1858
 
                self.knownFailure(
 
1768
                raise tests.KnownFailure(
1859
1769
                    'pycurl < 7.16.0 does not handle empty proxy passwords')
1860
1770
        super(TestProxyAuth, self).test_empty_pass()
1861
1771
 
2086
1996
    pass
2087
1997
 
2088
1998
 
2089
 
if features.HTTPSServerFeature.available():
 
1999
if tests.HTTPSServerFeature.available():
2090
2000
    from bzrlib.tests import https_server
2091
2001
    class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
2092
2002
        pass
2103
2013
        tests.TestCase.setUp(self)
2104
2014
        self.server = self._activity_server(self._protocol_version)
2105
2015
        self.server.start_server()
2106
 
        _activities = {} # Don't close over self and create a cycle
 
2016
        self.activities = {}
2107
2017
        def report_activity(t, bytes, direction):
2108
 
            count = _activities.get(direction, 0)
 
2018
            count = self.activities.get(direction, 0)
2109
2019
            count += bytes
2110
 
            _activities[direction] = count
2111
 
        self.activities = _activities
 
2020
            self.activities[direction] = count
2112
2021
 
2113
2022
        # We override at class level because constructors may propagate the
2114
2023
        # bound method and render instance overriding ineffective (an
2339
2248
        # stdout should be empty, stderr will contain the prompts
2340
2249
        self.assertEqual('', stdout.getvalue())
2341
2250
 
 
2251