~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-04-09 20:23:07 UTC
  • mfrom: (4265.1.4 bbc-merge)
  • Revision ID: pqm@pqm.ubuntu.com-20090409202307-n0depb16qepoe21o
(jam) Change _fetch_uses_deltas = False for CHK repos until we can
        write a better fix.

Show diffs side-by-side

added added

removed removed

Lines of Context:
113
113
                                            protocol_scenarios)
114
114
    tests.multiply_tests(tp_tests, tp_scenarios, result)
115
115
 
116
 
    # proxy auth: each auth scheme on all http versions on all implementations.
117
 
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
118
 
        remaining_tests, tests.condition_isinstance((
119
 
                TestProxyAuth,
120
 
                )))
121
 
    proxy_auth_scheme_scenarios = [
122
 
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
123
 
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
124
 
        ('basicdigest',
125
 
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
126
 
        ]
127
 
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
128
 
                                              proxy_auth_scheme_scenarios)
129
 
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
130
 
 
131
116
    # auth: each auth scheme on all http versions on all implementations.
132
117
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
133
118
        remaining_tests, tests.condition_isinstance((
134
119
                TestAuth,
135
120
                )))
136
121
    auth_scheme_scenarios = [
137
 
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
138
 
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
139
 
        ('basicdigest',
140
 
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
 
122
        ('basic', dict(_auth_scheme='basic')),
 
123
        ('digest', dict(_auth_scheme='digest')),
141
124
        ]
142
125
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
143
 
                                             auth_scheme_scenarios)
 
126
        auth_scheme_scenarios)
144
127
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
145
128
 
146
 
    # activity: on all http[s] versions on all implementations
 
129
    # activity: activity on all http versions on all implementations
147
130
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
148
131
        remaining_tests, tests.condition_isinstance((
149
132
                TestActivity,
150
133
                )))
151
134
    activity_scenarios = [
152
 
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
153
 
                             _transport=_urllib.HttpTransport_urllib,)),
 
135
        ('http', dict(_activity_server=ActivityHTTPServer)),
154
136
        ]
155
137
    if tests.HTTPSServerFeature.available():
156
138
        activity_scenarios.append(
157
 
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
158
 
                                  _transport=_urllib.HttpTransport_urllib,)),)
159
 
    if pycurl_present:
160
 
        activity_scenarios.append(
161
 
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
162
 
                                 _transport=PyCurlTransport,)),)
163
 
        if tests.HTTPSServerFeature.available():
164
 
            from bzrlib.tests import (
165
 
                ssl_certs,
166
 
                )
167
 
            # FIXME: Until we have a better way to handle self-signed
168
 
            # certificates (like allowing them in a test specific
169
 
            # authentication.conf for example), we need some specialized pycurl
170
 
            # transport for tests.
171
 
            class HTTPS_pycurl_transport(PyCurlTransport):
172
 
 
173
 
                def __init__(self, base, _from_transport=None):
174
 
                    super(HTTPS_pycurl_transport, self).__init__(
175
 
                        base, _from_transport)
176
 
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
177
 
 
178
 
            activity_scenarios.append(
179
 
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
180
 
                                      _transport=HTTPS_pycurl_transport,)),)
181
 
 
182
 
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
183
 
                                               protocol_scenarios)
 
139
            ('https', dict(_activity_server=ActivityHTTPSServer)))
 
140
    tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
 
141
        activity_scenarios)
184
142
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
185
143
 
186
144
    # No parametrization for the remaining tests
257
215
 
258
216
class TestAuthHeader(tests.TestCase):
259
217
 
260
 
    def parse_header(self, header, auth_handler_class=None):
261
 
        if auth_handler_class is None:
262
 
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
263
 
        self.auth_handler =  auth_handler_class()
264
 
        return self.auth_handler._parse_auth_header(header)
 
218
    def parse_header(self, header):
 
219
        ah =  _urllib2_wrappers.AbstractAuthHandler()
 
220
        return ah._parse_auth_header(header)
265
221
 
266
222
    def test_empty_header(self):
267
223
        scheme, remainder = self.parse_header('')
279
235
        self.assertEquals('basic', scheme)
280
236
        self.assertEquals('realm="Thou should not pass"', remainder)
281
237
 
282
 
    def test_basic_extract_realm(self):
283
 
        scheme, remainder = self.parse_header(
284
 
            'Basic realm="Thou should not pass"',
285
 
            _urllib2_wrappers.BasicAuthHandler)
286
 
        match, realm = self.auth_handler.extract_realm(remainder)
287
 
        self.assertTrue(match is not None)
288
 
        self.assertEquals('Thou should not pass', realm)
289
 
 
290
238
    def test_digest_header(self):
291
239
        scheme, remainder = self.parse_header(
292
240
            'Digest realm="Thou should not pass"')
1492
1440
 
1493
1441
    _auth_header = 'Authorization'
1494
1442
    _password_prompt_prefix = ''
1495
 
    _username_prompt_prefix = ''
1496
 
    # Set by load_tests
1497
 
    _auth_server = None
1498
1443
 
1499
1444
    def setUp(self):
1500
1445
        super(TestAuth, self).setUp()
1503
1448
                                  ('b', 'contents of b\n'),])
1504
1449
 
1505
1450
    def create_transport_readonly_server(self):
1506
 
        return self._auth_server(protocol_version=self._protocol_version)
 
1451
        if self._auth_scheme == 'basic':
 
1452
            server = http_utils.HTTPBasicAuthServer(
 
1453
                protocol_version=self._protocol_version)
 
1454
        else:
 
1455
            if self._auth_scheme != 'digest':
 
1456
                raise AssertionError('Unknown auth scheme: %r'
 
1457
                                     % self._auth_scheme)
 
1458
            server = http_utils.HTTPDigestAuthServer(
 
1459
                protocol_version=self._protocol_version)
 
1460
        return server
1507
1461
 
1508
1462
    def _testing_pycurl(self):
1509
1463
        return pycurl_present and self._transport == PyCurlTransport
1560
1514
        # initial 'who are you' and 'this is not you, who are you')
1561
1515
        self.assertEqual(2, self.server.auth_required_errors)
1562
1516
 
1563
 
    def test_prompt_for_username(self):
1564
 
        if self._testing_pycurl():
1565
 
            raise tests.TestNotApplicable(
1566
 
                'pycurl cannot prompt, it handles auth by embedding'
1567
 
                ' user:pass in urls only')
1568
 
 
1569
 
        self.server.add_user('joe', 'foo')
1570
 
        t = self.get_user_transport(None, None)
1571
 
        stdout = tests.StringIOWrapper()
1572
 
        stderr = tests.StringIOWrapper()
1573
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1574
 
                                            stdout=stdout, stderr=stderr)
1575
 
        self.assertEqual('contents of a\n',t.get('a').read())
1576
 
        # stdin should be empty
1577
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
1578
 
        stderr.seek(0)
1579
 
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1580
 
        self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
1581
 
        self.assertEquals('', stdout.getvalue())
1582
 
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1583
 
                                    stderr.readline())
1584
 
 
1585
1517
    def test_prompt_for_password(self):
1586
1518
        if self._testing_pycurl():
1587
1519
            raise tests.TestNotApplicable(
1591
1523
        self.server.add_user('joe', 'foo')
1592
1524
        t = self.get_user_transport('joe', None)
1593
1525
        stdout = tests.StringIOWrapper()
1594
 
        stderr = tests.StringIOWrapper()
1595
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1596
 
                                            stdout=stdout, stderr=stderr)
1597
 
        self.assertEqual('contents of a\n', t.get('a').read())
 
1526
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
 
1527
        self.assertEqual('contents of a\n',t.get('a').read())
1598
1528
        # stdin should be empty
1599
1529
        self.assertEqual('', ui.ui_factory.stdin.readline())
1600
1530
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1601
 
                                    stderr.getvalue())
1602
 
        self.assertEquals('', stdout.getvalue())
 
1531
                                    stdout.getvalue())
1603
1532
        # And we shouldn't prompt again for a different request
1604
1533
        # against the same transport.
1605
1534
        self.assertEqual('contents of b\n',t.get('b').read())
1617
1546
                                 self.server.auth_realm)))
1618
1547
        self.assertEquals(expected_prompt, actual_prompt)
1619
1548
 
1620
 
    def _expected_username_prompt(self, scheme):
1621
 
        return (self._username_prompt_prefix
1622
 
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1623
 
                                 self.server.host, self.server.port,
1624
 
                                 self.server.auth_realm))
1625
 
 
1626
1549
    def test_no_prompt_for_password_when_using_auth_config(self):
1627
1550
        if self._testing_pycurl():
1628
1551
            raise tests.TestNotApplicable(
1669
1592
        self.assertEqual(1, self.server.auth_required_errors)
1670
1593
 
1671
1594
    def test_changing_nonce(self):
1672
 
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1673
 
                                     http_utils.ProxyDigestAuthServer):
1674
 
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
 
1595
        if self._auth_scheme != 'digest':
 
1596
            raise tests.TestNotApplicable('HTTP auth digest only test')
1675
1597
        if self._testing_pycurl():
1676
1598
            raise tests.KnownFailure(
1677
1599
                'pycurl does not handle a nonce change')
1695
1617
    """Test proxy authentication schemes."""
1696
1618
 
1697
1619
    _auth_header = 'Proxy-authorization'
1698
 
    _password_prompt_prefix = 'Proxy '
1699
 
    _username_prompt_prefix = 'Proxy '
 
1620
    _password_prompt_prefix='Proxy '
1700
1621
 
1701
1622
    def setUp(self):
1702
1623
        super(TestProxyAuth, self).setUp()
1709
1630
                                  ('b-proxied', 'contents of b\n'),
1710
1631
                                  ])
1711
1632
 
 
1633
    def create_transport_readonly_server(self):
 
1634
        if self._auth_scheme == 'basic':
 
1635
            server = http_utils.ProxyBasicAuthServer(
 
1636
                protocol_version=self._protocol_version)
 
1637
        else:
 
1638
            if self._auth_scheme != 'digest':
 
1639
                raise AssertionError('Unknown auth scheme: %r'
 
1640
                                     % self._auth_scheme)
 
1641
            server = http_utils.ProxyDigestAuthServer(
 
1642
                protocol_version=self._protocol_version)
 
1643
        return server
 
1644
 
1712
1645
    def get_user_transport(self, user, password):
1713
1646
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1714
1647
        return self._transport(self.server.get_url())