~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Joe Julian
  • Date: 2010-01-10 02:25:31 UTC
  • mto: (4634.119.7 2.0)
  • mto: This revision was merged to the branch mainline in revision 4959.
  • Revision ID: joe@julianfamily.org-20100110022531-wqk61rsagz8xsiga
Added MANIFEST.in to allow bdist_rpm to have all the required include files and tools. bdist_rpm will still fail to build correctly on some distributions due to a distutils bug: http://bugs.python.org/issue644744

Show diffs side-by-side

added added

removed removed

Lines of Context:
113
113
                                            protocol_scenarios)
114
114
    tests.multiply_tests(tp_tests, tp_scenarios, result)
115
115
 
 
116
    # proxy auth: each auth scheme on all http versions on all implementations.
 
117
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
 
118
        remaining_tests, tests.condition_isinstance((
 
119
                TestProxyAuth,
 
120
                )))
 
121
    proxy_auth_scheme_scenarios = [
 
122
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
 
123
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
 
124
        ('basicdigest',
 
125
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
 
126
        ]
 
127
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
 
128
                                              proxy_auth_scheme_scenarios)
 
129
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
 
130
 
116
131
    # auth: each auth scheme on all http versions on all implementations.
117
132
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
118
133
        remaining_tests, tests.condition_isinstance((
119
134
                TestAuth,
120
135
                )))
121
136
    auth_scheme_scenarios = [
122
 
        ('basic', dict(_auth_scheme='basic')),
123
 
        ('digest', dict(_auth_scheme='digest')),
 
137
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
 
138
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
 
139
        ('basicdigest',
 
140
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
124
141
        ]
125
142
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
126
 
        auth_scheme_scenarios)
 
143
                                             auth_scheme_scenarios)
127
144
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
128
145
 
129
 
    # activity: activity on all http versions on all implementations
 
146
    # activity: on all http[s] versions on all implementations
130
147
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
131
148
        remaining_tests, tests.condition_isinstance((
132
149
                TestActivity,
133
150
                )))
134
151
    activity_scenarios = [
135
 
        ('http', dict(_activity_server=ActivityHTTPServer)),
 
152
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
 
153
                             _transport=_urllib.HttpTransport_urllib,)),
136
154
        ]
137
155
    if tests.HTTPSServerFeature.available():
138
156
        activity_scenarios.append(
139
 
            ('https', dict(_activity_server=ActivityHTTPSServer)))
140
 
    tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
141
 
        activity_scenarios)
 
157
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
 
158
                                  _transport=_urllib.HttpTransport_urllib,)),)
 
159
    if pycurl_present:
 
160
        activity_scenarios.append(
 
161
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
 
162
                                 _transport=PyCurlTransport,)),)
 
163
        if tests.HTTPSServerFeature.available():
 
164
            from bzrlib.tests import (
 
165
                ssl_certs,
 
166
                )
 
167
            # FIXME: Until we have a better way to handle self-signed
 
168
            # certificates (like allowing them in a test specific
 
169
            # authentication.conf for example), we need some specialized pycurl
 
170
            # transport for tests.
 
171
            class HTTPS_pycurl_transport(PyCurlTransport):
 
172
 
 
173
                def __init__(self, base, _from_transport=None):
 
174
                    super(HTTPS_pycurl_transport, self).__init__(
 
175
                        base, _from_transport)
 
176
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
 
177
 
 
178
            activity_scenarios.append(
 
179
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
 
180
                                      _transport=HTTPS_pycurl_transport,)),)
 
181
 
 
182
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
 
183
                                               protocol_scenarios)
142
184
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
143
185
 
144
186
    # No parametrization for the remaining tests
583
625
        # for details) make no distinction between a closed
584
626
        # socket and badly formatted status line, so we can't
585
627
        # just test for ConnectionError, we have to test
586
 
        # InvalidHttpResponse too.
587
 
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
 
628
        # InvalidHttpResponse too. And pycurl may raise ConnectionReset
 
629
        # instead of ConnectionError too.
 
630
        self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
 
631
                            errors.InvalidHttpResponse),
588
632
                          t.has, 'foo/bar')
589
633
 
590
634
    def test_http_get(self):
591
635
        server = self.get_readonly_server()
592
636
        t = self._transport(server.get_url())
593
 
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
 
637
        self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
 
638
                           errors.InvalidHttpResponse),
594
639
                          t.get, 'foo/bar')
595
640
 
596
641
 
1451
1496
    _auth_header = 'Authorization'
1452
1497
    _password_prompt_prefix = ''
1453
1498
    _username_prompt_prefix = ''
 
1499
    # Set by load_tests
 
1500
    _auth_server = None
1454
1501
 
1455
1502
    def setUp(self):
1456
1503
        super(TestAuth, self).setUp()
1459
1506
                                  ('b', 'contents of b\n'),])
1460
1507
 
1461
1508
    def create_transport_readonly_server(self):
1462
 
        if self._auth_scheme == 'basic':
1463
 
            server = http_utils.HTTPBasicAuthServer(
1464
 
                protocol_version=self._protocol_version)
1465
 
        else:
1466
 
            if self._auth_scheme != 'digest':
1467
 
                raise AssertionError('Unknown auth scheme: %r'
1468
 
                                     % self._auth_scheme)
1469
 
            server = http_utils.HTTPDigestAuthServer(
1470
 
                protocol_version=self._protocol_version)
1471
 
        return server
 
1509
        return self._auth_server(protocol_version=self._protocol_version)
1472
1510
 
1473
1511
    def _testing_pycurl(self):
1474
1512
        return pycurl_present and self._transport == PyCurlTransport
1534
1572
        self.server.add_user('joe', 'foo')
1535
1573
        t = self.get_user_transport(None, None)
1536
1574
        stdout = tests.StringIOWrapper()
1537
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=stdout)
 
1575
        stderr = tests.StringIOWrapper()
 
1576
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
 
1577
                                            stdout=stdout, stderr=stderr)
1538
1578
        self.assertEqual('contents of a\n',t.get('a').read())
1539
1579
        # stdin should be empty
1540
1580
        self.assertEqual('', ui.ui_factory.stdin.readline())
1541
 
        stdout.seek(0)
 
1581
        stderr.seek(0)
1542
1582
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1543
 
        self.assertEquals(expected_prompt, stdout.read(len(expected_prompt)))
 
1583
        self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
 
1584
        self.assertEquals('', stdout.getvalue())
1544
1585
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1545
 
                                    stdout.readline())
 
1586
                                    stderr.readline())
1546
1587
 
1547
1588
    def test_prompt_for_password(self):
1548
1589
        if self._testing_pycurl():
1553
1594
        self.server.add_user('joe', 'foo')
1554
1595
        t = self.get_user_transport('joe', None)
1555
1596
        stdout = tests.StringIOWrapper()
1556
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1557
 
        self.assertEqual('contents of a\n',t.get('a').read())
 
1597
        stderr = tests.StringIOWrapper()
 
1598
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
 
1599
                                            stdout=stdout, stderr=stderr)
 
1600
        self.assertEqual('contents of a\n', t.get('a').read())
1558
1601
        # stdin should be empty
1559
1602
        self.assertEqual('', ui.ui_factory.stdin.readline())
1560
1603
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1561
 
                                    stdout.getvalue())
 
1604
                                    stderr.getvalue())
 
1605
        self.assertEquals('', stdout.getvalue())
1562
1606
        # And we shouldn't prompt again for a different request
1563
1607
        # against the same transport.
1564
1608
        self.assertEqual('contents of b\n',t.get('b').read())
1628
1672
        self.assertEqual(1, self.server.auth_required_errors)
1629
1673
 
1630
1674
    def test_changing_nonce(self):
1631
 
        if self._auth_scheme != 'digest':
1632
 
            raise tests.TestNotApplicable('HTTP auth digest only test')
 
1675
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
 
1676
                                     http_utils.ProxyDigestAuthServer):
 
1677
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1633
1678
        if self._testing_pycurl():
1634
1679
            raise tests.KnownFailure(
1635
1680
                'pycurl does not handle a nonce change')
1667
1712
                                  ('b-proxied', 'contents of b\n'),
1668
1713
                                  ])
1669
1714
 
1670
 
    def create_transport_readonly_server(self):
1671
 
        if self._auth_scheme == 'basic':
1672
 
            server = http_utils.ProxyBasicAuthServer(
1673
 
                protocol_version=self._protocol_version)
1674
 
        else:
1675
 
            if self._auth_scheme != 'digest':
1676
 
                raise AssertionError('Unknown auth scheme: %r'
1677
 
                                     % self._auth_scheme)
1678
 
            server = http_utils.ProxyDigestAuthServer(
1679
 
                protocol_version=self._protocol_version)
1680
 
        return server
1681
 
 
1682
1715
    def get_user_transport(self, user, password):
1683
1716
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1684
1717
        return self._transport(self.server.get_url())