~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-01-11 19:11:29 UTC
  • mfrom: (5555.3.1 local_work)
  • Revision ID: pqm@pqm.ubuntu.com-20110111191129-qxzw738fmkm0run9
(vila) Add icons for tbzrcommand (iwata)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
32
32
import bzrlib
33
33
from bzrlib import (
34
34
    bzrdir,
35
 
    cethread,
36
35
    config,
37
 
    debug,
38
36
    errors,
39
37
    osutils,
40
38
    remote as _mod_remote,
41
39
    tests,
42
 
    trace,
43
40
    transport,
44
41
    ui,
45
42
    )
93
90
        ]
94
91
 
95
92
 
 
93
def vary_by_http_proxy_auth_scheme():
 
94
    return [
 
95
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
 
96
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
 
97
        ('basicdigest',
 
98
            dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
 
99
        ]
 
100
 
 
101
 
96
102
def vary_by_http_auth_scheme():
97
 
    scenarios = [
 
103
    return [
98
104
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
99
105
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
100
106
        ('basicdigest',
101
107
            dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
102
108
        ]
103
 
    # Add some attributes common to all scenarios
104
 
    for scenario_id, scenario_dict in scenarios:
105
 
        scenario_dict.update(_auth_header='Authorization',
106
 
                             _username_prompt_prefix='',
107
 
                             _password_prompt_prefix='')
108
 
    return scenarios
109
 
 
110
 
 
111
 
def vary_by_http_proxy_auth_scheme():
112
 
    scenarios = [
113
 
        ('proxy-basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
114
 
        ('proxy-digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
115
 
        ('proxy-basicdigest',
116
 
            dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
117
 
        ]
118
 
    # Add some attributes common to all scenarios
119
 
    for scenario_id, scenario_dict in scenarios:
120
 
        scenario_dict.update(_auth_header='Proxy-Authorization',
121
 
                             _username_prompt_prefix='Proxy ',
122
 
                             _password_prompt_prefix='Proxy ')
123
 
    return scenarios
124
109
 
125
110
 
126
111
def vary_by_http_activity():
193
178
        self._sock.bind(('127.0.0.1', 0))
194
179
        self.host, self.port = self._sock.getsockname()
195
180
        self._ready = threading.Event()
196
 
        self._thread = test_server.TestThread(
197
 
            sync_event=self._ready, target=self._accept_read_and_reply)
 
181
        self._thread = test_server.ThreadWithException(
 
182
            event=self._ready, target=self._accept_read_and_reply)
198
183
        self._thread.start()
199
184
        if 'threads' in tests.selftest_debug_flags:
200
185
            sys.stderr.write('Thread started: %s\n' % (self._thread.ident,))
269
254
        self.assertEqual('realm="Thou should not pass"', remainder)
270
255
 
271
256
 
272
 
class TestHTTPRangeParsing(tests.TestCase):
273
 
 
274
 
    def setUp(self):
275
 
        super(TestHTTPRangeParsing, self).setUp()
276
 
        # We focus on range parsing here and ignore everything else
277
 
        class RequestHandler(http_server.TestingHTTPRequestHandler):
278
 
            def setup(self): pass
279
 
            def handle(self): pass
280
 
            def finish(self): pass
281
 
 
282
 
        self.req_handler = RequestHandler(None, None, None)
283
 
 
284
 
    def assertRanges(self, ranges, header, file_size):
285
 
        self.assertEquals(ranges,
286
 
                          self.req_handler._parse_ranges(header, file_size))
287
 
 
288
 
    def test_simple_range(self):
289
 
        self.assertRanges([(0,2)], 'bytes=0-2', 12)
290
 
 
291
 
    def test_tail(self):
292
 
        self.assertRanges([(8, 11)], 'bytes=-4', 12)
293
 
 
294
 
    def test_tail_bigger_than_file(self):
295
 
        self.assertRanges([(0, 11)], 'bytes=-99', 12)
296
 
 
297
 
    def test_range_without_end(self):
298
 
        self.assertRanges([(4, 11)], 'bytes=4-', 12)
299
 
 
300
 
    def test_invalid_ranges(self):
301
 
        self.assertRanges(None, 'bytes=12-22', 12)
302
 
        self.assertRanges(None, 'bytes=1-3,12-22', 12)
303
 
        self.assertRanges(None, 'bytes=-', 12)
304
 
 
305
 
 
306
257
class TestHTTPServer(tests.TestCase):
307
258
    """Test the HTTP servers implementations."""
308
259
 
476
427
    """Test the http connections."""
477
428
 
478
429
    scenarios = multiply_scenarios(
479
 
        vary_by_http_client_implementation(),
 
430
        vary_by_http_client_implementation(), 
480
431
        vary_by_http_protocol_version(),
481
432
        )
482
433
 
541
492
class TestPost(tests.TestCase):
542
493
 
543
494
    scenarios = multiply_scenarios(
544
 
        vary_by_http_client_implementation(),
 
495
        vary_by_http_client_implementation(), 
545
496
        vary_by_http_protocol_version(),
546
497
        )
547
498
 
600
551
    """
601
552
 
602
553
    scenarios = multiply_scenarios(
603
 
        vary_by_http_client_implementation(),
 
554
        vary_by_http_client_implementation(), 
604
555
        vary_by_http_protocol_version(),
605
556
        )
606
557
 
1078
1029
    """Tests readv requests against a server erroring out on too many ranges."""
1079
1030
 
1080
1031
    scenarios = multiply_scenarios(
1081
 
        vary_by_http_client_implementation(),
 
1032
        vary_by_http_client_implementation(), 
1082
1033
        vary_by_http_protocol_version(),
1083
1034
        )
1084
1035
 
1124
1075
 
1125
1076
    def _proxied_request(self):
1126
1077
        handler = _urllib2_wrappers.ProxyHandler()
1127
 
        request = _urllib2_wrappers.Request('GET', 'http://baz/buzzle')
 
1078
        request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
1128
1079
        handler.set_proxy(request, 'http')
1129
1080
        return request
1130
1081
 
1131
 
    def assertEvaluateProxyBypass(self, expected, host, no_proxy):
1132
 
        handler = _urllib2_wrappers.ProxyHandler()
1133
 
        self.assertEquals(expected,
1134
 
                          handler.evaluate_proxy_bypass(host, no_proxy))
1135
 
 
1136
1082
    def test_empty_user(self):
1137
1083
        self.overrideEnv('http_proxy', 'http://bar.com')
1138
1084
        request = self._proxied_request()
1139
1085
        self.assertFalse(request.headers.has_key('Proxy-authorization'))
1140
1086
 
1141
 
    def test_user_with_at(self):
1142
 
        self.overrideEnv('http_proxy',
1143
 
                         'http://username@domain:password@proxy_host:1234')
1144
 
        request = self._proxied_request()
1145
 
        self.assertFalse(request.headers.has_key('Proxy-authorization'))
1146
 
 
1147
1087
    def test_invalid_proxy(self):
1148
1088
        """A proxy env variable without scheme"""
1149
1089
        self.overrideEnv('http_proxy', 'host:1234')
1150
1090
        self.assertRaises(errors.InvalidURL, self._proxied_request)
1151
1091
 
1152
 
    def test_evaluate_proxy_bypass_true(self):
1153
 
        """The host is not proxied"""
1154
 
        self.assertEvaluateProxyBypass(True, 'example.com', 'example.com')
1155
 
        self.assertEvaluateProxyBypass(True, 'bzr.example.com', '*example.com')
1156
 
 
1157
 
    def test_evaluate_proxy_bypass_false(self):
1158
 
        """The host is proxied"""
1159
 
        self.assertEvaluateProxyBypass(False, 'bzr.example.com', None)
1160
 
 
1161
 
    def test_evaluate_proxy_bypass_unknown(self):
1162
 
        """The host is not explicitly proxied"""
1163
 
        self.assertEvaluateProxyBypass(None, 'example.com', 'not.example.com')
1164
 
        self.assertEvaluateProxyBypass(None, 'bzr.example.com', 'example.com')
1165
 
 
1166
 
    def test_evaluate_proxy_bypass_empty_entries(self):
1167
 
        """Ignore empty entries"""
1168
 
        self.assertEvaluateProxyBypass(None, 'example.com', '')
1169
 
        self.assertEvaluateProxyBypass(None, 'example.com', ',')
1170
 
        self.assertEvaluateProxyBypass(None, 'example.com', 'foo,,bar')
1171
 
 
1172
1092
 
1173
1093
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
1174
1094
    """Tests proxy server.
1180
1100
    """
1181
1101
 
1182
1102
    scenarios = multiply_scenarios(
1183
 
        vary_by_http_client_implementation(),
 
1103
        vary_by_http_client_implementation(), 
1184
1104
        vary_by_http_protocol_version(),
1185
1105
        )
1186
1106
 
1277
1197
    """Test the Range header in GET methods."""
1278
1198
 
1279
1199
    scenarios = multiply_scenarios(
1280
 
        vary_by_http_client_implementation(),
 
1200
        vary_by_http_client_implementation(), 
1281
1201
        vary_by_http_protocol_version(),
1282
1202
        )
1283
1203
 
1327
1247
    """Test redirection between http servers."""
1328
1248
 
1329
1249
    scenarios = multiply_scenarios(
1330
 
        vary_by_http_client_implementation(),
 
1250
        vary_by_http_client_implementation(), 
1331
1251
        vary_by_http_protocol_version(),
1332
1252
        )
1333
1253
 
1400
1320
    """
1401
1321
 
1402
1322
    scenarios = multiply_scenarios(
1403
 
        vary_by_http_client_implementation(),
 
1323
        vary_by_http_client_implementation(), 
1404
1324
        vary_by_http_protocol_version(),
1405
1325
        )
1406
1326
 
1455
1375
    """Test transport.do_catching_redirections."""
1456
1376
 
1457
1377
    scenarios = multiply_scenarios(
1458
 
        vary_by_http_client_implementation(),
 
1378
        vary_by_http_client_implementation(), 
1459
1379
        vary_by_http_protocol_version(),
1460
1380
        )
1461
1381
 
1503
1423
                          self.get_a, self.old_transport, redirected)
1504
1424
 
1505
1425
 
1506
 
def _setup_authentication_config(**kwargs):
1507
 
    conf = config.AuthenticationConfig()
1508
 
    conf._get_config().update({'httptest': kwargs})
1509
 
    conf._save()
1510
 
 
1511
 
 
1512
 
class TestUrllib2AuthHandler(tests.TestCaseWithTransport):
1513
 
    """Unit tests for glue by which urllib2 asks us for authentication"""
1514
 
 
1515
 
    def test_get_user_password_without_port(self):
1516
 
        """We cope if urllib2 doesn't tell us the port.
1517
 
 
1518
 
        See https://bugs.launchpad.net/bzr/+bug/654684
1519
 
        """
1520
 
        user = 'joe'
1521
 
        password = 'foo'
1522
 
        _setup_authentication_config(scheme='http', host='localhost',
1523
 
                                     user=user, password=password)
1524
 
        handler = _urllib2_wrappers.HTTPAuthHandler()
1525
 
        got_pass = handler.get_user_password(dict(
1526
 
            user='joe',
1527
 
            protocol='http',
1528
 
            host='localhost',
1529
 
            path='/',
1530
 
            realm='Realm',
1531
 
            ))
1532
 
        self.assertEquals((user, password), got_pass)
1533
 
 
1534
 
 
1535
1426
class TestAuth(http_utils.TestCaseWithWebserver):
1536
1427
    """Test authentication scheme"""
1537
1428
 
1541
1432
        vary_by_http_auth_scheme(),
1542
1433
        )
1543
1434
 
 
1435
    _auth_header = 'Authorization'
 
1436
    _password_prompt_prefix = ''
 
1437
    _username_prompt_prefix = ''
 
1438
    # Set by load_tests
 
1439
    _auth_server = None
 
1440
 
1544
1441
    def setUp(self):
1545
1442
        super(TestAuth, self).setUp()
1546
1443
        self.server = self.get_readonly_server()
1687
1584
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1688
1585
                                            stderr=tests.StringIOWrapper())
1689
1586
        # Create a minimal config file with the right password
1690
 
        _setup_authentication_config(scheme='http', port=self.server.port,
1691
 
                                     user=user, password=password)
 
1587
        _setup_authentication_config(
 
1588
            scheme='http', 
 
1589
            port=self.server.port,
 
1590
            user=user,
 
1591
            password=password)
1692
1592
        # Issue a request to the server to connect
1693
1593
        self.assertEqual('contents of a\n',t.get('a').read())
1694
1594
        # stdin should have been left untouched
1724
1624
        user = 'joe'
1725
1625
        password = 'foo'
1726
1626
        self.server.add_user(user, password)
1727
 
        _setup_authentication_config(scheme='http', port=self.server.port,
1728
 
                                     user=user, password=password)
 
1627
        _setup_authentication_config(
 
1628
            scheme='http', 
 
1629
            port=self.server.port,
 
1630
            user=user,
 
1631
            password=password)
1729
1632
        t = self.get_user_transport(None, None)
1730
1633
        # Issue a request to the server to connect
1731
1634
        self.assertEqual('contents of a\n', t.get('a').read())
1732
1635
        # Only one 'Authentication Required' error should occur
1733
1636
        self.assertEqual(1, self.server.auth_required_errors)
1734
1637
 
1735
 
    def test_no_credential_leaks_in_log(self):
1736
 
        self.overrideAttr(debug, 'debug_flags', set(['http']))
 
1638
 
 
1639
def _setup_authentication_config(**kwargs):
 
1640
    conf = config.AuthenticationConfig()
 
1641
    conf._get_config().update({'httptest': kwargs})
 
1642
    conf._save()
 
1643
 
 
1644
 
 
1645
 
 
1646
class TestUrllib2AuthHandler(tests.TestCaseWithTransport):
 
1647
    """Unit tests for glue by which urllib2 asks us for authentication"""
 
1648
 
 
1649
    def test_get_user_password_without_port(self):
 
1650
        """We cope if urllib2 doesn't tell us the port.
 
1651
 
 
1652
        See https://bugs.launchpad.net/bzr/+bug/654684
 
1653
        """
1737
1654
        user = 'joe'
1738
 
        password = 'very-sensitive-password'
1739
 
        self.server.add_user(user, password)
1740
 
        t = self.get_user_transport(user, password)
1741
 
        # Capture the debug calls to mutter
1742
 
        self.mutters = []
1743
 
        def mutter(*args):
1744
 
            lines = args[0] % args[1:]
1745
 
            # Some calls output multiple lines, just split them now since we
1746
 
            # care about a single one later.
1747
 
            self.mutters.extend(lines.splitlines())
1748
 
        self.overrideAttr(trace, 'mutter', mutter)
1749
 
        # Issue a request to the server to connect
1750
 
        self.assertEqual(True, t.has('a'))
1751
 
        # Only one 'Authentication Required' error should occur
1752
 
        self.assertEqual(1, self.server.auth_required_errors)
1753
 
        # Since the authentication succeeded, there should be a corresponding
1754
 
        # debug line
1755
 
        sent_auth_headers = [line for line in self.mutters
1756
 
                             if line.startswith('> %s' % (self._auth_header,))]
1757
 
        self.assertLength(1, sent_auth_headers)
1758
 
        self.assertStartsWith(sent_auth_headers[0],
1759
 
                              '> %s: <masked>' % (self._auth_header,))
 
1655
        password = 'foo'
 
1656
        _setup_authentication_config(
 
1657
            scheme='http', 
 
1658
            host='localhost',
 
1659
            user=user,
 
1660
            password=password)
 
1661
        handler = _urllib2_wrappers.HTTPAuthHandler()
 
1662
        got_pass = handler.get_user_password(dict(
 
1663
            user='joe',
 
1664
            protocol='http',
 
1665
            host='localhost',
 
1666
            path='/',
 
1667
            realm='Realm',
 
1668
            ))
 
1669
        self.assertEquals((user, password), got_pass)
1760
1670
 
1761
1671
 
1762
1672
class TestProxyAuth(TestAuth):
1763
 
    """Test proxy authentication schemes.
1764
 
 
1765
 
    This inherits from TestAuth to tweak the setUp and filter some failing
1766
 
    tests.
1767
 
    """
 
1673
    """Test proxy authentication schemes."""
1768
1674
 
1769
1675
    scenarios = multiply_scenarios(
1770
1676
        vary_by_http_client_implementation(),
1772
1678
        vary_by_http_proxy_auth_scheme(),
1773
1679
        )
1774
1680
 
 
1681
    _auth_header = 'Proxy-authorization'
 
1682
    _password_prompt_prefix = 'Proxy '
 
1683
    _username_prompt_prefix = 'Proxy '
 
1684
 
1775
1685
    def setUp(self):
1776
1686
        super(TestProxyAuth, self).setUp()
1777
1687
        # Override the contents to avoid false positives
1820
1730
class SmartHTTPTunnellingTest(tests.TestCaseWithTransport):
1821
1731
 
1822
1732
    scenarios = multiply_scenarios(
1823
 
        vary_by_http_client_implementation(),
 
1733
        vary_by_http_client_implementation(), 
1824
1734
        vary_by_http_protocol_version(),
1825
1735
        )
1826
1736
 
2037
1947
        tests.TestCase.setUp(self)
2038
1948
        self.server = self._activity_server(self._protocol_version)
2039
1949
        self.server.start_server()
2040
 
        _activities = {} # Don't close over self and create a cycle
 
1950
        self.activities = {}
2041
1951
        def report_activity(t, bytes, direction):
2042
 
            count = _activities.get(direction, 0)
 
1952
            count = self.activities.get(direction, 0)
2043
1953
            count += bytes
2044
 
            _activities[direction] = count
2045
 
        self.activities = _activities
 
1954
            self.activities[direction] = count
2046
1955
 
2047
1956
        # We override at class level because constructors may propagate the
2048
1957
        # bound method and render instance overriding ineffective (an
2273
2182
        # stdout should be empty, stderr will contain the prompts
2274
2183
        self.assertEqual('', stdout.getvalue())
2275
2184
 
 
2185