~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to tools/http_client.py

  • Committer: Martin Pool
  • Date: 2005-05-17 06:56:16 UTC
  • Revision ID: mbp@sourcefrog.net-20050517065616-6f23381d6184a8aa
- add space for un-merged patches

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#! /usr/bin/python
2
 
 
3
 
# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
4
 
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
5
 
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
6
 
#
7
 
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
8
 
#
9
 
# changes:
10
 
# 2004-08-26 fl   unified http callback
11
 
# 2004-10-09 fl   factored out gzip_consumer support
12
 
# 2005-07-08 mbp  experimental support for keepalive connections
13
 
#
14
 
# Copyright (c) 2001-2004 by Fredrik Lundh.  All rights reserved.
15
 
#
16
 
 
17
 
 
18
 
 
19
 
"""async/pipelined http client
20
 
 
21
 
Use
22
 
===
23
 
 
24
 
Users of this library pass in URLs they want to see, and consumer
25
 
objects that will receive the results at some point in the future.
26
 
Any number of requests may be queued up, and more may be added while
27
 
the download is in progress.
28
 
 
29
 
Requests can be both superscalar and superpipelined.  That is to say,
30
 
for each server there can be multiple sockets open, and each socket
31
 
may have more than one request in flight.
32
 
 
33
 
Design
34
 
======
35
 
 
36
 
There is a single DownloadManager, and a connection object for each
37
 
open socket.
38
 
 
39
 
Request/consumer pairs are maintained in queues.  Each connection has
40
 
a list of transmitted requests whose response has not yet been
41
 
received.  There is also a per-server list of requests that have not
42
 
yet been submitted.
43
 
 
44
 
When a connection is ready to transmit a new request, it takes one
45
 
from the unsubmitted list, sends the request, and adds the request to
46
 
its unfulfilled list.  This should happen when the connection has
47
 
space for more transmissions or when a new request is added by the
48
 
user.  If the connection terminates with unfulfilled requests they are
49
 
put back onto the unsubmitted list, to be retried elsewhere.
50
 
 
51
 
Because responses come back precisely in order, the connection always
52
 
knows what it should expect next: the response for the next
53
 
unfulfilled request.
54
 
"""
55
 
 
56
 
# Note that (as of ubuntu python 2.4.1) every socket.connect() call
57
 
# with a hostname does a remote DNS resolution, which is pretty sucky.
58
 
# Shouldn't there be a cache in glibc?  We should probably cache the
59
 
# address in, say, the DownloadManager.
60
 
 
61
 
# TODO: A default consumer operation that writes the received data
62
 
# into a file; by default the file is named the same as the last
63
 
# component of the URL.
64
 
 
65
 
# TODO: A utility function that is given a list of URLs, and downloads
66
 
# them all parallel/pipelined.  If any fail, it raises an exception
67
 
# (and discards the rest), or perhaps can be told to continue anyhow.
68
 
# The content is written into temporary files.  It returns a list of
69
 
# readable file objects.
70
 
 
71
 
# TODO: If we try pipelined or keepalive and the connection drop out
72
 
# then retry the request on a new connection; eventually we should perhaps
73
 
# learn that a given host or network just won't allow keepalive.
74
 
 
75
 
 
76
 
import asyncore
77
 
import socket, string, time, sys
78
 
import StringIO
79
 
import mimetools, urlparse, urllib
80
 
import logging
81
 
 
82
 
logging.basicConfig(level=logging.DEBUG,
83
 
                    format='%(asctime)s %(levelname)s %(message)s',
84
 
                    filename='/tmp/http_client.log',
85
 
                    filemode='w')
86
 
 
87
 
logger = logging.getLogger('bzr.http_client')
88
 
debug = logger.debug
89
 
info = logger.info
90
 
error = logger.error
91
 
 
92
 
 
93
 
##
94
 
# Close connection.   Request handlers can raise this exception to
95
 
# indicate that the connection should be closed.
96
 
 
97
 
class CloseConnection(Exception):
98
 
    pass
99
 
 
100
 
##
101
 
# Redirect connection.  Request handlers can raise this exception to
102
 
# indicate that the a new request should be issued.
103
 
 
104
 
class Redirect(CloseConnection):
105
 
    def __init__(self, location):
106
 
        self.location = location
107
 
 
108
 
 
109
 
class DownloadManager(object):
110
 
    """Handles pipelined/overlapped downloads.
111
 
 
112
 
    Pass in a series of URLs with handlers to receive the response.
113
 
    This object will spread the requests over however many sockets
114
 
    seem useful.
115
 
 
116
 
    queued_requests
117
 
        Requests not assigned to any channel
118
 
 
119
 
    running_requests
120
 
        Currently assigned to a channel
121
 
    """
122
 
    def __init__(self):
123
 
        self.queued_requests = []
124
 
        # self.channel = HttpChannel('localhost', 8000, self)
125
 
        self.channels = []
126
 
        self.try_pipelined = False
127
 
        self.try_keepalive = False
128
 
        self.max_channels = 5
129
 
 
130
 
 
131
 
    def enqueue(self, url, consumer):
132
 
        self.queued_requests.append((url, consumer))
133
 
        self._wake_up_channel()
134
 
 
135
 
 
136
 
    def _channel_closed(self, channel):
137
 
        """Called by the channel when its socket closes.
138
 
        """
139
 
        self.channels.remove(channel)
140
 
        if self.queued_requests:
141
 
            # might recreate one
142
 
            self._wake_up_channel()
143
 
 
144
 
 
145
 
    def _make_channel(self):
146
 
        # proxy2 203.17.154.69
147
 
        # return HttpChannel('82.211.81.161', 80, self)         # bazaar-ng.org 
148
 
        # return HttpChannel('203.17.154.69', 8080, self)
149
 
        return HttpChannel('127.0.0.1', 8000, self)  # forwarded
150
 
            
151
 
 
152
 
    def _wake_up_channel(self):
153
 
        """Try to wake up one channel to send the newly-added request.
154
 
 
155
 
        There may be more than one request pending, and this may cause
156
 
        more than one channel to take requests.  That's OK; some of
157
 
        them may be frustrated.
158
 
        """
159
 
        from random import shuffle, choice
160
 
        
161
 
        # first, wake up any idle channels
162
 
        done = False
163
 
        for ch in self.channels:
164
 
            if not ch.sent_requests:
165
 
                ch.take_one()
166
 
                done = True
167
 
        if done:
168
 
            debug("woke existing idle channel(s)")
169
 
            return
170
 
 
171
 
        if len(self.channels) < self.max_channels:
172
 
            newch = self._make_channel()
173
 
            self.channels.append(newch)
174
 
            newch.take_one()
175
 
            debug("created new channel")
176
 
            return
177
 
 
178
 
        if self.try_pipelined:
179
 
            # ask existing channels to take it
180
 
            debug("woke busy channel")
181
 
            choice(self.channels).take_one()
182
 
 
183
 
 
184
 
        # debug("request postponed until a channel's idle")
185
 
        
186
 
 
187
 
 
188
 
 
189
 
    def run(self):
190
 
        """Run until all outstanding requests have been served."""
191
 
        #while self.running_requests or self.queued_requests \
192
 
        #          or not self.channel.is_idle():
193
 
        #    asyncore.loop(count=1)
194
 
        asyncore.loop()
195
 
 
196
 
 
197
 
 
198
 
class Response(object):
199
 
    """Holds in-flight response."""
200
 
 
201
 
 
202
 
 
203
 
def _parse_response_http10(header):
204
 
    from cStringIO import StringIO
205
 
 
206
 
    fp = StringIO(header)
207
 
    r = Response()
208
 
 
209
 
    r.status = fp.readline().split(" ", 2)
210
 
    r.headers = mimetools.Message(fp)
211
 
 
212
 
    # we can only(?) expect to do keepalive if we got either a 
213
 
    # content-length or chunked encoding; otherwise there's no way to know
214
 
    # when the content ends apart from through the connection close
215
 
    r.content_type = r.headers.get("content-type")
216
 
    try:
217
 
        r.content_length = int(r.headers.get("content-length"))
218
 
    except (ValueError, TypeError):
219
 
        r.content_length = None
220
 
    debug("seen content length of %r" % r.content_length)
221
 
 
222
 
    r.transfer_encoding = r.headers.get("transfer-encoding")
223
 
    r.content_encoding = r.headers.get("content-encoding")
224
 
    r.connection_reply = r.headers.get("connection")
225
 
 
226
 
    # TODO: pass status code to consumer?
227
 
 
228
 
    if r.transfer_encoding:
229
 
        raise NotImplementedError()
230
 
 
231
 
    if r.transfer_encoding:
232
 
        raise NotImplementedError()
233
 
 
234
 
    if int(r.status[1]) != 200:
235
 
        debug("can't handle response status %r" % r.status)
236
 
        raise NotImplementedError()
237
 
 
238
 
    if r.content_length is None:
239
 
        raise NotImplementedError()
240
 
 
241
 
    if r.content_length == 0:
242
 
        raise NotImplementedError()
243
 
 
244
 
    r.content_remaining = r.content_length                
245
 
 
246
 
    return r
247
 
 
248
 
 
249
 
    
250
 
    
251
 
        
252
 
 
253
 
 
254
 
class HttpChannel(asyncore.dispatcher_with_send):
255
 
    """One http socket, pipelining if possible."""
256
 
    # asynchronous http client
257
 
 
258
 
    user_agent = "http_client.py 1.3ka (based on effbot)"
259
 
 
260
 
    proxies = urllib.getproxies()
261
 
 
262
 
    def __init__(self, ip_host, ip_port, manager):
263
 
        asyncore.dispatcher_with_send.__init__(self)
264
 
        self.manager = manager
265
 
 
266
 
        # if a response header has been seen, this holds it
267
 
        self.response = None
268
 
        
269
 
        self.data = ""
270
 
 
271
 
        self.chunk_size = None
272
 
 
273
 
        self.timestamp = time.time()
274
 
 
275
 
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
276
 
        debug('connecting...')
277
 
        self.connect((ip_host, ip_port))
278
 
 
279
 
        # sent_requests holds (url, consumer) 
280
 
        self.sent_requests = []
281
 
 
282
 
        self._outbuf = ''
283
 
 
284
 
 
285
 
    def __repr__(self):
286
 
        return 'HttpChannel(local_port=%r)' % (self.getsockname(),)
287
 
 
288
 
 
289
 
    def is_idle(self):
290
 
        return (not self.sent_requests)
291
 
 
292
 
 
293
 
    def handle_connect(self):
294
 
        debug("connected")
295
 
        self.take_one()
296
 
 
297
 
 
298
 
    def take_one(self):
299
 
        """Accept one request from the manager if possible."""
300
 
        if self.manager.try_pipelined:
301
 
            if len(self.sent_requests) > 4:
302
 
                return
303
 
        else:
304
 
            if len(self.sent_requests) > 0:
305
 
                return 
306
 
        
307
 
        try:
308
 
            url, consumer = self.manager.queued_requests.pop(0)
309
 
            debug('request accepted by channel')
310
 
        except IndexError:
311
 
            return
312
 
        
313
 
        # TODO: If there are too many already in flight, don't take one.
314
 
        # TODO: If the socket's not writable (tx buffer full), don't take.
315
 
        self._push_request_http10(url, consumer)
316
 
 
317
 
 
318
 
 
319
 
    def _push_request_http10(self, url, consumer):
320
 
        """Send a request, and add it to the outstanding queue."""
321
 
        # TODO: check the url requested is appropriate for this connection
322
 
 
323
 
        # TODO: If there are too many requests outstanding or (less likely) the 
324
 
        # connection fails, queue it for later use.
325
 
 
326
 
        # TODO: Keep track of requests that have been sent but not yet fulfilled,
327
 
        # because we might need to retransmit them if the connection fails. (Or
328
 
        # should the caller do that?)
329
 
 
330
 
        request = self._form_request_http10(url)
331
 
        debug('send request for %s from %r' % (url, self))
332
 
 
333
 
        # dispatcher_with_send handles buffering the data until it can
334
 
        # be written, and hooks handle_write.
335
 
 
336
 
        self.send(request)
337
 
 
338
 
        self.sent_requests.append((url, consumer))
339
 
 
340
 
 
341
 
    def _form_request_http10(self, url):
342
 
        # TODO: get right vhost name
343
 
        request = [
344
 
            "GET %s HTTP/1.0" % (url),
345
 
            "Host: www.bazaar-ng.org",
346
 
            ]
347
 
 
348
 
        if self.manager.try_keepalive or self.manager.try_pipelined:
349
 
            request.extend([
350
 
                "Keep-Alive: 60", 
351
 
                "Connection: keep-alive",
352
 
                ])
353
 
 
354
 
        # make sure to include a user agent
355
 
        for header in request:
356
 
            if string.lower(header).startswith("user-agent:"):
357
 
                break
358
 
        else:
359
 
            request.append("User-Agent: %s" % self.user_agent)
360
 
 
361
 
        return string.join(request, "\r\n") + "\r\n\r\n"
362
 
 
363
 
 
364
 
    def handle_read(self):
365
 
        # handle incoming data
366
 
        data = self.recv(2048)
367
 
 
368
 
        self.data = self.data + data
369
 
 
370
 
        if len(data):
371
 
            debug('got %d bytes from socket' % len(data))
372
 
        else:
373
 
            debug('server closed connection')
374
 
 
375
 
        while self.data:
376
 
            consumer = self.sent_requests[0][1]
377
 
            if not self.response:
378
 
                # do not have a full response header yet
379
 
 
380
 
                # check if we've seen a full header
381
 
                debug('getting header for %s' % self.sent_requests[0][0])
382
 
 
383
 
                header = self.data.split("\r\n\r\n", 1)
384
 
                if len(header) <= 1:
385
 
                    return
386
 
                header, self.data = header
387
 
 
388
 
                self.response = _parse_response_http10(header)
389
 
                self.content_remaining = self.response.content_length
390
 
 
391
 
            if not self.data:
392
 
                return
393
 
 
394
 
            # we now know how many (more) content bytes we have, and how much
395
 
            # is in the data buffer. there are two main possibilities:
396
 
            # too much data, and some must be left behind containing the next
397
 
            # response headers, or too little, or possibly just right
398
 
 
399
 
            want = self.content_remaining
400
 
            if want > 0:
401
 
                got_data = self.data[:want]
402
 
                self.data = self.data[want:]
403
 
                
404
 
                assert got_data
405
 
 
406
 
                self.content_remaining -= len(got_data)
407
 
 
408
 
                debug('pass back %d bytes of %s, %d remain'
409
 
                      % (len(got_data),
410
 
                         self.sent_requests[0][0],
411
 
                         self.content_remaining))
412
 
                consumer.feed(data)
413
 
 
414
 
            if self.content_remaining == 0:
415
 
                del self.sent_requests[0]
416
 
 
417
 
                debug('content complete')
418
 
                consumer.content_complete()
419
 
                
420
 
                # reset lots of things and try to get the next response header
421
 
                if self.response.connection_reply == 'close':
422
 
                    debug('server requested close')
423
 
                    self.manager._channel_closed(self)
424
 
                    self.close()
425
 
                elif not self.manager.try_keepalive:
426
 
                    debug('no keepalive for this socket')
427
 
                    self.manager._channel_closed(self)
428
 
                    self.close()
429
 
                else:
430
 
                    debug("ready for next header...")
431
 
                    self.take_one()
432
 
                self.response = None
433
 
 
434
 
 
435
 
 
436
 
    def handle_close(self):
437
 
        debug('async told us of close on %r' % self)
438
 
        # if there are outstanding requests should probably reopen and 
439
 
        # retransmit, but if we're not making any progress then give up
440
 
        self.manager._channel_closed(self)
441
 
        self.close()
442
 
 
443
 
 
444
 
class DummyConsumer:
445
 
    def __init__(self, url, pb):
446
 
        self.url = url
447
 
        self.outf = None
448
 
        self._pb = pb
449
 
 
450
 
    def feed(self, data):
451
 
        # print "feed", repr(data)
452
 
        # print "feed", repr(data[:20]), repr(data[-20:]), len(data)
453
 
        if not self.outf:
454
 
            base = self.url[self.url.rindex('/')+1:]
455
 
            self.outf = file('/tmp/download/' + base, 'wb')
456
 
        self.outf.write(data)
457
 
 
458
 
    def error(self, err_info):
459
 
        import traceback
460
 
        error('error reported to consumer')
461
 
        traceback.print_exception(err_info[0], err_info[1], err_info[2])
462
 
        sys.exit(1)
463
 
 
464
 
    def content_complete(self):
465
 
        info('content complete from %s' % self.url)
466
 
        self.outf.close()
467
 
        self.outf = None
468
 
        # using last_cnt is cheating
469
 
        self._pb.update('downloading inventory',
470
 
                        self._pb.last_cnt+1,
471
 
                        self._pb.last_total)
472
 
 
473
 
 
474
 
 
475
 
if __name__ == "__main__":
476
 
    logging.basicConfig(level=logging.DEBUG)
477
 
 
478
 
    mgr = DownloadManager()
479
 
 
480
 
    from bzrlib.branch import Branch
481
 
    from bzrlib.progress import ProgressBar
482
 
 
483
 
    pb = ProgressBar()
484
 
    revs = Branch('/home/mbp/work/bzr').revision_history()
485
 
    pb.update('downloading inventories', 0, len(revs))
486
 
 
487
 
    for rev in revs:
488
 
        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/' \
489
 
              + rev + '.gz'
490
 
        mgr.enqueue(url, DummyConsumer(url, pb))
491
 
 
492
 
    mgr.run()
493
 
    
494
 
 
495
 
 
496
 
    
497
 
#     for url in ['http://www.bazaar-ng.org/',
498
 
#                 'http://www.bazaar-ng.org/tutorial.html',
499
 
#                 'http://www.bazaar-ng.org/download.html',
500
 
#                 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',
501
 
#                 ]: