1088
by Martin Pool
- add experimental pipelined http client |
1 |
#! /usr/bin/python
|
2 |
||
3 |
# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
|
|
4 |
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
|
|
5 |
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
|
|
6 |
#
|
|
7 |
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
|
|
8 |
#
|
|
9 |
# changes:
|
|
10 |
# 2004-08-26 fl unified http callback
|
|
11 |
# 2004-10-09 fl factored out gzip_consumer support
|
|
12 |
# 2005-07-08 mbp experimental support for keepalive connections
|
|
13 |
#
|
|
14 |
# Copyright (c) 2001-2004 by Fredrik Lundh. All rights reserved.
|
|
15 |
#
|
|
16 |
||
17 |
||
18 |
||
19 |
"""async/pipelined http client
|
|
20 |
||
21 |
Use
|
|
22 |
===
|
|
23 |
||
24 |
Users of this library pass in URLs they want to see, and consumer
|
|
25 |
objects that will receive the results at some point in the future.
|
|
26 |
Any number of requests may be queued up, and more may be added while
|
|
27 |
the download is in progress.
|
|
28 |
||
29 |
Requests can be both superscalar and superpipelined. That is to say,
|
|
30 |
for each server there can be multiple sockets open, and each socket
|
|
31 |
may have more than one request in flight.
|
|
32 |
||
33 |
Design
|
|
34 |
======
|
|
35 |
||
36 |
There is a single DownloadManager, and a connection object for each
|
|
37 |
open socket.
|
|
38 |
||
39 |
Request/consumer pairs are maintained in queues. Each connection has
|
|
40 |
a list of transmitted requests whose response has not yet been
|
|
41 |
received. There is also a per-server list of requests that have not
|
|
42 |
yet been submitted.
|
|
43 |
||
44 |
When a connection is ready to transmit a new request, it takes one
|
|
45 |
from the unsubmitted list, sends the request, and adds the request to
|
|
46 |
its unfulfilled list. This should happen when the connection has
|
|
47 |
space for more transmissions or when a new request is added by the
|
|
48 |
user. If the connection terminates with unfulfilled requests they are
|
|
49 |
put back onto the unsubmitted list, to be retried elsewhere.
|
|
50 |
||
51 |
Because responses come back precisely in order, the connection always
|
|
52 |
knows what it should expect next: the response for the next
|
|
53 |
unfulfilled request.
|
|
54 |
"""
|
|
55 |
||
56 |
# Note that (as of ubuntu python 2.4.1) every socket.connect() call
|
|
57 |
# with a hostname does a remote DNS resolution, which is pretty sucky.
|
|
58 |
# Shouldn't there be a cache in glibc? We should probably cache the
|
|
59 |
# address in, say, the DownloadManager.
|
|
60 |
||
61 |
# TODO: A default consumer operation that writes the received data
|
|
62 |
# into a file; by default the file is named the same as the last
|
|
63 |
# component of the URL.
|
|
64 |
||
65 |
# TODO: A utility function that is given a list of URLs, and downloads
|
|
66 |
# them all parallel/pipelined. If any fail, it raises an exception
|
|
67 |
# (and discards the rest), or perhaps can be told to continue anyhow.
|
|
68 |
# The content is written into temporary files. It returns a list of
|
|
69 |
# readable file objects.
|
|
70 |
||
1093
by Martin Pool
- more experiments with http_client |
71 |
# TODO: If we try pipelined or keepalive and the connection drops out,
|
72 |
# then retry the request on a new connection; eventually we should perhaps
|
|
73 |
# learn that a given host or network just won't allow keepalive.
|
|
74 |
||
75 |
||
1088
by Martin Pool
- add experimental pipelined http client |
76 |
import asyncore |
77 |
import socket, string, time, sys |
|
78 |
import StringIO |
|
79 |
import mimetools, urlparse, urllib |
|
80 |
import logging |
|
81 |
||
1093
by Martin Pool
- more experiments with http_client |
82 |
# Module-wide logging setup: every record at DEBUG level or above is written
# to a scratch file, which is truncated whenever the logging system is
# configured by this module.
logging.basicConfig(
    filename='/tmp/http_client.log',
    filemode='w',
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.DEBUG)

logger = logging.getLogger('bzr.http_client')

# Short module-level aliases for the logger's methods.
debug, info, error = logger.debug, logger.info, logger.error
|
91 |
||
92 |
||
93 |
##
|
|
94 |
# Close connection. Request handlers can raise this exception to
|
|
95 |
# indicate that the connection should be closed.
|
|
96 |
||
97 |
class CloseConnection(Exception):
    """Raised by a request handler to ask that the connection be closed."""
|
|
99 |
||
100 |
##
|
|
101 |
# Redirect connection. Request handlers can raise this exception to
|
|
102 |
# indicate that a new request should be issued.
|
|
103 |
||
104 |
class Redirect(CloseConnection):
    """Raised by a request handler to ask that a new request be issued.

    location
        The location the new request should be sent to.
    """

    def __init__(self, location):
        # Initialise the base exception too, so that exc.args and str(exc)
        # carry the redirect target instead of being left unset.
        CloseConnection.__init__(self, location)
        self.location = location
|
107 |
||
108 |
||
109 |
class DownloadManager(object):
    """Handles pipelined/overlapped downloads.

    Pass in a series of URLs with handlers to receive the response.
    This object will spread the requests over however many sockets
    seem useful.

    queued_requests
        List of (url, consumer) pairs not yet assigned to any channel.

    channels
        The channels (one per socket) currently owned by this manager.

    try_pipelined, try_keepalive
        Flags read by the channels when forming requests and when deciding
        whether to keep a socket open.  Both default to False.

    max_channels
        Upper limit on the number of channels open at the same time.

    host, port
        Address that every newly created channel connects to.
    """

    def __init__(self, host='127.0.0.1', port=8000):
        """Create a manager with no requests and no channels.

        host, port
            Where new channels connect.  The defaults are the address
            that used to be hard-coded in _make_channel.
        """
        self.queued_requests = []
        self.channels = []
        self.try_pipelined = False
        self.try_keepalive = False
        self.max_channels = 5
        self.host = host
        self.port = port

    def enqueue(self, url, consumer):
        """Queue a request for url; consumer receives the response."""
        self.queued_requests.append((url, consumer))
        self._wake_up_channel()

    def _channel_closed(self, channel):
        """Called by the channel when its socket closes.

        Safe to call more than once for the same channel: a channel that
        has already been forgotten is simply ignored.
        """
        try:
            self.channels.remove(channel)
        except ValueError:
            # Not (or no longer) in the list; nothing to forget.
            pass
        if self.queued_requests:
            # might recreate one
            self._wake_up_channel()

    def _make_channel(self):
        """Return a new channel connected to (self.host, self.port)."""
        return HttpChannel(self.host, self.port, self)

    def _wake_up_channel(self):
        """Try to wake up one channel to send the newly-added request.

        There may be more than one request pending, and this may cause
        more than one channel to take requests.  That's OK; some of
        them may be frustrated.
        """
        from random import choice

        # First, wake up any idle channels.  Iterate over a copy because a
        # channel whose send fails may remove itself via _channel_closed.
        woke_idle = False
        for ch in list(self.channels):
            if not ch.sent_requests:
                ch.take_one()
                woke_idle = True
        if woke_idle:
            debug("woke existing idle channel(s)")
            return

        # Otherwise open another socket if we are still under the limit.
        if len(self.channels) < self.max_channels:
            newch = self._make_channel()
            self.channels.append(newch)
            newch.take_one()
            debug("created new channel")
            return

        # Otherwise, when pipelining, ask a busy channel to take it.  When
        # not pipelining the request stays queued until a channel is idle.
        if self.try_pipelined:
            debug("woke busy channel")
            choice(self.channels).take_one()

    def run(self):
        """Run the asyncore loop until it has no more sockets to watch."""
        asyncore.loop()
|
195 |
||
196 |
||
197 |
||
198 |
class Response(object):
    """Plain attribute holder for a response that is still being received."""
|
|
200 |
||
201 |
||
202 |
||
203 |
def _parse_response_http10(header):
    """Parse the header of an HTTP/1.0 response into a Response object.

    header
        The status line followed by the header fields, as one string.

    Returns a Response with these attributes set: status (the status line
    split on spaces into at most three fields), headers (a
    mimetools.Message), content_type, content_length, transfer_encoding,
    content_encoding, connection_reply and content_remaining (initially
    equal to content_length).

    Raises NotImplementedError for every response this client cannot
    handle: a Transfer-Encoding header, a status line whose code cannot be
    read or is not 200, and a Content-Length that is missing, unparseable,
    zero or negative.
    """
    from cStringIO import StringIO

    fp = StringIO(header)
    r = Response()

    r.status = fp.readline().split(" ", 2)
    r.headers = mimetools.Message(fp)

    # we can only(?) expect to do keepalive if we got either a
    # content-length or chunked encoding; otherwise there's no way to know
    # when the content ends apart from through the connection close
    r.content_type = r.headers.get("content-type")
    try:
        r.content_length = int(r.headers.get("content-length"))
    except (ValueError, TypeError):
        # header absent (None) or not a number
        r.content_length = None
    debug("seen content length of %r" % r.content_length)

    r.transfer_encoding = r.headers.get("transfer-encoding")
    r.content_encoding = r.headers.get("content-encoding")
    r.connection_reply = r.headers.get("connection")

    # TODO: pass status code to consumer?

    if r.transfer_encoding:
        raise NotImplementedError("transfer-encoding is not supported")

    # A truncated or garbled status line is reported the same way as any
    # other response we cannot handle, rather than as a stray IndexError
    # or ValueError.
    try:
        status_code = int(r.status[1])
    except (IndexError, ValueError):
        debug("can't parse response status %r" % r.status)
        raise NotImplementedError("malformed status line")
    if status_code != 200:
        debug("can't handle response status %r" % r.status)
        raise NotImplementedError("unsupported response status")

    # The caller counts content_remaining down to exactly zero, so a
    # missing, zero or negative length cannot be processed.
    if r.content_length is None or r.content_length <= 0:
        raise NotImplementedError("usable content-length required")

    r.content_remaining = r.content_length

    return r
|
247 |
||
248 |
||
249 |
||
250 |
||
251 |
||
252 |
||
253 |
||
254 |
class HttpChannel(asyncore.dispatcher_with_send):
    """One http socket, pipelining if possible.

    A channel takes (url, consumer) pairs from its manager's
    queued_requests, sends an HTTP/1.0 request for each, and passes the
    body of each response to the consumer of the oldest outstanding
    request.  Responses are matched to requests purely by order.

    Consumers must provide feed(data) and content_complete().
    """

    user_agent = "http_client.py 1.3ka (based on effbot)"

    # Host header sent when the requested URL does not name a host itself.
    default_vhost = "www.bazaar-ng.org"

    # Most requests allowed in flight on one socket when pipelining.
    max_pipelined = 5

    # Evaluated once, when the class is defined; not used by this class.
    proxies = urllib.getproxies()

    def __init__(self, ip_host, ip_port, manager):
        """Open a socket to (ip_host, ip_port) on behalf of manager."""
        asyncore.dispatcher_with_send.__init__(self)
        self.manager = manager

        # if a response header has been seen, this holds it
        self.response = None

        # body bytes of the current response that have not arrived yet
        self.content_remaining = 0

        # received bytes not yet consumed
        self.data = ""

        self.chunk_size = None

        self.timestamp = time.time()

        # sent_requests holds (url, consumer), oldest first
        self.sent_requests = []

        self._outbuf = ''

        # Connect last: connect() may invoke handle_connect() (and so
        # take_one()) before returning, which needs the state set above.
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        debug('connecting...')
        self.connect((ip_host, ip_port))

    def __repr__(self):
        # The local address is unavailable once the socket is gone; a
        # repr used only for logging should not raise because of that.
        try:
            local = self.getsockname()
        except (socket.error, AttributeError):
            local = None
        return 'HttpChannel(local_port=%r)' % (local,)

    def is_idle(self):
        """Return True if no sent request is awaiting its response."""
        return (not self.sent_requests)

    def handle_connect(self):
        debug("connected")
        self.take_one()

    def take_one(self):
        """Accept one request from the manager if possible.

        Does nothing if this channel already has as many requests in
        flight as it is allowed (max_pipelined when the manager tries
        pipelining, otherwise one), or if the manager's queue is empty.
        """
        if self.manager.try_pipelined:
            limit = self.max_pipelined
        else:
            limit = 1
        if len(self.sent_requests) >= limit:
            return

        if not self.manager.queued_requests:
            return
        url, consumer = self.manager.queued_requests.pop(0)
        debug('request accepted by channel')

        # TODO: If the socket's not writable (tx buffer full), don't take.
        self._push_request_http10(url, consumer)

    def _push_request_http10(self, url, consumer):
        """Send a request, and add it to the outstanding queue."""
        # TODO: check the url requested is appropriate for this connection

        # TODO: Requests that were sent but not fulfilled may need to be
        # retransmitted if the connection fails.  (Or should the caller
        # do that?)

        request = self._form_request_http10(url)
        debug('send request for %s from %r' % (url, self))

        # dispatcher_with_send handles buffering the data until it can
        # be written, and hooks handle_write.
        self.send(request)

        self.sent_requests.append((url, consumer))

    def _form_request_http10(self, url):
        """Return the complete text of an HTTP/1.0 GET request for url.

        The Host header names the host found in url (any "user@" prefix
        removed); default_vhost is used when url does not contain one.
        Keep-alive headers are added when the manager has try_keepalive
        or try_pipelined set.
        """
        netloc = urlparse.urlparse(url)[1]
        vhost = netloc.split('@')[-1] or self.default_vhost

        request = [
            "GET %s HTTP/1.0" % (url,),
            "Host: %s" % vhost,
            ]

        if self.manager.try_keepalive or self.manager.try_pipelined:
            request.extend([
                "Keep-Alive: 60",
                "Connection: keep-alive",
                ])

        # make sure to include a user agent
        has_agent = [h for h in request
                     if h.lower().startswith("user-agent:")]
        if not has_agent:
            request.append("User-Agent: %s" % self.user_agent)

        return "\r\n".join(request) + "\r\n\r\n"

    def handle_read(self):
        """Consume incoming data, dispatching bodies to consumers."""
        chunk = self.recv(2048)

        if chunk:
            debug('got %d bytes from socket' % len(chunk))
        else:
            debug('server closed connection')

        self.data = self.data + chunk

        while self.data:
            if not self.sent_requests:
                # Nothing is outstanding, so these bytes cannot belong
                # to any response we are waiting for.
                debug('discarding %d unexpected bytes' % len(self.data))
                self.data = ""
                return

            url, consumer = self.sent_requests[0]

            if not self.response:
                # do not have a full response header yet; check whether
                # the buffer now contains one
                debug('getting header for %s' % url)

                parts = self.data.split("\r\n\r\n", 1)
                if len(parts) <= 1:
                    return
                header, self.data = parts

                self.response = _parse_response_http10(header)
                self.content_remaining = self.response.content_length

                if not self.data:
                    return

            # We now know how many (more) content bytes we want and how
            # much is buffered.  Anything beyond the wanted bytes stays
            # in the buffer: it is the start of the next response.
            want = self.content_remaining
            if want > 0:
                got_data = self.data[:want]
                self.data = self.data[want:]

                self.content_remaining -= len(got_data)

                debug('pass back %d bytes of %s, %d remain'
                      % (len(got_data), url, self.content_remaining))
                # Pass on only the body bytes of this response, not the
                # raw chunk read from the socket (which may also contain
                # headers or part of another response).
                consumer.feed(got_data)

            if self.content_remaining <= 0:
                del self.sent_requests[0]

                debug('content complete')
                consumer.content_complete()

                # reset and try to get the next response header
                reply = (self.response.connection_reply or '').lower()
                self.response = None

                if reply == 'close':
                    debug('server requested close')
                    self._close_and_notify()
                    return
                elif not self.manager.try_keepalive:
                    debug('no keepalive for this socket')
                    self._close_and_notify()
                    return
                else:
                    debug("ready for next header...")
                    self.take_one()

    def _close_and_notify(self):
        """Tell the manager this channel is finished, then close it."""
        # Bytes still buffered cannot be matched to anything once the
        # socket is closed.
        self.data = ""
        self.manager._channel_closed(self)
        self.close()

    def handle_close(self):
        debug('async told us of close on %r' % self)
        # if there are outstanding requests should probably reopen and
        # retransmit, but if we're not making any progress then give up
        if self.sent_requests:
            debug('%d request(s) left unanswered' % len(self.sent_requests))
        self._close_and_notify()
|
442 |
||
443 |
||
1093
by Martin Pool
- more experiments with http_client |
444 |
class DummyConsumer:
    """Consumer that saves a response body to a file.

    The file is created in download_dir and named after the last
    path component of the URL.

    url
        The URL being downloaded.

    pb
        Progress bar object; content_complete() calls its update() method
        and reads its last_cnt and last_total attributes.

    download_dir
        Directory the file is written to.  It must already exist.
    """

    def __init__(self, url, pb, download_dir='/tmp/download/'):
        self.url = url
        # open output file, or None while no data has been received
        self.outf = None
        self._pb = pb
        self.download_dir = download_dir

    def feed(self, data):
        """Append data to the output file, opening it on first use."""
        if self.outf is None:
            import os
            # rfind rather than rindex: a URL without any '/' is used
            # whole as the file name instead of raising ValueError.
            base = self.url[self.url.rfind('/') + 1:]
            self.outf = open(os.path.join(self.download_dir, base), 'wb')
        self.outf.write(data)

    def error(self, err_info):
        """Log the failure, print the traceback and exit with status 1.

        err_info
            A (type, value, traceback) triple.
        """
        import traceback
        error('error reported to consumer')
        traceback.print_exception(err_info[0], err_info[1], err_info[2])
        sys.exit(1)

    def content_complete(self):
        """Close the output file and advance the progress bar by one."""
        info('content complete from %s' % self.url)
        # No file exists if no data was ever fed.
        if self.outf is not None:
            self.outf.close()
            self.outf = None
        # using last_cnt is cheating
        self._pb.update('downloading inventory',
                        self._pb.last_cnt + 1,
                        self._pb.last_total)
|
472 |
||
473 |
||
474 |
||
1088
by Martin Pool
- add experimental pipelined http client |
475 |
if __name__ == "__main__":
    # Demo driver: fetch the inventory of every revision in a local branch.
    from bzrlib.branch import Branch
    from bzrlib.progress import ProgressBar

    logging.basicConfig(level=logging.DEBUG)

    mgr = DownloadManager()
    pb = ProgressBar()

    revs = Branch('/home/mbp/work/bzr').revision_history()
    pb.update('downloading inventories', 0, len(revs))

    store = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/'
    for rev in revs:
        url = store + rev + '.gz'
        mgr.enqueue(url, DummyConsumer(url, pb))

    mgr.run()
|
493 |
||
494 |
||
495 |
||
1088
by Martin Pool
- add experimental pipelined http client |
496 |
|
497 |
# for url in ['http://www.bazaar-ng.org/',
|
|
498 |
# 'http://www.bazaar-ng.org/tutorial.html',
|
|
499 |
# 'http://www.bazaar-ng.org/download.html',
|
|
500 |
# 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',
|
|
501 |
# ]:
|