~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: John Arbash Meinel
  • Date: 2008-09-05 02:29:34 UTC
  • mto: (3697.7.4 1.7)
  • mto: This revision was merged to the branch mainline in revision 3748.
  • Revision ID: john@arbash-meinel.com-20080905022934-s8692mbwpkdwi106
Cleanups to the algorithm documentation.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for HTTP implementations.
18
18
 
34
34
 
35
35
import bzrlib
36
36
from bzrlib import (
37
 
    bzrdir,
38
37
    config,
39
38
    errors,
40
39
    osutils,
41
 
    remote as _mod_remote,
42
40
    tests,
43
41
    transport,
44
42
    ui,
45
43
    urlutils,
46
44
    )
47
 
from bzrlib.symbol_versioning import (
48
 
    deprecated_in,
49
 
    )
50
45
from bzrlib.tests import (
51
46
    http_server,
52
47
    http_utils,
68
63
    pycurl_present = False
69
64
 
70
65
 
 
66
class TransportAdapter(tests.TestScenarioApplier):
 
67
    """Generate the same test for each transport implementation."""
 
68
 
 
69
    def __init__(self):
 
70
        transport_scenarios = [
 
71
            ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
 
72
                            _server=http_server.HttpServer_urllib,
 
73
                            _qualified_prefix='http+urllib',)),
 
74
            ]
 
75
        if pycurl_present:
 
76
            transport_scenarios.append(
 
77
                ('pycurl', dict(_transport=PyCurlTransport,
 
78
                                _server=http_server.HttpServer_PyCurl,
 
79
                                _qualified_prefix='http+pycurl',)))
 
80
        self.scenarios = transport_scenarios
 
81
 
 
82
 
 
83
class TransportProtocolAdapter(TransportAdapter):
 
84
    """Generate the same test for each protocol implementation.
 
85
 
 
86
    In addition to the transport adaptation that we inherit from.
 
87
    """
 
88
 
 
89
    def __init__(self):
 
90
        super(TransportProtocolAdapter, self).__init__()
 
91
        protocol_scenarios = [
 
92
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
 
93
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
 
94
            ]
 
95
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
96
                                                  protocol_scenarios)
 
97
 
 
98
 
 
99
class TransportProtocolAuthenticationAdapter(TransportProtocolAdapter):
 
100
    """Generate the same test for each authentication scheme implementation.
 
101
 
 
102
    In addition to the protocol adaptation that we inherit from.
 
103
    """
 
104
 
 
105
    def __init__(self):
 
106
        super(TransportProtocolAuthenticationAdapter, self).__init__()
 
107
        auth_scheme_scenarios = [
 
108
            ('basic', dict(_auth_scheme='basic')),
 
109
            ('digest', dict(_auth_scheme='digest')),
 
110
            ]
 
111
 
 
112
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
113
                                                  auth_scheme_scenarios)
 
114
 
71
115
def load_tests(standard_tests, module, loader):
72
116
    """Multiply tests for http clients and protocol versions."""
73
 
    result = loader.suiteClass()
74
 
 
75
 
    # one for each transport implementation
76
 
    t_tests, remaining_tests = tests.split_suite_by_condition(
77
 
        standard_tests, tests.condition_isinstance((
78
 
                TestHttpTransportRegistration,
 
117
    # one for each transport
 
118
    t_adapter = TransportAdapter()
 
119
    t_classes= (TestHttpTransportRegistration,
79
120
                TestHttpTransportUrls,
80
 
                Test_redirected_to,
81
 
                )))
82
 
    transport_scenarios = [
83
 
        ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
84
 
                        _server=http_server.HttpServer_urllib,
85
 
                        _qualified_prefix='http+urllib',)),
86
 
        ]
87
 
    if pycurl_present:
88
 
        transport_scenarios.append(
89
 
            ('pycurl', dict(_transport=PyCurlTransport,
90
 
                            _server=http_server.HttpServer_PyCurl,
91
 
                            _qualified_prefix='http+pycurl',)))
92
 
    tests.multiply_tests(t_tests, transport_scenarios, result)
93
 
 
94
 
    # each implementation tested with each HTTP version
95
 
    tp_tests, remaining_tests = tests.split_suite_by_condition(
96
 
        remaining_tests, tests.condition_isinstance((
97
 
                SmartHTTPTunnellingTest,
98
 
                TestDoCatchRedirections,
99
 
                TestHTTPConnections,
100
 
                TestHTTPRedirections,
101
 
                TestHTTPSilentRedirections,
102
 
                TestLimitedRangeRequestServer,
103
 
                TestPost,
104
 
                TestProxyHttpServer,
105
 
                TestRanges,
106
 
                TestSpecificRequestHandler,
107
 
                )))
108
 
    protocol_scenarios = [
109
 
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
110
 
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
111
 
            ]
112
 
    tp_scenarios = tests.multiply_scenarios(transport_scenarios,
113
 
                                            protocol_scenarios)
114
 
    tests.multiply_tests(tp_tests, tp_scenarios, result)
115
 
 
116
 
    # proxy auth: each auth scheme on all http versions on all implementations.
117
 
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
118
 
        remaining_tests, tests.condition_isinstance((
119
 
                TestProxyAuth,
120
 
                )))
121
 
    proxy_auth_scheme_scenarios = [
122
 
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
123
 
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
124
 
        ('basicdigest',
125
 
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
126
 
        ]
127
 
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
128
 
                                              proxy_auth_scheme_scenarios)
129
 
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
130
 
 
131
 
    # auth: each auth scheme on all http versions on all implementations.
132
 
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
133
 
        remaining_tests, tests.condition_isinstance((
134
 
                TestAuth,
135
 
                )))
136
 
    auth_scheme_scenarios = [
137
 
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
138
 
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
139
 
        ('basicdigest',
140
 
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
141
 
        ]
142
 
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
143
 
                                             auth_scheme_scenarios)
144
 
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
145
 
 
146
 
    # activity: on all http[s] versions on all implementations
147
 
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
148
 
        remaining_tests, tests.condition_isinstance((
149
 
                TestActivity,
150
 
                )))
151
 
    activity_scenarios = [
152
 
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
153
 
                             _transport=_urllib.HttpTransport_urllib,)),
154
 
        ]
155
 
    if tests.HTTPSServerFeature.available():
156
 
        activity_scenarios.append(
157
 
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
158
 
                                  _transport=_urllib.HttpTransport_urllib,)),)
159
 
    if pycurl_present:
160
 
        activity_scenarios.append(
161
 
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
162
 
                                 _transport=PyCurlTransport,)),)
163
 
        if tests.HTTPSServerFeature.available():
164
 
            from bzrlib.tests import (
165
 
                ssl_certs,
166
121
                )
167
 
            # FIXME: Until we have a better way to handle self-signed
168
 
            # certificates (like allowing them in a test specific
169
 
            # authentication.conf for example), we need some specialized pycurl
170
 
            # transport for tests.
171
 
            class HTTPS_pycurl_transport(PyCurlTransport):
172
 
 
173
 
                def __init__(self, base, _from_transport=None):
174
 
                    super(HTTPS_pycurl_transport, self).__init__(
175
 
                        base, _from_transport)
176
 
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
177
 
 
178
 
            activity_scenarios.append(
179
 
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
180
 
                                      _transport=HTTPS_pycurl_transport,)),)
181
 
 
182
 
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
183
 
                                               protocol_scenarios)
184
 
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
185
 
 
186
 
    # No parametrization for the remaining tests
187
 
    result.addTests(remaining_tests)
188
 
 
 
122
    is_testing_for_transports = tests.condition_isinstance(t_classes)
 
123
 
 
124
    # multiplied by one for each protocol version
 
125
    tp_adapter = TransportProtocolAdapter()
 
126
    tp_classes= (SmartHTTPTunnellingTest,
 
127
                 TestDoCatchRedirections,
 
128
                 TestHTTPConnections,
 
129
                 TestHTTPRedirections,
 
130
                 TestHTTPSilentRedirections,
 
131
                 TestLimitedRangeRequestServer,
 
132
                 TestPost,
 
133
                 TestProxyHttpServer,
 
134
                 TestRanges,
 
135
                 TestSpecificRequestHandler,
 
136
                 )
 
137
    is_also_testing_for_protocols = tests.condition_isinstance(tp_classes)
 
138
 
 
139
    # multiplied by one for each authentication scheme
 
140
    tpa_adapter = TransportProtocolAuthenticationAdapter()
 
141
    tpa_classes = (TestAuth,
 
142
                   )
 
143
    is_also_testing_for_authentication = tests.condition_isinstance(
 
144
        tpa_classes)
 
145
 
 
146
    result = loader.suiteClass()
 
147
    for test_class in tests.iter_suite_tests(standard_tests):
 
148
        # Each test class is either standalone or testing for some combination
 
149
        # of transport, protocol version, authentication scheme. Use the right
 
150
        # adapter (or none) depending on the class.
 
151
        if is_testing_for_transports(test_class):
 
152
            result.addTests(t_adapter.adapt(test_class))
 
153
        elif is_also_testing_for_protocols(test_class):
 
154
            result.addTests(tp_adapter.adapt(test_class))
 
155
        elif is_also_testing_for_authentication(test_class):
 
156
            result.addTests(tpa_adapter.adapt(test_class))
 
157
        else:
 
158
            result.addTest(test_class)
189
159
    return result
190
160
 
191
161
 
200
170
 
201
171
class RecordingServer(object):
202
172
    """A fake HTTP server.
203
 
 
 
173
    
204
174
    It records the bytes sent to it, and replies with a 200.
205
175
    """
206
176
 
255
225
        self.port = None
256
226
 
257
227
 
258
 
class TestAuthHeader(tests.TestCase):
259
 
 
260
 
    def parse_header(self, header, auth_handler_class=None):
261
 
        if auth_handler_class is None:
262
 
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
263
 
        self.auth_handler =  auth_handler_class()
264
 
        return self.auth_handler._parse_auth_header(header)
265
 
 
266
 
    def test_empty_header(self):
267
 
        scheme, remainder = self.parse_header('')
268
 
        self.assertEquals('', scheme)
269
 
        self.assertIs(None, remainder)
270
 
 
271
 
    def test_negotiate_header(self):
272
 
        scheme, remainder = self.parse_header('Negotiate')
273
 
        self.assertEquals('negotiate', scheme)
274
 
        self.assertIs(None, remainder)
275
 
 
276
 
    def test_basic_header(self):
277
 
        scheme, remainder = self.parse_header(
278
 
            'Basic realm="Thou should not pass"')
279
 
        self.assertEquals('basic', scheme)
280
 
        self.assertEquals('realm="Thou should not pass"', remainder)
281
 
 
282
 
    def test_basic_extract_realm(self):
283
 
        scheme, remainder = self.parse_header(
284
 
            'Basic realm="Thou should not pass"',
285
 
            _urllib2_wrappers.BasicAuthHandler)
286
 
        match, realm = self.auth_handler.extract_realm(remainder)
287
 
        self.assertTrue(match is not None)
288
 
        self.assertEquals('Thou should not pass', realm)
289
 
 
290
 
    def test_digest_header(self):
291
 
        scheme, remainder = self.parse_header(
292
 
            'Digest realm="Thou should not pass"')
293
 
        self.assertEquals('digest', scheme)
294
 
        self.assertEquals('realm="Thou should not pass"', remainder)
295
 
 
296
 
 
297
228
class TestHTTPServer(tests.TestCase):
298
229
    """Test the HTTP servers implementations."""
299
230
 
516
447
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
517
448
            % bzrlib.__version__) > -1)
518
449
 
 
450
    def test_get_smart_medium(self):
 
451
        # For HTTP, get_smart_medium should return the transport object.
 
452
        server = self.get_readonly_server()
 
453
        http_transport = self._transport(server.get_url())
 
454
        medium = http_transport.get_smart_medium()
 
455
        self.assertIs(medium, http_transport)
 
456
 
519
457
    def test_has_on_bogus_host(self):
520
458
        # Get a free address and don't 'accept' on it, so that we
521
459
        # can be sure there is no http handler there, but set a
878
816
        # bytes on the socket
879
817
        ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
880
818
        self.assertEqual((0, '0'), ireadv.next())
881
 
        # The server should have issued one request so far
 
819
        # The server should have issued one request so far 
882
820
        self.assertEqual(1, server.GET_request_nb)
883
821
        self.assertEqual('0123456789', t.get_bytes('a'))
884
822
        # get_bytes issued an additional request, the readv pending ones are
1205
1143
        url = self.server.get_url()
1206
1144
        t = self._transport(url)
1207
1145
        try:
1208
 
            self.assertEqual('proxied contents of foo\n', t.get('foo').read())
 
1146
            self.assertEqual(t.get('foo').read(), 'proxied contents of foo\n')
1209
1147
        finally:
1210
1148
            self._restore_env()
1211
1149
 
1214
1152
        url = self.server.get_url()
1215
1153
        t = self._transport(url)
1216
1154
        try:
1217
 
            self.assertEqual('contents of foo\n', t.get('foo').read())
 
1155
            self.assertEqual(t.get('foo').read(), 'contents of foo\n')
1218
1156
        finally:
1219
1157
            self._restore_env()
1220
1158
 
1330
1268
                                  ('bundle',
1331
1269
                                  '# Bazaar revision bundle v0.9\n#\n')
1332
1270
                                  ],)
1333
 
        # The requests to the old server will be redirected to the new server
 
1271
 
1334
1272
        self.old_transport = self._transport(self.old_server.get_url())
1335
1273
 
1336
1274
    def test_redirected(self):
1341
1279
    def test_read_redirected_bundle_from_url(self):
1342
1280
        from bzrlib.bundle import read_bundle_from_url
1343
1281
        url = self.old_transport.abspath('bundle')
1344
 
        bundle = self.applyDeprecated(deprecated_in((1, 12, 0)),
1345
 
                read_bundle_from_url, url)
 
1282
        bundle = read_bundle_from_url(url)
1346
1283
        # If read_bundle_from_url was successful we get an empty bundle
1347
1284
        self.assertEqual([], bundle.revisions)
1348
1285
 
1359
1296
        # Since the tests using this class will replace
1360
1297
        # _urllib2_wrappers.Request, we can't just call the base class __init__
1361
1298
        # or we'll loop.
1362
 
        RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
 
1299
        RedirectedRequest.init_orig(self, method, url, args, kwargs)
1363
1300
        self.follow_redirections = True
1364
1301
 
1365
1302
 
1492
1429
 
1493
1430
    _auth_header = 'Authorization'
1494
1431
    _password_prompt_prefix = ''
1495
 
    _username_prompt_prefix = ''
1496
 
    # Set by load_tests
1497
 
    _auth_server = None
1498
1432
 
1499
1433
    def setUp(self):
1500
1434
        super(TestAuth, self).setUp()
1503
1437
                                  ('b', 'contents of b\n'),])
1504
1438
 
1505
1439
    def create_transport_readonly_server(self):
1506
 
        return self._auth_server(protocol_version=self._protocol_version)
 
1440
        if self._auth_scheme == 'basic':
 
1441
            server = http_utils.HTTPBasicAuthServer(
 
1442
                protocol_version=self._protocol_version)
 
1443
        else:
 
1444
            if self._auth_scheme != 'digest':
 
1445
                raise AssertionError('Unknown auth scheme: %r'
 
1446
                                     % self._auth_scheme)
 
1447
            server = http_utils.HTTPDigestAuthServer(
 
1448
                protocol_version=self._protocol_version)
 
1449
        return server
1507
1450
 
1508
1451
    def _testing_pycurl(self):
1509
1452
        return pycurl_present and self._transport == PyCurlTransport
1510
1453
 
1511
 
    def get_user_url(self, user, password):
 
1454
    def get_user_url(self, user=None, password=None):
1512
1455
        """Build an url embedding user and password"""
1513
1456
        url = '%s://' % self.server._url_protocol
1514
1457
        if user is not None:
1519
1462
        url += '%s:%s/' % (self.server.host, self.server.port)
1520
1463
        return url
1521
1464
 
1522
 
    def get_user_transport(self, user, password):
 
1465
    def get_user_transport(self, user=None, password=None):
1523
1466
        return self._transport(self.get_user_url(user, password))
1524
1467
 
1525
1468
    def test_no_user(self):
1526
1469
        self.server.add_user('joe', 'foo')
1527
 
        t = self.get_user_transport(None, None)
 
1470
        t = self.get_user_transport()
1528
1471
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1529
1472
        # Only one 'Authentication Required' error should occur
1530
1473
        self.assertEqual(1, self.server.auth_required_errors)
1560
1503
        # initial 'who are you' and 'this is not you, who are you')
1561
1504
        self.assertEqual(2, self.server.auth_required_errors)
1562
1505
 
1563
 
    def test_prompt_for_username(self):
1564
 
        if self._testing_pycurl():
1565
 
            raise tests.TestNotApplicable(
1566
 
                'pycurl cannot prompt, it handles auth by embedding'
1567
 
                ' user:pass in urls only')
1568
 
 
1569
 
        self.server.add_user('joe', 'foo')
1570
 
        t = self.get_user_transport(None, None)
1571
 
        stdout = tests.StringIOWrapper()
1572
 
        stderr = tests.StringIOWrapper()
1573
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1574
 
                                            stdout=stdout, stderr=stderr)
1575
 
        self.assertEqual('contents of a\n',t.get('a').read())
1576
 
        # stdin should be empty
1577
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
1578
 
        stderr.seek(0)
1579
 
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1580
 
        self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
1581
 
        self.assertEquals('', stdout.getvalue())
1582
 
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1583
 
                                    stderr.readline())
1584
 
 
1585
1506
    def test_prompt_for_password(self):
1586
1507
        if self._testing_pycurl():
1587
1508
            raise tests.TestNotApplicable(
1591
1512
        self.server.add_user('joe', 'foo')
1592
1513
        t = self.get_user_transport('joe', None)
1593
1514
        stdout = tests.StringIOWrapper()
1594
 
        stderr = tests.StringIOWrapper()
1595
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1596
 
                                            stdout=stdout, stderr=stderr)
1597
 
        self.assertEqual('contents of a\n', t.get('a').read())
 
1515
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
 
1516
        self.assertEqual('contents of a\n',t.get('a').read())
1598
1517
        # stdin should be empty
1599
1518
        self.assertEqual('', ui.ui_factory.stdin.readline())
1600
1519
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1601
 
                                    stderr.getvalue())
1602
 
        self.assertEquals('', stdout.getvalue())
 
1520
                                    stdout.getvalue())
1603
1521
        # And we shouldn't prompt again for a different request
1604
1522
        # against the same transport.
1605
1523
        self.assertEqual('contents of b\n',t.get('b').read())
1617
1535
                                 self.server.auth_realm)))
1618
1536
        self.assertEquals(expected_prompt, actual_prompt)
1619
1537
 
1620
 
    def _expected_username_prompt(self, scheme):
1621
 
        return (self._username_prompt_prefix
1622
 
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1623
 
                                 self.server.host, self.server.port,
1624
 
                                 self.server.auth_realm))
1625
 
 
1626
1538
    def test_no_prompt_for_password_when_using_auth_config(self):
1627
1539
        if self._testing_pycurl():
1628
1540
            raise tests.TestNotApplicable(
1649
1561
        # Only one 'Authentication Required' error should occur
1650
1562
        self.assertEqual(1, self.server.auth_required_errors)
1651
1563
 
1652
 
    def test_user_from_auth_conf(self):
1653
 
        if self._testing_pycurl():
1654
 
            raise tests.TestNotApplicable(
1655
 
                'pycurl does not support authentication.conf')
1656
 
        user = 'joe'
1657
 
        password = 'foo'
1658
 
        self.server.add_user(user, password)
1659
 
        # Create a minimal config file with the right password
1660
 
        conf = config.AuthenticationConfig()
1661
 
        conf._get_config().update(
1662
 
            {'httptest': {'scheme': 'http', 'port': self.server.port,
1663
 
                          'user': user, 'password': password}})
1664
 
        conf._save()
1665
 
        t = self.get_user_transport(None, None)
1666
 
        # Issue a request to the server to connect
1667
 
        self.assertEqual('contents of a\n', t.get('a').read())
1668
 
        # Only one 'Authentication Required' error should occur
1669
 
        self.assertEqual(1, self.server.auth_required_errors)
1670
 
 
1671
1564
    def test_changing_nonce(self):
1672
 
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1673
 
                                     http_utils.ProxyDigestAuthServer):
1674
 
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
 
1565
        if self._auth_scheme != 'digest':
 
1566
            raise tests.TestNotApplicable('HTTP auth digest only test')
1675
1567
        if self._testing_pycurl():
1676
1568
            raise tests.KnownFailure(
1677
1569
                'pycurl does not handle a nonce change')
1695
1587
    """Test proxy authentication schemes."""
1696
1588
 
1697
1589
    _auth_header = 'Proxy-authorization'
1698
 
    _password_prompt_prefix = 'Proxy '
1699
 
    _username_prompt_prefix = 'Proxy '
 
1590
    _password_prompt_prefix='Proxy '
1700
1591
 
1701
1592
    def setUp(self):
1702
1593
        super(TestProxyAuth, self).setUp()
1709
1600
                                  ('b-proxied', 'contents of b\n'),
1710
1601
                                  ])
1711
1602
 
1712
 
    def get_user_transport(self, user, password):
 
1603
    def create_transport_readonly_server(self):
 
1604
        if self._auth_scheme == 'basic':
 
1605
            server = http_utils.ProxyBasicAuthServer(
 
1606
                protocol_version=self._protocol_version)
 
1607
        else:
 
1608
            if self._auth_scheme != 'digest':
 
1609
                raise AssertionError('Unknown auth scheme: %r'
 
1610
                                     % self._auth_scheme)
 
1611
            server = http_utils.ProxyDigestAuthServer(
 
1612
                protocol_version=self._protocol_version)
 
1613
        return server
 
1614
 
 
1615
    def get_user_transport(self, user=None, password=None):
1713
1616
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1714
1617
        return self._transport(self.server.get_url())
1715
1618
 
1764
1667
        return http_utils.HTTPServerWithSmarts(
1765
1668
            protocol_version=self._protocol_version)
1766
1669
 
1767
 
    def test_open_bzrdir(self):
1768
 
        branch = self.make_branch('relpath')
1769
 
        http_server = self.get_readonly_server()
1770
 
        url = http_server.get_url() + 'relpath'
1771
 
        bd = bzrdir.BzrDir.open(url)
1772
 
        self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
1773
 
 
1774
1670
    def test_bulk_data(self):
1775
1671
        # We should be able to send and receive bulk data in a single message.
1776
1672
        # The 'readv' command in the smart protocol both sends and receives
1842
1738
        # No need to build a valid smart request here, the server will not even
1843
1739
        # try to interpret it.
1844
1740
        self.assertRaises(errors.SmartProtocolError,
1845
 
                          t.get_smart_medium().send_http_smart_request,
1846
 
                          'whatever')
1847
 
 
1848
 
class Test_redirected_to(tests.TestCase):
1849
 
 
1850
 
    def test_redirected_to_subdir(self):
1851
 
        t = self._transport('http://www.example.com/foo')
1852
 
        r = t._redirected_to('http://www.example.com/foo',
1853
 
                             'http://www.example.com/foo/subdir')
1854
 
        self.assertIsInstance(r, type(t))
1855
 
        # Both transports share the same connection
1856
 
        self.assertEquals(t._get_connection(), r._get_connection())
1857
 
 
1858
 
    def test_redirected_to_self_with_slash(self):
1859
 
        t = self._transport('http://www.example.com/foo')
1860
 
        r = t._redirected_to('http://www.example.com/foo',
1861
 
                             'http://www.example.com/foo/')
1862
 
        self.assertIsInstance(r, type(t))
1863
 
        # Both transports share the same connection (one can argue that we
1864
 
        # should return the exact same transport here, but that seems
1865
 
        # overkill).
1866
 
        self.assertEquals(t._get_connection(), r._get_connection())
1867
 
 
1868
 
    def test_redirected_to_host(self):
1869
 
        t = self._transport('http://www.example.com/foo')
1870
 
        r = t._redirected_to('http://www.example.com/foo',
1871
 
                             'http://foo.example.com/foo/subdir')
1872
 
        self.assertIsInstance(r, type(t))
1873
 
 
1874
 
    def test_redirected_to_same_host_sibling_protocol(self):
1875
 
        t = self._transport('http://www.example.com/foo')
1876
 
        r = t._redirected_to('http://www.example.com/foo',
1877
 
                             'https://www.example.com/foo')
1878
 
        self.assertIsInstance(r, type(t))
1879
 
 
1880
 
    def test_redirected_to_same_host_different_protocol(self):
1881
 
        t = self._transport('http://www.example.com/foo')
1882
 
        r = t._redirected_to('http://www.example.com/foo',
1883
 
                             'ftp://www.example.com/foo')
1884
 
        self.assertNotEquals(type(r), type(t))
1885
 
 
1886
 
    def test_redirected_to_different_host_same_user(self):
1887
 
        t = self._transport('http://joe@www.example.com/foo')
1888
 
        r = t._redirected_to('http://www.example.com/foo',
1889
 
                             'https://foo.example.com/foo')
1890
 
        self.assertIsInstance(r, type(t))
1891
 
        self.assertEquals(t._user, r._user)
1892
 
 
1893
 
 
1894
 
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
1895
 
    """Request handler for a unique and pre-defined request.
1896
 
 
1897
 
    The only thing we care about here is how many bytes travel on the wire. But
1898
 
    since we want to measure it for a real http client, we have to send it
1899
 
    correct responses.
1900
 
 
1901
 
    We expect to receive a *single* request nothing more (and we won't even
1902
 
    check what request it is, we just measure the bytes read until an empty
1903
 
    line).
1904
 
    """
1905
 
 
1906
 
    def handle_one_request(self):
1907
 
        tcs = self.server.test_case_server
1908
 
        requestline = self.rfile.readline()
1909
 
        headers = self.MessageClass(self.rfile, 0)
1910
 
        # We just read: the request, the headers, an empty line indicating the
1911
 
        # end of the headers.
1912
 
        bytes_read = len(requestline)
1913
 
        for line in headers.headers:
1914
 
            bytes_read += len(line)
1915
 
        bytes_read += len('\r\n')
1916
 
        if requestline.startswith('POST'):
1917
 
            # The body should be a single line (or we don't know where it ends
1918
 
            # and we don't want to issue a blocking read)
1919
 
            body = self.rfile.readline()
1920
 
            bytes_read += len(body)
1921
 
        tcs.bytes_read = bytes_read
1922
 
 
1923
 
        # We set the bytes written *before* issuing the write, the client is
1924
 
        # supposed to consume every produced byte *before* checking that value.
1925
 
 
1926
 
        # Doing the opposite may lead to test failure: we may be interrupted
1927
 
        # after the write but before updating the value. The client can then
1928
 
        # continue and read the value *before* we can update it. And yes,
1929
 
        # this has been observed -- vila 20090129
1930
 
        tcs.bytes_written = len(tcs.canned_response)
1931
 
        self.wfile.write(tcs.canned_response)
1932
 
 
1933
 
 
1934
 
class ActivityServerMixin(object):
1935
 
 
1936
 
    def __init__(self, protocol_version):
1937
 
        super(ActivityServerMixin, self).__init__(
1938
 
            request_handler=PredefinedRequestHandler,
1939
 
            protocol_version=protocol_version)
1940
 
        # Bytes read and written by the server
1941
 
        self.bytes_read = 0
1942
 
        self.bytes_written = 0
1943
 
        self.canned_response = None
1944
 
 
1945
 
 
1946
 
class ActivityHTTPServer(ActivityServerMixin, http_server.HttpServer):
1947
 
    pass
1948
 
 
1949
 
 
1950
 
if tests.HTTPSServerFeature.available():
1951
 
    from bzrlib.tests import https_server
1952
 
    class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
1953
 
        pass
1954
 
 
1955
 
 
1956
 
class TestActivity(tests.TestCase):
    """Check that transports report the bytes they send and receive.

    A special purpose server lets us control the bytes exchanged, so the
    activity on the client socket can be predicted and compared with what
    the transport reported.

    The attributes ``_activity_server``, ``_protocol_version`` and
    ``_transport`` are read from ``self`` but not defined in this class
    (presumably injected by the test parametrization -- TODO confirm).
    """

    def setUp(self):
        """Start the activity server and hook the transport reporting."""
        tests.TestCase.setUp(self)
        server = self._activity_server(self._protocol_version)
        self.server = server
        server.setUp()
        # Maps a direction to the total number of bytes reported for it.
        self.activities = {}

        def report_activity(t, bytes, direction):
            # Accumulate the reported bytes per direction.
            self.activities[direction] = (
                self.activities.get(direction, 0) + bytes)

        # The override is done on the transport class rather than on an
        # instance: constructors may propagate the bound method, which would
        # make an instance-level override ineffective (a dedicated ui factory
        # would be another way to achieve this...)
        transport_class = self._transport
        self.orig_report_activity = transport_class._report_activity
        transport_class._report_activity = report_activity

    def tearDown(self):
        """Restore the transport reporting hook and stop the server."""
        transport_class = self._transport
        transport_class._report_activity = self.orig_report_activity
        self.server.tearDown()
        tests.TestCase.tearDown(self)

    def get_transport(self):
        """Return a transport connected to the activity server."""
        server_url = self.server.get_url()
        return self._transport(server_url)

    def assertActivitiesMatch(self):
        """Assert that server counters equal the client reported activity.

        What the server read is what the client wrote, and vice versa.
        """
        reported_written = self.activities.get('write', 0)
        self.assertEqual(self.server.bytes_read, reported_written,
                         'written bytes')
        reported_read = self.activities.get('read', 0)
        self.assertEqual(self.server.bytes_written, reported_read,
                         'read bytes')

    def test_get(self):
        canned = '''HTTP/1.1 200 OK\r
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
Server: Apache/2.0.54 (Fedora)\r
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
ETag: "56691-23-38e9ae00"\r
Accept-Ranges: bytes\r
Content-Length: 35\r
Connection: close\r
Content-Type: text/plain; charset=UTF-8\r
\r
Bazaar-NG meta directory, format 1
'''
        self.server.canned_response = canned
        http_transport = self.get_transport()
        content = http_transport.get('foo/bar').read()
        self.assertEqual('Bazaar-NG meta directory, format 1\n', content)
        self.assertActivitiesMatch()

    def test_has(self):
        canned = '''HTTP/1.1 200 OK\r
Server: SimpleHTTP/0.6 Python/2.5.2\r
Date: Thu, 29 Jan 2009 20:21:47 GMT\r
Content-type: application/octet-stream\r
Content-Length: 20\r
Last-Modified: Thu, 29 Jan 2009 20:21:47 GMT\r
\r
'''
        self.server.canned_response = canned
        http_transport = self.get_transport()
        found = http_transport.has('foo/bar')
        self.assertTrue(found)
        self.assertActivitiesMatch()

    def test_readv(self):
        canned = '''HTTP/1.1 206 Partial Content\r
Date: Tue, 11 Jul 2006 04:49:48 GMT\r
Server: Apache/2.0.54 (Fedora)\r
Last-Modified: Thu, 06 Jul 2006 20:22:05 GMT\r
ETag: "238a3c-16ec2-805c5540"\r
Accept-Ranges: bytes\r
Content-Length: 1534\r
Connection: close\r
Content-Type: multipart/byteranges; boundary=418470f848b63279b\r
\r
\r
--418470f848b63279b\r
Content-type: text/plain; charset=UTF-8\r
Content-range: bytes 0-254/93890\r
\r
mbp@sourcefrog.net-20050309040815-13242001617e4a06
mbp@sourcefrog.net-20050309040929-eee0eb3e6d1e7627
mbp@sourcefrog.net-20050309040957-6cad07f466bb0bb8
mbp@sourcefrog.net-20050309041501-c840e09071de3b67
mbp@sourcefrog.net-20050309044615-c24a3250be83220a
\r
--418470f848b63279b\r
Content-type: text/plain; charset=UTF-8\r
Content-range: bytes 1000-2049/93890\r
\r
40-fd4ec249b6b139ab
mbp@sourcefrog.net-20050311063625-07858525021f270b
mbp@sourcefrog.net-20050311231934-aa3776aff5200bb9
mbp@sourcefrog.net-20050311231953-73aeb3a131c3699a
mbp@sourcefrog.net-20050311232353-f5e33da490872c6a
mbp@sourcefrog.net-20050312071639-0a8f59a34a024ff0
mbp@sourcefrog.net-20050312073432-b2c16a55e0d6e9fb
mbp@sourcefrog.net-20050312073831-a47c3335ece1920f
mbp@sourcefrog.net-20050312085412-13373aa129ccbad3
mbp@sourcefrog.net-20050313052251-2bf004cb96b39933
mbp@sourcefrog.net-20050313052856-3edd84094687cb11
mbp@sourcefrog.net-20050313053233-e30a4f28aef48f9d
mbp@sourcefrog.net-20050313053853-7c64085594ff3072
mbp@sourcefrog.net-20050313054757-a86c3f5871069e22
mbp@sourcefrog.net-20050313061422-418f1f73b94879b9
mbp@sourcefrog.net-20050313120651-497bd231b19df600
mbp@sourcefrog.net-20050314024931-eae0170ef25a5d1a
mbp@sourcefrog.net-20050314025438-d52099f915fe65fc
mbp@sourcefrog.net-20050314025539-637a636692c055cf
mbp@sourcefrog.net-20050314025737-55eb441f430ab4ba
mbp@sourcefrog.net-20050314025901-d74aa93bb7ee8f62
mbp@source\r
--418470f848b63279b--\r
'''
        self.server.canned_response = canned
        http_transport = self.get_transport()
        # The server ignores the request, so the ranges asked for below don't
        # have to match the ones in the canned response.
        requested_ranges = ((0, 255), (1000, 1050))
        chunks = list(http_transport.readv('/foo/bar', requested_ranges))
        self.assertEqual(2, len(chunks))
        self.assertActivitiesMatch()

    def test_post(self):
        canned = '''HTTP/1.1 200 OK\r
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
Server: Apache/2.0.54 (Fedora)\r
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
ETag: "56691-23-38e9ae00"\r
Accept-Ranges: bytes\r
Content-Length: 35\r
Connection: close\r
Content-Type: text/plain; charset=UTF-8\r
\r
lalala whatever as long as itsssss
'''
        self.server.canned_response = canned
        http_transport = self.get_transport()
        # The body must be a single line of bytes, see
        # PredefinedRequestHandler.handle_one_request
        code, response = http_transport._post('abc def end-of-body\n')
        body = response.read()
        self.assertEqual('lalala whatever as long as itsssss\n', body)
        self.assertActivitiesMatch()
 
1741
                          t.send_http_smart_request, 'whatever')
 
1742