~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Vincent Ladeuil
  • Date: 2009-12-14 15:51:36 UTC
  • mto: (4894.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4895.
  • Revision ID: v.ladeuil+lp@free.fr-20091214155136-rf4nkqvxda9oiw4u
Cleanup tests and tweak the text displayed.

* bzrlib/tests/blackbox/test_update.py:
Fix imports and replace the assertContainsRe with assertEqualDiff
to make the test clearer, more robust and easier to debug.

* bzrlib/tests/commands/test_update.py: 
Fix imports.

* bzrlib/tests/blackbox/test_filtered_view_ops.py: 
Fix imports and strange accesses to base class methods.
(TestViewTreeOperations.test_view_on_update): Avoid os.chdir()
call, simplify string matching assertions.

* bzrlib/builtins.py:
(cmd_update.run): Fix spurious space, get rid of the final '/' for
the base path, don't add a final period (it's a legal char in a
path and would be annoying for people that like to copy/paste).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for HTTP implementations.
 
18
 
 
19
This module defines a load_tests() method that parametrize tests classes for
 
20
transport implementation, http protocol versions and authentication schemes.
 
21
"""
 
22
 
 
23
# TODO: Should be renamed to bzrlib.transport.http.tests?
 
24
# TODO: What about renaming to bzrlib.tests.transport.http ?
 
25
 
 
26
from cStringIO import StringIO
 
27
import httplib
 
28
import os
 
29
import select
 
30
import SimpleHTTPServer
 
31
import socket
 
32
import sys
 
33
import threading
 
34
 
 
35
import bzrlib
 
36
from bzrlib import (
 
37
    bzrdir,
 
38
    config,
 
39
    errors,
 
40
    osutils,
 
41
    remote as _mod_remote,
 
42
    tests,
 
43
    transport,
 
44
    ui,
 
45
    urlutils,
 
46
    )
 
47
from bzrlib.symbol_versioning import (
 
48
    deprecated_in,
 
49
    )
 
50
from bzrlib.tests import (
 
51
    http_server,
 
52
    http_utils,
 
53
    )
 
54
from bzrlib.transport import (
 
55
    http,
 
56
    remote,
 
57
    )
 
58
from bzrlib.transport.http import (
 
59
    _urllib,
 
60
    _urllib2_wrappers,
 
61
    )
 
62
 
 
63
 
 
64
try:
 
65
    from bzrlib.transport.http._pycurl import PyCurlTransport
 
66
    pycurl_present = True
 
67
except errors.DependencyNotPresent:
 
68
    pycurl_present = False
 
69
 
 
70
 
 
71
def load_tests(standard_tests, module, loader):
 
72
    """Multiply tests for http clients and protocol versions."""
 
73
    result = loader.suiteClass()
 
74
 
 
75
    # one for each transport implementation
 
76
    t_tests, remaining_tests = tests.split_suite_by_condition(
 
77
        standard_tests, tests.condition_isinstance((
 
78
                TestHttpTransportRegistration,
 
79
                TestHttpTransportUrls,
 
80
                Test_redirected_to,
 
81
                )))
 
82
    transport_scenarios = [
 
83
        ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
 
84
                        _server=http_server.HttpServer_urllib,
 
85
                        _qualified_prefix='http+urllib',)),
 
86
        ]
 
87
    if pycurl_present:
 
88
        transport_scenarios.append(
 
89
            ('pycurl', dict(_transport=PyCurlTransport,
 
90
                            _server=http_server.HttpServer_PyCurl,
 
91
                            _qualified_prefix='http+pycurl',)))
 
92
    tests.multiply_tests(t_tests, transport_scenarios, result)
 
93
 
 
94
    # each implementation tested with each HTTP version
 
95
    tp_tests, remaining_tests = tests.split_suite_by_condition(
 
96
        remaining_tests, tests.condition_isinstance((
 
97
                SmartHTTPTunnellingTest,
 
98
                TestDoCatchRedirections,
 
99
                TestHTTPConnections,
 
100
                TestHTTPRedirections,
 
101
                TestHTTPSilentRedirections,
 
102
                TestLimitedRangeRequestServer,
 
103
                TestPost,
 
104
                TestProxyHttpServer,
 
105
                TestRanges,
 
106
                TestSpecificRequestHandler,
 
107
                )))
 
108
    protocol_scenarios = [
 
109
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
 
110
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
 
111
            ]
 
112
    tp_scenarios = tests.multiply_scenarios(transport_scenarios,
 
113
                                            protocol_scenarios)
 
114
    tests.multiply_tests(tp_tests, tp_scenarios, result)
 
115
 
 
116
    # proxy auth: each auth scheme on all http versions on all implementations.
 
117
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
 
118
        remaining_tests, tests.condition_isinstance((
 
119
                TestProxyAuth,
 
120
                )))
 
121
    proxy_auth_scheme_scenarios = [
 
122
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
 
123
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
 
124
        ('basicdigest',
 
125
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
 
126
        ]
 
127
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
 
128
                                              proxy_auth_scheme_scenarios)
 
129
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
 
130
 
 
131
    # auth: each auth scheme on all http versions on all implementations.
 
132
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
 
133
        remaining_tests, tests.condition_isinstance((
 
134
                TestAuth,
 
135
                )))
 
136
    auth_scheme_scenarios = [
 
137
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
 
138
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
 
139
        ('basicdigest',
 
140
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
 
141
        ]
 
142
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
 
143
                                             auth_scheme_scenarios)
 
144
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
 
145
 
 
146
    # activity: on all http[s] versions on all implementations
 
147
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
 
148
        remaining_tests, tests.condition_isinstance((
 
149
                TestActivity,
 
150
                )))
 
151
    activity_scenarios = [
 
152
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
 
153
                             _transport=_urllib.HttpTransport_urllib,)),
 
154
        ]
 
155
    if tests.HTTPSServerFeature.available():
 
156
        activity_scenarios.append(
 
157
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
 
158
                                  _transport=_urllib.HttpTransport_urllib,)),)
 
159
    if pycurl_present:
 
160
        activity_scenarios.append(
 
161
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
 
162
                                 _transport=PyCurlTransport,)),)
 
163
        if tests.HTTPSServerFeature.available():
 
164
            from bzrlib.tests import (
 
165
                ssl_certs,
 
166
                )
 
167
            # FIXME: Until we have a better way to handle self-signed
 
168
            # certificates (like allowing them in a test specific
 
169
            # authentication.conf for example), we need some specialized pycurl
 
170
            # transport for tests.
 
171
            class HTTPS_pycurl_transport(PyCurlTransport):
 
172
 
 
173
                def __init__(self, base, _from_transport=None):
 
174
                    super(HTTPS_pycurl_transport, self).__init__(
 
175
                        base, _from_transport)
 
176
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
 
177
 
 
178
            activity_scenarios.append(
 
179
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
 
180
                                      _transport=HTTPS_pycurl_transport,)),)
 
181
 
 
182
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
 
183
                                               protocol_scenarios)
 
184
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
 
185
 
 
186
    # No parametrization for the remaining tests
 
187
    result.addTests(remaining_tests)
 
188
 
 
189
    return result
 
190
 
 
191
 
 
192
class FakeManager(object):
 
193
 
 
194
    def __init__(self):
 
195
        self.credentials = []
 
196
 
 
197
    def add_password(self, realm, host, username, password):
 
198
        self.credentials.append([realm, host, username, password])
 
199
 
 
200
 
 
201
class RecordingServer(object):
 
202
    """A fake HTTP server.
 
203
 
 
204
    It records the bytes sent to it, and replies with a 200.
 
205
    """
 
206
 
 
207
    def __init__(self, expect_body_tail=None, scheme=''):
 
208
        """Constructor.
 
209
 
 
210
        :type expect_body_tail: str
 
211
        :param expect_body_tail: a reply won't be sent until this string is
 
212
            received.
 
213
        """
 
214
        self._expect_body_tail = expect_body_tail
 
215
        self.host = None
 
216
        self.port = None
 
217
        self.received_bytes = ''
 
218
        self.scheme = scheme
 
219
 
 
220
    def get_url(self):
 
221
        return '%s://%s:%s/' % (self.scheme, self.host, self.port)
 
222
 
 
223
    def setUp(self):
 
224
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
225
        self._sock.bind(('127.0.0.1', 0))
 
226
        self.host, self.port = self._sock.getsockname()
 
227
        self._ready = threading.Event()
 
228
        self._thread = threading.Thread(target=self._accept_read_and_reply)
 
229
        self._thread.setDaemon(True)
 
230
        self._thread.start()
 
231
        self._ready.wait(5)
 
232
 
 
233
    def _accept_read_and_reply(self):
 
234
        self._sock.listen(1)
 
235
        self._ready.set()
 
236
        self._sock.settimeout(5)
 
237
        try:
 
238
            conn, address = self._sock.accept()
 
239
            # On win32, the accepted connection will be non-blocking to start
 
240
            # with because we're using settimeout.
 
241
            conn.setblocking(True)
 
242
            while not self.received_bytes.endswith(self._expect_body_tail):
 
243
                self.received_bytes += conn.recv(4096)
 
244
            conn.sendall('HTTP/1.1 200 OK\r\n')
 
245
        except socket.timeout:
 
246
            # Make sure the client isn't stuck waiting for us to e.g. accept.
 
247
            self._sock.close()
 
248
        except socket.error:
 
249
            # The client may have already closed the socket.
 
250
            pass
 
251
 
 
252
    def tearDown(self):
 
253
        try:
 
254
            self._sock.close()
 
255
        except socket.error:
 
256
            # We might have already closed it.  We don't care.
 
257
            pass
 
258
        self.host = None
 
259
        self.port = None
 
260
 
 
261
 
 
262
class TestAuthHeader(tests.TestCase):
 
263
 
 
264
    def parse_header(self, header, auth_handler_class=None):
 
265
        if auth_handler_class is None:
 
266
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
 
267
        self.auth_handler =  auth_handler_class()
 
268
        return self.auth_handler._parse_auth_header(header)
 
269
 
 
270
    def test_empty_header(self):
 
271
        scheme, remainder = self.parse_header('')
 
272
        self.assertEqual('', scheme)
 
273
        self.assertIs(None, remainder)
 
274
 
 
275
    def test_negotiate_header(self):
 
276
        scheme, remainder = self.parse_header('Negotiate')
 
277
        self.assertEqual('negotiate', scheme)
 
278
        self.assertIs(None, remainder)
 
279
 
 
280
    def test_basic_header(self):
 
281
        scheme, remainder = self.parse_header(
 
282
            'Basic realm="Thou should not pass"')
 
283
        self.assertEqual('basic', scheme)
 
284
        self.assertEqual('realm="Thou should not pass"', remainder)
 
285
 
 
286
    def test_basic_extract_realm(self):
 
287
        scheme, remainder = self.parse_header(
 
288
            'Basic realm="Thou should not pass"',
 
289
            _urllib2_wrappers.BasicAuthHandler)
 
290
        match, realm = self.auth_handler.extract_realm(remainder)
 
291
        self.assertTrue(match is not None)
 
292
        self.assertEqual('Thou should not pass', realm)
 
293
 
 
294
    def test_digest_header(self):
 
295
        scheme, remainder = self.parse_header(
 
296
            'Digest realm="Thou should not pass"')
 
297
        self.assertEqual('digest', scheme)
 
298
        self.assertEqual('realm="Thou should not pass"', remainder)
 
299
 
 
300
 
 
301
class TestHTTPServer(tests.TestCase):
 
302
    """Test the HTTP servers implementations."""
 
303
 
 
304
    def test_invalid_protocol(self):
 
305
        class BogusRequestHandler(http_server.TestingHTTPRequestHandler):
 
306
 
 
307
            protocol_version = 'HTTP/0.1'
 
308
 
 
309
        server = http_server.HttpServer(BogusRequestHandler)
 
310
        try:
 
311
            self.assertRaises(httplib.UnknownProtocol, server.setUp)
 
312
        except:
 
313
            server.tearDown()
 
314
            self.fail('HTTP Server creation did not raise UnknownProtocol')
 
315
 
 
316
    def test_force_invalid_protocol(self):
 
317
        server = http_server.HttpServer(protocol_version='HTTP/0.1')
 
318
        try:
 
319
            self.assertRaises(httplib.UnknownProtocol, server.setUp)
 
320
        except:
 
321
            server.tearDown()
 
322
            self.fail('HTTP Server creation did not raise UnknownProtocol')
 
323
 
 
324
    def test_server_start_and_stop(self):
 
325
        server = http_server.HttpServer()
 
326
        server.setUp()
 
327
        try:
 
328
            self.assertTrue(server._http_running)
 
329
        finally:
 
330
            server.tearDown()
 
331
        self.assertFalse(server._http_running)
 
332
 
 
333
    def test_create_http_server_one_zero(self):
 
334
        class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
 
335
 
 
336
            protocol_version = 'HTTP/1.0'
 
337
 
 
338
        server = http_server.HttpServer(RequestHandlerOneZero)
 
339
        self.start_server(server)
 
340
        self.assertIsInstance(server._httpd, http_server.TestingHTTPServer)
 
341
 
 
342
    def test_create_http_server_one_one(self):
 
343
        class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
 
344
 
 
345
            protocol_version = 'HTTP/1.1'
 
346
 
 
347
        server = http_server.HttpServer(RequestHandlerOneOne)
 
348
        self.start_server(server)
 
349
        self.assertIsInstance(server._httpd,
 
350
                              http_server.TestingThreadingHTTPServer)
 
351
 
 
352
    def test_create_http_server_force_one_one(self):
 
353
        class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
 
354
 
 
355
            protocol_version = 'HTTP/1.0'
 
356
 
 
357
        server = http_server.HttpServer(RequestHandlerOneZero,
 
358
                                        protocol_version='HTTP/1.1')
 
359
        self.start_server(server)
 
360
        self.assertIsInstance(server._httpd,
 
361
                              http_server.TestingThreadingHTTPServer)
 
362
 
 
363
    def test_create_http_server_force_one_zero(self):
 
364
        class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
 
365
 
 
366
            protocol_version = 'HTTP/1.1'
 
367
 
 
368
        server = http_server.HttpServer(RequestHandlerOneOne,
 
369
                                        protocol_version='HTTP/1.0')
 
370
        self.start_server(server)
 
371
        self.assertIsInstance(server._httpd,
 
372
                              http_server.TestingHTTPServer)
 
373
 
 
374
 
 
375
class TestWithTransport_pycurl(object):
 
376
    """Test case to inherit from if pycurl is present"""
 
377
 
 
378
    def _get_pycurl_maybe(self):
 
379
        try:
 
380
            from bzrlib.transport.http._pycurl import PyCurlTransport
 
381
            return PyCurlTransport
 
382
        except errors.DependencyNotPresent:
 
383
            raise tests.TestSkipped('pycurl not present')
 
384
 
 
385
    _transport = property(_get_pycurl_maybe)
 
386
 
 
387
 
 
388
class TestHttpUrls(tests.TestCase):
 
389
 
 
390
    # TODO: This should be moved to authorization tests once they
 
391
    # are written.
 
392
 
 
393
    def test_url_parsing(self):
 
394
        f = FakeManager()
 
395
        url = http.extract_auth('http://example.com', f)
 
396
        self.assertEqual('http://example.com', url)
 
397
        self.assertEqual(0, len(f.credentials))
 
398
        url = http.extract_auth(
 
399
            'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
 
400
        self.assertEqual('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
 
401
        self.assertEqual(1, len(f.credentials))
 
402
        self.assertEqual([None, 'www.bazaar-vcs.org', 'user', 'pass'],
 
403
                         f.credentials[0])
 
404
 
 
405
 
 
406
class TestHttpTransportUrls(tests.TestCase):
 
407
    """Test the http urls."""
 
408
 
 
409
    def test_abs_url(self):
 
410
        """Construction of absolute http URLs"""
 
411
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
 
412
        eq = self.assertEqualDiff
 
413
        eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
 
414
        eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
 
415
        eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
 
416
        eq(t.abspath('.bzr/1//2/./3'),
 
417
           'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
 
418
 
 
419
    def test_invalid_http_urls(self):
 
420
        """Trap invalid construction of urls"""
 
421
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
 
422
        self.assertRaises(errors.InvalidURL,
 
423
                          self._transport,
 
424
                          'http://http://bazaar-vcs.org/bzr/bzr.dev/')
 
425
 
 
426
    def test_http_root_urls(self):
 
427
        """Construction of URLs from server root"""
 
428
        t = self._transport('http://bzr.ozlabs.org/')
 
429
        eq = self.assertEqualDiff
 
430
        eq(t.abspath('.bzr/tree-version'),
 
431
           'http://bzr.ozlabs.org/.bzr/tree-version')
 
432
 
 
433
    def test_http_impl_urls(self):
 
434
        """There are servers which ask for particular clients to connect"""
 
435
        server = self._server()
 
436
        server.setUp()
 
437
        try:
 
438
            url = server.get_url()
 
439
            self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
 
440
        finally:
 
441
            server.tearDown()
 
442
 
 
443
 
 
444
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
 
445
 
 
446
    # TODO: This should really be moved into another pycurl
 
447
    # specific test. When https tests will be implemented, take
 
448
    # this one into account.
 
449
    def test_pycurl_without_https_support(self):
 
450
        """Test that pycurl without SSL do not fail with a traceback.
 
451
 
 
452
        For the purpose of the test, we force pycurl to ignore
 
453
        https by supplying a fake version_info that do not
 
454
        support it.
 
455
        """
 
456
        try:
 
457
            import pycurl
 
458
        except ImportError:
 
459
            raise tests.TestSkipped('pycurl not present')
 
460
 
 
461
        version_info_orig = pycurl.version_info
 
462
        try:
 
463
            # Now that we have pycurl imported, we can fake its version_info
 
464
            # This was taken from a windows pycurl without SSL
 
465
            # (thanks to bialix)
 
466
            pycurl.version_info = lambda : (2,
 
467
                                            '7.13.2',
 
468
                                            462082,
 
469
                                            'i386-pc-win32',
 
470
                                            2576,
 
471
                                            None,
 
472
                                            0,
 
473
                                            None,
 
474
                                            ('ftp', 'gopher', 'telnet',
 
475
                                             'dict', 'ldap', 'http', 'file'),
 
476
                                            None,
 
477
                                            0,
 
478
                                            None)
 
479
            self.assertRaises(errors.DependencyNotPresent, self._transport,
 
480
                              'https://launchpad.net')
 
481
        finally:
 
482
            # Restore the right function
 
483
            pycurl.version_info = version_info_orig
 
484
 
 
485
 
 
486
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
 
487
    """Test the http connections."""
 
488
 
 
489
    def setUp(self):
 
490
        http_utils.TestCaseWithWebserver.setUp(self)
 
491
        self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
 
492
                        transport=self.get_transport())
 
493
 
 
494
    def test_http_has(self):
 
495
        server = self.get_readonly_server()
 
496
        t = self._transport(server.get_url())
 
497
        self.assertEqual(t.has('foo/bar'), True)
 
498
        self.assertEqual(len(server.logs), 1)
 
499
        self.assertContainsRe(server.logs[0],
 
500
            r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "bzr/')
 
501
 
 
502
    def test_http_has_not_found(self):
 
503
        server = self.get_readonly_server()
 
504
        t = self._transport(server.get_url())
 
505
        self.assertEqual(t.has('not-found'), False)
 
506
        self.assertContainsRe(server.logs[1],
 
507
            r'"HEAD /not-found HTTP/1.." 404 - "-" "bzr/')
 
508
 
 
509
    def test_http_get(self):
 
510
        server = self.get_readonly_server()
 
511
        t = self._transport(server.get_url())
 
512
        fp = t.get('foo/bar')
 
513
        self.assertEqualDiff(
 
514
            fp.read(),
 
515
            'contents of foo/bar\n')
 
516
        self.assertEqual(len(server.logs), 1)
 
517
        self.assertTrue(server.logs[0].find(
 
518
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
 
519
            % bzrlib.__version__) > -1)
 
520
 
 
521
    def test_has_on_bogus_host(self):
 
522
        # Get a free address and don't 'accept' on it, so that we
 
523
        # can be sure there is no http handler there, but set a
 
524
        # reasonable timeout to not slow down tests too much.
 
525
        default_timeout = socket.getdefaulttimeout()
 
526
        try:
 
527
            socket.setdefaulttimeout(2)
 
528
            s = socket.socket()
 
529
            s.bind(('localhost', 0))
 
530
            t = self._transport('http://%s:%s/' % s.getsockname())
 
531
            self.assertRaises(errors.ConnectionError, t.has, 'foo/bar')
 
532
        finally:
 
533
            socket.setdefaulttimeout(default_timeout)
 
534
 
 
535
 
 
536
class TestHttpTransportRegistration(tests.TestCase):
 
537
    """Test registrations of various http implementations"""
 
538
 
 
539
    def test_http_registered(self):
 
540
        t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
 
541
        self.assertIsInstance(t, transport.Transport)
 
542
        self.assertIsInstance(t, self._transport)
 
543
 
 
544
 
 
545
class TestPost(tests.TestCase):
 
546
 
 
547
    def test_post_body_is_received(self):
 
548
        server = RecordingServer(expect_body_tail='end-of-body',
 
549
            scheme=self._qualified_prefix)
 
550
        self.start_server(server)
 
551
        url = server.get_url()
 
552
        http_transport = self._transport(url)
 
553
        code, response = http_transport._post('abc def end-of-body')
 
554
        self.assertTrue(
 
555
            server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
 
556
        self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
 
557
        # The transport should not be assuming that the server can accept
 
558
        # chunked encoding the first time it connects, because HTTP/1.1, so we
 
559
        # check for the literal string.
 
560
        self.assertTrue(
 
561
            server.received_bytes.endswith('\r\n\r\nabc def end-of-body'))
 
562
 
 
563
 
 
564
class TestRangeHeader(tests.TestCase):
 
565
    """Test range_header method"""
 
566
 
 
567
    def check_header(self, value, ranges=[], tail=0):
 
568
        offsets = [ (start, end - start + 1) for start, end in ranges]
 
569
        coalesce = transport.Transport._coalesce_offsets
 
570
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
 
571
        range_header = http.HttpTransportBase._range_header
 
572
        self.assertEqual(value, range_header(coalesced, tail))
 
573
 
 
574
    def test_range_header_single(self):
 
575
        self.check_header('0-9', ranges=[(0,9)])
 
576
        self.check_header('100-109', ranges=[(100,109)])
 
577
 
 
578
    def test_range_header_tail(self):
 
579
        self.check_header('-10', tail=10)
 
580
        self.check_header('-50', tail=50)
 
581
 
 
582
    def test_range_header_multi(self):
 
583
        self.check_header('0-9,100-200,300-5000',
 
584
                          ranges=[(0,9), (100, 200), (300,5000)])
 
585
 
 
586
    def test_range_header_mixed(self):
 
587
        self.check_header('0-9,300-5000,-50',
 
588
                          ranges=[(0,9), (300,5000)],
 
589
                          tail=50)
 
590
 
 
591
 
 
592
class TestSpecificRequestHandler(http_utils.TestCaseWithWebserver):
 
593
    """Tests a specific request handler.
 
594
 
 
595
    Daughter classes are expected to override _req_handler_class
 
596
    """
 
597
 
 
598
    # Provide a useful default
 
599
    _req_handler_class = http_server.TestingHTTPRequestHandler
 
600
 
 
601
    def create_transport_readonly_server(self):
 
602
        return http_server.HttpServer(self._req_handler_class,
 
603
                                      protocol_version=self._protocol_version)
 
604
 
 
605
    def _testing_pycurl(self):
 
606
        return pycurl_present and self._transport == PyCurlTransport
 
607
 
 
608
 
 
609
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
 
610
    """Whatever request comes in, close the connection"""
 
611
 
 
612
    def handle_one_request(self):
 
613
        """Handle a single HTTP request, by abruptly closing the connection"""
 
614
        self.close_connection = 1
 
615
 
 
616
 
 
617
class TestWallServer(TestSpecificRequestHandler):
 
618
    """Tests exceptions during the connection phase"""
 
619
 
 
620
    _req_handler_class = WallRequestHandler
 
621
 
 
622
    def test_http_has(self):
 
623
        server = self.get_readonly_server()
 
624
        t = self._transport(server.get_url())
 
625
        # Unfortunately httplib (see HTTPResponse._read_status
 
626
        # for details) make no distinction between a closed
 
627
        # socket and badly formatted status line, so we can't
 
628
        # just test for ConnectionError, we have to test
 
629
        # InvalidHttpResponse too. And pycurl may raise ConnectionReset
 
630
        # instead of ConnectionError too.
 
631
        self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
 
632
                            errors.InvalidHttpResponse),
 
633
                          t.has, 'foo/bar')
 
634
 
 
635
    def test_http_get(self):
 
636
        server = self.get_readonly_server()
 
637
        t = self._transport(server.get_url())
 
638
        self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
 
639
                           errors.InvalidHttpResponse),
 
640
                          t.get, 'foo/bar')
 
641
 
 
642
 
 
643
class BadStatusRequestHandler(http_server.TestingHTTPRequestHandler):
 
644
    """Whatever request comes in, returns a bad status"""
 
645
 
 
646
    def parse_request(self):
 
647
        """Fakes handling a single HTTP request, returns a bad status"""
 
648
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
649
        self.send_response(0, "Bad status")
 
650
        self.close_connection = 1
 
651
        return False
 
652
 
 
653
 
 
654
class TestBadStatusServer(TestSpecificRequestHandler):
 
655
    """Tests bad status from server."""
 
656
 
 
657
    _req_handler_class = BadStatusRequestHandler
 
658
 
 
659
    def test_http_has(self):
 
660
        server = self.get_readonly_server()
 
661
        t = self._transport(server.get_url())
 
662
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
 
663
 
 
664
    def test_http_get(self):
 
665
        server = self.get_readonly_server()
 
666
        t = self._transport(server.get_url())
 
667
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
 
668
 
 
669
 
 
670
class InvalidStatusRequestHandler(http_server.TestingHTTPRequestHandler):
 
671
    """Whatever request comes in, returns an invalid status"""
 
672
 
 
673
    def parse_request(self):
 
674
        """Fakes handling a single HTTP request, returns a bad status"""
 
675
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
676
        self.wfile.write("Invalid status line\r\n")
 
677
        return False
 
678
 
 
679
 
 
680
class TestInvalidStatusServer(TestBadStatusServer):
 
681
    """Tests invalid status from server.
 
682
 
 
683
    Both implementations raises the same error as for a bad status.
 
684
    """
 
685
 
 
686
    _req_handler_class = InvalidStatusRequestHandler
 
687
 
 
688
    def test_http_has(self):
 
689
        if self._testing_pycurl() and self._protocol_version == 'HTTP/1.1':
 
690
            raise tests.KnownFailure(
 
691
                'pycurl hangs if the server send back garbage')
 
692
        super(TestInvalidStatusServer, self).test_http_has()
 
693
 
 
694
    def test_http_get(self):
 
695
        if self._testing_pycurl() and self._protocol_version == 'HTTP/1.1':
 
696
            raise tests.KnownFailure(
 
697
                'pycurl hangs if the server send back garbage')
 
698
        super(TestInvalidStatusServer, self).test_http_get()
 
699
 
 
700
 
 
701
class BadProtocolRequestHandler(http_server.TestingHTTPRequestHandler):
 
702
    """Whatever request comes in, returns a bad protocol version"""
 
703
 
 
704
    def parse_request(self):
 
705
        """Fakes handling a single HTTP request, returns a bad status"""
 
706
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
707
        # Returns an invalid protocol version, but curl just
 
708
        # ignores it and those cannot be tested.
 
709
        self.wfile.write("%s %d %s\r\n" % ('HTTP/0.0',
 
710
                                           404,
 
711
                                           'Look at my protocol version'))
 
712
        return False
 
713
 
 
714
 
 
715
class TestBadProtocolServer(TestSpecificRequestHandler):
 
716
    """Tests bad protocol from server."""
 
717
 
 
718
    _req_handler_class = BadProtocolRequestHandler
 
719
 
 
720
    def setUp(self):
 
721
        if pycurl_present and self._transport == PyCurlTransport:
 
722
            raise tests.TestNotApplicable(
 
723
                "pycurl doesn't check the protocol version")
 
724
        super(TestBadProtocolServer, self).setUp()
 
725
 
 
726
    def test_http_has(self):
 
727
        server = self.get_readonly_server()
 
728
        t = self._transport(server.get_url())
 
729
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
 
730
 
 
731
    def test_http_get(self):
 
732
        server = self.get_readonly_server()
 
733
        t = self._transport(server.get_url())
 
734
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
 
735
 
 
736
 
 
737
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
 
738
    """Whatever request comes in, returns a 403 code"""
 
739
 
 
740
    def parse_request(self):
 
741
        """Handle a single HTTP request, by replying we cannot handle it"""
 
742
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
743
        self.send_error(403)
 
744
        return False
 
745
 
 
746
 
 
747
class TestForbiddenServer(TestSpecificRequestHandler):
 
748
    """Tests forbidden server"""
 
749
 
 
750
    _req_handler_class = ForbiddenRequestHandler
 
751
 
 
752
    def test_http_has(self):
 
753
        server = self.get_readonly_server()
 
754
        t = self._transport(server.get_url())
 
755
        self.assertRaises(errors.TransportError, t.has, 'foo/bar')
 
756
 
 
757
    def test_http_get(self):
 
758
        server = self.get_readonly_server()
 
759
        t = self._transport(server.get_url())
 
760
        self.assertRaises(errors.TransportError, t.get, 'foo/bar')
 
761
 
 
762
 
 
763
class TestRecordingServer(tests.TestCase):
 
764
 
 
765
    def test_create(self):
 
766
        server = RecordingServer(expect_body_tail=None)
 
767
        self.assertEqual('', server.received_bytes)
 
768
        self.assertEqual(None, server.host)
 
769
        self.assertEqual(None, server.port)
 
770
 
 
771
    def test_setUp_and_tearDown(self):
 
772
        server = RecordingServer(expect_body_tail=None)
 
773
        server.setUp()
 
774
        try:
 
775
            self.assertNotEqual(None, server.host)
 
776
            self.assertNotEqual(None, server.port)
 
777
        finally:
 
778
            server.tearDown()
 
779
        self.assertEqual(None, server.host)
 
780
        self.assertEqual(None, server.port)
 
781
 
 
782
    def test_send_receive_bytes(self):
 
783
        server = RecordingServer(expect_body_tail='c', scheme='http')
 
784
        self.start_server(server)
 
785
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
786
        sock.connect((server.host, server.port))
 
787
        sock.sendall('abc')
 
788
        self.assertEqual('HTTP/1.1 200 OK\r\n',
 
789
                         osutils.recv_all(sock, 4096))
 
790
        self.assertEqual('abc', server.received_bytes)
 
791
 
 
792
 
 
793
class TestRangeRequestServer(TestSpecificRequestHandler):
 
794
    """Tests readv requests against server.
 
795
 
 
796
    We test against default "normal" server.
 
797
    """
 
798
 
 
799
    def setUp(self):
 
800
        super(TestRangeRequestServer, self).setUp()
 
801
        self.build_tree_contents([('a', '0123456789')],)
 
802
 
 
803
    def test_readv(self):
 
804
        server = self.get_readonly_server()
 
805
        t = self._transport(server.get_url())
 
806
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
807
        self.assertEqual(l[0], (0, '0'))
 
808
        self.assertEqual(l[1], (1, '1'))
 
809
        self.assertEqual(l[2], (3, '34'))
 
810
        self.assertEqual(l[3], (9, '9'))
 
811
 
 
812
    def test_readv_out_of_order(self):
 
813
        server = self.get_readonly_server()
 
814
        t = self._transport(server.get_url())
 
815
        l = list(t.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
816
        self.assertEqual(l[0], (1, '1'))
 
817
        self.assertEqual(l[1], (9, '9'))
 
818
        self.assertEqual(l[2], (0, '0'))
 
819
        self.assertEqual(l[3], (3, '34'))
 
820
 
 
821
    def test_readv_invalid_ranges(self):
 
822
        server = self.get_readonly_server()
 
823
        t = self._transport(server.get_url())
 
824
 
 
825
        # This is intentionally reading off the end of the file
 
826
        # since we are sure that it cannot get there
 
827
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
 
828
                              t.readv, 'a', [(1,1), (8,10)])
 
829
 
 
830
        # This is trying to seek past the end of the file, it should
 
831
        # also raise a special error
 
832
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
 
833
                              t.readv, 'a', [(12,2)])
 
834
 
 
835
    def test_readv_multiple_get_requests(self):
 
836
        server = self.get_readonly_server()
 
837
        t = self._transport(server.get_url())
 
838
        # force transport to issue multiple requests
 
839
        t._max_readv_combine = 1
 
840
        t._max_get_ranges = 1
 
841
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
842
        self.assertEqual(l[0], (0, '0'))
 
843
        self.assertEqual(l[1], (1, '1'))
 
844
        self.assertEqual(l[2], (3, '34'))
 
845
        self.assertEqual(l[3], (9, '9'))
 
846
        # The server should have issued 4 requests
 
847
        self.assertEqual(4, server.GET_request_nb)
 
848
 
 
849
    def test_readv_get_max_size(self):
 
850
        server = self.get_readonly_server()
 
851
        t = self._transport(server.get_url())
 
852
        # force transport to issue multiple requests by limiting the number of
 
853
        # bytes by request. Note that this apply to coalesced offsets only, a
 
854
        # single range will keep its size even if bigger than the limit.
 
855
        t._get_max_size = 2
 
856
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
857
        self.assertEqual(l[0], (0, '0'))
 
858
        self.assertEqual(l[1], (1, '1'))
 
859
        self.assertEqual(l[2], (2, '2345'))
 
860
        self.assertEqual(l[3], (6, '6789'))
 
861
        # The server should have issued 3 requests
 
862
        self.assertEqual(3, server.GET_request_nb)
 
863
 
 
864
    def test_complete_readv_leave_pipe_clean(self):
 
865
        server = self.get_readonly_server()
 
866
        t = self._transport(server.get_url())
 
867
        # force transport to issue multiple requests
 
868
        t._get_max_size = 2
 
869
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
870
        # The server should have issued 3 requests
 
871
        self.assertEqual(3, server.GET_request_nb)
 
872
        self.assertEqual('0123456789', t.get_bytes('a'))
 
873
        self.assertEqual(4, server.GET_request_nb)
 
874
 
 
875
    def test_incomplete_readv_leave_pipe_clean(self):
 
876
        server = self.get_readonly_server()
 
877
        t = self._transport(server.get_url())
 
878
        # force transport to issue multiple requests
 
879
        t._get_max_size = 2
 
880
        # Don't collapse readv results into a list so that we leave unread
 
881
        # bytes on the socket
 
882
        ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
883
        self.assertEqual((0, '0'), ireadv.next())
 
884
        # The server should have issued one request so far
 
885
        self.assertEqual(1, server.GET_request_nb)
 
886
        self.assertEqual('0123456789', t.get_bytes('a'))
 
887
        # get_bytes issued an additional request, the readv pending ones are
 
888
        # lost
 
889
        self.assertEqual(2, server.GET_request_nb)
 
890
 
 
891
 
 
892
class SingleRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
893
    """Always reply to range request as if they were single.
 
894
 
 
895
    Don't be explicit about it, just to annoy the clients.
 
896
    """
 
897
 
 
898
    def get_multiple_ranges(self, file, file_size, ranges):
 
899
        """Answer as if it was a single range request and ignores the rest"""
 
900
        (start, end) = ranges[0]
 
901
        return self.get_single_range(file, file_size, start, end)
 
902
 
 
903
 
 
904
class TestSingleRangeRequestServer(TestRangeRequestServer):
 
905
    """Test readv against a server which accept only single range requests"""
 
906
 
 
907
    _req_handler_class = SingleRangeRequestHandler
 
908
 
 
909
 
 
910
class SingleOnlyRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
911
    """Only reply to simple range requests, errors out on multiple"""
 
912
 
 
913
    def get_multiple_ranges(self, file, file_size, ranges):
 
914
        """Refuses the multiple ranges request"""
 
915
        if len(ranges) > 1:
 
916
            file.close()
 
917
            self.send_error(416, "Requested range not satisfiable")
 
918
            return
 
919
        (start, end) = ranges[0]
 
920
        return self.get_single_range(file, file_size, start, end)
 
921
 
 
922
 
 
923
class TestSingleOnlyRangeRequestServer(TestRangeRequestServer):
 
924
    """Test readv against a server which only accept single range requests"""
 
925
 
 
926
    _req_handler_class = SingleOnlyRangeRequestHandler
 
927
 
 
928
 
 
929
class NoRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
930
    """Ignore range requests without notice"""
 
931
 
 
932
    def do_GET(self):
 
933
        # Update the statistics
 
934
        self.server.test_case_server.GET_request_nb += 1
 
935
        # Just bypass the range handling done by TestingHTTPRequestHandler
 
936
        return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
 
937
 
 
938
 
 
939
class TestNoRangeRequestServer(TestRangeRequestServer):
 
940
    """Test readv against a server which do not accept range requests"""
 
941
 
 
942
    _req_handler_class = NoRangeRequestHandler
 
943
 
 
944
 
 
945
class MultipleRangeWithoutContentLengthRequestHandler(
 
946
    http_server.TestingHTTPRequestHandler):
 
947
    """Reply to multiple range requests without content length header."""
 
948
 
 
949
    def get_multiple_ranges(self, file, file_size, ranges):
 
950
        self.send_response(206)
 
951
        self.send_header('Accept-Ranges', 'bytes')
 
952
        boundary = "%d" % random.randint(0,0x7FFFFFFF)
 
953
        self.send_header("Content-Type",
 
954
                         "multipart/byteranges; boundary=%s" % boundary)
 
955
        self.end_headers()
 
956
        for (start, end) in ranges:
 
957
            self.wfile.write("--%s\r\n" % boundary)
 
958
            self.send_header("Content-type", 'application/octet-stream')
 
959
            self.send_header("Content-Range", "bytes %d-%d/%d" % (start,
 
960
                                                                  end,
 
961
                                                                  file_size))
 
962
            self.end_headers()
 
963
            self.send_range_content(file, start, end - start + 1)
 
964
        # Final boundary
 
965
        self.wfile.write("--%s\r\n" % boundary)
 
966
 
 
967
 
 
968
class TestMultipleRangeWithoutContentLengthServer(TestRangeRequestServer):
 
969
 
 
970
    _req_handler_class = MultipleRangeWithoutContentLengthRequestHandler
 
971
 
 
972
 
 
973
class TruncatedMultipleRangeRequestHandler(
 
974
    http_server.TestingHTTPRequestHandler):
 
975
    """Reply to multiple range requests truncating the last ones.
 
976
 
 
977
    This server generates responses whose Content-Length describes all the
 
978
    ranges, but fail to include the last ones leading to client short reads.
 
979
    This has been observed randomly with lighttpd (bug #179368).
 
980
    """
 
981
 
 
982
    _truncated_ranges = 2
 
983
 
 
984
    def get_multiple_ranges(self, file, file_size, ranges):
 
985
        self.send_response(206)
 
986
        self.send_header('Accept-Ranges', 'bytes')
 
987
        boundary = 'tagada'
 
988
        self.send_header('Content-Type',
 
989
                         'multipart/byteranges; boundary=%s' % boundary)
 
990
        boundary_line = '--%s\r\n' % boundary
 
991
        # Calculate the Content-Length
 
992
        content_length = 0
 
993
        for (start, end) in ranges:
 
994
            content_length += len(boundary_line)
 
995
            content_length += self._header_line_length(
 
996
                'Content-type', 'application/octet-stream')
 
997
            content_length += self._header_line_length(
 
998
                'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
 
999
            content_length += len('\r\n') # end headers
 
1000
            content_length += end - start # + 1
 
1001
        content_length += len(boundary_line)
 
1002
        self.send_header('Content-length', content_length)
 
1003
        self.end_headers()
 
1004
 
 
1005
        # Send the multipart body
 
1006
        cur = 0
 
1007
        for (start, end) in ranges:
 
1008
            self.wfile.write(boundary_line)
 
1009
            self.send_header('Content-type', 'application/octet-stream')
 
1010
            self.send_header('Content-Range', 'bytes %d-%d/%d'
 
1011
                             % (start, end, file_size))
 
1012
            self.end_headers()
 
1013
            if cur + self._truncated_ranges >= len(ranges):
 
1014
                # Abruptly ends the response and close the connection
 
1015
                self.close_connection = 1
 
1016
                return
 
1017
            self.send_range_content(file, start, end - start + 1)
 
1018
            cur += 1
 
1019
        # No final boundary
 
1020
        self.wfile.write(boundary_line)
 
1021
 
 
1022
 
 
1023
class TestTruncatedMultipleRangeServer(TestSpecificRequestHandler):
 
1024
 
 
1025
    _req_handler_class = TruncatedMultipleRangeRequestHandler
 
1026
 
 
1027
    def setUp(self):
 
1028
        super(TestTruncatedMultipleRangeServer, self).setUp()
 
1029
        self.build_tree_contents([('a', '0123456789')],)
 
1030
 
 
1031
    def test_readv_with_short_reads(self):
 
1032
        server = self.get_readonly_server()
 
1033
        t = self._transport(server.get_url())
 
1034
        # Force separate ranges for each offset
 
1035
        t._bytes_to_read_before_seek = 0
 
1036
        ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
 
1037
        self.assertEqual((0, '0'), ireadv.next())
 
1038
        self.assertEqual((2, '2'), ireadv.next())
 
1039
        if not self._testing_pycurl():
 
1040
            # Only one request have been issued so far (except for pycurl that
 
1041
            # try to read the whole response at once)
 
1042
            self.assertEqual(1, server.GET_request_nb)
 
1043
        self.assertEqual((4, '45'), ireadv.next())
 
1044
        self.assertEqual((9, '9'), ireadv.next())
 
1045
        # Both implementations issue 3 requests but:
 
1046
        # - urllib does two multiple (4 ranges, then 2 ranges) then a single
 
1047
        #   range,
 
1048
        # - pycurl does two multiple (4 ranges, 4 ranges) then a single range
 
1049
        self.assertEqual(3, server.GET_request_nb)
 
1050
        # Finally the client have tried a single range request and stays in
 
1051
        # that mode
 
1052
        self.assertEqual('single', t._range_hint)
 
1053
 
 
1054
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
1055
    """Errors out when range specifiers exceed the limit"""
 
1056
 
 
1057
    def get_multiple_ranges(self, file, file_size, ranges):
 
1058
        """Refuses the multiple ranges request"""
 
1059
        tcs = self.server.test_case_server
 
1060
        if tcs.range_limit is not None and len(ranges) > tcs.range_limit:
 
1061
            file.close()
 
1062
            # Emulate apache behavior
 
1063
            self.send_error(400, "Bad Request")
 
1064
            return
 
1065
        return http_server.TestingHTTPRequestHandler.get_multiple_ranges(
 
1066
            self, file, file_size, ranges)
 
1067
 
 
1068
 
 
1069
class LimitedRangeHTTPServer(http_server.HttpServer):
 
1070
    """An HttpServer erroring out on requests with too much range specifiers"""
 
1071
 
 
1072
    def __init__(self, request_handler=LimitedRangeRequestHandler,
 
1073
                 protocol_version=None,
 
1074
                 range_limit=None):
 
1075
        http_server.HttpServer.__init__(self, request_handler,
 
1076
                                        protocol_version=protocol_version)
 
1077
        self.range_limit = range_limit
 
1078
 
 
1079
 
 
1080
class TestLimitedRangeRequestServer(http_utils.TestCaseWithWebserver):
 
1081
    """Tests readv requests against a server erroring out on too much ranges."""
 
1082
 
 
1083
    # Requests with more range specifiers will error out
 
1084
    range_limit = 3
 
1085
 
 
1086
    def create_transport_readonly_server(self):
 
1087
        return LimitedRangeHTTPServer(range_limit=self.range_limit,
 
1088
                                      protocol_version=self._protocol_version)
 
1089
 
 
1090
    def get_transport(self):
 
1091
        return self._transport(self.get_readonly_server().get_url())
 
1092
 
 
1093
    def setUp(self):
 
1094
        http_utils.TestCaseWithWebserver.setUp(self)
 
1095
        # We need to manipulate ranges that correspond to real chunks in the
 
1096
        # response, so we build a content appropriately.
 
1097
        filler = ''.join(['abcdefghij' for x in range(102)])
 
1098
        content = ''.join(['%04d' % v + filler for v in range(16)])
 
1099
        self.build_tree_contents([('a', content)],)
 
1100
 
 
1101
    def test_few_ranges(self):
 
1102
        t = self.get_transport()
 
1103
        l = list(t.readv('a', ((0, 4), (1024, 4), )))
 
1104
        self.assertEqual(l[0], (0, '0000'))
 
1105
        self.assertEqual(l[1], (1024, '0001'))
 
1106
        self.assertEqual(1, self.get_readonly_server().GET_request_nb)
 
1107
 
 
1108
    def test_more_ranges(self):
 
1109
        t = self.get_transport()
 
1110
        l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
 
1111
        self.assertEqual(l[0], (0, '0000'))
 
1112
        self.assertEqual(l[1], (1024, '0001'))
 
1113
        self.assertEqual(l[2], (4096, '0004'))
 
1114
        self.assertEqual(l[3], (8192, '0008'))
 
1115
        # The server will refuse to serve the first request (too much ranges),
 
1116
        # a second request will succeed.
 
1117
        self.assertEqual(2, self.get_readonly_server().GET_request_nb)
 
1118
 
 
1119
 
 
1120
class TestHttpProxyWhiteBox(tests.TestCase):
 
1121
    """Whitebox test proxy http authorization.
 
1122
 
 
1123
    Only the urllib implementation is tested here.
 
1124
    """
 
1125
 
 
1126
    def setUp(self):
 
1127
        tests.TestCase.setUp(self)
 
1128
        self._old_env = {}
 
1129
 
 
1130
    def tearDown(self):
 
1131
        self._restore_env()
 
1132
        tests.TestCase.tearDown(self)
 
1133
 
 
1134
    def _install_env(self, env):
 
1135
        for name, value in env.iteritems():
 
1136
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1137
 
 
1138
    def _restore_env(self):
 
1139
        for name, value in self._old_env.iteritems():
 
1140
            osutils.set_or_unset_env(name, value)
 
1141
 
 
1142
    def _proxied_request(self):
 
1143
        handler = _urllib2_wrappers.ProxyHandler()
 
1144
        request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
 
1145
        handler.set_proxy(request, 'http')
 
1146
        return request
 
1147
 
 
1148
    def test_empty_user(self):
 
1149
        self._install_env({'http_proxy': 'http://bar.com'})
 
1150
        request = self._proxied_request()
 
1151
        self.assertFalse(request.headers.has_key('Proxy-authorization'))
 
1152
 
 
1153
    def test_invalid_proxy(self):
 
1154
        """A proxy env variable without scheme"""
 
1155
        self._install_env({'http_proxy': 'host:1234'})
 
1156
        self.assertRaises(errors.InvalidURL, self._proxied_request)
 
1157
 
 
1158
 
 
1159
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
 
1160
    """Tests proxy server.
 
1161
 
 
1162
    Be aware that we do not setup a real proxy here. Instead, we
 
1163
    check that the *connection* goes through the proxy by serving
 
1164
    different content (the faked proxy server append '-proxied'
 
1165
    to the file names).
 
1166
    """
 
1167
 
 
1168
    # FIXME: We don't have an https server available, so we don't
 
1169
    # test https connections.
 
1170
 
 
1171
    def setUp(self):
 
1172
        super(TestProxyHttpServer, self).setUp()
 
1173
        self.build_tree_contents([('foo', 'contents of foo\n'),
 
1174
                                  ('foo-proxied', 'proxied contents of foo\n')])
 
1175
        # Let's setup some attributes for tests
 
1176
        self.server = self.get_readonly_server()
 
1177
        self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
 
1178
        if self._testing_pycurl():
 
1179
            # Oh my ! pycurl does not check for the port as part of
 
1180
            # no_proxy :-( So we just test the host part
 
1181
            self.no_proxy_host = self.server.host
 
1182
        else:
 
1183
            self.no_proxy_host = self.proxy_address
 
1184
        # The secondary server is the proxy
 
1185
        self.proxy = self.get_secondary_server()
 
1186
        self.proxy_url = self.proxy.get_url()
 
1187
        self._old_env = {}
 
1188
 
 
1189
    def _testing_pycurl(self):
 
1190
        return pycurl_present and self._transport == PyCurlTransport
 
1191
 
 
1192
    def create_transport_secondary_server(self):
 
1193
        """Creates an http server that will serve files with
 
1194
        '-proxied' appended to their names.
 
1195
        """
 
1196
        return http_utils.ProxyServer(protocol_version=self._protocol_version)
 
1197
 
 
1198
    def _install_env(self, env):
 
1199
        for name, value in env.iteritems():
 
1200
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1201
 
 
1202
    def _restore_env(self):
 
1203
        for name, value in self._old_env.iteritems():
 
1204
            osutils.set_or_unset_env(name, value)
 
1205
 
 
1206
    def proxied_in_env(self, env):
 
1207
        self._install_env(env)
 
1208
        url = self.server.get_url()
 
1209
        t = self._transport(url)
 
1210
        try:
 
1211
            self.assertEqual('proxied contents of foo\n', t.get('foo').read())
 
1212
        finally:
 
1213
            self._restore_env()
 
1214
 
 
1215
    def not_proxied_in_env(self, env):
 
1216
        self._install_env(env)
 
1217
        url = self.server.get_url()
 
1218
        t = self._transport(url)
 
1219
        try:
 
1220
            self.assertEqual('contents of foo\n', t.get('foo').read())
 
1221
        finally:
 
1222
            self._restore_env()
 
1223
 
 
1224
    def test_http_proxy(self):
 
1225
        self.proxied_in_env({'http_proxy': self.proxy_url})
 
1226
 
 
1227
    def test_HTTP_PROXY(self):
 
1228
        if self._testing_pycurl():
 
1229
            # pycurl does not check HTTP_PROXY for security reasons
 
1230
            # (for use in a CGI context that we do not care
 
1231
            # about. Should we ?)
 
1232
            raise tests.TestNotApplicable(
 
1233
                'pycurl does not check HTTP_PROXY for security reasons')
 
1234
        self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
 
1235
 
 
1236
    def test_all_proxy(self):
 
1237
        self.proxied_in_env({'all_proxy': self.proxy_url})
 
1238
 
 
1239
    def test_ALL_PROXY(self):
 
1240
        self.proxied_in_env({'ALL_PROXY': self.proxy_url})
 
1241
 
 
1242
    def test_http_proxy_with_no_proxy(self):
 
1243
        self.not_proxied_in_env({'http_proxy': self.proxy_url,
 
1244
                                 'no_proxy': self.no_proxy_host})
 
1245
 
 
1246
    def test_HTTP_PROXY_with_NO_PROXY(self):
 
1247
        if self._testing_pycurl():
 
1248
            raise tests.TestNotApplicable(
 
1249
                'pycurl does not check HTTP_PROXY for security reasons')
 
1250
        self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
 
1251
                                 'NO_PROXY': self.no_proxy_host})
 
1252
 
 
1253
    def test_all_proxy_with_no_proxy(self):
 
1254
        self.not_proxied_in_env({'all_proxy': self.proxy_url,
 
1255
                                 'no_proxy': self.no_proxy_host})
 
1256
 
 
1257
    def test_ALL_PROXY_with_NO_PROXY(self):
 
1258
        self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
 
1259
                                 'NO_PROXY': self.no_proxy_host})
 
1260
 
 
1261
    def test_http_proxy_without_scheme(self):
 
1262
        if self._testing_pycurl():
 
1263
            # pycurl *ignores* invalid proxy env variables. If that ever change
 
1264
            # in the future, this test will fail indicating that pycurl do not
 
1265
            # ignore anymore such variables.
 
1266
            self.not_proxied_in_env({'http_proxy': self.proxy_address})
 
1267
        else:
 
1268
            self.assertRaises(errors.InvalidURL,
 
1269
                              self.proxied_in_env,
 
1270
                              {'http_proxy': self.proxy_address})
 
1271
 
 
1272
 
 
1273
class TestRanges(http_utils.TestCaseWithWebserver):
 
1274
    """Test the Range header in GET methods."""
 
1275
 
 
1276
    def setUp(self):
 
1277
        http_utils.TestCaseWithWebserver.setUp(self)
 
1278
        self.build_tree_contents([('a', '0123456789')],)
 
1279
        server = self.get_readonly_server()
 
1280
        self.transport = self._transport(server.get_url())
 
1281
 
 
1282
    def create_transport_readonly_server(self):
 
1283
        return http_server.HttpServer(protocol_version=self._protocol_version)
 
1284
 
 
1285
    def _file_contents(self, relpath, ranges):
 
1286
        offsets = [ (start, end - start + 1) for start, end in ranges]
 
1287
        coalesce = self.transport._coalesce_offsets
 
1288
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
 
1289
        code, data = self.transport._get(relpath, coalesced)
 
1290
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
 
1291
        for start, end in ranges:
 
1292
            data.seek(start)
 
1293
            yield data.read(end - start + 1)
 
1294
 
 
1295
    def _file_tail(self, relpath, tail_amount):
 
1296
        code, data = self.transport._get(relpath, [], tail_amount)
 
1297
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
 
1298
        data.seek(-tail_amount, 2)
 
1299
        return data.read(tail_amount)
 
1300
 
 
1301
    def test_range_header(self):
 
1302
        # Valid ranges
 
1303
        map(self.assertEqual,['0', '234'],
 
1304
            list(self._file_contents('a', [(0,0), (2,4)])),)
 
1305
 
 
1306
    def test_range_header_tail(self):
 
1307
        self.assertEqual('789', self._file_tail('a', 3))
 
1308
 
 
1309
    def test_syntactically_invalid_range_header(self):
 
1310
        self.assertListRaises(errors.InvalidHttpRange,
 
1311
                          self._file_contents, 'a', [(4, 3)])
 
1312
 
 
1313
    def test_semantically_invalid_range_header(self):
 
1314
        self.assertListRaises(errors.InvalidHttpRange,
 
1315
                          self._file_contents, 'a', [(42, 128)])
 
1316
 
 
1317
 
 
1318
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1319
    """Test redirection between http servers."""
 
1320
 
 
1321
    def create_transport_secondary_server(self):
 
1322
        """Create the secondary server redirecting to the primary server"""
 
1323
        new = self.get_readonly_server()
 
1324
 
 
1325
        redirecting = http_utils.HTTPServerRedirecting(
 
1326
            protocol_version=self._protocol_version)
 
1327
        redirecting.redirect_to(new.host, new.port)
 
1328
        return redirecting
 
1329
 
 
1330
    def setUp(self):
 
1331
        super(TestHTTPRedirections, self).setUp()
 
1332
        self.build_tree_contents([('a', '0123456789'),
 
1333
                                  ('bundle',
 
1334
                                  '# Bazaar revision bundle v0.9\n#\n')
 
1335
                                  ],)
 
1336
        # The requests to the old server will be redirected to the new server
 
1337
        self.old_transport = self._transport(self.old_server.get_url())
 
1338
 
 
1339
    def test_redirected(self):
 
1340
        self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
 
1341
        t = self._transport(self.new_server.get_url())
 
1342
        self.assertEqual('0123456789', t.get('a').read())
 
1343
 
 
1344
    def test_read_redirected_bundle_from_url(self):
 
1345
        from bzrlib.bundle import read_bundle_from_url
 
1346
        url = self.old_transport.abspath('bundle')
 
1347
        bundle = self.applyDeprecated(deprecated_in((1, 12, 0)),
 
1348
                read_bundle_from_url, url)
 
1349
        # If read_bundle_from_url was successful we get an empty bundle
 
1350
        self.assertEqual([], bundle.revisions)
 
1351
 
 
1352
 
 
1353
class RedirectedRequest(_urllib2_wrappers.Request):
 
1354
    """Request following redirections. """
 
1355
 
 
1356
    init_orig = _urllib2_wrappers.Request.__init__
 
1357
 
 
1358
    def __init__(self, method, url, *args, **kwargs):
 
1359
        """Constructor.
 
1360
 
 
1361
        """
 
1362
        # Since the tests using this class will replace
 
1363
        # _urllib2_wrappers.Request, we can't just call the base class __init__
 
1364
        # or we'll loop.
 
1365
        RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
 
1366
        self.follow_redirections = True
 
1367
 
 
1368
 
 
1369
def install_redirected_request(test):
 
1370
    test.original_class = _urllib2_wrappers.Request
 
1371
    def restore():
 
1372
        _urllib2_wrappers.Request = test.original_class
 
1373
    _urllib2_wrappers.Request = RedirectedRequest
 
1374
    test.addCleanup(restore)
 
1375
 
 
1376
 
 
1377
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1378
    """Test redirections.
 
1379
 
 
1380
    http implementations do not redirect silently anymore (they
 
1381
    do not redirect at all in fact). The mechanism is still in
 
1382
    place at the _urllib2_wrappers.Request level and these tests
 
1383
    exercise it.
 
1384
 
 
1385
    For the pycurl implementation
 
1386
    the redirection have been deleted as we may deprecate pycurl
 
1387
    and I have no place to keep a working implementation.
 
1388
    -- vila 20070212
 
1389
    """
 
1390
 
 
1391
    def setUp(self):
 
1392
        if pycurl_present and self._transport == PyCurlTransport:
 
1393
            raise tests.TestNotApplicable(
 
1394
                "pycurl doesn't redirect silently annymore")
 
1395
        super(TestHTTPSilentRedirections, self).setUp()
 
1396
        install_redirected_request(self)
 
1397
        self.build_tree_contents([('a','a'),
 
1398
                                  ('1/',),
 
1399
                                  ('1/a', 'redirected once'),
 
1400
                                  ('2/',),
 
1401
                                  ('2/a', 'redirected twice'),
 
1402
                                  ('3/',),
 
1403
                                  ('3/a', 'redirected thrice'),
 
1404
                                  ('4/',),
 
1405
                                  ('4/a', 'redirected 4 times'),
 
1406
                                  ('5/',),
 
1407
                                  ('5/a', 'redirected 5 times'),
 
1408
                                  ],)
 
1409
 
 
1410
        self.old_transport = self._transport(self.old_server.get_url())
 
1411
 
 
1412
    def create_transport_secondary_server(self):
 
1413
        """Create the secondary server, redirections are defined in the tests"""
 
1414
        return http_utils.HTTPServerRedirecting(
 
1415
            protocol_version=self._protocol_version)
 
1416
 
 
1417
    def test_one_redirection(self):
 
1418
        t = self.old_transport
 
1419
 
 
1420
        req = RedirectedRequest('GET', t.abspath('a'))
 
1421
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
1422
                                       self.new_server.port)
 
1423
        self.old_server.redirections = \
 
1424
            [('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
1425
        self.assertEqual('redirected once',t._perform(req).read())
 
1426
 
 
1427
    def test_five_redirections(self):
 
1428
        t = self.old_transport
 
1429
 
 
1430
        req = RedirectedRequest('GET', t.abspath('a'))
 
1431
        old_prefix = 'http://%s:%s' % (self.old_server.host,
 
1432
                                       self.old_server.port)
 
1433
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
1434
                                       self.new_server.port)
 
1435
        self.old_server.redirections = [
 
1436
            ('/1(.*)', r'%s/2\1' % (old_prefix), 302),
 
1437
            ('/2(.*)', r'%s/3\1' % (old_prefix), 303),
 
1438
            ('/3(.*)', r'%s/4\1' % (old_prefix), 307),
 
1439
            ('/4(.*)', r'%s/5\1' % (new_prefix), 301),
 
1440
            ('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
 
1441
            ]
 
1442
        self.assertEqual('redirected 5 times',t._perform(req).read())
 
1443
 
 
1444
 
 
1445
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1446
    """Test transport.do_catching_redirections."""
 
1447
 
 
1448
    def setUp(self):
 
1449
        super(TestDoCatchRedirections, self).setUp()
 
1450
        self.build_tree_contents([('a', '0123456789'),],)
 
1451
 
 
1452
        self.old_transport = self._transport(self.old_server.get_url())
 
1453
 
 
1454
    def get_a(self, transport):
 
1455
        return transport.get('a')
 
1456
 
 
1457
    def test_no_redirection(self):
 
1458
        t = self._transport(self.new_server.get_url())
 
1459
 
 
1460
        # We use None for redirected so that we fail if redirected
 
1461
        self.assertEqual('0123456789',
 
1462
                         transport.do_catching_redirections(
 
1463
                self.get_a, t, None).read())
 
1464
 
 
1465
    def test_one_redirection(self):
 
1466
        self.redirections = 0
 
1467
 
 
1468
        def redirected(transport, exception, redirection_notice):
 
1469
            self.redirections += 1
 
1470
            dir, file = urlutils.split(exception.target)
 
1471
            return self._transport(dir)
 
1472
 
 
1473
        self.assertEqual('0123456789',
 
1474
                         transport.do_catching_redirections(
 
1475
                self.get_a, self.old_transport, redirected).read())
 
1476
        self.assertEqual(1, self.redirections)
 
1477
 
 
1478
    def test_redirection_loop(self):
 
1479
 
 
1480
        def redirected(transport, exception, redirection_notice):
 
1481
            # By using the redirected url as a base dir for the
 
1482
            # *old* transport, we create a loop: a => a/a =>
 
1483
            # a/a/a
 
1484
            return self.old_transport.clone(exception.target)
 
1485
 
 
1486
        self.assertRaises(errors.TooManyRedirections,
 
1487
                          transport.do_catching_redirections,
 
1488
                          self.get_a, self.old_transport, redirected)
 
1489
 
 
1490
 
 
1491
class TestAuth(http_utils.TestCaseWithWebserver):
 
1492
    """Test authentication scheme"""
 
1493
 
 
1494
    _auth_header = 'Authorization'
 
1495
    _password_prompt_prefix = ''
 
1496
    _username_prompt_prefix = ''
 
1497
    # Set by load_tests
 
1498
    _auth_server = None
 
1499
 
 
1500
    def setUp(self):
 
1501
        super(TestAuth, self).setUp()
 
1502
        self.server = self.get_readonly_server()
 
1503
        self.build_tree_contents([('a', 'contents of a\n'),
 
1504
                                  ('b', 'contents of b\n'),])
 
1505
 
 
1506
    def create_transport_readonly_server(self):
 
1507
        return self._auth_server(protocol_version=self._protocol_version)
 
1508
 
 
1509
    def _testing_pycurl(self):
 
1510
        return pycurl_present and self._transport == PyCurlTransport
 
1511
 
 
1512
    def get_user_url(self, user, password):
 
1513
        """Build an url embedding user and password"""
 
1514
        url = '%s://' % self.server._url_protocol
 
1515
        if user is not None:
 
1516
            url += user
 
1517
            if password is not None:
 
1518
                url += ':' + password
 
1519
            url += '@'
 
1520
        url += '%s:%s/' % (self.server.host, self.server.port)
 
1521
        return url
 
1522
 
 
1523
    def get_user_transport(self, user, password):
 
1524
        return self._transport(self.get_user_url(user, password))
 
1525
 
 
1526
    def test_no_user(self):
 
1527
        self.server.add_user('joe', 'foo')
 
1528
        t = self.get_user_transport(None, None)
 
1529
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1530
        # Only one 'Authentication Required' error should occur
 
1531
        self.assertEqual(1, self.server.auth_required_errors)
 
1532
 
 
1533
    def test_empty_pass(self):
 
1534
        self.server.add_user('joe', '')
 
1535
        t = self.get_user_transport('joe', '')
 
1536
        self.assertEqual('contents of a\n', t.get('a').read())
 
1537
        # Only one 'Authentication Required' error should occur
 
1538
        self.assertEqual(1, self.server.auth_required_errors)
 
1539
 
 
1540
    def test_user_pass(self):
 
1541
        self.server.add_user('joe', 'foo')
 
1542
        t = self.get_user_transport('joe', 'foo')
 
1543
        self.assertEqual('contents of a\n', t.get('a').read())
 
1544
        # Only one 'Authentication Required' error should occur
 
1545
        self.assertEqual(1, self.server.auth_required_errors)
 
1546
 
 
1547
    def test_unknown_user(self):
 
1548
        self.server.add_user('joe', 'foo')
 
1549
        t = self.get_user_transport('bill', 'foo')
 
1550
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1551
        # Two 'Authentication Required' errors should occur (the
 
1552
        # initial 'who are you' and 'I don't know you, who are
 
1553
        # you').
 
1554
        self.assertEqual(2, self.server.auth_required_errors)
 
1555
 
 
1556
    def test_wrong_pass(self):
 
1557
        self.server.add_user('joe', 'foo')
 
1558
        t = self.get_user_transport('joe', 'bar')
 
1559
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1560
        # Two 'Authentication Required' errors should occur (the
 
1561
        # initial 'who are you' and 'this is not you, who are you')
 
1562
        self.assertEqual(2, self.server.auth_required_errors)
 
1563
 
 
1564
    def test_prompt_for_username(self):
 
1565
        if self._testing_pycurl():
 
1566
            raise tests.TestNotApplicable(
 
1567
                'pycurl cannot prompt, it handles auth by embedding'
 
1568
                ' user:pass in urls only')
 
1569
 
 
1570
        self.server.add_user('joe', 'foo')
 
1571
        t = self.get_user_transport(None, None)
 
1572
        stdout = tests.StringIOWrapper()
 
1573
        stderr = tests.StringIOWrapper()
 
1574
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
 
1575
                                            stdout=stdout, stderr=stderr)
 
1576
        self.assertEqual('contents of a\n',t.get('a').read())
 
1577
        # stdin should be empty
 
1578
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
1579
        stderr.seek(0)
 
1580
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
 
1581
        self.assertEqual(expected_prompt, stderr.read(len(expected_prompt)))
 
1582
        self.assertEqual('', stdout.getvalue())
 
1583
        self._check_password_prompt(t._unqualified_scheme, 'joe',
 
1584
                                    stderr.readline())
 
1585
 
 
1586
    def test_prompt_for_password(self):
 
1587
        if self._testing_pycurl():
 
1588
            raise tests.TestNotApplicable(
 
1589
                'pycurl cannot prompt, it handles auth by embedding'
 
1590
                ' user:pass in urls only')
 
1591
 
 
1592
        self.server.add_user('joe', 'foo')
 
1593
        t = self.get_user_transport('joe', None)
 
1594
        stdout = tests.StringIOWrapper()
 
1595
        stderr = tests.StringIOWrapper()
 
1596
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
 
1597
                                            stdout=stdout, stderr=stderr)
 
1598
        self.assertEqual('contents of a\n', t.get('a').read())
 
1599
        # stdin should be empty
 
1600
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
1601
        self._check_password_prompt(t._unqualified_scheme, 'joe',
 
1602
                                    stderr.getvalue())
 
1603
        self.assertEqual('', stdout.getvalue())
 
1604
        # And we shouldn't prompt again for a different request
 
1605
        # against the same transport.
 
1606
        self.assertEqual('contents of b\n',t.get('b').read())
 
1607
        t2 = t.clone()
 
1608
        # And neither against a clone
 
1609
        self.assertEqual('contents of b\n',t2.get('b').read())
 
1610
        # Only one 'Authentication Required' error should occur
 
1611
        self.assertEqual(1, self.server.auth_required_errors)
 
1612
 
 
1613
    def _check_password_prompt(self, scheme, user, actual_prompt):
 
1614
        expected_prompt = (self._password_prompt_prefix
 
1615
                           + ("%s %s@%s:%d, Realm: '%s' password: "
 
1616
                              % (scheme.upper(),
 
1617
                                 user, self.server.host, self.server.port,
 
1618
                                 self.server.auth_realm)))
 
1619
        self.assertEqual(expected_prompt, actual_prompt)
 
1620
 
 
1621
    def _expected_username_prompt(self, scheme):
 
1622
        return (self._username_prompt_prefix
 
1623
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
 
1624
                                 self.server.host, self.server.port,
 
1625
                                 self.server.auth_realm))
 
1626
 
 
1627
    def test_no_prompt_for_password_when_using_auth_config(self):
 
1628
        if self._testing_pycurl():
 
1629
            raise tests.TestNotApplicable(
 
1630
                'pycurl does not support authentication.conf'
 
1631
                ' since it cannot prompt')
 
1632
 
 
1633
        user =' joe'
 
1634
        password = 'foo'
 
1635
        stdin_content = 'bar\n'  # Not the right password
 
1636
        self.server.add_user(user, password)
 
1637
        t = self.get_user_transport(user, None)
 
1638
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
 
1639
                                            stderr=tests.StringIOWrapper())
 
1640
        # Create a minimal config file with the right password
 
1641
        conf = config.AuthenticationConfig()
 
1642
        conf._get_config().update(
 
1643
            {'httptest': {'scheme': 'http', 'port': self.server.port,
 
1644
                          'user': user, 'password': password}})
 
1645
        conf._save()
 
1646
        # Issue a request to the server to connect
 
1647
        self.assertEqual('contents of a\n',t.get('a').read())
 
1648
        # stdin should have  been left untouched
 
1649
        self.assertEqual(stdin_content, ui.ui_factory.stdin.readline())
 
1650
        # Only one 'Authentication Required' error should occur
 
1651
        self.assertEqual(1, self.server.auth_required_errors)
 
1652
 
 
1653
    def test_user_from_auth_conf(self):
 
1654
        if self._testing_pycurl():
 
1655
            raise tests.TestNotApplicable(
 
1656
                'pycurl does not support authentication.conf')
 
1657
        user = 'joe'
 
1658
        password = 'foo'
 
1659
        self.server.add_user(user, password)
 
1660
        # Create a minimal config file with the right password
 
1661
        conf = config.AuthenticationConfig()
 
1662
        conf._get_config().update(
 
1663
            {'httptest': {'scheme': 'http', 'port': self.server.port,
 
1664
                          'user': user, 'password': password}})
 
1665
        conf._save()
 
1666
        t = self.get_user_transport(None, None)
 
1667
        # Issue a request to the server to connect
 
1668
        self.assertEqual('contents of a\n', t.get('a').read())
 
1669
        # Only one 'Authentication Required' error should occur
 
1670
        self.assertEqual(1, self.server.auth_required_errors)
 
1671
 
 
1672
    def test_changing_nonce(self):
 
1673
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
 
1674
                                     http_utils.ProxyDigestAuthServer):
 
1675
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
 
1676
        if self._testing_pycurl():
 
1677
            raise tests.KnownFailure(
 
1678
                'pycurl does not handle a nonce change')
 
1679
        self.server.add_user('joe', 'foo')
 
1680
        t = self.get_user_transport('joe', 'foo')
 
1681
        self.assertEqual('contents of a\n', t.get('a').read())
 
1682
        self.assertEqual('contents of b\n', t.get('b').read())
 
1683
        # Only one 'Authentication Required' error should have
 
1684
        # occured so far
 
1685
        self.assertEqual(1, self.server.auth_required_errors)
 
1686
        # The server invalidates the current nonce
 
1687
        self.server.auth_nonce = self.server.auth_nonce + '. No, now!'
 
1688
        self.assertEqual('contents of a\n', t.get('a').read())
 
1689
        # Two 'Authentication Required' errors should occur (the
 
1690
        # initial 'who are you' and a second 'who are you' with the new nonce)
 
1691
        self.assertEqual(2, self.server.auth_required_errors)
 
1692
 
 
1693
 
 
1694
 
 
1695
class TestProxyAuth(TestAuth):
 
1696
    """Test proxy authentication schemes."""
 
1697
 
 
1698
    _auth_header = 'Proxy-authorization'
 
1699
    _password_prompt_prefix = 'Proxy '
 
1700
    _username_prompt_prefix = 'Proxy '
 
1701
 
 
1702
    def setUp(self):
 
1703
        super(TestProxyAuth, self).setUp()
 
1704
        self._old_env = {}
 
1705
        self.addCleanup(self._restore_env)
 
1706
        # Override the contents to avoid false positives
 
1707
        self.build_tree_contents([('a', 'not proxied contents of a\n'),
 
1708
                                  ('b', 'not proxied contents of b\n'),
 
1709
                                  ('a-proxied', 'contents of a\n'),
 
1710
                                  ('b-proxied', 'contents of b\n'),
 
1711
                                  ])
 
1712
 
 
1713
    def get_user_transport(self, user, password):
 
1714
        self._install_env({'all_proxy': self.get_user_url(user, password)})
 
1715
        return self._transport(self.server.get_url())
 
1716
 
 
1717
    def _install_env(self, env):
 
1718
        for name, value in env.iteritems():
 
1719
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1720
 
 
1721
    def _restore_env(self):
 
1722
        for name, value in self._old_env.iteritems():
 
1723
            osutils.set_or_unset_env(name, value)
 
1724
 
 
1725
    def test_empty_pass(self):
 
1726
        if self._testing_pycurl():
 
1727
            import pycurl
 
1728
            if pycurl.version_info()[1] < '7.16.0':
 
1729
                raise tests.KnownFailure(
 
1730
                    'pycurl < 7.16.0 does not handle empty proxy passwords')
 
1731
        super(TestProxyAuth, self).test_empty_pass()
 
1732
 
 
1733
 
 
1734
class SampleSocket(object):
 
1735
    """A socket-like object for use in testing the HTTP request handler."""
 
1736
 
 
1737
    def __init__(self, socket_read_content):
 
1738
        """Constructs a sample socket.
 
1739
 
 
1740
        :param socket_read_content: a byte sequence
 
1741
        """
 
1742
        # Use plain python StringIO so we can monkey-patch the close method to
 
1743
        # not discard the contents.
 
1744
        from StringIO import StringIO
 
1745
        self.readfile = StringIO(socket_read_content)
 
1746
        self.writefile = StringIO()
 
1747
        self.writefile.close = lambda: None
 
1748
 
 
1749
    def makefile(self, mode='r', bufsize=None):
 
1750
        if 'r' in mode:
 
1751
            return self.readfile
 
1752
        else:
 
1753
            return self.writefile
 
1754
 
 
1755
 
 
1756
class SmartHTTPTunnellingTest(tests.TestCaseWithTransport):
 
1757
 
 
1758
    def setUp(self):
 
1759
        super(SmartHTTPTunnellingTest, self).setUp()
 
1760
        # We use the VFS layer as part of HTTP tunnelling tests.
 
1761
        self._captureVar('BZR_NO_SMART_VFS', None)
 
1762
        self.transport_readonly_server = http_utils.HTTPServerWithSmarts
 
1763
 
 
1764
    def create_transport_readonly_server(self):
 
1765
        return http_utils.HTTPServerWithSmarts(
 
1766
            protocol_version=self._protocol_version)
 
1767
 
 
1768
    def test_open_bzrdir(self):
 
1769
        branch = self.make_branch('relpath')
 
1770
        http_server = self.get_readonly_server()
 
1771
        url = http_server.get_url() + 'relpath'
 
1772
        bd = bzrdir.BzrDir.open(url)
 
1773
        self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
 
1774
 
 
1775
    def test_bulk_data(self):
 
1776
        # We should be able to send and receive bulk data in a single message.
 
1777
        # The 'readv' command in the smart protocol both sends and receives
 
1778
        # bulk data, so we use that.
 
1779
        self.build_tree(['data-file'])
 
1780
        http_server = self.get_readonly_server()
 
1781
        http_transport = self._transport(http_server.get_url())
 
1782
        medium = http_transport.get_smart_medium()
 
1783
        # Since we provide the medium, the url below will be mostly ignored
 
1784
        # during the test, as long as the path is '/'.
 
1785
        remote_transport = remote.RemoteTransport('bzr://fake_host/',
 
1786
                                                  medium=medium)
 
1787
        self.assertEqual(
 
1788
            [(0, "c")], list(remote_transport.readv("data-file", [(0,1)])))
 
1789
 
 
1790
    def test_http_send_smart_request(self):
 
1791
 
 
1792
        post_body = 'hello\n'
 
1793
        expected_reply_body = 'ok\x012\n'
 
1794
 
 
1795
        http_server = self.get_readonly_server()
 
1796
        http_transport = self._transport(http_server.get_url())
 
1797
        medium = http_transport.get_smart_medium()
 
1798
        response = medium.send_http_smart_request(post_body)
 
1799
        reply_body = response.read()
 
1800
        self.assertEqual(expected_reply_body, reply_body)
 
1801
 
 
1802
    def test_smart_http_server_post_request_handler(self):
 
1803
        httpd = self.get_readonly_server()._get_httpd()
 
1804
 
 
1805
        socket = SampleSocket(
 
1806
            'POST /.bzr/smart %s \r\n' % self._protocol_version
 
1807
            # HTTP/1.1 posts must have a Content-Length (but it doesn't hurt
 
1808
            # for 1.0)
 
1809
            + 'Content-Length: 6\r\n'
 
1810
            '\r\n'
 
1811
            'hello\n')
 
1812
        # Beware: the ('localhost', 80) below is the
 
1813
        # client_address parameter, but we don't have one because
 
1814
        # we have defined a socket which is not bound to an
 
1815
        # address. The test framework never uses this client
 
1816
        # address, so far...
 
1817
        request_handler = http_utils.SmartRequestHandler(socket,
 
1818
                                                         ('localhost', 80),
 
1819
                                                         httpd)
 
1820
        response = socket.writefile.getvalue()
 
1821
        self.assertStartsWith(response, '%s 200 ' % self._protocol_version)
 
1822
        # This includes the end of the HTTP headers, and all the body.
 
1823
        expected_end_of_response = '\r\n\r\nok\x012\n'
 
1824
        self.assertEndsWith(response, expected_end_of_response)
 
1825
 
 
1826
 
 
1827
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
 
1828
    """No smart server here request handler."""
 
1829
 
 
1830
    def do_POST(self):
 
1831
        self.send_error(403, "Forbidden")
 
1832
 
 
1833
 
 
1834
class SmartClientAgainstNotSmartServer(TestSpecificRequestHandler):
 
1835
    """Test smart client behaviour against an http server without smarts."""
 
1836
 
 
1837
    _req_handler_class = ForbiddenRequestHandler
 
1838
 
 
1839
    def test_probe_smart_server(self):
 
1840
        """Test error handling against server refusing smart requests."""
 
1841
        server = self.get_readonly_server()
 
1842
        t = self._transport(server.get_url())
 
1843
        # No need to build a valid smart request here, the server will not even
 
1844
        # try to interpret it.
 
1845
        self.assertRaises(errors.SmartProtocolError,
 
1846
                          t.get_smart_medium().send_http_smart_request,
 
1847
                          'whatever')
 
1848
 
 
1849
class Test_redirected_to(tests.TestCase):
 
1850
 
 
1851
    def test_redirected_to_subdir(self):
 
1852
        t = self._transport('http://www.example.com/foo')
 
1853
        r = t._redirected_to('http://www.example.com/foo',
 
1854
                             'http://www.example.com/foo/subdir')
 
1855
        self.assertIsInstance(r, type(t))
 
1856
        # Both transports share the some connection
 
1857
        self.assertEqual(t._get_connection(), r._get_connection())
 
1858
 
 
1859
    def test_redirected_to_self_with_slash(self):
 
1860
        t = self._transport('http://www.example.com/foo')
 
1861
        r = t._redirected_to('http://www.example.com/foo',
 
1862
                             'http://www.example.com/foo/')
 
1863
        self.assertIsInstance(r, type(t))
 
1864
        # Both transports share the some connection (one can argue that we
 
1865
        # should return the exact same transport here, but that seems
 
1866
        # overkill).
 
1867
        self.assertEqual(t._get_connection(), r._get_connection())
 
1868
 
 
1869
    def test_redirected_to_host(self):
 
1870
        t = self._transport('http://www.example.com/foo')
 
1871
        r = t._redirected_to('http://www.example.com/foo',
 
1872
                             'http://foo.example.com/foo/subdir')
 
1873
        self.assertIsInstance(r, type(t))
 
1874
 
 
1875
    def test_redirected_to_same_host_sibling_protocol(self):
 
1876
        t = self._transport('http://www.example.com/foo')
 
1877
        r = t._redirected_to('http://www.example.com/foo',
 
1878
                             'https://www.example.com/foo')
 
1879
        self.assertIsInstance(r, type(t))
 
1880
 
 
1881
    def test_redirected_to_same_host_different_protocol(self):
 
1882
        t = self._transport('http://www.example.com/foo')
 
1883
        r = t._redirected_to('http://www.example.com/foo',
 
1884
                             'ftp://www.example.com/foo')
 
1885
        self.assertNotEquals(type(r), type(t))
 
1886
 
 
1887
    def test_redirected_to_different_host_same_user(self):
 
1888
        t = self._transport('http://joe@www.example.com/foo')
 
1889
        r = t._redirected_to('http://www.example.com/foo',
 
1890
                             'https://foo.example.com/foo')
 
1891
        self.assertIsInstance(r, type(t))
 
1892
        self.assertEqual(t._user, r._user)
 
1893
 
 
1894
 
 
1895
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
 
1896
    """Request handler for a unique and pre-defined request.
 
1897
 
 
1898
    The only thing we care about here is how many bytes travel on the wire. But
 
1899
    since we want to measure it for a real http client, we have to send it
 
1900
    correct responses.
 
1901
 
 
1902
    We expect to receive a *single* request nothing more (and we won't even
 
1903
    check what request it is, we just measure the bytes read until an empty
 
1904
    line.
 
1905
    """
 
1906
 
 
1907
    def handle_one_request(self):
 
1908
        tcs = self.server.test_case_server
 
1909
        requestline = self.rfile.readline()
 
1910
        headers = self.MessageClass(self.rfile, 0)
 
1911
        # We just read: the request, the headers, an empty line indicating the
 
1912
        # end of the headers.
 
1913
        bytes_read = len(requestline)
 
1914
        for line in headers.headers:
 
1915
            bytes_read += len(line)
 
1916
        bytes_read += len('\r\n')
 
1917
        if requestline.startswith('POST'):
 
1918
            # The body should be a single line (or we don't know where it ends
 
1919
            # and we don't want to issue a blocking read)
 
1920
            body = self.rfile.readline()
 
1921
            bytes_read += len(body)
 
1922
        tcs.bytes_read = bytes_read
 
1923
 
 
1924
        # We set the bytes written *before* issuing the write, the client is
 
1925
        # supposed to consume every produced byte *before* checking that value.
 
1926
 
 
1927
        # Doing the oppposite may lead to test failure: we may be interrupted
 
1928
        # after the write but before updating the value. The client can then
 
1929
        # continue and read the value *before* we can update it. And yes,
 
1930
        # this has been observed -- vila 20090129
 
1931
        tcs.bytes_written = len(tcs.canned_response)
 
1932
        self.wfile.write(tcs.canned_response)
 
1933
 
 
1934
 
 
1935
class ActivityServerMixin(object):
 
1936
 
 
1937
    def __init__(self, protocol_version):
 
1938
        super(ActivityServerMixin, self).__init__(
 
1939
            request_handler=PredefinedRequestHandler,
 
1940
            protocol_version=protocol_version)
 
1941
        # Bytes read and written by the server
 
1942
        self.bytes_read = 0
 
1943
        self.bytes_written = 0
 
1944
        self.canned_response = None
 
1945
 
 
1946
 
 
1947
class ActivityHTTPServer(ActivityServerMixin, http_server.HttpServer):
 
1948
    pass
 
1949
 
 
1950
 
 
1951
if tests.HTTPSServerFeature.available():
 
1952
    from bzrlib.tests import https_server
 
1953
    class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
 
1954
        pass
 
1955
 
 
1956
 
 
1957
class TestActivityMixin(object):
 
1958
    """Test socket activity reporting.
 
1959
 
 
1960
    We use a special purpose server to control the bytes sent and received and
 
1961
    be able to predict the activity on the client socket.
 
1962
    """
 
1963
 
 
1964
    def setUp(self):
 
1965
        tests.TestCase.setUp(self)
 
1966
        self.server = self._activity_server(self._protocol_version)
 
1967
        self.server.setUp()
 
1968
        self.activities = {}
 
1969
        def report_activity(t, bytes, direction):
 
1970
            count = self.activities.get(direction, 0)
 
1971
            count += bytes
 
1972
            self.activities[direction] = count
 
1973
 
 
1974
        # We override at class level because constructors may propagate the
 
1975
        # bound method and render instance overriding ineffective (an
 
1976
        # alternative would be to define a specific ui factory instead...)
 
1977
        self.orig_report_activity = self._transport._report_activity
 
1978
        self._transport._report_activity = report_activity
 
1979
 
 
1980
    def tearDown(self):
 
1981
        self._transport._report_activity = self.orig_report_activity
 
1982
        self.server.tearDown()
 
1983
        tests.TestCase.tearDown(self)
 
1984
 
 
1985
    def get_transport(self):
 
1986
        return self._transport(self.server.get_url())
 
1987
 
 
1988
    def assertActivitiesMatch(self):
 
1989
        self.assertEqual(self.server.bytes_read,
 
1990
                         self.activities.get('write', 0), 'written bytes')
 
1991
        self.assertEqual(self.server.bytes_written,
 
1992
                         self.activities.get('read', 0), 'read bytes')
 
1993
 
 
1994
    def test_get(self):
 
1995
        self.server.canned_response = '''HTTP/1.1 200 OK\r
 
1996
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
 
1997
Server: Apache/2.0.54 (Fedora)\r
 
1998
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
 
1999
ETag: "56691-23-38e9ae00"\r
 
2000
Accept-Ranges: bytes\r
 
2001
Content-Length: 35\r
 
2002
Connection: close\r
 
2003
Content-Type: text/plain; charset=UTF-8\r
 
2004
\r
 
2005
Bazaar-NG meta directory, format 1
 
2006
'''
 
2007
        t = self.get_transport()
 
2008
        self.assertEqual('Bazaar-NG meta directory, format 1\n',
 
2009
                         t.get('foo/bar').read())
 
2010
        self.assertActivitiesMatch()
 
2011
 
 
2012
    def test_has(self):
 
2013
        self.server.canned_response = '''HTTP/1.1 200 OK\r
 
2014
Server: SimpleHTTP/0.6 Python/2.5.2\r
 
2015
Date: Thu, 29 Jan 2009 20:21:47 GMT\r
 
2016
Content-type: application/octet-stream\r
 
2017
Content-Length: 20\r
 
2018
Last-Modified: Thu, 29 Jan 2009 20:21:47 GMT\r
 
2019
\r
 
2020
'''
 
2021
        t = self.get_transport()
 
2022
        self.assertTrue(t.has('foo/bar'))
 
2023
        self.assertActivitiesMatch()
 
2024
 
 
2025
    def test_readv(self):
 
2026
        self.server.canned_response = '''HTTP/1.1 206 Partial Content\r
 
2027
Date: Tue, 11 Jul 2006 04:49:48 GMT\r
 
2028
Server: Apache/2.0.54 (Fedora)\r
 
2029
Last-Modified: Thu, 06 Jul 2006 20:22:05 GMT\r
 
2030
ETag: "238a3c-16ec2-805c5540"\r
 
2031
Accept-Ranges: bytes\r
 
2032
Content-Length: 1534\r
 
2033
Connection: close\r
 
2034
Content-Type: multipart/byteranges; boundary=418470f848b63279b\r
 
2035
\r
 
2036
\r
 
2037
--418470f848b63279b\r
 
2038
Content-type: text/plain; charset=UTF-8\r
 
2039
Content-range: bytes 0-254/93890\r
 
2040
\r
 
2041
mbp@sourcefrog.net-20050309040815-13242001617e4a06
 
2042
mbp@sourcefrog.net-20050309040929-eee0eb3e6d1e7627
 
2043
mbp@sourcefrog.net-20050309040957-6cad07f466bb0bb8
 
2044
mbp@sourcefrog.net-20050309041501-c840e09071de3b67
 
2045
mbp@sourcefrog.net-20050309044615-c24a3250be83220a
 
2046
\r
 
2047
--418470f848b63279b\r
 
2048
Content-type: text/plain; charset=UTF-8\r
 
2049
Content-range: bytes 1000-2049/93890\r
 
2050
\r
 
2051
40-fd4ec249b6b139ab
 
2052
mbp@sourcefrog.net-20050311063625-07858525021f270b
 
2053
mbp@sourcefrog.net-20050311231934-aa3776aff5200bb9
 
2054
mbp@sourcefrog.net-20050311231953-73aeb3a131c3699a
 
2055
mbp@sourcefrog.net-20050311232353-f5e33da490872c6a
 
2056
mbp@sourcefrog.net-20050312071639-0a8f59a34a024ff0
 
2057
mbp@sourcefrog.net-20050312073432-b2c16a55e0d6e9fb
 
2058
mbp@sourcefrog.net-20050312073831-a47c3335ece1920f
 
2059
mbp@sourcefrog.net-20050312085412-13373aa129ccbad3
 
2060
mbp@sourcefrog.net-20050313052251-2bf004cb96b39933
 
2061
mbp@sourcefrog.net-20050313052856-3edd84094687cb11
 
2062
mbp@sourcefrog.net-20050313053233-e30a4f28aef48f9d
 
2063
mbp@sourcefrog.net-20050313053853-7c64085594ff3072
 
2064
mbp@sourcefrog.net-20050313054757-a86c3f5871069e22
 
2065
mbp@sourcefrog.net-20050313061422-418f1f73b94879b9
 
2066
mbp@sourcefrog.net-20050313120651-497bd231b19df600
 
2067
mbp@sourcefrog.net-20050314024931-eae0170ef25a5d1a
 
2068
mbp@sourcefrog.net-20050314025438-d52099f915fe65fc
 
2069
mbp@sourcefrog.net-20050314025539-637a636692c055cf
 
2070
mbp@sourcefrog.net-20050314025737-55eb441f430ab4ba
 
2071
mbp@sourcefrog.net-20050314025901-d74aa93bb7ee8f62
 
2072
mbp@source\r
 
2073
--418470f848b63279b--\r
 
2074
'''
 
2075
        t = self.get_transport()
 
2076
        # Remember that the request is ignored and that the ranges below
 
2077
        # doesn't have to match the canned response.
 
2078
        l = list(t.readv('/foo/bar', ((0, 255), (1000, 1050))))
 
2079
        self.assertEqual(2, len(l))
 
2080
        self.assertActivitiesMatch()
 
2081
 
 
2082
    def test_post(self):
 
2083
        self.server.canned_response = '''HTTP/1.1 200 OK\r
 
2084
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
 
2085
Server: Apache/2.0.54 (Fedora)\r
 
2086
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
 
2087
ETag: "56691-23-38e9ae00"\r
 
2088
Accept-Ranges: bytes\r
 
2089
Content-Length: 35\r
 
2090
Connection: close\r
 
2091
Content-Type: text/plain; charset=UTF-8\r
 
2092
\r
 
2093
lalala whatever as long as itsssss
 
2094
'''
 
2095
        t = self.get_transport()
 
2096
        # We must send a single line of body bytes, see
 
2097
        # PredefinedRequestHandler.handle_one_request
 
2098
        code, f = t._post('abc def end-of-body\n')
 
2099
        self.assertEqual('lalala whatever as long as itsssss\n', f.read())
 
2100
        self.assertActivitiesMatch()
 
2101
 
 
2102
 
 
2103
class TestActivity(tests.TestCase, TestActivityMixin):
 
2104
 
 
2105
    def setUp(self):
 
2106
        tests.TestCase.setUp(self)
 
2107
        self.server = self._activity_server(self._protocol_version)
 
2108
        self.server.setUp()
 
2109
        self.activities = {}
 
2110
        def report_activity(t, bytes, direction):
 
2111
            count = self.activities.get(direction, 0)
 
2112
            count += bytes
 
2113
            self.activities[direction] = count
 
2114
 
 
2115
        # We override at class level because constructors may propagate the
 
2116
        # bound method and render instance overriding ineffective (an
 
2117
        # alternative would be to define a specific ui factory instead...)
 
2118
        self.orig_report_activity = self._transport._report_activity
 
2119
        self._transport._report_activity = report_activity
 
2120
 
 
2121
    def tearDown(self):
 
2122
        self._transport._report_activity = self.orig_report_activity
 
2123
        self.server.tearDown()
 
2124
        tests.TestCase.tearDown(self)
 
2125
 
 
2126
 
 
2127
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
 
2128
 
 
2129
    def setUp(self):
 
2130
        tests.TestCase.setUp(self)
 
2131
        # Unlike TestActivity, we are really testing ReportingFileSocket and
 
2132
        # ReportingSocket, so we don't need all the parametrization. Since
 
2133
        # ReportingFileSocket and ReportingSocket are wrappers, it's easier to
 
2134
        # test them through their use by the transport than directly (that's a
 
2135
        # bit less clean but far more simpler and effective).
 
2136
        self.server = ActivityHTTPServer('HTTP/1.1')
 
2137
        self._transport=_urllib.HttpTransport_urllib
 
2138
 
 
2139
        self.server.setUp()
 
2140
 
 
2141
        # We override at class level because constructors may propagate the
 
2142
        # bound method and render instance overriding ineffective (an
 
2143
        # alternative would be to define a specific ui factory instead...)
 
2144
        self.orig_report_activity = self._transport._report_activity
 
2145
        self._transport._report_activity = None
 
2146
 
 
2147
    def tearDown(self):
 
2148
        self._transport._report_activity = self.orig_report_activity
 
2149
        self.server.tearDown()
 
2150
        tests.TestCase.tearDown(self)
 
2151
 
 
2152
    def assertActivitiesMatch(self):
 
2153
        # Nothing to check here
 
2154
        pass
 
2155
 
 
2156
 
 
2157
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
 
2158
    """Test authentication on the redirected http server."""
 
2159
 
 
2160
    _auth_header = 'Authorization'
 
2161
    _password_prompt_prefix = ''
 
2162
    _username_prompt_prefix = ''
 
2163
    _auth_server = http_utils.HTTPBasicAuthServer
 
2164
    _transport = _urllib.HttpTransport_urllib
 
2165
 
 
2166
    def create_transport_readonly_server(self):
 
2167
        return self._auth_server()
 
2168
 
 
2169
    def create_transport_secondary_server(self):
 
2170
        """Create the secondary server redirecting to the primary server"""
 
2171
        new = self.get_readonly_server()
 
2172
 
 
2173
        redirecting = http_utils.HTTPServerRedirecting()
 
2174
        redirecting.redirect_to(new.host, new.port)
 
2175
        return redirecting
 
2176
 
 
2177
    def setUp(self):
 
2178
        super(TestAuthOnRedirected, self).setUp()
 
2179
        self.build_tree_contents([('a','a'),
 
2180
                                  ('1/',),
 
2181
                                  ('1/a', 'redirected once'),
 
2182
                                  ],)
 
2183
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
2184
                                       self.new_server.port)
 
2185
        self.old_server.redirections = [
 
2186
            ('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
2187
        self.old_transport = self._transport(self.old_server.get_url())
 
2188
        self.new_server.add_user('joe', 'foo')
 
2189
 
 
2190
    def get_a(self, transport):
 
2191
        return transport.get('a')
 
2192
 
 
2193
    def test_auth_on_redirected_via_do_catching_redirections(self):
 
2194
        self.redirections = 0
 
2195
 
 
2196
        def redirected(transport, exception, redirection_notice):
 
2197
            self.redirections += 1
 
2198
            dir, file = urlutils.split(exception.target)
 
2199
            return self._transport(dir)
 
2200
 
 
2201
        stdout = tests.StringIOWrapper()
 
2202
        stderr = tests.StringIOWrapper()
 
2203
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
 
2204
                                            stdout=stdout, stderr=stderr)
 
2205
        self.assertEqual('redirected once',
 
2206
                         transport.do_catching_redirections(
 
2207
                self.get_a, self.old_transport, redirected).read())
 
2208
        self.assertEqual(1, self.redirections)
 
2209
        # stdin should be empty
 
2210
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
2211
        # stdout should be empty, stderr will contains the prompts
 
2212
        self.assertEqual('', stdout.getvalue())
 
2213
 
 
2214
    def test_auth_on_redirected_via_following_redirections(self):
 
2215
        self.new_server.add_user('joe', 'foo')
 
2216
        stdout = tests.StringIOWrapper()
 
2217
        stderr = tests.StringIOWrapper()
 
2218
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
 
2219
                                            stdout=stdout, stderr=stderr)
 
2220
        t = self.old_transport
 
2221
        req = RedirectedRequest('GET', t.abspath('a'))
 
2222
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
2223
                                       self.new_server.port)
 
2224
        self.old_server.redirections = [
 
2225
            ('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
2226
        self.assertEqual('redirected once',t._perform(req).read())
 
2227
        # stdin should be empty
 
2228
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
2229
        # stdout should be empty, stderr will contains the prompts
 
2230
        self.assertEqual('', stdout.getvalue())
 
2231
 
 
2232