251
203
return self.__secondary_server
254
class ProxyServer(HttpServer):
    """An HttpServer used as a proxy when testing http transports."""

    # Class-level switch flagging this server as a proxy.
    # NOTE(review): presumably read by the request handler to decide how
    # request paths are translated -- the consumer is not visible here.
    proxy_requests = True
260
class RedirectRequestHandler(TestingHTTPRequestHandler):
261
"""Redirect all request to the specified server"""
263
def parse_request(self):
264
"""Redirect a single HTTP request to another host"""
265
valid = TestingHTTPRequestHandler.parse_request(self)
267
tcs = self.server.test_case_server
268
code, target = tcs.is_redirected(self.path)
269
if code is not None and target is not None:
270
# Redirect as instructed
271
self.send_response(code)
272
self.send_header('Location', target)
274
return False # The job is done
276
# We leave the parent class serve the request
281
class HTTPServerRedirecting(HttpServer):
282
"""An HttpServer redirecting to another server """
284
def __init__(self, request_handler=RedirectRequestHandler):
    """Create the server with no redirection configured yet.

    :param request_handler: the request handler given to
        HttpServer.__init__ (RedirectRequestHandler by default).
    """
    HttpServer.__init__(self, request_handler)
    # Every entry of self.redirections is a (source, target, code) tuple:
    # - source: a regexp for the paths requested,
    # - target: a replacement for re.sub describing where the request
    #   will be redirected,
    # - code: the http status code associated to the redirection
    #   (301 permanent, 302 temporary, etc).
    self.redirections = list()
294
def redirect_to(self, host, port):
295
"""Redirect all requests to a specific host:port"""
296
self.redirections = [('(.*)',
297
r'http://%s:%s\1' % (host, port) ,
300
def is_redirected(self, path):
301
"""Is the path redirected by this server.
303
:param path: the requested relative path
305
:returns: a tuple (code, target) if a matching
306
redirection is found, (None, None) otherwise.
310
for (rsource, rtarget, rcode) in self.redirections:
311
target, match = re.subn(rsource, rtarget, path)
314
break # The first match wins
320
class TestCaseWithRedirectedWebserver(TestCaseWithTwoWebservers):
321
"""A support class providing redirections from one server to another.
323
We set up two webservers to allows various tests involving
325
The 'old' server is redirected to the 'new' server.
328
def create_transport_secondary_server(self):
329
"""Create the secondary server redirecting to the primary server"""
330
new = self.get_readonly_server()
331
redirecting = HTTPServerRedirecting()
332
redirecting.redirect_to(new.host, new.port)
336
super(TestCaseWithRedirectedWebserver, self).setUp()
337
# The redirections will point to the new server
338
self.new_server = self.get_readonly_server()
339
# The requests to the old server will be redirected
340
self.old_server = self.get_secondary_server()
343
class AuthRequestHandler(TestingHTTPRequestHandler):
344
"""Requires an authentication to process requests.
346
This is intended to be used with a server that always and
347
only use one authentication scheme (implemented by daughter
351
# The following attributes should be defined in the server
352
# - auth_header_sent: the header name sent to require auth
353
# - auth_header_recv: the header received containing auth
354
# - auth_error_code: the error code to indicate auth required
357
if self.authorized():
358
return TestingHTTPRequestHandler.do_GET(self)
360
# Note that we must update test_case_server *before*
361
# sending the error or the client may try to read it
362
# before we have sent the whole error back.
363
tcs = self.server.test_case_server
364
tcs.auth_required_errors += 1
365
self.send_response(tcs.auth_error_code)
366
self.send_header_auth_reqed()
371
class BasicAuthRequestHandler(AuthRequestHandler):
372
"""Implements the basic authentication of a request"""
374
def authorized(self):
375
tcs = self.server.test_case_server
376
if tcs.auth_scheme != 'basic':
379
auth_header = self.headers.get(tcs.auth_header_recv, None)
381
scheme, raw_auth = auth_header.split(' ', 1)
382
if scheme.lower() == tcs.auth_scheme:
383
user, password = raw_auth.decode('base64').split(':')
384
return tcs.authorized(user, password)
388
def send_header_auth_reqed(self):
    """Send the header asking the client for basic credentials.

    Both the header name and the realm are read from the test case
    server attached to self.server.
    """
    test_server = self.server.test_case_server
    header_name = test_server.auth_header_sent
    challenge = 'Basic realm="%s"' % test_server.auth_realm
    self.send_header(header_name, challenge)
394
# FIXME: We could send an Authentication-Info header too when
395
# the authentication is successful
397
class DigestAuthRequestHandler(AuthRequestHandler):
398
"""Implements the digest authentication of a request.
400
We need persistence for some attributes and that can't be
401
achieved here since we get instantiated for each request. We
402
rely on the DigestAuthServer to take care of them.
405
def authorized(self):
406
tcs = self.server.test_case_server
407
if tcs.auth_scheme != 'digest':
410
auth_header = self.headers.get(tcs.auth_header_recv, None)
411
if auth_header is None:
413
scheme, auth = auth_header.split(None, 1)
414
if scheme.lower() == tcs.auth_scheme:
415
auth_dict = urllib2.parse_keqv_list(urllib2.parse_http_list(auth))
417
return tcs.digest_authorized(auth_dict, self.command)
421
def send_header_auth_reqed(self):
422
tcs = self.server.test_case_server
423
header = 'Digest realm="%s", ' % tcs.auth_realm
424
header += 'nonce="%s", algorithm="%s", qop="auth"' % (tcs.auth_nonce,
426
self.send_header(tcs.auth_header_sent,header)
429
class AuthServer(HttpServer):
    """An HttpServer that also keeps a dictionary of passwords.

    This is the common base class of the servers implementing the
    various authentication schemes, each of which should use or
    redefine the associated AuthRequestHandler.

    No user is defined by default: add_user should be called before
    the first request is issued.
    """

    # Daughter classes should set the attributes below; they are read by
    # AuthRequestHandler.
    # - auth_header_sent: the header name sent to require auth
    # - auth_header_recv: the header received containing auth
    # - auth_error_code: the error code to indicate auth required
    auth_header_sent = None
    auth_header_recv = None
    auth_error_code = None
    auth_realm = "Thou should not pass"

    def __init__(self, request_handler, auth_scheme):
        """Create a server with no known user.

        :param request_handler: the request handler given to
            HttpServer.__init__.
        :param auth_scheme: stored as self.auth_scheme; the request
            handlers compare it with the scheme they implement.
        """
        HttpServer.__init__(self, request_handler)
        self.auth_scheme = auth_scheme
        # Maps each declared user to its password.
        self.password_of = dict()
        # Incremented by AuthRequestHandler each time it requires
        # authentication from a client.
        self.auth_required_errors = 0

    def add_user(self, user, password):
        """Declare a user together with its password.

        An empty password is allowed: use an empty string ('').

        :param user: the key under which the password is recorded.
        :param password: the password expected from that user.
        """
        self.password_of[user] = password

    def authorized(self, user, password):
        """Check that password is the one declared for user.

        :returns: False when no password is recorded for user (or the
            recorded one is None), otherwise the result of comparing
            password with the recorded one.
        """
        known_password = self.password_of.get(user)
        if known_password is None:
            return False
        return password == known_password
466
# FIXME: There is some code duplication with
467
# _urllib2_wrappers.py.DigestAuthHandler. If that duplication
468
# grows, it may require a refactoring. Also, we don't implement
469
# SHA algorithm nor MD5-sess here, but that does not seem worth
471
class DigestAuthServer(AuthServer):
472
"""A digest authentication server"""
476
def __init__(self, request_handler, auth_scheme):
    """Initialise the server by delegating to AuthServer.__init__.

    :param request_handler: forwarded unchanged to AuthServer.__init__.
    :param auth_scheme: forwarded unchanged to AuthServer.__init__.
    """
    AuthServer.__init__(self,
                        request_handler=request_handler,
                        auth_scheme=auth_scheme)
479
def digest_authorized(self, auth, command):
480
nonce = auth['nonce']
481
if nonce != self.auth_nonce:
483
realm = auth['realm']
484
if realm != self.auth_realm:
486
user = auth['username']
487
if not self.password_of.has_key(user):
489
algorithm= auth['algorithm']
490
if algorithm != 'MD5':
496
password = self.password_of[user]
498
# Recalculate the response_digest to compare with the one
500
A1 = '%s:%s:%s' % (user, realm, password)
501
A2 = '%s:%s' % (command, auth['uri'])
503
H = lambda x: md5.new(x).hexdigest()
504
KD = lambda secret, data: H("%s:%s" % (secret, data))
506
nonce_count = int(auth['nc'], 16)
508
ncvalue = '%08x' % nonce_count
510
cnonce = auth['cnonce']
511
noncebit = '%s:%s:%s:%s:%s' % (nonce, ncvalue, cnonce, qop, H(A2))
512
response_digest = KD(H(A1), noncebit)
514
return response_digest == auth['response']
516
class HTTPAuthServer(AuthServer):
    """An AuthServer requiring authentication as an HTTP server."""

    def init_http_auth(self):
        """Select the status code and header names for HTTP authentication.

        Authentication is required with a 401 status and a
        'WWW-Authenticate' header; the credentials are expected in the
        'Authorization' header.
        """
        self.auth_error_code = 401
        self.auth_header_recv = 'Authorization'
        self.auth_header_sent = 'WWW-Authenticate'
525
class ProxyAuthServer(AuthServer):
    """An AuthServer requiring authentication as a proxy server."""

    def init_proxy_auth(self):
        """Flag the server as a proxy and select the proxy auth settings.

        Authentication is required with a 407 status and a
        'Proxy-Authenticate' header; the credentials are expected in the
        'Proxy-Authorization' header.
        """
        self.auth_error_code = 407
        self.auth_header_recv = 'Proxy-Authorization'
        self.auth_header_sent = 'Proxy-Authenticate'
        self.proxy_requests = True
535
class HTTPBasicAuthServer(HTTPAuthServer):
536
"""An HTTP server requiring basic authentication"""
539
HTTPAuthServer.__init__(self, BasicAuthRequestHandler, 'basic')
540
self.init_http_auth()
543
class HTTPDigestAuthServer(DigestAuthServer, HTTPAuthServer):
544
"""An HTTP server requiring digest authentication"""
547
DigestAuthServer.__init__(self, DigestAuthRequestHandler, 'digest')
548
self.init_http_auth()
551
class ProxyBasicAuthServer(ProxyAuthServer):
552
"""A proxy server requiring basic authentication"""
555
ProxyAuthServer.__init__(self, BasicAuthRequestHandler, 'basic')
556
self.init_proxy_auth()
559
class ProxyDigestAuthServer(DigestAuthServer, ProxyAuthServer):
560
"""A proxy server requiring basic authentication"""
563
ProxyAuthServer.__init__(self, DigestAuthRequestHandler, 'digest')
564
self.init_proxy_auth()
206
class FakeProxyRequestHandler(TestingHTTPRequestHandler):
    """Fake a proxy by appending a '-proxied' suffix to file served"""

    def translate_path(self, path):
        """Translate a request-url the way a proxy has to.

        :param path: the request-url; since we act as a proxy it may be
            an absolute url.
        :returns: what TestingHTTPRequestHandler.translate_path returns
            for the path component of that url.
        """
        # A proxy must accept absolute urls, which
        # SimpleHTTPRequestHandler (grand parent) is not ready for. We
        # never forward the request to *another* proxy, so the
        # protocol://host:port part can simply be dropped.
        #
        # This mirrors what SimpleHTTPRequestHandler.translate_path does
        # beginning with python 2.4.3 (query parameters, scheme, host,
        # port, etc are abandoned), which gives the same behaviour on
        # all python versions.
        url_parts = urlparse.urlparse(path)
        path = url_parts[2]
        # Now apply *our* trick to proxy files.
        # NOTE(review): the suffix is appended to self.path, not to the
        # local path handed to the parent class below -- confirm that
        # the parent class relies on self.path.
        self.path = self.path + '-proxied'
        # Finally let the parent class do whatever it wants with the path.
        return TestingHTTPRequestHandler.translate_path(self, path)