# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
#
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
#
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
#
# 2004-08-26 fl   unified http callback
# 2004-10-09 fl   factored out gzip_consumer support
#
# 2005-07-08 mbp  experimental support for keepalive connections
#
# Copyright (c) 2001-2004 by Fredrik Lundh.  All rights reserved.
#
"""async/pipelined http client
24
Users of this library pass in URLs they want to see, and consumer
25
objects that will receive the results at some point in the future.
26
Any number of requests may be queued up, and more may be added while
27
the download is in progress.
29
Requests can be both superscalar and superpipelined. That is to say,
30
for each server there can be multiple sockets open, and each socket
31
may have more than one request in flight.
36
There is a single DownloadManager, and a connection object for each
39
Request/consumer pairs are maintained in queues. Each connection has
40
a list of transmitted requests whose response has not yet been
41
received. There is also a per-server list of requests that have not
44
When a connection is ready to transmit a new request, it takes one
45
from the unsubmitted list, sends the request, and adds the request to
46
its unfulfilled list. This should happen when the connection has
47
space for more transmissions or when a new request is added by the
48
user. If the connection terminates with unfulfilled requests they are
49
put back onto the unsubmitted list, to be retried elsewhere.
51
Because responses come back precisely in order, the connection always
52
knows what it should expect next: the response for the next
56
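
# A minimal usage sketch (illustrative only; EchoConsumer is a made-up
# example, not part of this module -- any object with feed(),
# content_complete() and error() methods will do):
#
#     class EchoConsumer:
#         def feed(self, data):
#             sys.stdout.write(data)
#         def content_complete(self):
#             print 'done'
#         def error(self, err_info):
#             traceback.print_exception(err_info[0], err_info[1], err_info[2])
#
#     mgr = DownloadManager()
#     mgr.enqueue('http://www.bazaar-ng.org/', EchoConsumer())
#     mgr.run()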

# Note that (as of ubuntu python 2.4.1) every socket.connect() call
# with a hostname does a remote DNS resolution, which is pretty sucky.
# Shouldn't there be a cache in glibc?  We should probably cache the
# address in, say, the DownloadManager.

# TODO: A default consumer operation that writes the received data
# into a file; by default the file is named the same as the last
# component of the URL.

# TODO: A utility function that is given a list of URLs, and downloads
# them all in parallel/pipelined.  If any fail, it raises an exception
# (and discards the rest), or perhaps can be told to continue anyhow.
# The content is written into temporary files.  It returns a list of
# readable file objects.

# TODO: If we try pipelined or keepalive and the connection drops out,
# then retry the request on a new connection; eventually we should perhaps
# learn that a given host or network just won't allow keepalive.

import socket, string, time, sys
import mimetools, urlparse, urllib
import asyncore, logging, traceback

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(message)s',
                    filename='/tmp/http_client.log',
                    )

logger = logging.getLogger('bzr.http_client')
debug = logger.debug
info = logger.info
error = logger.error


# Close connection.  Request handlers can raise this exception to
# indicate that the connection should be closed.

class CloseConnection(Exception):
    pass


# Redirect connection.  Request handlers can raise this exception to
# indicate that a new request should be issued.

class Redirect(CloseConnection):
    def __init__(self, location):
        self.location = location
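
# For illustration only (nothing in this module raises Redirect yet): a
# handler that wanted to follow an HTTP 30x reply might do
#
#     raise Redirect(response.headers.get('location'))
#
# where 'response' is the parsed Response object for the 30x reply.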


class DownloadManager(object):
    """Handles pipelined/overlapped downloads.

    Pass in a series of URLs with handlers to receive the response.
    This object will spread the requests over however many sockets
    seem useful.

    queued_requests
        Requests not assigned to any channel.

    running_requests
        Currently assigned to a channel.
    """
    def __init__(self):
        self.queued_requests = []
        self.channels = []
        # self.channel = HttpChannel('localhost', 8000, self)
        self.try_pipelined = False
        self.try_keepalive = False
        self.max_channels = 5

    def enqueue(self, url, consumer):
        self.queued_requests.append((url, consumer))
        self._wake_up_channel()

    def _channel_closed(self, channel):
        """Called by the channel when its socket closes."""
        self.channels.remove(channel)
        if self.queued_requests:
            self._wake_up_channel()

    def _make_channel(self):
        # proxy2 203.17.154.69
        # return HttpChannel('82.211.81.161', 80, self)    # bazaar-ng.org
        # return HttpChannel('203.17.154.69', 8080, self)
        return HttpChannel('127.0.0.1', 8000, self)        # forwarded

    def _wake_up_channel(self):
        """Try to wake up one channel to send the newly-added request.

        There may be more than one request pending, and this may cause
        more than one channel to take requests.  That's OK; some of
        them may be frustrated.
        """
        from random import shuffle, choice

        # first, wake up any idle channels
        for ch in self.channels:
            if not ch.sent_requests:
                ch.take_one()
                debug("woke existing idle channel(s)")
                return

        if len(self.channels) < self.max_channels:
            newch = self._make_channel()
            self.channels.append(newch)
            # the new channel takes a request when its connection completes
            debug("created new channel")
            return

        if self.try_pipelined:
            # ask existing channels to take it
            debug("woke busy channel")
            choice(self.channels).take_one()
            return

        # debug("request postponed until a channel's idle")
"""Run until all outstanding requests have been served."""
191
#while self.running_requests or self.queued_requests \
192
# or not self.channel.is_idle():
193
# asyncore.loop(count=1)
198


class Response(object):
    """Holds in-flight response."""


def _parse_response_http10(header):
    from cStringIO import StringIO
    fp = StringIO(header)

    r = Response()
    r.status = fp.readline().split(" ", 2)
    r.headers = mimetools.Message(fp)

    # we can only(?) expect to do keepalive if we got either a
    # content-length or chunked encoding; otherwise there's no way to know
    # when the content ends apart from through the connection close
    r.content_type = r.headers.get("content-type")
    try:
        r.content_length = int(r.headers.get("content-length"))
    except (ValueError, TypeError):
        r.content_length = None
    debug("seen content length of %r" % r.content_length)

    r.transfer_encoding = r.headers.get("transfer-encoding")
    r.content_encoding = r.headers.get("content-encoding")
    r.connection_reply = r.headers.get("connection")

    # TODO: pass status code to consumer?

    if r.transfer_encoding:
        raise NotImplementedError()

    if int(r.status[1]) != 200:
        debug("can't handle response status %r" % r.status)
        raise NotImplementedError()

    if r.content_length is None:
        raise NotImplementedError()

    if r.content_length == 0:
        raise NotImplementedError()

    r.content_remaining = r.content_length

    return r
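
# Worked example (illustrative, not from the original source): a header
# block of
#
#   HTTP/1.0 200 OK
#   Content-Type: text/html
#   Content-Length: 1234
#   Connection: keep-alive
#
# parses to r.status[1] == '200', r.content_type == 'text/html',
# r.content_length == 1234, r.connection_reply == 'keep-alive', and
# r.content_remaining == 1234, counted down as body bytes arrive.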


class HttpChannel(asyncore.dispatcher_with_send):
    """One http socket, pipelining if possible."""
    # asynchronous http client

    user_agent = "http_client.py 1.3ka (based on effbot)"

    proxies = urllib.getproxies()

    def __init__(self, ip_host, ip_port, manager):
        asyncore.dispatcher_with_send.__init__(self)
        self.manager = manager

        # data received but not yet parsed
        self.data = ""

        # if a response header has been seen, this holds it
        self.response = None

        self.chunk_size = None

        self.timestamp = time.time()

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        debug('connecting...')
        self.connect((ip_host, ip_port))

        # sent_requests holds (url, consumer) pairs whose response has
        # not yet been completely received
        self.sent_requests = []

    def __repr__(self):
        return 'HttpChannel(local_port=%r)' % (self.getsockname(),)

    def is_idle(self):
        return (not self.sent_requests)

    def handle_connect(self):
        # now that we're connected, pick up a queued request
        self.take_one()

    def take_one(self):
        """Accept one request from the manager if possible."""
        if self.manager.try_pipelined:
            if len(self.sent_requests) > 4:
                return
        else:
            if len(self.sent_requests) > 0:
                return

        try:
            url, consumer = self.manager.queued_requests.pop(0)
            debug('request accepted by channel')
        except IndexError:
            return

        # TODO: If there are too many already in flight, don't take one.
        # TODO: If the socket's not writable (tx buffer full), don't take.
        self._push_request_http10(url, consumer)

    def _push_request_http10(self, url, consumer):
        """Send a request, and add it to the outstanding queue."""
        # TODO: check the url requested is appropriate for this connection

        # TODO: If there are too many requests outstanding or (less likely)
        # the connection fails, queue it for later use.

        # TODO: Keep track of requests that have been sent but not yet
        # fulfilled, because we might need to retransmit them if the
        # connection fails.  (Or should the caller do that?)

        request = self._form_request_http10(url)
        debug('send request for %s from %r' % (url, self))

        # dispatcher_with_send handles buffering the data until it can
        # be written, and hooks handle_write.
        self.send(request)

        self.sent_requests.append((url, consumer))

    def _form_request_http10(self, url):
        # TODO: get right vhost name
        request = [
            "GET %s HTTP/1.0" % (url),
            "Host: www.bazaar-ng.org",
            ]

        if self.manager.try_keepalive or self.manager.try_pipelined:
            request.extend([
                "Connection: keep-alive",
                ])

        # make sure to include a user agent
        for header in request:
            if string.lower(header).startswith("user-agent:"):
                break
        else:
            request.append("User-Agent: %s" % self.user_agent)

        return string.join(request, "\r\n") + "\r\n\r\n"
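
    # For illustration (not in the original source): with keepalive off,
    # _form_request_http10('/index.html') produces the bytes
    #
    #   GET /index.html HTTP/1.0\r\n
    #   Host: www.bazaar-ng.org\r\n
    #   User-Agent: http_client.py 1.3ka (based on effbot)\r\n
    #   \r\n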

    def handle_read(self):
        # handle incoming data
        data = self.recv(2048)

        self.data = self.data + data

        if data:
            debug('got %d bytes from socket' % len(data))
        else:
            debug('server closed connection')

        consumer = self.sent_requests[0][1]

        if not self.response:
            # do not have a full response header yet

            # check if we've seen a full header
            debug('getting header for %s' % self.sent_requests[0][0])
            header = self.data.split("\r\n\r\n", 1)
            if len(header) < 2:
                return            # incomplete header; wait for more data
            header, self.data = header

            self.response = _parse_response_http10(header)
            self.content_remaining = self.response.content_length

        # we now know how many (more) content bytes we have, and how much
        # is in the data buffer.  there are two main possibilities:
        # too much data, and some must be left behind containing the next
        # response headers, or too little, or possibly just right

        want = self.content_remaining

        got_data = self.data[:want]
        self.data = self.data[want:]

        self.content_remaining -= len(got_data)

        debug('pass back %d bytes of %s, %d remain'
              % (len(got_data),
                 self.sent_requests[0][0],
                 self.content_remaining))
        consumer.feed(got_data)

        if self.content_remaining == 0:
            del self.sent_requests[0]
            debug('content complete')
            consumer.content_complete()

            # reset lots of things and try to get the next response header
            if self.response.connection_reply == 'close':
                debug('server requested close')
                self.manager._channel_closed(self)
                self.close()
            elif not self.manager.try_keepalive:
                debug('no keepalive for this socket')
                self.manager._channel_closed(self)
                self.close()
            else:
                debug("ready for next header...")
                self.response = None
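
    # Worked illustration of the split above (not from the original
    # source): with content_remaining == 10 and 25 bytes buffered in
    # self.data, the first 10 bytes are fed to the consumer and the
    # trailing 15 stay in self.data as the start of the next pipelined
    # response's header.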

    def handle_close(self):
        debug('async told us of close on %r' % self)
        # if there are outstanding requests should probably reopen and
        # retransmit, but if we're not making any progress then give up
        self.manager._channel_closed(self)
        self.close()


class DummyConsumer(object):
    last_cnt = 0             # completed downloads, shared across consumers

    def __init__(self, url, pb):
        self.url = url
        self.outf = None
        self._pb = pb

    def feed(self, data):
        # print "feed", repr(data)
        # print "feed", repr(data[:20]), repr(data[-20:]), len(data)
        if not self.outf:
            base = self.url[self.url.rindex('/')+1:]
            self.outf = file('/tmp/download/' + base, 'wb')
        self.outf.write(data)

    def error(self, err_info):
        error('error reported to consumer')
        traceback.print_exception(err_info[0], err_info[1], err_info[2])

    def content_complete(self):
        info('content complete from %s' % self.url)
        self.outf.close()
        # using last_cnt is cheating
        DummyConsumer.last_cnt += 1
        self._pb.update('downloading inventory', DummyConsumer.last_cnt)


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    mgr = DownloadManager()

    from bzrlib.branch import Branch
    from bzrlib.progress import ProgressBar

    pb = ProgressBar()
    revs = Branch('/home/mbp/work/bzr').revision_history()
    pb.update('downloading inventories', 0, len(revs))

    for rev_id in revs:
        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/' \
              + rev_id + '.gz'
        mgr.enqueue(url, DummyConsumer(url, pb))

    mgr.run()

#     for url in ['http://www.bazaar-ng.org/',
#                 'http://www.bazaar-ng.org/tutorial.html',
#                 'http://www.bazaar-ng.org/download.html',
#                 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',