~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Ian Clatworthy
  • Date: 2009-01-19 02:24:15 UTC
  • mto: This revision was merged to the branch mainline in revision 3944.
  • Revision ID: ian.clatworthy@canonical.com-20090119022415-mo0mcfeiexfktgwt
apply jam's log --short fix (Ian Clatworthy)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
# FIXME: This test should be repeated for each available http client
18
 
# implementation; at the moment we have urllib and pycurl.
 
17
"""Tests for HTTP implementations.
 
18
 
 
19
This module defines a load_tests() method that parametrize tests classes for
 
20
transport implementation, http protocol versions and authentication schemes.
 
21
"""
19
22
 
20
23
# TODO: Should be renamed to bzrlib.transport.http.tests?
 
24
# TODO: What about renaming to bzrlib.tests.transport.http ?
 
25
 
 
26
from cStringIO import StringIO
 
27
import httplib
 
28
import os
 
29
import select
 
30
import SimpleHTTPServer
 
31
import socket
 
32
import sys
 
33
import threading
21
34
 
22
35
import bzrlib
23
 
from bzrlib.errors import DependencyNotPresent
24
 
from bzrlib.tests import TestCase, TestSkipped
25
 
from bzrlib.transport import Transport
26
 
from bzrlib.transport.http import extract_auth
27
 
from bzrlib.transport.http._urllib import HttpTransport_urllib
28
 
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
29
 
 
30
 
class FakeManager (object):
 
36
from bzrlib import (
 
37
    bzrdir,
 
38
    config,
 
39
    errors,
 
40
    osutils,
 
41
    remote as _mod_remote,
 
42
    tests,
 
43
    transport,
 
44
    ui,
 
45
    urlutils,
 
46
    )
 
47
from bzrlib.tests import (
 
48
    http_server,
 
49
    http_utils,
 
50
    )
 
51
from bzrlib.transport import (
 
52
    http,
 
53
    remote,
 
54
    )
 
55
from bzrlib.transport.http import (
 
56
    _urllib,
 
57
    _urllib2_wrappers,
 
58
    )
 
59
 
 
60
 
 
61
try:
 
62
    from bzrlib.transport.http._pycurl import PyCurlTransport
 
63
    pycurl_present = True
 
64
except errors.DependencyNotPresent:
 
65
    pycurl_present = False
 
66
 
 
67
 
 
68
class TransportAdapter(tests.TestScenarioApplier):
 
69
    """Generate the same test for each transport implementation."""
 
70
 
 
71
    def __init__(self):
 
72
        transport_scenarios = [
 
73
            ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
 
74
                            _server=http_server.HttpServer_urllib,
 
75
                            _qualified_prefix='http+urllib',)),
 
76
            ]
 
77
        if pycurl_present:
 
78
            transport_scenarios.append(
 
79
                ('pycurl', dict(_transport=PyCurlTransport,
 
80
                                _server=http_server.HttpServer_PyCurl,
 
81
                                _qualified_prefix='http+pycurl',)))
 
82
        self.scenarios = transport_scenarios
 
83
 
 
84
 
 
85
class TransportProtocolAdapter(TransportAdapter):
 
86
    """Generate the same test for each protocol implementation.
 
87
 
 
88
    In addition to the transport adaptatation that we inherit from.
 
89
    """
 
90
 
 
91
    def __init__(self):
 
92
        super(TransportProtocolAdapter, self).__init__()
 
93
        protocol_scenarios = [
 
94
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
 
95
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
 
96
            ]
 
97
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
98
                                                  protocol_scenarios)
 
99
 
 
100
 
 
101
class TransportProtocolAuthenticationAdapter(TransportProtocolAdapter):
 
102
    """Generate the same test for each authentication scheme implementation.
 
103
 
 
104
    In addition to the protocol adaptatation that we inherit from.
 
105
    """
 
106
 
 
107
    def __init__(self):
 
108
        super(TransportProtocolAuthenticationAdapter, self).__init__()
 
109
        auth_scheme_scenarios = [
 
110
            ('basic', dict(_auth_scheme='basic')),
 
111
            ('digest', dict(_auth_scheme='digest')),
 
112
            ]
 
113
 
 
114
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
115
                                                  auth_scheme_scenarios)
 
116
 
 
117
def load_tests(standard_tests, module, loader):
 
118
    """Multiply tests for http clients and protocol versions."""
 
119
    # one for each transport
 
120
    t_adapter = TransportAdapter()
 
121
    t_classes= (TestHttpTransportRegistration,
 
122
                TestHttpTransportUrls,
 
123
                Test_redirected_to,
 
124
                )
 
125
    is_testing_for_transports = tests.condition_isinstance(t_classes)
 
126
 
 
127
    # multiplied by one for each protocol version
 
128
    tp_adapter = TransportProtocolAdapter()
 
129
    tp_classes= (SmartHTTPTunnellingTest,
 
130
                 TestDoCatchRedirections,
 
131
                 TestHTTPConnections,
 
132
                 TestHTTPRedirections,
 
133
                 TestHTTPSilentRedirections,
 
134
                 TestLimitedRangeRequestServer,
 
135
                 TestPost,
 
136
                 TestProxyHttpServer,
 
137
                 TestRanges,
 
138
                 TestSpecificRequestHandler,
 
139
                 )
 
140
    is_also_testing_for_protocols = tests.condition_isinstance(tp_classes)
 
141
 
 
142
    # multiplied by one for each authentication scheme
 
143
    tpa_adapter = TransportProtocolAuthenticationAdapter()
 
144
    tpa_classes = (TestAuth,
 
145
                   )
 
146
    is_also_testing_for_authentication = tests.condition_isinstance(
 
147
        tpa_classes)
 
148
 
 
149
    result = loader.suiteClass()
 
150
    for test_class in tests.iter_suite_tests(standard_tests):
 
151
        # Each test class is either standalone or testing for some combination
 
152
        # of transport, protocol version, authentication scheme. Use the right
 
153
        # adpater (or none) depending on the class.
 
154
        if is_testing_for_transports(test_class):
 
155
            result.addTests(t_adapter.adapt(test_class))
 
156
        elif is_also_testing_for_protocols(test_class):
 
157
            result.addTests(tp_adapter.adapt(test_class))
 
158
        elif is_also_testing_for_authentication(test_class):
 
159
            result.addTests(tpa_adapter.adapt(test_class))
 
160
        else:
 
161
            result.addTest(test_class)
 
162
    return result
 
163
 
 
164
 
 
165
class FakeManager(object):
 
166
 
31
167
    def __init__(self):
32
168
        self.credentials = []
33
 
        
 
169
 
34
170
    def add_password(self, realm, host, username, password):
35
171
        self.credentials.append([realm, host, username, password])
36
172
 
37
173
 
38
 
class TestHttpUrls(TestCase):
 
174
class RecordingServer(object):
 
175
    """A fake HTTP server.
 
176
    
 
177
    It records the bytes sent to it, and replies with a 200.
 
178
    """
 
179
 
 
180
    def __init__(self, expect_body_tail=None):
 
181
        """Constructor.
 
182
 
 
183
        :type expect_body_tail: str
 
184
        :param expect_body_tail: a reply won't be sent until this string is
 
185
            received.
 
186
        """
 
187
        self._expect_body_tail = expect_body_tail
 
188
        self.host = None
 
189
        self.port = None
 
190
        self.received_bytes = ''
 
191
 
 
192
    def setUp(self):
 
193
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
194
        self._sock.bind(('127.0.0.1', 0))
 
195
        self.host, self.port = self._sock.getsockname()
 
196
        self._ready = threading.Event()
 
197
        self._thread = threading.Thread(target=self._accept_read_and_reply)
 
198
        self._thread.setDaemon(True)
 
199
        self._thread.start()
 
200
        self._ready.wait(5)
 
201
 
 
202
    def _accept_read_and_reply(self):
 
203
        self._sock.listen(1)
 
204
        self._ready.set()
 
205
        self._sock.settimeout(5)
 
206
        try:
 
207
            conn, address = self._sock.accept()
 
208
            # On win32, the accepted connection will be non-blocking to start
 
209
            # with because we're using settimeout.
 
210
            conn.setblocking(True)
 
211
            while not self.received_bytes.endswith(self._expect_body_tail):
 
212
                self.received_bytes += conn.recv(4096)
 
213
            conn.sendall('HTTP/1.1 200 OK\r\n')
 
214
        except socket.timeout:
 
215
            # Make sure the client isn't stuck waiting for us to e.g. accept.
 
216
            self._sock.close()
 
217
        except socket.error:
 
218
            # The client may have already closed the socket.
 
219
            pass
 
220
 
 
221
    def tearDown(self):
 
222
        try:
 
223
            self._sock.close()
 
224
        except socket.error:
 
225
            # We might have already closed it.  We don't care.
 
226
            pass
 
227
        self.host = None
 
228
        self.port = None
 
229
 
 
230
 
 
231
class TestHTTPServer(tests.TestCase):
 
232
    """Test the HTTP servers implementations."""
 
233
 
 
234
    def test_invalid_protocol(self):
 
235
        class BogusRequestHandler(http_server.TestingHTTPRequestHandler):
 
236
 
 
237
            protocol_version = 'HTTP/0.1'
 
238
 
 
239
        server = http_server.HttpServer(BogusRequestHandler)
 
240
        try:
 
241
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
 
242
        except:
 
243
            server.tearDown()
 
244
            self.fail('HTTP Server creation did not raise UnknownProtocol')
 
245
 
 
246
    def test_force_invalid_protocol(self):
 
247
        server = http_server.HttpServer(protocol_version='HTTP/0.1')
 
248
        try:
 
249
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
 
250
        except:
 
251
            server.tearDown()
 
252
            self.fail('HTTP Server creation did not raise UnknownProtocol')
 
253
 
 
254
    def test_server_start_and_stop(self):
 
255
        server = http_server.HttpServer()
 
256
        server.setUp()
 
257
        self.assertTrue(server._http_running)
 
258
        server.tearDown()
 
259
        self.assertFalse(server._http_running)
 
260
 
 
261
    def test_create_http_server_one_zero(self):
 
262
        class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
 
263
 
 
264
            protocol_version = 'HTTP/1.0'
 
265
 
 
266
        server = http_server.HttpServer(RequestHandlerOneZero)
 
267
        server.setUp()
 
268
        self.addCleanup(server.tearDown)
 
269
        self.assertIsInstance(server._httpd, http_server.TestingHTTPServer)
 
270
 
 
271
    def test_create_http_server_one_one(self):
 
272
        class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
 
273
 
 
274
            protocol_version = 'HTTP/1.1'
 
275
 
 
276
        server = http_server.HttpServer(RequestHandlerOneOne)
 
277
        server.setUp()
 
278
        self.addCleanup(server.tearDown)
 
279
        self.assertIsInstance(server._httpd,
 
280
                              http_server.TestingThreadingHTTPServer)
 
281
 
 
282
    def test_create_http_server_force_one_one(self):
 
283
        class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
 
284
 
 
285
            protocol_version = 'HTTP/1.0'
 
286
 
 
287
        server = http_server.HttpServer(RequestHandlerOneZero,
 
288
                                        protocol_version='HTTP/1.1')
 
289
        server.setUp()
 
290
        self.addCleanup(server.tearDown)
 
291
        self.assertIsInstance(server._httpd,
 
292
                              http_server.TestingThreadingHTTPServer)
 
293
 
 
294
    def test_create_http_server_force_one_zero(self):
 
295
        class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
 
296
 
 
297
            protocol_version = 'HTTP/1.1'
 
298
 
 
299
        server = http_server.HttpServer(RequestHandlerOneOne,
 
300
                                        protocol_version='HTTP/1.0')
 
301
        server.setUp()
 
302
        self.addCleanup(server.tearDown)
 
303
        self.assertIsInstance(server._httpd,
 
304
                              http_server.TestingHTTPServer)
 
305
 
 
306
 
 
307
class TestWithTransport_pycurl(object):
 
308
    """Test case to inherit from if pycurl is present"""
 
309
 
 
310
    def _get_pycurl_maybe(self):
 
311
        try:
 
312
            from bzrlib.transport.http._pycurl import PyCurlTransport
 
313
            return PyCurlTransport
 
314
        except errors.DependencyNotPresent:
 
315
            raise tests.TestSkipped('pycurl not present')
 
316
 
 
317
    _transport = property(_get_pycurl_maybe)
 
318
 
 
319
 
 
320
class TestHttpUrls(tests.TestCase):
 
321
 
 
322
    # TODO: This should be moved to authorization tests once they
 
323
    # are written.
 
324
 
39
325
    def test_url_parsing(self):
40
326
        f = FakeManager()
41
 
        url = extract_auth('http://example.com', f)
 
327
        url = http.extract_auth('http://example.com', f)
42
328
        self.assertEquals('http://example.com', url)
43
329
        self.assertEquals(0, len(f.credentials))
44
 
        url = extract_auth('http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
 
330
        url = http.extract_auth(
 
331
            'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
45
332
        self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
46
333
        self.assertEquals(1, len(f.credentials))
47
 
        self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'], f.credentials[0])
48
 
        
 
334
        self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
 
335
                          f.credentials[0])
 
336
 
 
337
 
 
338
class TestHttpTransportUrls(tests.TestCase):
 
339
    """Test the http urls."""
 
340
 
49
341
    def test_abs_url(self):
50
342
        """Construction of absolute http URLs"""
51
 
        t = HttpTransport_urllib('http://bazaar-vcs.org/bzr/bzr.dev/')
 
343
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
52
344
        eq = self.assertEqualDiff
53
 
        eq(t.abspath('.'),
54
 
           'http://bazaar-vcs.org/bzr/bzr.dev')
55
 
        eq(t.abspath('foo/bar'), 
56
 
           'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
57
 
        eq(t.abspath('.bzr'),
58
 
           'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
 
345
        eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
 
346
        eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
 
347
        eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
59
348
        eq(t.abspath('.bzr/1//2/./3'),
60
349
           'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
61
350
 
62
351
    def test_invalid_http_urls(self):
63
352
        """Trap invalid construction of urls"""
64
 
        t = HttpTransport_urllib('http://bazaar-vcs.org/bzr/bzr.dev/')
65
 
        self.assertRaises(ValueError,
66
 
            t.abspath,
67
 
            '.bzr/')
68
 
        self.assertRaises(ValueError,
69
 
            t.abspath,
70
 
            '/.bzr')
 
353
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
 
354
        self.assertRaises(errors.InvalidURL,
 
355
                          self._transport,
 
356
                          'http://http://bazaar-vcs.org/bzr/bzr.dev/')
71
357
 
72
358
    def test_http_root_urls(self):
73
359
        """Construction of URLs from server root"""
74
 
        t = HttpTransport_urllib('http://bzr.ozlabs.org/')
 
360
        t = self._transport('http://bzr.ozlabs.org/')
75
361
        eq = self.assertEqualDiff
76
362
        eq(t.abspath('.bzr/tree-version'),
77
363
           'http://bzr.ozlabs.org/.bzr/tree-version')
78
364
 
79
365
    def test_http_impl_urls(self):
80
366
        """There are servers which ask for particular clients to connect"""
81
 
        try:
82
 
            from bzrlib.transport.http._pycurl import HttpServer_PyCurl
83
 
            server = HttpServer_PyCurl()
84
 
            try:
85
 
                server.setUp()
86
 
                url = server.get_url()
87
 
                self.assertTrue(url.startswith('http+pycurl://'))
88
 
            finally:
89
 
                server.tearDown()
90
 
        except DependencyNotPresent:
91
 
            raise TestSkipped('pycurl not present')
92
 
 
93
 
class TestHttpMixins(object):
94
 
 
95
 
    def _prep_tree(self):
96
 
        self.build_tree(['xxx', 'foo/', 'foo/bar'], line_endings='binary',
 
367
        server = self._server()
 
368
        try:
 
369
            server.setUp()
 
370
            url = server.get_url()
 
371
            self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
 
372
        finally:
 
373
            server.tearDown()
 
374
 
 
375
 
 
376
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
 
377
 
 
378
    # TODO: This should really be moved into another pycurl
 
379
    # specific test. When https tests will be implemented, take
 
380
    # this one into account.
 
381
    def test_pycurl_without_https_support(self):
 
382
        """Test that pycurl without SSL do not fail with a traceback.
 
383
 
 
384
        For the purpose of the test, we force pycurl to ignore
 
385
        https by supplying a fake version_info that do not
 
386
        support it.
 
387
        """
 
388
        try:
 
389
            import pycurl
 
390
        except ImportError:
 
391
            raise tests.TestSkipped('pycurl not present')
 
392
 
 
393
        version_info_orig = pycurl.version_info
 
394
        try:
 
395
            # Now that we have pycurl imported, we can fake its version_info
 
396
            # This was taken from a windows pycurl without SSL
 
397
            # (thanks to bialix)
 
398
            pycurl.version_info = lambda : (2,
 
399
                                            '7.13.2',
 
400
                                            462082,
 
401
                                            'i386-pc-win32',
 
402
                                            2576,
 
403
                                            None,
 
404
                                            0,
 
405
                                            None,
 
406
                                            ('ftp', 'gopher', 'telnet',
 
407
                                             'dict', 'ldap', 'http', 'file'),
 
408
                                            None,
 
409
                                            0,
 
410
                                            None)
 
411
            self.assertRaises(errors.DependencyNotPresent, self._transport,
 
412
                              'https://launchpad.net')
 
413
        finally:
 
414
            # Restore the right function
 
415
            pycurl.version_info = version_info_orig
 
416
 
 
417
 
 
418
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
 
419
    """Test the http connections."""
 
420
 
 
421
    def setUp(self):
 
422
        http_utils.TestCaseWithWebserver.setUp(self)
 
423
        self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
97
424
                        transport=self.get_transport())
98
425
 
99
426
    def test_http_has(self):
101
428
        t = self._transport(server.get_url())
102
429
        self.assertEqual(t.has('foo/bar'), True)
103
430
        self.assertEqual(len(server.logs), 1)
104
 
        self.assertContainsRe(server.logs[0], 
 
431
        self.assertContainsRe(server.logs[0],
105
432
            r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "bzr/')
106
433
 
107
434
    def test_http_has_not_found(self):
108
435
        server = self.get_readonly_server()
109
436
        t = self._transport(server.get_url())
110
437
        self.assertEqual(t.has('not-found'), False)
111
 
        self.assertContainsRe(server.logs[1], 
 
438
        self.assertContainsRe(server.logs[1],
112
439
            r'"HEAD /not-found HTTP/1.." 404 - "-" "bzr/')
113
440
 
114
441
    def test_http_get(self):
120
447
            'contents of foo/bar\n')
121
448
        self.assertEqual(len(server.logs), 1)
122
449
        self.assertTrue(server.logs[0].find(
123
 
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s' % bzrlib.__version__) > -1)
124
 
 
125
 
 
126
 
class TestHttpConnections_urllib(TestCaseWithWebserver, TestHttpMixins):
127
 
    _transport = HttpTransport_urllib
128
 
 
129
 
    def setUp(self):
130
 
        TestCaseWithWebserver.setUp(self)
131
 
        self._prep_tree()
132
 
 
133
 
 
134
 
 
135
 
class TestHttpConnections_pycurl(TestCaseWithWebserver, TestHttpMixins):
136
 
 
137
 
    def _get_pycurl_maybe(self):
 
450
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
 
451
            % bzrlib.__version__) > -1)
 
452
 
 
453
    def test_has_on_bogus_host(self):
 
454
        # Get a free address and don't 'accept' on it, so that we
 
455
        # can be sure there is no http handler there, but set a
 
456
        # reasonable timeout to not slow down tests too much.
 
457
        default_timeout = socket.getdefaulttimeout()
138
458
        try:
139
 
            from bzrlib.transport.http._pycurl import PyCurlTransport
140
 
            return PyCurlTransport
141
 
        except DependencyNotPresent:
142
 
            raise TestSkipped('pycurl not present')
143
 
 
144
 
    _transport = property(_get_pycurl_maybe)
145
 
 
146
 
    def setUp(self):
147
 
        TestCaseWithWebserver.setUp(self)
148
 
        self._prep_tree()
149
 
 
150
 
 
151
 
 
152
 
class TestHttpTransportRegistration(TestCase):
 
459
            socket.setdefaulttimeout(2)
 
460
            s = socket.socket()
 
461
            s.bind(('localhost', 0))
 
462
            t = self._transport('http://%s:%s/' % s.getsockname())
 
463
            self.assertRaises(errors.ConnectionError, t.has, 'foo/bar')
 
464
        finally:
 
465
            socket.setdefaulttimeout(default_timeout)
 
466
 
 
467
 
 
468
class TestHttpTransportRegistration(tests.TestCase):
153
469
    """Test registrations of various http implementations"""
154
470
 
155
471
    def test_http_registered(self):
156
 
        import bzrlib.transport.http._urllib
157
 
        from bzrlib.transport import get_transport
158
 
        # urlllib should always be present
159
 
        t = get_transport('http+urllib://bzr.google.com/')
160
 
        self.assertIsInstance(t, Transport)
161
 
        self.assertIsInstance(t, bzrlib.transport.http._urllib.HttpTransport_urllib)
 
472
        t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
 
473
        self.assertIsInstance(t, transport.Transport)
 
474
        self.assertIsInstance(t, self._transport)
 
475
 
 
476
 
 
477
class TestPost(tests.TestCase):
 
478
 
 
479
    def test_post_body_is_received(self):
 
480
        server = RecordingServer(expect_body_tail='end-of-body')
 
481
        server.setUp()
 
482
        self.addCleanup(server.tearDown)
 
483
        scheme = self._qualified_prefix
 
484
        url = '%s://%s:%s/' % (scheme, server.host, server.port)
 
485
        http_transport = self._transport(url)
 
486
        code, response = http_transport._post('abc def end-of-body')
 
487
        self.assertTrue(
 
488
            server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
 
489
        self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
 
490
        # The transport should not be assuming that the server can accept
 
491
        # chunked encoding the first time it connects, because HTTP/1.1, so we
 
492
        # check for the literal string.
 
493
        self.assertTrue(
 
494
            server.received_bytes.endswith('\r\n\r\nabc def end-of-body'))
 
495
 
 
496
 
 
497
class TestRangeHeader(tests.TestCase):
 
498
    """Test range_header method"""
 
499
 
 
500
    def check_header(self, value, ranges=[], tail=0):
 
501
        offsets = [ (start, end - start + 1) for start, end in ranges]
 
502
        coalesce = transport.Transport._coalesce_offsets
 
503
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
 
504
        range_header = http.HttpTransportBase._range_header
 
505
        self.assertEqual(value, range_header(coalesced, tail))
 
506
 
 
507
    def test_range_header_single(self):
 
508
        self.check_header('0-9', ranges=[(0,9)])
 
509
        self.check_header('100-109', ranges=[(100,109)])
 
510
 
 
511
    def test_range_header_tail(self):
 
512
        self.check_header('-10', tail=10)
 
513
        self.check_header('-50', tail=50)
 
514
 
 
515
    def test_range_header_multi(self):
 
516
        self.check_header('0-9,100-200,300-5000',
 
517
                          ranges=[(0,9), (100, 200), (300,5000)])
 
518
 
 
519
    def test_range_header_mixed(self):
 
520
        self.check_header('0-9,300-5000,-50',
 
521
                          ranges=[(0,9), (300,5000)],
 
522
                          tail=50)
 
523
 
 
524
 
 
525
class TestSpecificRequestHandler(http_utils.TestCaseWithWebserver):
 
526
    """Tests a specific request handler.
 
527
 
 
528
    Daughter classes are expected to override _req_handler_class
 
529
    """
 
530
 
 
531
    # Provide a useful default
 
532
    _req_handler_class = http_server.TestingHTTPRequestHandler
 
533
 
 
534
    def create_transport_readonly_server(self):
 
535
        return http_server.HttpServer(self._req_handler_class,
 
536
                                      protocol_version=self._protocol_version)
 
537
 
 
538
    def _testing_pycurl(self):
 
539
        return pycurl_present and self._transport == PyCurlTransport
 
540
 
 
541
 
 
542
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
 
543
    """Whatever request comes in, close the connection"""
 
544
 
 
545
    def handle_one_request(self):
 
546
        """Handle a single HTTP request, by abruptly closing the connection"""
 
547
        self.close_connection = 1
 
548
 
 
549
 
 
550
class TestWallServer(TestSpecificRequestHandler):
 
551
    """Tests exceptions during the connection phase"""
 
552
 
 
553
    _req_handler_class = WallRequestHandler
 
554
 
 
555
    def test_http_has(self):
 
556
        server = self.get_readonly_server()
 
557
        t = self._transport(server.get_url())
 
558
        # Unfortunately httplib (see HTTPResponse._read_status
 
559
        # for details) make no distinction between a closed
 
560
        # socket and badly formatted status line, so we can't
 
561
        # just test for ConnectionError, we have to test
 
562
        # InvalidHttpResponse too.
 
563
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
 
564
                          t.has, 'foo/bar')
 
565
 
 
566
    def test_http_get(self):
 
567
        server = self.get_readonly_server()
 
568
        t = self._transport(server.get_url())
 
569
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
 
570
                          t.get, 'foo/bar')
 
571
 
 
572
 
 
573
class BadStatusRequestHandler(http_server.TestingHTTPRequestHandler):
 
574
    """Whatever request comes in, returns a bad status"""
 
575
 
 
576
    def parse_request(self):
 
577
        """Fakes handling a single HTTP request, returns a bad status"""
 
578
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
579
        self.send_response(0, "Bad status")
 
580
        self.close_connection = 1
 
581
        return False
 
582
 
 
583
 
 
584
class TestBadStatusServer(TestSpecificRequestHandler):
 
585
    """Tests bad status from server."""
 
586
 
 
587
    _req_handler_class = BadStatusRequestHandler
 
588
 
 
589
    def test_http_has(self):
 
590
        server = self.get_readonly_server()
 
591
        t = self._transport(server.get_url())
 
592
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
 
593
 
 
594
    def test_http_get(self):
 
595
        server = self.get_readonly_server()
 
596
        t = self._transport(server.get_url())
 
597
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
 
598
 
 
599
 
 
600
class InvalidStatusRequestHandler(http_server.TestingHTTPRequestHandler):
 
601
    """Whatever request comes in, returns an invalid status"""
 
602
 
 
603
    def parse_request(self):
 
604
        """Fakes handling a single HTTP request, returns a bad status"""
 
605
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
606
        self.wfile.write("Invalid status line\r\n")
 
607
        return False
 
608
 
 
609
 
 
610
class TestInvalidStatusServer(TestBadStatusServer):
 
611
    """Tests invalid status from server.
 
612
 
 
613
    Both implementations raises the same error as for a bad status.
 
614
    """
 
615
 
 
616
    _req_handler_class = InvalidStatusRequestHandler
 
617
 
 
618
    def test_http_has(self):
 
619
        if self._testing_pycurl() and self._protocol_version == 'HTTP/1.1':
 
620
            raise tests.KnownFailure(
 
621
                'pycurl hangs if the server send back garbage')
 
622
        super(TestInvalidStatusServer, self).test_http_has()
 
623
 
 
624
    def test_http_get(self):
 
625
        if self._testing_pycurl() and self._protocol_version == 'HTTP/1.1':
 
626
            raise tests.KnownFailure(
 
627
                'pycurl hangs if the server send back garbage')
 
628
        super(TestInvalidStatusServer, self).test_http_get()
 
629
 
 
630
 
 
631
class BadProtocolRequestHandler(http_server.TestingHTTPRequestHandler):
 
632
    """Whatever request comes in, returns a bad protocol version"""
 
633
 
 
634
    def parse_request(self):
 
635
        """Fakes handling a single HTTP request, returns a bad status"""
 
636
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
637
        # Returns an invalid protocol version, but curl just
 
638
        # ignores it and those cannot be tested.
 
639
        self.wfile.write("%s %d %s\r\n" % ('HTTP/0.0',
 
640
                                           404,
 
641
                                           'Look at my protocol version'))
 
642
        return False
 
643
 
 
644
 
 
645
class TestBadProtocolServer(TestSpecificRequestHandler):
 
646
    """Tests bad protocol from server."""
 
647
 
 
648
    _req_handler_class = BadProtocolRequestHandler
 
649
 
 
650
    def setUp(self):
 
651
        if pycurl_present and self._transport == PyCurlTransport:
 
652
            raise tests.TestNotApplicable(
 
653
                "pycurl doesn't check the protocol version")
 
654
        super(TestBadProtocolServer, self).setUp()
 
655
 
 
656
    def test_http_has(self):
 
657
        server = self.get_readonly_server()
 
658
        t = self._transport(server.get_url())
 
659
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
 
660
 
 
661
    def test_http_get(self):
 
662
        server = self.get_readonly_server()
 
663
        t = self._transport(server.get_url())
 
664
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
 
665
 
 
666
 
 
667
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
 
668
    """Whatever request comes in, returns a 403 code"""
 
669
 
 
670
    def parse_request(self):
 
671
        """Handle a single HTTP request, by replying we cannot handle it"""
 
672
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
673
        self.send_error(403)
 
674
        return False
 
675
 
 
676
 
 
677
class TestForbiddenServer(TestSpecificRequestHandler):
 
678
    """Tests forbidden server"""
 
679
 
 
680
    _req_handler_class = ForbiddenRequestHandler
 
681
 
 
682
    def test_http_has(self):
 
683
        server = self.get_readonly_server()
 
684
        t = self._transport(server.get_url())
 
685
        self.assertRaises(errors.TransportError, t.has, 'foo/bar')
 
686
 
 
687
    def test_http_get(self):
 
688
        server = self.get_readonly_server()
 
689
        t = self._transport(server.get_url())
 
690
        self.assertRaises(errors.TransportError, t.get, 'foo/bar')
 
691
 
 
692
 
 
693
class TestRecordingServer(tests.TestCase):
 
694
 
 
695
    def test_create(self):
 
696
        server = RecordingServer(expect_body_tail=None)
 
697
        self.assertEqual('', server.received_bytes)
 
698
        self.assertEqual(None, server.host)
 
699
        self.assertEqual(None, server.port)
 
700
 
 
701
    def test_setUp_and_tearDown(self):
 
702
        server = RecordingServer(expect_body_tail=None)
 
703
        server.setUp()
 
704
        try:
 
705
            self.assertNotEqual(None, server.host)
 
706
            self.assertNotEqual(None, server.port)
 
707
        finally:
 
708
            server.tearDown()
 
709
        self.assertEqual(None, server.host)
 
710
        self.assertEqual(None, server.port)
 
711
 
 
712
    def test_send_receive_bytes(self):
 
713
        server = RecordingServer(expect_body_tail='c')
 
714
        server.setUp()
 
715
        self.addCleanup(server.tearDown)
 
716
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
717
        sock.connect((server.host, server.port))
 
718
        sock.sendall('abc')
 
719
        self.assertEqual('HTTP/1.1 200 OK\r\n',
 
720
                         osutils.recv_all(sock, 4096))
 
721
        self.assertEqual('abc', server.received_bytes)
 
722
 
 
723
 
 
724
class TestRangeRequestServer(TestSpecificRequestHandler):
 
725
    """Tests readv requests against server.
 
726
 
 
727
    We test against default "normal" server.
 
728
    """
 
729
 
 
730
    def setUp(self):
 
731
        super(TestRangeRequestServer, self).setUp()
 
732
        self.build_tree_contents([('a', '0123456789')],)
 
733
 
 
734
    def test_readv(self):
 
735
        server = self.get_readonly_server()
 
736
        t = self._transport(server.get_url())
 
737
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
738
        self.assertEqual(l[0], (0, '0'))
 
739
        self.assertEqual(l[1], (1, '1'))
 
740
        self.assertEqual(l[2], (3, '34'))
 
741
        self.assertEqual(l[3], (9, '9'))
 
742
 
 
743
    def test_readv_out_of_order(self):
 
744
        server = self.get_readonly_server()
 
745
        t = self._transport(server.get_url())
 
746
        l = list(t.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
747
        self.assertEqual(l[0], (1, '1'))
 
748
        self.assertEqual(l[1], (9, '9'))
 
749
        self.assertEqual(l[2], (0, '0'))
 
750
        self.assertEqual(l[3], (3, '34'))
 
751
 
 
752
    def test_readv_invalid_ranges(self):
 
753
        server = self.get_readonly_server()
 
754
        t = self._transport(server.get_url())
 
755
 
 
756
        # This is intentionally reading off the end of the file
 
757
        # since we are sure that it cannot get there
 
758
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
 
759
                              t.readv, 'a', [(1,1), (8,10)])
 
760
 
 
761
        # This is trying to seek past the end of the file, it should
 
762
        # also raise a special error
 
763
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
 
764
                              t.readv, 'a', [(12,2)])
 
765
 
 
766
    def test_readv_multiple_get_requests(self):
 
767
        server = self.get_readonly_server()
 
768
        t = self._transport(server.get_url())
 
769
        # force transport to issue multiple requests
 
770
        t._max_readv_combine = 1
 
771
        t._max_get_ranges = 1
 
772
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
773
        self.assertEqual(l[0], (0, '0'))
 
774
        self.assertEqual(l[1], (1, '1'))
 
775
        self.assertEqual(l[2], (3, '34'))
 
776
        self.assertEqual(l[3], (9, '9'))
 
777
        # The server should have issued 4 requests
 
778
        self.assertEqual(4, server.GET_request_nb)
 
779
 
 
780
    def test_readv_get_max_size(self):
 
781
        server = self.get_readonly_server()
 
782
        t = self._transport(server.get_url())
 
783
        # force transport to issue multiple requests by limiting the number of
 
784
        # bytes by request. Note that this apply to coalesced offsets only, a
 
785
        # single range will keep its size even if bigger than the limit.
 
786
        t._get_max_size = 2
 
787
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
788
        self.assertEqual(l[0], (0, '0'))
 
789
        self.assertEqual(l[1], (1, '1'))
 
790
        self.assertEqual(l[2], (2, '2345'))
 
791
        self.assertEqual(l[3], (6, '6789'))
 
792
        # The server should have issued 3 requests
 
793
        self.assertEqual(3, server.GET_request_nb)
 
794
 
 
795
    def test_complete_readv_leave_pipe_clean(self):
 
796
        server = self.get_readonly_server()
 
797
        t = self._transport(server.get_url())
 
798
        # force transport to issue multiple requests
 
799
        t._get_max_size = 2
 
800
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
801
        # The server should have issued 3 requests
 
802
        self.assertEqual(3, server.GET_request_nb)
 
803
        self.assertEqual('0123456789', t.get_bytes('a'))
 
804
        self.assertEqual(4, server.GET_request_nb)
 
805
 
 
806
    def test_incomplete_readv_leave_pipe_clean(self):
 
807
        server = self.get_readonly_server()
 
808
        t = self._transport(server.get_url())
 
809
        # force transport to issue multiple requests
 
810
        t._get_max_size = 2
 
811
        # Don't collapse readv results into a list so that we leave unread
 
812
        # bytes on the socket
 
813
        ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
814
        self.assertEqual((0, '0'), ireadv.next())
 
815
        # The server should have issued one request so far 
 
816
        self.assertEqual(1, server.GET_request_nb)
 
817
        self.assertEqual('0123456789', t.get_bytes('a'))
 
818
        # get_bytes issued an additional request, the readv pending ones are
 
819
        # lost
 
820
        self.assertEqual(2, server.GET_request_nb)
 
821
 
 
822
 
 
823
class SingleRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
824
    """Always reply to range request as if they were single.
 
825
 
 
826
    Don't be explicit about it, just to annoy the clients.
 
827
    """
 
828
 
 
829
    def get_multiple_ranges(self, file, file_size, ranges):
 
830
        """Answer as if it was a single range request and ignores the rest"""
 
831
        (start, end) = ranges[0]
 
832
        return self.get_single_range(file, file_size, start, end)
 
833
 
 
834
 
 
835
class TestSingleRangeRequestServer(TestRangeRequestServer):
 
836
    """Test readv against a server which accept only single range requests"""
 
837
 
 
838
    _req_handler_class = SingleRangeRequestHandler
 
839
 
 
840
 
 
841
class SingleOnlyRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
842
    """Only reply to simple range requests, errors out on multiple"""
 
843
 
 
844
    def get_multiple_ranges(self, file, file_size, ranges):
 
845
        """Refuses the multiple ranges request"""
 
846
        if len(ranges) > 1:
 
847
            file.close()
 
848
            self.send_error(416, "Requested range not satisfiable")
 
849
            return
 
850
        (start, end) = ranges[0]
 
851
        return self.get_single_range(file, file_size, start, end)
 
852
 
 
853
 
 
854
class TestSingleOnlyRangeRequestServer(TestRangeRequestServer):
 
855
    """Test readv against a server which only accept single range requests"""
 
856
 
 
857
    _req_handler_class = SingleOnlyRangeRequestHandler
 
858
 
 
859
 
 
860
class NoRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
861
    """Ignore range requests without notice"""
 
862
 
 
863
    def do_GET(self):
 
864
        # Update the statistics
 
865
        self.server.test_case_server.GET_request_nb += 1
 
866
        # Just bypass the range handling done by TestingHTTPRequestHandler
 
867
        return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
 
868
 
 
869
 
 
870
class TestNoRangeRequestServer(TestRangeRequestServer):
 
871
    """Test readv against a server which do not accept range requests"""
 
872
 
 
873
    _req_handler_class = NoRangeRequestHandler
 
874
 
 
875
 
 
876
class MultipleRangeWithoutContentLengthRequestHandler(
 
877
    http_server.TestingHTTPRequestHandler):
 
878
    """Reply to multiple range requests without content length header."""
 
879
 
 
880
    def get_multiple_ranges(self, file, file_size, ranges):
 
881
        self.send_response(206)
 
882
        self.send_header('Accept-Ranges', 'bytes')
 
883
        boundary = "%d" % random.randint(0,0x7FFFFFFF)
 
884
        self.send_header("Content-Type",
 
885
                         "multipart/byteranges; boundary=%s" % boundary)
 
886
        self.end_headers()
 
887
        for (start, end) in ranges:
 
888
            self.wfile.write("--%s\r\n" % boundary)
 
889
            self.send_header("Content-type", 'application/octet-stream')
 
890
            self.send_header("Content-Range", "bytes %d-%d/%d" % (start,
 
891
                                                                  end,
 
892
                                                                  file_size))
 
893
            self.end_headers()
 
894
            self.send_range_content(file, start, end - start + 1)
 
895
        # Final boundary
 
896
        self.wfile.write("--%s\r\n" % boundary)
 
897
 
 
898
 
 
899
class TestMultipleRangeWithoutContentLengthServer(TestRangeRequestServer):
 
900
 
 
901
    _req_handler_class = MultipleRangeWithoutContentLengthRequestHandler
 
902
 
 
903
 
 
904
class TruncatedMultipleRangeRequestHandler(
 
905
    http_server.TestingHTTPRequestHandler):
 
906
    """Reply to multiple range requests truncating the last ones.
 
907
 
 
908
    This server generates responses whose Content-Length describes all the
 
909
    ranges, but fail to include the last ones leading to client short reads.
 
910
    This has been observed randomly with lighttpd (bug #179368).
 
911
    """
 
912
 
 
913
    _truncated_ranges = 2
 
914
 
 
915
    def get_multiple_ranges(self, file, file_size, ranges):
 
916
        self.send_response(206)
 
917
        self.send_header('Accept-Ranges', 'bytes')
 
918
        boundary = 'tagada'
 
919
        self.send_header('Content-Type',
 
920
                         'multipart/byteranges; boundary=%s' % boundary)
 
921
        boundary_line = '--%s\r\n' % boundary
 
922
        # Calculate the Content-Length
 
923
        content_length = 0
 
924
        for (start, end) in ranges:
 
925
            content_length += len(boundary_line)
 
926
            content_length += self._header_line_length(
 
927
                'Content-type', 'application/octet-stream')
 
928
            content_length += self._header_line_length(
 
929
                'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
 
930
            content_length += len('\r\n') # end headers
 
931
            content_length += end - start # + 1
 
932
        content_length += len(boundary_line)
 
933
        self.send_header('Content-length', content_length)
 
934
        self.end_headers()
 
935
 
 
936
        # Send the multipart body
 
937
        cur = 0
 
938
        for (start, end) in ranges:
 
939
            self.wfile.write(boundary_line)
 
940
            self.send_header('Content-type', 'application/octet-stream')
 
941
            self.send_header('Content-Range', 'bytes %d-%d/%d'
 
942
                             % (start, end, file_size))
 
943
            self.end_headers()
 
944
            if cur + self._truncated_ranges >= len(ranges):
 
945
                # Abruptly ends the response and close the connection
 
946
                self.close_connection = 1
 
947
                return
 
948
            self.send_range_content(file, start, end - start + 1)
 
949
            cur += 1
 
950
        # No final boundary
 
951
        self.wfile.write(boundary_line)
 
952
 
 
953
 
 
954
class TestTruncatedMultipleRangeServer(TestSpecificRequestHandler):
 
955
 
 
956
    _req_handler_class = TruncatedMultipleRangeRequestHandler
 
957
 
 
958
    def setUp(self):
 
959
        super(TestTruncatedMultipleRangeServer, self).setUp()
 
960
        self.build_tree_contents([('a', '0123456789')],)
 
961
 
 
962
    def test_readv_with_short_reads(self):
 
963
        server = self.get_readonly_server()
 
964
        t = self._transport(server.get_url())
 
965
        # Force separate ranges for each offset
 
966
        t._bytes_to_read_before_seek = 0
 
967
        ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
 
968
        self.assertEqual((0, '0'), ireadv.next())
 
969
        self.assertEqual((2, '2'), ireadv.next())
 
970
        if not self._testing_pycurl():
 
971
            # Only one request have been issued so far (except for pycurl that
 
972
            # try to read the whole response at once)
 
973
            self.assertEqual(1, server.GET_request_nb)
 
974
        self.assertEqual((4, '45'), ireadv.next())
 
975
        self.assertEqual((9, '9'), ireadv.next())
 
976
        # Both implementations issue 3 requests but:
 
977
        # - urllib does two multiple (4 ranges, then 2 ranges) then a single
 
978
        #   range,
 
979
        # - pycurl does two multiple (4 ranges, 4 ranges) then a single range
 
980
        self.assertEqual(3, server.GET_request_nb)
 
981
        # Finally the client have tried a single range request and stays in
 
982
        # that mode
 
983
        self.assertEqual('single', t._range_hint)
 
984
 
 
985
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
986
    """Errors out when range specifiers exceed the limit"""
 
987
 
 
988
    def get_multiple_ranges(self, file, file_size, ranges):
 
989
        """Refuses the multiple ranges request"""
 
990
        tcs = self.server.test_case_server
 
991
        if tcs.range_limit is not None and len(ranges) > tcs.range_limit:
 
992
            file.close()
 
993
            # Emulate apache behavior
 
994
            self.send_error(400, "Bad Request")
 
995
            return
 
996
        return http_server.TestingHTTPRequestHandler.get_multiple_ranges(
 
997
            self, file, file_size, ranges)
 
998
 
 
999
 
 
1000
class LimitedRangeHTTPServer(http_server.HttpServer):
 
1001
    """An HttpServer erroring out on requests with too much range specifiers"""
 
1002
 
 
1003
    def __init__(self, request_handler=LimitedRangeRequestHandler,
 
1004
                 protocol_version=None,
 
1005
                 range_limit=None):
 
1006
        http_server.HttpServer.__init__(self, request_handler,
 
1007
                                        protocol_version=protocol_version)
 
1008
        self.range_limit = range_limit
 
1009
 
 
1010
 
 
1011
class TestLimitedRangeRequestServer(http_utils.TestCaseWithWebserver):
 
1012
    """Tests readv requests against a server erroring out on too much ranges."""
 
1013
 
 
1014
    # Requests with more range specifiers will error out
 
1015
    range_limit = 3
 
1016
 
 
1017
    def create_transport_readonly_server(self):
 
1018
        return LimitedRangeHTTPServer(range_limit=self.range_limit,
 
1019
                                      protocol_version=self._protocol_version)
 
1020
 
 
1021
    def get_transport(self):
 
1022
        return self._transport(self.get_readonly_server().get_url())
 
1023
 
 
1024
    def setUp(self):
 
1025
        http_utils.TestCaseWithWebserver.setUp(self)
 
1026
        # We need to manipulate ranges that correspond to real chunks in the
 
1027
        # response, so we build a content appropriately.
 
1028
        filler = ''.join(['abcdefghij' for x in range(102)])
 
1029
        content = ''.join(['%04d' % v + filler for v in range(16)])
 
1030
        self.build_tree_contents([('a', content)],)
 
1031
 
 
1032
    def test_few_ranges(self):
 
1033
        t = self.get_transport()
 
1034
        l = list(t.readv('a', ((0, 4), (1024, 4), )))
 
1035
        self.assertEqual(l[0], (0, '0000'))
 
1036
        self.assertEqual(l[1], (1024, '0001'))
 
1037
        self.assertEqual(1, self.get_readonly_server().GET_request_nb)
 
1038
 
 
1039
    def test_more_ranges(self):
 
1040
        t = self.get_transport()
 
1041
        l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
 
1042
        self.assertEqual(l[0], (0, '0000'))
 
1043
        self.assertEqual(l[1], (1024, '0001'))
 
1044
        self.assertEqual(l[2], (4096, '0004'))
 
1045
        self.assertEqual(l[3], (8192, '0008'))
 
1046
        # The server will refuse to serve the first request (too much ranges),
 
1047
        # a second request will succeed.
 
1048
        self.assertEqual(2, self.get_readonly_server().GET_request_nb)
 
1049
 
 
1050
 
 
1051
class TestHttpProxyWhiteBox(tests.TestCase):
 
1052
    """Whitebox test proxy http authorization.
 
1053
 
 
1054
    Only the urllib implementation is tested here.
 
1055
    """
 
1056
 
 
1057
    def setUp(self):
 
1058
        tests.TestCase.setUp(self)
 
1059
        self._old_env = {}
 
1060
 
 
1061
    def tearDown(self):
 
1062
        self._restore_env()
 
1063
        tests.TestCase.tearDown(self)
 
1064
 
 
1065
    def _install_env(self, env):
 
1066
        for name, value in env.iteritems():
 
1067
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1068
 
 
1069
    def _restore_env(self):
 
1070
        for name, value in self._old_env.iteritems():
 
1071
            osutils.set_or_unset_env(name, value)
 
1072
 
 
1073
    def _proxied_request(self):
 
1074
        handler = _urllib2_wrappers.ProxyHandler()
 
1075
        request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
 
1076
        handler.set_proxy(request, 'http')
 
1077
        return request
 
1078
 
 
1079
    def test_empty_user(self):
 
1080
        self._install_env({'http_proxy': 'http://bar.com'})
 
1081
        request = self._proxied_request()
 
1082
        self.assertFalse(request.headers.has_key('Proxy-authorization'))
 
1083
 
 
1084
    def test_invalid_proxy(self):
 
1085
        """A proxy env variable without scheme"""
 
1086
        self._install_env({'http_proxy': 'host:1234'})
 
1087
        self.assertRaises(errors.InvalidURL, self._proxied_request)
 
1088
 
 
1089
 
 
1090
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
 
1091
    """Tests proxy server.
 
1092
 
 
1093
    Be aware that we do not setup a real proxy here. Instead, we
 
1094
    check that the *connection* goes through the proxy by serving
 
1095
    different content (the faked proxy server append '-proxied'
 
1096
    to the file names).
 
1097
    """
 
1098
 
 
1099
    # FIXME: We don't have an https server available, so we don't
 
1100
    # test https connections.
 
1101
 
 
1102
    def setUp(self):
 
1103
        super(TestProxyHttpServer, self).setUp()
 
1104
        self.build_tree_contents([('foo', 'contents of foo\n'),
 
1105
                                  ('foo-proxied', 'proxied contents of foo\n')])
 
1106
        # Let's setup some attributes for tests
 
1107
        self.server = self.get_readonly_server()
 
1108
        self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
 
1109
        if self._testing_pycurl():
 
1110
            # Oh my ! pycurl does not check for the port as part of
 
1111
            # no_proxy :-( So we just test the host part
 
1112
            self.no_proxy_host = 'localhost'
 
1113
        else:
 
1114
            self.no_proxy_host = self.proxy_address
 
1115
        # The secondary server is the proxy
 
1116
        self.proxy = self.get_secondary_server()
 
1117
        self.proxy_url = self.proxy.get_url()
 
1118
        self._old_env = {}
 
1119
 
 
1120
    def _testing_pycurl(self):
 
1121
        return pycurl_present and self._transport == PyCurlTransport
 
1122
 
 
1123
    def create_transport_secondary_server(self):
 
1124
        """Creates an http server that will serve files with
 
1125
        '-proxied' appended to their names.
 
1126
        """
 
1127
        return http_utils.ProxyServer(protocol_version=self._protocol_version)
 
1128
 
 
1129
    def _install_env(self, env):
 
1130
        for name, value in env.iteritems():
 
1131
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1132
 
 
1133
    def _restore_env(self):
 
1134
        for name, value in self._old_env.iteritems():
 
1135
            osutils.set_or_unset_env(name, value)
 
1136
 
 
1137
    def proxied_in_env(self, env):
 
1138
        self._install_env(env)
 
1139
        url = self.server.get_url()
 
1140
        t = self._transport(url)
 
1141
        try:
 
1142
            self.assertEqual('proxied contents of foo\n', t.get('foo').read())
 
1143
        finally:
 
1144
            self._restore_env()
 
1145
 
 
1146
    def not_proxied_in_env(self, env):
 
1147
        self._install_env(env)
 
1148
        url = self.server.get_url()
 
1149
        t = self._transport(url)
 
1150
        try:
 
1151
            self.assertEqual('contents of foo\n', t.get('foo').read())
 
1152
        finally:
 
1153
            self._restore_env()
 
1154
 
 
1155
    def test_http_proxy(self):
 
1156
        self.proxied_in_env({'http_proxy': self.proxy_url})
 
1157
 
 
1158
    def test_HTTP_PROXY(self):
 
1159
        if self._testing_pycurl():
 
1160
            # pycurl does not check HTTP_PROXY for security reasons
 
1161
            # (for use in a CGI context that we do not care
 
1162
            # about. Should we ?)
 
1163
            raise tests.TestNotApplicable(
 
1164
                'pycurl does not check HTTP_PROXY for security reasons')
 
1165
        self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
 
1166
 
 
1167
    def test_all_proxy(self):
 
1168
        self.proxied_in_env({'all_proxy': self.proxy_url})
 
1169
 
 
1170
    def test_ALL_PROXY(self):
 
1171
        self.proxied_in_env({'ALL_PROXY': self.proxy_url})
 
1172
 
 
1173
    def test_http_proxy_with_no_proxy(self):
 
1174
        self.not_proxied_in_env({'http_proxy': self.proxy_url,
 
1175
                                 'no_proxy': self.no_proxy_host})
 
1176
 
 
1177
    def test_HTTP_PROXY_with_NO_PROXY(self):
 
1178
        if self._testing_pycurl():
 
1179
            raise tests.TestNotApplicable(
 
1180
                'pycurl does not check HTTP_PROXY for security reasons')
 
1181
        self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
 
1182
                                 'NO_PROXY': self.no_proxy_host})
 
1183
 
 
1184
    def test_all_proxy_with_no_proxy(self):
 
1185
        self.not_proxied_in_env({'all_proxy': self.proxy_url,
 
1186
                                 'no_proxy': self.no_proxy_host})
 
1187
 
 
1188
    def test_ALL_PROXY_with_NO_PROXY(self):
 
1189
        self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
 
1190
                                 'NO_PROXY': self.no_proxy_host})
 
1191
 
 
1192
    def test_http_proxy_without_scheme(self):
 
1193
        if self._testing_pycurl():
 
1194
            # pycurl *ignores* invalid proxy env variables. If that ever change
 
1195
            # in the future, this test will fail indicating that pycurl do not
 
1196
            # ignore anymore such variables.
 
1197
            self.not_proxied_in_env({'http_proxy': self.proxy_address})
 
1198
        else:
 
1199
            self.assertRaises(errors.InvalidURL,
 
1200
                              self.proxied_in_env,
 
1201
                              {'http_proxy': self.proxy_address})
 
1202
 
 
1203
 
 
1204
class TestRanges(http_utils.TestCaseWithWebserver):
 
1205
    """Test the Range header in GET methods."""
 
1206
 
 
1207
    def setUp(self):
 
1208
        http_utils.TestCaseWithWebserver.setUp(self)
 
1209
        self.build_tree_contents([('a', '0123456789')],)
 
1210
        server = self.get_readonly_server()
 
1211
        self.transport = self._transport(server.get_url())
 
1212
 
 
1213
    def create_transport_readonly_server(self):
 
1214
        return http_server.HttpServer(protocol_version=self._protocol_version)
 
1215
 
 
1216
    def _file_contents(self, relpath, ranges):
 
1217
        offsets = [ (start, end - start + 1) for start, end in ranges]
 
1218
        coalesce = self.transport._coalesce_offsets
 
1219
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
 
1220
        code, data = self.transport._get(relpath, coalesced)
 
1221
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
 
1222
        for start, end in ranges:
 
1223
            data.seek(start)
 
1224
            yield data.read(end - start + 1)
 
1225
 
 
1226
    def _file_tail(self, relpath, tail_amount):
 
1227
        code, data = self.transport._get(relpath, [], tail_amount)
 
1228
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
 
1229
        data.seek(-tail_amount, 2)
 
1230
        return data.read(tail_amount)
 
1231
 
 
1232
    def test_range_header(self):
 
1233
        # Valid ranges
 
1234
        map(self.assertEqual,['0', '234'],
 
1235
            list(self._file_contents('a', [(0,0), (2,4)])),)
 
1236
 
 
1237
    def test_range_header_tail(self):
 
1238
        self.assertEqual('789', self._file_tail('a', 3))
 
1239
 
 
1240
    def test_syntactically_invalid_range_header(self):
 
1241
        self.assertListRaises(errors.InvalidHttpRange,
 
1242
                          self._file_contents, 'a', [(4, 3)])
 
1243
 
 
1244
    def test_semantically_invalid_range_header(self):
 
1245
        self.assertListRaises(errors.InvalidHttpRange,
 
1246
                          self._file_contents, 'a', [(42, 128)])
 
1247
 
 
1248
 
 
1249
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1250
    """Test redirection between http servers."""
 
1251
 
 
1252
    def create_transport_secondary_server(self):
 
1253
        """Create the secondary server redirecting to the primary server"""
 
1254
        new = self.get_readonly_server()
 
1255
 
 
1256
        redirecting = http_utils.HTTPServerRedirecting(
 
1257
            protocol_version=self._protocol_version)
 
1258
        redirecting.redirect_to(new.host, new.port)
 
1259
        return redirecting
 
1260
 
 
1261
    def setUp(self):
 
1262
        super(TestHTTPRedirections, self).setUp()
 
1263
        self.build_tree_contents([('a', '0123456789'),
 
1264
                                  ('bundle',
 
1265
                                  '# Bazaar revision bundle v0.9\n#\n')
 
1266
                                  ],)
 
1267
        # The requests to the old server will be redirected to the new server
 
1268
        self.old_transport = self._transport(self.old_server.get_url())
 
1269
 
 
1270
    def test_redirected(self):
 
1271
        self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
 
1272
        t = self._transport(self.new_server.get_url())
 
1273
        self.assertEqual('0123456789', t.get('a').read())
 
1274
 
 
1275
    def test_read_redirected_bundle_from_url(self):
 
1276
        from bzrlib.bundle import read_bundle_from_url
 
1277
        url = self.old_transport.abspath('bundle')
 
1278
        bundle = read_bundle_from_url(url)
 
1279
        # If read_bundle_from_url was successful we get an empty bundle
 
1280
        self.assertEqual([], bundle.revisions)
 
1281
 
 
1282
 
 
1283
class RedirectedRequest(_urllib2_wrappers.Request):
 
1284
    """Request following redirections. """
 
1285
 
 
1286
    init_orig = _urllib2_wrappers.Request.__init__
 
1287
 
 
1288
    def __init__(self, method, url, *args, **kwargs):
 
1289
        """Constructor.
 
1290
 
 
1291
        """
 
1292
        # Since the tests using this class will replace
 
1293
        # _urllib2_wrappers.Request, we can't just call the base class __init__
 
1294
        # or we'll loop.
 
1295
        RedirectedRequest.init_orig(self, method, url, args, kwargs)
 
1296
        self.follow_redirections = True
 
1297
 
 
1298
 
 
1299
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1300
    """Test redirections.
 
1301
 
 
1302
    http implementations do not redirect silently anymore (they
 
1303
    do not redirect at all in fact). The mechanism is still in
 
1304
    place at the _urllib2_wrappers.Request level and these tests
 
1305
    exercise it.
 
1306
 
 
1307
    For the pycurl implementation
 
1308
    the redirection have been deleted as we may deprecate pycurl
 
1309
    and I have no place to keep a working implementation.
 
1310
    -- vila 20070212
 
1311
    """
 
1312
 
 
1313
    def setUp(self):
 
1314
        if pycurl_present and self._transport == PyCurlTransport:
 
1315
            raise tests.TestNotApplicable(
 
1316
                "pycurl doesn't redirect silently annymore")
 
1317
        super(TestHTTPSilentRedirections, self).setUp()
 
1318
        self.setup_redirected_request()
 
1319
        self.addCleanup(self.cleanup_redirected_request)
 
1320
        self.build_tree_contents([('a','a'),
 
1321
                                  ('1/',),
 
1322
                                  ('1/a', 'redirected once'),
 
1323
                                  ('2/',),
 
1324
                                  ('2/a', 'redirected twice'),
 
1325
                                  ('3/',),
 
1326
                                  ('3/a', 'redirected thrice'),
 
1327
                                  ('4/',),
 
1328
                                  ('4/a', 'redirected 4 times'),
 
1329
                                  ('5/',),
 
1330
                                  ('5/a', 'redirected 5 times'),
 
1331
                                  ],)
 
1332
 
 
1333
        self.old_transport = self._transport(self.old_server.get_url())
 
1334
 
 
1335
    def setup_redirected_request(self):
 
1336
        self.original_class = _urllib2_wrappers.Request
 
1337
        _urllib2_wrappers.Request = RedirectedRequest
 
1338
 
 
1339
    def cleanup_redirected_request(self):
 
1340
        _urllib2_wrappers.Request = self.original_class
 
1341
 
 
1342
    def create_transport_secondary_server(self):
 
1343
        """Create the secondary server, redirections are defined in the tests"""
 
1344
        return http_utils.HTTPServerRedirecting(
 
1345
            protocol_version=self._protocol_version)
 
1346
 
 
1347
    def test_one_redirection(self):
 
1348
        t = self.old_transport
 
1349
 
 
1350
        req = RedirectedRequest('GET', t.abspath('a'))
 
1351
        req.follow_redirections = True
 
1352
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
1353
                                       self.new_server.port)
 
1354
        self.old_server.redirections = \
 
1355
            [('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
1356
        self.assertEquals('redirected once',t._perform(req).read())
 
1357
 
 
1358
    def test_five_redirections(self):
 
1359
        t = self.old_transport
 
1360
 
 
1361
        req = RedirectedRequest('GET', t.abspath('a'))
 
1362
        req.follow_redirections = True
 
1363
        old_prefix = 'http://%s:%s' % (self.old_server.host,
 
1364
                                       self.old_server.port)
 
1365
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
1366
                                       self.new_server.port)
 
1367
        self.old_server.redirections = [
 
1368
            ('/1(.*)', r'%s/2\1' % (old_prefix), 302),
 
1369
            ('/2(.*)', r'%s/3\1' % (old_prefix), 303),
 
1370
            ('/3(.*)', r'%s/4\1' % (old_prefix), 307),
 
1371
            ('/4(.*)', r'%s/5\1' % (new_prefix), 301),
 
1372
            ('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
 
1373
            ]
 
1374
        self.assertEquals('redirected 5 times',t._perform(req).read())
 
1375
 
 
1376
 
 
1377
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1378
    """Test transport.do_catching_redirections."""
 
1379
 
 
1380
    def setUp(self):
 
1381
        super(TestDoCatchRedirections, self).setUp()
 
1382
        self.build_tree_contents([('a', '0123456789'),],)
 
1383
 
 
1384
        self.old_transport = self._transport(self.old_server.get_url())
 
1385
 
 
1386
    def get_a(self, transport):
 
1387
        return transport.get('a')
 
1388
 
 
1389
    def test_no_redirection(self):
 
1390
        t = self._transport(self.new_server.get_url())
 
1391
 
 
1392
        # We use None for redirected so that we fail if redirected
 
1393
        self.assertEquals('0123456789',
 
1394
                          transport.do_catching_redirections(
 
1395
                self.get_a, t, None).read())
 
1396
 
 
1397
    def test_one_redirection(self):
 
1398
        self.redirections = 0
 
1399
 
 
1400
        def redirected(transport, exception, redirection_notice):
 
1401
            self.redirections += 1
 
1402
            dir, file = urlutils.split(exception.target)
 
1403
            return self._transport(dir)
 
1404
 
 
1405
        self.assertEquals('0123456789',
 
1406
                          transport.do_catching_redirections(
 
1407
                self.get_a, self.old_transport, redirected).read())
 
1408
        self.assertEquals(1, self.redirections)
 
1409
 
 
1410
    def test_redirection_loop(self):
 
1411
 
 
1412
        def redirected(transport, exception, redirection_notice):
 
1413
            # By using the redirected url as a base dir for the
 
1414
            # *old* transport, we create a loop: a => a/a =>
 
1415
            # a/a/a
 
1416
            return self.old_transport.clone(exception.target)
 
1417
 
 
1418
        self.assertRaises(errors.TooManyRedirections,
 
1419
                          transport.do_catching_redirections,
 
1420
                          self.get_a, self.old_transport, redirected)
 
1421
 
 
1422
 
 
1423
class TestAuth(http_utils.TestCaseWithWebserver):
 
1424
    """Test authentication scheme"""
 
1425
 
 
1426
    _auth_header = 'Authorization'
 
1427
    _password_prompt_prefix = ''
 
1428
 
 
1429
    def setUp(self):
 
1430
        super(TestAuth, self).setUp()
 
1431
        self.server = self.get_readonly_server()
 
1432
        self.build_tree_contents([('a', 'contents of a\n'),
 
1433
                                  ('b', 'contents of b\n'),])
 
1434
 
 
1435
    def create_transport_readonly_server(self):
 
1436
        if self._auth_scheme == 'basic':
 
1437
            server = http_utils.HTTPBasicAuthServer(
 
1438
                protocol_version=self._protocol_version)
 
1439
        else:
 
1440
            if self._auth_scheme != 'digest':
 
1441
                raise AssertionError('Unknown auth scheme: %r'
 
1442
                                     % self._auth_scheme)
 
1443
            server = http_utils.HTTPDigestAuthServer(
 
1444
                protocol_version=self._protocol_version)
 
1445
        return server
 
1446
 
 
1447
    def _testing_pycurl(self):
 
1448
        return pycurl_present and self._transport == PyCurlTransport
 
1449
 
 
1450
    def get_user_url(self, user, password):
 
1451
        """Build an url embedding user and password"""
 
1452
        url = '%s://' % self.server._url_protocol
 
1453
        if user is not None:
 
1454
            url += user
 
1455
            if password is not None:
 
1456
                url += ':' + password
 
1457
            url += '@'
 
1458
        url += '%s:%s/' % (self.server.host, self.server.port)
 
1459
        return url
 
1460
 
 
1461
    def get_user_transport(self, user, password):
 
1462
        return self._transport(self.get_user_url(user, password))
 
1463
 
 
1464
    def test_no_user(self):
 
1465
        self.server.add_user('joe', 'foo')
 
1466
        t = self.get_user_transport(None, None)
 
1467
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1468
        # Only one 'Authentication Required' error should occur
 
1469
        self.assertEqual(1, self.server.auth_required_errors)
 
1470
 
 
1471
    def test_empty_pass(self):
 
1472
        self.server.add_user('joe', '')
 
1473
        t = self.get_user_transport('joe', '')
 
1474
        self.assertEqual('contents of a\n', t.get('a').read())
 
1475
        # Only one 'Authentication Required' error should occur
 
1476
        self.assertEqual(1, self.server.auth_required_errors)
 
1477
 
 
1478
    def test_user_pass(self):
 
1479
        self.server.add_user('joe', 'foo')
 
1480
        t = self.get_user_transport('joe', 'foo')
 
1481
        self.assertEqual('contents of a\n', t.get('a').read())
 
1482
        # Only one 'Authentication Required' error should occur
 
1483
        self.assertEqual(1, self.server.auth_required_errors)
 
1484
 
 
1485
    def test_unknown_user(self):
 
1486
        self.server.add_user('joe', 'foo')
 
1487
        t = self.get_user_transport('bill', 'foo')
 
1488
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1489
        # Two 'Authentication Required' errors should occur (the
 
1490
        # initial 'who are you' and 'I don't know you, who are
 
1491
        # you').
 
1492
        self.assertEqual(2, self.server.auth_required_errors)
 
1493
 
 
1494
    def test_wrong_pass(self):
 
1495
        self.server.add_user('joe', 'foo')
 
1496
        t = self.get_user_transport('joe', 'bar')
 
1497
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1498
        # Two 'Authentication Required' errors should occur (the
 
1499
        # initial 'who are you' and 'this is not you, who are you')
 
1500
        self.assertEqual(2, self.server.auth_required_errors)
 
1501
 
 
1502
    def test_prompt_for_password(self):
 
1503
        if self._testing_pycurl():
 
1504
            raise tests.TestNotApplicable(
 
1505
                'pycurl cannot prompt, it handles auth by embedding'
 
1506
                ' user:pass in urls only')
 
1507
 
 
1508
        self.server.add_user('joe', 'foo')
 
1509
        t = self.get_user_transport('joe', None)
 
1510
        stdout = tests.StringIOWrapper()
 
1511
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
 
1512
        self.assertEqual('contents of a\n',t.get('a').read())
 
1513
        # stdin should be empty
 
1514
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
1515
        self._check_password_prompt(t._unqualified_scheme, 'joe',
 
1516
                                    stdout.getvalue())
 
1517
        # And we shouldn't prompt again for a different request
 
1518
        # against the same transport.
 
1519
        self.assertEqual('contents of b\n',t.get('b').read())
 
1520
        t2 = t.clone()
 
1521
        # And neither against a clone
 
1522
        self.assertEqual('contents of b\n',t2.get('b').read())
 
1523
        # Only one 'Authentication Required' error should occur
 
1524
        self.assertEqual(1, self.server.auth_required_errors)
 
1525
 
 
1526
    def _check_password_prompt(self, scheme, user, actual_prompt):
 
1527
        expected_prompt = (self._password_prompt_prefix
 
1528
                           + ("%s %s@%s:%d, Realm: '%s' password: "
 
1529
                              % (scheme.upper(),
 
1530
                                 user, self.server.host, self.server.port,
 
1531
                                 self.server.auth_realm)))
 
1532
        self.assertEquals(expected_prompt, actual_prompt)
 
1533
 
 
1534
    def test_no_prompt_for_password_when_using_auth_config(self):
 
1535
        if self._testing_pycurl():
 
1536
            raise tests.TestNotApplicable(
 
1537
                'pycurl does not support authentication.conf'
 
1538
                ' since it cannot prompt')
 
1539
 
 
1540
        user =' joe'
 
1541
        password = 'foo'
 
1542
        stdin_content = 'bar\n'  # Not the right password
 
1543
        self.server.add_user(user, password)
 
1544
        t = self.get_user_transport(user, None)
 
1545
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
 
1546
                                            stdout=tests.StringIOWrapper())
 
1547
        # Create a minimal config file with the right password
 
1548
        conf = config.AuthenticationConfig()
 
1549
        conf._get_config().update(
 
1550
            {'httptest': {'scheme': 'http', 'port': self.server.port,
 
1551
                          'user': user, 'password': password}})
 
1552
        conf._save()
 
1553
        # Issue a request to the server to connect
 
1554
        self.assertEqual('contents of a\n',t.get('a').read())
 
1555
        # stdin should have  been left untouched
 
1556
        self.assertEqual(stdin_content, ui.ui_factory.stdin.readline())
 
1557
        # Only one 'Authentication Required' error should occur
 
1558
        self.assertEqual(1, self.server.auth_required_errors)
 
1559
 
 
1560
    def test_user_from_auth_conf(self):
 
1561
        if self._testing_pycurl():
 
1562
            raise tests.TestNotApplicable(
 
1563
                'pycurl does not support authentication.conf')
 
1564
        user = 'joe'
 
1565
        password = 'foo'
 
1566
        self.server.add_user(user, password)
 
1567
        # Create a minimal config file with the right password
 
1568
        conf = config.AuthenticationConfig()
 
1569
        conf._get_config().update(
 
1570
            {'httptest': {'scheme': 'http', 'port': self.server.port,
 
1571
                          'user': user, 'password': password}})
 
1572
        conf._save()
 
1573
        t = self.get_user_transport(None, None)
 
1574
        # Issue a request to the server to connect
 
1575
        self.assertEqual('contents of a\n', t.get('a').read())
 
1576
        # Only one 'Authentication Required' error should occur
 
1577
        self.assertEqual(1, self.server.auth_required_errors)
 
1578
 
 
1579
    def test_changing_nonce(self):
 
1580
        if self._auth_scheme != 'digest':
 
1581
            raise tests.TestNotApplicable('HTTP auth digest only test')
 
1582
        if self._testing_pycurl():
 
1583
            raise tests.KnownFailure(
 
1584
                'pycurl does not handle a nonce change')
 
1585
        self.server.add_user('joe', 'foo')
 
1586
        t = self.get_user_transport('joe', 'foo')
 
1587
        self.assertEqual('contents of a\n', t.get('a').read())
 
1588
        self.assertEqual('contents of b\n', t.get('b').read())
 
1589
        # Only one 'Authentication Required' error should have
 
1590
        # occured so far
 
1591
        self.assertEqual(1, self.server.auth_required_errors)
 
1592
        # The server invalidates the current nonce
 
1593
        self.server.auth_nonce = self.server.auth_nonce + '. No, now!'
 
1594
        self.assertEqual('contents of a\n', t.get('a').read())
 
1595
        # Two 'Authentication Required' errors should occur (the
 
1596
        # initial 'who are you' and a second 'who are you' with the new nonce)
 
1597
        self.assertEqual(2, self.server.auth_required_errors)
 
1598
 
 
1599
 
 
1600
 
 
1601
class TestProxyAuth(TestAuth):
 
1602
    """Test proxy authentication schemes."""
 
1603
 
 
1604
    _auth_header = 'Proxy-authorization'
 
1605
    _password_prompt_prefix='Proxy '
 
1606
 
 
1607
    def setUp(self):
 
1608
        super(TestProxyAuth, self).setUp()
 
1609
        self._old_env = {}
 
1610
        self.addCleanup(self._restore_env)
 
1611
        # Override the contents to avoid false positives
 
1612
        self.build_tree_contents([('a', 'not proxied contents of a\n'),
 
1613
                                  ('b', 'not proxied contents of b\n'),
 
1614
                                  ('a-proxied', 'contents of a\n'),
 
1615
                                  ('b-proxied', 'contents of b\n'),
 
1616
                                  ])
 
1617
 
 
1618
    def create_transport_readonly_server(self):
 
1619
        if self._auth_scheme == 'basic':
 
1620
            server = http_utils.ProxyBasicAuthServer(
 
1621
                protocol_version=self._protocol_version)
 
1622
        else:
 
1623
            if self._auth_scheme != 'digest':
 
1624
                raise AssertionError('Unknown auth scheme: %r'
 
1625
                                     % self._auth_scheme)
 
1626
            server = http_utils.ProxyDigestAuthServer(
 
1627
                protocol_version=self._protocol_version)
 
1628
        return server
 
1629
 
 
1630
    def get_user_transport(self, user, password):
 
1631
        self._install_env({'all_proxy': self.get_user_url(user, password)})
 
1632
        return self._transport(self.server.get_url())
 
1633
 
 
1634
    def _install_env(self, env):
 
1635
        for name, value in env.iteritems():
 
1636
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1637
 
 
1638
    def _restore_env(self):
 
1639
        for name, value in self._old_env.iteritems():
 
1640
            osutils.set_or_unset_env(name, value)
 
1641
 
 
1642
    def test_empty_pass(self):
 
1643
        if self._testing_pycurl():
 
1644
            import pycurl
 
1645
            if pycurl.version_info()[1] < '7.16.0':
 
1646
                raise tests.KnownFailure(
 
1647
                    'pycurl < 7.16.0 does not handle empty proxy passwords')
 
1648
        super(TestProxyAuth, self).test_empty_pass()
 
1649
 
 
1650
 
 
1651
class SampleSocket(object):
 
1652
    """A socket-like object for use in testing the HTTP request handler."""
 
1653
 
 
1654
    def __init__(self, socket_read_content):
 
1655
        """Constructs a sample socket.
 
1656
 
 
1657
        :param socket_read_content: a byte sequence
 
1658
        """
 
1659
        # Use plain python StringIO so we can monkey-patch the close method to
 
1660
        # not discard the contents.
 
1661
        from StringIO import StringIO
 
1662
        self.readfile = StringIO(socket_read_content)
 
1663
        self.writefile = StringIO()
 
1664
        self.writefile.close = lambda: None
 
1665
 
 
1666
    def makefile(self, mode='r', bufsize=None):
 
1667
        if 'r' in mode:
 
1668
            return self.readfile
 
1669
        else:
 
1670
            return self.writefile
 
1671
 
 
1672
 
 
1673
class SmartHTTPTunnellingTest(tests.TestCaseWithTransport):
 
1674
 
 
1675
    def setUp(self):
 
1676
        super(SmartHTTPTunnellingTest, self).setUp()
 
1677
        # We use the VFS layer as part of HTTP tunnelling tests.
 
1678
        self._captureVar('BZR_NO_SMART_VFS', None)
 
1679
        self.transport_readonly_server = http_utils.HTTPServerWithSmarts
 
1680
 
 
1681
    def create_transport_readonly_server(self):
 
1682
        return http_utils.HTTPServerWithSmarts(
 
1683
            protocol_version=self._protocol_version)
 
1684
 
 
1685
    def test_open_bzrdir(self):
 
1686
        branch = self.make_branch('relpath')
 
1687
        http_server = self.get_readonly_server()
 
1688
        url = http_server.get_url() + 'relpath'
 
1689
        bd = bzrdir.BzrDir.open(url)
 
1690
        self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
 
1691
 
 
1692
    def test_bulk_data(self):
 
1693
        # We should be able to send and receive bulk data in a single message.
 
1694
        # The 'readv' command in the smart protocol both sends and receives
 
1695
        # bulk data, so we use that.
 
1696
        self.build_tree(['data-file'])
 
1697
        http_server = self.get_readonly_server()
 
1698
        http_transport = self._transport(http_server.get_url())
 
1699
        medium = http_transport.get_smart_medium()
 
1700
        # Since we provide the medium, the url below will be mostly ignored
 
1701
        # during the test, as long as the path is '/'.
 
1702
        remote_transport = remote.RemoteTransport('bzr://fake_host/',
 
1703
                                                  medium=medium)
 
1704
        self.assertEqual(
 
1705
            [(0, "c")], list(remote_transport.readv("data-file", [(0,1)])))
 
1706
 
 
1707
    def test_http_send_smart_request(self):
 
1708
 
 
1709
        post_body = 'hello\n'
 
1710
        expected_reply_body = 'ok\x012\n'
 
1711
 
 
1712
        http_server = self.get_readonly_server()
 
1713
        http_transport = self._transport(http_server.get_url())
 
1714
        medium = http_transport.get_smart_medium()
 
1715
        response = medium.send_http_smart_request(post_body)
 
1716
        reply_body = response.read()
 
1717
        self.assertEqual(expected_reply_body, reply_body)
 
1718
 
 
1719
    def test_smart_http_server_post_request_handler(self):
 
1720
        httpd = self.get_readonly_server()._get_httpd()
 
1721
 
 
1722
        socket = SampleSocket(
 
1723
            'POST /.bzr/smart %s \r\n' % self._protocol_version
 
1724
            # HTTP/1.1 posts must have a Content-Length (but it doesn't hurt
 
1725
            # for 1.0)
 
1726
            + 'Content-Length: 6\r\n'
 
1727
            '\r\n'
 
1728
            'hello\n')
 
1729
        # Beware: the ('localhost', 80) below is the
 
1730
        # client_address parameter, but we don't have one because
 
1731
        # we have defined a socket which is not bound to an
 
1732
        # address. The test framework never uses this client
 
1733
        # address, so far...
 
1734
        request_handler = http_utils.SmartRequestHandler(socket,
 
1735
                                                         ('localhost', 80),
 
1736
                                                         httpd)
 
1737
        response = socket.writefile.getvalue()
 
1738
        self.assertStartsWith(response, '%s 200 ' % self._protocol_version)
 
1739
        # This includes the end of the HTTP headers, and all the body.
 
1740
        expected_end_of_response = '\r\n\r\nok\x012\n'
 
1741
        self.assertEndsWith(response, expected_end_of_response)
 
1742
 
 
1743
 
 
1744
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
 
1745
    """No smart server here request handler."""
 
1746
 
 
1747
    def do_POST(self):
 
1748
        self.send_error(403, "Forbidden")
 
1749
 
 
1750
 
 
1751
class SmartClientAgainstNotSmartServer(TestSpecificRequestHandler):
 
1752
    """Test smart client behaviour against an http server without smarts."""
 
1753
 
 
1754
    _req_handler_class = ForbiddenRequestHandler
 
1755
 
 
1756
    def test_probe_smart_server(self):
 
1757
        """Test error handling against server refusing smart requests."""
 
1758
        server = self.get_readonly_server()
 
1759
        t = self._transport(server.get_url())
 
1760
        # No need to build a valid smart request here, the server will not even
 
1761
        # try to interpret it.
 
1762
        self.assertRaises(errors.SmartProtocolError,
 
1763
                          t.get_smart_medium().send_http_smart_request,
 
1764
                          'whatever')
 
1765
 
 
1766
class Test_redirected_to(tests.TestCase):
 
1767
 
 
1768
    def test_redirected_to_subdir(self):
 
1769
        t = self._transport('http://www.example.com/foo')
 
1770
        r = t._redirected_to('http://www.example.com/foo',
 
1771
                             'http://www.example.com/foo/subdir')
 
1772
        self.assertIsInstance(r, type(t))
 
1773
        # Both transports share the some connection
 
1774
        self.assertEquals(t._get_connection(), r._get_connection())
 
1775
 
 
1776
    def test_redirected_to_self_with_slash(self):
 
1777
        t = self._transport('http://www.example.com/foo')
 
1778
        r = t._redirected_to('http://www.example.com/foo',
 
1779
                             'http://www.example.com/foo/')
 
1780
        self.assertIsInstance(r, type(t))
 
1781
        # Both transports share the some connection (one can argue that we
 
1782
        # should return the exact same transport here, but that seems
 
1783
        # overkill).
 
1784
        self.assertEquals(t._get_connection(), r._get_connection())
 
1785
 
 
1786
    def test_redirected_to_host(self):
 
1787
        t = self._transport('http://www.example.com/foo')
 
1788
        r = t._redirected_to('http://www.example.com/foo',
 
1789
                             'http://foo.example.com/foo/subdir')
 
1790
        self.assertIsInstance(r, type(t))
 
1791
 
 
1792
    def test_redirected_to_same_host_sibling_protocol(self):
 
1793
        t = self._transport('http://www.example.com/foo')
 
1794
        r = t._redirected_to('http://www.example.com/foo',
 
1795
                             'https://www.example.com/foo')
 
1796
        self.assertIsInstance(r, type(t))
 
1797
 
 
1798
    def test_redirected_to_same_host_different_protocol(self):
 
1799
        t = self._transport('http://www.example.com/foo')
 
1800
        r = t._redirected_to('http://www.example.com/foo',
 
1801
                             'ftp://www.example.com/foo')
 
1802
        self.assertNotEquals(type(r), type(t))
 
1803
 
 
1804
    def test_redirected_to_different_host_same_user(self):
 
1805
        t = self._transport('http://joe@www.example.com/foo')
 
1806
        r = t._redirected_to('http://www.example.com/foo',
 
1807
                             'https://foo.example.com/foo')
 
1808
        self.assertIsInstance(r, type(t))
 
1809
        self.assertEquals(t._user, r._user)