~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

Initial commit for russian version of documents.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for HTTP implementations.
18
18
 
44
44
    ui,
45
45
    urlutils,
46
46
    )
47
 
from bzrlib.symbol_versioning import (
48
 
    deprecated_in,
49
 
    )
50
47
from bzrlib.tests import (
51
48
    http_server,
52
49
    http_utils,
68
65
    pycurl_present = False
69
66
 
70
67
 
 
68
class TransportAdapter(tests.TestScenarioApplier):
 
69
    """Generate the same test for each transport implementation."""
 
70
 
 
71
    def __init__(self):
 
72
        transport_scenarios = [
 
73
            ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
 
74
                            _server=http_server.HttpServer_urllib,
 
75
                            _qualified_prefix='http+urllib',)),
 
76
            ]
 
77
        if pycurl_present:
 
78
            transport_scenarios.append(
 
79
                ('pycurl', dict(_transport=PyCurlTransport,
 
80
                                _server=http_server.HttpServer_PyCurl,
 
81
                                _qualified_prefix='http+pycurl',)))
 
82
        self.scenarios = transport_scenarios
 
83
 
 
84
 
 
85
class TransportProtocolAdapter(TransportAdapter):
 
86
    """Generate the same test for each protocol implementation.
 
87
 
 
88
    In addition to the transport adaptation that we inherit from.
 
89
    """
 
90
 
 
91
    def __init__(self):
 
92
        super(TransportProtocolAdapter, self).__init__()
 
93
        protocol_scenarios = [
 
94
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
 
95
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
 
96
            ]
 
97
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
98
                                                  protocol_scenarios)
 
99
 
 
100
 
 
101
class TransportProtocolAuthenticationAdapter(TransportProtocolAdapter):
 
102
    """Generate the same test for each authentication scheme implementation.
 
103
 
 
104
    In addition to the protocol adaptation that we inherit from.
 
105
    """
 
106
 
 
107
    def __init__(self):
 
108
        super(TransportProtocolAuthenticationAdapter, self).__init__()
 
109
        auth_scheme_scenarios = [
 
110
            ('basic', dict(_auth_scheme='basic')),
 
111
            ('digest', dict(_auth_scheme='digest')),
 
112
            ]
 
113
 
 
114
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
115
                                                  auth_scheme_scenarios)
 
116
 
71
117
def load_tests(standard_tests, module, loader):
72
118
    """Multiply tests for http clients and protocol versions."""
73
 
    result = loader.suiteClass()
74
 
 
75
 
    # one for each transport implementation
76
 
    t_tests, remaining_tests = tests.split_suite_by_condition(
77
 
        standard_tests, tests.condition_isinstance((
78
 
                TestHttpTransportRegistration,
 
119
    # one for each transport
 
120
    t_adapter = TransportAdapter()
 
121
    t_classes= (TestHttpTransportRegistration,
79
122
                TestHttpTransportUrls,
80
 
                Test_redirected_to,
81
 
                )))
82
 
    transport_scenarios = [
83
 
        ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
84
 
                        _server=http_server.HttpServer_urllib,
85
 
                        _qualified_prefix='http+urllib',)),
86
 
        ]
87
 
    if pycurl_present:
88
 
        transport_scenarios.append(
89
 
            ('pycurl', dict(_transport=PyCurlTransport,
90
 
                            _server=http_server.HttpServer_PyCurl,
91
 
                            _qualified_prefix='http+pycurl',)))
92
 
    tests.multiply_tests(t_tests, transport_scenarios, result)
93
 
 
94
 
    # each implementation tested with each HTTP version
95
 
    tp_tests, remaining_tests = tests.split_suite_by_condition(
96
 
        remaining_tests, tests.condition_isinstance((
97
 
                SmartHTTPTunnellingTest,
98
 
                TestDoCatchRedirections,
99
 
                TestHTTPConnections,
100
 
                TestHTTPRedirections,
101
 
                TestHTTPSilentRedirections,
102
 
                TestLimitedRangeRequestServer,
103
 
                TestPost,
104
 
                TestProxyHttpServer,
105
 
                TestRanges,
106
 
                TestSpecificRequestHandler,
107
 
                )))
108
 
    protocol_scenarios = [
109
 
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
110
 
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
111
 
            ]
112
 
    tp_scenarios = tests.multiply_scenarios(transport_scenarios,
113
 
                                            protocol_scenarios)
114
 
    tests.multiply_tests(tp_tests, tp_scenarios, result)
115
 
 
116
 
    # proxy auth: each auth scheme on all http versions on all implementations.
117
 
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
118
 
        remaining_tests, tests.condition_isinstance((
119
 
                TestProxyAuth,
120
 
                )))
121
 
    proxy_auth_scheme_scenarios = [
122
 
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
123
 
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
124
 
        ('basicdigest',
125
 
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
126
 
        ]
127
 
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
128
 
                                              proxy_auth_scheme_scenarios)
129
 
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
130
 
 
131
 
    # auth: each auth scheme on all http versions on all implementations.
132
 
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
133
 
        remaining_tests, tests.condition_isinstance((
134
 
                TestAuth,
135
 
                )))
136
 
    auth_scheme_scenarios = [
137
 
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
138
 
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
139
 
        ('basicdigest',
140
 
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
141
 
        ]
142
 
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
143
 
                                             auth_scheme_scenarios)
144
 
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
145
 
 
146
 
    # activity: on all http[s] versions on all implementations
147
 
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
148
 
        remaining_tests, tests.condition_isinstance((
149
 
                TestActivity,
150
 
                )))
151
 
    activity_scenarios = [
152
 
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
153
 
                             _transport=_urllib.HttpTransport_urllib,)),
154
 
        ]
155
 
    if tests.HTTPSServerFeature.available():
156
 
        activity_scenarios.append(
157
 
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
158
 
                                  _transport=_urllib.HttpTransport_urllib,)),)
159
 
    if pycurl_present:
160
 
        activity_scenarios.append(
161
 
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
162
 
                                 _transport=PyCurlTransport,)),)
163
 
        if tests.HTTPSServerFeature.available():
164
 
            from bzrlib.tests import (
165
 
                ssl_certs,
166
123
                )
167
 
            # FIXME: Until we have a better way to handle self-signed
168
 
            # certificates (like allowing them in a test specific
169
 
            # authentication.conf for example), we need some specialized pycurl
170
 
            # transport for tests.
171
 
            class HTTPS_pycurl_transport(PyCurlTransport):
172
 
 
173
 
                def __init__(self, base, _from_transport=None):
174
 
                    super(HTTPS_pycurl_transport, self).__init__(
175
 
                        base, _from_transport)
176
 
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
177
 
 
178
 
            activity_scenarios.append(
179
 
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
180
 
                                      _transport=HTTPS_pycurl_transport,)),)
181
 
 
182
 
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
183
 
                                               protocol_scenarios)
184
 
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
185
 
 
186
 
    # No parametrization for the remaining tests
187
 
    result.addTests(remaining_tests)
188
 
 
 
124
    is_testing_for_transports = tests.condition_isinstance(t_classes)
 
125
 
 
126
    # multiplied by one for each protocol version
 
127
    tp_adapter = TransportProtocolAdapter()
 
128
    tp_classes= (SmartHTTPTunnellingTest,
 
129
                 TestDoCatchRedirections,
 
130
                 TestHTTPConnections,
 
131
                 TestHTTPRedirections,
 
132
                 TestHTTPSilentRedirections,
 
133
                 TestLimitedRangeRequestServer,
 
134
                 TestPost,
 
135
                 TestProxyHttpServer,
 
136
                 TestRanges,
 
137
                 TestSpecificRequestHandler,
 
138
                 )
 
139
    is_also_testing_for_protocols = tests.condition_isinstance(tp_classes)
 
140
 
 
141
    # multiplied by one for each authentication scheme
 
142
    tpa_adapter = TransportProtocolAuthenticationAdapter()
 
143
    tpa_classes = (TestAuth,
 
144
                   )
 
145
    is_also_testing_for_authentication = tests.condition_isinstance(
 
146
        tpa_classes)
 
147
 
 
148
    result = loader.suiteClass()
 
149
    for test_class in tests.iter_suite_tests(standard_tests):
 
150
        # Each test class is either standalone or testing for some combination
 
151
        # of transport, protocol version, authentication scheme. Use the right
 
152
        # adapter (or none) depending on the class.
 
153
        if is_testing_for_transports(test_class):
 
154
            result.addTests(t_adapter.adapt(test_class))
 
155
        elif is_also_testing_for_protocols(test_class):
 
156
            result.addTests(tp_adapter.adapt(test_class))
 
157
        elif is_also_testing_for_authentication(test_class):
 
158
            result.addTests(tpa_adapter.adapt(test_class))
 
159
        else:
 
160
            result.addTest(test_class)
189
161
    return result
190
162
 
191
163
 
200
172
 
201
173
class RecordingServer(object):
202
174
    """A fake HTTP server.
203
 
 
 
175
    
204
176
    It records the bytes sent to it, and replies with a 200.
205
177
    """
206
178
 
255
227
        self.port = None
256
228
 
257
229
 
258
 
class TestAuthHeader(tests.TestCase):
259
 
 
260
 
    def parse_header(self, header, auth_handler_class=None):
261
 
        if auth_handler_class is None:
262
 
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
263
 
        self.auth_handler =  auth_handler_class()
264
 
        return self.auth_handler._parse_auth_header(header)
265
 
 
266
 
    def test_empty_header(self):
267
 
        scheme, remainder = self.parse_header('')
268
 
        self.assertEquals('', scheme)
269
 
        self.assertIs(None, remainder)
270
 
 
271
 
    def test_negotiate_header(self):
272
 
        scheme, remainder = self.parse_header('Negotiate')
273
 
        self.assertEquals('negotiate', scheme)
274
 
        self.assertIs(None, remainder)
275
 
 
276
 
    def test_basic_header(self):
277
 
        scheme, remainder = self.parse_header(
278
 
            'Basic realm="Thou should not pass"')
279
 
        self.assertEquals('basic', scheme)
280
 
        self.assertEquals('realm="Thou should not pass"', remainder)
281
 
 
282
 
    def test_basic_extract_realm(self):
283
 
        scheme, remainder = self.parse_header(
284
 
            'Basic realm="Thou should not pass"',
285
 
            _urllib2_wrappers.BasicAuthHandler)
286
 
        match, realm = self.auth_handler.extract_realm(remainder)
287
 
        self.assertTrue(match is not None)
288
 
        self.assertEquals('Thou should not pass', realm)
289
 
 
290
 
    def test_digest_header(self):
291
 
        scheme, remainder = self.parse_header(
292
 
            'Digest realm="Thou should not pass"')
293
 
        self.assertEquals('digest', scheme)
294
 
        self.assertEquals('realm="Thou should not pass"', remainder)
295
 
 
296
 
 
297
230
class TestHTTPServer(tests.TestCase):
298
231
    """Test the HTTP servers implementations."""
299
232
 
516
449
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
517
450
            % bzrlib.__version__) > -1)
518
451
 
 
452
    def test_get_smart_medium(self):
 
453
        # For HTTP, get_smart_medium should return the transport object.
 
454
        server = self.get_readonly_server()
 
455
        http_transport = self._transport(server.get_url())
 
456
        medium = http_transport.get_smart_medium()
 
457
        self.assertIs(medium, http_transport)
 
458
 
519
459
    def test_has_on_bogus_host(self):
520
460
        # Get a free address and don't 'accept' on it, so that we
521
461
        # can be sure there is no http handler there, but set a
878
818
        # bytes on the socket
879
819
        ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
880
820
        self.assertEqual((0, '0'), ireadv.next())
881
 
        # The server should have issued one request so far
 
821
        # The server should have issued one request so far 
882
822
        self.assertEqual(1, server.GET_request_nb)
883
823
        self.assertEqual('0123456789', t.get_bytes('a'))
884
824
        # get_bytes issued an additional request, the readv pending ones are
1205
1145
        url = self.server.get_url()
1206
1146
        t = self._transport(url)
1207
1147
        try:
1208
 
            self.assertEqual('proxied contents of foo\n', t.get('foo').read())
 
1148
            self.assertEqual(t.get('foo').read(), 'proxied contents of foo\n')
1209
1149
        finally:
1210
1150
            self._restore_env()
1211
1151
 
1214
1154
        url = self.server.get_url()
1215
1155
        t = self._transport(url)
1216
1156
        try:
1217
 
            self.assertEqual('contents of foo\n', t.get('foo').read())
 
1157
            self.assertEqual(t.get('foo').read(), 'contents of foo\n')
1218
1158
        finally:
1219
1159
            self._restore_env()
1220
1160
 
1330
1270
                                  ('bundle',
1331
1271
                                  '# Bazaar revision bundle v0.9\n#\n')
1332
1272
                                  ],)
1333
 
        # The requests to the old server will be redirected to the new server
 
1273
 
1334
1274
        self.old_transport = self._transport(self.old_server.get_url())
1335
1275
 
1336
1276
    def test_redirected(self):
1341
1281
    def test_read_redirected_bundle_from_url(self):
1342
1282
        from bzrlib.bundle import read_bundle_from_url
1343
1283
        url = self.old_transport.abspath('bundle')
1344
 
        bundle = self.applyDeprecated(deprecated_in((1, 12, 0)),
1345
 
                read_bundle_from_url, url)
 
1284
        bundle = read_bundle_from_url(url)
1346
1285
        # If read_bundle_from_url was successful we get an empty bundle
1347
1286
        self.assertEqual([], bundle.revisions)
1348
1287
 
1359
1298
        # Since the tests using this class will replace
1360
1299
        # _urllib2_wrappers.Request, we can't just call the base class __init__
1361
1300
        # or we'll loop.
1362
 
        RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
 
1301
        RedirectedRequest.init_orig(self, method, url, args, kwargs)
1363
1302
        self.follow_redirections = True
1364
1303
 
1365
1304
 
1492
1431
 
1493
1432
    _auth_header = 'Authorization'
1494
1433
    _password_prompt_prefix = ''
1495
 
    _username_prompt_prefix = ''
1496
 
    # Set by load_tests
1497
 
    _auth_server = None
1498
1434
 
1499
1435
    def setUp(self):
1500
1436
        super(TestAuth, self).setUp()
1503
1439
                                  ('b', 'contents of b\n'),])
1504
1440
 
1505
1441
    def create_transport_readonly_server(self):
1506
 
        return self._auth_server(protocol_version=self._protocol_version)
 
1442
        if self._auth_scheme == 'basic':
 
1443
            server = http_utils.HTTPBasicAuthServer(
 
1444
                protocol_version=self._protocol_version)
 
1445
        else:
 
1446
            if self._auth_scheme != 'digest':
 
1447
                raise AssertionError('Unknown auth scheme: %r'
 
1448
                                     % self._auth_scheme)
 
1449
            server = http_utils.HTTPDigestAuthServer(
 
1450
                protocol_version=self._protocol_version)
 
1451
        return server
1507
1452
 
1508
1453
    def _testing_pycurl(self):
1509
1454
        return pycurl_present and self._transport == PyCurlTransport
1510
1455
 
1511
 
    def get_user_url(self, user, password):
 
1456
    def get_user_url(self, user=None, password=None):
1512
1457
        """Build an url embedding user and password"""
1513
1458
        url = '%s://' % self.server._url_protocol
1514
1459
        if user is not None:
1519
1464
        url += '%s:%s/' % (self.server.host, self.server.port)
1520
1465
        return url
1521
1466
 
1522
 
    def get_user_transport(self, user, password):
 
1467
    def get_user_transport(self, user=None, password=None):
1523
1468
        return self._transport(self.get_user_url(user, password))
1524
1469
 
1525
1470
    def test_no_user(self):
1526
1471
        self.server.add_user('joe', 'foo')
1527
 
        t = self.get_user_transport(None, None)
 
1472
        t = self.get_user_transport()
1528
1473
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1529
1474
        # Only one 'Authentication Required' error should occur
1530
1475
        self.assertEqual(1, self.server.auth_required_errors)
1560
1505
        # initial 'who are you' and 'this is not you, who are you')
1561
1506
        self.assertEqual(2, self.server.auth_required_errors)
1562
1507
 
1563
 
    def test_prompt_for_username(self):
1564
 
        if self._testing_pycurl():
1565
 
            raise tests.TestNotApplicable(
1566
 
                'pycurl cannot prompt, it handles auth by embedding'
1567
 
                ' user:pass in urls only')
1568
 
 
1569
 
        self.server.add_user('joe', 'foo')
1570
 
        t = self.get_user_transport(None, None)
1571
 
        stdout = tests.StringIOWrapper()
1572
 
        stderr = tests.StringIOWrapper()
1573
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1574
 
                                            stdout=stdout, stderr=stderr)
1575
 
        self.assertEqual('contents of a\n',t.get('a').read())
1576
 
        # stdin should be empty
1577
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
1578
 
        stderr.seek(0)
1579
 
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1580
 
        self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
1581
 
        self.assertEquals('', stdout.getvalue())
1582
 
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1583
 
                                    stderr.readline())
1584
 
 
1585
1508
    def test_prompt_for_password(self):
1586
1509
        if self._testing_pycurl():
1587
1510
            raise tests.TestNotApplicable(
1591
1514
        self.server.add_user('joe', 'foo')
1592
1515
        t = self.get_user_transport('joe', None)
1593
1516
        stdout = tests.StringIOWrapper()
1594
 
        stderr = tests.StringIOWrapper()
1595
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1596
 
                                            stdout=stdout, stderr=stderr)
1597
 
        self.assertEqual('contents of a\n', t.get('a').read())
 
1517
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
 
1518
        self.assertEqual('contents of a\n',t.get('a').read())
1598
1519
        # stdin should be empty
1599
1520
        self.assertEqual('', ui.ui_factory.stdin.readline())
1600
1521
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1601
 
                                    stderr.getvalue())
1602
 
        self.assertEquals('', stdout.getvalue())
 
1522
                                    stdout.getvalue())
1603
1523
        # And we shouldn't prompt again for a different request
1604
1524
        # against the same transport.
1605
1525
        self.assertEqual('contents of b\n',t.get('b').read())
1617
1537
                                 self.server.auth_realm)))
1618
1538
        self.assertEquals(expected_prompt, actual_prompt)
1619
1539
 
1620
 
    def _expected_username_prompt(self, scheme):
1621
 
        return (self._username_prompt_prefix
1622
 
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1623
 
                                 self.server.host, self.server.port,
1624
 
                                 self.server.auth_realm))
1625
 
 
1626
1540
    def test_no_prompt_for_password_when_using_auth_config(self):
1627
1541
        if self._testing_pycurl():
1628
1542
            raise tests.TestNotApplicable(
1649
1563
        # Only one 'Authentication Required' error should occur
1650
1564
        self.assertEqual(1, self.server.auth_required_errors)
1651
1565
 
1652
 
    def test_user_from_auth_conf(self):
1653
 
        if self._testing_pycurl():
1654
 
            raise tests.TestNotApplicable(
1655
 
                'pycurl does not support authentication.conf')
1656
 
        user = 'joe'
1657
 
        password = 'foo'
1658
 
        self.server.add_user(user, password)
1659
 
        # Create a minimal config file with the right password
1660
 
        conf = config.AuthenticationConfig()
1661
 
        conf._get_config().update(
1662
 
            {'httptest': {'scheme': 'http', 'port': self.server.port,
1663
 
                          'user': user, 'password': password}})
1664
 
        conf._save()
1665
 
        t = self.get_user_transport(None, None)
1666
 
        # Issue a request to the server to connect
1667
 
        self.assertEqual('contents of a\n', t.get('a').read())
1668
 
        # Only one 'Authentication Required' error should occur
1669
 
        self.assertEqual(1, self.server.auth_required_errors)
1670
 
 
1671
1566
    def test_changing_nonce(self):
1672
 
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1673
 
                                     http_utils.ProxyDigestAuthServer):
1674
 
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
 
1567
        if self._auth_scheme != 'digest':
 
1568
            raise tests.TestNotApplicable('HTTP auth digest only test')
1675
1569
        if self._testing_pycurl():
1676
1570
            raise tests.KnownFailure(
1677
1571
                'pycurl does not handle a nonce change')
1695
1589
    """Test proxy authentication schemes."""
1696
1590
 
1697
1591
    _auth_header = 'Proxy-authorization'
1698
 
    _password_prompt_prefix = 'Proxy '
1699
 
    _username_prompt_prefix = 'Proxy '
 
1592
    _password_prompt_prefix='Proxy '
1700
1593
 
1701
1594
    def setUp(self):
1702
1595
        super(TestProxyAuth, self).setUp()
1709
1602
                                  ('b-proxied', 'contents of b\n'),
1710
1603
                                  ])
1711
1604
 
1712
 
    def get_user_transport(self, user, password):
 
1605
    def create_transport_readonly_server(self):
 
1606
        if self._auth_scheme == 'basic':
 
1607
            server = http_utils.ProxyBasicAuthServer(
 
1608
                protocol_version=self._protocol_version)
 
1609
        else:
 
1610
            if self._auth_scheme != 'digest':
 
1611
                raise AssertionError('Unknown auth scheme: %r'
 
1612
                                     % self._auth_scheme)
 
1613
            server = http_utils.ProxyDigestAuthServer(
 
1614
                protocol_version=self._protocol_version)
 
1615
        return server
 
1616
 
 
1617
    def get_user_transport(self, user=None, password=None):
1713
1618
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1714
1619
        return self._transport(self.server.get_url())
1715
1620
 
1842
1747
        # No need to build a valid smart request here, the server will not even
1843
1748
        # try to interpret it.
1844
1749
        self.assertRaises(errors.SmartProtocolError,
1845
 
                          t.get_smart_medium().send_http_smart_request,
1846
 
                          'whatever')
1847
 
 
1848
 
class Test_redirected_to(tests.TestCase):
1849
 
 
1850
 
    def test_redirected_to_subdir(self):
1851
 
        t = self._transport('http://www.example.com/foo')
1852
 
        r = t._redirected_to('http://www.example.com/foo',
1853
 
                             'http://www.example.com/foo/subdir')
1854
 
        self.assertIsInstance(r, type(t))
1855
 
        # Both transports share the same connection
1856
 
        self.assertEquals(t._get_connection(), r._get_connection())
1857
 
 
1858
 
    def test_redirected_to_self_with_slash(self):
1859
 
        t = self._transport('http://www.example.com/foo')
1860
 
        r = t._redirected_to('http://www.example.com/foo',
1861
 
                             'http://www.example.com/foo/')
1862
 
        self.assertIsInstance(r, type(t))
1863
 
        # Both transports share the same connection (one can argue that we
1864
 
        # should return the exact same transport here, but that seems
1865
 
        # overkill).
1866
 
        self.assertEquals(t._get_connection(), r._get_connection())
1867
 
 
1868
 
    def test_redirected_to_host(self):
1869
 
        t = self._transport('http://www.example.com/foo')
1870
 
        r = t._redirected_to('http://www.example.com/foo',
1871
 
                             'http://foo.example.com/foo/subdir')
1872
 
        self.assertIsInstance(r, type(t))
1873
 
 
1874
 
    def test_redirected_to_same_host_sibling_protocol(self):
1875
 
        t = self._transport('http://www.example.com/foo')
1876
 
        r = t._redirected_to('http://www.example.com/foo',
1877
 
                             'https://www.example.com/foo')
1878
 
        self.assertIsInstance(r, type(t))
1879
 
 
1880
 
    def test_redirected_to_same_host_different_protocol(self):
1881
 
        t = self._transport('http://www.example.com/foo')
1882
 
        r = t._redirected_to('http://www.example.com/foo',
1883
 
                             'ftp://www.example.com/foo')
1884
 
        self.assertNotEquals(type(r), type(t))
1885
 
 
1886
 
    def test_redirected_to_different_host_same_user(self):
1887
 
        t = self._transport('http://joe@www.example.com/foo')
1888
 
        r = t._redirected_to('http://www.example.com/foo',
1889
 
                             'https://foo.example.com/foo')
1890
 
        self.assertIsInstance(r, type(t))
1891
 
        self.assertEquals(t._user, r._user)
1892
 
 
1893
 
 
1894
 
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
1895
 
    """Request handler for a unique and pre-defined request.
1896
 
 
1897
 
    The only thing we care about here is how many bytes travel on the wire. But
1898
 
    since we want to measure it for a real http client, we have to send it
1899
 
    correct responses.
1900
 
 
1901
 
    We expect to receive a *single* request nothing more (and we won't even
1902
 
    check what request it is, we just measure the bytes read until an empty
1903
 
    line.
1904
 
    """
1905
 
 
1906
 
    def handle_one_request(self):
1907
 
        tcs = self.server.test_case_server
1908
 
        requestline = self.rfile.readline()
1909
 
        headers = self.MessageClass(self.rfile, 0)
1910
 
        # We just read: the request, the headers, an empty line indicating the
1911
 
        # end of the headers.
1912
 
        bytes_read = len(requestline)
1913
 
        for line in headers.headers:
1914
 
            bytes_read += len(line)
1915
 
        bytes_read += len('\r\n')
1916
 
        if requestline.startswith('POST'):
1917
 
            # The body should be a single line (or we don't know where it ends
1918
 
            # and we don't want to issue a blocking read)
1919
 
            body = self.rfile.readline()
1920
 
            bytes_read += len(body)
1921
 
        tcs.bytes_read = bytes_read
1922
 
 
1923
 
        # We set the bytes written *before* issuing the write, the client is
1924
 
        # supposed to consume every produced byte *before* checking that value.
1925
 
 
1926
 
        # Doing the opposite may lead to test failure: we may be interrupted
1927
 
        # after the write but before updating the value. The client can then
1928
 
        # continue and read the value *before* we can update it. And yes,
1929
 
        # this has been observed -- vila 20090129
1930
 
        tcs.bytes_written = len(tcs.canned_response)
1931
 
        self.wfile.write(tcs.canned_response)
1932
 
 
1933
 
 
1934
 
class ActivityServerMixin(object):
1935
 
 
1936
 
    def __init__(self, protocol_version):
1937
 
        super(ActivityServerMixin, self).__init__(
1938
 
            request_handler=PredefinedRequestHandler,
1939
 
            protocol_version=protocol_version)
1940
 
        # Bytes read and written by the server
1941
 
        self.bytes_read = 0
1942
 
        self.bytes_written = 0
1943
 
        self.canned_response = None
1944
 
 
1945
 
 
1946
 
class ActivityHTTPServer(ActivityServerMixin, http_server.HttpServer):
1947
 
    pass
1948
 
 
1949
 
 
1950
 
if tests.HTTPSServerFeature.available():
1951
 
    from bzrlib.tests import https_server
1952
 
    class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
1953
 
        pass
1954
 
 
1955
 
 
1956
 
class TestActivity(tests.TestCase):
    """Test socket activity reporting.

    We use a special purpose server to control the bytes sent and received and
    be able to predict the activity on the client socket.

    The class relies on three attributes that are not defined here and must
    be provided from outside (presumably by test parametrization -- confirm):
    ``_activity_server`` (server class), ``_protocol_version`` and
    ``_transport`` (transport class).
    """

    def setUp(self):
        """Start the canned-response server and hook activity reporting."""
        tests.TestCase.setUp(self)
        self.server = self._activity_server(self._protocol_version)
        self.server.setUp()
        # Accumulated byte counts, keyed by direction.
        self.activities = {}

        # The parameter names are kept as-is (including ``bytes``) since this
        # function replaces the transport's own _report_activity.
        def report_activity(t, bytes, direction):
            self.activities[direction] = (
                self.activities.get(direction, 0) + bytes)

        # We override at class level because constructors may propagate the
        # bound method and render instance overriding ineffective (an
        # alternative would be to define a specific ui factory instead...)
        self.orig_report_activity = self._transport._report_activity
        self._transport._report_activity = report_activity

    def tearDown(self):
        """Undo the class-level override and stop the server.

        The base class tearDown is run even if stopping the server fails so
        that the generic test cleanup is never skipped.
        """
        self._transport._report_activity = self.orig_report_activity
        try:
            self.server.tearDown()
        finally:
            tests.TestCase.tearDown(self)

    def get_transport(self):
        """Return a transport instance pointing at the test server."""
        return self._transport(self.server.get_url())

    def assertActivitiesMatch(self):
        """Check client-side activity against the server-side counters.

        What the server read must equal what the client reported as written
        and vice versa.
        """
        self.assertEqual(self.server.bytes_read,
                         self.activities.get('write', 0), 'written bytes')
        self.assertEqual(self.server.bytes_written,
                         self.activities.get('read', 0), 'read bytes')

    def test_get(self):
        """A simple get reports activity matching the server counters."""
        self.server.canned_response = '''HTTP/1.1 200 OK\r
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
Server: Apache/2.0.54 (Fedora)\r
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
ETag: "56691-23-38e9ae00"\r
Accept-Ranges: bytes\r
Content-Length: 35\r
Connection: close\r
Content-Type: text/plain; charset=UTF-8\r
\r
Bazaar-NG meta directory, format 1
'''
        t = self.get_transport()
        self.assertEqual('Bazaar-NG meta directory, format 1\n',
                         t.get('foo/bar').read())
        self.assertActivitiesMatch()

    def test_has(self):
        """has() reports activity matching the server counters."""
        self.server.canned_response = '''HTTP/1.1 200 OK\r
Server: SimpleHTTP/0.6 Python/2.5.2\r
Date: Thu, 29 Jan 2009 20:21:47 GMT\r
Content-type: application/octet-stream\r
Content-Length: 20\r
Last-Modified: Thu, 29 Jan 2009 20:21:47 GMT\r
\r
'''
        t = self.get_transport()
        self.assertTrue(t.has('foo/bar'))
        self.assertActivitiesMatch()

    def test_readv(self):
        """A multi-range readv reports activity matching the server."""
        self.server.canned_response = '''HTTP/1.1 206 Partial Content\r
Date: Tue, 11 Jul 2006 04:49:48 GMT\r
Server: Apache/2.0.54 (Fedora)\r
Last-Modified: Thu, 06 Jul 2006 20:22:05 GMT\r
ETag: "238a3c-16ec2-805c5540"\r
Accept-Ranges: bytes\r
Content-Length: 1534\r
Connection: close\r
Content-Type: multipart/byteranges; boundary=418470f848b63279b\r
\r
\r
--418470f848b63279b\r
Content-type: text/plain; charset=UTF-8\r
Content-range: bytes 0-254/93890\r
\r
mbp@sourcefrog.net-20050309040815-13242001617e4a06
mbp@sourcefrog.net-20050309040929-eee0eb3e6d1e7627
mbp@sourcefrog.net-20050309040957-6cad07f466bb0bb8
mbp@sourcefrog.net-20050309041501-c840e09071de3b67
mbp@sourcefrog.net-20050309044615-c24a3250be83220a
\r
--418470f848b63279b\r
Content-type: text/plain; charset=UTF-8\r
Content-range: bytes 1000-2049/93890\r
\r
40-fd4ec249b6b139ab
mbp@sourcefrog.net-20050311063625-07858525021f270b
mbp@sourcefrog.net-20050311231934-aa3776aff5200bb9
mbp@sourcefrog.net-20050311231953-73aeb3a131c3699a
mbp@sourcefrog.net-20050311232353-f5e33da490872c6a
mbp@sourcefrog.net-20050312071639-0a8f59a34a024ff0
mbp@sourcefrog.net-20050312073432-b2c16a55e0d6e9fb
mbp@sourcefrog.net-20050312073831-a47c3335ece1920f
mbp@sourcefrog.net-20050312085412-13373aa129ccbad3
mbp@sourcefrog.net-20050313052251-2bf004cb96b39933
mbp@sourcefrog.net-20050313052856-3edd84094687cb11
mbp@sourcefrog.net-20050313053233-e30a4f28aef48f9d
mbp@sourcefrog.net-20050313053853-7c64085594ff3072
mbp@sourcefrog.net-20050313054757-a86c3f5871069e22
mbp@sourcefrog.net-20050313061422-418f1f73b94879b9
mbp@sourcefrog.net-20050313120651-497bd231b19df600
mbp@sourcefrog.net-20050314024931-eae0170ef25a5d1a
mbp@sourcefrog.net-20050314025438-d52099f915fe65fc
mbp@sourcefrog.net-20050314025539-637a636692c055cf
mbp@sourcefrog.net-20050314025737-55eb441f430ab4ba
mbp@sourcefrog.net-20050314025901-d74aa93bb7ee8f62
mbp@source\r
--418470f848b63279b--\r
'''
        t = self.get_transport()
        # Remember that the request is ignored and that the ranges below
        # don't have to match the canned response.
        l = list(t.readv('/foo/bar', ((0, 255), (1000, 1050))))
        self.assertEqual(2, len(l))
        self.assertActivitiesMatch()

    def test_post(self):
        """A POST reports activity matching the server counters."""
        self.server.canned_response = '''HTTP/1.1 200 OK\r
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
Server: Apache/2.0.54 (Fedora)\r
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
ETag: "56691-23-38e9ae00"\r
Accept-Ranges: bytes\r
Content-Length: 35\r
Connection: close\r
Content-Type: text/plain; charset=UTF-8\r
\r
lalala whatever as long as itsssss
'''
        t = self.get_transport()
        # We must send a single line of body bytes, see
        # PredefinedRequestHandler.handle_one_request
        code, f = t._post('abc def end-of-body\n')
        self.assertEqual('lalala whatever as long as itsssss\n', f.read())
        self.assertActivitiesMatch()
 
1750
                          t.send_http_smart_request, 'whatever')
 
1751