~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http_implementations.py

  • Committer: Vincent Ladeuil
  • Date: 2007-12-21 12:20:33 UTC
  • mto: (3146.3.1 179368) (3156.2.1 trunk)
  • mto: This revision was merged to the branch mainline in revision 3158.
  • Revision ID: v.ladeuil+lp@free.fr-20071221122033-42bc21re0zj4kqbg
Merge back test_http_implementations.pc into test_http.py.

* bzrlib/tests/test_http.py: 
Merge test_http_implementations.py now that we have rewritten
load_tests. That should reduce the noise in the final proposed
patch.

* bzrlib/tests/http_server.py:
(TestingHTTPRequestHandler.log_message): Ghaaa, don't over spell-check.

Show diffs side-by-side

added added

removed removed

Lines of Context:
49
49
    )
50
50
 
51
51
 
52
 
try:
53
 
    from bzrlib.transport.http._pycurl import PyCurlTransport
54
 
    pycurl_present = True
55
 
except errors.DependencyNotPresent:
56
 
    pycurl_present = False
57
 
 
58
 
 
59
 
class TransportAdapter(tests.TestScenarioApplier):
60
 
    """Generate the same test for each transport implementation."""
61
 
 
62
 
    def __init__(self):
63
 
        transport_scenarios = [
64
 
            ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
65
 
                            _server=http_server.HttpServer_urllib,
66
 
                            _qualified_prefix='http+urllib',)),
67
 
            ]
68
 
        if pycurl_present:
69
 
            transport_scenarios.append(
70
 
                ('pycurl', dict(_transport=PyCurlTransport,
71
 
                                _server=http_server.HttpServer_PyCurl,
72
 
                                _qualified_prefix='http+pycurl',)))
73
 
        self.scenarios = transport_scenarios
74
 
 
75
 
 
76
 
class TransportProtocolAdapter(TransportAdapter):
77
 
    """Generate the same test for each protocol implementation.
78
 
 
79
 
    In addition to the transport adaptatation that we inherit from.
80
 
    """
81
 
 
82
 
    def __init__(self):
83
 
        super(TransportProtocolAdapter, self).__init__()
84
 
        protocol_scenarios = [
85
 
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
86
 
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
87
 
            ]
88
 
        self.scenarios = tests.multiply_scenarios(self.scenarios,
89
 
                                                  protocol_scenarios)
90
 
 
91
 
 
92
 
class TransportProtocolAuthenticationAdapter(TransportProtocolAdapter):
93
 
    """Generate the same test for each authentication scheme implementation.
94
 
 
95
 
    In addition to the protocol adaptatation that we inherit from.
96
 
    """
97
 
 
98
 
    def __init__(self):
99
 
        super(TransportProtocolAuthenticationAdapter, self).__init__()
100
 
        auth_scheme_scenarios = [
101
 
            ('basic', dict(_auth_scheme='basic')),
102
 
            ('digest', dict(_auth_scheme='digest')),
103
 
            ]
104
 
 
105
 
        self.scenarios = tests.multiply_scenarios(self.scenarios,
106
 
                                                  auth_scheme_scenarios)
107
 
 
108
 
def load_tests(standard_tests, module, loader):
109
 
    """Multiply tests for http clients and protocol versions."""
110
 
    # one for each transport
111
 
    t_adapter = TransportAdapter()
112
 
    t_classes= (TestHttpTransportRegistration,
113
 
                TestHttpTransportUrls,
114
 
                )
115
 
    is_testing_for_transports = tests.condition_isinstance(t_classes)
116
 
 
117
 
    # multiplied by one for each protocol version
118
 
    tp_adapter = TransportProtocolAdapter()
119
 
    tp_classes= (TestDoCatchRedirections,
120
 
                 TestHTTPConnections,
121
 
                 TestHTTPRedirections,
122
 
                 TestHTTPSilentRedirections,
123
 
                 TestLimitedRangeRequestServer,
124
 
                 TestPost,
125
 
                 TestProxyHttpServer,
126
 
                 TestRanges,
127
 
                 TestSpecificRequestHandler,
128
 
                 )
129
 
    is_also_testing_for_protocols = tests.condition_isinstance(tp_classes)
130
 
 
131
 
    # multiplied by one for each authentication scheme
132
 
    tpa_adapter = TransportProtocolAuthenticationAdapter()
133
 
    tpa_classes = (TestAuth,
134
 
                   )
135
 
    is_also_testing_for_authentication = tests.condition_isinstance(tpa_classes)
136
 
 
137
 
    result = loader.suiteClass()
138
 
    for test in tests.iter_suite_tests(standard_tests):
139
 
        # Each test is either standalone or testing for some combination of
140
 
        # transport, protocol version, authentication scheme. Use the right
141
 
        # adpater (or none) depending on the class.
142
 
        if is_testing_for_transports(test):
143
 
            result.addTests(t_adapter.adapt(test))
144
 
        elif is_also_testing_for_protocols(test):
145
 
            result.addTests(tp_adapter.adapt(test))
146
 
        elif is_also_testing_for_authentication(test):
147
 
            result.addTests(tpa_adapter.adapt(test))
148
 
        else:
149
 
            result.addTests(test)
150
 
    return result
151
 
 
152
 
 
153
 
class TestHttpTransportUrls(tests.TestCase):
154
 
    """Test the http urls."""
155
 
 
156
 
    def test_abs_url(self):
157
 
        """Construction of absolute http URLs"""
158
 
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
159
 
        eq = self.assertEqualDiff
160
 
        eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
161
 
        eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
162
 
        eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
163
 
        eq(t.abspath('.bzr/1//2/./3'),
164
 
           'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
165
 
 
166
 
    def test_invalid_http_urls(self):
167
 
        """Trap invalid construction of urls"""
168
 
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
169
 
        self.assertRaises(errors.InvalidURL,
170
 
                          self._transport,
171
 
                          'http://http://bazaar-vcs.org/bzr/bzr.dev/')
172
 
 
173
 
    def test_http_root_urls(self):
174
 
        """Construction of URLs from server root"""
175
 
        t = self._transport('http://bzr.ozlabs.org/')
176
 
        eq = self.assertEqualDiff
177
 
        eq(t.abspath('.bzr/tree-version'),
178
 
           'http://bzr.ozlabs.org/.bzr/tree-version')
179
 
 
180
 
    def test_http_impl_urls(self):
181
 
        """There are servers which ask for particular clients to connect"""
182
 
        server = self._server()
183
 
        try:
184
 
            server.setUp()
185
 
            url = server.get_url()
186
 
            self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
187
 
        finally:
188
 
            server.tearDown()
189
 
 
190
 
 
191
 
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
192
 
    """Test the http connections."""
193
 
 
194
 
    def setUp(self):
195
 
        http_utils.TestCaseWithWebserver.setUp(self)
196
 
        self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
197
 
                        transport=self.get_transport())
198
 
 
199
 
    def test_http_has(self):
200
 
        server = self.get_readonly_server()
201
 
        t = self._transport(server.get_url())
202
 
        self.assertEqual(t.has('foo/bar'), True)
203
 
        self.assertEqual(len(server.logs), 1)
204
 
        self.assertContainsRe(server.logs[0],
205
 
            r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "bzr/')
206
 
 
207
 
    def test_http_has_not_found(self):
208
 
        server = self.get_readonly_server()
209
 
        t = self._transport(server.get_url())
210
 
        self.assertEqual(t.has('not-found'), False)
211
 
        self.assertContainsRe(server.logs[1],
212
 
            r'"HEAD /not-found HTTP/1.." 404 - "-" "bzr/')
213
 
 
214
 
    def test_http_get(self):
215
 
        server = self.get_readonly_server()
216
 
        t = self._transport(server.get_url())
217
 
        fp = t.get('foo/bar')
218
 
        self.assertEqualDiff(
219
 
            fp.read(),
220
 
            'contents of foo/bar\n')
221
 
        self.assertEqual(len(server.logs), 1)
222
 
        self.assertTrue(server.logs[0].find(
223
 
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
224
 
            % bzrlib.__version__) > -1)
225
 
 
226
 
    def test_get_smart_medium(self):
227
 
        # For HTTP, get_smart_medium should return the transport object.
228
 
        server = self.get_readonly_server()
229
 
        http_transport = self._transport(server.get_url())
230
 
        medium = http_transport.get_smart_medium()
231
 
        self.assertIs(medium, http_transport)
232
 
 
233
 
    def test_has_on_bogus_host(self):
234
 
        # Get a free address and don't 'accept' on it, so that we
235
 
        # can be sure there is no http handler there, but set a
236
 
        # reasonable timeout to not slow down tests too much.
237
 
        default_timeout = socket.getdefaulttimeout()
238
 
        try:
239
 
            socket.setdefaulttimeout(2)
240
 
            s = socket.socket()
241
 
            s.bind(('localhost', 0))
242
 
            t = self._transport('http://%s:%s/' % s.getsockname())
243
 
            self.assertRaises(errors.ConnectionError, t.has, 'foo/bar')
244
 
        finally:
245
 
            socket.setdefaulttimeout(default_timeout)
246
 
 
247
 
 
248
 
class TestPost(tests.TestCase):
249
 
 
250
 
    def test_post_body_is_received(self):
251
 
        server = http_utils.RecordingServer(expect_body_tail='end-of-body')
252
 
        server.setUp()
253
 
        self.addCleanup(server.tearDown)
254
 
        scheme = self._qualified_prefix
255
 
        url = '%s://%s:%s/' % (scheme, server.host, server.port)
256
 
        http_transport = self._transport(url)
257
 
        code, response = http_transport._post('abc def end-of-body')
258
 
        self.assertTrue(
259
 
            server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
260
 
        self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
261
 
        # The transport should not be assuming that the server can accept
262
 
        # chunked encoding the first time it connects, because HTTP/1.1, so we
263
 
        # check for the literal string.
264
 
        self.assertTrue(
265
 
            server.received_bytes.endswith('\r\n\r\nabc def end-of-body'))
266
 
 
267
 
 
268
 
class TestHttpTransportRegistration(tests.TestCase):
269
 
    """Test registrations of various http implementations"""
270
 
 
271
 
    def test_http_registered(self):
272
 
        t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
273
 
        self.assertIsInstance(t, transport.Transport)
274
 
        self.assertIsInstance(t, self._transport)
275
 
 
276
 
 
277
 
class TestSpecificRequestHandler(http_utils.TestCaseWithWebserver):
278
 
    """Tests a specific request handler.
279
 
 
280
 
 
281
 
    Daughter class are expected to override _req_handler_class
282
 
    """
283
 
 
284
 
    # Provide a useful default
285
 
    _req_handler_class = http_server.TestingHTTPRequestHandler
286
 
 
287
 
    def create_transport_readonly_server(self):
288
 
        return http_server.HttpServer(self._req_handler_class,
289
 
                                      protocol_version=self._protocol_version)
290
 
 
291
 
 
292
 
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
293
 
    """Whatever request comes in, close the connection"""
294
 
 
295
 
    def handle_one_request(self):
296
 
        """Handle a single HTTP request, by abruptly closing the connection"""
297
 
        self.close_connection = 1
298
 
 
299
 
 
300
 
class TestWallServer(TestSpecificRequestHandler):
301
 
    """Tests exceptions during the connection phase"""
302
 
 
303
 
    _req_handler_class = WallRequestHandler
304
 
 
305
 
    def test_http_has(self):
306
 
        server = self.get_readonly_server()
307
 
        t = self._transport(server.get_url())
308
 
        # Unfortunately httplib (see HTTPResponse._read_status
309
 
        # for details) make no distinction between a closed
310
 
        # socket and badly formatted status line, so we can't
311
 
        # just test for ConnectionError, we have to test
312
 
        # InvalidHttpResponse too.
313
 
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
314
 
                          t.has, 'foo/bar')
315
 
 
316
 
    def test_http_get(self):
317
 
        server = self.get_readonly_server()
318
 
        t = self._transport(server.get_url())
319
 
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
320
 
                          t.get, 'foo/bar')
321
 
 
322
 
 
323
 
class BadStatusRequestHandler(http_server.TestingHTTPRequestHandler):
324
 
    """Whatever request comes in, returns a bad status"""
325
 
 
326
 
    def parse_request(self):
327
 
        """Fakes handling a single HTTP request, returns a bad status"""
328
 
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
329
 
        try:
330
 
            self.send_response(0, "Bad status")
331
 
            self.end_headers()
332
 
        except socket.error, e:
333
 
            # We don't want to pollute the test results with
334
 
            # spurious server errors while test succeed. In our
335
 
            # case, it may occur that the test has already read
336
 
            # the 'Bad Status' and closed the socket while we are
337
 
            # still trying to send some headers... So the test is
338
 
            # ok, but if we raise the exception, the output is
339
 
            # dirty. So we don't raise, but we close the
340
 
            # connection, just to be safe :)
341
 
            spurious = [errno.EPIPE,
342
 
                        errno.ECONNRESET,
343
 
                        errno.ECONNABORTED,
344
 
                        ]
345
 
            if (len(e.args) > 0) and (e.args[0] in spurious):
346
 
                self.close_connection = 1
347
 
                pass
348
 
            else:
349
 
                raise
350
 
        return False
351
 
 
352
 
 
353
 
class TestBadStatusServer(TestSpecificRequestHandler):
354
 
    """Tests bad status from server."""
355
 
 
356
 
    _req_handler_class = BadStatusRequestHandler
357
 
 
358
 
    def test_http_has(self):
359
 
        server = self.get_readonly_server()
360
 
        t = self._transport(server.get_url())
361
 
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
362
 
 
363
 
    def test_http_get(self):
364
 
        server = self.get_readonly_server()
365
 
        t = self._transport(server.get_url())
366
 
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
367
 
 
368
 
 
369
 
class InvalidStatusRequestHandler(http_server.TestingHTTPRequestHandler):
370
 
    """Whatever request comes in, returns am invalid status"""
371
 
 
372
 
    def parse_request(self):
373
 
        """Fakes handling a single HTTP request, returns a bad status"""
374
 
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
375
 
        self.wfile.write("Invalid status line\r\n")
376
 
        return False
377
 
 
378
 
 
379
 
class TestInvalidStatusServer(TestBadStatusServer):
380
 
    """Tests invalid status from server.
381
 
 
382
 
    Both implementations raises the same error as for a bad status.
383
 
    """
384
 
 
385
 
    _req_handler_class = InvalidStatusRequestHandler
386
 
 
387
 
 
388
 
class BadProtocolRequestHandler(http_server.TestingHTTPRequestHandler):
389
 
    """Whatever request comes in, returns a bad protocol version"""
390
 
 
391
 
    def parse_request(self):
392
 
        """Fakes handling a single HTTP request, returns a bad status"""
393
 
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
394
 
        # Returns an invalid protocol version, but curl just
395
 
        # ignores it and those cannot be tested.
396
 
        self.wfile.write("%s %d %s\r\n" % ('HTTP/0.0',
397
 
                                           404,
398
 
                                           'Look at my protocol version'))
399
 
        return False
400
 
 
401
 
 
402
 
class TestBadProtocolServer(TestSpecificRequestHandler):
403
 
    """Tests bad protocol from server."""
404
 
 
405
 
    _req_handler_class = BadProtocolRequestHandler
406
 
 
407
 
    def setUp(self):
408
 
        if pycurl_present and self._transport == PyCurlTransport:
409
 
            raise tests.TestNotApplicable(
410
 
                "pycurl doesn't check the protocol version")
411
 
        super(TestBadProtocolServer, self).setUp()
412
 
 
413
 
    def test_http_has(self):
414
 
        server = self.get_readonly_server()
415
 
        t = self._transport(server.get_url())
416
 
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
417
 
 
418
 
    def test_http_get(self):
419
 
        server = self.get_readonly_server()
420
 
        t = self._transport(server.get_url())
421
 
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
422
 
 
423
 
 
424
 
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
425
 
    """Whatever request comes in, returns a 403 code"""
426
 
 
427
 
    def parse_request(self):
428
 
        """Handle a single HTTP request, by replying we cannot handle it"""
429
 
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
430
 
        self.send_error(403)
431
 
        return False
432
 
 
433
 
 
434
 
class TestForbiddenServer(TestSpecificRequestHandler):
435
 
    """Tests forbidden server"""
436
 
 
437
 
    _req_handler_class = ForbiddenRequestHandler
438
 
 
439
 
    def test_http_has(self):
440
 
        server = self.get_readonly_server()
441
 
        t = self._transport(server.get_url())
442
 
        self.assertRaises(errors.TransportError, t.has, 'foo/bar')
443
 
 
444
 
    def test_http_get(self):
445
 
        server = self.get_readonly_server()
446
 
        t = self._transport(server.get_url())
447
 
        self.assertRaises(errors.TransportError, t.get, 'foo/bar')
448
 
 
449
 
 
450
 
class TestRanges(http_utils.TestCaseWithWebserver):
451
 
    """Test the Range header in GET methods."""
452
 
 
453
 
    def setUp(self):
454
 
        http_utils.TestCaseWithWebserver.setUp(self)
455
 
        self.build_tree_contents([('a', '0123456789')],)
456
 
        server = self.get_readonly_server()
457
 
        self.transport = self._transport(server.get_url())
458
 
 
459
 
    def _file_contents(self, relpath, ranges):
460
 
        offsets = [ (start, end - start + 1) for start, end in ranges]
461
 
        coalesce = self.transport._coalesce_offsets
462
 
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
463
 
        code, data = self.transport._get(relpath, coalesced)
464
 
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
465
 
        for start, end in ranges:
466
 
            data.seek(start)
467
 
            yield data.read(end - start + 1)
468
 
 
469
 
    def _file_tail(self, relpath, tail_amount):
470
 
        code, data = self.transport._get(relpath, [], tail_amount)
471
 
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
472
 
        data.seek(-tail_amount, 2)
473
 
        return data.read(tail_amount)
474
 
 
475
 
    def test_range_header(self):
476
 
        # Valid ranges
477
 
        map(self.assertEqual,['0', '234'],
478
 
            list(self._file_contents('a', [(0,0), (2,4)])),)
479
 
 
480
 
    def test_range_header_tail(self):
481
 
        self.assertEqual('789', self._file_tail('a', 3))
482
 
 
483
 
    def test_syntactically_invalid_range_header(self):
484
 
        self.assertListRaises(errors.InvalidHttpRange,
485
 
                          self._file_contents, 'a', [(4, 3)])
486
 
 
487
 
    def test_semantically_invalid_range_header(self):
488
 
        self.assertListRaises(errors.InvalidHttpRange,
489
 
                          self._file_contents, 'a', [(42, 128)])
490
 
 
491
 
 
492
 
class TestRangeRequestServer(TestSpecificRequestHandler):
493
 
    """Tests readv requests against server.
494
 
 
495
 
    We test against default "normal" server.
496
 
    """
497
 
 
498
 
    def setUp(self):
499
 
        super(TestRangeRequestServer, self).setUp()
500
 
        self.build_tree_contents([('a', '0123456789')],)
501
 
 
502
 
    def test_readv(self):
503
 
        server = self.get_readonly_server()
504
 
        t = self._transport(server.get_url())
505
 
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
506
 
        self.assertEqual(l[0], (0, '0'))
507
 
        self.assertEqual(l[1], (1, '1'))
508
 
        self.assertEqual(l[2], (3, '34'))
509
 
        self.assertEqual(l[3], (9, '9'))
510
 
 
511
 
    def test_readv_out_of_order(self):
512
 
        server = self.get_readonly_server()
513
 
        t = self._transport(server.get_url())
514
 
        l = list(t.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
515
 
        self.assertEqual(l[0], (1, '1'))
516
 
        self.assertEqual(l[1], (9, '9'))
517
 
        self.assertEqual(l[2], (0, '0'))
518
 
        self.assertEqual(l[3], (3, '34'))
519
 
 
520
 
    def test_readv_invalid_ranges(self):
521
 
        server = self.get_readonly_server()
522
 
        t = self._transport(server.get_url())
523
 
 
524
 
        # This is intentionally reading off the end of the file
525
 
        # since we are sure that it cannot get there
526
 
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
527
 
                              t.readv, 'a', [(1,1), (8,10)])
528
 
 
529
 
        # This is trying to seek past the end of the file, it should
530
 
        # also raise a special error
531
 
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
532
 
                              t.readv, 'a', [(12,2)])
533
 
 
534
 
    def test_readv_multiple_get_requests(self):
535
 
        server = self.get_readonly_server()
536
 
        t = self._transport(server.get_url())
537
 
        # force transport to issue multiple requests
538
 
        t._max_readv_combine = 1
539
 
        t._max_get_ranges = 1
540
 
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
541
 
        self.assertEqual(l[0], (0, '0'))
542
 
        self.assertEqual(l[1], (1, '1'))
543
 
        self.assertEqual(l[2], (3, '34'))
544
 
        self.assertEqual(l[3], (9, '9'))
545
 
        # The server should have issued 4 requests
546
 
        self.assertEqual(4, server.GET_request_nb)
547
 
 
548
 
    def test_readv_get_max_size(self):
549
 
        server = self.get_readonly_server()
550
 
        t = self._transport(server.get_url())
551
 
        # force transport to issue multiple requests by limiting the number of
552
 
        # bytes by request. Note that this apply to coalesced offsets only, a
553
 
        # single range ill keep its size even if bigger than the limit.
554
 
        t._get_max_size = 2
555
 
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
556
 
        self.assertEqual(l[0], (0, '0'))
557
 
        self.assertEqual(l[1], (1, '1'))
558
 
        self.assertEqual(l[2], (2, '2345'))
559
 
        self.assertEqual(l[3], (6, '6789'))
560
 
        # The server should have issued 3 requests
561
 
        self.assertEqual(3, server.GET_request_nb)
562
 
 
563
 
 
564
 
class SingleRangeRequestHandler(http_server.TestingHTTPRequestHandler):
565
 
    """Always reply to range request as if they were single.
566
 
 
567
 
    Don't be explicit about it, just to annoy the clients.
568
 
    """
569
 
 
570
 
    def get_multiple_ranges(self, file, file_size, ranges):
571
 
        """Answer as if it was a single range request and ignores the rest"""
572
 
        (start, end) = ranges[0]
573
 
        return self.get_single_range(file, file_size, start, end)
574
 
 
575
 
 
576
 
class TestSingleRangeRequestServer(TestRangeRequestServer):
577
 
    """Test readv against a server which accept only single range requests"""
578
 
 
579
 
    _req_handler_class = SingleRangeRequestHandler
580
 
 
581
 
 
582
 
class SingleOnlyRangeRequestHandler(http_server.TestingHTTPRequestHandler):
583
 
    """Only reply to simple range requests, errors out on multiple"""
584
 
 
585
 
    def get_multiple_ranges(self, file, file_size, ranges):
586
 
        """Refuses the multiple ranges request"""
587
 
        if len(ranges) > 1:
588
 
            file.close()
589
 
            self.send_error(416, "Requested range not satisfiable")
590
 
            return
591
 
        (start, end) = ranges[0]
592
 
        return self.get_single_range(file, file_size, start, end)
593
 
 
594
 
 
595
 
class TestSingleOnlyRangeRequestServer(TestRangeRequestServer):
596
 
    """Test readv against a server which only accept single range requests"""
597
 
 
598
 
    _req_handler_class = SingleOnlyRangeRequestHandler
599
 
 
600
 
 
601
 
class NoRangeRequestHandler(http_server.TestingHTTPRequestHandler):
602
 
    """Ignore range requests without notice"""
603
 
 
604
 
    def do_GET(self):
605
 
        # Update the statistics
606
 
        self.server.test_case_server.GET_request_nb += 1
607
 
        # Just bypass the range handling done by TestingHTTPRequestHandler
608
 
        return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
609
 
 
610
 
 
611
 
class TestNoRangeRequestServer(TestRangeRequestServer):
612
 
    """Test readv against a server which do not accept range requests"""
613
 
 
614
 
    _req_handler_class = NoRangeRequestHandler
615
 
 
616
 
 
617
 
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
618
 
    """Errors out when range specifiers exceed the limit"""
619
 
 
620
 
    def get_multiple_ranges(self, file, file_size, ranges):
621
 
        """Refuses the multiple ranges request"""
622
 
        tcs = self.server.test_case_server
623
 
        if tcs.range_limit is not None and len(ranges) > tcs.range_limit:
624
 
            file.close()
625
 
            # Emulate apache behavior
626
 
            self.send_error(400, "Bad Request")
627
 
            return
628
 
        return http_server.TestingHTTPRequestHandler.get_multiple_ranges(
629
 
            self, file, file_size, ranges)
630
 
 
631
 
 
632
 
class LimitedRangeHTTPServer(http_server.HttpServer):
633
 
    """An HttpServer erroring out on requests with too much range specifiers"""
634
 
 
635
 
    def __init__(self, request_handler=LimitedRangeRequestHandler,
636
 
                 protocol_version=None,
637
 
                 range_limit=None):
638
 
        http_server.HttpServer.__init__(self, request_handler,
639
 
                                        protocol_version=protocol_version)
640
 
        self.range_limit = range_limit
641
 
 
642
 
 
643
 
class TestLimitedRangeRequestServer(http_utils.TestCaseWithWebserver):
644
 
    """Tests readv requests against a server erroring out on too much ranges."""
645
 
 
646
 
    range_limit = 3
647
 
 
648
 
    def create_transport_readonly_server(self):
649
 
        # Requests with more range specifiers will error out
650
 
        return LimitedRangeHTTPServer(range_limit=self.range_limit,
651
 
                                      protocol_version=self._protocol_version)
652
 
 
653
 
    def get_transport(self):
654
 
        return self._transport(self.get_readonly_server().get_url())
655
 
 
656
 
    def setUp(self):
657
 
        http_utils.TestCaseWithWebserver.setUp(self)
658
 
        # We need to manipulate ranges that correspond to real chunks in the
659
 
        # response, so we build a content appropriately.
660
 
        filler = ''.join(['abcdefghij' for x in range(102)])
661
 
        content = ''.join(['%04d' % v + filler for v in range(16)])
662
 
        self.build_tree_contents([('a', content)],)
663
 
 
664
 
    def test_few_ranges(self):
665
 
        t = self.get_transport()
666
 
        l = list(t.readv('a', ((0, 4), (1024, 4), )))
667
 
        self.assertEqual(l[0], (0, '0000'))
668
 
        self.assertEqual(l[1], (1024, '0001'))
669
 
        self.assertEqual(1, self.get_readonly_server().GET_request_nb)
670
 
 
671
 
    def test_more_ranges(self):
672
 
        t = self.get_transport()
673
 
        l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
674
 
        self.assertEqual(l[0], (0, '0000'))
675
 
        self.assertEqual(l[1], (1024, '0001'))
676
 
        self.assertEqual(l[2], (4096, '0004'))
677
 
        self.assertEqual(l[3], (8192, '0008'))
678
 
        # The server will refuse to serve the first request (too much ranges),
679
 
        # a second request will succeeds.
680
 
        self.assertEqual(2, self.get_readonly_server().GET_request_nb)
681
 
 
682
 
 
683
 
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
684
 
    """Tests proxy server.
685
 
 
686
 
    Be aware that we do not setup a real proxy here. Instead, we
687
 
    check that the *connection* goes through the proxy by serving
688
 
    different content (the faked proxy server append '-proxied'
689
 
    to the file names).
690
 
    """
691
 
 
692
 
    # FIXME: We don't have an https server available, so we don't
693
 
    # test https connections.
694
 
 
695
 
    def setUp(self):
696
 
        super(TestProxyHttpServer, self).setUp()
697
 
        self.build_tree_contents([('foo', 'contents of foo\n'),
698
 
                                  ('foo-proxied', 'proxied contents of foo\n')])
699
 
        # Let's setup some attributes for tests
700
 
        self.server = self.get_readonly_server()
701
 
        self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
702
 
        if self._testing_pycurl():
703
 
            # Oh my ! pycurl does not check for the port as part of
704
 
            # no_proxy :-( So we just test the host part
705
 
            self.no_proxy_host = 'localhost'
706
 
        else:
707
 
            self.no_proxy_host = self.proxy_address
708
 
        # The secondary server is the proxy
709
 
        self.proxy = self.get_secondary_server()
710
 
        self.proxy_url = self.proxy.get_url()
711
 
        self._old_env = {}
712
 
 
713
 
    def _testing_pycurl(self):
714
 
        return pycurl_present and self._transport == PyCurlTransport
715
 
 
716
 
    def create_transport_secondary_server(self):
717
 
        """Creates an http server that will serve files with
718
 
        '-proxied' appended to their names.
719
 
        """
720
 
        return http_utils.ProxyServer(protocol_version=self._protocol_version)
721
 
 
722
 
    def _install_env(self, env):
723
 
        for name, value in env.iteritems():
724
 
            self._old_env[name] = osutils.set_or_unset_env(name, value)
725
 
 
726
 
    def _restore_env(self):
727
 
        for name, value in self._old_env.iteritems():
728
 
            osutils.set_or_unset_env(name, value)
729
 
 
730
 
    def proxied_in_env(self, env):
731
 
        self._install_env(env)
732
 
        url = self.server.get_url()
733
 
        t = self._transport(url)
734
 
        try:
735
 
            self.assertEqual(t.get('foo').read(), 'proxied contents of foo\n')
736
 
        finally:
737
 
            self._restore_env()
738
 
 
739
 
    def not_proxied_in_env(self, env):
740
 
        self._install_env(env)
741
 
        url = self.server.get_url()
742
 
        t = self._transport(url)
743
 
        try:
744
 
            self.assertEqual(t.get('foo').read(), 'contents of foo\n')
745
 
        finally:
746
 
            self._restore_env()
747
 
 
748
 
    def test_http_proxy(self):
749
 
        self.proxied_in_env({'http_proxy': self.proxy_url})
750
 
 
751
 
    def test_HTTP_PROXY(self):
752
 
        if self._testing_pycurl():
753
 
            # pycurl does not check HTTP_PROXY for security reasons
754
 
            # (for use in a CGI context that we do not care
755
 
            # about. Should we ?)
756
 
            raise tests.TestNotApplicable(
757
 
                'pycurl does not check HTTP_PROXY for security reasons')
758
 
        self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
759
 
 
760
 
    def test_all_proxy(self):
761
 
        self.proxied_in_env({'all_proxy': self.proxy_url})
762
 
 
763
 
    def test_ALL_PROXY(self):
764
 
        self.proxied_in_env({'ALL_PROXY': self.proxy_url})
765
 
 
766
 
    def test_http_proxy_with_no_proxy(self):
767
 
        self.not_proxied_in_env({'http_proxy': self.proxy_url,
768
 
                                 'no_proxy': self.no_proxy_host})
769
 
 
770
 
    def test_HTTP_PROXY_with_NO_PROXY(self):
771
 
        if self._testing_pycurl():
772
 
            raise tests.TestNotApplicable(
773
 
                'pycurl does not check HTTP_PROXY for security reasons')
774
 
        self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
775
 
                                 'NO_PROXY': self.no_proxy_host})
776
 
 
777
 
    def test_all_proxy_with_no_proxy(self):
778
 
        self.not_proxied_in_env({'all_proxy': self.proxy_url,
779
 
                                 'no_proxy': self.no_proxy_host})
780
 
 
781
 
    def test_ALL_PROXY_with_NO_PROXY(self):
782
 
        self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
783
 
                                 'NO_PROXY': self.no_proxy_host})
784
 
 
785
 
    def test_http_proxy_without_scheme(self):
786
 
        if self._testing_pycurl():
787
 
            # pycurl *ignores* invalid proxy env variables. If that ever change
788
 
            # in the future, this test will fail indicating that pycurl do not
789
 
            # ignore anymore such variables.
790
 
            self.not_proxied_in_env({'http_proxy': self.proxy_address})
791
 
        else:
792
 
            self.assertRaises(errors.InvalidURL,
793
 
                              self.proxied_in_env,
794
 
                              {'http_proxy': self.proxy_address})
795
 
 
796
 
 
797
 
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
798
 
    """Test redirection between http servers."""
799
 
 
800
 
    def create_transport_secondary_server(self):
801
 
        """Create the secondary server redirecting to the primary server"""
802
 
        new = self.get_readonly_server()
803
 
 
804
 
        redirecting = http_utils.HTTPServerRedirecting(
805
 
            protocol_version=self._protocol_version)
806
 
        redirecting.redirect_to(new.host, new.port)
807
 
        return redirecting
808
 
 
809
 
    def setUp(self):
810
 
        super(TestHTTPRedirections, self).setUp()
811
 
        self.build_tree_contents([('a', '0123456789'),
812
 
                                  ('bundle',
813
 
                                  '# Bazaar revision bundle v0.9\n#\n')
814
 
                                  ],)
815
 
 
816
 
        self.old_transport = self._transport(self.old_server.get_url())
817
 
 
818
 
    def test_redirected(self):
819
 
        self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
820
 
        t = self._transport(self.new_server.get_url())
821
 
        self.assertEqual('0123456789', t.get('a').read())
822
 
 
823
 
    def test_read_redirected_bundle_from_url(self):
824
 
        from bzrlib.bundle import read_bundle_from_url
825
 
        url = self.old_transport.abspath('bundle')
826
 
        bundle = read_bundle_from_url(url)
827
 
        # If read_bundle_from_url was successful we get an empty bundle
828
 
        self.assertEqual([], bundle.revisions)
829
 
 
830
 
 
831
 
class RedirectedRequest(_urllib2_wrappers.Request):
832
 
    """Request following redirections. """
833
 
 
834
 
    init_orig = _urllib2_wrappers.Request.__init__
835
 
 
836
 
    def __init__(self, method, url, *args, **kwargs):
837
 
        """Constructor.
838
 
 
839
 
        """
840
 
        # Since the tests using this class will replace
841
 
        # _urllib2_wrappers.Request, we can't just call the base class __init__
842
 
        # or we'll loop.
843
 
        RedirectedRequest.init_orig(self, method, url, args, kwargs)
844
 
        self.follow_redirections = True
845
 
 
846
 
 
847
 
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
848
 
    """Test redirections.
849
 
 
850
 
    http implementations do not redirect silently anymore (they
851
 
    do not redirect at all in fact). The mechanism is still in
852
 
    place at the _urllib2_wrappers.Request level and these tests
853
 
    exercise it.
854
 
 
855
 
    For the pycurl implementation
856
 
    the redirection have been deleted as we may deprecate pycurl
857
 
    and I have no place to keep a working implementation.
858
 
    -- vila 20070212
859
 
    """
860
 
 
861
 
    def setUp(self):
862
 
        if pycurl_present and self._transport == PyCurlTransport:
863
 
            raise tests.TestNotApplicable(
864
 
                "pycurl doesn't redirect silently annymore")
865
 
        super(TestHTTPSilentRedirections, self).setUp()
866
 
        self.setup_redirected_request()
867
 
        self.addCleanup(self.cleanup_redirected_request)
868
 
        self.build_tree_contents([('a','a'),
869
 
                                  ('1/',),
870
 
                                  ('1/a', 'redirected once'),
871
 
                                  ('2/',),
872
 
                                  ('2/a', 'redirected twice'),
873
 
                                  ('3/',),
874
 
                                  ('3/a', 'redirected thrice'),
875
 
                                  ('4/',),
876
 
                                  ('4/a', 'redirected 4 times'),
877
 
                                  ('5/',),
878
 
                                  ('5/a', 'redirected 5 times'),
879
 
                                  ],)
880
 
 
881
 
        self.old_transport = self._transport(self.old_server.get_url())
882
 
 
883
 
    def setup_redirected_request(self):
884
 
        self.original_class = _urllib2_wrappers.Request
885
 
        _urllib2_wrappers.Request = RedirectedRequest
886
 
 
887
 
    def cleanup_redirected_request(self):
888
 
        _urllib2_wrappers.Request = self.original_class
889
 
 
890
 
    def create_transport_secondary_server(self):
891
 
        """Create the secondary server, redirections are defined in the tests"""
892
 
        return http_utils.HTTPServerRedirecting(
893
 
            protocol_version=self._protocol_version)
894
 
 
895
 
    def test_one_redirection(self):
896
 
        t = self.old_transport
897
 
 
898
 
        req = RedirectedRequest('GET', t.abspath('a'))
899
 
        req.follow_redirections = True
900
 
        new_prefix = 'http://%s:%s' % (self.new_server.host,
901
 
                                       self.new_server.port)
902
 
        self.old_server.redirections = \
903
 
            [('(.*)', r'%s/1\1' % (new_prefix), 301),]
904
 
        self.assertEquals('redirected once',t._perform(req).read())
905
 
 
906
 
    def test_five_redirections(self):
907
 
        t = self.old_transport
908
 
 
909
 
        req = RedirectedRequest('GET', t.abspath('a'))
910
 
        req.follow_redirections = True
911
 
        old_prefix = 'http://%s:%s' % (self.old_server.host,
912
 
                                       self.old_server.port)
913
 
        new_prefix = 'http://%s:%s' % (self.new_server.host,
914
 
                                       self.new_server.port)
915
 
        self.old_server.redirections = \
916
 
            [('/1(.*)', r'%s/2\1' % (old_prefix), 302),
917
 
             ('/2(.*)', r'%s/3\1' % (old_prefix), 303),
918
 
             ('/3(.*)', r'%s/4\1' % (old_prefix), 307),
919
 
             ('/4(.*)', r'%s/5\1' % (new_prefix), 301),
920
 
             ('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
921
 
             ]
922
 
        self.assertEquals('redirected 5 times',t._perform(req).read())
923
 
 
924
 
 
925
 
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
926
 
    """Test transport.do_catching_redirections."""
927
 
 
928
 
    def setUp(self):
929
 
        super(TestDoCatchRedirections, self).setUp()
930
 
        self.build_tree_contents([('a', '0123456789'),],)
931
 
 
932
 
        self.old_transport = self._transport(self.old_server.get_url())
933
 
 
934
 
    def get_a(self, transport):
935
 
        return transport.get('a')
936
 
 
937
 
    def test_no_redirection(self):
938
 
        t = self._transport(self.new_server.get_url())
939
 
 
940
 
        # We use None for redirected so that we fail if redirected
941
 
        self.assertEquals('0123456789',
942
 
                          transport.do_catching_redirections(
943
 
                self.get_a, t, None).read())
944
 
 
945
 
    def test_one_redirection(self):
946
 
        self.redirections = 0
947
 
 
948
 
        def redirected(transport, exception, redirection_notice):
949
 
            self.redirections += 1
950
 
            dir, file = urlutils.split(exception.target)
951
 
            return self._transport(dir)
952
 
 
953
 
        self.assertEquals('0123456789',
954
 
                          transport.do_catching_redirections(
955
 
                self.get_a, self.old_transport, redirected).read())
956
 
        self.assertEquals(1, self.redirections)
957
 
 
958
 
    def test_redirection_loop(self):
959
 
 
960
 
        def redirected(transport, exception, redirection_notice):
961
 
            # By using the redirected url as a base dir for the
962
 
            # *old* transport, we create a loop: a => a/a =>
963
 
            # a/a/a
964
 
            return self.old_transport.clone(exception.target)
965
 
 
966
 
        self.assertRaises(errors.TooManyRedirections,
967
 
                          transport.do_catching_redirections,
968
 
                          self.get_a, self.old_transport, redirected)
969
 
 
970
 
 
971
 
class TestAuth(http_utils.TestCaseWithWebserver):
972
 
    """Test authentication scheme"""
973
 
 
974
 
    _auth_header = 'Authorization'
975
 
    _password_prompt_prefix = ''
976
 
 
977
 
    def setUp(self):
978
 
        super(TestAuth, self).setUp()
979
 
        self.server = self.get_readonly_server()
980
 
        self.build_tree_contents([('a', 'contents of a\n'),
981
 
                                  ('b', 'contents of b\n'),])
982
 
 
983
 
    def create_transport_readonly_server(self):
984
 
        if self._auth_scheme == 'basic':
985
 
            server = http_utils.HTTPBasicAuthServer(
986
 
                protocol_version=self._protocol_version)
987
 
        else:
988
 
            if self._auth_scheme != 'digest':
989
 
                raise AssertionError('Unknown auth scheme: %r'
990
 
                                     % self._auth_scheme)
991
 
            server = http_utils.HTTPDigestAuthServer(
992
 
                protocol_version=self._protocol_version)
993
 
        return server
994
 
 
995
 
    def _testing_pycurl(self):
996
 
        return pycurl_present and self._transport == PyCurlTransport
997
 
 
998
 
    def get_user_url(self, user=None, password=None):
999
 
        """Build an url embedding user and password"""
1000
 
        url = '%s://' % self.server._url_protocol
1001
 
        if user is not None:
1002
 
            url += user
1003
 
            if password is not None:
1004
 
                url += ':' + password
1005
 
            url += '@'
1006
 
        url += '%s:%s/' % (self.server.host, self.server.port)
1007
 
        return url
1008
 
 
1009
 
    def get_user_transport(self, user=None, password=None):
1010
 
        return self._transport(self.get_user_url(user, password))
1011
 
 
1012
 
    def test_no_user(self):
1013
 
        self.server.add_user('joe', 'foo')
1014
 
        t = self.get_user_transport()
1015
 
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1016
 
        # Only one 'Authentication Required' error should occur
1017
 
        self.assertEqual(1, self.server.auth_required_errors)
1018
 
 
1019
 
    def test_empty_pass(self):
1020
 
        self.server.add_user('joe', '')
1021
 
        t = self.get_user_transport('joe', '')
1022
 
        self.assertEqual('contents of a\n', t.get('a').read())
1023
 
        # Only one 'Authentication Required' error should occur
1024
 
        self.assertEqual(1, self.server.auth_required_errors)
1025
 
 
1026
 
    def test_user_pass(self):
1027
 
        self.server.add_user('joe', 'foo')
1028
 
        t = self.get_user_transport('joe', 'foo')
1029
 
        self.assertEqual('contents of a\n', t.get('a').read())
1030
 
        # Only one 'Authentication Required' error should occur
1031
 
        self.assertEqual(1, self.server.auth_required_errors)
1032
 
 
1033
 
    def test_unknown_user(self):
1034
 
        self.server.add_user('joe', 'foo')
1035
 
        t = self.get_user_transport('bill', 'foo')
1036
 
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1037
 
        # Two 'Authentication Required' errors should occur (the
1038
 
        # initial 'who are you' and 'I don't know you, who are
1039
 
        # you').
1040
 
        self.assertEqual(2, self.server.auth_required_errors)
1041
 
 
1042
 
    def test_wrong_pass(self):
1043
 
        self.server.add_user('joe', 'foo')
1044
 
        t = self.get_user_transport('joe', 'bar')
1045
 
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
1046
 
        # Two 'Authentication Required' errors should occur (the
1047
 
        # initial 'who are you' and 'this is not you, who are you')
1048
 
        self.assertEqual(2, self.server.auth_required_errors)
1049
 
 
1050
 
    def test_prompt_for_password(self):
1051
 
        if self._testing_pycurl():
1052
 
            raise tests.TestNotApplicable(
1053
 
                'pycurl cannot prompt, it handles auth by embedding'
1054
 
                ' user:pass in urls only')
1055
 
 
1056
 
        self.server.add_user('joe', 'foo')
1057
 
        t = self.get_user_transport('joe', None)
1058
 
        stdout = tests.StringIOWrapper()
1059
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1060
 
        self.assertEqual('contents of a\n',t.get('a').read())
1061
 
        # stdin should be empty
1062
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
1063
 
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1064
 
                                    stdout.getvalue())
1065
 
        # And we shouldn't prompt again for a different request
1066
 
        # against the same transport.
1067
 
        self.assertEqual('contents of b\n',t.get('b').read())
1068
 
        t2 = t.clone()
1069
 
        # And neither against a clone
1070
 
        self.assertEqual('contents of b\n',t2.get('b').read())
1071
 
        # Only one 'Authentication Required' error should occur
1072
 
        self.assertEqual(1, self.server.auth_required_errors)
1073
 
 
1074
 
    def _check_password_prompt(self, scheme, user, actual_prompt):
1075
 
        expected_prompt = (self._password_prompt_prefix
1076
 
                           + ("%s %s@%s:%d, Realm: '%s' password: "
1077
 
                              % (scheme.upper(),
1078
 
                                 user, self.server.host, self.server.port,
1079
 
                                 self.server.auth_realm)))
1080
 
        self.assertEquals(expected_prompt, actual_prompt)
1081
 
 
1082
 
    def test_no_prompt_for_password_when_using_auth_config(self):
1083
 
        if self._testing_pycurl():
1084
 
            raise tests.TestNotApplicable(
1085
 
                'pycurl does not support authentication.conf'
1086
 
                ' since it cannot prompt')
1087
 
 
1088
 
        user =' joe'
1089
 
        password = 'foo'
1090
 
        stdin_content = 'bar\n'  # Not the right password
1091
 
        self.server.add_user(user, password)
1092
 
        t = self.get_user_transport(user, None)
1093
 
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1094
 
                                            stdout=tests.StringIOWrapper())
1095
 
        # Create a minimal config file with the right password
1096
 
        conf = config.AuthenticationConfig()
1097
 
        conf._get_config().update(
1098
 
            {'httptest': {'scheme': 'http', 'port': self.server.port,
1099
 
                          'user': user, 'password': password}})
1100
 
        conf._save()
1101
 
        # Issue a request to the server to connect
1102
 
        self.assertEqual('contents of a\n',t.get('a').read())
1103
 
        # stdin should have  been left untouched
1104
 
        self.assertEqual(stdin_content, ui.ui_factory.stdin.readline())
1105
 
        # Only one 'Authentication Required' error should occur
1106
 
        self.assertEqual(1, self.server.auth_required_errors)
1107
 
 
1108
 
 
1109
 
 
1110
 
class TestProxyAuth(TestAuth):
1111
 
    """Test proxy authentication schemes."""
1112
 
 
1113
 
    _auth_header = 'Proxy-authorization'
1114
 
    _password_prompt_prefix='Proxy '
1115
 
 
1116
 
    def setUp(self):
1117
 
        super(TestProxyAuth, self).setUp()
1118
 
        self._old_env = {}
1119
 
        self.addCleanup(self._restore_env)
1120
 
        # Override the contents to avoid false positives
1121
 
        self.build_tree_contents([('a', 'not proxied contents of a\n'),
1122
 
                                  ('b', 'not proxied contents of b\n'),
1123
 
                                  ('a-proxied', 'contents of a\n'),
1124
 
                                  ('b-proxied', 'contents of b\n'),
1125
 
                                  ])
1126
 
 
1127
 
    def create_transport_readonly_server(self):
1128
 
        if self._auth_scheme == 'basic':
1129
 
            server = http_utils.ProxyBasicAuthServer(
1130
 
                protocol_version=self._protocol_version)
1131
 
        else:
1132
 
            if self._auth_scheme != 'digest':
1133
 
                raise AssertionError('Unknown auth scheme: %r'
1134
 
                                     % self._auth_scheme)
1135
 
            server = http_utils.ProxyDigestAuthServer(
1136
 
                protocol_version=self._protocol_version)
1137
 
        return server
1138
 
 
1139
 
    def get_user_transport(self, user=None, password=None):
1140
 
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1141
 
        return self._transport(self.server.get_url())
1142
 
 
1143
 
    def _install_env(self, env):
1144
 
        for name, value in env.iteritems():
1145
 
            self._old_env[name] = osutils.set_or_unset_env(name, value)
1146
 
 
1147
 
    def _restore_env(self):
1148
 
        for name, value in self._old_env.iteritems():
1149
 
            osutils.set_or_unset_env(name, value)
1150
 
 
1151
 
    def test_empty_pass(self):
1152
 
        if self._testing_pycurl():
1153
 
            import pycurl
1154
 
            if pycurl.version_info()[1] < '7.16.0':
1155
 
                raise tests.KnownFailure(
1156
 
                    'pycurl < 7.16.0 does not handle empty proxy passwords')
1157
 
        super(TestProxyAuth, self).test_empty_pass()
1158