~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to tools/http_client.py

  • Committer: Martin Pool
  • Date: 2005-04-21 01:30:22 UTC
  • Revision ID: mbp@sourcefrog.net-20050421013022-858a70691455e1dc
todo

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#! /usr/bin/python
2
 
 
3
 
# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
4
 
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
5
 
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
6
 
#
7
 
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
8
 
#
9
 
# changes:
10
 
# 2004-08-26 fl   unified http callback
11
 
# 2004-10-09 fl   factored out gzip_consumer support
12
 
# 2005-07-08 mbp  experimental support for keepalive connections
13
 
#
14
 
# Copyright (c) 2001-2004 by Fredrik Lundh.  All rights reserved.
15
 
#
16
 
 
17
 
 
18
 
 
19
 
"""async/pipelined http client
20
 
 
21
 
Use
22
 
===
23
 
 
24
 
Users of this library pass in URLs they want to see, and consumer
25
 
objects that will receive the results at some point in the future.
26
 
Any number of requests may be queued up, and more may be added while
27
 
the download is in progress.
28
 
 
29
 
Requests can be both superscalar and superpipelined.  That is to say,
30
 
for each server there can be multiple sockets open, and each socket
31
 
may have more than one request in flight.
32
 
 
33
 
Design
34
 
======
35
 
 
36
 
There is a single DownloadManager, and a connection object for each
37
 
open socket.
38
 
 
39
 
Request/consumer pairs are maintained in queues.  Each connection has
40
 
a list of transmitted requests whose response has not yet been
41
 
received.  There is also a per-server list of requests that have not
42
 
yet been submitted.
43
 
 
44
 
When a connection is ready to transmit a new request, it takes one
45
 
from the unsubmitted list, sends the request, and adds the request to
46
 
its unfulfilled list.  This should happen when the connection has
47
 
space for more transmissions or when a new request is added by the
48
 
user.  If the connection terminates with unfulfilled requests they are
49
 
put back onto the unsubmitted list, to be retried elsewhere.
50
 
 
51
 
Because responses come back precisely in order, the connection always
52
 
knows what it should expect next: the response for the next
53
 
unfulfilled request.
54
 
"""
55
 
 
56
 
# Note that (as of ubuntu python 2.4.1) every socket.connect() call
57
 
# with a hostname does a remote DNS resolution, which is pretty sucky.
58
 
# Shouldn't there be a cache in glibc?  We should probably cache the
59
 
# address in, say, the DownloadManager.
60
 
 
61
 
# TODO: A default consumer operation that writes the received data
62
 
# into a file; by default the file is named the same as the last
63
 
# component of the URL.
64
 
 
65
 
# TODO: A utility function that is given a list of URLs, and downloads
66
 
# them all parallel/pipelined.  If any fail, it raises an exception
67
 
# (and discards the rest), or perhaps can be told to continue anyhow.
68
 
# The content is written into temporary files.  It returns a list of
69
 
# readable file objects.
70
 
 
71
 
import asyncore
72
 
import socket, string, time, sys
73
 
import StringIO
74
 
import mimetools, urlparse, urllib
75
 
import logging
76
 
 
77
 
logger = logging.getLogger('bzr.http_client')
78
 
debug = logger.debug
79
 
info = logger.info
80
 
error = logger.error
81
 
 
82
 
 
83
 
##
84
 
# Close connection.   Request handlers can raise this exception to
85
 
# indicate that the connection should be closed.
86
 
 
87
 
class CloseConnection(Exception):
88
 
    pass
89
 
 
90
 
##
91
 
# Redirect connection.  Request handlers can raise this exception to
92
 
# indicate that the a new request should be issued.
93
 
 
94
 
class Redirect(CloseConnection):
95
 
    def __init__(self, location):
96
 
        self.location = location
97
 
 
98
 
 
99
 
class DownloadManager(object):
100
 
    """Handles pipelined/overlapped downloads.
101
 
 
102
 
    Pass in a series of URLs with handlers to receive the response.
103
 
    This object will spread the requests over however many sockets
104
 
    seem useful.
105
 
 
106
 
    queued_requests
107
 
        Requests not assigned to any channel
108
 
 
109
 
    running_requests
110
 
        Currently assigned to a channel
111
 
    """
112
 
    def __init__(self):
113
 
        self.queued_requests = []
114
 
        # self.channel = HttpChannel('localhost', 8000, self)
115
 
        self.channels = []
116
 
        self.try_pipelined = False
117
 
        self.try_keepalive = False
118
 
        self.max_channels = 3
119
 
 
120
 
 
121
 
    def enqueue(self, url, consumer):
122
 
        self.queued_requests.append((url, consumer))
123
 
        self._wake_up_channel()
124
 
 
125
 
 
126
 
    def _channel_closed(self, channel):
127
 
        """Called by the channel when its socket closes.
128
 
        """
129
 
        self.channels.remove(channel)
130
 
        if self.queued_requests:
131
 
            # might recreate one
132
 
            self._wake_up_channel()
133
 
 
134
 
 
135
 
    def _make_channel(self):
136
 
        # proxy2 203.17.154.69
137
 
        # bazaar-ng.org 
138
 
        return HttpChannel('82.211.81.161', 80, self)
139
 
        # return HttpChannel('203.17.154.69', 8080, self)
140
 
        # return HttpChannel('localhost', 8000, self)
141
 
            
142
 
 
143
 
    def _wake_up_channel(self):
144
 
        """Try to wake up one channel to send the newly-added request.
145
 
 
146
 
        There may be more than one request pending, and this may cause
147
 
        more than one channel to take requests.  That's OK; some of
148
 
        them may be frustrated.
149
 
        """
150
 
        from random import shuffle, choice
151
 
        
152
 
        # first, wake up any idle channels
153
 
        done = False
154
 
        for ch in self.channels:
155
 
            if not ch.sent_requests:
156
 
                ch.take_one()
157
 
                done = True
158
 
        if done:
159
 
            debug("woke existing idle channel(s)")
160
 
            return
161
 
 
162
 
        if len(self.channels) < self.max_channels:
163
 
            newch = self._make_channel()
164
 
            self.channels.append(newch)
165
 
            newch.take_one()
166
 
            debug("created new channel")
167
 
            return
168
 
 
169
 
        if self.try_pipelined:
170
 
            # ask existing channels to take it
171
 
            debug("woke busy channel")
172
 
            choice(self.channels).take_one()
173
 
 
174
 
 
175
 
        debug("request left until a channel's idle")
176
 
        
177
 
 
178
 
 
179
 
 
180
 
    def run(self):
181
 
        """Run until all outstanding requests have been served."""
182
 
        #while self.running_requests or self.queued_requests \
183
 
        #          or not self.channel.is_idle():
184
 
        #    asyncore.loop(count=1)
185
 
        asyncore.loop()
186
 
 
187
 
 
188
 
 
189
 
class Response(object):
190
 
    """Holds in-flight response."""
191
 
 
192
 
 
193
 
 
194
 
def _parse_response_http10(header):
195
 
    from cStringIO import StringIO
196
 
 
197
 
    fp = StringIO(header)
198
 
    r = Response()
199
 
 
200
 
    r.status = fp.readline().split(" ", 2)
201
 
    r.headers = mimetools.Message(fp)
202
 
 
203
 
    # we can only(?) expect to do keepalive if we got either a 
204
 
    # content-length or chunked encoding; otherwise there's no way to know
205
 
    # when the content ends apart from through the connection close
206
 
    r.content_type = r.headers.get("content-type")
207
 
    try:
208
 
        r.content_length = int(r.headers.get("content-length"))
209
 
    except (ValueError, TypeError):
210
 
        r.content_length = None
211
 
    debug("seen content length of %r" % r.content_length)
212
 
 
213
 
    r.transfer_encoding = r.headers.get("transfer-encoding")
214
 
    r.content_encoding = r.headers.get("content-encoding")
215
 
    r.connection_reply = r.headers.get("connection")
216
 
 
217
 
    # TODO: pass status code to consumer?
218
 
 
219
 
    if r.transfer_encoding:
220
 
        raise NotImplementedError()
221
 
 
222
 
    if r.transfer_encoding:
223
 
        raise NotImplementedError()
224
 
 
225
 
    if int(r.status[1]) != 200:
226
 
        debug("can't handle response status %r" % r.status)
227
 
        raise NotImplementedError()
228
 
 
229
 
    if r.content_length == None:
230
 
        raise NotImplementedError()
231
 
 
232
 
    if r.content_length == 0:
233
 
        raise NotImplementedError()
234
 
 
235
 
    r.content_remaining = r.content_length                
236
 
 
237
 
    return r
238
 
 
239
 
 
240
 
    
241
 
    
242
 
        
243
 
 
244
 
 
245
 
class HttpChannel(asyncore.dispatcher_with_send):
246
 
    """One http socket, pipelining if possible."""
247
 
    # asynchronous http client
248
 
 
249
 
    user_agent = "http_client.py 1.3ka (based on effbot)"
250
 
 
251
 
    proxies = urllib.getproxies()
252
 
 
253
 
    def __init__(self, ip_host, ip_port, manager):
254
 
        asyncore.dispatcher_with_send.__init__(self)
255
 
        self.manager = manager
256
 
 
257
 
        # if a response header has been seen, this holds it
258
 
        self.response = None
259
 
        
260
 
        self.data = ""
261
 
 
262
 
        self.chunk_size = None
263
 
 
264
 
        self.timestamp = time.time()
265
 
 
266
 
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
267
 
        debug('connecting...')
268
 
        self.connect((ip_host, ip_port))
269
 
 
270
 
        # sent_requests holds (url, consumer) 
271
 
        self.sent_requests = []
272
 
 
273
 
        self._outbuf = ''
274
 
 
275
 
 
276
 
    def __repr__(self):
277
 
        return 'HttpChannel(local_port=%r)' % (self.getsockname(),)
278
 
 
279
 
 
280
 
    def is_idle(self):
281
 
        return (not self.sent_requests)
282
 
 
283
 
 
284
 
    def handle_connect(self):
285
 
        debug("connected")
286
 
        self.take_one()
287
 
 
288
 
 
289
 
    def take_one(self):
290
 
        """Accept one request from the manager if possible."""
291
 
        if self.manager.try_pipelined:
292
 
            if len(self.sent_requests) > 4:
293
 
                return
294
 
        else:
295
 
            if len(self.sent_requests) > 0:
296
 
                return 
297
 
        
298
 
        try:
299
 
            url, consumer = self.manager.queued_requests.pop(0)
300
 
            debug('request accepted by channel')
301
 
        except IndexError:
302
 
            return
303
 
        
304
 
        # TODO: If there are too many already in flight, don't take one.
305
 
        # TODO: If the socket's not writable (tx buffer full), don't take.
306
 
        self._push_request_http10(url, consumer)
307
 
 
308
 
 
309
 
 
310
 
    def _push_request_http10(self, url, consumer):
311
 
        """Send a request, and add it to the outstanding queue."""
312
 
        # TODO: check the url requested is appropriate for this connection
313
 
 
314
 
        # TODO: If there are too many requests outstanding or (less likely) the 
315
 
        # connection fails, queue it for later use.
316
 
 
317
 
        # TODO: Keep track of requests that have been sent but not yet fulfilled,
318
 
        # because we might need to retransmit them if the connection fails. (Or
319
 
        # should the caller do that?)
320
 
 
321
 
        request = self._form_request_http10(url)
322
 
        debug('send request for %s from %r' % (url, self))
323
 
 
324
 
        # dispatcher_with_send handles buffering the data until it can
325
 
        # be written, and hooks handle_write.
326
 
 
327
 
        self.send(request)
328
 
 
329
 
        self.sent_requests.append((url, consumer))
330
 
 
331
 
 
332
 
    def _form_request_http10(self, url):
333
 
        # TODO: get right vhost name
334
 
        request = [
335
 
            "GET %s HTTP/1.0" % (url),
336
 
            "Host: www.bazaar-ng.org",
337
 
            ]
338
 
 
339
 
        if self.manager.try_keepalive or self.manager.try_pipelined:
340
 
            request.extend([
341
 
                "Keep-Alive: 60", 
342
 
                "Connection: keep-alive",
343
 
                ])
344
 
 
345
 
        # make sure to include a user agent
346
 
        for header in request:
347
 
            if string.lower(header).startswith("user-agent:"):
348
 
                break
349
 
        else:
350
 
            request.append("User-Agent: %s" % self.user_agent)
351
 
 
352
 
        return string.join(request, "\r\n") + "\r\n\r\n"
353
 
 
354
 
 
355
 
    def handle_read(self):
356
 
        # handle incoming data
357
 
        data = self.recv(2048)
358
 
 
359
 
        self.data = self.data + data
360
 
 
361
 
        if len(data):
362
 
            debug('got %d bytes from socket' % len(data))
363
 
        else:
364
 
            debug('server closed connection')
365
 
 
366
 
        while self.data:
367
 
            consumer = self.sent_requests[0][1]
368
 
            if not self.response:
369
 
                # do not have a full response header yet
370
 
 
371
 
                # check if we've seen a full header
372
 
                debug('getting header for %s' % self.sent_requests[0][0])
373
 
 
374
 
                header = self.data.split("\r\n\r\n", 1)
375
 
                if len(header) <= 1:
376
 
                    return
377
 
                header, self.data = header
378
 
 
379
 
                self.response = _parse_response_http10(header)
380
 
                self.content_remaining = self.response.content_length
381
 
 
382
 
            if not self.data:
383
 
                return
384
 
 
385
 
            # we now know how many (more) content bytes we have, and how much
386
 
            # is in the data buffer. there are two main possibilities:
387
 
            # too much data, and some must be left behind containing the next
388
 
            # response headers, or too little, or possibly just right
389
 
 
390
 
            want = self.content_remaining
391
 
            if want > 0:
392
 
                got_data = self.data[:want]
393
 
                self.data = self.data[want:]
394
 
                
395
 
                assert got_data
396
 
 
397
 
                debug('pass back %d bytes of %s' % (len(got_data),
398
 
                                                    self.sent_requests[0][0]))
399
 
                consumer.feed(data)
400
 
 
401
 
                self.content_remaining -= len(got_data)
402
 
 
403
 
            if self.content_remaining == 0:
404
 
                del self.sent_requests[0]
405
 
 
406
 
                # reset lots of things and try to get the next response header
407
 
                if self.response.connection_reply == 'close':
408
 
                    debug('server requested close')
409
 
                    self.manager._channel_closed(self)
410
 
                    self.close()
411
 
                elif not self.manager.try_keepalive:
412
 
                    debug('no keepalive for this socket')
413
 
                    self.manager._channel_closed(self)
414
 
                    self.close()
415
 
                else:
416
 
                    debug("ready for next header...")
417
 
                    consumer.content_complete()
418
 
                    self.take_one()
419
 
                self.response = None
420
 
 
421
 
 
422
 
 
423
 
    def handle_close(self):
424
 
        debug('async told us of close on %r' % self)
425
 
        # if there are outstanding requests should probably reopen and 
426
 
        # retransmit, but if we're not making any progress then give up
427
 
        self.manager._channel_closed(self)
428
 
        self.close()
429
 
 
430
 
 
431
 
if __name__ == "__main__":
432
 
    class dummy_consumer:
433
 
        def __init__(self, url):
434
 
            self.url = url
435
 
 
436
 
        def feed(self, data):
437
 
            # print "feed", repr(data)
438
 
            # print "feed", repr(data[:20]), repr(data[-20:]), len(data)
439
 
            pass
440
 
            
441
 
        def error(self, err_info):
442
 
            import traceback
443
 
            traceback.print_exception(err_info[0], err_info[1], err_info[2])
444
 
 
445
 
        def content_complete(self):
446
 
            debug('content complete from %s' % self.url)
447
 
            
448
 
 
449
 
    logging.basicConfig(level=logging.DEBUG)
450
 
 
451
 
    mgr = DownloadManager()
452
 
 
453
 
    from bzrlib.branch import Branch
454
 
    revs = Branch('/home/mbp/work/bzr').revision_history()
455
 
 
456
 
        
457
 
    
458
 
#     for url in ['http://www.bazaar-ng.org/',
459
 
#                 'http://www.bazaar-ng.org/tutorial.html',
460
 
#                 'http://www.bazaar-ng.org/download.html',
461
 
#                 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',
462
 
#                 ]:
463
 
 
464
 
    for rev in revs:
465
 
#        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/' \
466
 
        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/' \
467
 
              + rev + '.gz'
468
 
        mgr.enqueue(url, dummy_consumer(url))
469
 
 
470
 
    mgr.run()
471