    def __init__(self, location):
        self.location = location


class Request(object):
    """This keeps track of all the information for a single request."""

    user_agent = "http_client.py 1.2 (http://effbot.org/zone)"
    http_version = "1.1"
    proxies = urllib.getproxies()

    def __init__(self, uri, consumer, extra_headers=None):
        self.consumer = consumer

        # turn the uri into a valid request
        scheme, host, path, params, query, fragment = urlparse.urlparse(uri)

        # get proxy settings, if any
        proxy = self.proxies.get(scheme)
        if proxy:
            scheme, host, x, x, x, x = urlparse.urlparse(proxy)

        assert scheme == "http", "only supports HTTP requests (%s)" % scheme
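
        # A quick illustration of the proxy rewrite above: urlparse splits a
        # URI into six parts, so with http_proxy=http://proxy:3128 set,
        #
        #   >>> urlparse.urlparse("http://proxy:3128")
        #   ('http', 'proxy:3128', '', '', '', '')
        #
        # and the connection is made to "proxy:3128" rather than to the
        # original host.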

    def http_request(self, conn):
        """Send this request over the given connection."""
        # ... (the request line and headers are assembled into the
        # `request` list here)
        request = string.join(request, "\r\n") + "\r\n\r\n"
        # ... (the request is written to conn)
        self.bytes_out = self.bytes_out + len(request)
        self.requested = True

    def add_data(self, data):
        """Some data has been downloaded; let the consumer know.

        :return: True - the download is complete; there may be extra data
                in self.data which should be transferred to the next item.
            False - the download failed.
            None - continue downloading.
        """
        self.data = self.data + data
        self.bytes_in = self.bytes_in + len(data)

        # ... (the status line and response header are parsed here)
        try:
            self.content_length = int(self.header.get("content-length"))
        except (ValueError, TypeError):
            self.content_length = None
        self.original_content_length = self.content_length
        self.transfer_encoding = self.header.get("transfer-encoding")
        self.content_encoding = self.header.get("content-encoding")

        if self.content_encoding == "gzip":
            # FIXME: report error if GzipConsumer is not available
            self.consumer = GzipConsumer(self.consumer)

        # the header is complete; hand status and header to the consumer
        self.consumer.http(1, self)
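
        # (The http() callback convention used throughout this file: the
        # consumer is called with 1 once the response header has been parsed,
        # and with 0 when a connection fails; at that point the Request's
        # status and header attributes are available to the consumer.)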

        # ... (response body bytes are handed to the consumer here;
        # chunk_size is the size of the piece just consumed)
        if self.content_length:
            self.content_length -= chunk_size
            if self.content_length <= 0:
                self.consumer.close()
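

# An illustrative consumer (not part of the original module): Request only
# assumes an object with feed()/http()/close() methods, the same protocol the
# dummy_consumer in __main__ implements. This hypothetical sketch writes the
# downloaded body to a file.
class _example_file_consumer:

    def __init__(self, path):
        self._file = open(path, "wb")
        self.failed = False

    def feed(self, data):
        # body bytes, called as they arrive
        self._file.write(data)

    def http(self, ok, request, *args, **kwargs):
        # ok=1 when the header is in, ok=0 on a failed connection
        if not ok:
            self.failed = True

    def close(self):
        # end of the download (successful or not)
        self._file.close()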


# Asynchronous HTTP/1.1 client.
class async_http(asyncore.dispatcher_with_send):
    """Asynchronous HTTP client.

    This client keeps a queue of files to download, and tries to download
    them asynchronously (and pipelined), alerting the consumer as bits are
    downloaded.
    """
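
    # Overview of the moving parts below: add_request() queues Requests,
    # _request_next() pops the queue, and _update_current() feeds received
    # bytes to the current Request, pipelining further requests when the
    # server reports a content-length and speaks HTTP/1.1.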

    def __init__(self, scheme, host):
        """Connect to the given host; further requests are made through the
        add_request member function.

        :param scheme: The connection method, such as http/https; currently
            only http is supported.
        :param host: The host to connect to, either a proxy or the actual
            host.
        """
        asyncore.dispatcher_with_send.__init__(self)

        assert scheme == "http", "only supports HTTP requests (%s)" % scheme

        self._connected = False
        # ...
        try:
            # "example.com:8080" -> ("example.com", 8080); a bare hostname
            # has no ":", the unpacking raises ValueError, and we fall
            # back to port 80
            host, port = self.host.split(":", 1)
            port = int(port)
        except (TypeError, ValueError):
            port = 80 # default port

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((host, port))
        self._connected = True
        # ... (error path, elided; on a failed connect the flag is reset:
        # self._connected = False)

    def _request_next(self):
        extra_data = self._current.data
        if len(self._queue) > 0:
            self._current = self._queue.pop(0)
            if not self._current.requested:
                self._current.http_request(self)
            self._update_current(extra_data)
        else:
            # TODO: Consider some sort of delayed closing,
            #       rather than closing the instant the
            #       queue is empty. But I don't know any way
            #       under asyncore to be alerted at a later time.
            #       If this were a thread, I would sleep, waking
            #       up to check for more work, and eventually timing
            #       out and disconnecting.
            self.close()
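
    # (A conventional workaround for the TODO above, since asyncore consults
    # readable()/writable() on every poll: a dispatcher can check a deadline
    # there and close itself once the queue has stayed empty for a grace
    # period. A hypothetical sketch, not part of the original:
    #
    #     def readable(self):
    #         if not self._queue and time.time() > self._idle_deadline:
    #             self.close()
    #         return asyncore.dispatcher_with_send.readable(self)
    # )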

    def _update_current(self, data):
        res = self._current.add_data(data)
        if res is None:
            # Downloading is continuing
            if self._current.original_content_length and self._current.http_1_1:
                # We can pipeline our requests, since we
                # are getting a content_length count back
                for i, r in enumerate(self._queue[:self.max_requests-1]):
                    # ...
        elif res:
            # We finished downloading the last file
            self._request_next()
        else:
            # There was a failure
            # ...
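
    # Why the original_content_length/http_1_1 test above: pipelining only
    # works when we can tell where one response ends and the next begins.
    # Without a content-length (and HTTP/1.1 keep-alive) the server delimits
    # the response by closing the socket, which would discard any pipelined
    # requests queued behind it.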

    def add_request(self, request):
        """Add a new Request into the queue."""
        self._queue.append(request)
        if not self._connected:
            # ... (reconnect, elided)

    def handle_connect(self):
        pass # connection succeeded

    def handle_expt(self):
        # connection failed (windows); notify the consumer
        if sys.platform == "win32":
            self.close()
            self._current.consumer.http(0, self._current)

    def handle_read(self):
        # handle incoming data
        data = self.recv(2048)
        if not data:
            # the server closed the connection
            return self.handle_close()
        self._update_current(data)

    def handle_close(self):
        """When does this event occur? Is it okay to start the next entry in
        the queue (possibly reconnecting), or is this an indication that we
        should stop?
        """
        # ...
        self._current.consumer.close()
        # ...
        if len(self._queue) > 0:
            # ... (reconnect and start the next request, elided)

    def handle_error(self):
        self._current.consumer.http(0, self._current, sys.exc_info())
        # Should this be broadcast to all other items waiting in the queue?
        self.close()


_connections = {}

def do_request(uri, consumer, extra_headers=None):
    request = Request(uri, consumer, extra_headers)
    scheme = request.scheme
    host = request.host
    # pool connections per (scheme, host) pair
    key = (scheme, host)
    if not _connections.has_key(key):
        _connections[key] = async_http(scheme, host)
    _connections[key].add_request(request)
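
# A minimal usage sketch (illustrative; it mirrors __main__ below): build a
# consumer with feed()/http()/close() methods, queue one or more URLs, then
# let asyncore drive the downloads.
#
#     do_request("http://effbot.org/zone", _example_file_consumer("zone.html"))
#     do_request("http://effbot.org/lib", _example_file_consumer("lib.html"))
#     asyncore.loop()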


if __name__ == "__main__":

    class dummy_consumer:
        def feed(self, data):
            print "feed", repr(data[:20]), repr(data[-20:]), len(data)
        def close(self):
            print "close"
        def http(self, ok, connection, *args, **kwargs):
            print ok, connection, args, kwargs
            print "status", connection.status
            print "header", connection.header

    if len(sys.argv) < 2:
        do_request('http://www.cnn.com/', dummy_consumer())
    else:
        for url in sys.argv[1:]:
            do_request(url, dummy_consumer())

    asyncore.loop()