~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to effbot/org/http_client.py

  • Committer: John Arbash Meinel
  • Date: 2005-09-14 20:49:08 UTC
  • mto: (1185.11.1)
  • mto: This revision was merged to the branch mainline in revision 1396.
  • Revision ID: john@arbash-meinel.com-20050914204908-766c55c17ec9ae4e
Trying to get pipelined http library working + tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
36
36
    def __init__(self, location):
37
37
        self.location = location
38
38
 
39
 
class Request(object):
40
 
    """This keeps track of all the information for a single request.
41
 
    """
 
39
##
 
40
# Asynchronous HTTP/1.1 client.
 
41
 
 
42
class async_http(asyncore.dispatcher_with_send):
 
43
    # asynchronous http client
42
44
 
43
45
    user_agent = "http_client.py 1.2 (http://effbot.org/zone)"
44
46
    http_version = "1.1"
 
47
 
45
48
    proxies = urllib.getproxies()
46
49
 
47
50
    def __init__(self, uri, consumer, extra_headers=None):
48
 
        self.consumer = consumer
 
51
        asyncore.dispatcher_with_send.__init__(self)
 
52
 
49
53
        # turn the uri into a valid request
50
54
        scheme, host, path, params, query, fragment = urlparse.urlparse(uri)
51
55
 
52
 
        self.scheme = scheme
 
56
        # use origin host
53
57
        self.host = host
54
58
 
55
59
        # get proxy settings, if any
56
60
        proxy = self.proxies.get(scheme)
57
 
        self.proxy = proxy
 
61
        if proxy:
 
62
            scheme, host, x, x, x, x = urlparse.urlparse(proxy)
 
63
 
 
64
        assert scheme == "http", "only supports HTTP requests (%s)" % scheme
58
65
 
59
66
        if not path:
60
67
            path = "/"
67
74
 
68
75
        self.path = path
69
76
 
70
 
        # It turns out Content-Length isn't sufficient
71
 
        # to allow pipelining. Simply required.
72
 
        # So we will be extra stingy, and require the
73
 
        # response to also be HTTP/1.1 to enable pipelining
74
 
        self.http_1_1 = False
 
77
        # get port number
 
78
        try:
 
79
            host, port = host.split(":", 1)
 
80
            port = int(port)
 
81
        except (TypeError, ValueError):
 
82
            port = 80 # default port
 
83
 
 
84
        self.consumer = consumer
 
85
 
75
86
        self.status = None
76
87
        self.header = None
77
88
 
90
101
        self.timestamp = time.time()
91
102
 
92
103
        self.extra_headers = extra_headers
93
 
        self.requested = False
94
 
 
95
 
    def http_request(self, conn):
96
 
        """Place the actual http request on the server"""
 
104
 
 
105
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
 
106
        try:
 
107
            self.connect((host, port))
 
108
        except socket.error:
 
109
            self.consumer.http(0, self, sys.exc_info())
 
110
 
 
111
    def handle_connect(self):
97
112
        # connection succeeded
98
113
 
99
114
        request = [
101
116
            "Host: %s" % self.host,
102
117
            ]
103
118
 
104
 
        if False and GzipConsumer:
 
119
        if GzipConsumer:
105
120
            request.append("Accept-Encoding: gzip")
106
121
 
107
122
        if self.extra_headers:
116
131
 
117
132
        request = string.join(request, "\r\n") + "\r\n\r\n"
118
133
 
119
 
        conn.send(request)
 
134
        self.send(request)
120
135
 
121
136
        self.bytes_out = self.bytes_out + len(request)
122
 
        self.requested = True
123
 
 
124
 
    def add_data(self, data):
125
 
        """Some data has been downloaded, let the consumer know.
126
 
        :return: True- download is completed, there may be extra data in self.data which
127
 
                       should be transfered to the next item
128
 
                 False- download failed
129
 
                 None- continue downloading
130
 
        """
 
137
 
 
138
    def handle_expt(self):
 
139
        # connection failed (windows); notify consumer
 
140
 
 
141
        if sys.platform == "win32":
 
142
            self.close()
 
143
            self.consumer.http(0, self)
 
144
 
 
145
    def handle_read(self):
 
146
        # handle incoming data
 
147
 
 
148
        data = self.recv(2048)
131
149
 
132
150
        self.data = self.data + data
133
151
        self.bytes_in = self.bytes_in + len(data)
147
165
                self.status = fp.readline().split(" ", 2)
148
166
                self.header = mimetools.Message(fp)
149
167
 
150
 
                if self.status[0] == 'HTTP/1.1':
151
 
                    self.http_1_1 = True
152
 
                else:
153
 
                    self.http_1_1 = False
154
 
 
155
168
                # get http headers
156
169
                self.content_type = self.header.get("content-type")
157
170
                try:
160
173
                        )
161
174
                except (ValueError, TypeError):
162
175
                    self.content_length = None
163
 
                self.original_content_length = self.content_length
164
176
                self.transfer_encoding = self.header.get("transfer-encoding")
165
177
                self.content_encoding = self.header.get("content-encoding")
166
178
 
167
 
                # if self.content_encoding == "gzip":
168
 
                #     # FIXME: report error if GzipConsumer is not available
169
 
                #     self.consumer = GzipConsumer(self.consumer)
 
179
                if self.content_encoding == "gzip":
 
180
                    # FIXME: report error if GzipConsumer is not available
 
181
                    self.consumer = GzipConsumer(self.consumer)
170
182
 
171
183
                try:
172
184
                    self.consumer.http(1, self)
176
188
                        do_request(
177
189
                            v.location, self.consumer, self.extra_headers
178
190
                            )
179
 
                    return True
 
191
                    self.close()
 
192
                    return
180
193
                except CloseConnection:
181
 
                    return False
 
194
                    self.close()
 
195
                    return
182
196
 
183
197
            if self.transfer_encoding == "chunked" and self.chunk_size is None:
184
198
 
196
210
                    if self.chunk_size <= 0:
197
211
                        raise ValueError
198
212
                except ValueError:
199
 
                    self.consumer.close()
200
 
                    return False
 
213
                    return self.handle_close()
201
214
 
202
215
            if not self.data:
203
216
                return
207
220
 
208
221
            chunk_size = self.chunk_size or len(data)
209
222
 
210
 
            # Make sure to only feed the consumer whatever is left for
211
 
            # this file.
212
 
            if self.content_length:
213
 
                if chunk_size > self.content_length:
214
 
                    chunk_size = self.content_length
215
 
 
216
223
            if chunk_size < len(data):
217
224
                self.data = data[chunk_size:]
218
225
                data = data[:chunk_size]
228
235
            if self.content_length:
229
236
                self.content_length -= chunk_size
230
237
                if self.content_length <= 0:
231
 
                    self.consumer.close()
232
 
                    return True
233
 
 
234
 
 
235
 
##
236
 
# Asynchronous HTTP/1.1 client.
237
 
 
238
 
class async_http(asyncore.dispatcher_with_send):
239
 
    """Asynchronous HTTP client.
240
 
    This client keeps a queue of files to download, and
241
 
    tries to asynchronously (and pipelined) download them,
242
 
    alerting the consumer as bits are downloaded.
243
 
    """
244
 
 
245
 
    max_requests = 4
246
 
 
247
 
    def __init__(self, scheme, host):
248
 
        """Connect to the given host, extra requests are made on the
249
 
        add_request member function.
250
 
 
251
 
        :param scheme: The connection method, such as http/https, currently
252
 
                       we only support http
253
 
        :param host:   The host to connect to, either a proxy, or the actual host.
254
 
        """
255
 
        asyncore.dispatcher_with_send.__init__(self)
256
 
 
257
 
        # use origin host
258
 
        self.scheme = scheme
259
 
        self.host = host
260
 
 
261
 
        assert scheme == "http", "only supports HTTP requests (%s)" % scheme
262
 
 
263
 
        self._connected = False
264
 
        self._queue = []
265
 
        self._current = None
266
 
 
267
 
    def _connect(self):
268
 
        # get port number
269
 
        host = self.host
270
 
        try:
271
 
            host, port = self.host.split(":", 1)
272
 
            port = int(port)
273
 
        except (TypeError, ValueError):
274
 
            port = 80 # default port
275
 
 
276
 
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
277
 
        try:
278
 
            self.connect((host, port))
279
 
            self._connected = True
280
 
        except socket.error:
281
 
            self.handle_error()
282
 
 
283
 
    def _close(self):
284
 
        if self._connected:
285
 
            self.close()
286
 
        self._connected = False
287
 
 
288
 
    def _request_next(self):
289
 
        extra_data = None
290
 
        if self._current:
291
 
            extra_data = self._current.data
292
 
        if len(self._queue) > 0:
293
 
            self._current = self._queue.pop(0)
294
 
            if not self._current.requested:
295
 
                self._current.http_request(self)
296
 
            if extra_data:
297
 
                self._update_current(extra_data)
298
 
        else:
299
 
            # TODO: Consider some sort of delayed closing,
300
 
            # rather than closing the instant the
301
 
            # queue is empty. But I don't know any way
302
 
            # under async_core to be alerted at a later time.
303
 
            # If this were a thread, I would sleep, waking
304
 
            # up to check for more work, and eventually timing
305
 
            # out and disconnecting.
306
 
            self._close()
307
 
 
308
 
    def _update_current(self, data):
309
 
        res = self._current.add_data(data)
310
 
        if res is None:
311
 
            # Downloading is continuing
312
 
            if self._current.original_content_length and self._current.http_1_1:
313
 
                # We can pipeline our requests, since we
314
 
                # are getting a content_length count back
315
 
                for i, r in enumerate(self._queue[:self.max_requests-1]):
316
 
                    if not r.requested:
317
 
                        r.http_request(self)
318
 
            return
319
 
        if res:
320
 
            # We finished downloading the last file
321
 
            self._request_next()
322
 
        else:
323
 
            # There was a failure
324
 
            self.handle_error()
325
 
 
326
 
    def add_request(self, request):
327
 
        """Add a new Request into the queue."""
328
 
        self._queue.append(request)
329
 
        if not self._connected:
330
 
            self._connect()
331
 
 
332
 
    def handle_connect(self):
333
 
        self._request_next()
334
 
 
335
 
    def handle_expt(self):
336
 
        # connection failed (windows); notify consumer
337
 
        assert self._current
338
 
 
339
 
        if sys.platform == "win32":
340
 
            self._close()
341
 
            self._current.consumer.http(0, self._current)
342
 
 
343
 
    def handle_read(self):
344
 
        # handle incoming data
345
 
        assert self._current
346
 
 
347
 
        data = self.recv(2048)
348
 
 
349
 
        self._update_current(data)
350
 
 
 
238
                    return self.handle_close()
351
239
 
352
240
    def handle_close(self):
353
 
        """When does this event occur? Is it okay to start the next entry in the queue
354
 
        (possibly reconnecting), or is this an indication that we should stop?
355
 
        """
356
 
        if self._current:
357
 
            self._current.consumer.close()
358
 
        self._close()
359
 
        if len(self._queue) > 0:
360
 
            self._connect()
 
241
        self.consumer.close()
 
242
        self.close()
361
243
 
362
244
    def handle_error(self):
363
 
        if self._current:
364
 
            self._current.consumer.http(0, self._current, sys.exc_info())
365
 
        # Should this be broadcast to all other items waiting in the queue?
366
 
        self._close()
367
 
 
368
 
_connections = {}
 
245
        self.consumer.http(0, self, sys.exc_info())
 
246
        self.close()
369
247
 
370
248
def do_request(uri, consumer, extra_headers=None):
371
 
    global _connections
372
 
    request = Request(uri, consumer, extra_headers)
373
 
 
374
 
    scheme = request.scheme
375
 
    host = request.host
376
 
    if request.proxy:
377
 
        host = request.proxy
378
 
    key = (scheme, host)
379
 
    if not _connections.has_key(key):
380
 
        _connections[key] = async_http(scheme, host)
381
 
 
382
 
    _connections[key].add_request(request)
383
 
 
384
 
    return request
 
249
 
 
250
    return async_http(uri, consumer, extra_headers)
385
251
 
386
252
if __name__ == "__main__":
387
253
    class dummy_consumer:
390
256
            print "feed", repr(data[:20]), repr(data[-20:]), len(data)
391
257
        def close(self):
392
258
            print "close"
393
 
        def http(self, ok, connection, *args, **kwargs):
394
 
            print ok, connection, args, kwargs
 
259
        def http(self, ok, connection, **args):
 
260
            print ok, connection, args
395
261
            print "status", connection.status
396
262
            print "header", connection.header
397
 
    if len(sys.argv) < 2:
398
 
        do_request('http://www.cnn.com/', dummy_consumer())
399
 
    else:
400
 
        for url in sys.argv[1:]:
401
 
            do_request(url, dummy_consumer())
 
263
    try:
 
264
        url = sys.argv[1]
 
265
    except IndexError:
 
266
        url = "http://www.cnn.com/"
 
267
    do_request(url, dummy_consumer())
402
268
    asyncore.loop()