~bzr-pqm/bzr/bzr.dev

1185.1.29 by Robert Collins
merge merge tweaks from aaron, which includes latest .dev
1
#! /usr/bin/python
2
3
# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
4
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
5
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
6
#
7
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
8
#
9
# changes:
10
# 2004-08-26 fl   unified http callback
11
# 2004-10-09 fl   factored out gzip_consumer support
12
# 2005-07-08 mbp  experimental support for keepalive connections
13
#
14
# Copyright (c) 2001-2004 by Fredrik Lundh.  All rights reserved.
15
#
16
17
18
19
"""async/pipelined http client
20
21
Use
22
===
23
24
Users of this library pass in URLs they want to see, and consumer
25
objects that will receive the results at some point in the future.
26
Any number of requests may be queued up, and more may be added while
27
the download is in progress.
28
29
Requests can be both superscalar and superpipelined.  That is to say,
30
for each server there can be multiple sockets open, and each socket
31
may have more than one request in flight.
32
33
Design
34
======
35
36
There is a single DownloadManager, and a connection object for each
37
open socket.
38
39
Request/consumer pairs are maintained in queues.  Each connection has
40
a list of transmitted requests whose response has not yet been
41
received.  There is also a per-server list of requests that have not
42
yet been submitted.
43
44
When a connection is ready to transmit a new request, it takes one
45
from the unsubmitted list, sends the request, and adds the request to
46
its unfulfilled list.  This should happen when the connection has
47
space for more transmissions or when a new request is added by the
48
user.  If the connection terminates with unfulfilled requests they are
49
put back onto the unsubmitted list, to be retried elsewhere.
50
51
Because responses come back precisely in order, the connection always
52
knows what it should expect next: the response for the next
53
unfulfilled request.
54
"""
55
56
# Note that (as of ubuntu python 2.4.1) every socket.connect() call
57
# with a hostname does a remote DNS resolution, which is pretty sucky.
58
# Shouldn't there be a cache in glibc?  We should probably cache the
59
# address in, say, the DownloadManager.
60
61
# TODO: A default consumer operation that writes the received data
62
# into a file; by default the file is named the same as the last
63
# component of the URL.
64
65
# TODO: A utility function that is given a list of URLs, and downloads
66
# them all parallel/pipelined.  If any fail, it raises an exception
67
# (and discards the rest), or perhaps can be told to continue anyhow.
68
# The content is written into temporary files.  It returns a list of
69
# readable file objects.
70
71
# TODO: If we try pipelined or keepalive and the connection drop out
72
# then retry the request on a new connection; eventually we should perhaps
73
# learn that a given host or network just won't allow keepalive.
74
75
76
import asyncore
77
import socket, string, time, sys
78
import StringIO
79
import mimetools, urlparse, urllib
80
import logging
81
82
logging.basicConfig(level=logging.DEBUG,
83
                    format='%(asctime)s %(levelname)s %(message)s',
84
                    filename='/tmp/http_client.log',
85
                    filemode='w')
86
87
logger = logging.getLogger('bzr.http_client')
88
debug = logger.debug
89
info = logger.info
90
error = logger.error
91
92
93
##
94
# Close connection.   Request handlers can raise this exception to
95
# indicate that the connection should be closed.
96
97
class CloseConnection(Exception):
98
    pass
99
100
##
101
# Redirect connection.  Request handlers can raise this exception to
102
# indicate that the a new request should be issued.
103
104
class Redirect(CloseConnection):
105
    def __init__(self, location):
106
        self.location = location
107
108
109
class DownloadManager(object):
110
    """Handles pipelined/overlapped downloads.
111
112
    Pass in a series of URLs with handlers to receive the response.
113
    This object will spread the requests over however many sockets
114
    seem useful.
115
116
    queued_requests
117
        Requests not assigned to any channel
118
119
    running_requests
120
        Currently assigned to a channel
121
    """
122
    def __init__(self):
123
        self.queued_requests = []
124
        # self.channel = HttpChannel('localhost', 8000, self)
125
        self.channels = []
126
        self.try_pipelined = False
127
        self.try_keepalive = False
128
        self.max_channels = 5
129
130
131
    def enqueue(self, url, consumer):
132
        self.queued_requests.append((url, consumer))
133
        self._wake_up_channel()
134
135
136
    def _channel_closed(self, channel):
137
        """Called by the channel when its socket closes.
138
        """
139
        self.channels.remove(channel)
140
        if self.queued_requests:
141
            # might recreate one
142
            self._wake_up_channel()
143
144
145
    def _make_channel(self):
146
        # proxy2 203.17.154.69
147
        # return HttpChannel('82.211.81.161', 80, self)         # bazaar-ng.org 
148
        # return HttpChannel('203.17.154.69', 8080, self)
149
        return HttpChannel('127.0.0.1', 8000, self)  # forwarded
150
            
151
152
    def _wake_up_channel(self):
153
        """Try to wake up one channel to send the newly-added request.
154
155
        There may be more than one request pending, and this may cause
156
        more than one channel to take requests.  That's OK; some of
157
        them may be frustrated.
158
        """
159
        from random import shuffle, choice
160
        
161
        # first, wake up any idle channels
162
        done = False
163
        for ch in self.channels:
164
            if not ch.sent_requests:
165
                ch.take_one()
166
                done = True
167
        if done:
168
            debug("woke existing idle channel(s)")
169
            return
170
171
        if len(self.channels) < self.max_channels:
172
            newch = self._make_channel()
173
            self.channels.append(newch)
174
            newch.take_one()
175
            debug("created new channel")
176
            return
177
178
        if self.try_pipelined:
179
            # ask existing channels to take it
180
            debug("woke busy channel")
181
            choice(self.channels).take_one()
182
183
184
        # debug("request postponed until a channel's idle")
185
        
186
187
188
189
    def run(self):
190
        """Run until all outstanding requests have been served."""
191
        #while self.running_requests or self.queued_requests \
192
        #          or not self.channel.is_idle():
193
        #    asyncore.loop(count=1)
194
        asyncore.loop()
195
196
197
198
class Response(object):
199
    """Holds in-flight response."""
200
201
202
203
def _parse_response_http10(header):
204
    from cStringIO import StringIO
205
206
    fp = StringIO(header)
207
    r = Response()
208
209
    r.status = fp.readline().split(" ", 2)
210
    r.headers = mimetools.Message(fp)
211
212
    # we can only(?) expect to do keepalive if we got either a 
213
    # content-length or chunked encoding; otherwise there's no way to know
214
    # when the content ends apart from through the connection close
215
    r.content_type = r.headers.get("content-type")
216
    try:
217
        r.content_length = int(r.headers.get("content-length"))
218
    except (ValueError, TypeError):
219
        r.content_length = None
220
    debug("seen content length of %r" % r.content_length)
221
222
    r.transfer_encoding = r.headers.get("transfer-encoding")
223
    r.content_encoding = r.headers.get("content-encoding")
224
    r.connection_reply = r.headers.get("connection")
225
226
    # TODO: pass status code to consumer?
227
228
    if r.transfer_encoding:
229
        raise NotImplementedError()
230
231
    if r.transfer_encoding:
232
        raise NotImplementedError()
233
234
    if int(r.status[1]) != 200:
235
        debug("can't handle response status %r" % r.status)
236
        raise NotImplementedError()
237
1963.2.6 by Robey Pointer
pychecker is on crack; go back to using 'is None'.
238
    if r.content_length is None:
1185.1.29 by Robert Collins
merge merge tweaks from aaron, which includes latest .dev
239
        raise NotImplementedError()
240
241
    if r.content_length == 0:
242
        raise NotImplementedError()
243
244
    r.content_remaining = r.content_length                
245
246
    return r
247
248
249
    
250
    
251
        
252
253
254
class HttpChannel(asyncore.dispatcher_with_send):
255
    """One http socket, pipelining if possible."""
256
    # asynchronous http client
257
258
    user_agent = "http_client.py 1.3ka (based on effbot)"
259
260
    proxies = urllib.getproxies()
261
262
    def __init__(self, ip_host, ip_port, manager):
263
        asyncore.dispatcher_with_send.__init__(self)
264
        self.manager = manager
265
266
        # if a response header has been seen, this holds it
267
        self.response = None
268
        
269
        self.data = ""
270
271
        self.chunk_size = None
272
273
        self.timestamp = time.time()
274
275
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
276
        debug('connecting...')
277
        self.connect((ip_host, ip_port))
278
279
        # sent_requests holds (url, consumer) 
280
        self.sent_requests = []
281
282
        self._outbuf = ''
283
284
285
    def __repr__(self):
286
        return 'HttpChannel(local_port=%r)' % (self.getsockname(),)
287
288
289
    def is_idle(self):
290
        return (not self.sent_requests)
291
292
293
    def handle_connect(self):
294
        debug("connected")
295
        self.take_one()
296
297
298
    def take_one(self):
299
        """Accept one request from the manager if possible."""
300
        if self.manager.try_pipelined:
301
            if len(self.sent_requests) > 4:
302
                return
303
        else:
304
            if len(self.sent_requests) > 0:
305
                return 
306
        
307
        try:
308
            url, consumer = self.manager.queued_requests.pop(0)
309
            debug('request accepted by channel')
310
        except IndexError:
311
            return
312
        
313
        # TODO: If there are too many already in flight, don't take one.
314
        # TODO: If the socket's not writable (tx buffer full), don't take.
315
        self._push_request_http10(url, consumer)
316
317
318
319
    def _push_request_http10(self, url, consumer):
320
        """Send a request, and add it to the outstanding queue."""
321
        # TODO: check the url requested is appropriate for this connection
322
323
        # TODO: If there are too many requests outstanding or (less likely) the 
324
        # connection fails, queue it for later use.
325
326
        # TODO: Keep track of requests that have been sent but not yet fulfilled,
327
        # because we might need to retransmit them if the connection fails. (Or
328
        # should the caller do that?)
329
330
        request = self._form_request_http10(url)
331
        debug('send request for %s from %r' % (url, self))
332
333
        # dispatcher_with_send handles buffering the data until it can
334
        # be written, and hooks handle_write.
335
336
        self.send(request)
337
338
        self.sent_requests.append((url, consumer))
339
340
341
    def _form_request_http10(self, url):
342
        # TODO: get right vhost name
343
        request = [
344
            "GET %s HTTP/1.0" % (url),
345
            "Host: www.bazaar-ng.org",
346
            ]
347
348
        if self.manager.try_keepalive or self.manager.try_pipelined:
349
            request.extend([
350
                "Keep-Alive: 60", 
351
                "Connection: keep-alive",
352
                ])
353
354
        # make sure to include a user agent
355
        for header in request:
356
            if string.lower(header).startswith("user-agent:"):
357
                break
358
        else:
359
            request.append("User-Agent: %s" % self.user_agent)
360
361
        return string.join(request, "\r\n") + "\r\n\r\n"
362
363
364
    def handle_read(self):
365
        # handle incoming data
366
        data = self.recv(2048)
367
368
        self.data = self.data + data
369
370
        if len(data):
371
            debug('got %d bytes from socket' % len(data))
372
        else:
373
            debug('server closed connection')
374
375
        while self.data:
376
            consumer = self.sent_requests[0][1]
377
            if not self.response:
378
                # do not have a full response header yet
379
380
                # check if we've seen a full header
381
                debug('getting header for %s' % self.sent_requests[0][0])
382
383
                header = self.data.split("\r\n\r\n", 1)
384
                if len(header) <= 1:
385
                    return
386
                header, self.data = header
387
388
                self.response = _parse_response_http10(header)
389
                self.content_remaining = self.response.content_length
390
391
            if not self.data:
392
                return
393
394
            # we now know how many (more) content bytes we have, and how much
395
            # is in the data buffer. there are two main possibilities:
396
            # too much data, and some must be left behind containing the next
397
            # response headers, or too little, or possibly just right
398
399
            want = self.content_remaining
400
            if want > 0:
401
                got_data = self.data[:want]
402
                self.data = self.data[want:]
403
                
404
                assert got_data
405
406
                self.content_remaining -= len(got_data)
407
408
                debug('pass back %d bytes of %s, %d remain'
409
                      % (len(got_data),
410
                         self.sent_requests[0][0],
411
                         self.content_remaining))
412
                consumer.feed(data)
413
414
            if self.content_remaining == 0:
415
                del self.sent_requests[0]
416
417
                debug('content complete')
418
                consumer.content_complete()
419
                
420
                # reset lots of things and try to get the next response header
421
                if self.response.connection_reply == 'close':
422
                    debug('server requested close')
423
                    self.manager._channel_closed(self)
424
                    self.close()
425
                elif not self.manager.try_keepalive:
426
                    debug('no keepalive for this socket')
427
                    self.manager._channel_closed(self)
428
                    self.close()
429
                else:
430
                    debug("ready for next header...")
431
                    self.take_one()
432
                self.response = None
433
434
435
436
    def handle_close(self):
437
        debug('async told us of close on %r' % self)
438
        # if there are outstanding requests should probably reopen and 
439
        # retransmit, but if we're not making any progress then give up
440
        self.manager._channel_closed(self)
441
        self.close()
442
443
444
class DummyConsumer:
445
    def __init__(self, url, pb):
446
        self.url = url
447
        self.outf = None
448
        self._pb = pb
449
450
    def feed(self, data):
451
        # print "feed", repr(data)
452
        # print "feed", repr(data[:20]), repr(data[-20:]), len(data)
453
        if not self.outf:
454
            base = self.url[self.url.rindex('/')+1:]
455
            self.outf = file('/tmp/download/' + base, 'wb')
456
        self.outf.write(data)
457
458
    def error(self, err_info):
459
        import traceback
460
        error('error reported to consumer')
461
        traceback.print_exception(err_info[0], err_info[1], err_info[2])
462
        sys.exit(1)
463
464
    def content_complete(self):
465
        info('content complete from %s' % self.url)
466
        self.outf.close()
467
        self.outf = None
468
        # using last_cnt is cheating
469
        self._pb.update('downloading inventory',
470
                        self._pb.last_cnt+1,
471
                        self._pb.last_total)
472
473
474
475
if __name__ == "__main__":
476
    logging.basicConfig(level=logging.DEBUG)
477
478
    mgr = DownloadManager()
479
480
    from bzrlib.branch import Branch
481
    from bzrlib.progress import ProgressBar
482
483
    pb = ProgressBar()
484
    revs = Branch('/home/mbp/work/bzr').revision_history()
485
    pb.update('downloading inventories', 0, len(revs))
486
487
    for rev in revs:
488
        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/' \
489
              + rev + '.gz'
490
        mgr.enqueue(url, DummyConsumer(url, pb))
491
492
    mgr.run()
493
    
494
495
496
    
497
#     for url in ['http://www.bazaar-ng.org/',
498
#                 'http://www.bazaar-ng.org/tutorial.html',
499
#                 'http://www.bazaar-ng.org/download.html',
500
#                 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',
501
#                 ]: