~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Vincent Ladeuil
  • Date: 2009-07-02 13:07:14 UTC
  • mto: (4524.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4525.
  • Revision ID: v.ladeuil+lp@free.fr-20090702130714-hsyqfusi8vn3a11m
Use tree.has_changes() where appropriate (the test suite caught a
bug in has_changes() (not filtering out the root) in an impressive
number of tests)

* bzrlib/send.py:
(send): Use tree.has_changes() instead of tree.changes_from().

* bzrlib/reconfigure.py:
(Reconfigure._check): Use tree.has_changes() instead of
tree.changes_from().

* bzrlib/merge.py:
(Merger.ensure_revision_trees, Merger.compare_basis): Use
tree.has_changes() instead of tree.changes_from().

* bzrlib/builtins.py:
(cmd_remove_tree.run, cmd_push.run, cmd_merge.run): Use
tree.has_changes() instead of tree.changes_from().

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for HTTP implementations.
18
18
 
71
71
def load_tests(standard_tests, module, loader):
72
72
    """Multiply tests for http clients and protocol versions."""
73
73
    result = loader.suiteClass()
74
 
    adapter = tests.TestScenarioApplier()
75
 
    remaining_tests = standard_tests
76
74
 
77
 
    # one for each transport
 
75
    # one for each transport implementation
78
76
    t_tests, remaining_tests = tests.split_suite_by_condition(
79
 
        remaining_tests, tests.condition_isinstance((
 
77
        standard_tests, tests.condition_isinstance((
80
78
                TestHttpTransportRegistration,
81
79
                TestHttpTransportUrls,
82
80
                Test_redirected_to,
91
89
            ('pycurl', dict(_transport=PyCurlTransport,
92
90
                            _server=http_server.HttpServer_PyCurl,
93
91
                            _qualified_prefix='http+pycurl',)))
94
 
    adapter.scenarios = transport_scenarios
95
 
    tests.adapt_tests(t_tests, adapter, result)
 
92
    tests.multiply_tests(t_tests, transport_scenarios, result)
96
93
 
97
 
    # multiplied by one for each protocol version
 
94
    # each implementation tested with each HTTP version
98
95
    tp_tests, remaining_tests = tests.split_suite_by_condition(
99
96
        remaining_tests, tests.condition_isinstance((
100
97
                SmartHTTPTunnellingTest,
112
109
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
113
110
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
114
111
            ]
115
 
    tp_scenarios = tests.multiply_scenarios(adapter.scenarios,
 
112
    tp_scenarios = tests.multiply_scenarios(transport_scenarios,
116
113
                                            protocol_scenarios)
117
 
    adapter.scenarios = tp_scenarios
118
 
    tests.adapt_tests(tp_tests, adapter, result)
119
 
 
120
 
    # multiplied by one for each authentication scheme
 
114
    tests.multiply_tests(tp_tests, tp_scenarios, result)
 
115
 
 
116
    # proxy auth: each auth scheme on all http versions on all implementations.
 
117
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
 
118
        remaining_tests, tests.condition_isinstance((
 
119
                TestProxyAuth,
 
120
                )))
 
121
    proxy_auth_scheme_scenarios = [
 
122
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
 
123
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
 
124
        ('basicdigest',
 
125
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
 
126
        ]
 
127
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
 
128
                                              proxy_auth_scheme_scenarios)
 
129
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
 
130
 
 
131
    # auth: each auth scheme on all http versions on all implementations.
121
132
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
122
133
        remaining_tests, tests.condition_isinstance((
123
134
                TestAuth,
124
135
                )))
125
136
    auth_scheme_scenarios = [
126
 
        ('basic', dict(_auth_scheme='basic')),
127
 
        ('digest', dict(_auth_scheme='digest')),
 
137
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
 
138
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
 
139
        ('basicdigest',
 
140
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
128
141
        ]
129
 
    adapter.scenarios = tests.multiply_scenarios(adapter.scenarios,
130
 
                                                 auth_scheme_scenarios)
131
 
    tests.adapt_tests(tpa_tests, adapter, result)
 
142
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
 
143
                                             auth_scheme_scenarios)
 
144
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
132
145
 
 
146
    # activity: on all http[s] versions on all implementations
133
147
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
134
148
        remaining_tests, tests.condition_isinstance((
135
149
                TestActivity,
136
150
                )))
137
151
    activity_scenarios = [
138
 
        ('http', dict(_activity_server=ActivityHTTPServer)),
 
152
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
 
153
                             _transport=_urllib.HttpTransport_urllib,)),
139
154
        ]
140
155
    if tests.HTTPSServerFeature.available():
141
156
        activity_scenarios.append(
142
 
            ('https', dict(_activity_server=ActivityHTTPSServer,)))
143
 
    adapter.scenarios = tests.multiply_scenarios(tp_scenarios,
144
 
                                                 activity_scenarios)
145
 
    tests.adapt_tests(tpact_tests, adapter, result)
 
157
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
 
158
                                  _transport=_urllib.HttpTransport_urllib,)),)
 
159
    if pycurl_present:
 
160
        activity_scenarios.append(
 
161
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
 
162
                                 _transport=PyCurlTransport,)),)
 
163
        if tests.HTTPSServerFeature.available():
 
164
            from bzrlib.tests import (
 
165
                ssl_certs,
 
166
                )
 
167
            # FIXME: Until we have a better way to handle self-signed
 
168
            # certificates (like allowing them in a test specific
 
169
            # authentication.conf for example), we need some specialized pycurl
 
170
            # transport for tests.
 
171
            class HTTPS_pycurl_transport(PyCurlTransport):
 
172
 
 
173
                def __init__(self, base, _from_transport=None):
 
174
                    super(HTTPS_pycurl_transport, self).__init__(
 
175
                        base, _from_transport)
 
176
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
 
177
 
 
178
            activity_scenarios.append(
 
179
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
 
180
                                      _transport=HTTPS_pycurl_transport,)),)
 
181
 
 
182
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
 
183
                                               protocol_scenarios)
 
184
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
146
185
 
147
186
    # No parametrization for the remaining tests
148
187
    result.addTests(remaining_tests)
161
200
 
162
201
class RecordingServer(object):
163
202
    """A fake HTTP server.
164
 
    
 
203
 
165
204
    It records the bytes sent to it, and replies with a 200.
166
205
    """
167
206
 
216
255
        self.port = None
217
256
 
218
257
 
 
258
class TestAuthHeader(tests.TestCase):
 
259
 
 
260
    def parse_header(self, header, auth_handler_class=None):
 
261
        if auth_handler_class is None:
 
262
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
 
263
        self.auth_handler =  auth_handler_class()
 
264
        return self.auth_handler._parse_auth_header(header)
 
265
 
 
266
    def test_empty_header(self):
 
267
        scheme, remainder = self.parse_header('')
 
268
        self.assertEquals('', scheme)
 
269
        self.assertIs(None, remainder)
 
270
 
 
271
    def test_negotiate_header(self):
 
272
        scheme, remainder = self.parse_header('Negotiate')
 
273
        self.assertEquals('negotiate', scheme)
 
274
        self.assertIs(None, remainder)
 
275
 
 
276
    def test_basic_header(self):
 
277
        scheme, remainder = self.parse_header(
 
278
            'Basic realm="Thou should not pass"')
 
279
        self.assertEquals('basic', scheme)
 
280
        self.assertEquals('realm="Thou should not pass"', remainder)
 
281
 
 
282
    def test_basic_extract_realm(self):
 
283
        scheme, remainder = self.parse_header(
 
284
            'Basic realm="Thou should not pass"',
 
285
            _urllib2_wrappers.BasicAuthHandler)
 
286
        match, realm = self.auth_handler.extract_realm(remainder)
 
287
        self.assertTrue(match is not None)
 
288
        self.assertEquals('Thou should not pass', realm)
 
289
 
 
290
    def test_digest_header(self):
 
291
        scheme, remainder = self.parse_header(
 
292
            'Digest realm="Thou should not pass"')
 
293
        self.assertEquals('digest', scheme)
 
294
        self.assertEquals('realm="Thou should not pass"', remainder)
 
295
 
 
296
 
219
297
class TestHTTPServer(tests.TestCase):
220
298
    """Test the HTTP servers implementations."""
221
299
 
800
878
        # bytes on the socket
801
879
        ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
802
880
        self.assertEqual((0, '0'), ireadv.next())
803
 
        # The server should have issued one request so far 
 
881
        # The server should have issued one request so far
804
882
        self.assertEqual(1, server.GET_request_nb)
805
883
        self.assertEqual('0123456789', t.get_bytes('a'))
806
884
        # get_bytes issued an additional request, the readv pending ones are
1281
1359
        # Since the tests using this class will replace
1282
1360
        # _urllib2_wrappers.Request, we can't just call the base class __init__
1283
1361
        # or we'll loop.
1284
 
        RedirectedRequest.init_orig(self, method, url, args, kwargs)
 
1362
        RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
1285
1363
        self.follow_redirections = True
1286
1364
 
1287
1365
 
1414
1492
 
1415
1493
    _auth_header = 'Authorization'
1416
1494
    _password_prompt_prefix = ''
 
1495
    _username_prompt_prefix = ''
 
1496
    # Set by load_tests
 
1497
    _auth_server = None
1417
1498
 
1418
1499
    def setUp(self):
1419
1500
        super(TestAuth, self).setUp()
1422
1503
                                  ('b', 'contents of b\n'),])
1423
1504
 
1424
1505
    def create_transport_readonly_server(self):
1425
 
        if self._auth_scheme == 'basic':
1426
 
            server = http_utils.HTTPBasicAuthServer(
1427
 
                protocol_version=self._protocol_version)
1428
 
        else:
1429
 
            if self._auth_scheme != 'digest':
1430
 
                raise AssertionError('Unknown auth scheme: %r'
1431
 
                                     % self._auth_scheme)
1432
 
            server = http_utils.HTTPDigestAuthServer(
1433
 
                protocol_version=self._protocol_version)
1434
 
        return server
 
1506
        return self._auth_server(protocol_version=self._protocol_version)
1435
1507
 
1436
1508
    def _testing_pycurl(self):
1437
1509
        return pycurl_present and self._transport == PyCurlTransport
1488
1560
        # initial 'who are you' and 'this is not you, who are you')
1489
1561
        self.assertEqual(2, self.server.auth_required_errors)
1490
1562
 
 
1563
    def test_prompt_for_username(self):
 
1564
        if self._testing_pycurl():
 
1565
            raise tests.TestNotApplicable(
 
1566
                'pycurl cannot prompt, it handles auth by embedding'
 
1567
                ' user:pass in urls only')
 
1568
 
 
1569
        self.server.add_user('joe', 'foo')
 
1570
        t = self.get_user_transport(None, None)
 
1571
        stdout = tests.StringIOWrapper()
 
1572
        stderr = tests.StringIOWrapper()
 
1573
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
 
1574
                                            stdout=stdout, stderr=stderr)
 
1575
        self.assertEqual('contents of a\n',t.get('a').read())
 
1576
        # stdin should be empty
 
1577
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
1578
        stderr.seek(0)
 
1579
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
 
1580
        self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
 
1581
        self.assertEquals('', stdout.getvalue())
 
1582
        self._check_password_prompt(t._unqualified_scheme, 'joe',
 
1583
                                    stderr.readline())
 
1584
 
1491
1585
    def test_prompt_for_password(self):
1492
1586
        if self._testing_pycurl():
1493
1587
            raise tests.TestNotApplicable(
1497
1591
        self.server.add_user('joe', 'foo')
1498
1592
        t = self.get_user_transport('joe', None)
1499
1593
        stdout = tests.StringIOWrapper()
1500
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1501
 
        self.assertEqual('contents of a\n',t.get('a').read())
 
1594
        stderr = tests.StringIOWrapper()
 
1595
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
 
1596
                                            stdout=stdout, stderr=stderr)
 
1597
        self.assertEqual('contents of a\n', t.get('a').read())
1502
1598
        # stdin should be empty
1503
1599
        self.assertEqual('', ui.ui_factory.stdin.readline())
1504
1600
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1505
 
                                    stdout.getvalue())
 
1601
                                    stderr.getvalue())
 
1602
        self.assertEquals('', stdout.getvalue())
1506
1603
        # And we shouldn't prompt again for a different request
1507
1604
        # against the same transport.
1508
1605
        self.assertEqual('contents of b\n',t.get('b').read())
1520
1617
                                 self.server.auth_realm)))
1521
1618
        self.assertEquals(expected_prompt, actual_prompt)
1522
1619
 
 
1620
    def _expected_username_prompt(self, scheme):
 
1621
        return (self._username_prompt_prefix
 
1622
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
 
1623
                                 self.server.host, self.server.port,
 
1624
                                 self.server.auth_realm))
 
1625
 
1523
1626
    def test_no_prompt_for_password_when_using_auth_config(self):
1524
1627
        if self._testing_pycurl():
1525
1628
            raise tests.TestNotApplicable(
1566
1669
        self.assertEqual(1, self.server.auth_required_errors)
1567
1670
 
1568
1671
    def test_changing_nonce(self):
1569
 
        if self._auth_scheme != 'digest':
1570
 
            raise tests.TestNotApplicable('HTTP auth digest only test')
 
1672
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
 
1673
                                     http_utils.ProxyDigestAuthServer):
 
1674
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1571
1675
        if self._testing_pycurl():
1572
1676
            raise tests.KnownFailure(
1573
1677
                'pycurl does not handle a nonce change')
1591
1695
    """Test proxy authentication schemes."""
1592
1696
 
1593
1697
    _auth_header = 'Proxy-authorization'
1594
 
    _password_prompt_prefix='Proxy '
 
1698
    _password_prompt_prefix = 'Proxy '
 
1699
    _username_prompt_prefix = 'Proxy '
1595
1700
 
1596
1701
    def setUp(self):
1597
1702
        super(TestProxyAuth, self).setUp()
1604
1709
                                  ('b-proxied', 'contents of b\n'),
1605
1710
                                  ])
1606
1711
 
1607
 
    def create_transport_readonly_server(self):
1608
 
        if self._auth_scheme == 'basic':
1609
 
            server = http_utils.ProxyBasicAuthServer(
1610
 
                protocol_version=self._protocol_version)
1611
 
        else:
1612
 
            if self._auth_scheme != 'digest':
1613
 
                raise AssertionError('Unknown auth scheme: %r'
1614
 
                                     % self._auth_scheme)
1615
 
            server = http_utils.ProxyDigestAuthServer(
1616
 
                protocol_version=self._protocol_version)
1617
 
        return server
1618
 
 
1619
1712
    def get_user_transport(self, user, password):
1620
1713
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1621
1714
        return self._transport(self.server.get_url())
1879
1972
 
1880
1973
        # We override at class level because constructors may propagate the
1881
1974
        # bound method and render instance overriding ineffective (an
1882
 
        # alternative would be be to define a specific ui factory instead...)
 
1975
        # alternative would be to define a specific ui factory instead...)
1883
1976
        self.orig_report_activity = self._transport._report_activity
1884
1977
        self._transport._report_activity = report_activity
1885
1978