~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testhttp.py

Added support for branch nicks

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for HTTP implementations.
18
 
 
19
 
This module defines a load_tests() method that parametrize tests classes for
20
 
transport implementation, http protocol versions and authentication schemes.
21
 
"""
22
 
 
23
 
# TODO: Should be renamed to bzrlib.transport.http.tests?
24
 
# TODO: What about renaming to bzrlib.tests.transport.http ?
25
 
 
26
 
from cStringIO import StringIO
27
 
import httplib
28
 
import os
29
 
import select
30
 
import SimpleHTTPServer
31
 
import socket
32
 
import sys
33
 
import threading
34
 
 
35
 
import bzrlib
36
 
from bzrlib import (
37
 
    bzrdir,
38
 
    config,
39
 
    errors,
40
 
    osutils,
41
 
    remote as _mod_remote,
42
 
    tests,
43
 
    transport,
44
 
    ui,
45
 
    urlutils,
46
 
    )
47
 
from bzrlib.symbol_versioning import (
48
 
    deprecated_in,
49
 
    )
50
 
from bzrlib.tests import (
51
 
    http_server,
52
 
    http_utils,
53
 
    )
54
 
from bzrlib.transport import (
55
 
    http,
56
 
    remote,
57
 
    )
58
 
from bzrlib.transport.http import (
59
 
    _urllib,
60
 
    _urllib2_wrappers,
61
 
    )
62
 
 
63
 
 
64
 
try:
65
 
    from bzrlib.transport.http._pycurl import PyCurlTransport
66
 
    pycurl_present = True
67
 
except errors.DependencyNotPresent:
68
 
    pycurl_present = False
69
 
 
70
 
 
71
 
def load_tests(standard_tests, module, loader):
72
 
    """Multiply tests for http clients and protocol versions."""
73
 
    result = loader.suiteClass()
74
 
 
75
 
    # one for each transport implementation
76
 
    t_tests, remaining_tests = tests.split_suite_by_condition(
77
 
        standard_tests, tests.condition_isinstance((
78
 
                TestHttpTransportRegistration,
79
 
                TestHttpTransportUrls,
80
 
                Test_redirected_to,
81
 
                )))
82
 
    transport_scenarios = [
83
 
        ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
84
 
                        _server=http_server.HttpServer_urllib,
85
 
                        _qualified_prefix='http+urllib',)),
86
 
        ]
87
 
    if pycurl_present:
88
 
        transport_scenarios.append(
89
 
            ('pycurl', dict(_transport=PyCurlTransport,
90
 
                            _server=http_server.HttpServer_PyCurl,
91
 
                            _qualified_prefix='http+pycurl',)))
92
 
    tests.multiply_tests(t_tests, transport_scenarios, result)
93
 
 
94
 
    # each implementation tested with each HTTP version
95
 
    tp_tests, remaining_tests = tests.split_suite_by_condition(
96
 
        remaining_tests, tests.condition_isinstance((
97
 
                SmartHTTPTunnellingTest,
98
 
                TestDoCatchRedirections,
99
 
                TestHTTPConnections,
100
 
                TestHTTPRedirections,
101
 
                TestHTTPSilentRedirections,
102
 
                TestLimitedRangeRequestServer,
103
 
                TestPost,
104
 
                TestProxyHttpServer,
105
 
                TestRanges,
106
 
                TestSpecificRequestHandler,
107
 
                )))
108
 
    protocol_scenarios = [
109
 
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
110
 
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
111
 
            ]
112
 
    tp_scenarios = tests.multiply_scenarios(transport_scenarios,
113
 
                                            protocol_scenarios)
114
 
    tests.multiply_tests(tp_tests, tp_scenarios, result)
115
 
 
116
 
    # proxy auth: each auth scheme on all http versions on all implementations.
117
 
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
118
 
        remaining_tests, tests.condition_isinstance((
119
 
                TestProxyAuth,
120
 
                )))
121
 
    proxy_auth_scheme_scenarios = [
122
 
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
123
 
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
124
 
        ('basicdigest',
125
 
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
126
 
        ]
127
 
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
128
 
                                              proxy_auth_scheme_scenarios)
129
 
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
130
 
 
131
 
    # auth: each auth scheme on all http versions on all implementations.
132
 
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
133
 
        remaining_tests, tests.condition_isinstance((
134
 
                TestAuth,
135
 
                )))
136
 
    auth_scheme_scenarios = [
137
 
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
138
 
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
139
 
        ('basicdigest',
140
 
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
141
 
        ]
142
 
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
143
 
                                             auth_scheme_scenarios)
144
 
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
145
 
 
146
 
    # activity: on all http[s] versions on all implementations
147
 
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
148
 
        remaining_tests, tests.condition_isinstance((
149
 
                TestActivity,
150
 
                )))
151
 
    activity_scenarios = [
152
 
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
153
 
                             _transport=_urllib.HttpTransport_urllib,)),
154
 
        ]
155
 
    if tests.HTTPSServerFeature.available():
156
 
        activity_scenarios.append(
157
 
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
158
 
                                  _transport=_urllib.HttpTransport_urllib,)),)
159
 
    if pycurl_present:
160
 
        activity_scenarios.append(
161
 
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
162
 
                                 _transport=PyCurlTransport,)),)
163
 
        if tests.HTTPSServerFeature.available():
164
 
            from bzrlib.tests import (
165
 
                ssl_certs,
166
 
                )
167
 
            # FIXME: Until we have a better way to handle self-signed
168
 
            # certificates (like allowing them in a test specific
169
 
            # authentication.conf for example), we need some specialized pycurl
170
 
            # transport for tests.
171
 
            class HTTPS_pycurl_transport(PyCurlTransport):
172
 
 
173
 
                def __init__(self, base, _from_transport=None):
174
 
                    super(HTTPS_pycurl_transport, self).__init__(
175
 
                        base, _from_transport)
176
 
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
177
 
 
178
 
            activity_scenarios.append(
179
 
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
180
 
                                      _transport=HTTPS_pycurl_transport,)),)
181
 
 
182
 
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
183
 
                                               protocol_scenarios)
184
 
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
185
 
 
186
 
    # No parametrization for the remaining tests
187
 
    result.addTests(remaining_tests)
188
 
 
189
 
    return result
190
 
 
191
 
 
192
 
class FakeManager(object):
193
 
 
194
 
    def __init__(self):
195
 
        self.credentials = []
196
 
 
197
 
    def add_password(self, realm, host, username, password):
198
 
        self.credentials.append([realm, host, username, password])
199
 
 
200
 
 
201
 
class RecordingServer(object):
202
 
    """A fake HTTP server.
203
 
 
204
 
    It records the bytes sent to it, and replies with a 200.
205
 
    """
206
 
 
207
 
    def __init__(self, expect_body_tail=None):
208
 
        """Constructor.
209
 
 
210
 
        :type expect_body_tail: str
211
 
        :param expect_body_tail: a reply won't be sent until this string is
212
 
            received.
213
 
        """
214
 
        self._expect_body_tail = expect_body_tail
215
 
        self.host = None
216
 
        self.port = None
217
 
        self.received_bytes = ''
218
 
 
219
 
    def setUp(self):
220
 
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
221
 
        self._sock.bind(('127.0.0.1', 0))
222
 
        self.host, self.port = self._sock.getsockname()
223
 
        self._ready = threading.Event()
224
 
        self._thread = threading.Thread(target=self._accept_read_and_reply)
225
 
        self._thread.setDaemon(True)
226
 
        self._thread.start()
227
 
        self._ready.wait(5)
228
 
 
229
 
    def _accept_read_and_reply(self):
230
 
        self._sock.listen(1)
231
 
        self._ready.set()
232
 
        self._sock.settimeout(5)
233
 
        try:
234
 
            conn, address = self._sock.accept()
235
 
            # On win32, the accepted connection will be non-blocking to start
236
 
            # with because we're using settimeout.
237
 
            conn.setblocking(True)
238
 
            while not self.received_bytes.endswith(self._expect_body_tail):
239
 
                self.received_bytes += conn.recv(4096)
240
 
            conn.sendall('HTTP/1.1 200 OK\r\n')
241
 
        except socket.timeout:
242
 
            # Make sure the client isn't stuck waiting for us to e.g. accept.
243
 
            self._sock.close()
244
 
        except socket.error:
245
 
            # The client may have already closed the socket.
246
 
            pass
247
 
 
248
 
    def tearDown(self):
249
 
        try:
250
 
            self._sock.close()
251
 
        except socket.error:
252
 
            # We might have already closed it.  We don't care.
253
 
            pass
254
 
        self.host = None
255
 
        self.port = None
256
 
 
257
 
 
258
 
class TestAuthHeader(tests.TestCase):
259
 
 
260
 
    def parse_header(self, header, auth_handler_class=None):
261
 
        if auth_handler_class is None:
262
 
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
263
 
        self.auth_handler =  auth_handler_class()
264
 
        return self.auth_handler._parse_auth_header(header)
265
 
 
266
 
    def test_empty_header(self):
267
 
        scheme, remainder = self.parse_header('')
268
 
        self.assertEquals('', scheme)
269
 
        self.assertIs(None, remainder)
270
 
 
271
 
    def test_negotiate_header(self):
272
 
        scheme, remainder = self.parse_header('Negotiate')
273
 
        self.assertEquals('negotiate', scheme)
274
 
        self.assertIs(None, remainder)
275
 
 
276
 
    def test_basic_header(self):
277
 
        scheme, remainder = self.parse_header(
278
 
            'Basic realm="Thou should not pass"')
279
 
        self.assertEquals('basic', scheme)
280
 
        self.assertEquals('realm="Thou should not pass"', remainder)
281
 
 
282
 
    def test_basic_extract_realm(self):
283
 
        scheme, remainder = self.parse_header(
284
 
            'Basic realm="Thou should not pass"',
285
 
            _urllib2_wrappers.BasicAuthHandler)
286
 
        match, realm = self.auth_handler.extract_realm(remainder)
287
 
        self.assertTrue(match is not None)
288
 
        self.assertEquals('Thou should not pass', realm)
289
 
 
290
 
    def test_digest_header(self):
291
 
        scheme, remainder = self.parse_header(
292
 
            'Digest realm="Thou should not pass"')
293
 
        self.assertEquals('digest', scheme)
294
 
        self.assertEquals('realm="Thou should not pass"', remainder)
295
 
 
296
 
 
297
 
class TestHTTPServer(tests.TestCase):
298
 
    """Test the HTTP servers implementations."""
299
 
 
300
 
    def test_invalid_protocol(self):
301
 
        class BogusRequestHandler(http_server.TestingHTTPRequestHandler):
302
 
 
303
 
            protocol_version = 'HTTP/0.1'
304
 
 
305
 
        server = http_server.HttpServer(BogusRequestHandler)
306
 
        try:
307
 
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
308
 
        except:
309
 
            server.tearDown()
310
 
            self.fail('HTTP Server creation did not raise UnknownProtocol')
311
 
 
312
 
    def test_force_invalid_protocol(self):
313
 
        server = http_server.HttpServer(protocol_version='HTTP/0.1')
314
 
        try:
315
 
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
316
 
        except:
317
 
            server.tearDown()
318
 
            self.fail('HTTP Server creation did not raise UnknownProtocol')
319
 
 
320
 
    def test_server_start_and_stop(self):
321
 
        server = http_server.HttpServer()
322
 
        server.setUp()
323
 
        self.assertTrue(server._http_running)
324
 
        server.tearDown()
325
 
        self.assertFalse(server._http_running)
326
 
 
327
 
    def test_create_http_server_one_zero(self):
328
 
        class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
329
 
 
330
 
            protocol_version = 'HTTP/1.0'
331
 
 
332
 
        server = http_server.HttpServer(RequestHandlerOneZero)
333
 
        server.setUp()
334
 
        self.addCleanup(server.tearDown)
335
 
        self.assertIsInstance(server._httpd, http_server.TestingHTTPServer)
336
 
 
337
 
    def test_create_http_server_one_one(self):
338
 
        class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
339
 
 
340
 
            protocol_version = 'HTTP/1.1'
341
 
 
342
 
        server = http_server.HttpServer(RequestHandlerOneOne)
343
 
        server.setUp()
344
 
        self.addCleanup(server.tearDown)
345
 
        self.assertIsInstance(server._httpd,
346
 
                              http_server.TestingThreadingHTTPServer)
347
 
 
348
 
    def test_create_http_server_force_one_one(self):
349
 
        class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
350
 
 
351
 
            protocol_version = 'HTTP/1.0'
352
 
 
353
 
        server = http_server.HttpServer(RequestHandlerOneZero,
354
 
                                        protocol_version='HTTP/1.1')
355
 
        server.setUp()
356
 
        self.addCleanup(server.tearDown)
357
 
        self.assertIsInstance(server._httpd,
358
 
                              http_server.TestingThreadingHTTPServer)
359
 
 
360
 
    def test_create_http_server_force_one_zero(self):
361
 
        class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
362
 
 
363
 
            protocol_version = 'HTTP/1.1'
364
 
 
365
 
        server = http_server.HttpServer(RequestHandlerOneOne,
366
 
                                        protocol_version='HTTP/1.0')
367
 
        server.setUp()
368
 
        self.addCleanup(server.tearDown)
369
 
        self.assertIsInstance(server._httpd,
370
 
                              http_server.TestingHTTPServer)
371
 
 
372
 
 
373
 
class TestWithTransport_pycurl(object):
374
 
    """Test case to inherit from if pycurl is present"""
375
 
 
376
 
    def _get_pycurl_maybe(self):
377
 
        try:
378
 
            from bzrlib.transport.http._pycurl import PyCurlTransport
379
 
            return PyCurlTransport
380
 
        except errors.DependencyNotPresent:
381
 
            raise tests.TestSkipped('pycurl not present')
382
 
 
383
 
    _transport = property(_get_pycurl_maybe)
384
 
 
385
 
 
386
 
class TestHttpUrls(tests.TestCase):
387
 
 
388
 
    # TODO: This should be moved to authorization tests once they
389
 
    # are written.
390
 
 
391
 
    def test_url_parsing(self):
392
 
        f = FakeManager()
393
 
        url = http.extract_auth('http://example.com', f)
394
 
        self.assertEquals('http://example.com', url)
395
 
        self.assertEquals(0, len(f.credentials))
396
 
        url = http.extract_auth(
397
 
            'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
398
 
        self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
399
 
        self.assertEquals(1, len(f.credentials))
400
 
        self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
401
 
                          f.credentials[0])
402
 
 
403
 
 
404
 
class TestHttpTransportUrls(tests.TestCase):
405
 
    """Test the http urls."""
406
 
 
 
1
# (C) 2005 Canonical
 
2
 
 
3
from bzrlib.selftest import TestCase
 
4
from bzrlib.transport.http import HttpTransport
 
5
 
 
6
class TestHttpUrls(TestCase):
407
7
    def test_abs_url(self):
408
8
        """Construction of absolute http URLs"""
409
 
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
 
9
        t = HttpTransport('http://bazaar-ng.org/bzr/bzr.dev/')
410
10
        eq = self.assertEqualDiff
411
 
        eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
412
 
        eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
413
 
        eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
 
11
        eq(t.abspath('.'),
 
12
           'http://bazaar-ng.org/bzr/bzr.dev')
 
13
        eq(t.abspath('foo/bar'), 
 
14
           'http://bazaar-ng.org/bzr/bzr.dev/foo/bar')
 
15
        eq(t.abspath('.bzr'),
 
16
           'http://bazaar-ng.org/bzr/bzr.dev/.bzr')
414
17
        eq(t.abspath('.bzr/1//2/./3'),
415
 
           'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
 
18
           'http://bazaar-ng.org/bzr/bzr.dev/.bzr/1/2/3')
416
19
 
417
20
    def test_invalid_http_urls(self):
418
21
        """Trap invalid construction of urls"""
419
 
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
420
 
        self.assertRaises(errors.InvalidURL,
421
 
                          self._transport,
422
 
                          'http://http://bazaar-vcs.org/bzr/bzr.dev/')
 
22
        t = HttpTransport('http://bazaar-ng.org/bzr/bzr.dev/')
 
23
        self.assertRaises(ValueError,
 
24
            t.abspath,
 
25
            '.bzr/')
 
26
        self.assertRaises(ValueError,
 
27
            t.abspath,
 
28
            '/.bzr')
423
29
 
424
30
    def test_http_root_urls(self):
425
31
        """Construction of URLs from server root"""
426
 
        t = self._transport('http://bzr.ozlabs.org/')
 
32
        t = HttpTransport('http://bzr.ozlabs.org/')
427
33
        eq = self.assertEqualDiff
428
34
        eq(t.abspath('.bzr/tree-version'),
429
35
           'http://bzr.ozlabs.org/.bzr/tree-version')
430
 
 
431
 
    def test_http_impl_urls(self):
432
 
        """There are servers which ask for particular clients to connect"""
433
 
        server = self._server()
434
 
        try:
435
 
            server.setUp()
436
 
            url = server.get_url()
437
 
            self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
438
 
        finally:
439
 
            server.tearDown()
440
 
 
441
 
 
442
 
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
443
 
 
444
 
    # TODO: This should really be moved into another pycurl
445
 
    # specific test. When https tests will be implemented, take
446
 
    # this one into account.
447
 
    def test_pycurl_without_https_support(self):
448
 
        """Test that pycurl without SSL do not fail with a traceback.
449
 
 
450
 
        For the purpose of the test, we force pycurl to ignore
451
 
        https by supplying a fake version_info that do not
452
 
        support it.
453
 
        """
454
 
        try:
455
 
            import pycurl
456
 
        except ImportError:
457
 
            raise tests.TestSkipped('pycurl not present')
458
 
 
459
 
        version_info_orig = pycurl.version_info
460
 
        try:
461
 
            # Now that we have pycurl imported, we can fake its version_info
462
 
            # This was taken from a windows pycurl without SSL
463
 
            # (thanks to bialix)
464
 
            pycurl.version_info = lambda : (2,
465
 
                                            '7.13.2',
466
 
                                            462082,
467
 
                                            'i386-pc-win32',
468
 
                                            2576,
469
 
                                            None,
470
 
                                            0,
471
 
                                            None,
472
 
                                            ('ftp', 'gopher', 'telnet',
473
 
                                             'dict', 'ldap', 'http', 'file'),
474
 
                                            None,
475
 
                                            0,
476
 
                                            None)
477
 
            self.assertRaises(errors.DependencyNotPresent, self._transport,
478
 
                              'https://launchpad.net')
479
 
        finally:
480
 
            # Restore the right function
481
 
            pycurl.version_info = version_info_orig
482
 
 
483
 
 
484
 
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
485
 
    """Test the http connections."""
486
 
 
487
 
    def setUp(self):
488
 
        http_utils.TestCaseWithWebserver.setUp(self)
489
 
        self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
490
 
                        transport=self.get_transport())
491
 
 
492
 
    def test_http_has(self):
493
 
        server = self.get_readonly_server()
494
 
        t = self._transport(server.get_url())
495
 
        self.assertEqual(t.has('foo/bar'), True)
496
 
        self.assertEqual(len(server.logs), 1)
497
 
        self.assertContainsRe(server.logs[0],
498
 
            r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "bzr/')
499
 
 
500
 
    def test_http_has_not_found(self):
501
 
        server = self.get_readonly_server()
502
 
        t = self._transport(server.get_url())
503
 
        self.assertEqual(t.has('not-found'), False)
504
 
        self.assertContainsRe(server.logs[1],
505
 
            r'"HEAD /not-found HTTP/1.." 404 - "-" "bzr/')
506
 
 
507
 
    def test_http_get(self):
508
 
        server = self.get_readonly_server()
509
 
        t = self._transport(server.get_url())
510
 
        fp = t.get('foo/bar')
511
 
        self.assertEqualDiff(
512
 
            fp.read(),
513
 
            'contents of foo/bar\n')
514
 
        self.assertEqual(len(server.logs), 1)
515
 
        self.assertTrue(server.logs[0].find(
516
 
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
517
 
            % bzrlib.__version__) > -1)
518
 
 
519
 
    def test_has_on_bogus_host(self):
520
 
        # Get a free address and don't 'accept' on it, so that we
521
 
        # can be sure there is no http handler there, but set a
522
 
        # reasonable timeout to not slow down tests too much.
523
 
        default_timeout = socket.getdefaulttimeout()
524
 
        try:
525
 
            socket.setdefaulttimeout(2)
526
 
            s = socket.socket()
527
 
            s.bind(('localhost', 0))
528
 
            t = self._transport('http://%s:%s/' % s.getsockname())
529
 
            self.assertRaises(errors.ConnectionError, t.has, 'foo/bar')
530
 
        finally:
531
 
            socket.setdefaulttimeout(default_timeout)
532
 
 
533
 
 
534
 
class TestHttpTransportRegistration(tests.TestCase):
535
 
    """Test registrations of various http implementations"""
536
 
 
537
 
    def test_http_registered(self):
538
 
        t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
539
 
        self.assertIsInstance(t, transport.Transport)
540
 
        self.assertIsInstance(t, self._transport)
541
 
 
542
 
 
543
 
class TestPost(tests.TestCase):
544
 
 
545
 
    def test_post_body_is_received(self):
546
 
        server = RecordingServer(expect_body_tail='end-of-body')
547
 
        server.setUp()
548
 
        self.addCleanup(server.tearDown)
549
 
        scheme = self._qualified_prefix
550
 
        url = '%s://%s:%s/' % (scheme, server.host, server.port)
551
 
        http_transport = self._transport(url)
552
 
        code, response = http_transport._post('abc def end-of-body')
553
 
        self.assertTrue(
554
 
            server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
555
 
        self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
556
 
        # The transport should not be assuming that the server can accept
557
 
        # chunked encoding the first time it connects, because HTTP/1.1, so we
558
 
        # check for the literal string.
559
 
        self.assertTrue(
560
 
            server.received_bytes.endswith('\r\n\r\nabc def end-of-body'))
561
 
 
562
 
 
563
 
class TestRangeHeader(tests.TestCase):
564
 
    """Test range_header method"""
565
 
 
566
 
    def check_header(self, value, ranges=[], tail=0):
567
 
        offsets = [ (start, end - start + 1) for start, end in ranges]
568
 
        coalesce = transport.Transport._coalesce_offsets
569
 
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
570
 
        range_header = http.HttpTransportBase._range_header
571
 
        self.assertEqual(value, range_header(coalesced, tail))
572
 
 
573
 
    def test_range_header_single(self):
574
 
        self.check_header('0-9', ranges=[(0,9)])
575
 
        self.check_header('100-109', ranges=[(100,109)])
576
 
 
577
 
    def test_range_header_tail(self):
578
 
        self.check_header('-10', tail=10)
579
 
        self.check_header('-50', tail=50)
580
 
 
581
 
    def test_range_header_multi(self):
582
 
        self.check_header('0-9,100-200,300-5000',
583
 
                          ranges=[(0,9), (100, 200), (300,5000)])
584
 
 
585
 
    def test_range_header_mixed(self):
586
 
        self.check_header('0-9,300-5000,-50',
587
 
                          ranges=[(0,9), (300,5000)],
588
 
                          tail=50)
589
 
 
590
 
 
591
 
class TestSpecificRequestHandler(http_utils.TestCaseWithWebserver):
592
 
    """Tests a specific request handler.
593
 
 
594
 
    Daughter classes are expected to override _req_handler_class
595
 
    """
596
 
 
597
 
    # Provide a useful default
598
 
    _req_handler_class = http_server.TestingHTTPRequestHandler
599
 
 
600
 
    def create_transport_readonly_server(self):
601
 
        return http_server.HttpServer(self._req_handler_class,
602
 
                                      protocol_version=self._protocol_version)
603
 
 
604
 
    def _testing_pycurl(self):
605
 
        return pycurl_present and self._transport == PyCurlTransport
606
 
 
607
 
 
608
 
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
609
 
    """Whatever request comes in, close the connection"""
610
 
 
611
 
    def handle_one_request(self):
612
 
        """Handle a single HTTP request, by abruptly closing the connection"""
613
 
        self.close_connection = 1
614
 
 
615
 
 
616
 
class TestWallServer(TestSpecificRequestHandler):
617
 
    """Tests exceptions during the connection phase"""
618
 
 
619
 
    _req_handler_class = WallRequestHandler
620
 
 
621
 
    def test_http_has(self):
622
 
        server = self.get_readonly_server()
623
 
        t = self._transport(server.get_url())
624
 
        # Unfortunately httplib (see HTTPResponse._read_status
625
 
        # for details) make no distinction between a closed
626
 
        # socket and badly formatted status line, so we can't
627
 
        # just test for ConnectionError, we have to test
628
 
        # InvalidHttpResponse too.
629
 
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
630
 
                          t.has, 'foo/bar')
631
 
 
632
 
    def test_http_get(self):
633
 
        server = self.get_readonly_server()
634
 
        t = self._transport(server.get_url())
635
 
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
636
 
                          t.get, 'foo/bar')
637
 
 
638
 
 
639
 
class BadStatusRequestHandler(http_server.TestingHTTPRequestHandler):
640
 
    """Whatever request comes in, returns a bad status"""
641
 
 
642
 
    def parse_request(self):
643
 
        """Fakes handling a single HTTP request, returns a bad status"""
644
 
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
645
 
        self.send_response(0, "Bad status")
646
 
        self.close_connection = 1
647
 
        return False
648
 
 
649
 
 
650
 
class TestBadStatusServer(TestSpecificRequestHandler):
651
 
    """Tests bad status from server."""
652
 
 
653
 
    _req_handler_class = BadStatusRequestHandler
654
 
 
655
 
    def test_http_has(self):
656
 
        server = self.get_readonly_server()
657
 
        t = self._transport(server.get_url())
658
 
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
659
 
 
660
 
    def test_http_get(self):
661
 
        server = self.get_readonly_server()
662
 
        t = self._transport(server.get_url())
663
 
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
664
 
 
665
 
 
666
 
class InvalidStatusRequestHandler(http_server.TestingHTTPRequestHandler):
667
 
    """Whatever request comes in, returns an invalid status"""
668
 
 
669
 
    def parse_request(self):
670
 
        """Fakes handling a single HTTP request, returns a bad status"""
671
 
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
672
 
        self.wfile.write("Invalid status line\r\n")
673
 
        return False
674
 
 
675
 
 
676
 
class TestInvalidStatusServer(TestBadStatusServer):
677
 
    """Tests invalid status from server.
678
 
 
679
 
    Both implementations raises the same error as for a bad status.
680
 
    """
681
 
 
682
 
    _req_handler_class = InvalidStatusRequestHandler
683
 
 
684
 
    def test_http_has(self):
685
 
        if self._testing_pycurl() and self._protocol_version == 'HTTP/1.1':
686
 
            raise tests.KnownFailure(
687
 
                'pycurl hangs if the server send back garbage')
688
 
        super(TestInvalidStatusServer, self).test_http_has()
689
 
 
690
 
    def test_http_get(self):
691
 
        if self._testing_pycurl() and self._protocol_version == 'HTTP/1.1':
692
 
            raise tests.KnownFailure(
693
 
                'pycurl hangs if the server send back garbage')
694
 
        super(TestInvalidStatusServer, self).test_http_get()
695
 
 
696
 
 
697
 
class BadProtocolRequestHandler(http_server.TestingHTTPRequestHandler):
698
 
    """Whatever request comes in, returns a bad protocol version"""
699
 
 
700
 
    def parse_request(self):
701
 
        """Fakes handling a single HTTP request, returns a bad status"""
702
 
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
703
 
        # Returns an invalid protocol version, but curl just
704
 
        # ignores it and those cannot be tested.
705
 
        self.wfile.write("%s %d %s\r\n" % ('HTTP/0.0',
706
 
                                           404,
707
 
                                           'Look at my protocol version'))
708
 
        return False
709
 
 
710
 
 
711
 
class TestBadProtocolServer(TestSpecificRequestHandler):
712
 
    """Tests bad protocol from server."""
713
 
 
714
 
    _req_handler_class = BadProtocolRequestHandler
715
 
 
716
 
    def setUp(self):
717
 
        if pycurl_present and self._transport == PyCurlTransport:
718
 
            raise tests.TestNotApplicable(
719
 
                "pycurl doesn't check the protocol version")
720
 
        super(TestBadProtocolServer, self).setUp()
721
 
 
722
 
    def test_http_has(self):
723
 
        server = self.get_readonly_server()
724
 
        t = self._transport(server.get_url())
725
 
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
726
 
 
727
 
    def test_http_get(self):
728
 
        server = self.get_readonly_server()
729
 
        t = self._transport(server.get_url())
730
 
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
731
 
 
732
 
 
733
 
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
734
 
    """Whatever request comes in, returns a 403 code"""
735
 
 
736
 
    def parse_request(self):
737
 
        """Handle a single HTTP request, by replying we cannot handle it"""
738
 
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
739
 
        self.send_error(403)
740
 
        return False
741
 
 
742
 
 
743
 
class TestForbiddenServer(TestSpecificRequestHandler):
744
 
    """Tests forbidden server"""
745
 
 
746
 
    _req_handler_class = ForbiddenRequestHandler
747
 
 
748
 
    def test_http_has(self):
749
 
        server = self.get_readonly_server()
750
 
        t = self._transport(server.get_url())
751
 
        self.assertRaises(errors.TransportError, t.has, 'foo/bar')
752
 
 
753
 
    def test_http_get(self):
754
 
        server = self.get_readonly_server()
755
 
        t = self._transport(server.get_url())
756
 
        self.assertRaises(errors.TransportError, t.get, 'foo/bar')
757
 
 
758
 
 
759
 
class TestRecordingServer(tests.TestCase):
760
 
 
761
 
    def test_create(self):
762
 
        server = RecordingServer(expect_body_tail=None)
763
 
        self.assertEqual('', server.received_bytes)
764
 
        self.assertEqual(None, server.host)
765
 
        self.assertEqual(None, server.port)
766
 
 
767
 
    def test_setUp_and_tearDown(self):
768
 
        server = RecordingServer(expect_body_tail=None)
769
 
        server.setUp()
770
 
        try:
771
 
            self.assertNotEqual(None, server.host)
772
 
            self.assertNotEqual(None, server.port)
773
 
        finally:
774
 
            server.tearDown()
775
 
        self.assertEqual(None, server.host)
776
 
        self.assertEqual(None, server.port)
777
 
 
778
 
    def test_send_receive_bytes(self):
779
 
        server = RecordingServer(expect_body_tail='c')
780
 
        server.setUp()
781
 
        self.addCleanup(server.tearDown)
782
 
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
783
 
        sock.connect((server.host, server.port))
784
 
        sock.sendall('abc')
785
 
        self.assertEqual('HTTP/1.1 200 OK\r\n',
786
 
                         osutils.recv_all(sock, 4096))
787
 
        self.assertEqual('abc', server.received_bytes)
788
 
 
789
 
 
790
 
class TestRangeRequestServer(TestSpecificRequestHandler):
791
 
    """Tests readv requests against server.
792
 
 
793
 
    We test against default "normal" server.
794
 
    """
795
 
 
796
 
    def setUp(self):
797
 
        super(TestRangeRequestServer, self).setUp()
798
 
        self.build_tree_contents([('a', '0123456789')],)
799
 
 
800
 
    def test_readv(self):
801
 
        server = self.get_readonly_server()
802
 
        t = self._transport(server.get_url())
803
 
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
804
 
        self.assertEqual(l[0], (0, '0'))
805
 
        self.assertEqual(l[1], (1, '1'))
806
 
        self.assertEqual(l[2], (3, '34'))
807
 
        self.assertEqual(l[3], (9, '9'))
808
 
 
809
 
    def test_readv_out_of_order(self):
810
 
        server = self.get_readonly_server()
811
 
        t = self._transport(server.get_url())
812
 
        l = list(t.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
813
 
        self.assertEqual(l[0], (1, '1'))
814
 
        self.assertEqual(l[1], (9, '9'))
815
 
        self.assertEqual(l[2], (0, '0'))
816
 
        self.assertEqual(l[3], (3, '34'))
817
 
 
818
 
    def test_readv_invalid_ranges(self):
819
 
        server = self.get_readonly_server()
820
 
        t = self._transport(server.get_url())
821
 
 
822
 
        # This is intentionally reading off the end of the file
823
 
        # since we are sure that it cannot get there
824
 
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
825
 
                              t.readv, 'a', [(1,1), (8,10)])
826
 
 
827
 
        # This is trying to seek past the end of the file, it should
828
 
        # also raise a special error
829
 
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
830
 
                              t.readv, 'a', [(12,2)])
831
 
 
832
 
    def test_readv_multiple_get_requests(self):
833
 
        server = self.get_readonly_server()
834
 
        t = self._transport(server.get_url())
835
 
        # force transport to issue multiple requests
836
 
        t._max_readv_combine = 1
837
 
        t._max_get_ranges = 1
838
 
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
839
 
        self.assertEqual(l[0], (0, '0'))
840
 
        self.assertEqual(l[1], (1, '1'))
841
 
        self.assertEqual(l[2], (3, '34'))
842
 
        self.assertEqual(l[3], (9, '9'))
843
 
        # The server should have issued 4 requests
844
 
        self.assertEqual(4, server.GET_request_nb)
845
 
 
846
 
    def test_readv_get_max_size(self):
847
 
        server = self.get_readonly_server()
848
 
        t = self._transport(server.get_url())
849
 
        # force transport to issue multiple requests by limiting the number of
850
 
        # bytes by request. Note that this apply to coalesced offsets only, a
851
 
        # single range will keep its size even if bigger than the limit.
852
 
        t._get_max_size = 2
853
 
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
854
 
        self.assertEqual(l[0], (0, '0'))
855
 
        self.assertEqual(l[1], (1, '1'))
856
 
        self.assertEqual(l[2], (2, '2345'))
857
 
        self.assertEqual(l[3], (6, '6789'))
858
 
        # The server should have issued 3 requests
859
 
        self.assertEqual(3, server.GET_request_nb)
860
 
 
861
 
    def test_complete_readv_leave_pipe_clean(self):
862
 
        server = self.get_readonly_server()
863
 
        t = self._transport(server.get_url())
864
 
        # force transport to issue multiple requests
865
 
        t._get_max_size = 2
866
 
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
867
 
        # The server should have issued 3 requests
868
 
        self.assertEqual(3, server.GET_request_nb)
869
 
        self.assertEqual('0123456789', t.get_bytes('a'))
870
 
        self.assertEqual(4, server.GET_request_nb)
871
 
 
872
 
    def test_incomplete_readv_leave_pipe_clean(self):
873
 
        server = self.get_readonly_server()
874
 
        t = self._transport(server.get_url())
875
 
        # force transport to issue multiple requests
876
 
        t._get_max_size = 2
877
 
        # Don't collapse readv results into a list so that we leave unread
878
 
        # bytes on the socket
879
 
        ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
880
 
        self.assertEqual((0, '0'), ireadv.next())
881
 
        # The server should have issued one request so far
882
 
        self.assertEqual(1, server.GET_request_nb)
883
 
        self.assertEqual('0123456789', t.get_bytes('a'))
884
 
        # get_bytes issued an additional request, the readv pending ones are
885
 
        # lost
886
 
        self.assertEqual(2, server.GET_request_nb)
887
 
 
888
 
 
889
 
class SingleRangeRequestHandler(http_server.TestingHTTPRequestHandler):
890
 
    """Always reply to range request as if they were single.
891
 
 
892
 
    Don't be explicit about it, just to annoy the clients.
893
 
    """
894
 
 
895
 
    def get_multiple_ranges(self, file, file_size, ranges):
896
 
        """Answer as if it was a single range request and ignores the rest"""
897
 
        (start, end) = ranges[0]
898
 
        return self.get_single_range(file, file_size, start, end)
899
 
 
900
 
 
901
 
class TestSingleRangeRequestServer(TestRangeRequestServer):
902
 
    """Test readv against a server which accept only single range requests"""
903
 
 
904
 
    _req_handler_class = SingleRangeRequestHandler
905
 
 
906
 
 
907
 
class SingleOnlyRangeRequestHandler(http_server.TestingHTTPRequestHandler):
908
 
    """Only reply to simple range requests, errors out on multiple"""
909
 
 
910
 
    def get_multiple_ranges(self, file, file_size, ranges):
911
 
        """Refuses the multiple ranges request"""
912
 
        if len(ranges) > 1:
913
 
            file.close()
914
 
            self.send_error(416, "Requested range not satisfiable")
915
 
            return
916
 
        (start, end) = ranges[0]
917
 
        return self.get_single_range(file, file_size, start, end)
918
 
 
919
 
 
920
 
class TestSingleOnlyRangeRequestServer(TestRangeRequestServer):
921
 
    """Test readv against a server which only accept single range requests"""
922
 
 
923
 
    _req_handler_class = SingleOnlyRangeRequestHandler
924
 
 
925
 
 
926
 
class NoRangeRequestHandler(http_server.TestingHTTPRequestHandler):
927
 
    """Ignore range requests without notice"""
928
 
 
929
 
    def do_GET(self):
930
 
        # Update the statistics
931
 
        self.server.test_case_server.GET_request_nb += 1
932
 
        # Just bypass the range handling done by TestingHTTPRequestHandler
933
 
        return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
934
 
 
935
 
 
936
 
class TestNoRangeRequestServer(TestRangeRequestServer):
937
 
    """Test readv against a server which do not accept range requests"""
938
 
 
939
 
    _req_handler_class = NoRangeRequestHandler
940
 
 
941
 
 
942
 
class MultipleRangeWithoutContentLengthRequestHandler(
943
 
    http_server.TestingHTTPRequestHandler):
944
 
    """Reply to multiple range requests without content length header."""
945
 
 
946
 
    def get_multiple_ranges(self, file, file_size, ranges):
947
 
        self.send_response(206)
948
 
        self.send_header('Accept-Ranges', 'bytes')
949
 
        boundary = "%d" % random.randint(0,0x7FFFFFFF)
950
 
        self.send_header("Content-Type",
951
 
                         "multipart/byteranges; boundary=%s" % boundary)
952
 
        self.end_headers()
953
 
        for (start, end) in ranges:
954
 
            self.wfile.write("--%s\r\n" % boundary)
955
 
            self.send_header("Content-type", 'application/octet-stream')
956
 
            self.send_header("Content-Range", "bytes %d-%d/%d" % (start,
957
 
                                                                  end,
958
 
                                                                  file_size))
959
 
            self.end_headers()
960
 
            self.send_range_content(file, start, end - start + 1)
961
 
        # Final boundary
962
 
        self.wfile.write("--%s\r\n" % boundary)
963
 
 
964
 
 
965
 
class TestMultipleRangeWithoutContentLengthServer(TestRangeRequestServer):
966
 
 
967
 
    _req_handler_class = MultipleRangeWithoutContentLengthRequestHandler
968
 
 
969
 
 
970
 
class TruncatedMultipleRangeRequestHandler(
971
 
    http_server.TestingHTTPRequestHandler):
972
 
    """Reply to multiple range requests truncating the last ones.
973
 
 
974
 
    This server generates responses whose Content-Length describes all the
975
 
    ranges, but fail to include the last ones leading to client short reads.
976
 
    This has been observed randomly with lighttpd (bug #179368).
977
 
    """
978
 
 
979
 
    _truncated_ranges = 2
980
 
 
981
 
    def get_multiple_ranges(self, file, file_size, ranges):
982
 
        self.send_response(206)
983
 
        self.send_header('Accept-Ranges', 'bytes')
984
 
        boundary = 'tagada'
985
 
        self.send_header('Content-Type',
986
 
                         'multipart/byteranges; boundary=%s' % boundary)
987
 
        boundary_line = '--%s\r\n' % boundary
988
 
        # Calculate the Content-Length
989
 
        content_length = 0
990
 
        for (start, end) in ranges:
991
 
            content_length += len(boundary_line)
992
 
            content_length += self._header_line_length(
993
 
                'Content-type', 'application/octet-stream')
994
 
            content_length += self._header_line_length(
995
 
                'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
996
 
            content_length += len('\r\n') # end headers
997
 
            content_length += end - start # + 1
998
 
        content_length += len(boundary_line)
999
 
        self.send_header('Content-length', content_length)
1000
 
        self.end_headers()
1001
 
 
1002
 
        # Send the multipart body
1003
 
        cur = 0
1004
 
        for (start, end) in ranges:
1005
 
            self.wfile.write(boundary_line)
1006
 
            self.send_header('Content-type', 'application/octet-stream')
1007
 
            self.send_header('Content-Range', 'bytes %d-%d/%d'
1008
 
                             % (start, end, file_size))
1009
 
            self.end_headers()
1010
 
            if cur + self._truncated_ranges >= len(ranges):
1011
 
                # Abruptly ends the response and close the connection
1012
 
                self.close_connection = 1
1013
 
                return
1014
 
            self.send_range_content(file, start, end - start + 1)
1015
 
            cur += 1
1016
 
        # No final boundary
1017
 
        self.wfile.write(boundary_line)
1018
 
 
1019
 
 
1020
 
class TestTruncatedMultipleRangeServer(TestSpecificRequestHandler):
1021
 
 
1022
 
    _req_handler_class = TruncatedMultipleRangeRequestHandler
1023
 
 
1024
 
    def setUp(self):
1025
 
        super(TestTruncatedMultipleRangeServer, self).setUp()
1026
 
        self.build_tree_contents([('a', '0123456789')],)
1027
 
 
1028
 
    def test_readv_with_short_reads(self):
1029
 
        server = self.get_readonly_server()
1030
 
        t = self._transport(server.get_url())
1031
 
        # Force separate ranges for each offset
1032
 
        t._bytes_to_read_before_seek = 0
1033
 
        ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
1034
 
        self.assertEqual((0, '0'), ireadv.next())
1035
 
        self.assertEqual((2, '2'), ireadv.next())
1036
 
        if not self._testing_pycurl():
1037
 
            # Only one request have been issued so far (except for pycurl that
1038
 
            # try to read the whole response at once)
1039
 
            self.assertEqual(1, server.GET_request_nb)
1040
 
        self.assertEqual((4, '45'), ireadv.next())
1041
 
        self.assertEqual((9, '9'), ireadv.next())
1042
 
        # Both implementations issue 3 requests but:
1043
 
        # - urllib does two multiple (4 ranges, then 2 ranges) then a single
1044
 
        #   range,
1045
 
        # - pycurl does two multiple (4 ranges, 4 ranges) then a single range
1046
 
        self.assertEqual(3, server.GET_request_nb)
1047
 
        # Finally the client have tried a single range request and stays in
1048
 
        # that mode
1049
 
        self.assertEqual('single', t._range_hint)
1050
 
 
1051
 
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
1052
 
    """Errors out when range specifiers exceed the limit"""
1053
 
 
1054
 
    def get_multiple_ranges(self, file, file_size, ranges):
1055
 
        """Refuses the multiple ranges request"""
1056
 
        tcs = self.server.test_case_server
1057
 
        if tcs.range_limit is not None and len(ranges) > tcs.range_limit:
1058
 
            file.close()
1059
 
            # Emulate apache behavior
1060
 
            self.send_error(400, "Bad Request")
1061
 
            return
1062
 
        return http_server.TestingHTTPRequestHandler.get_multiple_ranges(
1063
 
            self, file, file_size, ranges)
1064
 
 
1065
 
 
1066
 
class LimitedRangeHTTPServer(http_server.HttpServer):
1067
 
    """An HttpServer erroring out on requests with too much range specifiers"""
1068
 
 
1069
 
    def __init__(self, request_handler=LimitedRangeRequestHandler,
1070
 
                 protocol_version=None,
1071
 
                 range_limit=None):
1072
 
        http_server.HttpServer.__init__(self, request_handler,
1073
 
                                        protocol_version=protocol_version)
1074
 
        self.range_limit = range_limit
1075
 
 
1076
 
 
1077
 
class TestLimitedRangeRequestServer(http_utils.TestCaseWithWebserver):
1078
 
    """Tests readv requests against a server erroring out on too much ranges."""
1079
 
 
1080
 
    # Requests with more range specifiers will error out
1081
 
    range_limit = 3
1082
 
 
1083
 
    def create_transport_readonly_server(self):
1084
 
        return LimitedRangeHTTPServer(range_limit=self.range_limit,
1085
 
                                      protocol_version=self._protocol_version)
1086
 
 
1087
 
    def get_transport(self):
1088
 
        return self._transport(self.get_readonly_server().get_url())
1089
 
 
1090
 
    def setUp(self):
1091
 
        http_utils.TestCaseWithWebserver.setUp(self)
1092
 
        # We need to manipulate ranges that correspond to real chunks in the
1093
 
        # response, so we build a content appropriately.
1094
 
        filler = ''.join(['abcdefghij' for x in range(102)])
1095
 
        content = ''.join(['%04d' % v + filler for v in range(16)])
1096
 
        self.build_tree_contents([('a', content)],)
1097
 
 
1098
 
    def test_few_ranges(self):
1099
 
        t = self.get_transport()
1100
 
        l = list(t.readv('a', ((0, 4), (1024, 4), )))
1101
 
        self.assertEqual(l[0], (0, '0000'))
1102
 
        self.assertEqual(l[1], (1024, '0001'))
1103
 
        self.assertEqual(1, self.get_readonly_server().GET_request_nb)
1104
 
 
1105
 
    def test_more_ranges(self):
1106
 
        t = self.get_transport()
1107
 
        l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
1108
 
        self.assertEqual(l[0], (0, '0000'))
1109
 
        self.assertEqual(l[1], (1024, '0001'))
1110
 
        self.assertEqual(l[2], (4096, '0004'))
1111
 
        self.assertEqual(l[3], (8192, '0008'))
1112
 
        # The server will refuse to serve the first request (too much ranges),
1113
 
        # a second request will succeed.
1114
 
        self.assertEqual(2, self.get_readonly_server().GET_request_nb)
1115
 
 
1116
 
 
1117
 
class TestHttpProxyWhiteBox(tests.TestCase):
1118
 
    """Whitebox test proxy http authorization.
1119
 
 
1120
 
    Only the urllib implementation is tested here.
1121
 
    """
1122
 
 
1123
 
    def setUp(self):
1124
 
        tests.TestCase.setUp(self)
1125
 
        self._old_env = {}
1126
 
 
1127
 
    def tearDown(self):
1128
 
        self._restore_env()
1129
 
        tests.TestCase.tearDown(self)
1130
 
 
1131
 
    def _install_env(self, env):
1132
 
        for name, value in env.iteritems():
1133
 
            self._old_env[name] = osutils.set_or_unset_env(name, value)
1134
 
 
1135
 
    def _restore_env(self):
1136
 
        for name, value in self._old_env.iteritems():
1137
 
            osutils.set_or_unset_env(name, value)
1138
 
 
1139
 
    def _proxied_request(self):
1140
 
        handler = _urllib2_wrappers.ProxyHandler()
1141
 
        request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
1142
 
        handler.set_proxy(request, 'http')
1143
 
        return request
1144
 
 
1145
 
    def test_empty_user(self):
1146
 
        self._install_env({'http_proxy': 'http://bar.com'})
1147
 
        request = self._proxied_request()
1148
 
        self.assertFalse(request.headers.has_key('Proxy-authorization'))
1149
 
 
1150
 
    def test_invalid_proxy(self):
1151
 
        """A proxy env variable without scheme"""
1152
 
        self._install_env({'http_proxy': 'host:1234'})
1153
 
        self.assertRaises(errors.InvalidURL, self._proxied_request)
1154
 
 
1155
 
 
1156
 
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
1157
 
    """Tests proxy server.
1158
 
 
1159
 
    Be aware that we do not setup a real proxy here. Instead, we
1160
 
    check that the *connection* goes through the proxy by serving
1161
 
    different content (the faked proxy server append '-proxied'
1162
 
    to the file names).
1163
 
    """
1164
 
 
1165
 
    # FIXME: We don't have an https server available, so we don't
1166
 
    # test https connections.
1167
 
 
1168
 
    def setUp(self):
1169
 
        super(TestProxyHttpServer, self).setUp()
1170
 
        self.build_tree_contents([('foo', 'contents of foo\n'),
1171
 
                                  ('foo-proxied', 'proxied contents of foo\n')])
1172
 
        # Let's setup some attributes for tests
1173
 
        self.server = self.get_readonly_server()
1174
 
        self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
1175
 
        if self._testing_pycurl():
1176
 
            # Oh my ! pycurl does not check for the port as part of
1177
 
            # no_proxy :-( So we just test the host part
1178
 
            self.no_proxy_host = 'localhost'
1179
 
        else:
1180
 
            self.no_proxy_host = self.proxy_address
1181
 
        # The secondary server is the proxy
1182
 
        self.proxy = self.get_secondary_server()
1183
 
        self.proxy_url = self.proxy.get_url()
1184
 
        self._old_env = {}
1185
 
 
1186
 
    def _testing_pycurl(self):
1187
 
        return pycurl_present and self._transport == PyCurlTransport
1188
 
 
1189
 
    def create_transport_secondary_server(self):
1190
 
        """Creates an http server that will serve files with
1191
 
        '-proxied' appended to their names.
1192
 
        """
1193
 
        return http_utils.ProxyServer(protocol_version=self._protocol_version)
1194
 
 
1195
 
    def _install_env(self, env):
1196
 
        for name, value in env.iteritems():
1197
 
            self._old_env[name] = osutils.set_or_unset_env(name, value)
1198
 
 
1199
 
    def _restore_env(self):
1200
 
        for name, value in self._old_env.iteritems():
1201
 
            osutils.set_or_unset_env(name, value)
1202
 
 
1203
 
    def proxied_in_env(self, env):
1204
 
        self._install_env(env)
1205
 
        url = self.server.get_url()
1206
 
        t = self._transport(url)
1207
 
        try:
1208
 
            self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1209
 
        finally:
1210
 
            self._restore_env()
1211
 
 
1212
 
    def not_proxied_in_env(self, env):
1213
 
        self._install_env(env)
1214
 
        url = self.server.get_url()
1215
 
        t = self._transport(url)
1216
 
        try:
1217
 
            self.assertEqual('contents of foo\n', t.get('foo').read())
1218
 
        finally:
1219
 
            self._restore_env()
1220
 
 
1221
 
    def test_http_proxy(self):
1222
 
        self.proxied_in_env({'http_proxy': self.proxy_url})
1223
 
 
1224
 
    def test_HTTP_PROXY(self):
1225
 
        if self._testing_pycurl():
1226
 
            # pycurl does not check HTTP_PROXY for security reasons
1227
 
            # (for use in a CGI context that we do not care
1228
 
            # about. Should we ?)
1229
 
            raise tests.TestNotApplicable(
1230
 
                'pycurl does not check HTTP_PROXY for security reasons')
1231
 
        self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
1232
 
 
1233
 
    def test_all_proxy(self):
1234
 
        self.proxied_in_env({'all_proxy': self.proxy_url})
1235
 
 
1236
 
    def test_ALL_PROXY(self):
1237
 
        self.proxied_in_env({'ALL_PROXY': self.proxy_url})
1238
 
 
1239
 
    def test_http_proxy_with_no_proxy(self):
1240
 
        self.not_proxied_in_env({'http_proxy': self.proxy_url,
1241
 
                                 'no_proxy': self.no_proxy_host})
1242
 
 
1243
 
    def test_HTTP_PROXY_with_NO_PROXY(self):
1244
 
        if self._testing_pycurl():
1245
 
            raise tests.TestNotApplicable(
1246
 
                'pycurl does not check HTTP_PROXY for security reasons')
1247
 
        self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
1248
 
                                 'NO_PROXY': self.no_proxy_host})
1249
 
 
1250
 
    def test_all_proxy_with_no_proxy(self):
1251
 
        self.not_proxied_in_env({'all_proxy': self.proxy_url,
1252
 
                                 'no_proxy': self.no_proxy_host})
1253
 
 
1254
 
    def test_ALL_PROXY_with_NO_PROXY(self):
1255
 
        self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
1256
 
                                 'NO_PROXY': self.no_proxy_host})
1257
 
 
1258
 
    def test_http_proxy_without_scheme(self):
1259
 
        if self._testing_pycurl():
1260
 
            # pycurl *ignores* invalid proxy env variables. If that ever change
1261
 
            # in the future, this test will fail indicating that pycurl do not
1262
 
            # ignore anymore such variables.
1263
 
            self.not_proxied_in_env({'http_proxy': self.proxy_address})
1264
 
        else:
1265
 
            self.assertRaises(errors.InvalidURL,
1266
 
                              self.proxied_in_env,
1267
 
                              {'http_proxy': self.proxy_address})
1268
 
 
1269
 
 
1270
 
class TestRanges(http_utils.TestCaseWithWebserver):
1271
 
    """Test the Range header in GET methods."""
1272
 
 
1273
 
    def setUp(self):
1274
 
        http_utils.TestCaseWithWebserver.setUp(self)
1275
 
        self.build_tree_contents([('a', '0123456789')],)
1276
 
        server = self.get_readonly_server()
1277
 
        self.transport = self._transport(server.get_url())
1278
 
 
1279
 
    def create_transport_readonly_server(self):
1280
 
        return http_server.HttpServer(protocol_version=self._protocol_version)
1281
 
 
1282
 
    def _file_contents(self, relpath, ranges):
1283
 
        offsets = [ (start, end - start + 1) for start, end in ranges]
1284
 
        coalesce = self.transport._coalesce_offsets
1285
 
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
1286
 
        code, data = self.transport._get(relpath, coalesced)
1287
 
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1288
 
        for start, end in ranges:
1289
 
            data.seek(start)
1290
 
            yield data.read(end - start + 1)
1291
 
 
1292
 
    def _file_tail(self, relpath, tail_amount):
1293
 
        code, data = self.transport._get(relpath, [], tail_amount)
1294
 
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1295
 
        data.seek(-tail_amount, 2)
1296
 
        return data.read(tail_amount)
1297
 
 
1298
 
    def test_range_header(self):
1299
 
        # Valid ranges
1300
 
        map(self.assertEqual,['0', '234'],
1301
 
            list(self._file_contents('a', [(0,0), (2,4)])),)
1302
 
 
1303
 
    def test_range_header_tail(self):
1304
 
        self.assertEqual('789', self._file_tail('a', 3))
1305
 
 
1306
 
    def test_syntactically_invalid_range_header(self):
1307
 
        self.assertListRaises(errors.InvalidHttpRange,
1308
 
                          self._file_contents, 'a', [(4, 3)])
1309
 
 
1310
 
    def test_semantically_invalid_range_header(self):
1311
 
        self.assertListRaises(errors.InvalidHttpRange,
1312
 
                          self._file_contents, 'a', [(42, 128)])
1313
 
 
1314
 
 
1315
 
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
1316
 
    """Test redirection between http servers."""
1317
 
 
1318
 
    def create_transport_secondary_server(self):
1319
 
        """Create the secondary server redirecting to the primary server"""
1320
 
        new = self.get_readonly_server()
1321
 
 
1322
 
        redirecting = http_utils.HTTPServerRedirecting(
1323
 
            protocol_version=self._protocol_version)
1324
 
        redirecting.redirect_to(new.host, new.port)
1325
 
        return redirecting
1326
 
 
1327
 
    def setUp(self):
1328
 
        super(TestHTTPRedirections, self).setUp()
1329
 
        self.build_tree_contents([('a', '0123456789'),
1330
 
                                  ('bundle',
1331
 
                                  '# Bazaar revision bundle v0.9\n#\n')
1332
 
                                  ],)
1333
 
        # The requests to the old server will be redirected to the new server
1334
 
        self.old_transport = self._transport(self.old_server.get_url())
1335
 
 
1336
 
    def test_redirected(self):
1337
 
        self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
1338
 
        t = self._transport(self.new_server.get_url())
1339
 
        self.assertEqual('0123456789', t.get('a').read())
1340
 
 
1341
 
    def test_read_redirected_bundle_from_url(self):
1342
 
        from bzrlib.bundle import read_bundle_from_url
1343
 
        url = self.old_transport.abspath('bundle')
1344
 
        bundle = self.applyDeprecated(deprecated_in((1, 12, 0)),
1345
 
                read_bundle_from_url, url)
1346
 
        # If read_bundle_from_url was successful we get an empty bundle
1347
 
        self.assertEqual([], bundle.revisions)
1348
 
 
1349
 
 
1350
 
class RedirectedRequest(_urllib2_wrappers.Request):
1351
 
    """Request following redirections. """
1352
 
 
1353
 
    init_orig = _urllib2_wrappers.Request.__init__
1354
 
 
1355
 
    def __init__(self, method, url, *args, **kwargs):
1356
 
        """Constructor.
1357
 
 
1358
 
        """
1359
 
        # Since the tests using this class will replace
1360
 
        # _urllib2_wrappers.Request, we can't just call the base class __init__
1361
 
        # or we'll loop.
1362
 
        RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
1363
 
        self.follow_redirections = True
1364
 
 
1365
 
 
1366
 
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
1367
 
    """Test redirections.
1368
 
 
1369
 
    http implementations do not redirect silently anymore (they
1370
 
    do not redirect at all in fact). The mechanism is still in
1371
 
    place at the _urllib2_wrappers.Request level and these tests
1372
 
    exercise it.
1373
 
 
1374
 
    For the pycurl implementation
1375
 
    the redirection have been deleted as we may deprecate pycurl
1376
 
    and I have no place to keep a working implementation.
1377
 
    -- vila 20070212
1378
 
    """
1379
 
 
1380
 
    def setUp(self):
1381
 
        if pycurl_present and self._transport == PyCurlTransport:
1382
 
            raise tests.TestNotApplicable(
1383
 
                "pycurl doesn't redirect silently annymore")
1384
 
        super(TestHTTPSilentRedirections, self).setUp()
1385
 
        self.setup_redirected_request()
1386
 
        self.addCleanup(self.cleanup_redirected_request)
1387
 
        self.build_tree_contents([('a','a'),
1388
 
                                  ('1/',),
1389
 
                                  ('1/a', 'redirected once'),
1390
 
                                  ('2/',),
1391
 
                                  ('2/a', 'redirected twice'),
1392
 
                                  ('3/',),
1393
 
                                  ('3/a', 'redirected thrice'),
1394
 
                                  ('4/',),
1395
 
                                  ('4/a', 'redirected 4 times'),
1396
 
                                  ('5/',),
1397
 
                                  ('5/a', 'redirected 5 times'),
1398
 
                                  ],)
1399
 
 
1400
 
        self.old_transport = self._transport(self.old_server.get_url())
1401
 
 
1402
 
    def setup_redirected_request(self):
1403
 
        self.original_class = _urllib2_wrappers.Request
1404
 
        _urllib2_wrappers.Request = RedirectedRequest
1405
 
 
1406
 
    def cleanup_redirected_request(self):
1407
 
        _urllib2_wrappers.Request = self.original_class
1408
 
 
1409
 
    def create_transport_secondary_server(self):
1410
 
        """Create the secondary server, redirections are defined in the tests"""
1411
 
        return http_utils.HTTPServerRedirecting(
1412
 
            protocol_version=self._protocol_version)
1413
 
 
1414
 
    def test_one_redirection(self):
1415
 
        t = self.old_transport
1416
 
 
1417
 
        req = RedirectedRequest('GET', t.abspath('a'))
1418
 
        req.follow_redirections = True
1419
 
        new_prefix = 'http://%s:%s' % (self.new_server.host,
1420
 
                                       self.new_server.port)
1421
 
        self.old_server.redirections = \
1422
 
            [('(.*)', r'%s/1\1' % (new_prefix), 301),]
1423
 
        self.assertEquals('redirected once',t._perform(req).read())
1424
 
 
1425
 
    def test_five_redirections(self):
1426
 
        t = self.old_transport
1427
 
 
1428
 
        req = RedirectedRequest('GET', t.abspath('a'))
1429
 
        req.follow_redirections = True
1430
 
        old_prefix = 'http://%s:%s' % (self.old_server.host,
1431
 
                                       self.old_server.port)
1432
 
        new_prefix = 'http://%s:%s' % (self.new_server.host,
1433
 
                                       self.new_server.port)
1434
 
        self.old_server.redirections = [
1435
 
            ('/1(.*)', r'%s/2\1' % (old_prefix), 302),
1436
 
            ('/2(.*)', r'%s/3\1' % (old_prefix), 303),
1437
 
            ('/3(.*)', r'%s/4\1' % (old_prefix), 307),
1438
 
            ('/4(.*)', r'%s/5\1' % (new_prefix), 301),
1439
 
            ('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
1440
 
            ]
1441
 
        self.assertEquals('redirected 5 times',t._perform(req).read())
1442
 
 
1443
 
 
1444
 
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
1445
 
    """Test transport.do_catching_redirections."""
1446
 
 
1447
 
    def setUp(self):
1448
 
        super(TestDoCatchRedirections, self).setUp()
1449
 
        self.build_tree_contents([('a', '0123456789'),],)
1450
 
 
1451
 
        self.old_transport = self._transport(self.old_server.get_url())
1452
 
 
1453
 
    def get_a(self, transport):
1454
 
        return transport.get('a')
1455
 
 
1456
 
    def test_no_redirection(self):
1457
 
        t = self._transport(self.new_server.get_url())
1458
 
 
1459
 
        # We use None for redirected so that we fail if redirected
1460
 
        self.assertEquals('0123456789',
1461
 
                          transport.do_catching_redirections(
1462
 
                self.get_a, t, None).read())
1463
 
 
1464
 
    def test_one_redirection(self):
1465
 
        self.redirections = 0
1466
 
 
1467
 
        def redirected(transport, exception, redirection_notice):
1468
 
            self.redirections += 1
1469
 
            dir, file = urlutils.split(exception.target)
1470
 
            return self._transport(dir)
1471
 
 
1472
 
        self.assertEquals('0123456789',
1473
 
                          transport.do_catching_redirections(
1474
 
                self.get_a, self.old_transport, redirected).read())
1475
 
        self.assertEquals(1, self.redirections)
1476
 
 
1477
 
    def test_redirection_loop(self):
1478
 
 
1479
 
        def redirected(transport, exception, redirection_notice):
1480
 
            # By using the redirected url as a base dir for the
1481
 
            # *old* transport, we create a loop: a => a/a =>
1482
 
            # a/a/a
1483
 
            return self.old_transport.clone(exception.target)
1484
 
 
1485
 
        self.assertRaises(errors.TooManyRedirections,
1486
 
                          transport.do_catching_redirections,
1487
 
                          self.get_a, self.old_transport, redirected)
1488
 
 
1489
 
 
1490
 
class TestAuth(http_utils.TestCaseWithWebserver):
1491
 
    """Test authentication scheme"""
1492
 
 
1493
 
    _auth_header = 'Authorization'
1494
 
    _password_prompt_prefix = ''
1495
 
    _username_prompt_prefix = ''
1496
 
    # Set by load_tests
1497
 
    _auth_server = None
1498
 
 
1499
 
    def setUp(self):
1500
 
        super(TestAuth, self).setUp()
1501
 
        self.server = self.get_readonly_server()
1502
 
        self.build_tree_contents([('a', 'contents of a\n'),
1503
 
                                  ('b', 'contents of b\n'),])
1504
 
 
1505
 
    def create_transport_readonly_server(self):
1506
 
        return self._auth_server(protocol_version=self._protocol_version)
1507
 
 
1508
 
    def _testing_pycurl(self):
1509
 
        return pycurl_present and self._transport == PyCurlTransport
1510
 
 
1511
 
    def get_user_url(self, user, password):
1512
 
        """Build an url embedding user and password"""
1513
 
        url = '%s://' % self.server._url_protocol
1514
 
        if user is not None:
1515
 
            url += user
1516
 
            if password is not None:
1517
 
                url += ':' + password
1518
 
            url += '@'
1519
 
        url += '%s:%s/' % (self.server.host, self.server.port)
1520
 
        return url
1521
 
 
1522
 
    def get_user_transport(self, user, password):
1523
 
        return self._transport(self.get_user_url(user, password))
1524
 
 
1525
 
    def test_no_user(self):
1526
 
        self.server.add_user('joe', 'foo')
1527
 
        t = self.get_user_transport(None, None)
1528
 
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1529
 
        # Only one 'Authentication Required' error should occur
1530
 
        self.assertEqual(1, self.server.auth_required_errors)
1531
 
 
1532
 
    def test_empty_pass(self):
1533
 
        self.server.add_user('joe', '')
1534
 
        t = self.get_user_transport('joe', '')
1535
 
        self.assertEqual('contents of a\n', t.get('a').read())
1536
 
        # Only one 'Authentication Required' error should occur
1537
 
        self.assertEqual(1, self.server.auth_required_errors)
1538
 
 
1539
 
    def test_user_pass(self):
1540
 
        self.server.add_user('joe', 'foo')
1541
 
        t = self.get_user_transport('joe', 'foo')
1542
 
        self.assertEqual('contents of a\n', t.get('a').read())
1543
 
        # Only one 'Authentication Required' error should occur
1544
 
        self.assertEqual(1, self.server.auth_required_errors)
1545
 
 
1546
 
    def test_unknown_user(self):
1547
 
        self.server.add_user('joe', 'foo')
1548
 
        t = self.get_user_transport('bill', 'foo')
1549
 
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1550
 
        # Two 'Authentication Required' errors should occur (the
1551
 
        # initial 'who are you' and 'I don't know you, who are
1552
 
        # you').
1553
 
        self.assertEqual(2, self.server.auth_required_errors)
1554
 
 
1555
 
    def test_wrong_pass(self):
1556
 
        self.server.add_user('joe', 'foo')
1557
 
        t = self.get_user_transport('joe', 'bar')
1558
 
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1559
 
        # Two 'Authentication Required' errors should occur (the
1560
 
        # initial 'who are you' and 'this is not you, who are you')
1561
 
        self.assertEqual(2, self.server.auth_required_errors)
1562
 
 
1563
 
    def test_prompt_for_username(self):
1564
 
        if self._testing_pycurl():
1565
 
            raise tests.TestNotApplicable(
1566
 
                'pycurl cannot prompt, it handles auth by embedding'
1567
 
                ' user:pass in urls only')
1568
 
 
1569
 
        self.server.add_user('joe', 'foo')
1570
 
        t = self.get_user_transport(None, None)
1571
 
        stdout = tests.StringIOWrapper()
1572
 
        stderr = tests.StringIOWrapper()
1573
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1574
 
                                            stdout=stdout, stderr=stderr)
1575
 
        self.assertEqual('contents of a\n',t.get('a').read())
1576
 
        # stdin should be empty
1577
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
1578
 
        stderr.seek(0)
1579
 
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1580
 
        self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
1581
 
        self.assertEquals('', stdout.getvalue())
1582
 
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1583
 
                                    stderr.readline())
1584
 
 
1585
 
    def test_prompt_for_password(self):
1586
 
        if self._testing_pycurl():
1587
 
            raise tests.TestNotApplicable(
1588
 
                'pycurl cannot prompt, it handles auth by embedding'
1589
 
                ' user:pass in urls only')
1590
 
 
1591
 
        self.server.add_user('joe', 'foo')
1592
 
        t = self.get_user_transport('joe', None)
1593
 
        stdout = tests.StringIOWrapper()
1594
 
        stderr = tests.StringIOWrapper()
1595
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1596
 
                                            stdout=stdout, stderr=stderr)
1597
 
        self.assertEqual('contents of a\n', t.get('a').read())
1598
 
        # stdin should be empty
1599
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
1600
 
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1601
 
                                    stderr.getvalue())
1602
 
        self.assertEquals('', stdout.getvalue())
1603
 
        # And we shouldn't prompt again for a different request
1604
 
        # against the same transport.
1605
 
        self.assertEqual('contents of b\n',t.get('b').read())
1606
 
        t2 = t.clone()
1607
 
        # And neither against a clone
1608
 
        self.assertEqual('contents of b\n',t2.get('b').read())
1609
 
        # Only one 'Authentication Required' error should occur
1610
 
        self.assertEqual(1, self.server.auth_required_errors)
1611
 
 
1612
 
    def _check_password_prompt(self, scheme, user, actual_prompt):
1613
 
        expected_prompt = (self._password_prompt_prefix
1614
 
                           + ("%s %s@%s:%d, Realm: '%s' password: "
1615
 
                              % (scheme.upper(),
1616
 
                                 user, self.server.host, self.server.port,
1617
 
                                 self.server.auth_realm)))
1618
 
        self.assertEquals(expected_prompt, actual_prompt)
1619
 
 
1620
 
    def _expected_username_prompt(self, scheme):
1621
 
        return (self._username_prompt_prefix
1622
 
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1623
 
                                 self.server.host, self.server.port,
1624
 
                                 self.server.auth_realm))
1625
 
 
1626
 
    def test_no_prompt_for_password_when_using_auth_config(self):
1627
 
        if self._testing_pycurl():
1628
 
            raise tests.TestNotApplicable(
1629
 
                'pycurl does not support authentication.conf'
1630
 
                ' since it cannot prompt')
1631
 
 
1632
 
        user =' joe'
1633
 
        password = 'foo'
1634
 
        stdin_content = 'bar\n'  # Not the right password
1635
 
        self.server.add_user(user, password)
1636
 
        t = self.get_user_transport(user, None)
1637
 
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1638
 
                                            stdout=tests.StringIOWrapper())
1639
 
        # Create a minimal config file with the right password
1640
 
        conf = config.AuthenticationConfig()
1641
 
        conf._get_config().update(
1642
 
            {'httptest': {'scheme': 'http', 'port': self.server.port,
1643
 
                          'user': user, 'password': password}})
1644
 
        conf._save()
1645
 
        # Issue a request to the server to connect
1646
 
        self.assertEqual('contents of a\n',t.get('a').read())
1647
 
        # stdin should have  been left untouched
1648
 
        self.assertEqual(stdin_content, ui.ui_factory.stdin.readline())
1649
 
        # Only one 'Authentication Required' error should occur
1650
 
        self.assertEqual(1, self.server.auth_required_errors)
1651
 
 
1652
 
    def test_user_from_auth_conf(self):
1653
 
        if self._testing_pycurl():
1654
 
            raise tests.TestNotApplicable(
1655
 
                'pycurl does not support authentication.conf')
1656
 
        user = 'joe'
1657
 
        password = 'foo'
1658
 
        self.server.add_user(user, password)
1659
 
        # Create a minimal config file with the right password
1660
 
        conf = config.AuthenticationConfig()
1661
 
        conf._get_config().update(
1662
 
            {'httptest': {'scheme': 'http', 'port': self.server.port,
1663
 
                          'user': user, 'password': password}})
1664
 
        conf._save()
1665
 
        t = self.get_user_transport(None, None)
1666
 
        # Issue a request to the server to connect
1667
 
        self.assertEqual('contents of a\n', t.get('a').read())
1668
 
        # Only one 'Authentication Required' error should occur
1669
 
        self.assertEqual(1, self.server.auth_required_errors)
1670
 
 
1671
 
    def test_changing_nonce(self):
1672
 
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1673
 
                                     http_utils.ProxyDigestAuthServer):
1674
 
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1675
 
        if self._testing_pycurl():
1676
 
            raise tests.KnownFailure(
1677
 
                'pycurl does not handle a nonce change')
1678
 
        self.server.add_user('joe', 'foo')
1679
 
        t = self.get_user_transport('joe', 'foo')
1680
 
        self.assertEqual('contents of a\n', t.get('a').read())
1681
 
        self.assertEqual('contents of b\n', t.get('b').read())
1682
 
        # Only one 'Authentication Required' error should have
1683
 
        # occured so far
1684
 
        self.assertEqual(1, self.server.auth_required_errors)
1685
 
        # The server invalidates the current nonce
1686
 
        self.server.auth_nonce = self.server.auth_nonce + '. No, now!'
1687
 
        self.assertEqual('contents of a\n', t.get('a').read())
1688
 
        # Two 'Authentication Required' errors should occur (the
1689
 
        # initial 'who are you' and a second 'who are you' with the new nonce)
1690
 
        self.assertEqual(2, self.server.auth_required_errors)
1691
 
 
1692
 
 
1693
 
 
1694
 
class TestProxyAuth(TestAuth):
1695
 
    """Test proxy authentication schemes."""
1696
 
 
1697
 
    _auth_header = 'Proxy-authorization'
1698
 
    _password_prompt_prefix = 'Proxy '
1699
 
    _username_prompt_prefix = 'Proxy '
1700
 
 
1701
 
    def setUp(self):
1702
 
        super(TestProxyAuth, self).setUp()
1703
 
        self._old_env = {}
1704
 
        self.addCleanup(self._restore_env)
1705
 
        # Override the contents to avoid false positives
1706
 
        self.build_tree_contents([('a', 'not proxied contents of a\n'),
1707
 
                                  ('b', 'not proxied contents of b\n'),
1708
 
                                  ('a-proxied', 'contents of a\n'),
1709
 
                                  ('b-proxied', 'contents of b\n'),
1710
 
                                  ])
1711
 
 
1712
 
    def get_user_transport(self, user, password):
1713
 
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1714
 
        return self._transport(self.server.get_url())
1715
 
 
1716
 
    def _install_env(self, env):
1717
 
        for name, value in env.iteritems():
1718
 
            self._old_env[name] = osutils.set_or_unset_env(name, value)
1719
 
 
1720
 
    def _restore_env(self):
1721
 
        for name, value in self._old_env.iteritems():
1722
 
            osutils.set_or_unset_env(name, value)
1723
 
 
1724
 
    def test_empty_pass(self):
1725
 
        if self._testing_pycurl():
1726
 
            import pycurl
1727
 
            if pycurl.version_info()[1] < '7.16.0':
1728
 
                raise tests.KnownFailure(
1729
 
                    'pycurl < 7.16.0 does not handle empty proxy passwords')
1730
 
        super(TestProxyAuth, self).test_empty_pass()
1731
 
 
1732
 
 
1733
 
class SampleSocket(object):
1734
 
    """A socket-like object for use in testing the HTTP request handler."""
1735
 
 
1736
 
    def __init__(self, socket_read_content):
1737
 
        """Constructs a sample socket.
1738
 
 
1739
 
        :param socket_read_content: a byte sequence
1740
 
        """
1741
 
        # Use plain python StringIO so we can monkey-patch the close method to
1742
 
        # not discard the contents.
1743
 
        from StringIO import StringIO
1744
 
        self.readfile = StringIO(socket_read_content)
1745
 
        self.writefile = StringIO()
1746
 
        self.writefile.close = lambda: None
1747
 
 
1748
 
    def makefile(self, mode='r', bufsize=None):
1749
 
        if 'r' in mode:
1750
 
            return self.readfile
1751
 
        else:
1752
 
            return self.writefile
1753
 
 
1754
 
 
1755
 
class SmartHTTPTunnellingTest(tests.TestCaseWithTransport):
1756
 
 
1757
 
    def setUp(self):
1758
 
        super(SmartHTTPTunnellingTest, self).setUp()
1759
 
        # We use the VFS layer as part of HTTP tunnelling tests.
1760
 
        self._captureVar('BZR_NO_SMART_VFS', None)
1761
 
        self.transport_readonly_server = http_utils.HTTPServerWithSmarts
1762
 
 
1763
 
    def create_transport_readonly_server(self):
1764
 
        return http_utils.HTTPServerWithSmarts(
1765
 
            protocol_version=self._protocol_version)
1766
 
 
1767
 
    def test_open_bzrdir(self):
1768
 
        branch = self.make_branch('relpath')
1769
 
        http_server = self.get_readonly_server()
1770
 
        url = http_server.get_url() + 'relpath'
1771
 
        bd = bzrdir.BzrDir.open(url)
1772
 
        self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
1773
 
 
1774
 
    def test_bulk_data(self):
1775
 
        # We should be able to send and receive bulk data in a single message.
1776
 
        # The 'readv' command in the smart protocol both sends and receives
1777
 
        # bulk data, so we use that.
1778
 
        self.build_tree(['data-file'])
1779
 
        http_server = self.get_readonly_server()
1780
 
        http_transport = self._transport(http_server.get_url())
1781
 
        medium = http_transport.get_smart_medium()
1782
 
        # Since we provide the medium, the url below will be mostly ignored
1783
 
        # during the test, as long as the path is '/'.
1784
 
        remote_transport = remote.RemoteTransport('bzr://fake_host/',
1785
 
                                                  medium=medium)
1786
 
        self.assertEqual(
1787
 
            [(0, "c")], list(remote_transport.readv("data-file", [(0,1)])))
1788
 
 
1789
 
    def test_http_send_smart_request(self):
1790
 
 
1791
 
        post_body = 'hello\n'
1792
 
        expected_reply_body = 'ok\x012\n'
1793
 
 
1794
 
        http_server = self.get_readonly_server()
1795
 
        http_transport = self._transport(http_server.get_url())
1796
 
        medium = http_transport.get_smart_medium()
1797
 
        response = medium.send_http_smart_request(post_body)
1798
 
        reply_body = response.read()
1799
 
        self.assertEqual(expected_reply_body, reply_body)
1800
 
 
1801
 
    def test_smart_http_server_post_request_handler(self):
1802
 
        httpd = self.get_readonly_server()._get_httpd()
1803
 
 
1804
 
        socket = SampleSocket(
1805
 
            'POST /.bzr/smart %s \r\n' % self._protocol_version
1806
 
            # HTTP/1.1 posts must have a Content-Length (but it doesn't hurt
1807
 
            # for 1.0)
1808
 
            + 'Content-Length: 6\r\n'
1809
 
            '\r\n'
1810
 
            'hello\n')
1811
 
        # Beware: the ('localhost', 80) below is the
1812
 
        # client_address parameter, but we don't have one because
1813
 
        # we have defined a socket which is not bound to an
1814
 
        # address. The test framework never uses this client
1815
 
        # address, so far...
1816
 
        request_handler = http_utils.SmartRequestHandler(socket,
1817
 
                                                         ('localhost', 80),
1818
 
                                                         httpd)
1819
 
        response = socket.writefile.getvalue()
1820
 
        self.assertStartsWith(response, '%s 200 ' % self._protocol_version)
1821
 
        # This includes the end of the HTTP headers, and all the body.
1822
 
        expected_end_of_response = '\r\n\r\nok\x012\n'
1823
 
        self.assertEndsWith(response, expected_end_of_response)
1824
 
 
1825
 
 
1826
 
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
1827
 
    """No smart server here request handler."""
1828
 
 
1829
 
    def do_POST(self):
1830
 
        self.send_error(403, "Forbidden")
1831
 
 
1832
 
 
1833
 
class SmartClientAgainstNotSmartServer(TestSpecificRequestHandler):
1834
 
    """Test smart client behaviour against an http server without smarts."""
1835
 
 
1836
 
    _req_handler_class = ForbiddenRequestHandler
1837
 
 
1838
 
    def test_probe_smart_server(self):
1839
 
        """Test error handling against server refusing smart requests."""
1840
 
        server = self.get_readonly_server()
1841
 
        t = self._transport(server.get_url())
1842
 
        # No need to build a valid smart request here, the server will not even
1843
 
        # try to interpret it.
1844
 
        self.assertRaises(errors.SmartProtocolError,
1845
 
                          t.get_smart_medium().send_http_smart_request,
1846
 
                          'whatever')
1847
 
 
1848
 
class Test_redirected_to(tests.TestCase):
1849
 
 
1850
 
    def test_redirected_to_subdir(self):
1851
 
        t = self._transport('http://www.example.com/foo')
1852
 
        r = t._redirected_to('http://www.example.com/foo',
1853
 
                             'http://www.example.com/foo/subdir')
1854
 
        self.assertIsInstance(r, type(t))
1855
 
        # Both transports share the some connection
1856
 
        self.assertEquals(t._get_connection(), r._get_connection())
1857
 
 
1858
 
    def test_redirected_to_self_with_slash(self):
1859
 
        t = self._transport('http://www.example.com/foo')
1860
 
        r = t._redirected_to('http://www.example.com/foo',
1861
 
                             'http://www.example.com/foo/')
1862
 
        self.assertIsInstance(r, type(t))
1863
 
        # Both transports share the some connection (one can argue that we
1864
 
        # should return the exact same transport here, but that seems
1865
 
        # overkill).
1866
 
        self.assertEquals(t._get_connection(), r._get_connection())
1867
 
 
1868
 
    def test_redirected_to_host(self):
1869
 
        t = self._transport('http://www.example.com/foo')
1870
 
        r = t._redirected_to('http://www.example.com/foo',
1871
 
                             'http://foo.example.com/foo/subdir')
1872
 
        self.assertIsInstance(r, type(t))
1873
 
 
1874
 
    def test_redirected_to_same_host_sibling_protocol(self):
1875
 
        t = self._transport('http://www.example.com/foo')
1876
 
        r = t._redirected_to('http://www.example.com/foo',
1877
 
                             'https://www.example.com/foo')
1878
 
        self.assertIsInstance(r, type(t))
1879
 
 
1880
 
    def test_redirected_to_same_host_different_protocol(self):
1881
 
        t = self._transport('http://www.example.com/foo')
1882
 
        r = t._redirected_to('http://www.example.com/foo',
1883
 
                             'ftp://www.example.com/foo')
1884
 
        self.assertNotEquals(type(r), type(t))
1885
 
 
1886
 
    def test_redirected_to_different_host_same_user(self):
1887
 
        t = self._transport('http://joe@www.example.com/foo')
1888
 
        r = t._redirected_to('http://www.example.com/foo',
1889
 
                             'https://foo.example.com/foo')
1890
 
        self.assertIsInstance(r, type(t))
1891
 
        self.assertEquals(t._user, r._user)
1892
 
 
1893
 
 
1894
 
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
1895
 
    """Request handler for a unique and pre-defined request.
1896
 
 
1897
 
    The only thing we care about here is how many bytes travel on the wire. But
1898
 
    since we want to measure it for a real http client, we have to send it
1899
 
    correct responses.
1900
 
 
1901
 
    We expect to receive a *single* request nothing more (and we won't even
1902
 
    check what request it is, we just measure the bytes read until an empty
1903
 
    line.
1904
 
    """
1905
 
 
1906
 
    def handle_one_request(self):
1907
 
        tcs = self.server.test_case_server
1908
 
        requestline = self.rfile.readline()
1909
 
        headers = self.MessageClass(self.rfile, 0)
1910
 
        # We just read: the request, the headers, an empty line indicating the
1911
 
        # end of the headers.
1912
 
        bytes_read = len(requestline)
1913
 
        for line in headers.headers:
1914
 
            bytes_read += len(line)
1915
 
        bytes_read += len('\r\n')
1916
 
        if requestline.startswith('POST'):
1917
 
            # The body should be a single line (or we don't know where it ends
1918
 
            # and we don't want to issue a blocking read)
1919
 
            body = self.rfile.readline()
1920
 
            bytes_read += len(body)
1921
 
        tcs.bytes_read = bytes_read
1922
 
 
1923
 
        # We set the bytes written *before* issuing the write, the client is
1924
 
        # supposed to consume every produced byte *before* checking that value.
1925
 
 
1926
 
        # Doing the oppposite may lead to test failure: we may be interrupted
1927
 
        # after the write but before updating the value. The client can then
1928
 
        # continue and read the value *before* we can update it. And yes,
1929
 
        # this has been observed -- vila 20090129
1930
 
        tcs.bytes_written = len(tcs.canned_response)
1931
 
        self.wfile.write(tcs.canned_response)
1932
 
 
1933
 
 
1934
 
class ActivityServerMixin(object):
1935
 
 
1936
 
    def __init__(self, protocol_version):
1937
 
        super(ActivityServerMixin, self).__init__(
1938
 
            request_handler=PredefinedRequestHandler,
1939
 
            protocol_version=protocol_version)
1940
 
        # Bytes read and written by the server
1941
 
        self.bytes_read = 0
1942
 
        self.bytes_written = 0
1943
 
        self.canned_response = None
1944
 
 
1945
 
 
1946
 
class ActivityHTTPServer(ActivityServerMixin, http_server.HttpServer):
1947
 
    pass
1948
 
 
1949
 
 
1950
 
if tests.HTTPSServerFeature.available():
1951
 
    from bzrlib.tests import https_server
1952
 
    class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
1953
 
        pass
1954
 
 
1955
 
 
1956
 
class TestActivity(tests.TestCase):
1957
 
    """Test socket activity reporting.
1958
 
 
1959
 
    We use a special purpose server to control the bytes sent and received and
1960
 
    be able to predict the activity on the client socket.
1961
 
    """
1962
 
 
1963
 
    def setUp(self):
1964
 
        tests.TestCase.setUp(self)
1965
 
        self.server = self._activity_server(self._protocol_version)
1966
 
        self.server.setUp()
1967
 
        self.activities = {}
1968
 
        def report_activity(t, bytes, direction):
1969
 
            count = self.activities.get(direction, 0)
1970
 
            count += bytes
1971
 
            self.activities[direction] = count
1972
 
 
1973
 
        # We override at class level because constructors may propagate the
1974
 
        # bound method and render instance overriding ineffective (an
1975
 
        # alternative would be to define a specific ui factory instead...)
1976
 
        self.orig_report_activity = self._transport._report_activity
1977
 
        self._transport._report_activity = report_activity
1978
 
 
1979
 
    def tearDown(self):
1980
 
        self._transport._report_activity = self.orig_report_activity
1981
 
        self.server.tearDown()
1982
 
        tests.TestCase.tearDown(self)
1983
 
 
1984
 
    def get_transport(self):
1985
 
        return self._transport(self.server.get_url())
1986
 
 
1987
 
    def assertActivitiesMatch(self):
1988
 
        self.assertEqual(self.server.bytes_read,
1989
 
                         self.activities.get('write', 0), 'written bytes')
1990
 
        self.assertEqual(self.server.bytes_written,
1991
 
                         self.activities.get('read', 0), 'read bytes')
1992
 
 
1993
 
    def test_get(self):
1994
 
        self.server.canned_response = '''HTTP/1.1 200 OK\r
1995
 
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
1996
 
Server: Apache/2.0.54 (Fedora)\r
1997
 
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
1998
 
ETag: "56691-23-38e9ae00"\r
1999
 
Accept-Ranges: bytes\r
2000
 
Content-Length: 35\r
2001
 
Connection: close\r
2002
 
Content-Type: text/plain; charset=UTF-8\r
2003
 
\r
2004
 
Bazaar-NG meta directory, format 1
2005
 
'''
2006
 
        t = self.get_transport()
2007
 
        self.assertEqual('Bazaar-NG meta directory, format 1\n',
2008
 
                         t.get('foo/bar').read())
2009
 
        self.assertActivitiesMatch()
2010
 
 
2011
 
    def test_has(self):
2012
 
        self.server.canned_response = '''HTTP/1.1 200 OK\r
2013
 
Server: SimpleHTTP/0.6 Python/2.5.2\r
2014
 
Date: Thu, 29 Jan 2009 20:21:47 GMT\r
2015
 
Content-type: application/octet-stream\r
2016
 
Content-Length: 20\r
2017
 
Last-Modified: Thu, 29 Jan 2009 20:21:47 GMT\r
2018
 
\r
2019
 
'''
2020
 
        t = self.get_transport()
2021
 
        self.assertTrue(t.has('foo/bar'))
2022
 
        self.assertActivitiesMatch()
2023
 
 
2024
 
    def test_readv(self):
2025
 
        self.server.canned_response = '''HTTP/1.1 206 Partial Content\r
2026
 
Date: Tue, 11 Jul 2006 04:49:48 GMT\r
2027
 
Server: Apache/2.0.54 (Fedora)\r
2028
 
Last-Modified: Thu, 06 Jul 2006 20:22:05 GMT\r
2029
 
ETag: "238a3c-16ec2-805c5540"\r
2030
 
Accept-Ranges: bytes\r
2031
 
Content-Length: 1534\r
2032
 
Connection: close\r
2033
 
Content-Type: multipart/byteranges; boundary=418470f848b63279b\r
2034
 
\r
2035
 
\r
2036
 
--418470f848b63279b\r
2037
 
Content-type: text/plain; charset=UTF-8\r
2038
 
Content-range: bytes 0-254/93890\r
2039
 
\r
2040
 
mbp@sourcefrog.net-20050309040815-13242001617e4a06
2041
 
mbp@sourcefrog.net-20050309040929-eee0eb3e6d1e7627
2042
 
mbp@sourcefrog.net-20050309040957-6cad07f466bb0bb8
2043
 
mbp@sourcefrog.net-20050309041501-c840e09071de3b67
2044
 
mbp@sourcefrog.net-20050309044615-c24a3250be83220a
2045
 
\r
2046
 
--418470f848b63279b\r
2047
 
Content-type: text/plain; charset=UTF-8\r
2048
 
Content-range: bytes 1000-2049/93890\r
2049
 
\r
2050
 
40-fd4ec249b6b139ab
2051
 
mbp@sourcefrog.net-20050311063625-07858525021f270b
2052
 
mbp@sourcefrog.net-20050311231934-aa3776aff5200bb9
2053
 
mbp@sourcefrog.net-20050311231953-73aeb3a131c3699a
2054
 
mbp@sourcefrog.net-20050311232353-f5e33da490872c6a
2055
 
mbp@sourcefrog.net-20050312071639-0a8f59a34a024ff0
2056
 
mbp@sourcefrog.net-20050312073432-b2c16a55e0d6e9fb
2057
 
mbp@sourcefrog.net-20050312073831-a47c3335ece1920f
2058
 
mbp@sourcefrog.net-20050312085412-13373aa129ccbad3
2059
 
mbp@sourcefrog.net-20050313052251-2bf004cb96b39933
2060
 
mbp@sourcefrog.net-20050313052856-3edd84094687cb11
2061
 
mbp@sourcefrog.net-20050313053233-e30a4f28aef48f9d
2062
 
mbp@sourcefrog.net-20050313053853-7c64085594ff3072
2063
 
mbp@sourcefrog.net-20050313054757-a86c3f5871069e22
2064
 
mbp@sourcefrog.net-20050313061422-418f1f73b94879b9
2065
 
mbp@sourcefrog.net-20050313120651-497bd231b19df600
2066
 
mbp@sourcefrog.net-20050314024931-eae0170ef25a5d1a
2067
 
mbp@sourcefrog.net-20050314025438-d52099f915fe65fc
2068
 
mbp@sourcefrog.net-20050314025539-637a636692c055cf
2069
 
mbp@sourcefrog.net-20050314025737-55eb441f430ab4ba
2070
 
mbp@sourcefrog.net-20050314025901-d74aa93bb7ee8f62
2071
 
mbp@source\r
2072
 
--418470f848b63279b--\r
2073
 
'''
2074
 
        t = self.get_transport()
2075
 
        # Remember that the request is ignored and that the ranges below
2076
 
        # doesn't have to match the canned response.
2077
 
        l = list(t.readv('/foo/bar', ((0, 255), (1000, 1050))))
2078
 
        self.assertEqual(2, len(l))
2079
 
        self.assertActivitiesMatch()
2080
 
 
2081
 
    def test_post(self):
2082
 
        self.server.canned_response = '''HTTP/1.1 200 OK\r
2083
 
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
2084
 
Server: Apache/2.0.54 (Fedora)\r
2085
 
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
2086
 
ETag: "56691-23-38e9ae00"\r
2087
 
Accept-Ranges: bytes\r
2088
 
Content-Length: 35\r
2089
 
Connection: close\r
2090
 
Content-Type: text/plain; charset=UTF-8\r
2091
 
\r
2092
 
lalala whatever as long as itsssss
2093
 
'''
2094
 
        t = self.get_transport()
2095
 
        # We must send a single line of body bytes, see
2096
 
        # PredefinedRequestHandler.handle_one_request
2097
 
        code, f = t._post('abc def end-of-body\n')
2098
 
        self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2099
 
        self.assertActivitiesMatch()