~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-04-08 12:39:40 UTC
  • mfrom: (4266.2.1 tests.output)
  • Revision ID: pqm@pqm.ubuntu.com-20090408123940-kaho6cwr21163fjn
(robertc) Remove clutter from bzr selftest --list. (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
48
48
    deprecated_in,
49
49
    )
50
50
from bzrlib.tests import (
51
 
    features,
52
51
    http_server,
53
52
    http_utils,
54
53
    )
62
61
    )
63
62
 
64
63
 
65
 
if features.pycurl.available():
 
64
try:
66
65
    from bzrlib.transport.http._pycurl import PyCurlTransport
 
66
    pycurl_present = True
 
67
except errors.DependencyNotPresent:
 
68
    pycurl_present = False
67
69
 
68
70
 
69
71
def load_tests(standard_tests, module, loader):
82
84
                        _server=http_server.HttpServer_urllib,
83
85
                        _qualified_prefix='http+urllib',)),
84
86
        ]
85
 
    if features.pycurl.available():
 
87
    if pycurl_present:
86
88
        transport_scenarios.append(
87
89
            ('pycurl', dict(_transport=PyCurlTransport,
88
90
                            _server=http_server.HttpServer_PyCurl,
89
91
                            _qualified_prefix='http+pycurl',)))
90
92
    tests.multiply_tests(t_tests, transport_scenarios, result)
91
93
 
92
 
    protocol_scenarios = [
93
 
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
94
 
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
95
 
            ]
96
 
 
97
 
    # some tests are parametrized by the protocol version only
98
 
    p_tests, remaining_tests = tests.split_suite_by_condition(
99
 
        remaining_tests, tests.condition_isinstance((
100
 
                TestAuthOnRedirected,
101
 
                )))
102
 
    tests.multiply_tests(p_tests, protocol_scenarios, result)
103
 
 
104
94
    # each implementation tested with each HTTP version
105
95
    tp_tests, remaining_tests = tests.split_suite_by_condition(
106
96
        remaining_tests, tests.condition_isinstance((
115
105
                TestRanges,
116
106
                TestSpecificRequestHandler,
117
107
                )))
 
108
    protocol_scenarios = [
 
109
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
 
110
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
 
111
            ]
118
112
    tp_scenarios = tests.multiply_scenarios(transport_scenarios,
119
113
                                            protocol_scenarios)
120
114
    tests.multiply_tests(tp_tests, tp_scenarios, result)
121
115
 
122
 
    # proxy auth: each auth scheme on all http versions on all implementations.
123
 
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
124
 
        remaining_tests, tests.condition_isinstance((
125
 
                TestProxyAuth,
126
 
                )))
127
 
    proxy_auth_scheme_scenarios = [
128
 
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
129
 
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
130
 
        ('basicdigest',
131
 
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
132
 
        ]
133
 
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
134
 
                                              proxy_auth_scheme_scenarios)
135
 
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
136
 
 
137
116
    # auth: each auth scheme on all http versions on all implementations.
138
117
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
139
118
        remaining_tests, tests.condition_isinstance((
140
119
                TestAuth,
141
120
                )))
142
121
    auth_scheme_scenarios = [
143
 
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
144
 
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
145
 
        ('basicdigest',
146
 
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
 
122
        ('basic', dict(_auth_scheme='basic')),
 
123
        ('digest', dict(_auth_scheme='digest')),
147
124
        ]
148
125
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
149
 
                                             auth_scheme_scenarios)
 
126
        auth_scheme_scenarios)
150
127
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
151
128
 
152
 
    # activity: on all http[s] versions on all implementations
 
129
    # activity: activity on all http versions on all implementations
153
130
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
154
131
        remaining_tests, tests.condition_isinstance((
155
132
                TestActivity,
156
133
                )))
157
134
    activity_scenarios = [
158
 
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
159
 
                             _transport=_urllib.HttpTransport_urllib,)),
 
135
        ('http', dict(_activity_server=ActivityHTTPServer)),
160
136
        ]
161
137
    if tests.HTTPSServerFeature.available():
162
138
        activity_scenarios.append(
163
 
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
164
 
                                  _transport=_urllib.HttpTransport_urllib,)),)
165
 
    if features.pycurl.available():
166
 
        activity_scenarios.append(
167
 
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
168
 
                                 _transport=PyCurlTransport,)),)
169
 
        if tests.HTTPSServerFeature.available():
170
 
            from bzrlib.tests import (
171
 
                ssl_certs,
172
 
                )
173
 
            # FIXME: Until we have a better way to handle self-signed
174
 
            # certificates (like allowing them in a test specific
175
 
            # authentication.conf for example), we need some specialized pycurl
176
 
            # transport for tests.
177
 
            class HTTPS_pycurl_transport(PyCurlTransport):
178
 
 
179
 
                def __init__(self, base, _from_transport=None):
180
 
                    super(HTTPS_pycurl_transport, self).__init__(
181
 
                        base, _from_transport)
182
 
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
183
 
 
184
 
            activity_scenarios.append(
185
 
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
186
 
                                      _transport=HTTPS_pycurl_transport,)),)
187
 
 
188
 
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
189
 
                                               protocol_scenarios)
 
139
            ('https', dict(_activity_server=ActivityHTTPSServer)))
 
140
    tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
 
141
        activity_scenarios)
190
142
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
191
143
 
192
144
    # No parametrization for the remaining tests
210
162
    It records the bytes sent to it, and replies with a 200.
211
163
    """
212
164
 
213
 
    def __init__(self, expect_body_tail=None, scheme=''):
 
165
    def __init__(self, expect_body_tail=None):
214
166
        """Constructor.
215
167
 
216
168
        :type expect_body_tail: str
221
173
        self.host = None
222
174
        self.port = None
223
175
        self.received_bytes = ''
224
 
        self.scheme = scheme
225
 
 
226
 
    def get_url(self):
227
 
        return '%s://%s:%s/' % (self.scheme, self.host, self.port)
228
 
 
229
 
    def start_server(self):
 
176
 
 
177
    def setUp(self):
230
178
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
231
179
        self._sock.bind(('127.0.0.1', 0))
232
180
        self.host, self.port = self._sock.getsockname()
255
203
            # The client may have already closed the socket.
256
204
            pass
257
205
 
258
 
    def stop_server(self):
 
206
    def tearDown(self):
259
207
        try:
260
208
            self._sock.close()
261
209
        except socket.error:
267
215
 
268
216
class TestAuthHeader(tests.TestCase):
269
217
 
270
 
    def parse_header(self, header, auth_handler_class=None):
271
 
        if auth_handler_class is None:
272
 
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
273
 
        self.auth_handler =  auth_handler_class()
274
 
        return self.auth_handler._parse_auth_header(header)
 
218
    def parse_header(self, header):
 
219
        ah =  _urllib2_wrappers.AbstractAuthHandler()
 
220
        return ah._parse_auth_header(header)
275
221
 
276
222
    def test_empty_header(self):
277
223
        scheme, remainder = self.parse_header('')
278
 
        self.assertEqual('', scheme)
 
224
        self.assertEquals('', scheme)
279
225
        self.assertIs(None, remainder)
280
226
 
281
227
    def test_negotiate_header(self):
282
228
        scheme, remainder = self.parse_header('Negotiate')
283
 
        self.assertEqual('negotiate', scheme)
 
229
        self.assertEquals('negotiate', scheme)
284
230
        self.assertIs(None, remainder)
285
231
 
286
232
    def test_basic_header(self):
287
233
        scheme, remainder = self.parse_header(
288
234
            'Basic realm="Thou should not pass"')
289
 
        self.assertEqual('basic', scheme)
290
 
        self.assertEqual('realm="Thou should not pass"', remainder)
291
 
 
292
 
    def test_basic_extract_realm(self):
293
 
        scheme, remainder = self.parse_header(
294
 
            'Basic realm="Thou should not pass"',
295
 
            _urllib2_wrappers.BasicAuthHandler)
296
 
        match, realm = self.auth_handler.extract_realm(remainder)
297
 
        self.assertTrue(match is not None)
298
 
        self.assertEqual('Thou should not pass', realm)
 
235
        self.assertEquals('basic', scheme)
 
236
        self.assertEquals('realm="Thou should not pass"', remainder)
299
237
 
300
238
    def test_digest_header(self):
301
239
        scheme, remainder = self.parse_header(
302
240
            'Digest realm="Thou should not pass"')
303
 
        self.assertEqual('digest', scheme)
304
 
        self.assertEqual('realm="Thou should not pass"', remainder)
 
241
        self.assertEquals('digest', scheme)
 
242
        self.assertEquals('realm="Thou should not pass"', remainder)
305
243
 
306
244
 
307
245
class TestHTTPServer(tests.TestCase):
314
252
 
315
253
        server = http_server.HttpServer(BogusRequestHandler)
316
254
        try:
317
 
            self.assertRaises(httplib.UnknownProtocol, server.start_server)
 
255
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
318
256
        except:
319
 
            server.stop_server()
 
257
            server.tearDown()
320
258
            self.fail('HTTP Server creation did not raise UnknownProtocol')
321
259
 
322
260
    def test_force_invalid_protocol(self):
323
261
        server = http_server.HttpServer(protocol_version='HTTP/0.1')
324
262
        try:
325
 
            self.assertRaises(httplib.UnknownProtocol, server.start_server)
 
263
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
326
264
        except:
327
 
            server.stop_server()
 
265
            server.tearDown()
328
266
            self.fail('HTTP Server creation did not raise UnknownProtocol')
329
267
 
330
268
    def test_server_start_and_stop(self):
331
269
        server = http_server.HttpServer()
332
 
        server.start_server()
333
 
        try:
334
 
            self.assertTrue(server._http_running)
335
 
        finally:
336
 
            server.stop_server()
 
270
        server.setUp()
 
271
        self.assertTrue(server._http_running)
 
272
        server.tearDown()
337
273
        self.assertFalse(server._http_running)
338
274
 
339
275
    def test_create_http_server_one_zero(self):
342
278
            protocol_version = 'HTTP/1.0'
343
279
 
344
280
        server = http_server.HttpServer(RequestHandlerOneZero)
345
 
        self.start_server(server)
 
281
        server.setUp()
 
282
        self.addCleanup(server.tearDown)
346
283
        self.assertIsInstance(server._httpd, http_server.TestingHTTPServer)
347
284
 
348
285
    def test_create_http_server_one_one(self):
351
288
            protocol_version = 'HTTP/1.1'
352
289
 
353
290
        server = http_server.HttpServer(RequestHandlerOneOne)
354
 
        self.start_server(server)
 
291
        server.setUp()
 
292
        self.addCleanup(server.tearDown)
355
293
        self.assertIsInstance(server._httpd,
356
294
                              http_server.TestingThreadingHTTPServer)
357
295
 
362
300
 
363
301
        server = http_server.HttpServer(RequestHandlerOneZero,
364
302
                                        protocol_version='HTTP/1.1')
365
 
        self.start_server(server)
 
303
        server.setUp()
 
304
        self.addCleanup(server.tearDown)
366
305
        self.assertIsInstance(server._httpd,
367
306
                              http_server.TestingThreadingHTTPServer)
368
307
 
373
312
 
374
313
        server = http_server.HttpServer(RequestHandlerOneOne,
375
314
                                        protocol_version='HTTP/1.0')
376
 
        self.start_server(server)
 
315
        server.setUp()
 
316
        self.addCleanup(server.tearDown)
377
317
        self.assertIsInstance(server._httpd,
378
318
                              http_server.TestingHTTPServer)
379
319
 
382
322
    """Test case to inherit from if pycurl is present"""
383
323
 
384
324
    def _get_pycurl_maybe(self):
385
 
        self.requireFeature(features.pycurl)
386
 
        return PyCurlTransport
 
325
        try:
 
326
            from bzrlib.transport.http._pycurl import PyCurlTransport
 
327
            return PyCurlTransport
 
328
        except errors.DependencyNotPresent:
 
329
            raise tests.TestSkipped('pycurl not present')
387
330
 
388
331
    _transport = property(_get_pycurl_maybe)
389
332
 
396
339
    def test_url_parsing(self):
397
340
        f = FakeManager()
398
341
        url = http.extract_auth('http://example.com', f)
399
 
        self.assertEqual('http://example.com', url)
400
 
        self.assertEqual(0, len(f.credentials))
 
342
        self.assertEquals('http://example.com', url)
 
343
        self.assertEquals(0, len(f.credentials))
401
344
        url = http.extract_auth(
402
 
            'http://user:pass@example.com/bzr/bzr.dev', f)
403
 
        self.assertEqual('http://example.com/bzr/bzr.dev', url)
404
 
        self.assertEqual(1, len(f.credentials))
405
 
        self.assertEqual([None, 'example.com', 'user', 'pass'],
406
 
                         f.credentials[0])
 
345
            'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
 
346
        self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
 
347
        self.assertEquals(1, len(f.credentials))
 
348
        self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
 
349
                          f.credentials[0])
407
350
 
408
351
 
409
352
class TestHttpTransportUrls(tests.TestCase):
436
379
    def test_http_impl_urls(self):
437
380
        """There are servers which ask for particular clients to connect"""
438
381
        server = self._server()
439
 
        server.start_server()
440
382
        try:
 
383
            server.setUp()
441
384
            url = server.get_url()
442
385
            self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
443
386
        finally:
444
 
            server.stop_server()
 
387
            server.tearDown()
445
388
 
446
389
 
447
390
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
456
399
        https by supplying a fake version_info that do not
457
400
        support it.
458
401
        """
459
 
        self.requireFeature(features.pycurl)
460
 
        # Import the module locally now that we now it's available.
461
 
        pycurl = features.pycurl.module
 
402
        try:
 
403
            import pycurl
 
404
        except ImportError:
 
405
            raise tests.TestSkipped('pycurl not present')
462
406
 
463
 
        self.overrideAttr(pycurl, 'version_info',
464
 
                          # Fake the pycurl version_info This was taken from
465
 
                          # a windows pycurl without SSL (thanks to bialix)
466
 
                          lambda : (2,
467
 
                                    '7.13.2',
468
 
                                    462082,
469
 
                                    'i386-pc-win32',
470
 
                                    2576,
471
 
                                    None,
472
 
                                    0,
473
 
                                    None,
474
 
                                    ('ftp', 'gopher', 'telnet',
475
 
                                     'dict', 'ldap', 'http', 'file'),
476
 
                                    None,
477
 
                                    0,
478
 
                                    None))
479
 
        self.assertRaises(errors.DependencyNotPresent, self._transport,
480
 
                          'https://launchpad.net')
 
407
        version_info_orig = pycurl.version_info
 
408
        try:
 
409
            # Now that we have pycurl imported, we can fake its version_info
 
410
            # This was taken from a windows pycurl without SSL
 
411
            # (thanks to bialix)
 
412
            pycurl.version_info = lambda : (2,
 
413
                                            '7.13.2',
 
414
                                            462082,
 
415
                                            'i386-pc-win32',
 
416
                                            2576,
 
417
                                            None,
 
418
                                            0,
 
419
                                            None,
 
420
                                            ('ftp', 'gopher', 'telnet',
 
421
                                             'dict', 'ldap', 'http', 'file'),
 
422
                                            None,
 
423
                                            0,
 
424
                                            None)
 
425
            self.assertRaises(errors.DependencyNotPresent, self._transport,
 
426
                              'https://launchpad.net')
 
427
        finally:
 
428
            # Restore the right function
 
429
            pycurl.version_info = version_info_orig
481
430
 
482
431
 
483
432
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
542
491
class TestPost(tests.TestCase):
543
492
 
544
493
    def test_post_body_is_received(self):
545
 
        server = RecordingServer(expect_body_tail='end-of-body',
546
 
            scheme=self._qualified_prefix)
547
 
        self.start_server(server)
548
 
        url = server.get_url()
 
494
        server = RecordingServer(expect_body_tail='end-of-body')
 
495
        server.setUp()
 
496
        self.addCleanup(server.tearDown)
 
497
        scheme = self._qualified_prefix
 
498
        url = '%s://%s:%s/' % (scheme, server.host, server.port)
549
499
        http_transport = self._transport(url)
550
500
        code, response = http_transport._post('abc def end-of-body')
551
501
        self.assertTrue(
600
550
                                      protocol_version=self._protocol_version)
601
551
 
602
552
    def _testing_pycurl(self):
603
 
        # TODO: This is duplicated for lots of the classes in this file
604
 
        return (features.pycurl.available()
605
 
                and self._transport == PyCurlTransport)
 
553
        return pycurl_present and self._transport == PyCurlTransport
606
554
 
607
555
 
608
556
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
625
573
        # for details) make no distinction between a closed
626
574
        # socket and badly formatted status line, so we can't
627
575
        # just test for ConnectionError, we have to test
628
 
        # InvalidHttpResponse too. And pycurl may raise ConnectionReset
629
 
        # instead of ConnectionError too.
630
 
        self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
631
 
                            errors.InvalidHttpResponse),
 
576
        # InvalidHttpResponse too.
 
577
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
632
578
                          t.has, 'foo/bar')
633
579
 
634
580
    def test_http_get(self):
635
581
        server = self.get_readonly_server()
636
582
        t = self._transport(server.get_url())
637
 
        self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
638
 
                           errors.InvalidHttpResponse),
 
583
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
639
584
                          t.get, 'foo/bar')
640
585
 
641
586
 
717
662
    _req_handler_class = BadProtocolRequestHandler
718
663
 
719
664
    def setUp(self):
720
 
        if self._testing_pycurl():
 
665
        if pycurl_present and self._transport == PyCurlTransport:
721
666
            raise tests.TestNotApplicable(
722
667
                "pycurl doesn't check the protocol version")
723
668
        super(TestBadProtocolServer, self).setUp()
767
712
        self.assertEqual(None, server.host)
768
713
        self.assertEqual(None, server.port)
769
714
 
770
 
    def test_setUp_and_stop(self):
 
715
    def test_setUp_and_tearDown(self):
771
716
        server = RecordingServer(expect_body_tail=None)
772
 
        server.start_server()
 
717
        server.setUp()
773
718
        try:
774
719
            self.assertNotEqual(None, server.host)
775
720
            self.assertNotEqual(None, server.port)
776
721
        finally:
777
 
            server.stop_server()
 
722
            server.tearDown()
778
723
        self.assertEqual(None, server.host)
779
724
        self.assertEqual(None, server.port)
780
725
 
781
726
    def test_send_receive_bytes(self):
782
 
        server = RecordingServer(expect_body_tail='c', scheme='http')
783
 
        self.start_server(server)
 
727
        server = RecordingServer(expect_body_tail='c')
 
728
        server.setUp()
 
729
        self.addCleanup(server.tearDown)
784
730
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
785
731
        sock.connect((server.host, server.port))
786
732
        sock.sendall('abc')
1177
1123
        if self._testing_pycurl():
1178
1124
            # Oh my ! pycurl does not check for the port as part of
1179
1125
            # no_proxy :-( So we just test the host part
1180
 
            self.no_proxy_host = self.server.host
 
1126
            self.no_proxy_host = 'localhost'
1181
1127
        else:
1182
1128
            self.no_proxy_host = self.proxy_address
1183
1129
        # The secondary server is the proxy
1186
1132
        self._old_env = {}
1187
1133
 
1188
1134
    def _testing_pycurl(self):
1189
 
        # TODO: This is duplicated for lots of the classes in this file
1190
 
        return (features.pycurl.available()
1191
 
                and self._transport == PyCurlTransport)
 
1135
        return pycurl_present and self._transport == PyCurlTransport
1192
1136
 
1193
1137
    def create_transport_secondary_server(self):
1194
1138
        """Creates an http server that will serve files with
1367
1311
        self.follow_redirections = True
1368
1312
 
1369
1313
 
1370
 
def install_redirected_request(test):
1371
 
    test.overrideAttr(_urllib2_wrappers, 'Request', RedirectedRequest)
1372
 
 
1373
 
 
1374
1314
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
1375
1315
    """Test redirections.
1376
1316
 
1386
1326
    """
1387
1327
 
1388
1328
    def setUp(self):
1389
 
        if (features.pycurl.available()
1390
 
            and self._transport == PyCurlTransport):
 
1329
        if pycurl_present and self._transport == PyCurlTransport:
1391
1330
            raise tests.TestNotApplicable(
1392
1331
                "pycurl doesn't redirect silently annymore")
1393
1332
        super(TestHTTPSilentRedirections, self).setUp()
1394
 
        install_redirected_request(self)
 
1333
        self.setup_redirected_request()
 
1334
        self.addCleanup(self.cleanup_redirected_request)
1395
1335
        self.build_tree_contents([('a','a'),
1396
1336
                                  ('1/',),
1397
1337
                                  ('1/a', 'redirected once'),
1407
1347
 
1408
1348
        self.old_transport = self._transport(self.old_server.get_url())
1409
1349
 
 
1350
    def setup_redirected_request(self):
 
1351
        self.original_class = _urllib2_wrappers.Request
 
1352
        _urllib2_wrappers.Request = RedirectedRequest
 
1353
 
 
1354
    def cleanup_redirected_request(self):
 
1355
        _urllib2_wrappers.Request = self.original_class
 
1356
 
1410
1357
    def create_transport_secondary_server(self):
1411
1358
        """Create the secondary server, redirections are defined in the tests"""
1412
1359
        return http_utils.HTTPServerRedirecting(
1416
1363
        t = self.old_transport
1417
1364
 
1418
1365
        req = RedirectedRequest('GET', t.abspath('a'))
 
1366
        req.follow_redirections = True
1419
1367
        new_prefix = 'http://%s:%s' % (self.new_server.host,
1420
1368
                                       self.new_server.port)
1421
1369
        self.old_server.redirections = \
1422
1370
            [('(.*)', r'%s/1\1' % (new_prefix), 301),]
1423
 
        self.assertEqual('redirected once',t._perform(req).read())
 
1371
        self.assertEquals('redirected once',t._perform(req).read())
1424
1372
 
1425
1373
    def test_five_redirections(self):
1426
1374
        t = self.old_transport
1427
1375
 
1428
1376
        req = RedirectedRequest('GET', t.abspath('a'))
 
1377
        req.follow_redirections = True
1429
1378
        old_prefix = 'http://%s:%s' % (self.old_server.host,
1430
1379
                                       self.old_server.port)
1431
1380
        new_prefix = 'http://%s:%s' % (self.new_server.host,
1437
1386
            ('/4(.*)', r'%s/5\1' % (new_prefix), 301),
1438
1387
            ('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
1439
1388
            ]
1440
 
        self.assertEqual('redirected 5 times',t._perform(req).read())
 
1389
        self.assertEquals('redirected 5 times',t._perform(req).read())
1441
1390
 
1442
1391
 
1443
1392
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
1456
1405
        t = self._transport(self.new_server.get_url())
1457
1406
 
1458
1407
        # We use None for redirected so that we fail if redirected
1459
 
        self.assertEqual('0123456789',
1460
 
                         transport.do_catching_redirections(
 
1408
        self.assertEquals('0123456789',
 
1409
                          transport.do_catching_redirections(
1461
1410
                self.get_a, t, None).read())
1462
1411
 
1463
1412
    def test_one_redirection(self):
1468
1417
            dir, file = urlutils.split(exception.target)
1469
1418
            return self._transport(dir)
1470
1419
 
1471
 
        self.assertEqual('0123456789',
1472
 
                         transport.do_catching_redirections(
 
1420
        self.assertEquals('0123456789',
 
1421
                          transport.do_catching_redirections(
1473
1422
                self.get_a, self.old_transport, redirected).read())
1474
 
        self.assertEqual(1, self.redirections)
 
1423
        self.assertEquals(1, self.redirections)
1475
1424
 
1476
1425
    def test_redirection_loop(self):
1477
1426
 
1491
1440
 
1492
1441
    _auth_header = 'Authorization'
1493
1442
    _password_prompt_prefix = ''
1494
 
    _username_prompt_prefix = ''
1495
 
    # Set by load_tests
1496
 
    _auth_server = None
1497
1443
 
1498
1444
    def setUp(self):
1499
1445
        super(TestAuth, self).setUp()
1502
1448
                                  ('b', 'contents of b\n'),])
1503
1449
 
1504
1450
    def create_transport_readonly_server(self):
1505
 
        return self._auth_server(protocol_version=self._protocol_version)
 
1451
        if self._auth_scheme == 'basic':
 
1452
            server = http_utils.HTTPBasicAuthServer(
 
1453
                protocol_version=self._protocol_version)
 
1454
        else:
 
1455
            if self._auth_scheme != 'digest':
 
1456
                raise AssertionError('Unknown auth scheme: %r'
 
1457
                                     % self._auth_scheme)
 
1458
            server = http_utils.HTTPDigestAuthServer(
 
1459
                protocol_version=self._protocol_version)
 
1460
        return server
1506
1461
 
1507
1462
    def _testing_pycurl(self):
1508
 
        # TODO: This is duplicated for lots of the classes in this file
1509
 
        return (features.pycurl.available()
1510
 
                and self._transport == PyCurlTransport)
 
1463
        return pycurl_present and self._transport == PyCurlTransport
1511
1464
 
1512
1465
    def get_user_url(self, user, password):
1513
1466
        """Build an url embedding user and password"""
1561
1514
        # initial 'who are you' and 'this is not you, who are you')
1562
1515
        self.assertEqual(2, self.server.auth_required_errors)
1563
1516
 
1564
 
    def test_prompt_for_username(self):
1565
 
        if self._testing_pycurl():
1566
 
            raise tests.TestNotApplicable(
1567
 
                'pycurl cannot prompt, it handles auth by embedding'
1568
 
                ' user:pass in urls only')
1569
 
 
1570
 
        self.server.add_user('joe', 'foo')
1571
 
        t = self.get_user_transport(None, None)
1572
 
        stdout = tests.StringIOWrapper()
1573
 
        stderr = tests.StringIOWrapper()
1574
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1575
 
                                            stdout=stdout, stderr=stderr)
1576
 
        self.assertEqual('contents of a\n',t.get('a').read())
1577
 
        # stdin should be empty
1578
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
1579
 
        stderr.seek(0)
1580
 
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1581
 
        self.assertEqual(expected_prompt, stderr.read(len(expected_prompt)))
1582
 
        self.assertEqual('', stdout.getvalue())
1583
 
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1584
 
                                    stderr.readline())
1585
 
 
1586
1517
    def test_prompt_for_password(self):
1587
1518
        if self._testing_pycurl():
1588
1519
            raise tests.TestNotApplicable(
1592
1523
        self.server.add_user('joe', 'foo')
1593
1524
        t = self.get_user_transport('joe', None)
1594
1525
        stdout = tests.StringIOWrapper()
1595
 
        stderr = tests.StringIOWrapper()
1596
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1597
 
                                            stdout=stdout, stderr=stderr)
1598
 
        self.assertEqual('contents of a\n', t.get('a').read())
 
1526
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
 
1527
        self.assertEqual('contents of a\n',t.get('a').read())
1599
1528
        # stdin should be empty
1600
1529
        self.assertEqual('', ui.ui_factory.stdin.readline())
1601
1530
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1602
 
                                    stderr.getvalue())
1603
 
        self.assertEqual('', stdout.getvalue())
 
1531
                                    stdout.getvalue())
1604
1532
        # And we shouldn't prompt again for a different request
1605
1533
        # against the same transport.
1606
1534
        self.assertEqual('contents of b\n',t.get('b').read())
1616
1544
                              % (scheme.upper(),
1617
1545
                                 user, self.server.host, self.server.port,
1618
1546
                                 self.server.auth_realm)))
1619
 
        self.assertEqual(expected_prompt, actual_prompt)
1620
 
 
1621
 
    def _expected_username_prompt(self, scheme):
1622
 
        return (self._username_prompt_prefix
1623
 
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1624
 
                                 self.server.host, self.server.port,
1625
 
                                 self.server.auth_realm))
 
1547
        self.assertEquals(expected_prompt, actual_prompt)
1626
1548
 
1627
1549
    def test_no_prompt_for_password_when_using_auth_config(self):
1628
1550
        if self._testing_pycurl():
1636
1558
        self.server.add_user(user, password)
1637
1559
        t = self.get_user_transport(user, None)
1638
1560
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1639
 
                                            stderr=tests.StringIOWrapper())
 
1561
                                            stdout=tests.StringIOWrapper())
1640
1562
        # Create a minimal config file with the right password
1641
1563
        conf = config.AuthenticationConfig()
1642
1564
        conf._get_config().update(
1670
1592
        self.assertEqual(1, self.server.auth_required_errors)
1671
1593
 
1672
1594
    def test_changing_nonce(self):
1673
 
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1674
 
                                     http_utils.ProxyDigestAuthServer):
1675
 
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
 
1595
        if self._auth_scheme != 'digest':
 
1596
            raise tests.TestNotApplicable('HTTP auth digest only test')
1676
1597
        if self._testing_pycurl():
1677
1598
            raise tests.KnownFailure(
1678
1599
                'pycurl does not handle a nonce change')
1696
1617
    """Test proxy authentication schemes."""
1697
1618
 
1698
1619
    _auth_header = 'Proxy-authorization'
1699
 
    _password_prompt_prefix = 'Proxy '
1700
 
    _username_prompt_prefix = 'Proxy '
 
1620
    _password_prompt_prefix='Proxy '
1701
1621
 
1702
1622
    def setUp(self):
1703
1623
        super(TestProxyAuth, self).setUp()
1710
1630
                                  ('b-proxied', 'contents of b\n'),
1711
1631
                                  ])
1712
1632
 
 
1633
    def create_transport_readonly_server(self):
 
1634
        if self._auth_scheme == 'basic':
 
1635
            server = http_utils.ProxyBasicAuthServer(
 
1636
                protocol_version=self._protocol_version)
 
1637
        else:
 
1638
            if self._auth_scheme != 'digest':
 
1639
                raise AssertionError('Unknown auth scheme: %r'
 
1640
                                     % self._auth_scheme)
 
1641
            server = http_utils.ProxyDigestAuthServer(
 
1642
                protocol_version=self._protocol_version)
 
1643
        return server
 
1644
 
1713
1645
    def get_user_transport(self, user, password):
1714
1646
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1715
1647
        return self._transport(self.server.get_url())
1854
1786
                             'http://www.example.com/foo/subdir')
1855
1787
        self.assertIsInstance(r, type(t))
1856
1788
        # Both transports share the some connection
1857
 
        self.assertEqual(t._get_connection(), r._get_connection())
 
1789
        self.assertEquals(t._get_connection(), r._get_connection())
1858
1790
 
1859
1791
    def test_redirected_to_self_with_slash(self):
1860
1792
        t = self._transport('http://www.example.com/foo')
1864
1796
        # Both transports share the some connection (one can argue that we
1865
1797
        # should return the exact same transport here, but that seems
1866
1798
        # overkill).
1867
 
        self.assertEqual(t._get_connection(), r._get_connection())
 
1799
        self.assertEquals(t._get_connection(), r._get_connection())
1868
1800
 
1869
1801
    def test_redirected_to_host(self):
1870
1802
        t = self._transport('http://www.example.com/foo')
1889
1821
        r = t._redirected_to('http://www.example.com/foo',
1890
1822
                             'https://foo.example.com/foo')
1891
1823
        self.assertIsInstance(r, type(t))
1892
 
        self.assertEqual(t._user, r._user)
 
1824
        self.assertEquals(t._user, r._user)
1893
1825
 
1894
1826
 
1895
1827
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
1954
1886
        pass
1955
1887
 
1956
1888
 
1957
 
class TestActivityMixin(object):
 
1889
class TestActivity(tests.TestCase):
1958
1890
    """Test socket activity reporting.
1959
1891
 
1960
1892
    We use a special purpose server to control the bytes sent and received and
1964
1896
    def setUp(self):
1965
1897
        tests.TestCase.setUp(self)
1966
1898
        self.server = self._activity_server(self._protocol_version)
1967
 
        self.server.start_server()
 
1899
        self.server.setUp()
1968
1900
        self.activities = {}
1969
1901
        def report_activity(t, bytes, direction):
1970
1902
            count = self.activities.get(direction, 0)
1979
1911
 
1980
1912
    def tearDown(self):
1981
1913
        self._transport._report_activity = self.orig_report_activity
1982
 
        self.server.stop_server()
 
1914
        self.server.tearDown()
1983
1915
        tests.TestCase.tearDown(self)
1984
1916
 
1985
1917
    def get_transport(self):
2098
2030
        code, f = t._post('abc def end-of-body\n')
2099
2031
        self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2100
2032
        self.assertActivitiesMatch()
2101
 
 
2102
 
 
2103
 
class TestActivity(tests.TestCase, TestActivityMixin):
2104
 
 
2105
 
    def setUp(self):
2106
 
        tests.TestCase.setUp(self)
2107
 
        self.server = self._activity_server(self._protocol_version)
2108
 
        self.server.start_server()
2109
 
        self.activities = {}
2110
 
        def report_activity(t, bytes, direction):
2111
 
            count = self.activities.get(direction, 0)
2112
 
            count += bytes
2113
 
            self.activities[direction] = count
2114
 
 
2115
 
        # We override at class level because constructors may propagate the
2116
 
        # bound method and render instance overriding ineffective (an
2117
 
        # alternative would be to define a specific ui factory instead...)
2118
 
        self.orig_report_activity = self._transport._report_activity
2119
 
        self._transport._report_activity = report_activity
2120
 
 
2121
 
    def tearDown(self):
2122
 
        self._transport._report_activity = self.orig_report_activity
2123
 
        self.server.stop_server()
2124
 
        tests.TestCase.tearDown(self)
2125
 
 
2126
 
 
2127
 
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2128
 
 
2129
 
    def setUp(self):
2130
 
        tests.TestCase.setUp(self)
2131
 
        # Unlike TestActivity, we are really testing ReportingFileSocket and
2132
 
        # ReportingSocket, so we don't need all the parametrization. Since
2133
 
        # ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2134
 
        # test them through their use by the transport than directly (that's a
2135
 
        # bit less clean but far more simpler and effective).
2136
 
        self.server = ActivityHTTPServer('HTTP/1.1')
2137
 
        self._transport=_urllib.HttpTransport_urllib
2138
 
 
2139
 
        self.server.start_server()
2140
 
 
2141
 
        # We override at class level because constructors may propagate the
2142
 
        # bound method and render instance overriding ineffective (an
2143
 
        # alternative would be to define a specific ui factory instead...)
2144
 
        self.orig_report_activity = self._transport._report_activity
2145
 
        self._transport._report_activity = None
2146
 
 
2147
 
    def tearDown(self):
2148
 
        self._transport._report_activity = self.orig_report_activity
2149
 
        self.server.stop_server()
2150
 
        tests.TestCase.tearDown(self)
2151
 
 
2152
 
    def assertActivitiesMatch(self):
2153
 
        # Nothing to check here
2154
 
        pass
2155
 
 
2156
 
 
2157
 
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2158
 
    """Test authentication on the redirected http server."""
2159
 
 
2160
 
    _auth_header = 'Authorization'
2161
 
    _password_prompt_prefix = ''
2162
 
    _username_prompt_prefix = ''
2163
 
    _auth_server = http_utils.HTTPBasicAuthServer
2164
 
    _transport = _urllib.HttpTransport_urllib
2165
 
 
2166
 
    def create_transport_readonly_server(self):
2167
 
        return self._auth_server(protocol_version=self._protocol_version)
2168
 
 
2169
 
    def create_transport_secondary_server(self):
2170
 
        """Create the secondary server redirecting to the primary server"""
2171
 
        new = self.get_readonly_server()
2172
 
 
2173
 
        redirecting = http_utils.HTTPServerRedirecting(
2174
 
            protocol_version=self._protocol_version)
2175
 
        redirecting.redirect_to(new.host, new.port)
2176
 
        return redirecting
2177
 
 
2178
 
    def setUp(self):
2179
 
        super(TestAuthOnRedirected, self).setUp()
2180
 
        self.build_tree_contents([('a','a'),
2181
 
                                  ('1/',),
2182
 
                                  ('1/a', 'redirected once'),
2183
 
                                  ],)
2184
 
        new_prefix = 'http://%s:%s' % (self.new_server.host,
2185
 
                                       self.new_server.port)
2186
 
        self.old_server.redirections = [
2187
 
            ('(.*)', r'%s/1\1' % (new_prefix), 301),]
2188
 
        self.old_transport = self._transport(self.old_server.get_url())
2189
 
        self.new_server.add_user('joe', 'foo')
2190
 
 
2191
 
    def get_a(self, transport):
2192
 
        return transport.get('a')
2193
 
 
2194
 
    def test_auth_on_redirected_via_do_catching_redirections(self):
2195
 
        self.redirections = 0
2196
 
 
2197
 
        def redirected(transport, exception, redirection_notice):
2198
 
            self.redirections += 1
2199
 
            dir, file = urlutils.split(exception.target)
2200
 
            return self._transport(dir)
2201
 
 
2202
 
        stdout = tests.StringIOWrapper()
2203
 
        stderr = tests.StringIOWrapper()
2204
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2205
 
                                            stdout=stdout, stderr=stderr)
2206
 
        self.assertEqual('redirected once',
2207
 
                         transport.do_catching_redirections(
2208
 
                self.get_a, self.old_transport, redirected).read())
2209
 
        self.assertEqual(1, self.redirections)
2210
 
        # stdin should be empty
2211
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
2212
 
        # stdout should be empty, stderr will contains the prompts
2213
 
        self.assertEqual('', stdout.getvalue())
2214
 
 
2215
 
    def test_auth_on_redirected_via_following_redirections(self):
2216
 
        self.new_server.add_user('joe', 'foo')
2217
 
        stdout = tests.StringIOWrapper()
2218
 
        stderr = tests.StringIOWrapper()
2219
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2220
 
                                            stdout=stdout, stderr=stderr)
2221
 
        t = self.old_transport
2222
 
        req = RedirectedRequest('GET', t.abspath('a'))
2223
 
        new_prefix = 'http://%s:%s' % (self.new_server.host,
2224
 
                                       self.new_server.port)
2225
 
        self.old_server.redirections = [
2226
 
            ('(.*)', r'%s/1\1' % (new_prefix), 301),]
2227
 
        self.assertEqual('redirected once',t._perform(req).read())
2228
 
        # stdin should be empty
2229
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
2230
 
        # stdout should be empty, stderr will contains the prompts
2231
 
        self.assertEqual('', stdout.getvalue())
2232
 
 
2233