~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: John Arbash Meinel
  • Date: 2008-09-09 15:09:12 UTC
  • mto: This revision was merged to the branch mainline in revision 3699.
  • Revision ID: john@arbash-meinel.com-20080909150912-wyttm8he1zsls2ck
Use the right timing function on win32

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for HTTP implementations.
18
18
 
44
44
    ui,
45
45
    urlutils,
46
46
    )
47
 
from bzrlib.symbol_versioning import (
48
 
    deprecated_in,
49
 
    )
50
47
from bzrlib.tests import (
51
48
    http_server,
52
49
    http_utils,
68
65
    pycurl_present = False
69
66
 
70
67
 
 
68
class TransportAdapter(tests.TestScenarioApplier):
 
69
    """Generate the same test for each transport implementation."""
 
70
 
 
71
    def __init__(self):
 
72
        transport_scenarios = [
 
73
            ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
 
74
                            _server=http_server.HttpServer_urllib,
 
75
                            _qualified_prefix='http+urllib',)),
 
76
            ]
 
77
        if pycurl_present:
 
78
            transport_scenarios.append(
 
79
                ('pycurl', dict(_transport=PyCurlTransport,
 
80
                                _server=http_server.HttpServer_PyCurl,
 
81
                                _qualified_prefix='http+pycurl',)))
 
82
        self.scenarios = transport_scenarios
 
83
 
 
84
 
 
85
class TransportProtocolAdapter(TransportAdapter):
 
86
    """Generate the same test for each protocol implementation.
 
87
 
 
88
    In addition to the transport adaptatation that we inherit from.
 
89
    """
 
90
 
 
91
    def __init__(self):
 
92
        super(TransportProtocolAdapter, self).__init__()
 
93
        protocol_scenarios = [
 
94
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
 
95
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
 
96
            ]
 
97
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
98
                                                  protocol_scenarios)
 
99
 
 
100
 
 
101
class TransportProtocolAuthenticationAdapter(TransportProtocolAdapter):
 
102
    """Generate the same test for each authentication scheme implementation.
 
103
 
 
104
    In addition to the protocol adaptatation that we inherit from.
 
105
    """
 
106
 
 
107
    def __init__(self):
 
108
        super(TransportProtocolAuthenticationAdapter, self).__init__()
 
109
        auth_scheme_scenarios = [
 
110
            ('basic', dict(_auth_scheme='basic')),
 
111
            ('digest', dict(_auth_scheme='digest')),
 
112
            ]
 
113
 
 
114
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
115
                                                  auth_scheme_scenarios)
 
116
 
71
117
def load_tests(standard_tests, module, loader):
72
118
    """Multiply tests for http clients and protocol versions."""
73
 
    result = loader.suiteClass()
74
 
 
75
 
    # one for each transport implementation
76
 
    t_tests, remaining_tests = tests.split_suite_by_condition(
77
 
        standard_tests, tests.condition_isinstance((
78
 
                TestHttpTransportRegistration,
 
119
    # one for each transport
 
120
    t_adapter = TransportAdapter()
 
121
    t_classes= (TestHttpTransportRegistration,
79
122
                TestHttpTransportUrls,
80
 
                Test_redirected_to,
81
 
                )))
82
 
    transport_scenarios = [
83
 
        ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
84
 
                        _server=http_server.HttpServer_urllib,
85
 
                        _qualified_prefix='http+urllib',)),
86
 
        ]
87
 
    if pycurl_present:
88
 
        transport_scenarios.append(
89
 
            ('pycurl', dict(_transport=PyCurlTransport,
90
 
                            _server=http_server.HttpServer_PyCurl,
91
 
                            _qualified_prefix='http+pycurl',)))
92
 
    tests.multiply_tests(t_tests, transport_scenarios, result)
93
 
 
94
 
    # each implementation tested with each HTTP version
95
 
    tp_tests, remaining_tests = tests.split_suite_by_condition(
96
 
        remaining_tests, tests.condition_isinstance((
97
 
                SmartHTTPTunnellingTest,
98
 
                TestDoCatchRedirections,
99
 
                TestHTTPConnections,
100
 
                TestHTTPRedirections,
101
 
                TestHTTPSilentRedirections,
102
 
                TestLimitedRangeRequestServer,
103
 
                TestPost,
104
 
                TestProxyHttpServer,
105
 
                TestRanges,
106
 
                TestSpecificRequestHandler,
107
 
                )))
108
 
    protocol_scenarios = [
109
 
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
110
 
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
111
 
            ]
112
 
    tp_scenarios = tests.multiply_scenarios(transport_scenarios,
113
 
                                            protocol_scenarios)
114
 
    tests.multiply_tests(tp_tests, tp_scenarios, result)
115
 
 
116
 
    # proxy auth: each auth scheme on all http versions on all implementations.
117
 
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
118
 
        remaining_tests, tests.condition_isinstance((
119
 
                TestProxyAuth,
120
 
                )))
121
 
    proxy_auth_scheme_scenarios = [
122
 
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
123
 
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
124
 
        ('basicdigest',
125
 
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
126
 
        ]
127
 
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
128
 
                                              proxy_auth_scheme_scenarios)
129
 
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
130
 
 
131
 
    # auth: each auth scheme on all http versions on all implementations.
132
 
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
133
 
        remaining_tests, tests.condition_isinstance((
134
 
                TestAuth,
135
 
                )))
136
 
    auth_scheme_scenarios = [
137
 
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
138
 
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
139
 
        ('basicdigest',
140
 
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
141
 
        ]
142
 
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
143
 
                                             auth_scheme_scenarios)
144
 
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
145
 
 
146
 
    # activity: on all http[s] versions on all implementations
147
 
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
148
 
        remaining_tests, tests.condition_isinstance((
149
 
                TestActivity,
150
 
                )))
151
 
    activity_scenarios = [
152
 
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
153
 
                             _transport=_urllib.HttpTransport_urllib,)),
154
 
        ]
155
 
    if tests.HTTPSServerFeature.available():
156
 
        activity_scenarios.append(
157
 
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
158
 
                                  _transport=_urllib.HttpTransport_urllib,)),)
159
 
    if pycurl_present:
160
 
        activity_scenarios.append(
161
 
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
162
 
                                 _transport=PyCurlTransport,)),)
163
 
        if tests.HTTPSServerFeature.available():
164
 
            from bzrlib.tests import (
165
 
                ssl_certs,
166
123
                )
167
 
            # FIXME: Until we have a better way to handle self-signed
168
 
            # certificates (like allowing them in a test specific
169
 
            # authentication.conf for example), we need some specialized pycurl
170
 
            # transport for tests.
171
 
            class HTTPS_pycurl_transport(PyCurlTransport):
172
 
 
173
 
                def __init__(self, base, _from_transport=None):
174
 
                    super(HTTPS_pycurl_transport, self).__init__(
175
 
                        base, _from_transport)
176
 
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
177
 
 
178
 
            activity_scenarios.append(
179
 
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
180
 
                                      _transport=HTTPS_pycurl_transport,)),)
181
 
 
182
 
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
183
 
                                               protocol_scenarios)
184
 
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
185
 
 
186
 
    # No parametrization for the remaining tests
187
 
    result.addTests(remaining_tests)
188
 
 
 
124
    is_testing_for_transports = tests.condition_isinstance(t_classes)
 
125
 
 
126
    # multiplied by one for each protocol version
 
127
    tp_adapter = TransportProtocolAdapter()
 
128
    tp_classes= (SmartHTTPTunnellingTest,
 
129
                 TestDoCatchRedirections,
 
130
                 TestHTTPConnections,
 
131
                 TestHTTPRedirections,
 
132
                 TestHTTPSilentRedirections,
 
133
                 TestLimitedRangeRequestServer,
 
134
                 TestPost,
 
135
                 TestProxyHttpServer,
 
136
                 TestRanges,
 
137
                 TestSpecificRequestHandler,
 
138
                 )
 
139
    is_also_testing_for_protocols = tests.condition_isinstance(tp_classes)
 
140
 
 
141
    # multiplied by one for each authentication scheme
 
142
    tpa_adapter = TransportProtocolAuthenticationAdapter()
 
143
    tpa_classes = (TestAuth,
 
144
                   )
 
145
    is_also_testing_for_authentication = tests.condition_isinstance(
 
146
        tpa_classes)
 
147
 
 
148
    result = loader.suiteClass()
 
149
    for test_class in tests.iter_suite_tests(standard_tests):
 
150
        # Each test class is either standalone or testing for some combination
 
151
        # of transport, protocol version, authentication scheme. Use the right
 
152
        # adpater (or none) depending on the class.
 
153
        if is_testing_for_transports(test_class):
 
154
            result.addTests(t_adapter.adapt(test_class))
 
155
        elif is_also_testing_for_protocols(test_class):
 
156
            result.addTests(tp_adapter.adapt(test_class))
 
157
        elif is_also_testing_for_authentication(test_class):
 
158
            result.addTests(tpa_adapter.adapt(test_class))
 
159
        else:
 
160
            result.addTest(test_class)
189
161
    return result
190
162
 
191
163
 
200
172
 
201
173
class RecordingServer(object):
202
174
    """A fake HTTP server.
203
 
 
 
175
    
204
176
    It records the bytes sent to it, and replies with a 200.
205
177
    """
206
178
 
255
227
        self.port = None
256
228
 
257
229
 
258
 
class TestAuthHeader(tests.TestCase):
259
 
 
260
 
    def parse_header(self, header, auth_handler_class=None):
261
 
        if auth_handler_class is None:
262
 
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
263
 
        self.auth_handler =  auth_handler_class()
264
 
        return self.auth_handler._parse_auth_header(header)
265
 
 
266
 
    def test_empty_header(self):
267
 
        scheme, remainder = self.parse_header('')
268
 
        self.assertEquals('', scheme)
269
 
        self.assertIs(None, remainder)
270
 
 
271
 
    def test_negotiate_header(self):
272
 
        scheme, remainder = self.parse_header('Negotiate')
273
 
        self.assertEquals('negotiate', scheme)
274
 
        self.assertIs(None, remainder)
275
 
 
276
 
    def test_basic_header(self):
277
 
        scheme, remainder = self.parse_header(
278
 
            'Basic realm="Thou should not pass"')
279
 
        self.assertEquals('basic', scheme)
280
 
        self.assertEquals('realm="Thou should not pass"', remainder)
281
 
 
282
 
    def test_basic_extract_realm(self):
283
 
        scheme, remainder = self.parse_header(
284
 
            'Basic realm="Thou should not pass"',
285
 
            _urllib2_wrappers.BasicAuthHandler)
286
 
        match, realm = self.auth_handler.extract_realm(remainder)
287
 
        self.assertTrue(match is not None)
288
 
        self.assertEquals('Thou should not pass', realm)
289
 
 
290
 
    def test_digest_header(self):
291
 
        scheme, remainder = self.parse_header(
292
 
            'Digest realm="Thou should not pass"')
293
 
        self.assertEquals('digest', scheme)
294
 
        self.assertEquals('realm="Thou should not pass"', remainder)
295
 
 
296
 
 
297
230
class TestHTTPServer(tests.TestCase):
298
231
    """Test the HTTP servers implementations."""
299
232
 
516
449
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
517
450
            % bzrlib.__version__) > -1)
518
451
 
 
452
    def test_get_smart_medium(self):
 
453
        # For HTTP, get_smart_medium should return the transport object.
 
454
        server = self.get_readonly_server()
 
455
        http_transport = self._transport(server.get_url())
 
456
        medium = http_transport.get_smart_medium()
 
457
        self.assertIs(medium, http_transport)
 
458
 
519
459
    def test_has_on_bogus_host(self):
520
460
        # Get a free address and don't 'accept' on it, so that we
521
461
        # can be sure there is no http handler there, but set a
625
565
        # for details) make no distinction between a closed
626
566
        # socket and badly formatted status line, so we can't
627
567
        # just test for ConnectionError, we have to test
628
 
        # InvalidHttpResponse too. And pycurl may raise ConnectionReset
629
 
        # instead of ConnectionError too.
630
 
        self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
631
 
                            errors.InvalidHttpResponse),
 
568
        # InvalidHttpResponse too.
 
569
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
632
570
                          t.has, 'foo/bar')
633
571
 
634
572
    def test_http_get(self):
635
573
        server = self.get_readonly_server()
636
574
        t = self._transport(server.get_url())
637
 
        self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
638
 
                           errors.InvalidHttpResponse),
 
575
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
639
576
                          t.get, 'foo/bar')
640
577
 
641
578
 
881
818
        # bytes on the socket
882
819
        ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
883
820
        self.assertEqual((0, '0'), ireadv.next())
884
 
        # The server should have issued one request so far
 
821
        # The server should have issued one request so far 
885
822
        self.assertEqual(1, server.GET_request_nb)
886
823
        self.assertEqual('0123456789', t.get_bytes('a'))
887
824
        # get_bytes issued an additional request, the readv pending ones are
1208
1145
        url = self.server.get_url()
1209
1146
        t = self._transport(url)
1210
1147
        try:
1211
 
            self.assertEqual('proxied contents of foo\n', t.get('foo').read())
 
1148
            self.assertEqual(t.get('foo').read(), 'proxied contents of foo\n')
1212
1149
        finally:
1213
1150
            self._restore_env()
1214
1151
 
1217
1154
        url = self.server.get_url()
1218
1155
        t = self._transport(url)
1219
1156
        try:
1220
 
            self.assertEqual('contents of foo\n', t.get('foo').read())
 
1157
            self.assertEqual(t.get('foo').read(), 'contents of foo\n')
1221
1158
        finally:
1222
1159
            self._restore_env()
1223
1160
 
1333
1270
                                  ('bundle',
1334
1271
                                  '# Bazaar revision bundle v0.9\n#\n')
1335
1272
                                  ],)
1336
 
        # The requests to the old server will be redirected to the new server
 
1273
 
1337
1274
        self.old_transport = self._transport(self.old_server.get_url())
1338
1275
 
1339
1276
    def test_redirected(self):
1344
1281
    def test_read_redirected_bundle_from_url(self):
1345
1282
        from bzrlib.bundle import read_bundle_from_url
1346
1283
        url = self.old_transport.abspath('bundle')
1347
 
        bundle = self.applyDeprecated(deprecated_in((1, 12, 0)),
1348
 
                read_bundle_from_url, url)
 
1284
        bundle = read_bundle_from_url(url)
1349
1285
        # If read_bundle_from_url was successful we get an empty bundle
1350
1286
        self.assertEqual([], bundle.revisions)
1351
1287
 
1362
1298
        # Since the tests using this class will replace
1363
1299
        # _urllib2_wrappers.Request, we can't just call the base class __init__
1364
1300
        # or we'll loop.
1365
 
        RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
 
1301
        RedirectedRequest.init_orig(self, method, url, args, kwargs)
1366
1302
        self.follow_redirections = True
1367
1303
 
1368
1304
 
1495
1431
 
1496
1432
    _auth_header = 'Authorization'
1497
1433
    _password_prompt_prefix = ''
1498
 
    _username_prompt_prefix = ''
1499
 
    # Set by load_tests
1500
 
    _auth_server = None
1501
1434
 
1502
1435
    def setUp(self):
1503
1436
        super(TestAuth, self).setUp()
1506
1439
                                  ('b', 'contents of b\n'),])
1507
1440
 
1508
1441
    def create_transport_readonly_server(self):
1509
 
        return self._auth_server(protocol_version=self._protocol_version)
 
1442
        if self._auth_scheme == 'basic':
 
1443
            server = http_utils.HTTPBasicAuthServer(
 
1444
                protocol_version=self._protocol_version)
 
1445
        else:
 
1446
            if self._auth_scheme != 'digest':
 
1447
                raise AssertionError('Unknown auth scheme: %r'
 
1448
                                     % self._auth_scheme)
 
1449
            server = http_utils.HTTPDigestAuthServer(
 
1450
                protocol_version=self._protocol_version)
 
1451
        return server
1510
1452
 
1511
1453
    def _testing_pycurl(self):
1512
1454
        return pycurl_present and self._transport == PyCurlTransport
1513
1455
 
1514
 
    def get_user_url(self, user, password):
 
1456
    def get_user_url(self, user=None, password=None):
1515
1457
        """Build an url embedding user and password"""
1516
1458
        url = '%s://' % self.server._url_protocol
1517
1459
        if user is not None:
1522
1464
        url += '%s:%s/' % (self.server.host, self.server.port)
1523
1465
        return url
1524
1466
 
1525
 
    def get_user_transport(self, user, password):
 
1467
    def get_user_transport(self, user=None, password=None):
1526
1468
        return self._transport(self.get_user_url(user, password))
1527
1469
 
1528
1470
    def test_no_user(self):
1529
1471
        self.server.add_user('joe', 'foo')
1530
 
        t = self.get_user_transport(None, None)
 
1472
        t = self.get_user_transport()
1531
1473
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1532
1474
        # Only one 'Authentication Required' error should occur
1533
1475
        self.assertEqual(1, self.server.auth_required_errors)
1563
1505
        # initial 'who are you' and 'this is not you, who are you')
1564
1506
        self.assertEqual(2, self.server.auth_required_errors)
1565
1507
 
1566
 
    def test_prompt_for_username(self):
1567
 
        if self._testing_pycurl():
1568
 
            raise tests.TestNotApplicable(
1569
 
                'pycurl cannot prompt, it handles auth by embedding'
1570
 
                ' user:pass in urls only')
1571
 
 
1572
 
        self.server.add_user('joe', 'foo')
1573
 
        t = self.get_user_transport(None, None)
1574
 
        stdout = tests.StringIOWrapper()
1575
 
        stderr = tests.StringIOWrapper()
1576
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1577
 
                                            stdout=stdout, stderr=stderr)
1578
 
        self.assertEqual('contents of a\n',t.get('a').read())
1579
 
        # stdin should be empty
1580
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
1581
 
        stderr.seek(0)
1582
 
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1583
 
        self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
1584
 
        self.assertEquals('', stdout.getvalue())
1585
 
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1586
 
                                    stderr.readline())
1587
 
 
1588
1508
    def test_prompt_for_password(self):
1589
1509
        if self._testing_pycurl():
1590
1510
            raise tests.TestNotApplicable(
1594
1514
        self.server.add_user('joe', 'foo')
1595
1515
        t = self.get_user_transport('joe', None)
1596
1516
        stdout = tests.StringIOWrapper()
1597
 
        stderr = tests.StringIOWrapper()
1598
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1599
 
                                            stdout=stdout, stderr=stderr)
1600
 
        self.assertEqual('contents of a\n', t.get('a').read())
 
1517
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
 
1518
        self.assertEqual('contents of a\n',t.get('a').read())
1601
1519
        # stdin should be empty
1602
1520
        self.assertEqual('', ui.ui_factory.stdin.readline())
1603
1521
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1604
 
                                    stderr.getvalue())
1605
 
        self.assertEquals('', stdout.getvalue())
 
1522
                                    stdout.getvalue())
1606
1523
        # And we shouldn't prompt again for a different request
1607
1524
        # against the same transport.
1608
1525
        self.assertEqual('contents of b\n',t.get('b').read())
1620
1537
                                 self.server.auth_realm)))
1621
1538
        self.assertEquals(expected_prompt, actual_prompt)
1622
1539
 
1623
 
    def _expected_username_prompt(self, scheme):
1624
 
        return (self._username_prompt_prefix
1625
 
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1626
 
                                 self.server.host, self.server.port,
1627
 
                                 self.server.auth_realm))
1628
 
 
1629
1540
    def test_no_prompt_for_password_when_using_auth_config(self):
1630
1541
        if self._testing_pycurl():
1631
1542
            raise tests.TestNotApplicable(
1652
1563
        # Only one 'Authentication Required' error should occur
1653
1564
        self.assertEqual(1, self.server.auth_required_errors)
1654
1565
 
1655
 
    def test_user_from_auth_conf(self):
1656
 
        if self._testing_pycurl():
1657
 
            raise tests.TestNotApplicable(
1658
 
                'pycurl does not support authentication.conf')
1659
 
        user = 'joe'
1660
 
        password = 'foo'
1661
 
        self.server.add_user(user, password)
1662
 
        # Create a minimal config file with the right password
1663
 
        conf = config.AuthenticationConfig()
1664
 
        conf._get_config().update(
1665
 
            {'httptest': {'scheme': 'http', 'port': self.server.port,
1666
 
                          'user': user, 'password': password}})
1667
 
        conf._save()
1668
 
        t = self.get_user_transport(None, None)
1669
 
        # Issue a request to the server to connect
1670
 
        self.assertEqual('contents of a\n', t.get('a').read())
1671
 
        # Only one 'Authentication Required' error should occur
1672
 
        self.assertEqual(1, self.server.auth_required_errors)
1673
 
 
1674
1566
    def test_changing_nonce(self):
1675
 
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1676
 
                                     http_utils.ProxyDigestAuthServer):
1677
 
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
 
1567
        if self._auth_scheme != 'digest':
 
1568
            raise tests.TestNotApplicable('HTTP auth digest only test')
1678
1569
        if self._testing_pycurl():
1679
1570
            raise tests.KnownFailure(
1680
1571
                'pycurl does not handle a nonce change')
1698
1589
    """Test proxy authentication schemes."""
1699
1590
 
1700
1591
    _auth_header = 'Proxy-authorization'
1701
 
    _password_prompt_prefix = 'Proxy '
1702
 
    _username_prompt_prefix = 'Proxy '
 
1592
    _password_prompt_prefix='Proxy '
1703
1593
 
1704
1594
    def setUp(self):
1705
1595
        super(TestProxyAuth, self).setUp()
1712
1602
                                  ('b-proxied', 'contents of b\n'),
1713
1603
                                  ])
1714
1604
 
1715
 
    def get_user_transport(self, user, password):
 
1605
    def create_transport_readonly_server(self):
 
1606
        if self._auth_scheme == 'basic':
 
1607
            server = http_utils.ProxyBasicAuthServer(
 
1608
                protocol_version=self._protocol_version)
 
1609
        else:
 
1610
            if self._auth_scheme != 'digest':
 
1611
                raise AssertionError('Unknown auth scheme: %r'
 
1612
                                     % self._auth_scheme)
 
1613
            server = http_utils.ProxyDigestAuthServer(
 
1614
                protocol_version=self._protocol_version)
 
1615
        return server
 
1616
 
 
1617
    def get_user_transport(self, user=None, password=None):
1716
1618
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1717
1619
        return self._transport(self.server.get_url())
1718
1620
 
1845
1747
        # No need to build a valid smart request here, the server will not even
1846
1748
        # try to interpret it.
1847
1749
        self.assertRaises(errors.SmartProtocolError,
1848
 
                          t.get_smart_medium().send_http_smart_request,
1849
 
                          'whatever')
1850
 
 
1851
 
class Test_redirected_to(tests.TestCase):
1852
 
 
1853
 
    def test_redirected_to_subdir(self):
1854
 
        t = self._transport('http://www.example.com/foo')
1855
 
        r = t._redirected_to('http://www.example.com/foo',
1856
 
                             'http://www.example.com/foo/subdir')
1857
 
        self.assertIsInstance(r, type(t))
1858
 
        # Both transports share the some connection
1859
 
        self.assertEquals(t._get_connection(), r._get_connection())
1860
 
 
1861
 
    def test_redirected_to_self_with_slash(self):
1862
 
        t = self._transport('http://www.example.com/foo')
1863
 
        r = t._redirected_to('http://www.example.com/foo',
1864
 
                             'http://www.example.com/foo/')
1865
 
        self.assertIsInstance(r, type(t))
1866
 
        # Both transports share the some connection (one can argue that we
1867
 
        # should return the exact same transport here, but that seems
1868
 
        # overkill).
1869
 
        self.assertEquals(t._get_connection(), r._get_connection())
1870
 
 
1871
 
    def test_redirected_to_host(self):
1872
 
        t = self._transport('http://www.example.com/foo')
1873
 
        r = t._redirected_to('http://www.example.com/foo',
1874
 
                             'http://foo.example.com/foo/subdir')
1875
 
        self.assertIsInstance(r, type(t))
1876
 
 
1877
 
    def test_redirected_to_same_host_sibling_protocol(self):
1878
 
        t = self._transport('http://www.example.com/foo')
1879
 
        r = t._redirected_to('http://www.example.com/foo',
1880
 
                             'https://www.example.com/foo')
1881
 
        self.assertIsInstance(r, type(t))
1882
 
 
1883
 
    def test_redirected_to_same_host_different_protocol(self):
1884
 
        t = self._transport('http://www.example.com/foo')
1885
 
        r = t._redirected_to('http://www.example.com/foo',
1886
 
                             'ftp://www.example.com/foo')
1887
 
        self.assertNotEquals(type(r), type(t))
1888
 
 
1889
 
    def test_redirected_to_different_host_same_user(self):
1890
 
        t = self._transport('http://joe@www.example.com/foo')
1891
 
        r = t._redirected_to('http://www.example.com/foo',
1892
 
                             'https://foo.example.com/foo')
1893
 
        self.assertIsInstance(r, type(t))
1894
 
        self.assertEquals(t._user, r._user)
1895
 
 
1896
 
 
1897
 
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
1898
 
    """Request handler for a unique and pre-defined request.
1899
 
 
1900
 
    The only thing we care about here is how many bytes travel on the wire. But
1901
 
    since we want to measure it for a real http client, we have to send it
1902
 
    correct responses.
1903
 
 
1904
 
    We expect to receive a *single* request nothing more (and we won't even
1905
 
    check what request it is, we just measure the bytes read until an empty
1906
 
    line.
1907
 
    """
1908
 
 
1909
 
    def handle_one_request(self):
1910
 
        tcs = self.server.test_case_server
1911
 
        requestline = self.rfile.readline()
1912
 
        headers = self.MessageClass(self.rfile, 0)
1913
 
        # We just read: the request, the headers, an empty line indicating the
1914
 
        # end of the headers.
1915
 
        bytes_read = len(requestline)
1916
 
        for line in headers.headers:
1917
 
            bytes_read += len(line)
1918
 
        bytes_read += len('\r\n')
1919
 
        if requestline.startswith('POST'):
1920
 
            # The body should be a single line (or we don't know where it ends
1921
 
            # and we don't want to issue a blocking read)
1922
 
            body = self.rfile.readline()
1923
 
            bytes_read += len(body)
1924
 
        tcs.bytes_read = bytes_read
1925
 
 
1926
 
        # We set the bytes written *before* issuing the write, the client is
1927
 
        # supposed to consume every produced byte *before* checking that value.
1928
 
 
1929
 
        # Doing the oppposite may lead to test failure: we may be interrupted
1930
 
        # after the write but before updating the value. The client can then
1931
 
        # continue and read the value *before* we can update it. And yes,
1932
 
        # this has been observed -- vila 20090129
1933
 
        tcs.bytes_written = len(tcs.canned_response)
1934
 
        self.wfile.write(tcs.canned_response)
1935
 
 
1936
 
 
1937
 
class ActivityServerMixin(object):
1938
 
 
1939
 
    def __init__(self, protocol_version):
1940
 
        super(ActivityServerMixin, self).__init__(
1941
 
            request_handler=PredefinedRequestHandler,
1942
 
            protocol_version=protocol_version)
1943
 
        # Bytes read and written by the server
1944
 
        self.bytes_read = 0
1945
 
        self.bytes_written = 0
1946
 
        self.canned_response = None
1947
 
 
1948
 
 
1949
 
class ActivityHTTPServer(ActivityServerMixin, http_server.HttpServer):
1950
 
    pass
1951
 
 
1952
 
 
1953
 
if tests.HTTPSServerFeature.available():
1954
 
    from bzrlib.tests import https_server
1955
 
    class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
1956
 
        pass
1957
 
 
1958
 
 
1959
 
class TestActivity(tests.TestCase):
1960
 
    """Test socket activity reporting.
1961
 
 
1962
 
    We use a special purpose server to control the bytes sent and received and
1963
 
    be able to predict the activity on the client socket.
1964
 
    """
1965
 
 
1966
 
    def setUp(self):
1967
 
        tests.TestCase.setUp(self)
1968
 
        self.server = self._activity_server(self._protocol_version)
1969
 
        self.server.setUp()
1970
 
        self.activities = {}
1971
 
        def report_activity(t, bytes, direction):
1972
 
            count = self.activities.get(direction, 0)
1973
 
            count += bytes
1974
 
            self.activities[direction] = count
1975
 
 
1976
 
        # We override at class level because constructors may propagate the
1977
 
        # bound method and render instance overriding ineffective (an
1978
 
        # alternative would be to define a specific ui factory instead...)
1979
 
        self.orig_report_activity = self._transport._report_activity
1980
 
        self._transport._report_activity = report_activity
1981
 
 
1982
 
    def tearDown(self):
1983
 
        self._transport._report_activity = self.orig_report_activity
1984
 
        self.server.tearDown()
1985
 
        tests.TestCase.tearDown(self)
1986
 
 
1987
 
    def get_transport(self):
1988
 
        return self._transport(self.server.get_url())
1989
 
 
1990
 
    def assertActivitiesMatch(self):
1991
 
        self.assertEqual(self.server.bytes_read,
1992
 
                         self.activities.get('write', 0), 'written bytes')
1993
 
        self.assertEqual(self.server.bytes_written,
1994
 
                         self.activities.get('read', 0), 'read bytes')
1995
 
 
1996
 
    def test_get(self):
1997
 
        self.server.canned_response = '''HTTP/1.1 200 OK\r
1998
 
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
1999
 
Server: Apache/2.0.54 (Fedora)\r
2000
 
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
2001
 
ETag: "56691-23-38e9ae00"\r
2002
 
Accept-Ranges: bytes\r
2003
 
Content-Length: 35\r
2004
 
Connection: close\r
2005
 
Content-Type: text/plain; charset=UTF-8\r
2006
 
\r
2007
 
Bazaar-NG meta directory, format 1
2008
 
'''
2009
 
        t = self.get_transport()
2010
 
        self.assertEqual('Bazaar-NG meta directory, format 1\n',
2011
 
                         t.get('foo/bar').read())
2012
 
        self.assertActivitiesMatch()
2013
 
 
2014
 
    def test_has(self):
2015
 
        self.server.canned_response = '''HTTP/1.1 200 OK\r
2016
 
Server: SimpleHTTP/0.6 Python/2.5.2\r
2017
 
Date: Thu, 29 Jan 2009 20:21:47 GMT\r
2018
 
Content-type: application/octet-stream\r
2019
 
Content-Length: 20\r
2020
 
Last-Modified: Thu, 29 Jan 2009 20:21:47 GMT\r
2021
 
\r
2022
 
'''
2023
 
        t = self.get_transport()
2024
 
        self.assertTrue(t.has('foo/bar'))
2025
 
        self.assertActivitiesMatch()
2026
 
 
2027
 
    def test_readv(self):
2028
 
        self.server.canned_response = '''HTTP/1.1 206 Partial Content\r
2029
 
Date: Tue, 11 Jul 2006 04:49:48 GMT\r
2030
 
Server: Apache/2.0.54 (Fedora)\r
2031
 
Last-Modified: Thu, 06 Jul 2006 20:22:05 GMT\r
2032
 
ETag: "238a3c-16ec2-805c5540"\r
2033
 
Accept-Ranges: bytes\r
2034
 
Content-Length: 1534\r
2035
 
Connection: close\r
2036
 
Content-Type: multipart/byteranges; boundary=418470f848b63279b\r
2037
 
\r
2038
 
\r
2039
 
--418470f848b63279b\r
2040
 
Content-type: text/plain; charset=UTF-8\r
2041
 
Content-range: bytes 0-254/93890\r
2042
 
\r
2043
 
mbp@sourcefrog.net-20050309040815-13242001617e4a06
2044
 
mbp@sourcefrog.net-20050309040929-eee0eb3e6d1e7627
2045
 
mbp@sourcefrog.net-20050309040957-6cad07f466bb0bb8
2046
 
mbp@sourcefrog.net-20050309041501-c840e09071de3b67
2047
 
mbp@sourcefrog.net-20050309044615-c24a3250be83220a
2048
 
\r
2049
 
--418470f848b63279b\r
2050
 
Content-type: text/plain; charset=UTF-8\r
2051
 
Content-range: bytes 1000-2049/93890\r
2052
 
\r
2053
 
40-fd4ec249b6b139ab
2054
 
mbp@sourcefrog.net-20050311063625-07858525021f270b
2055
 
mbp@sourcefrog.net-20050311231934-aa3776aff5200bb9
2056
 
mbp@sourcefrog.net-20050311231953-73aeb3a131c3699a
2057
 
mbp@sourcefrog.net-20050311232353-f5e33da490872c6a
2058
 
mbp@sourcefrog.net-20050312071639-0a8f59a34a024ff0
2059
 
mbp@sourcefrog.net-20050312073432-b2c16a55e0d6e9fb
2060
 
mbp@sourcefrog.net-20050312073831-a47c3335ece1920f
2061
 
mbp@sourcefrog.net-20050312085412-13373aa129ccbad3
2062
 
mbp@sourcefrog.net-20050313052251-2bf004cb96b39933
2063
 
mbp@sourcefrog.net-20050313052856-3edd84094687cb11
2064
 
mbp@sourcefrog.net-20050313053233-e30a4f28aef48f9d
2065
 
mbp@sourcefrog.net-20050313053853-7c64085594ff3072
2066
 
mbp@sourcefrog.net-20050313054757-a86c3f5871069e22
2067
 
mbp@sourcefrog.net-20050313061422-418f1f73b94879b9
2068
 
mbp@sourcefrog.net-20050313120651-497bd231b19df600
2069
 
mbp@sourcefrog.net-20050314024931-eae0170ef25a5d1a
2070
 
mbp@sourcefrog.net-20050314025438-d52099f915fe65fc
2071
 
mbp@sourcefrog.net-20050314025539-637a636692c055cf
2072
 
mbp@sourcefrog.net-20050314025737-55eb441f430ab4ba
2073
 
mbp@sourcefrog.net-20050314025901-d74aa93bb7ee8f62
2074
 
mbp@source\r
2075
 
--418470f848b63279b--\r
2076
 
'''
2077
 
        t = self.get_transport()
2078
 
        # Remember that the request is ignored and that the ranges below
2079
 
        # doesn't have to match the canned response.
2080
 
        l = list(t.readv('/foo/bar', ((0, 255), (1000, 1050))))
2081
 
        self.assertEqual(2, len(l))
2082
 
        self.assertActivitiesMatch()
2083
 
 
2084
 
    def test_post(self):
2085
 
        self.server.canned_response = '''HTTP/1.1 200 OK\r
2086
 
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
2087
 
Server: Apache/2.0.54 (Fedora)\r
2088
 
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
2089
 
ETag: "56691-23-38e9ae00"\r
2090
 
Accept-Ranges: bytes\r
2091
 
Content-Length: 35\r
2092
 
Connection: close\r
2093
 
Content-Type: text/plain; charset=UTF-8\r
2094
 
\r
2095
 
lalala whatever as long as itsssss
2096
 
'''
2097
 
        t = self.get_transport()
2098
 
        # We must send a single line of body bytes, see
2099
 
        # PredefinedRequestHandler.handle_one_request
2100
 
        code, f = t._post('abc def end-of-body\n')
2101
 
        self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2102
 
        self.assertActivitiesMatch()
 
1750
                          t.send_http_smart_request, 'whatever')
 
1751