~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to tools/http_client.py

  • Committer: Martin Pool
  • Date: 2005-08-19 22:42:40 UTC
  • Revision ID: mbp@sourcefrog.net-20050819224240-88a85f0218e61690
- add experimental pipelined http client

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#! /usr/bin/python
 
2
 
 
3
# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
 
4
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
 
5
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
 
6
#
 
7
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
 
8
#
 
9
# changes:
 
10
# 2004-08-26 fl   unified http callback
 
11
# 2004-10-09 fl   factored out gzip_consumer support
 
12
# 2005-07-08 mbp  experimental support for keepalive connections
 
13
#
 
14
# Copyright (c) 2001-2004 by Fredrik Lundh.  All rights reserved.
 
15
#
 
16
 
 
17
 
 
18
 
 
19
"""async/pipelined http client
 
20
 
 
21
Use
 
22
===
 
23
 
 
24
Users of this library pass in URLs they want to see, and consumer
 
25
objects that will receive the results at some point in the future.
 
26
Any number of requests may be queued up, and more may be added while
 
27
the download is in progress.
 
28
 
 
29
Requests can be both superscalar and superpipelined.  That is to say,
 
30
for each server there can be multiple sockets open, and each socket
 
31
may have more than one request in flight.
 
32
 
 
33
Design
 
34
======
 
35
 
 
36
There is a single DownloadManager, and a connection object for each
 
37
open socket.
 
38
 
 
39
Request/consumer pairs are maintained in queues.  Each connection has
 
40
a list of transmitted requests whose response has not yet been
 
41
received.  There is also a per-server list of requests that have not
 
42
yet been submitted.
 
43
 
 
44
When a connection is ready to transmit a new request, it takes one
 
45
from the unsubmitted list, sends the request, and adds the request to
 
46
its unfulfilled list.  This should happen when the connection has
 
47
space for more transmissions or when a new request is added by the
 
48
user.  If the connection terminates with unfulfilled requests they are
 
49
put back onto the unsubmitted list, to be retried elsewhere.
 
50
 
 
51
Because responses come back precisely in order, the connection always
 
52
knows what it should expect next: the response for the next
 
53
unfulfilled request.
 
54
"""
 
55
 
 
56
# Note that (as of ubuntu python 2.4.1) every socket.connect() call
 
57
# with a hostname does a remote DNS resolution, which is pretty sucky.
 
58
# Shouldn't there be a cache in glibc?  We should probably cache the
 
59
# address in, say, the DownloadManager.
 
60
 
 
61
# TODO: A default consumer operation that writes the received data
 
62
# into a file; by default the file is named the same as the last
 
63
# component of the URL.
 
64
 
 
65
# TODO: A utility function that is given a list of URLs, and downloads
 
66
# them all parallel/pipelined.  If any fail, it raises an exception
 
67
# (and discards the rest), or perhaps can be told to continue anyhow.
 
68
# The content is written into temporary files.  It returns a list of
 
69
# readable file objects.
 
70
 
 
71
import asyncore
 
72
import socket, string, time, sys
 
73
import StringIO
 
74
import mimetools, urlparse, urllib
 
75
import logging
 
76
 
 
77
logger = logging.getLogger('bzr.http_client')
 
78
debug = logger.debug
 
79
info = logger.info
 
80
error = logger.error
 
81
 
 
82
 
 
83
##
 
84
# Close connection.   Request handlers can raise this exception to
 
85
# indicate that the connection should be closed.
 
86
 
 
87
class CloseConnection(Exception):
 
88
    pass
 
89
 
 
90
##
 
91
# Redirect connection.  Request handlers can raise this exception to
 
92
# indicate that the a new request should be issued.
 
93
 
 
94
class Redirect(CloseConnection):
 
95
    def __init__(self, location):
 
96
        self.location = location
 
97
 
 
98
 
 
99
class DownloadManager(object):
 
100
    """Handles pipelined/overlapped downloads.
 
101
 
 
102
    Pass in a series of URLs with handlers to receive the response.
 
103
    This object will spread the requests over however many sockets
 
104
    seem useful.
 
105
 
 
106
    queued_requests
 
107
        Requests not assigned to any channel
 
108
 
 
109
    running_requests
 
110
        Currently assigned to a channel
 
111
    """
 
112
    def __init__(self):
 
113
        self.queued_requests = []
 
114
        # self.channel = HttpChannel('localhost', 8000, self)
 
115
        self.channels = []
 
116
        self.try_pipelined = False
 
117
        self.try_keepalive = False
 
118
        self.max_channels = 3
 
119
 
 
120
 
 
121
    def enqueue(self, url, consumer):
 
122
        self.queued_requests.append((url, consumer))
 
123
        self._wake_up_channel()
 
124
 
 
125
 
 
126
    def _channel_closed(self, channel):
 
127
        """Called by the channel when its socket closes.
 
128
        """
 
129
        self.channels.remove(channel)
 
130
        if self.queued_requests:
 
131
            # might recreate one
 
132
            self._wake_up_channel()
 
133
 
 
134
 
 
135
    def _make_channel(self):
 
136
        # proxy2 203.17.154.69
 
137
        # bazaar-ng.org 
 
138
        return HttpChannel('82.211.81.161', 80, self)
 
139
        # return HttpChannel('203.17.154.69', 8080, self)
 
140
        # return HttpChannel('localhost', 8000, self)
 
141
            
 
142
 
 
143
    def _wake_up_channel(self):
 
144
        """Try to wake up one channel to send the newly-added request.
 
145
 
 
146
        There may be more than one request pending, and this may cause
 
147
        more than one channel to take requests.  That's OK; some of
 
148
        them may be frustrated.
 
149
        """
 
150
        from random import shuffle, choice
 
151
        
 
152
        # first, wake up any idle channels
 
153
        done = False
 
154
        for ch in self.channels:
 
155
            if not ch.sent_requests:
 
156
                ch.take_one()
 
157
                done = True
 
158
        if done:
 
159
            debug("woke existing idle channel(s)")
 
160
            return
 
161
 
 
162
        if len(self.channels) < self.max_channels:
 
163
            newch = self._make_channel()
 
164
            self.channels.append(newch)
 
165
            newch.take_one()
 
166
            debug("created new channel")
 
167
            return
 
168
 
 
169
        if self.try_pipelined:
 
170
            # ask existing channels to take it
 
171
            debug("woke busy channel")
 
172
            choice(self.channels).take_one()
 
173
 
 
174
 
 
175
        debug("request left until a channel's idle")
 
176
        
 
177
 
 
178
 
 
179
 
 
180
    def run(self):
 
181
        """Run until all outstanding requests have been served."""
 
182
        #while self.running_requests or self.queued_requests \
 
183
        #          or not self.channel.is_idle():
 
184
        #    asyncore.loop(count=1)
 
185
        asyncore.loop()
 
186
 
 
187
 
 
188
 
 
189
class Response(object):
 
190
    """Holds in-flight response."""
 
191
 
 
192
 
 
193
 
 
194
def _parse_response_http10(header):
 
195
    from cStringIO import StringIO
 
196
 
 
197
    fp = StringIO(header)
 
198
    r = Response()
 
199
 
 
200
    r.status = fp.readline().split(" ", 2)
 
201
    r.headers = mimetools.Message(fp)
 
202
 
 
203
    # we can only(?) expect to do keepalive if we got either a 
 
204
    # content-length or chunked encoding; otherwise there's no way to know
 
205
    # when the content ends apart from through the connection close
 
206
    r.content_type = r.headers.get("content-type")
 
207
    try:
 
208
        r.content_length = int(r.headers.get("content-length"))
 
209
    except (ValueError, TypeError):
 
210
        r.content_length = None
 
211
    debug("seen content length of %r" % r.content_length)
 
212
 
 
213
    r.transfer_encoding = r.headers.get("transfer-encoding")
 
214
    r.content_encoding = r.headers.get("content-encoding")
 
215
    r.connection_reply = r.headers.get("connection")
 
216
 
 
217
    # TODO: pass status code to consumer?
 
218
 
 
219
    if r.transfer_encoding:
 
220
        raise NotImplementedError()
 
221
 
 
222
    if r.transfer_encoding:
 
223
        raise NotImplementedError()
 
224
 
 
225
    if int(r.status[1]) != 200:
 
226
        debug("can't handle response status %r" % r.status)
 
227
        raise NotImplementedError()
 
228
 
 
229
    if r.content_length == None:
 
230
        raise NotImplementedError()
 
231
 
 
232
    if r.content_length == 0:
 
233
        raise NotImplementedError()
 
234
 
 
235
    r.content_remaining = r.content_length                
 
236
 
 
237
    return r
 
238
 
 
239
 
 
240
    
 
241
    
 
242
        
 
243
 
 
244
 
 
245
class HttpChannel(asyncore.dispatcher_with_send):
 
246
    """One http socket, pipelining if possible."""
 
247
    # asynchronous http client
 
248
 
 
249
    user_agent = "http_client.py 1.3ka (based on effbot)"
 
250
 
 
251
    proxies = urllib.getproxies()
 
252
 
 
253
    def __init__(self, ip_host, ip_port, manager):
 
254
        asyncore.dispatcher_with_send.__init__(self)
 
255
        self.manager = manager
 
256
 
 
257
        # if a response header has been seen, this holds it
 
258
        self.response = None
 
259
        
 
260
        self.data = ""
 
261
 
 
262
        self.chunk_size = None
 
263
 
 
264
        self.timestamp = time.time()
 
265
 
 
266
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
 
267
        debug('connecting...')
 
268
        self.connect((ip_host, ip_port))
 
269
 
 
270
        # sent_requests holds (url, consumer) 
 
271
        self.sent_requests = []
 
272
 
 
273
        self._outbuf = ''
 
274
 
 
275
 
 
276
    def __repr__(self):
 
277
        return 'HttpChannel(local_port=%r)' % (self.getsockname(),)
 
278
 
 
279
 
 
280
    def is_idle(self):
 
281
        return (not self.sent_requests)
 
282
 
 
283
 
 
284
    def handle_connect(self):
 
285
        debug("connected")
 
286
        self.take_one()
 
287
 
 
288
 
 
289
    def take_one(self):
 
290
        """Accept one request from the manager if possible."""
 
291
        if self.manager.try_pipelined:
 
292
            if len(self.sent_requests) > 4:
 
293
                return
 
294
        else:
 
295
            if len(self.sent_requests) > 0:
 
296
                return 
 
297
        
 
298
        try:
 
299
            url, consumer = self.manager.queued_requests.pop(0)
 
300
            debug('request accepted by channel')
 
301
        except IndexError:
 
302
            return
 
303
        
 
304
        # TODO: If there are too many already in flight, don't take one.
 
305
        # TODO: If the socket's not writable (tx buffer full), don't take.
 
306
        self._push_request_http10(url, consumer)
 
307
 
 
308
 
 
309
 
 
310
    def _push_request_http10(self, url, consumer):
 
311
        """Send a request, and add it to the outstanding queue."""
 
312
        # TODO: check the url requested is appropriate for this connection
 
313
 
 
314
        # TODO: If there are too many requests outstanding or (less likely) the 
 
315
        # connection fails, queue it for later use.
 
316
 
 
317
        # TODO: Keep track of requests that have been sent but not yet fulfilled,
 
318
        # because we might need to retransmit them if the connection fails. (Or
 
319
        # should the caller do that?)
 
320
 
 
321
        request = self._form_request_http10(url)
 
322
        debug('send request for %s from %r' % (url, self))
 
323
 
 
324
        # dispatcher_with_send handles buffering the data until it can
 
325
        # be written, and hooks handle_write.
 
326
 
 
327
        self.send(request)
 
328
 
 
329
        self.sent_requests.append((url, consumer))
 
330
 
 
331
 
 
332
    def _form_request_http10(self, url):
 
333
        # TODO: get right vhost name
 
334
        request = [
 
335
            "GET %s HTTP/1.0" % (url),
 
336
            "Host: www.bazaar-ng.org",
 
337
            ]
 
338
 
 
339
        if self.manager.try_keepalive or self.manager.try_pipelined:
 
340
            request.extend([
 
341
                "Keep-Alive: 60", 
 
342
                "Connection: keep-alive",
 
343
                ])
 
344
 
 
345
        # make sure to include a user agent
 
346
        for header in request:
 
347
            if string.lower(header).startswith("user-agent:"):
 
348
                break
 
349
        else:
 
350
            request.append("User-Agent: %s" % self.user_agent)
 
351
 
 
352
        return string.join(request, "\r\n") + "\r\n\r\n"
 
353
 
 
354
 
 
355
    def handle_read(self):
 
356
        # handle incoming data
 
357
        data = self.recv(2048)
 
358
 
 
359
        self.data = self.data + data
 
360
 
 
361
        if len(data):
 
362
            debug('got %d bytes from socket' % len(data))
 
363
        else:
 
364
            debug('server closed connection')
 
365
 
 
366
        while self.data:
 
367
            consumer = self.sent_requests[0][1]
 
368
            if not self.response:
 
369
                # do not have a full response header yet
 
370
 
 
371
                # check if we've seen a full header
 
372
                debug('getting header for %s' % self.sent_requests[0][0])
 
373
 
 
374
                header = self.data.split("\r\n\r\n", 1)
 
375
                if len(header) <= 1:
 
376
                    return
 
377
                header, self.data = header
 
378
 
 
379
                self.response = _parse_response_http10(header)
 
380
                self.content_remaining = self.response.content_length
 
381
 
 
382
            if not self.data:
 
383
                return
 
384
 
 
385
            # we now know how many (more) content bytes we have, and how much
 
386
            # is in the data buffer. there are two main possibilities:
 
387
            # too much data, and some must be left behind containing the next
 
388
            # response headers, or too little, or possibly just right
 
389
 
 
390
            want = self.content_remaining
 
391
            if want > 0:
 
392
                got_data = self.data[:want]
 
393
                self.data = self.data[want:]
 
394
                
 
395
                assert got_data
 
396
 
 
397
                debug('pass back %d bytes of %s' % (len(got_data),
 
398
                                                    self.sent_requests[0][0]))
 
399
                consumer.feed(data)
 
400
 
 
401
                self.content_remaining -= len(got_data)
 
402
 
 
403
            if self.content_remaining == 0:
 
404
                del self.sent_requests[0]
 
405
 
 
406
                # reset lots of things and try to get the next response header
 
407
                if self.response.connection_reply == 'close':
 
408
                    debug('server requested close')
 
409
                    self.manager._channel_closed(self)
 
410
                    self.close()
 
411
                elif not self.manager.try_keepalive:
 
412
                    debug('no keepalive for this socket')
 
413
                    self.manager._channel_closed(self)
 
414
                    self.close()
 
415
                else:
 
416
                    debug("ready for next header...")
 
417
                    consumer.content_complete()
 
418
                    self.take_one()
 
419
                self.response = None
 
420
 
 
421
 
 
422
 
 
423
    def handle_close(self):
 
424
        debug('async told us of close on %r' % self)
 
425
        # if there are outstanding requests should probably reopen and 
 
426
        # retransmit, but if we're not making any progress then give up
 
427
        self.manager._channel_closed(self)
 
428
        self.close()
 
429
 
 
430
 
 
431
if __name__ == "__main__":
 
432
    class dummy_consumer:
 
433
        def __init__(self, url):
 
434
            self.url = url
 
435
 
 
436
        def feed(self, data):
 
437
            # print "feed", repr(data)
 
438
            # print "feed", repr(data[:20]), repr(data[-20:]), len(data)
 
439
            pass
 
440
            
 
441
        def error(self, err_info):
 
442
            import traceback
 
443
            traceback.print_exception(err_info[0], err_info[1], err_info[2])
 
444
 
 
445
        def content_complete(self):
 
446
            debug('content complete from %s' % self.url)
 
447
            
 
448
 
 
449
    logging.basicConfig(level=logging.DEBUG)
 
450
 
 
451
    mgr = DownloadManager()
 
452
 
 
453
    from bzrlib.branch import Branch
 
454
    revs = Branch('/home/mbp/work/bzr').revision_history()
 
455
 
 
456
        
 
457
    
 
458
#     for url in ['http://www.bazaar-ng.org/',
 
459
#                 'http://www.bazaar-ng.org/tutorial.html',
 
460
#                 'http://www.bazaar-ng.org/download.html',
 
461
#                 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',
 
462
#                 ]:
 
463
 
 
464
    for rev in revs:
 
465
#        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/' \
 
466
        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/' \
 
467
              + rev + '.gz'
 
468
        mgr.enqueue(url, dummy_consumer(url))
 
469
 
 
470
    mgr.run()
 
471