~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-01-14 00:01:32 UTC
  • mfrom: (4957.1.1 jam-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20100114000132-3p3rabnonjw3gzqb
(jam) Merge bzr.stable, bringing in bug fixes #175839, #504390

Show diffs side-by-side

added added

removed removed

Lines of Context:
48
48
    deprecated_in,
49
49
    )
50
50
from bzrlib.tests import (
 
51
    features,
51
52
    http_server,
52
53
    http_utils,
53
54
    )
61
62
    )
62
63
 
63
64
 
64
 
try:
 
65
if features.pycurl.available():
65
66
    from bzrlib.transport.http._pycurl import PyCurlTransport
66
 
    pycurl_present = True
67
 
except errors.DependencyNotPresent:
68
 
    pycurl_present = False
69
67
 
70
68
 
71
69
def load_tests(standard_tests, module, loader):
84
82
                        _server=http_server.HttpServer_urllib,
85
83
                        _qualified_prefix='http+urllib',)),
86
84
        ]
87
 
    if pycurl_present:
 
85
    if features.pycurl.available():
88
86
        transport_scenarios.append(
89
87
            ('pycurl', dict(_transport=PyCurlTransport,
90
88
                            _server=http_server.HttpServer_PyCurl,
113
111
                                            protocol_scenarios)
114
112
    tests.multiply_tests(tp_tests, tp_scenarios, result)
115
113
 
 
114
    # proxy auth: each auth scheme on all http versions on all implementations.
 
115
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
 
116
        remaining_tests, tests.condition_isinstance((
 
117
                TestProxyAuth,
 
118
                )))
 
119
    proxy_auth_scheme_scenarios = [
 
120
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
 
121
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
 
122
        ('basicdigest',
 
123
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
 
124
        ]
 
125
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
 
126
                                              proxy_auth_scheme_scenarios)
 
127
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
 
128
 
116
129
    # auth: each auth scheme on all http versions on all implementations.
117
130
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
118
131
        remaining_tests, tests.condition_isinstance((
119
132
                TestAuth,
120
133
                )))
121
134
    auth_scheme_scenarios = [
122
 
        ('basic', dict(_auth_scheme='basic')),
123
 
        ('digest', dict(_auth_scheme='digest')),
 
135
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
 
136
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
 
137
        ('basicdigest',
 
138
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
124
139
        ]
125
140
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
126
 
        auth_scheme_scenarios)
 
141
                                             auth_scheme_scenarios)
127
142
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
128
143
 
129
 
    # activity: activity on all http versions on all implementations
 
144
    # activity: on all http[s] versions on all implementations
130
145
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
131
146
        remaining_tests, tests.condition_isinstance((
132
147
                TestActivity,
133
148
                )))
134
149
    activity_scenarios = [
135
 
        ('http', dict(_activity_server=ActivityHTTPServer)),
 
150
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
 
151
                             _transport=_urllib.HttpTransport_urllib,)),
136
152
        ]
137
153
    if tests.HTTPSServerFeature.available():
138
154
        activity_scenarios.append(
139
 
            ('https', dict(_activity_server=ActivityHTTPSServer)))
140
 
    tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
141
 
        activity_scenarios)
 
155
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
 
156
                                  _transport=_urllib.HttpTransport_urllib,)),)
 
157
    if features.pycurl.available():
 
158
        activity_scenarios.append(
 
159
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
 
160
                                 _transport=PyCurlTransport,)),)
 
161
        if tests.HTTPSServerFeature.available():
 
162
            from bzrlib.tests import (
 
163
                ssl_certs,
 
164
                )
 
165
            # FIXME: Until we have a better way to handle self-signed
 
166
            # certificates (like allowing them in a test specific
 
167
            # authentication.conf for example), we need some specialized pycurl
 
168
            # transport for tests.
 
169
            class HTTPS_pycurl_transport(PyCurlTransport):
 
170
 
 
171
                def __init__(self, base, _from_transport=None):
 
172
                    super(HTTPS_pycurl_transport, self).__init__(
 
173
                        base, _from_transport)
 
174
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
 
175
 
 
176
            activity_scenarios.append(
 
177
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
 
178
                                      _transport=HTTPS_pycurl_transport,)),)
 
179
 
 
180
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
 
181
                                               protocol_scenarios)
142
182
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
143
183
 
144
184
    # No parametrization for the remaining tests
162
202
    It records the bytes sent to it, and replies with a 200.
163
203
    """
164
204
 
165
 
    def __init__(self, expect_body_tail=None):
 
205
    def __init__(self, expect_body_tail=None, scheme=''):
166
206
        """Constructor.
167
207
 
168
208
        :type expect_body_tail: str
173
213
        self.host = None
174
214
        self.port = None
175
215
        self.received_bytes = ''
176
 
 
177
 
    def setUp(self):
 
216
        self.scheme = scheme
 
217
 
 
218
    def get_url(self):
 
219
        return '%s://%s:%s/' % (self.scheme, self.host, self.port)
 
220
 
 
221
    def start_server(self):
178
222
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
179
223
        self._sock.bind(('127.0.0.1', 0))
180
224
        self.host, self.port = self._sock.getsockname()
203
247
            # The client may have already closed the socket.
204
248
            pass
205
249
 
206
 
    def tearDown(self):
 
250
    def stop_server(self):
207
251
        try:
208
252
            self._sock.close()
209
253
        except socket.error:
223
267
 
224
268
    def test_empty_header(self):
225
269
        scheme, remainder = self.parse_header('')
226
 
        self.assertEquals('', scheme)
 
270
        self.assertEqual('', scheme)
227
271
        self.assertIs(None, remainder)
228
272
 
229
273
    def test_negotiate_header(self):
230
274
        scheme, remainder = self.parse_header('Negotiate')
231
 
        self.assertEquals('negotiate', scheme)
 
275
        self.assertEqual('negotiate', scheme)
232
276
        self.assertIs(None, remainder)
233
277
 
234
278
    def test_basic_header(self):
235
279
        scheme, remainder = self.parse_header(
236
280
            'Basic realm="Thou should not pass"')
237
 
        self.assertEquals('basic', scheme)
238
 
        self.assertEquals('realm="Thou should not pass"', remainder)
 
281
        self.assertEqual('basic', scheme)
 
282
        self.assertEqual('realm="Thou should not pass"', remainder)
239
283
 
240
284
    def test_basic_extract_realm(self):
241
285
        scheme, remainder = self.parse_header(
243
287
            _urllib2_wrappers.BasicAuthHandler)
244
288
        match, realm = self.auth_handler.extract_realm(remainder)
245
289
        self.assertTrue(match is not None)
246
 
        self.assertEquals('Thou should not pass', realm)
 
290
        self.assertEqual('Thou should not pass', realm)
247
291
 
248
292
    def test_digest_header(self):
249
293
        scheme, remainder = self.parse_header(
250
294
            'Digest realm="Thou should not pass"')
251
 
        self.assertEquals('digest', scheme)
252
 
        self.assertEquals('realm="Thou should not pass"', remainder)
 
295
        self.assertEqual('digest', scheme)
 
296
        self.assertEqual('realm="Thou should not pass"', remainder)
253
297
 
254
298
 
255
299
class TestHTTPServer(tests.TestCase):
262
306
 
263
307
        server = http_server.HttpServer(BogusRequestHandler)
264
308
        try:
265
 
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
 
309
            self.assertRaises(httplib.UnknownProtocol, server.start_server)
266
310
        except:
267
 
            server.tearDown()
 
311
            server.stop_server()
268
312
            self.fail('HTTP Server creation did not raise UnknownProtocol')
269
313
 
270
314
    def test_force_invalid_protocol(self):
271
315
        server = http_server.HttpServer(protocol_version='HTTP/0.1')
272
316
        try:
273
 
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
 
317
            self.assertRaises(httplib.UnknownProtocol, server.start_server)
274
318
        except:
275
 
            server.tearDown()
 
319
            server.stop_server()
276
320
            self.fail('HTTP Server creation did not raise UnknownProtocol')
277
321
 
278
322
    def test_server_start_and_stop(self):
279
323
        server = http_server.HttpServer()
280
 
        server.setUp()
281
 
        self.assertTrue(server._http_running)
282
 
        server.tearDown()
 
324
        server.start_server()
 
325
        try:
 
326
            self.assertTrue(server._http_running)
 
327
        finally:
 
328
            server.stop_server()
283
329
        self.assertFalse(server._http_running)
284
330
 
285
331
    def test_create_http_server_one_zero(self):
288
334
            protocol_version = 'HTTP/1.0'
289
335
 
290
336
        server = http_server.HttpServer(RequestHandlerOneZero)
291
 
        server.setUp()
292
 
        self.addCleanup(server.tearDown)
 
337
        self.start_server(server)
293
338
        self.assertIsInstance(server._httpd, http_server.TestingHTTPServer)
294
339
 
295
340
    def test_create_http_server_one_one(self):
298
343
            protocol_version = 'HTTP/1.1'
299
344
 
300
345
        server = http_server.HttpServer(RequestHandlerOneOne)
301
 
        server.setUp()
302
 
        self.addCleanup(server.tearDown)
 
346
        self.start_server(server)
303
347
        self.assertIsInstance(server._httpd,
304
348
                              http_server.TestingThreadingHTTPServer)
305
349
 
310
354
 
311
355
        server = http_server.HttpServer(RequestHandlerOneZero,
312
356
                                        protocol_version='HTTP/1.1')
313
 
        server.setUp()
314
 
        self.addCleanup(server.tearDown)
 
357
        self.start_server(server)
315
358
        self.assertIsInstance(server._httpd,
316
359
                              http_server.TestingThreadingHTTPServer)
317
360
 
322
365
 
323
366
        server = http_server.HttpServer(RequestHandlerOneOne,
324
367
                                        protocol_version='HTTP/1.0')
325
 
        server.setUp()
326
 
        self.addCleanup(server.tearDown)
 
368
        self.start_server(server)
327
369
        self.assertIsInstance(server._httpd,
328
370
                              http_server.TestingHTTPServer)
329
371
 
332
374
    """Test case to inherit from if pycurl is present"""
333
375
 
334
376
    def _get_pycurl_maybe(self):
335
 
        try:
336
 
            from bzrlib.transport.http._pycurl import PyCurlTransport
337
 
            return PyCurlTransport
338
 
        except errors.DependencyNotPresent:
339
 
            raise tests.TestSkipped('pycurl not present')
 
377
        self.requireFeature(features.pycurl)
 
378
        return PyCurlTransport
340
379
 
341
380
    _transport = property(_get_pycurl_maybe)
342
381
 
349
388
    def test_url_parsing(self):
350
389
        f = FakeManager()
351
390
        url = http.extract_auth('http://example.com', f)
352
 
        self.assertEquals('http://example.com', url)
353
 
        self.assertEquals(0, len(f.credentials))
 
391
        self.assertEqual('http://example.com', url)
 
392
        self.assertEqual(0, len(f.credentials))
354
393
        url = http.extract_auth(
355
394
            'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
356
 
        self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
357
 
        self.assertEquals(1, len(f.credentials))
358
 
        self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
359
 
                          f.credentials[0])
 
395
        self.assertEqual('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
 
396
        self.assertEqual(1, len(f.credentials))
 
397
        self.assertEqual([None, 'www.bazaar-vcs.org', 'user', 'pass'],
 
398
                         f.credentials[0])
360
399
 
361
400
 
362
401
class TestHttpTransportUrls(tests.TestCase):
389
428
    def test_http_impl_urls(self):
390
429
        """There are servers which ask for particular clients to connect"""
391
430
        server = self._server()
 
431
        server.start_server()
392
432
        try:
393
 
            server.setUp()
394
433
            url = server.get_url()
395
434
            self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
396
435
        finally:
397
 
            server.tearDown()
 
436
            server.stop_server()
398
437
 
399
438
 
400
439
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
409
448
        https by supplying a fake version_info that do not
410
449
        support it.
411
450
        """
412
 
        try:
413
 
            import pycurl
414
 
        except ImportError:
415
 
            raise tests.TestSkipped('pycurl not present')
 
451
        self.requireFeature(features.pycurl)
 
452
        # Import the module locally now that we now it's available.
 
453
        pycurl = features.pycurl.module
416
454
 
417
455
        version_info_orig = pycurl.version_info
418
 
        try:
419
 
            # Now that we have pycurl imported, we can fake its version_info
420
 
            # This was taken from a windows pycurl without SSL
421
 
            # (thanks to bialix)
422
 
            pycurl.version_info = lambda : (2,
423
 
                                            '7.13.2',
424
 
                                            462082,
425
 
                                            'i386-pc-win32',
426
 
                                            2576,
427
 
                                            None,
428
 
                                            0,
429
 
                                            None,
430
 
                                            ('ftp', 'gopher', 'telnet',
431
 
                                             'dict', 'ldap', 'http', 'file'),
432
 
                                            None,
433
 
                                            0,
434
 
                                            None)
435
 
            self.assertRaises(errors.DependencyNotPresent, self._transport,
436
 
                              'https://launchpad.net')
437
 
        finally:
438
 
            # Restore the right function
 
456
        def restore():
439
457
            pycurl.version_info = version_info_orig
 
458
        self.addCleanup(restore)
 
459
 
 
460
        # Fake the pycurl version_info This was taken from a windows pycurl
 
461
        # without SSL (thanks to bialix)
 
462
        pycurl.version_info = lambda : (2,
 
463
                                        '7.13.2',
 
464
                                        462082,
 
465
                                        'i386-pc-win32',
 
466
                                        2576,
 
467
                                        None,
 
468
                                        0,
 
469
                                        None,
 
470
                                        ('ftp', 'gopher', 'telnet',
 
471
                                         'dict', 'ldap', 'http', 'file'),
 
472
                                        None,
 
473
                                        0,
 
474
                                        None)
 
475
        self.assertRaises(errors.DependencyNotPresent, self._transport,
 
476
                          'https://launchpad.net')
440
477
 
441
478
 
442
479
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
501
538
class TestPost(tests.TestCase):
502
539
 
503
540
    def test_post_body_is_received(self):
504
 
        server = RecordingServer(expect_body_tail='end-of-body')
505
 
        server.setUp()
506
 
        self.addCleanup(server.tearDown)
507
 
        scheme = self._qualified_prefix
508
 
        url = '%s://%s:%s/' % (scheme, server.host, server.port)
 
541
        server = RecordingServer(expect_body_tail='end-of-body',
 
542
            scheme=self._qualified_prefix)
 
543
        self.start_server(server)
 
544
        url = server.get_url()
509
545
        http_transport = self._transport(url)
510
546
        code, response = http_transport._post('abc def end-of-body')
511
547
        self.assertTrue(
560
596
                                      protocol_version=self._protocol_version)
561
597
 
562
598
    def _testing_pycurl(self):
563
 
        return pycurl_present and self._transport == PyCurlTransport
 
599
        # TODO: This is duplicated for lots of the classes in this file
 
600
        return (features.pycurl.available()
 
601
                and self._transport == PyCurlTransport)
564
602
 
565
603
 
566
604
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
583
621
        # for details) make no distinction between a closed
584
622
        # socket and badly formatted status line, so we can't
585
623
        # just test for ConnectionError, we have to test
586
 
        # InvalidHttpResponse too.
587
 
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
 
624
        # InvalidHttpResponse too. And pycurl may raise ConnectionReset
 
625
        # instead of ConnectionError too.
 
626
        self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
 
627
                            errors.InvalidHttpResponse),
588
628
                          t.has, 'foo/bar')
589
629
 
590
630
    def test_http_get(self):
591
631
        server = self.get_readonly_server()
592
632
        t = self._transport(server.get_url())
593
 
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
 
633
        self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
 
634
                           errors.InvalidHttpResponse),
594
635
                          t.get, 'foo/bar')
595
636
 
596
637
 
672
713
    _req_handler_class = BadProtocolRequestHandler
673
714
 
674
715
    def setUp(self):
675
 
        if pycurl_present and self._transport == PyCurlTransport:
 
716
        if self._testing_pycurl():
676
717
            raise tests.TestNotApplicable(
677
718
                "pycurl doesn't check the protocol version")
678
719
        super(TestBadProtocolServer, self).setUp()
722
763
        self.assertEqual(None, server.host)
723
764
        self.assertEqual(None, server.port)
724
765
 
725
 
    def test_setUp_and_tearDown(self):
 
766
    def test_setUp_and_stop(self):
726
767
        server = RecordingServer(expect_body_tail=None)
727
 
        server.setUp()
 
768
        server.start_server()
728
769
        try:
729
770
            self.assertNotEqual(None, server.host)
730
771
            self.assertNotEqual(None, server.port)
731
772
        finally:
732
 
            server.tearDown()
 
773
            server.stop_server()
733
774
        self.assertEqual(None, server.host)
734
775
        self.assertEqual(None, server.port)
735
776
 
736
777
    def test_send_receive_bytes(self):
737
 
        server = RecordingServer(expect_body_tail='c')
738
 
        server.setUp()
739
 
        self.addCleanup(server.tearDown)
 
778
        server = RecordingServer(expect_body_tail='c', scheme='http')
 
779
        self.start_server(server)
740
780
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
741
781
        sock.connect((server.host, server.port))
742
782
        sock.sendall('abc')
1133
1173
        if self._testing_pycurl():
1134
1174
            # Oh my ! pycurl does not check for the port as part of
1135
1175
            # no_proxy :-( So we just test the host part
1136
 
            self.no_proxy_host = 'localhost'
 
1176
            self.no_proxy_host = self.server.host
1137
1177
        else:
1138
1178
            self.no_proxy_host = self.proxy_address
1139
1179
        # The secondary server is the proxy
1142
1182
        self._old_env = {}
1143
1183
 
1144
1184
    def _testing_pycurl(self):
1145
 
        return pycurl_present and self._transport == PyCurlTransport
 
1185
        # TODO: This is duplicated for lots of the classes in this file
 
1186
        return (features.pycurl.available()
 
1187
                and self._transport == PyCurlTransport)
1146
1188
 
1147
1189
    def create_transport_secondary_server(self):
1148
1190
        """Creates an http server that will serve files with
1321
1363
        self.follow_redirections = True
1322
1364
 
1323
1365
 
 
1366
def install_redirected_request(test):
 
1367
    test.original_class = _urllib2_wrappers.Request
 
1368
    def restore():
 
1369
        _urllib2_wrappers.Request = test.original_class
 
1370
    _urllib2_wrappers.Request = RedirectedRequest
 
1371
    test.addCleanup(restore)
 
1372
 
 
1373
 
1324
1374
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
1325
1375
    """Test redirections.
1326
1376
 
1336
1386
    """
1337
1387
 
1338
1388
    def setUp(self):
1339
 
        if pycurl_present and self._transport == PyCurlTransport:
 
1389
        if (features.pycurl.available()
 
1390
            and self._transport == PyCurlTransport):
1340
1391
            raise tests.TestNotApplicable(
1341
1392
                "pycurl doesn't redirect silently annymore")
1342
1393
        super(TestHTTPSilentRedirections, self).setUp()
1343
 
        self.setup_redirected_request()
1344
 
        self.addCleanup(self.cleanup_redirected_request)
 
1394
        install_redirected_request(self)
1345
1395
        self.build_tree_contents([('a','a'),
1346
1396
                                  ('1/',),
1347
1397
                                  ('1/a', 'redirected once'),
1357
1407
 
1358
1408
        self.old_transport = self._transport(self.old_server.get_url())
1359
1409
 
1360
 
    def setup_redirected_request(self):
1361
 
        self.original_class = _urllib2_wrappers.Request
1362
 
        _urllib2_wrappers.Request = RedirectedRequest
1363
 
 
1364
 
    def cleanup_redirected_request(self):
1365
 
        _urllib2_wrappers.Request = self.original_class
1366
 
 
1367
1410
    def create_transport_secondary_server(self):
1368
1411
        """Create the secondary server, redirections are defined in the tests"""
1369
1412
        return http_utils.HTTPServerRedirecting(
1373
1416
        t = self.old_transport
1374
1417
 
1375
1418
        req = RedirectedRequest('GET', t.abspath('a'))
1376
 
        req.follow_redirections = True
1377
1419
        new_prefix = 'http://%s:%s' % (self.new_server.host,
1378
1420
                                       self.new_server.port)
1379
1421
        self.old_server.redirections = \
1380
1422
            [('(.*)', r'%s/1\1' % (new_prefix), 301),]
1381
 
        self.assertEquals('redirected once',t._perform(req).read())
 
1423
        self.assertEqual('redirected once',t._perform(req).read())
1382
1424
 
1383
1425
    def test_five_redirections(self):
1384
1426
        t = self.old_transport
1385
1427
 
1386
1428
        req = RedirectedRequest('GET', t.abspath('a'))
1387
 
        req.follow_redirections = True
1388
1429
        old_prefix = 'http://%s:%s' % (self.old_server.host,
1389
1430
                                       self.old_server.port)
1390
1431
        new_prefix = 'http://%s:%s' % (self.new_server.host,
1396
1437
            ('/4(.*)', r'%s/5\1' % (new_prefix), 301),
1397
1438
            ('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
1398
1439
            ]
1399
 
        self.assertEquals('redirected 5 times',t._perform(req).read())
 
1440
        self.assertEqual('redirected 5 times',t._perform(req).read())
1400
1441
 
1401
1442
 
1402
1443
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
1415
1456
        t = self._transport(self.new_server.get_url())
1416
1457
 
1417
1458
        # We use None for redirected so that we fail if redirected
1418
 
        self.assertEquals('0123456789',
1419
 
                          transport.do_catching_redirections(
 
1459
        self.assertEqual('0123456789',
 
1460
                         transport.do_catching_redirections(
1420
1461
                self.get_a, t, None).read())
1421
1462
 
1422
1463
    def test_one_redirection(self):
1427
1468
            dir, file = urlutils.split(exception.target)
1428
1469
            return self._transport(dir)
1429
1470
 
1430
 
        self.assertEquals('0123456789',
1431
 
                          transport.do_catching_redirections(
 
1471
        self.assertEqual('0123456789',
 
1472
                         transport.do_catching_redirections(
1432
1473
                self.get_a, self.old_transport, redirected).read())
1433
 
        self.assertEquals(1, self.redirections)
 
1474
        self.assertEqual(1, self.redirections)
1434
1475
 
1435
1476
    def test_redirection_loop(self):
1436
1477
 
1451
1492
    _auth_header = 'Authorization'
1452
1493
    _password_prompt_prefix = ''
1453
1494
    _username_prompt_prefix = ''
 
1495
    # Set by load_tests
 
1496
    _auth_server = None
1454
1497
 
1455
1498
    def setUp(self):
1456
1499
        super(TestAuth, self).setUp()
1459
1502
                                  ('b', 'contents of b\n'),])
1460
1503
 
1461
1504
    def create_transport_readonly_server(self):
1462
 
        if self._auth_scheme == 'basic':
1463
 
            server = http_utils.HTTPBasicAuthServer(
1464
 
                protocol_version=self._protocol_version)
1465
 
        else:
1466
 
            if self._auth_scheme != 'digest':
1467
 
                raise AssertionError('Unknown auth scheme: %r'
1468
 
                                     % self._auth_scheme)
1469
 
            server = http_utils.HTTPDigestAuthServer(
1470
 
                protocol_version=self._protocol_version)
1471
 
        return server
 
1505
        return self._auth_server(protocol_version=self._protocol_version)
1472
1506
 
1473
1507
    def _testing_pycurl(self):
1474
 
        return pycurl_present and self._transport == PyCurlTransport
 
1508
        # TODO: This is duplicated for lots of the classes in this file
 
1509
        return (features.pycurl.available()
 
1510
                and self._transport == PyCurlTransport)
1475
1511
 
1476
1512
    def get_user_url(self, user, password):
1477
1513
        """Build an url embedding user and password"""
1534
1570
        self.server.add_user('joe', 'foo')
1535
1571
        t = self.get_user_transport(None, None)
1536
1572
        stdout = tests.StringIOWrapper()
1537
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=stdout)
 
1573
        stderr = tests.StringIOWrapper()
 
1574
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
 
1575
                                            stdout=stdout, stderr=stderr)
1538
1576
        self.assertEqual('contents of a\n',t.get('a').read())
1539
1577
        # stdin should be empty
1540
1578
        self.assertEqual('', ui.ui_factory.stdin.readline())
1541
 
        stdout.seek(0)
 
1579
        stderr.seek(0)
1542
1580
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1543
 
        self.assertEquals(expected_prompt, stdout.read(len(expected_prompt)))
 
1581
        self.assertEqual(expected_prompt, stderr.read(len(expected_prompt)))
 
1582
        self.assertEqual('', stdout.getvalue())
1544
1583
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1545
 
                                    stdout.readline())
 
1584
                                    stderr.readline())
1546
1585
 
1547
1586
    def test_prompt_for_password(self):
1548
1587
        if self._testing_pycurl():
1553
1592
        self.server.add_user('joe', 'foo')
1554
1593
        t = self.get_user_transport('joe', None)
1555
1594
        stdout = tests.StringIOWrapper()
1556
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1557
 
        self.assertEqual('contents of a\n',t.get('a').read())
 
1595
        stderr = tests.StringIOWrapper()
 
1596
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
 
1597
                                            stdout=stdout, stderr=stderr)
 
1598
        self.assertEqual('contents of a\n', t.get('a').read())
1558
1599
        # stdin should be empty
1559
1600
        self.assertEqual('', ui.ui_factory.stdin.readline())
1560
1601
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1561
 
                                    stdout.getvalue())
 
1602
                                    stderr.getvalue())
 
1603
        self.assertEqual('', stdout.getvalue())
1562
1604
        # And we shouldn't prompt again for a different request
1563
1605
        # against the same transport.
1564
1606
        self.assertEqual('contents of b\n',t.get('b').read())
1574
1616
                              % (scheme.upper(),
1575
1617
                                 user, self.server.host, self.server.port,
1576
1618
                                 self.server.auth_realm)))
1577
 
        self.assertEquals(expected_prompt, actual_prompt)
 
1619
        self.assertEqual(expected_prompt, actual_prompt)
1578
1620
 
1579
1621
    def _expected_username_prompt(self, scheme):
1580
1622
        return (self._username_prompt_prefix
1594
1636
        self.server.add_user(user, password)
1595
1637
        t = self.get_user_transport(user, None)
1596
1638
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1597
 
                                            stdout=tests.StringIOWrapper())
 
1639
                                            stderr=tests.StringIOWrapper())
1598
1640
        # Create a minimal config file with the right password
1599
1641
        conf = config.AuthenticationConfig()
1600
1642
        conf._get_config().update(
1628
1670
        self.assertEqual(1, self.server.auth_required_errors)
1629
1671
 
1630
1672
    def test_changing_nonce(self):
1631
 
        if self._auth_scheme != 'digest':
1632
 
            raise tests.TestNotApplicable('HTTP auth digest only test')
 
1673
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
 
1674
                                     http_utils.ProxyDigestAuthServer):
 
1675
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1633
1676
        if self._testing_pycurl():
1634
1677
            raise tests.KnownFailure(
1635
1678
                'pycurl does not handle a nonce change')
1667
1710
                                  ('b-proxied', 'contents of b\n'),
1668
1711
                                  ])
1669
1712
 
1670
 
    def create_transport_readonly_server(self):
1671
 
        if self._auth_scheme == 'basic':
1672
 
            server = http_utils.ProxyBasicAuthServer(
1673
 
                protocol_version=self._protocol_version)
1674
 
        else:
1675
 
            if self._auth_scheme != 'digest':
1676
 
                raise AssertionError('Unknown auth scheme: %r'
1677
 
                                     % self._auth_scheme)
1678
 
            server = http_utils.ProxyDigestAuthServer(
1679
 
                protocol_version=self._protocol_version)
1680
 
        return server
1681
 
 
1682
1713
    def get_user_transport(self, user, password):
1683
1714
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1684
1715
        return self._transport(self.server.get_url())
1823
1854
                             'http://www.example.com/foo/subdir')
1824
1855
        self.assertIsInstance(r, type(t))
1825
1856
        # Both transports share the some connection
1826
 
        self.assertEquals(t._get_connection(), r._get_connection())
 
1857
        self.assertEqual(t._get_connection(), r._get_connection())
1827
1858
 
1828
1859
    def test_redirected_to_self_with_slash(self):
1829
1860
        t = self._transport('http://www.example.com/foo')
1833
1864
        # Both transports share the some connection (one can argue that we
1834
1865
        # should return the exact same transport here, but that seems
1835
1866
        # overkill).
1836
 
        self.assertEquals(t._get_connection(), r._get_connection())
 
1867
        self.assertEqual(t._get_connection(), r._get_connection())
1837
1868
 
1838
1869
    def test_redirected_to_host(self):
1839
1870
        t = self._transport('http://www.example.com/foo')
1858
1889
        r = t._redirected_to('http://www.example.com/foo',
1859
1890
                             'https://foo.example.com/foo')
1860
1891
        self.assertIsInstance(r, type(t))
1861
 
        self.assertEquals(t._user, r._user)
 
1892
        self.assertEqual(t._user, r._user)
1862
1893
 
1863
1894
 
1864
1895
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
1923
1954
        pass
1924
1955
 
1925
1956
 
1926
 
class TestActivity(tests.TestCase):
 
1957
class TestActivityMixin(object):
1927
1958
    """Test socket activity reporting.
1928
1959
 
1929
1960
    We use a special purpose server to control the bytes sent and received and
1933
1964
    def setUp(self):
1934
1965
        tests.TestCase.setUp(self)
1935
1966
        self.server = self._activity_server(self._protocol_version)
1936
 
        self.server.setUp()
 
1967
        self.server.start_server()
1937
1968
        self.activities = {}
1938
1969
        def report_activity(t, bytes, direction):
1939
1970
            count = self.activities.get(direction, 0)
1948
1979
 
1949
1980
    def tearDown(self):
1950
1981
        self._transport._report_activity = self.orig_report_activity
1951
 
        self.server.tearDown()
 
1982
        self.server.stop_server()
1952
1983
        tests.TestCase.tearDown(self)
1953
1984
 
1954
1985
    def get_transport(self):
2067
2098
        code, f = t._post('abc def end-of-body\n')
2068
2099
        self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2069
2100
        self.assertActivitiesMatch()
 
2101
 
 
2102
 
 
2103
class TestActivity(tests.TestCase, TestActivityMixin):
 
2104
 
 
2105
    def setUp(self):
 
2106
        tests.TestCase.setUp(self)
 
2107
        self.server = self._activity_server(self._protocol_version)
 
2108
        self.server.start_server()
 
2109
        self.activities = {}
 
2110
        def report_activity(t, bytes, direction):
 
2111
            count = self.activities.get(direction, 0)
 
2112
            count += bytes
 
2113
            self.activities[direction] = count
 
2114
 
 
2115
        # We override at class level because constructors may propagate the
 
2116
        # bound method and render instance overriding ineffective (an
 
2117
        # alternative would be to define a specific ui factory instead...)
 
2118
        self.orig_report_activity = self._transport._report_activity
 
2119
        self._transport._report_activity = report_activity
 
2120
 
 
2121
    def tearDown(self):
 
2122
        self._transport._report_activity = self.orig_report_activity
 
2123
        self.server.stop_server()
 
2124
        tests.TestCase.tearDown(self)
 
2125
 
 
2126
 
 
2127
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
 
2128
 
 
2129
    def setUp(self):
 
2130
        tests.TestCase.setUp(self)
 
2131
        # Unlike TestActivity, we are really testing ReportingFileSocket and
 
2132
        # ReportingSocket, so we don't need all the parametrization. Since
 
2133
        # ReportingFileSocket and ReportingSocket are wrappers, it's easier to
 
2134
        # test them through their use by the transport than directly (that's a
 
2135
        # bit less clean but far more simpler and effective).
 
2136
        self.server = ActivityHTTPServer('HTTP/1.1')
 
2137
        self._transport=_urllib.HttpTransport_urllib
 
2138
 
 
2139
        self.server.start_server()
 
2140
 
 
2141
        # We override at class level because constructors may propagate the
 
2142
        # bound method and render instance overriding ineffective (an
 
2143
        # alternative would be to define a specific ui factory instead...)
 
2144
        self.orig_report_activity = self._transport._report_activity
 
2145
        self._transport._report_activity = None
 
2146
 
 
2147
    def tearDown(self):
 
2148
        self._transport._report_activity = self.orig_report_activity
 
2149
        self.server.stop_server()
 
2150
        tests.TestCase.tearDown(self)
 
2151
 
 
2152
    def assertActivitiesMatch(self):
 
2153
        # Nothing to check here
 
2154
        pass
 
2155
 
 
2156
 
 
2157
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
 
2158
    """Test authentication on the redirected http server."""
 
2159
 
 
2160
    _auth_header = 'Authorization'
 
2161
    _password_prompt_prefix = ''
 
2162
    _username_prompt_prefix = ''
 
2163
    _auth_server = http_utils.HTTPBasicAuthServer
 
2164
    _transport = _urllib.HttpTransport_urllib
 
2165
 
 
2166
    def create_transport_readonly_server(self):
 
2167
        return self._auth_server()
 
2168
 
 
2169
    def create_transport_secondary_server(self):
 
2170
        """Create the secondary server redirecting to the primary server"""
 
2171
        new = self.get_readonly_server()
 
2172
 
 
2173
        redirecting = http_utils.HTTPServerRedirecting()
 
2174
        redirecting.redirect_to(new.host, new.port)
 
2175
        return redirecting
 
2176
 
 
2177
    def setUp(self):
 
2178
        super(TestAuthOnRedirected, self).setUp()
 
2179
        self.build_tree_contents([('a','a'),
 
2180
                                  ('1/',),
 
2181
                                  ('1/a', 'redirected once'),
 
2182
                                  ],)
 
2183
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
2184
                                       self.new_server.port)
 
2185
        self.old_server.redirections = [
 
2186
            ('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
2187
        self.old_transport = self._transport(self.old_server.get_url())
 
2188
        self.new_server.add_user('joe', 'foo')
 
2189
 
 
2190
    def get_a(self, transport):
 
2191
        return transport.get('a')
 
2192
 
 
2193
    def test_auth_on_redirected_via_do_catching_redirections(self):
 
2194
        self.redirections = 0
 
2195
 
 
2196
        def redirected(transport, exception, redirection_notice):
 
2197
            self.redirections += 1
 
2198
            dir, file = urlutils.split(exception.target)
 
2199
            return self._transport(dir)
 
2200
 
 
2201
        stdout = tests.StringIOWrapper()
 
2202
        stderr = tests.StringIOWrapper()
 
2203
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
 
2204
                                            stdout=stdout, stderr=stderr)
 
2205
        self.assertEqual('redirected once',
 
2206
                         transport.do_catching_redirections(
 
2207
                self.get_a, self.old_transport, redirected).read())
 
2208
        self.assertEqual(1, self.redirections)
 
2209
        # stdin should be empty
 
2210
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
2211
        # stdout should be empty, stderr will contains the prompts
 
2212
        self.assertEqual('', stdout.getvalue())
 
2213
 
 
2214
    def test_auth_on_redirected_via_following_redirections(self):
 
2215
        self.new_server.add_user('joe', 'foo')
 
2216
        stdout = tests.StringIOWrapper()
 
2217
        stderr = tests.StringIOWrapper()
 
2218
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
 
2219
                                            stdout=stdout, stderr=stderr)
 
2220
        t = self.old_transport
 
2221
        req = RedirectedRequest('GET', t.abspath('a'))
 
2222
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
2223
                                       self.new_server.port)
 
2224
        self.old_server.redirections = [
 
2225
            ('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
2226
        self.assertEqual('redirected once',t._perform(req).read())
 
2227
        # stdin should be empty
 
2228
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
2229
        # stdout should be empty, stderr will contains the prompts
 
2230
        self.assertEqual('', stdout.getvalue())
 
2231
 
 
2232