~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Vincent Ladeuil
  • Date: 2009-06-22 12:52:39 UTC
  • mto: (4471.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4472.
  • Revision ID: v.ladeuil+lp@free.fr-20090622125239-kabo9smxt9c3vnir
Use a consistent scheme for naming pyrex source files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for HTTP implementations.
18
18
 
34
34
 
35
35
import bzrlib
36
36
from bzrlib import (
 
37
    bzrdir,
37
38
    config,
38
39
    errors,
39
40
    osutils,
 
41
    remote as _mod_remote,
40
42
    tests,
41
43
    transport,
42
44
    ui,
43
45
    urlutils,
44
46
    )
 
47
from bzrlib.symbol_versioning import (
 
48
    deprecated_in,
 
49
    )
45
50
from bzrlib.tests import (
46
51
    http_server,
47
52
    http_utils,
63
68
    pycurl_present = False
64
69
 
65
70
 
66
 
class TransportAdapter(tests.TestScenarioApplier):
67
 
    """Generate the same test for each transport implementation."""
68
 
 
69
 
    def __init__(self):
70
 
        transport_scenarios = [
71
 
            ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
72
 
                            _server=http_server.HttpServer_urllib,
73
 
                            _qualified_prefix='http+urllib',)),
74
 
            ]
75
 
        if pycurl_present:
76
 
            transport_scenarios.append(
77
 
                ('pycurl', dict(_transport=PyCurlTransport,
78
 
                                _server=http_server.HttpServer_PyCurl,
79
 
                                _qualified_prefix='http+pycurl',)))
80
 
        self.scenarios = transport_scenarios
81
 
 
82
 
 
83
 
class TransportProtocolAdapter(TransportAdapter):
84
 
    """Generate the same test for each protocol implementation.
85
 
 
86
 
    In addition to the transport adaptatation that we inherit from.
87
 
    """
88
 
 
89
 
    def __init__(self):
90
 
        super(TransportProtocolAdapter, self).__init__()
91
 
        protocol_scenarios = [
92
 
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
93
 
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
94
 
            ]
95
 
        self.scenarios = tests.multiply_scenarios(self.scenarios,
96
 
                                                  protocol_scenarios)
97
 
 
98
 
 
99
 
class TransportProtocolAuthenticationAdapter(TransportProtocolAdapter):
100
 
    """Generate the same test for each authentication scheme implementation.
101
 
 
102
 
    In addition to the protocol adaptatation that we inherit from.
103
 
    """
104
 
 
105
 
    def __init__(self):
106
 
        super(TransportProtocolAuthenticationAdapter, self).__init__()
107
 
        auth_scheme_scenarios = [
108
 
            ('basic', dict(_auth_scheme='basic')),
109
 
            ('digest', dict(_auth_scheme='digest')),
110
 
            ]
111
 
 
112
 
        self.scenarios = tests.multiply_scenarios(self.scenarios,
113
 
                                                  auth_scheme_scenarios)
114
 
 
115
71
def load_tests(standard_tests, module, loader):
116
72
    """Multiply tests for http clients and protocol versions."""
117
 
    # one for each transport
118
 
    t_adapter = TransportAdapter()
119
 
    t_classes= (TestHttpTransportRegistration,
 
73
    result = loader.suiteClass()
 
74
 
 
75
    # one for each transport implementation
 
76
    t_tests, remaining_tests = tests.split_suite_by_condition(
 
77
        standard_tests, tests.condition_isinstance((
 
78
                TestHttpTransportRegistration,
120
79
                TestHttpTransportUrls,
 
80
                Test_redirected_to,
 
81
                )))
 
82
    transport_scenarios = [
 
83
        ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
 
84
                        _server=http_server.HttpServer_urllib,
 
85
                        _qualified_prefix='http+urllib',)),
 
86
        ]
 
87
    if pycurl_present:
 
88
        transport_scenarios.append(
 
89
            ('pycurl', dict(_transport=PyCurlTransport,
 
90
                            _server=http_server.HttpServer_PyCurl,
 
91
                            _qualified_prefix='http+pycurl',)))
 
92
    tests.multiply_tests(t_tests, transport_scenarios, result)
 
93
 
 
94
    # each implementation tested with each HTTP version
 
95
    tp_tests, remaining_tests = tests.split_suite_by_condition(
 
96
        remaining_tests, tests.condition_isinstance((
 
97
                SmartHTTPTunnellingTest,
 
98
                TestDoCatchRedirections,
 
99
                TestHTTPConnections,
 
100
                TestHTTPRedirections,
 
101
                TestHTTPSilentRedirections,
 
102
                TestLimitedRangeRequestServer,
 
103
                TestPost,
 
104
                TestProxyHttpServer,
 
105
                TestRanges,
 
106
                TestSpecificRequestHandler,
 
107
                )))
 
108
    protocol_scenarios = [
 
109
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
 
110
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
 
111
            ]
 
112
    tp_scenarios = tests.multiply_scenarios(transport_scenarios,
 
113
                                            protocol_scenarios)
 
114
    tests.multiply_tests(tp_tests, tp_scenarios, result)
 
115
 
 
116
    # proxy auth: each auth scheme on all http versions on all implementations.
 
117
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
 
118
        remaining_tests, tests.condition_isinstance((
 
119
                TestProxyAuth,
 
120
                )))
 
121
    proxy_auth_scheme_scenarios = [
 
122
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
 
123
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
 
124
        ('basicdigest',
 
125
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
 
126
        ]
 
127
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
 
128
                                              proxy_auth_scheme_scenarios)
 
129
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
 
130
 
 
131
    # auth: each auth scheme on all http versions on all implementations.
 
132
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
 
133
        remaining_tests, tests.condition_isinstance((
 
134
                TestAuth,
 
135
                )))
 
136
    auth_scheme_scenarios = [
 
137
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
 
138
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
 
139
        ('basicdigest',
 
140
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
 
141
        ]
 
142
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
 
143
                                             auth_scheme_scenarios)
 
144
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
 
145
 
 
146
    # activity: on all http[s] versions on all implementations
 
147
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
 
148
        remaining_tests, tests.condition_isinstance((
 
149
                TestActivity,
 
150
                )))
 
151
    activity_scenarios = [
 
152
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
 
153
                             _transport=_urllib.HttpTransport_urllib,)),
 
154
        ]
 
155
    if tests.HTTPSServerFeature.available():
 
156
        activity_scenarios.append(
 
157
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
 
158
                                  _transport=_urllib.HttpTransport_urllib,)),)
 
159
    if pycurl_present:
 
160
        activity_scenarios.append(
 
161
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
 
162
                                 _transport=PyCurlTransport,)),)
 
163
        if tests.HTTPSServerFeature.available():
 
164
            from bzrlib.tests import (
 
165
                ssl_certs,
121
166
                )
122
 
    is_testing_for_transports = tests.condition_isinstance(t_classes)
123
 
 
124
 
    # multiplied by one for each protocol version
125
 
    tp_adapter = TransportProtocolAdapter()
126
 
    tp_classes= (SmartHTTPTunnellingTest,
127
 
                 TestDoCatchRedirections,
128
 
                 TestHTTPConnections,
129
 
                 TestHTTPRedirections,
130
 
                 TestHTTPSilentRedirections,
131
 
                 TestLimitedRangeRequestServer,
132
 
                 TestPost,
133
 
                 TestProxyHttpServer,
134
 
                 TestRanges,
135
 
                 TestSpecificRequestHandler,
136
 
                 )
137
 
    is_also_testing_for_protocols = tests.condition_isinstance(tp_classes)
138
 
 
139
 
    # multiplied by one for each authentication scheme
140
 
    tpa_adapter = TransportProtocolAuthenticationAdapter()
141
 
    tpa_classes = (TestAuth,
142
 
                   )
143
 
    is_also_testing_for_authentication = tests.condition_isinstance(
144
 
        tpa_classes)
145
 
 
146
 
    result = loader.suiteClass()
147
 
    for test_class in tests.iter_suite_tests(standard_tests):
148
 
        # Each test class is either standalone or testing for some combination
149
 
        # of transport, protocol version, authentication scheme. Use the right
150
 
        # adpater (or none) depending on the class.
151
 
        if is_testing_for_transports(test_class):
152
 
            result.addTests(t_adapter.adapt(test_class))
153
 
        elif is_also_testing_for_protocols(test_class):
154
 
            result.addTests(tp_adapter.adapt(test_class))
155
 
        elif is_also_testing_for_authentication(test_class):
156
 
            result.addTests(tpa_adapter.adapt(test_class))
157
 
        else:
158
 
            result.addTest(test_class)
 
167
            # FIXME: Until we have a better way to handle self-signed
 
168
            # certificates (like allowing them in a test specific
 
169
            # authentication.conf for example), we need some specialized pycurl
 
170
            # transport for tests.
 
171
            class HTTPS_pycurl_transport(PyCurlTransport):
 
172
 
 
173
                def __init__(self, base, _from_transport=None):
 
174
                    super(HTTPS_pycurl_transport, self).__init__(
 
175
                        base, _from_transport)
 
176
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
 
177
 
 
178
            activity_scenarios.append(
 
179
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
 
180
                                      _transport=HTTPS_pycurl_transport,)),)
 
181
 
 
182
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
 
183
                                               protocol_scenarios)
 
184
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
 
185
 
 
186
    # No parametrization for the remaining tests
 
187
    result.addTests(remaining_tests)
 
188
 
159
189
    return result
160
190
 
161
191
 
170
200
 
171
201
class RecordingServer(object):
172
202
    """A fake HTTP server.
173
 
    
 
203
 
174
204
    It records the bytes sent to it, and replies with a 200.
175
205
    """
176
206
 
225
255
        self.port = None
226
256
 
227
257
 
 
258
class TestAuthHeader(tests.TestCase):
 
259
 
 
260
    def parse_header(self, header, auth_handler_class=None):
 
261
        if auth_handler_class is None:
 
262
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
 
263
        self.auth_handler =  auth_handler_class()
 
264
        return self.auth_handler._parse_auth_header(header)
 
265
 
 
266
    def test_empty_header(self):
 
267
        scheme, remainder = self.parse_header('')
 
268
        self.assertEquals('', scheme)
 
269
        self.assertIs(None, remainder)
 
270
 
 
271
    def test_negotiate_header(self):
 
272
        scheme, remainder = self.parse_header('Negotiate')
 
273
        self.assertEquals('negotiate', scheme)
 
274
        self.assertIs(None, remainder)
 
275
 
 
276
    def test_basic_header(self):
 
277
        scheme, remainder = self.parse_header(
 
278
            'Basic realm="Thou should not pass"')
 
279
        self.assertEquals('basic', scheme)
 
280
        self.assertEquals('realm="Thou should not pass"', remainder)
 
281
 
 
282
    def test_basic_extract_realm(self):
 
283
        scheme, remainder = self.parse_header(
 
284
            'Basic realm="Thou should not pass"',
 
285
            _urllib2_wrappers.BasicAuthHandler)
 
286
        match, realm = self.auth_handler.extract_realm(remainder)
 
287
        self.assertTrue(match is not None)
 
288
        self.assertEquals('Thou should not pass', realm)
 
289
 
 
290
    def test_digest_header(self):
 
291
        scheme, remainder = self.parse_header(
 
292
            'Digest realm="Thou should not pass"')
 
293
        self.assertEquals('digest', scheme)
 
294
        self.assertEquals('realm="Thou should not pass"', remainder)
 
295
 
 
296
 
228
297
class TestHTTPServer(tests.TestCase):
229
298
    """Test the HTTP servers implementations."""
230
299
 
447
516
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
448
517
            % bzrlib.__version__) > -1)
449
518
 
450
 
    def test_get_smart_medium(self):
451
 
        # For HTTP, get_smart_medium should return the transport object.
452
 
        server = self.get_readonly_server()
453
 
        http_transport = self._transport(server.get_url())
454
 
        medium = http_transport.get_smart_medium()
455
 
        self.assertIs(medium, http_transport)
456
 
 
457
519
    def test_has_on_bogus_host(self):
458
520
        # Get a free address and don't 'accept' on it, so that we
459
521
        # can be sure there is no http handler there, but set a
816
878
        # bytes on the socket
817
879
        ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
818
880
        self.assertEqual((0, '0'), ireadv.next())
819
 
        # The server should have issued one request so far 
 
881
        # The server should have issued one request so far
820
882
        self.assertEqual(1, server.GET_request_nb)
821
883
        self.assertEqual('0123456789', t.get_bytes('a'))
822
884
        # get_bytes issued an additional request, the readv pending ones are
1143
1205
        url = self.server.get_url()
1144
1206
        t = self._transport(url)
1145
1207
        try:
1146
 
            self.assertEqual(t.get('foo').read(), 'proxied contents of foo\n')
 
1208
            self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1147
1209
        finally:
1148
1210
            self._restore_env()
1149
1211
 
1152
1214
        url = self.server.get_url()
1153
1215
        t = self._transport(url)
1154
1216
        try:
1155
 
            self.assertEqual(t.get('foo').read(), 'contents of foo\n')
 
1217
            self.assertEqual('contents of foo\n', t.get('foo').read())
1156
1218
        finally:
1157
1219
            self._restore_env()
1158
1220
 
1268
1330
                                  ('bundle',
1269
1331
                                  '# Bazaar revision bundle v0.9\n#\n')
1270
1332
                                  ],)
1271
 
 
 
1333
        # The requests to the old server will be redirected to the new server
1272
1334
        self.old_transport = self._transport(self.old_server.get_url())
1273
1335
 
1274
1336
    def test_redirected(self):
1279
1341
    def test_read_redirected_bundle_from_url(self):
1280
1342
        from bzrlib.bundle import read_bundle_from_url
1281
1343
        url = self.old_transport.abspath('bundle')
1282
 
        bundle = read_bundle_from_url(url)
 
1344
        bundle = self.applyDeprecated(deprecated_in((1, 12, 0)),
 
1345
                read_bundle_from_url, url)
1283
1346
        # If read_bundle_from_url was successful we get an empty bundle
1284
1347
        self.assertEqual([], bundle.revisions)
1285
1348
 
1296
1359
        # Since the tests using this class will replace
1297
1360
        # _urllib2_wrappers.Request, we can't just call the base class __init__
1298
1361
        # or we'll loop.
1299
 
        RedirectedRequest.init_orig(self, method, url, args, kwargs)
 
1362
        RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
1300
1363
        self.follow_redirections = True
1301
1364
 
1302
1365
 
1429
1492
 
1430
1493
    _auth_header = 'Authorization'
1431
1494
    _password_prompt_prefix = ''
 
1495
    _username_prompt_prefix = ''
 
1496
    # Set by load_tests
 
1497
    _auth_server = None
1432
1498
 
1433
1499
    def setUp(self):
1434
1500
        super(TestAuth, self).setUp()
1437
1503
                                  ('b', 'contents of b\n'),])
1438
1504
 
1439
1505
    def create_transport_readonly_server(self):
1440
 
        if self._auth_scheme == 'basic':
1441
 
            server = http_utils.HTTPBasicAuthServer(
1442
 
                protocol_version=self._protocol_version)
1443
 
        else:
1444
 
            if self._auth_scheme != 'digest':
1445
 
                raise AssertionError('Unknown auth scheme: %r'
1446
 
                                     % self._auth_scheme)
1447
 
            server = http_utils.HTTPDigestAuthServer(
1448
 
                protocol_version=self._protocol_version)
1449
 
        return server
 
1506
        return self._auth_server(protocol_version=self._protocol_version)
1450
1507
 
1451
1508
    def _testing_pycurl(self):
1452
1509
        return pycurl_present and self._transport == PyCurlTransport
1453
1510
 
1454
 
    def get_user_url(self, user=None, password=None):
 
1511
    def get_user_url(self, user, password):
1455
1512
        """Build an url embedding user and password"""
1456
1513
        url = '%s://' % self.server._url_protocol
1457
1514
        if user is not None:
1462
1519
        url += '%s:%s/' % (self.server.host, self.server.port)
1463
1520
        return url
1464
1521
 
1465
 
    def get_user_transport(self, user=None, password=None):
 
1522
    def get_user_transport(self, user, password):
1466
1523
        return self._transport(self.get_user_url(user, password))
1467
1524
 
1468
1525
    def test_no_user(self):
1469
1526
        self.server.add_user('joe', 'foo')
1470
 
        t = self.get_user_transport()
 
1527
        t = self.get_user_transport(None, None)
1471
1528
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1472
1529
        # Only one 'Authentication Required' error should occur
1473
1530
        self.assertEqual(1, self.server.auth_required_errors)
1503
1560
        # initial 'who are you' and 'this is not you, who are you')
1504
1561
        self.assertEqual(2, self.server.auth_required_errors)
1505
1562
 
 
1563
    def test_prompt_for_username(self):
 
1564
        if self._testing_pycurl():
 
1565
            raise tests.TestNotApplicable(
 
1566
                'pycurl cannot prompt, it handles auth by embedding'
 
1567
                ' user:pass in urls only')
 
1568
 
 
1569
        self.server.add_user('joe', 'foo')
 
1570
        t = self.get_user_transport(None, None)
 
1571
        stdout = tests.StringIOWrapper()
 
1572
        stderr = tests.StringIOWrapper()
 
1573
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
 
1574
                                            stdout=stdout, stderr=stderr)
 
1575
        self.assertEqual('contents of a\n',t.get('a').read())
 
1576
        # stdin should be empty
 
1577
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
1578
        stderr.seek(0)
 
1579
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
 
1580
        self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
 
1581
        self.assertEquals('', stdout.getvalue())
 
1582
        self._check_password_prompt(t._unqualified_scheme, 'joe',
 
1583
                                    stderr.readline())
 
1584
 
1506
1585
    def test_prompt_for_password(self):
1507
1586
        if self._testing_pycurl():
1508
1587
            raise tests.TestNotApplicable(
1512
1591
        self.server.add_user('joe', 'foo')
1513
1592
        t = self.get_user_transport('joe', None)
1514
1593
        stdout = tests.StringIOWrapper()
1515
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1516
 
        self.assertEqual('contents of a\n',t.get('a').read())
 
1594
        stderr = tests.StringIOWrapper()
 
1595
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
 
1596
                                            stdout=stdout, stderr=stderr)
 
1597
        self.assertEqual('contents of a\n', t.get('a').read())
1517
1598
        # stdin should be empty
1518
1599
        self.assertEqual('', ui.ui_factory.stdin.readline())
1519
1600
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1520
 
                                    stdout.getvalue())
 
1601
                                    stderr.getvalue())
 
1602
        self.assertEquals('', stdout.getvalue())
1521
1603
        # And we shouldn't prompt again for a different request
1522
1604
        # against the same transport.
1523
1605
        self.assertEqual('contents of b\n',t.get('b').read())
1535
1617
                                 self.server.auth_realm)))
1536
1618
        self.assertEquals(expected_prompt, actual_prompt)
1537
1619
 
 
1620
    def _expected_username_prompt(self, scheme):
 
1621
        return (self._username_prompt_prefix
 
1622
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
 
1623
                                 self.server.host, self.server.port,
 
1624
                                 self.server.auth_realm))
 
1625
 
1538
1626
    def test_no_prompt_for_password_when_using_auth_config(self):
1539
1627
        if self._testing_pycurl():
1540
1628
            raise tests.TestNotApplicable(
1561
1649
        # Only one 'Authentication Required' error should occur
1562
1650
        self.assertEqual(1, self.server.auth_required_errors)
1563
1651
 
 
1652
    def test_user_from_auth_conf(self):
 
1653
        if self._testing_pycurl():
 
1654
            raise tests.TestNotApplicable(
 
1655
                'pycurl does not support authentication.conf')
 
1656
        user = 'joe'
 
1657
        password = 'foo'
 
1658
        self.server.add_user(user, password)
 
1659
        # Create a minimal config file with the right password
 
1660
        conf = config.AuthenticationConfig()
 
1661
        conf._get_config().update(
 
1662
            {'httptest': {'scheme': 'http', 'port': self.server.port,
 
1663
                          'user': user, 'password': password}})
 
1664
        conf._save()
 
1665
        t = self.get_user_transport(None, None)
 
1666
        # Issue a request to the server to connect
 
1667
        self.assertEqual('contents of a\n', t.get('a').read())
 
1668
        # Only one 'Authentication Required' error should occur
 
1669
        self.assertEqual(1, self.server.auth_required_errors)
 
1670
 
1564
1671
    def test_changing_nonce(self):
1565
 
        if self._auth_scheme != 'digest':
1566
 
            raise tests.TestNotApplicable('HTTP auth digest only test')
 
1672
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
 
1673
                                     http_utils.ProxyDigestAuthServer):
 
1674
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1567
1675
        if self._testing_pycurl():
1568
1676
            raise tests.KnownFailure(
1569
1677
                'pycurl does not handle a nonce change')
1587
1695
    """Test proxy authentication schemes."""
1588
1696
 
1589
1697
    _auth_header = 'Proxy-authorization'
1590
 
    _password_prompt_prefix='Proxy '
 
1698
    _password_prompt_prefix = 'Proxy '
 
1699
    _username_prompt_prefix = 'Proxy '
1591
1700
 
1592
1701
    def setUp(self):
1593
1702
        super(TestProxyAuth, self).setUp()
1600
1709
                                  ('b-proxied', 'contents of b\n'),
1601
1710
                                  ])
1602
1711
 
1603
 
    def create_transport_readonly_server(self):
1604
 
        if self._auth_scheme == 'basic':
1605
 
            server = http_utils.ProxyBasicAuthServer(
1606
 
                protocol_version=self._protocol_version)
1607
 
        else:
1608
 
            if self._auth_scheme != 'digest':
1609
 
                raise AssertionError('Unknown auth scheme: %r'
1610
 
                                     % self._auth_scheme)
1611
 
            server = http_utils.ProxyDigestAuthServer(
1612
 
                protocol_version=self._protocol_version)
1613
 
        return server
1614
 
 
1615
 
    def get_user_transport(self, user=None, password=None):
 
1712
    def get_user_transport(self, user, password):
1616
1713
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1617
1714
        return self._transport(self.server.get_url())
1618
1715
 
1667
1764
        return http_utils.HTTPServerWithSmarts(
1668
1765
            protocol_version=self._protocol_version)
1669
1766
 
 
1767
    def test_open_bzrdir(self):
 
1768
        branch = self.make_branch('relpath')
 
1769
        http_server = self.get_readonly_server()
 
1770
        url = http_server.get_url() + 'relpath'
 
1771
        bd = bzrdir.BzrDir.open(url)
 
1772
        self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
 
1773
 
1670
1774
    def test_bulk_data(self):
1671
1775
        # We should be able to send and receive bulk data in a single message.
1672
1776
        # The 'readv' command in the smart protocol both sends and receives
1738
1842
        # No need to build a valid smart request here, the server will not even
1739
1843
        # try to interpret it.
1740
1844
        self.assertRaises(errors.SmartProtocolError,
1741
 
                          t.send_http_smart_request, 'whatever')
1742
 
 
 
1845
                          t.get_smart_medium().send_http_smart_request,
 
1846
                          'whatever')
 
1847
 
 
1848
class Test_redirected_to(tests.TestCase):
 
1849
 
 
1850
    def test_redirected_to_subdir(self):
 
1851
        t = self._transport('http://www.example.com/foo')
 
1852
        r = t._redirected_to('http://www.example.com/foo',
 
1853
                             'http://www.example.com/foo/subdir')
 
1854
        self.assertIsInstance(r, type(t))
 
1855
        # Both transports share the some connection
 
1856
        self.assertEquals(t._get_connection(), r._get_connection())
 
1857
 
 
1858
    def test_redirected_to_self_with_slash(self):
 
1859
        t = self._transport('http://www.example.com/foo')
 
1860
        r = t._redirected_to('http://www.example.com/foo',
 
1861
                             'http://www.example.com/foo/')
 
1862
        self.assertIsInstance(r, type(t))
 
1863
        # Both transports share the some connection (one can argue that we
 
1864
        # should return the exact same transport here, but that seems
 
1865
        # overkill).
 
1866
        self.assertEquals(t._get_connection(), r._get_connection())
 
1867
 
 
1868
    def test_redirected_to_host(self):
 
1869
        t = self._transport('http://www.example.com/foo')
 
1870
        r = t._redirected_to('http://www.example.com/foo',
 
1871
                             'http://foo.example.com/foo/subdir')
 
1872
        self.assertIsInstance(r, type(t))
 
1873
 
 
1874
    def test_redirected_to_same_host_sibling_protocol(self):
 
1875
        t = self._transport('http://www.example.com/foo')
 
1876
        r = t._redirected_to('http://www.example.com/foo',
 
1877
                             'https://www.example.com/foo')
 
1878
        self.assertIsInstance(r, type(t))
 
1879
 
 
1880
    def test_redirected_to_same_host_different_protocol(self):
 
1881
        t = self._transport('http://www.example.com/foo')
 
1882
        r = t._redirected_to('http://www.example.com/foo',
 
1883
                             'ftp://www.example.com/foo')
 
1884
        self.assertNotEquals(type(r), type(t))
 
1885
 
 
1886
    def test_redirected_to_different_host_same_user(self):
 
1887
        t = self._transport('http://joe@www.example.com/foo')
 
1888
        r = t._redirected_to('http://www.example.com/foo',
 
1889
                             'https://foo.example.com/foo')
 
1890
        self.assertIsInstance(r, type(t))
 
1891
        self.assertEquals(t._user, r._user)
 
1892
 
 
1893
 
 
1894
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
 
1895
    """Request handler for a unique and pre-defined request.
 
1896
 
 
1897
    The only thing we care about here is how many bytes travel on the wire. But
 
1898
    since we want to measure it for a real http client, we have to send it
 
1899
    correct responses.
 
1900
 
 
1901
    We expect to receive a *single* request nothing more (and we won't even
 
1902
    check what request it is, we just measure the bytes read until an empty
 
1903
    line.
 
1904
    """
 
1905
 
 
1906
    def handle_one_request(self):
 
1907
        tcs = self.server.test_case_server
 
1908
        requestline = self.rfile.readline()
 
1909
        headers = self.MessageClass(self.rfile, 0)
 
1910
        # We just read: the request, the headers, an empty line indicating the
 
1911
        # end of the headers.
 
1912
        bytes_read = len(requestline)
 
1913
        for line in headers.headers:
 
1914
            bytes_read += len(line)
 
1915
        bytes_read += len('\r\n')
 
1916
        if requestline.startswith('POST'):
 
1917
            # The body should be a single line (or we don't know where it ends
 
1918
            # and we don't want to issue a blocking read)
 
1919
            body = self.rfile.readline()
 
1920
            bytes_read += len(body)
 
1921
        tcs.bytes_read = bytes_read
 
1922
 
 
1923
        # We set the bytes written *before* issuing the write, the client is
 
1924
        # supposed to consume every produced byte *before* checking that value.
 
1925
 
 
1926
        # Doing the oppposite may lead to test failure: we may be interrupted
 
1927
        # after the write but before updating the value. The client can then
 
1928
        # continue and read the value *before* we can update it. And yes,
 
1929
        # this has been observed -- vila 20090129
 
1930
        tcs.bytes_written = len(tcs.canned_response)
 
1931
        self.wfile.write(tcs.canned_response)
 
1932
 
 
1933
 
 
1934
class ActivityServerMixin(object):
 
1935
 
 
1936
    def __init__(self, protocol_version):
 
1937
        super(ActivityServerMixin, self).__init__(
 
1938
            request_handler=PredefinedRequestHandler,
 
1939
            protocol_version=protocol_version)
 
1940
        # Bytes read and written by the server
 
1941
        self.bytes_read = 0
 
1942
        self.bytes_written = 0
 
1943
        self.canned_response = None
 
1944
 
 
1945
 
 
1946
class ActivityHTTPServer(ActivityServerMixin, http_server.HttpServer):
 
1947
    pass
 
1948
 
 
1949
 
 
1950
if tests.HTTPSServerFeature.available():
 
1951
    from bzrlib.tests import https_server
 
1952
    class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
 
1953
        pass
 
1954
 
 
1955
 
 
1956
class TestActivity(tests.TestCase):
 
1957
    """Test socket activity reporting.
 
1958
 
 
1959
    We use a special purpose server to control the bytes sent and received and
 
1960
    be able to predict the activity on the client socket.
 
1961
    """
 
1962
 
 
1963
    def setUp(self):
 
1964
        tests.TestCase.setUp(self)
 
1965
        self.server = self._activity_server(self._protocol_version)
 
1966
        self.server.setUp()
 
1967
        self.activities = {}
 
1968
        def report_activity(t, bytes, direction):
 
1969
            count = self.activities.get(direction, 0)
 
1970
            count += bytes
 
1971
            self.activities[direction] = count
 
1972
 
 
1973
        # We override at class level because constructors may propagate the
 
1974
        # bound method and render instance overriding ineffective (an
 
1975
        # alternative would be to define a specific ui factory instead...)
 
1976
        self.orig_report_activity = self._transport._report_activity
 
1977
        self._transport._report_activity = report_activity
 
1978
 
 
1979
    def tearDown(self):
 
1980
        self._transport._report_activity = self.orig_report_activity
 
1981
        self.server.tearDown()
 
1982
        tests.TestCase.tearDown(self)
 
1983
 
 
1984
    def get_transport(self):
 
1985
        return self._transport(self.server.get_url())
 
1986
 
 
1987
    def assertActivitiesMatch(self):
 
1988
        self.assertEqual(self.server.bytes_read,
 
1989
                         self.activities.get('write', 0), 'written bytes')
 
1990
        self.assertEqual(self.server.bytes_written,
 
1991
                         self.activities.get('read', 0), 'read bytes')
 
1992
 
 
1993
    def test_get(self):
 
1994
        self.server.canned_response = '''HTTP/1.1 200 OK\r
 
1995
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
 
1996
Server: Apache/2.0.54 (Fedora)\r
 
1997
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
 
1998
ETag: "56691-23-38e9ae00"\r
 
1999
Accept-Ranges: bytes\r
 
2000
Content-Length: 35\r
 
2001
Connection: close\r
 
2002
Content-Type: text/plain; charset=UTF-8\r
 
2003
\r
 
2004
Bazaar-NG meta directory, format 1
 
2005
'''
 
2006
        t = self.get_transport()
 
2007
        self.assertEqual('Bazaar-NG meta directory, format 1\n',
 
2008
                         t.get('foo/bar').read())
 
2009
        self.assertActivitiesMatch()
 
2010
 
 
2011
    def test_has(self):
 
2012
        self.server.canned_response = '''HTTP/1.1 200 OK\r
 
2013
Server: SimpleHTTP/0.6 Python/2.5.2\r
 
2014
Date: Thu, 29 Jan 2009 20:21:47 GMT\r
 
2015
Content-type: application/octet-stream\r
 
2016
Content-Length: 20\r
 
2017
Last-Modified: Thu, 29 Jan 2009 20:21:47 GMT\r
 
2018
\r
 
2019
'''
 
2020
        t = self.get_transport()
 
2021
        self.assertTrue(t.has('foo/bar'))
 
2022
        self.assertActivitiesMatch()
 
2023
 
 
2024
    def test_readv(self):
 
2025
        self.server.canned_response = '''HTTP/1.1 206 Partial Content\r
 
2026
Date: Tue, 11 Jul 2006 04:49:48 GMT\r
 
2027
Server: Apache/2.0.54 (Fedora)\r
 
2028
Last-Modified: Thu, 06 Jul 2006 20:22:05 GMT\r
 
2029
ETag: "238a3c-16ec2-805c5540"\r
 
2030
Accept-Ranges: bytes\r
 
2031
Content-Length: 1534\r
 
2032
Connection: close\r
 
2033
Content-Type: multipart/byteranges; boundary=418470f848b63279b\r
 
2034
\r
 
2035
\r
 
2036
--418470f848b63279b\r
 
2037
Content-type: text/plain; charset=UTF-8\r
 
2038
Content-range: bytes 0-254/93890\r
 
2039
\r
 
2040
mbp@sourcefrog.net-20050309040815-13242001617e4a06
 
2041
mbp@sourcefrog.net-20050309040929-eee0eb3e6d1e7627
 
2042
mbp@sourcefrog.net-20050309040957-6cad07f466bb0bb8
 
2043
mbp@sourcefrog.net-20050309041501-c840e09071de3b67
 
2044
mbp@sourcefrog.net-20050309044615-c24a3250be83220a
 
2045
\r
 
2046
--418470f848b63279b\r
 
2047
Content-type: text/plain; charset=UTF-8\r
 
2048
Content-range: bytes 1000-2049/93890\r
 
2049
\r
 
2050
40-fd4ec249b6b139ab
 
2051
mbp@sourcefrog.net-20050311063625-07858525021f270b
 
2052
mbp@sourcefrog.net-20050311231934-aa3776aff5200bb9
 
2053
mbp@sourcefrog.net-20050311231953-73aeb3a131c3699a
 
2054
mbp@sourcefrog.net-20050311232353-f5e33da490872c6a
 
2055
mbp@sourcefrog.net-20050312071639-0a8f59a34a024ff0
 
2056
mbp@sourcefrog.net-20050312073432-b2c16a55e0d6e9fb
 
2057
mbp@sourcefrog.net-20050312073831-a47c3335ece1920f
 
2058
mbp@sourcefrog.net-20050312085412-13373aa129ccbad3
 
2059
mbp@sourcefrog.net-20050313052251-2bf004cb96b39933
 
2060
mbp@sourcefrog.net-20050313052856-3edd84094687cb11
 
2061
mbp@sourcefrog.net-20050313053233-e30a4f28aef48f9d
 
2062
mbp@sourcefrog.net-20050313053853-7c64085594ff3072
 
2063
mbp@sourcefrog.net-20050313054757-a86c3f5871069e22
 
2064
mbp@sourcefrog.net-20050313061422-418f1f73b94879b9
 
2065
mbp@sourcefrog.net-20050313120651-497bd231b19df600
 
2066
mbp@sourcefrog.net-20050314024931-eae0170ef25a5d1a
 
2067
mbp@sourcefrog.net-20050314025438-d52099f915fe65fc
 
2068
mbp@sourcefrog.net-20050314025539-637a636692c055cf
 
2069
mbp@sourcefrog.net-20050314025737-55eb441f430ab4ba
 
2070
mbp@sourcefrog.net-20050314025901-d74aa93bb7ee8f62
 
2071
mbp@source\r
 
2072
--418470f848b63279b--\r
 
2073
'''
 
2074
        t = self.get_transport()
 
2075
        # Remember that the request is ignored and that the ranges below
 
2076
        # doesn't have to match the canned response.
 
2077
        l = list(t.readv('/foo/bar', ((0, 255), (1000, 1050))))
 
2078
        self.assertEqual(2, len(l))
 
2079
        self.assertActivitiesMatch()
 
2080
 
 
2081
    def test_post(self):
 
2082
        self.server.canned_response = '''HTTP/1.1 200 OK\r
 
2083
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
 
2084
Server: Apache/2.0.54 (Fedora)\r
 
2085
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
 
2086
ETag: "56691-23-38e9ae00"\r
 
2087
Accept-Ranges: bytes\r
 
2088
Content-Length: 35\r
 
2089
Connection: close\r
 
2090
Content-Type: text/plain; charset=UTF-8\r
 
2091
\r
 
2092
lalala whatever as long as itsssss
 
2093
'''
 
2094
        t = self.get_transport()
 
2095
        # We must send a single line of body bytes, see
 
2096
        # PredefinedRequestHandler.handle_one_request
 
2097
        code, f = t._post('abc def end-of-body\n')
 
2098
        self.assertEqual('lalala whatever as long as itsssss\n', f.read())
 
2099
        self.assertActivitiesMatch()