# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $

# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)

# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.

# 2004-08-26 fl   unified http callback
# 2004-10-09 fl   factored out gzip_consumer support
# 2005-07-08 mbp  experimental support for keepalive connections

# Copyright (c) 2001-2004 by Fredrik Lundh.  All rights reserved.

"""async/pipelined http client

Users of this library pass in URLs they want to see, and consumer
objects that will receive the results at some point in the future.
Any number of requests may be queued up, and more may be added while
the download is in progress.

Requests can be both superscalar and superpipelined.  That is to say,
for each server there can be multiple sockets open, and each socket
may have more than one request in flight.

There is a single DownloadManager, and a connection object for each
open socket.

Request/consumer pairs are maintained in queues.  Each connection has
a list of transmitted requests whose response has not yet been
received.  There is also a per-server list of requests that have not
yet been submitted.

When a connection is ready to transmit a new request, it takes one
from the unsubmitted list, sends the request, and adds the request to
its unfulfilled list.  This should happen when the connection has
space for more transmissions or when a new request is added by the
user.  If the connection terminates with unfulfilled requests, they are
put back onto the unsubmitted list, to be retried elsewhere.

Because responses come back precisely in order, the connection always
knows what it should expect next: the response for the next
unanswered request.
"""
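
# A minimal usage sketch, assuming the interface defined below:
# DownloadManager.enqueue takes a URL plus a consumer object with
# feed/error/content_complete methods, and run() drives asyncore until
# the requests are served.  The consumer name here is invented for
# illustration; nothing in this module uses it.
#
#     class PrintingConsumer:
#         def feed(self, data):
#             sys.stdout.write(data)
#         def error(self, err_info):
#             pass
#         def content_complete(self):
#             pass
#
#     mgr = DownloadManager()
#     mgr.enqueue('http://www.bazaar-ng.org/', PrintingConsumer())
#     mgr.run()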

# Note that (as of ubuntu python 2.4.1) every socket.connect() call
# with a hostname does a remote DNS resolution, which is pretty sucky.
# Shouldn't there be a cache in glibc?  We should probably cache the
# address in, say, the DownloadManager.

# TODO: A default consumer operation that writes the received data
# into a file; by default the file is named the same as the last
# component of the URL.  (A sketch follows below.)

# TODO: A utility function that is given a list of URLs, and downloads
# them all in parallel, pipelined.  If any fail, it raises an exception
# (and discards the rest), or perhaps can be told to continue anyhow.
# The content is written into temporary files.  It returns a list of
# readable file objects.
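
# A rough sketch of the default file-writing consumer described in the
# TODO above: it writes received data into a file named after the last
# component of the URL.  The class name FileConsumer is invented here;
# the module does not use it yet.
class FileConsumer:
    def __init__(self, url):
        import posixpath, urlparse
        self.url = url
        path = urlparse.urlparse(url)[2]
        # fall back to a fixed name when the URL ends with a slash
        self.filename = posixpath.basename(path) or 'index'
        self.file = open(self.filename, 'wb')

    def feed(self, data):
        self.file.write(data)

    def error(self, err_info):
        import traceback
        self.file.close()
        traceback.print_exception(err_info[0], err_info[1], err_info[2])

    def content_complete(self):
        self.file.close()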

import asyncore
import socket, string, time, sys
import mimetools, urlparse, urllib
import logging

logger = logging.getLogger('bzr.http_client')
debug = logger.debug


# Close connection.  Request handlers can raise this exception to
# indicate that the connection should be closed.

class CloseConnection(Exception):
    pass


# Redirect connection.  Request handlers can raise this exception to
# indicate that a new request should be issued.

class Redirect(CloseConnection):
    def __init__(self, location):
        self.location = location


class DownloadManager(object):
    """Handles pipelined/overlapped downloads.

    Pass in a series of URLs with handlers to receive the response.
    This object will spread the requests over however many sockets
    it decides to open to the server.
    """
    def __init__(self):
        # requests not yet assigned to any channel
        self.queued_requests = []

        # channels currently open to the server
        self.channels = []
        # self.channel = HttpChannel('localhost', 8000, self)

        self.try_pipelined = False
        self.try_keepalive = False
        self.max_channels = 3

    def enqueue(self, url, consumer):
        self.queued_requests.append((url, consumer))
        self._wake_up_channel()

    def _channel_closed(self, channel):
        """Called by the channel when its socket closes."""
        self.channels.remove(channel)
        if self.queued_requests:
            self._wake_up_channel()

    def _make_channel(self):
        # proxy2 203.17.154.69
        return HttpChannel('82.211.81.161', 80, self)
        # return HttpChannel('203.17.154.69', 8080, self)
        # return HttpChannel('localhost', 8000, self)

    def _wake_up_channel(self):
        """Try to wake up one channel to send the newly-added request.

        There may be more than one request pending, and this may cause
        more than one channel to take requests.  That's OK; some of
        them may be frustrated.
        """
        from random import choice

        # first, wake up any idle channels
        woke_any = False
        for ch in self.channels:
            if not ch.sent_requests:
                ch.take_one()
                woke_any = True
        if woke_any:
            debug("woke existing idle channel(s)")
            return

        if len(self.channels) < self.max_channels:
            newch = self._make_channel()
            self.channels.append(newch)
            debug("created new channel")
            return

        if self.try_pipelined:
            # ask existing channels to take it
            debug("woke busy channel")
            choice(self.channels).take_one()
            return

        debug("request left until a channel's idle")

    def run(self):
        """Run until all outstanding requests have been served."""
        #while self.running_requests or self.queued_requests \
        #        or not self.channel.is_idle():
        #    asyncore.loop(count=1)
        asyncore.loop()


class Response(object):
    """Holds in-flight response."""


def _parse_response_http10(header):
    from cStringIO import StringIO

    fp = StringIO(header)
    r = Response()
    r.status = fp.readline().split(" ", 2)
    r.headers = mimetools.Message(fp)

    # we can only(?) expect to do keepalive if we got either a
    # content-length or chunked encoding; otherwise there's no way to know
    # when the content ends apart from through the connection close
    r.content_type = r.headers.get("content-type")
    try:
        r.content_length = int(r.headers.get("content-length"))
    except (ValueError, TypeError):
        r.content_length = None
    debug("seen content length of %r" % r.content_length)

    r.transfer_encoding = r.headers.get("transfer-encoding")
    r.content_encoding = r.headers.get("content-encoding")
    r.connection_reply = r.headers.get("connection")

    # TODO: pass status code to consumer?

    if r.transfer_encoding:
        raise NotImplementedError()

    if int(r.status[1]) != 200:
        debug("can't handle response status %r" % r.status)
        raise NotImplementedError()

    if r.content_length is None:
        raise NotImplementedError()

    if r.content_length == 0:
        raise NotImplementedError()

    r.content_remaining = r.content_length

    return r
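
# Illustrative sketch only: how the parser above behaves on a canned
# HTTP/1.0 response header.  The header text and the function name
# _example_parse_response are invented for demonstration; nothing in
# the client calls this.
def _example_parse_response():
    canned = ("HTTP/1.0 200 OK\r\n"
              "Content-Type: text/plain\r\n"
              "Content-Length: 11\r\n"
              "Connection: close")
    r = _parse_response_http10(canned)
    assert r.status[1] == '200'
    assert r.content_length == 11
    assert r.content_remaining == 11
    assert r.connection_reply == 'close'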


class HttpChannel(asyncore.dispatcher_with_send):
    """One http socket, pipelining if possible."""
    # asynchronous http client

    user_agent = "http_client.py 1.3ka (based on effbot)"

    proxies = urllib.getproxies()

    def __init__(self, ip_host, ip_port, manager):
        asyncore.dispatcher_with_send.__init__(self)
        self.manager = manager

        # incoming data not yet parsed
        self.data = ""

        # if a response header has been seen, this holds it
        self.response = None

        self.chunk_size = None

        self.timestamp = time.time()

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        debug('connecting...')
        self.connect((ip_host, ip_port))

        # sent_requests holds (url, consumer) pairs whose response has
        # not yet been fully received
        self.sent_requests = []

    def __repr__(self):
        return 'HttpChannel(local_port=%r)' % (self.getsockname(),)

    def is_idle(self):
        return (not self.sent_requests)

    def handle_connect(self):
        debug('connected')
        self.take_one()

    def take_one(self):
        """Accept one request from the manager if possible."""
        if self.manager.try_pipelined:
            if len(self.sent_requests) > 4:
                return
        else:
            if len(self.sent_requests) > 0:
                return

        try:
            url, consumer = self.manager.queued_requests.pop(0)
            debug('request accepted by channel')
        except IndexError:
            return

        # TODO: If there are too many already in flight, don't take one.
        # TODO: If the socket's not writable (tx buffer full), don't take.
        self._push_request_http10(url, consumer)

    def _push_request_http10(self, url, consumer):
        """Send a request, and add it to the outstanding queue."""
        # TODO: check the url requested is appropriate for this connection

        # TODO: If there are too many requests outstanding or (less likely) the
        # connection fails, queue it for later use.

        # TODO: Keep track of requests that have been sent but not yet fulfilled,
        # because we might need to retransmit them if the connection fails.  (Or
        # should the caller do that?)

        request = self._form_request_http10(url)
        debug('send request for %s from %r' % (url, self))

        # dispatcher_with_send handles buffering the data until it can
        # be written, and hooks handle_write.
        self.send(request)

        self.sent_requests.append((url, consumer))

    def _form_request_http10(self, url):
        # TODO: get right vhost name
        request = [
            "GET %s HTTP/1.0" % (url),
            "Host: www.bazaar-ng.org",
            ]

        if self.manager.try_keepalive or self.manager.try_pipelined:
            request.extend([
                "Connection: keep-alive",
                ])

        # make sure to include a user agent
        for header in request:
            if string.lower(header).startswith("user-agent:"):
                break
        else:
            request.append("User-Agent: %s" % self.user_agent)

        return string.join(request, "\r\n") + "\r\n\r\n"
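
    # For reference, a request formed by the method above (with keepalive
    # off, and a hypothetical path of /index.html) goes out on the wire
    # roughly as follows; the Host value is the hard-coded name above and
    # the User-Agent comes from the class attribute:
    #
    #   GET /index.html HTTP/1.0\r\n
    #   Host: www.bazaar-ng.org\r\n
    #   User-Agent: http_client.py 1.3ka (based on effbot)\r\n
    #   \r\n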

    def handle_read(self):
        # handle incoming data
        data = self.recv(2048)

        self.data = self.data + data

        if data:
            debug('got %d bytes from socket' % len(data))
        else:
            debug('server closed connection')
            return

        consumer = self.sent_requests[0][1]
        if not self.response:
            # do not have a full response header yet

            # check if we've seen a full header
            debug('getting header for %s' % self.sent_requests[0][0])
            header = self.data.split("\r\n\r\n", 1)
            if len(header) < 2:
                return          # no complete header yet; wait for more data
            header, self.data = header

            self.response = _parse_response_http10(header)
            self.content_remaining = self.response.content_length

        # we now know how many (more) content bytes we have, and how much
        # is in the data buffer.  there are two main possibilities:
        # too much data, and some must be left behind containing the next
        # response headers, or too little, or possibly just right

        want = self.content_remaining
        got_data = self.data[:want]
        self.data = self.data[want:]

        if got_data:
            debug('pass back %d bytes of %s' % (len(got_data),
                                                self.sent_requests[0][0]))
            consumer.feed(got_data)

        self.content_remaining -= len(got_data)

        if self.content_remaining == 0:
            del self.sent_requests[0]

            # reset lots of things and try to get the next response header
            if self.response.connection_reply == 'close':
                debug('server requested close')
                self.manager._channel_closed(self)
                self.close()
            elif not self.manager.try_keepalive:
                debug('no keepalive for this socket')
                self.manager._channel_closed(self)
                self.close()
            else:
                debug("ready for next header...")
            self.response = None

            consumer.content_complete()

    def handle_close(self):
        debug('async told us of close on %r' % self)
        # if there are outstanding requests we should probably reopen and
        # retransmit, but if we're not making any progress then give up
        self.manager._channel_closed(self)
        self.close()


if __name__ == "__main__":
    class dummy_consumer:
        def __init__(self, url):
            self.url = url

        def feed(self, data):
            # print "feed", repr(data)
            # print "feed", repr(data[:20]), repr(data[-20:]), len(data)
            pass

        def error(self, err_info):
            import traceback
            traceback.print_exception(err_info[0], err_info[1], err_info[2])

        def content_complete(self):
            debug('content complete from %s' % self.url)

    logging.basicConfig(level=logging.DEBUG)

    mgr = DownloadManager()

    from bzrlib.branch import Branch
    revs = Branch('/home/mbp/work/bzr').revision_history()

    # for url in ['http://www.bazaar-ng.org/',
    #             'http://www.bazaar-ng.org/tutorial.html',
    #             'http://www.bazaar-ng.org/download.html',
    #             'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',
    #             ]:
    for rev_id in revs:
        # url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/' \
        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/' \
              + rev_id + '.gz'
        mgr.enqueue(url, dummy_consumer(url))

    mgr.run()