~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to urlgrabber/keepalive.py

  • Committer: Martin Pool
  • Date: 2005-03-15 05:19:54 UTC
  • Revision ID: mbp@sourcefrog.net-20050315051954-e4bdd6dfd26f8ecf
witty comment

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#   This library is free software; you can redistribute it and/or
2
 
#   modify it under the terms of the GNU Lesser General Public
3
 
#   License as published by the Free Software Foundation; either
4
 
#   version 2.1 of the License, or (at your option) any later version.
5
 
#
6
 
#   This library is distributed in the hope that it will be useful,
7
 
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
8
 
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
9
 
#   Lesser General Public License for more details.
10
 
#
11
 
#   You should have received a copy of the GNU Lesser General Public
12
 
#   License along with this library; if not, write to the 
13
 
#      Free Software Foundation, Inc., 
14
 
#      59 Temple Place, Suite 330, 
15
 
#      Boston, MA  02111-1307  USA
16
 
 
17
 
# This file is part of urlgrabber, a high-level cross-protocol url-grabber
18
 
# Copyright 2002-2004 Michael D. Stenner, Ryan Tomayko
19
 
 
20
 
"""An HTTP handler for urllib2 that supports HTTP 1.1 and keepalive.
21
 
 
22
 
>>> import urllib2
23
 
>>> from keepalive import HTTPHandler
24
 
>>> keepalive_handler = HTTPHandler()
25
 
>>> opener = urllib2.build_opener(keepalive_handler)
26
 
>>> urllib2.install_opener(opener)
27
 
>>> 
28
 
>>> fo = urllib2.urlopen('http://www.python.org')
29
 
 
30
 
If a connection to a given host is requested, and all of the existing
31
 
connections are still in use, another connection will be opened.  If
32
 
the handler tries to use an existing connection but it fails in some
33
 
way, it will be closed and removed from the pool.
34
 
 
35
 
To remove the handler, simply re-run build_opener with no arguments, and
36
 
install that opener.
37
 
 
38
 
You can explicitly close connections by using the close_connection()
39
 
method of the returned file-like object (described below) or you can
40
 
use the handler methods:
41
 
 
42
 
  close_connection(host)
43
 
  close_all()
44
 
  open_connections()
45
 
 
46
 
NOTE: using the close_connection and close_all methods of the handler
47
 
should be done with care when using multiple threads.
48
 
  * there is nothing that prevents another thread from creating new
49
 
    connections immediately after connections are closed
50
 
  * no checks are done to prevent in-use connections from being closed
51
 
 
52
 
>>> keepalive_handler.close_all()
53
 
 
54
 
EXTRA ATTRIBUTES AND METHODS
55
 
 
56
 
  Upon a status of 200, the object returned has a few additional
57
 
  attributes and methods, which should not be used if you want to
58
 
  remain consistent with the normal urllib2-returned objects:
59
 
 
60
 
    close_connection()  -  close the connection to the host
61
 
    readlines()         -  you know, readlines()
62
 
    status              -  the return status (ie 404)
63
 
    reason              -  english translation of status (ie 'File not found')
64
 
 
65
 
  If you want the best of both worlds, use this inside an
66
 
  AttributeError-catching try:
67
 
 
68
 
  >>> try: status = fo.status
69
 
  >>> except AttributeError: status = None
70
 
 
71
 
  Unfortunately, these are ONLY there if status == 200, so it's not
72
 
  easy to distinguish between non-200 responses.  The reason is that
73
 
  urllib2 tries to do clever things with error codes 301, 302, 401,
74
 
  and 407, and it wraps the object upon return.
75
 
 
76
 
  For python versions earlier than 2.4, you can avoid this fancy error
77
 
  handling by setting the module-level global HANDLE_ERRORS to zero.
78
 
  You see, prior to 2.4, it's the HTTP Handler's job to determine what
79
 
  to handle specially, and what to just pass up.  HANDLE_ERRORS == 0
80
 
  means "pass everything up".  In python 2.4, however, this job no
81
 
  longer belongs to the HTTP Handler and is now done by a NEW handler,
82
 
  HTTPErrorProcessor.  Here's the bottom line:
83
 
 
84
 
    python version < 2.4
85
 
        HANDLE_ERRORS == 1  (default) pass up 200, treat the rest as
86
 
                            errors
87
 
        HANDLE_ERRORS == 0  pass everything up, error processing is
88
 
                            left to the calling code
89
 
    python version >= 2.4
90
 
        HANDLE_ERRORS == 1  pass up 200, treat the rest as errors
91
 
        HANDLE_ERRORS == 0  (default) pass everything up, let the
92
 
                            other handlers (specifically,
93
 
                            HTTPErrorProcessor) decide what to do
94
 
 
95
 
  In practice, setting the variable either way makes little difference
96
 
  in python 2.4, so for the most consistent behavior across versions,
97
 
  you probably just want to use the defaults, which will give you
98
 
  exceptions on errors.
99
 
 
100
 
"""
101
 
 
102
 
# $Id: keepalive.py,v 1.9 2005/02/14 21:55:07 mstenner Exp $
103
 
 
104
 
import urllib2
105
 
import httplib
106
 
import socket
107
 
import thread
108
 
 
109
 
DEBUG = 0
110
 
def DBPRINT(*args): print ' '.join(args)
111
 
 
112
 
import sys
113
 
_python_version = map(int, sys.version.split()[0].split('.'))
114
 
if _python_version < [2, 4]: HANDLE_ERRORS = 1
115
 
else: HANDLE_ERRORS = 0
116
 
    
117
 
class ConnectionManager:
118
 
    """
119
 
    The connection manager must be able to:
120
 
      * keep track of all existing
121
 
      """
122
 
    def __init__(self):
123
 
        self._lock = thread.allocate_lock()
124
 
        self._hostmap = {} # map hosts to a list of connections
125
 
        self._connmap = {} # map connections to host
126
 
        self._readymap = {} # map connection to ready state
127
 
 
128
 
    def add(self, host, connection, ready):
129
 
        self._lock.acquire()
130
 
        try:
131
 
            if not self._hostmap.has_key(host): self._hostmap[host] = []
132
 
            self._hostmap[host].append(connection)
133
 
            self._connmap[connection] = host
134
 
            self._readymap[connection] = ready
135
 
        finally:
136
 
            self._lock.release()
137
 
 
138
 
    def remove(self, connection):
139
 
        self._lock.acquire()
140
 
        try:
141
 
            try:
142
 
                host = self._connmap[connection]
143
 
            except KeyError:
144
 
                pass
145
 
            else:
146
 
                del self._connmap[connection]
147
 
                del self._readymap[connection]
148
 
                self._hostmap[host].remove(connection)
149
 
                if not self._hostmap[host]: del self._hostmap[host]
150
 
        finally:
151
 
            self._lock.release()
152
 
 
153
 
    def set_ready(self, connection, ready):
154
 
        try: self._readymap[connection] = ready
155
 
        except KeyError: pass
156
 
        
157
 
    def get_ready_conn(self, host):
158
 
        conn = None
159
 
        self._lock.acquire()
160
 
        try:
161
 
            if self._hostmap.has_key(host):
162
 
                for c in self._hostmap[host]:
163
 
                    if self._readymap[c]:
164
 
                        self._readymap[c] = 0
165
 
                        conn = c
166
 
                        break
167
 
        finally:
168
 
            self._lock.release()
169
 
        return conn
170
 
 
171
 
    def get_all(self, host=None):
172
 
        if host:
173
 
            return list(self._hostmap.get(host, []))
174
 
        else:
175
 
            return dict(self._hostmap)
176
 
 
177
 
class HTTPHandler(urllib2.HTTPHandler):
178
 
    def __init__(self):
179
 
        self._cm = ConnectionManager()
180
 
        
181
 
    #### Connection Management
182
 
    def open_connections(self):
183
 
        """return a list of connected hosts and the number of connections
184
 
        to each.  [('foo.com:80', 2), ('bar.org', 1)]"""
185
 
        return [(host, len(li)) for (host, li) in self._cm.get_all().items()]
186
 
 
187
 
    def close_connection(self, host):
188
 
        """close connection(s) to <host>
189
 
        host is the host:port spec, as in 'www.cnn.com:8080' as passed in.
190
 
        no error occurs if there is no connection to that host."""
191
 
        for h in self._cm.get_all(host):
192
 
            self._cm.remove(h)
193
 
            h.close()
194
 
        
195
 
    def close_all(self):
196
 
        """close all open connections"""
197
 
        for host, conns in self._cm.get_all().items():
198
 
            for h in conns:
199
 
                self._cm.remove(h)
200
 
                h.close()
201
 
        
202
 
    def _request_closed(self, request, host, connection):
203
 
        """tells us that this request is now closed and the the
204
 
        connection is ready for another request"""
205
 
        self._cm.set_ready(connection, 1)
206
 
 
207
 
    def _remove_connection(self, host, connection, close=0):
208
 
        if close: connection.close()
209
 
        self._cm.remove(connection)
210
 
        
211
 
    #### Transaction Execution
212
 
    def http_open(self, req):
213
 
        return self.do_open(HTTPConnection, req)
214
 
 
215
 
    def do_open(self, http_class, req):
216
 
        host = req.get_host()
217
 
        if not host:
218
 
            raise urllib2.URLError('no host given')
219
 
 
220
 
        try:
221
 
            h = self._cm.get_ready_conn(host)
222
 
            while h:
223
 
                r = self._reuse_connection(h, req, host)
224
 
 
225
 
                # if this response is non-None, then it worked and we're
226
 
                # done.  Break out, skipping the else block.
227
 
                if r: break
228
 
 
229
 
                # connection is bad - possibly closed by server
230
 
                # discard it and ask for the next free connection
231
 
                h.close()
232
 
                self._cm.remove(h)
233
 
                h = self._cm.get_ready_conn(host)
234
 
            else:
235
 
                # no (working) free connections were found.  Create a new one.
236
 
                h = http_class(host)
237
 
                if DEBUG: DBPRINT("creating new connection to %s (%d)" % \
238
 
                                  (host, id(h)))
239
 
                self._cm.add(host, h, 0)
240
 
                self._start_transaction(h, req)
241
 
                r = h.getresponse()
242
 
        except (socket.error, httplib.HTTPException), err:
243
 
            raise urllib2.URLError(err)
244
 
            
245
 
        # if not a persistent connection, don't try to reuse it
246
 
        if r.will_close: self._cm.remove(h)
247
 
 
248
 
        if DEBUG: DBPRINT("STATUS: %s, %s" % (r.status, r.reason))
249
 
        r._handler = self
250
 
        r._host = host
251
 
        r._url = req.get_full_url()
252
 
        r._connection = h
253
 
        r.code = r.status
254
 
        
255
 
        if r.status == 200 or not HANDLE_ERRORS:
256
 
            return r
257
 
        else:
258
 
            return self.parent.error('http', req, r, r.status, r.reason, r.msg)
259
 
 
260
 
 
261
 
    def _reuse_connection(self, h, req, host):
262
 
        """start the transaction with a re-used connection
263
 
        return a response object (r) upon success or None on failure.
264
 
        This DOES not close or remove bad connections in cases where
265
 
        it returns.  However, if an unexpected exception occurs, it
266
 
        will close and remove the connection before re-raising.
267
 
        """
268
 
        try:
269
 
            self._start_transaction(h, req)
270
 
            r = h.getresponse()
271
 
            # note: just because we got something back doesn't mean it
272
 
            # worked.  We'll check the version below, too.
273
 
        except (socket.error, httplib.HTTPException):
274
 
            r = None
275
 
        except:
276
 
            # adding this block just in case we've missed
277
 
            # something we will still raise the exception, but
278
 
            # lets try and close the connection and remove it
279
 
            # first.  We previously got into a nasty loop
280
 
            # where an exception was uncaught, and so the
281
 
            # connection stayed open.  On the next try, the
282
 
            # same exception was raised, etc.  The tradeoff is
283
 
            # that it's now possible this call will raise
284
 
            # a DIFFERENT exception
285
 
            if DEBUG: DBPRINT("unexpected exception - " \
286
 
                              "closing connection to %s (%d)" % (host, id(h)))
287
 
            self._cm.remove(h)
288
 
            h.close()
289
 
            raise
290
 
                    
291
 
        if r is None or r.version == 9:
292
 
            # httplib falls back to assuming HTTP 0.9 if it gets a
293
 
            # bad header back.  This is most likely to happen if
294
 
            # the socket has been closed by the server since we
295
 
            # last used the connection.
296
 
            if DEBUG: DBPRINT("failed to re-use connection to %s (%d)" \
297
 
                              % (host, id(h)))
298
 
            r = None
299
 
        else:
300
 
            if DEBUG: DBPRINT("re-using connection to %s (%d)" % (host, id(h)))
301
 
 
302
 
        return r
303
 
 
304
 
    def _start_transaction(self, h, req):
305
 
        try:
306
 
            if req.has_data():
307
 
                data = req.get_data()
308
 
                h.putrequest('POST', req.get_selector())
309
 
                if not req.headers.has_key('Content-type'):
310
 
                    h.putheader('Content-type',
311
 
                                'application/x-www-form-urlencoded')
312
 
                if not req.headers.has_key('Content-length'):
313
 
                    h.putheader('Content-length', '%d' % len(data))
314
 
            else:
315
 
                h.putrequest('GET', req.get_selector())
316
 
        except (socket.error, httplib.HTTPException), err:
317
 
            raise urllib2.URLError(err)
318
 
 
319
 
        for args in self.parent.addheaders:
320
 
            h.putheader(*args)
321
 
        for k, v in req.headers.items():
322
 
            h.putheader(k, v)
323
 
        h.endheaders()
324
 
        if req.has_data():
325
 
            h.send(data)
326
 
 
327
 
class HTTPResponse(httplib.HTTPResponse):
328
 
    # we need to subclass HTTPResponse in order to
329
 
    # 1) add readline() and readlines() methods
330
 
    # 2) add close_connection() methods
331
 
    # 3) add info() and geturl() methods
332
 
 
333
 
    # in order to add readline(), read must be modified to deal with a
334
 
    # buffer.  example: readline must read a buffer and then spit back
335
 
    # one line at a time.  The only real alternative is to read one
336
 
    # BYTE at a time (ick).  Once something has been read, it can't be
337
 
    # put back (ok, maybe it can, but that's even uglier than this),
338
 
    # so if you THEN do a normal read, you must first take stuff from
339
 
    # the buffer.
340
 
 
341
 
    # the read method wraps the original to accomodate buffering,
342
 
    # although read() never adds to the buffer.
343
 
    # Both readline and readlines have been stolen with almost no
344
 
    # modification from socket.py
345
 
    
346
 
 
347
 
    def __init__(self, sock, debuglevel=0, strict=0, method=None):
348
 
        if method: # the httplib in python 2.3 uses the method arg
349
 
            httplib.HTTPResponse.__init__(self, sock, debuglevel, method)
350
 
        else: # 2.2 doesn't
351
 
            httplib.HTTPResponse.__init__(self, sock, debuglevel)
352
 
        self.fileno = sock.fileno
353
 
        self.code = None
354
 
        self._rbuf = ''
355
 
        self._rbufsize = 8096
356
 
        self._handler = None # inserted by the handler later
357
 
        self._host = None    # (same)
358
 
        self._url = None     # (same)
359
 
        self._connection = None # (same)
360
 
 
361
 
    _raw_read = httplib.HTTPResponse.read
362
 
 
363
 
    def close(self):
364
 
        if self.fp:
365
 
            self.fp.close()
366
 
            self.fp = None
367
 
            if self._handler:
368
 
                self._handler._request_closed(self, self._host,
369
 
                                              self._connection)
370
 
 
371
 
    def close_connection(self):
372
 
        self._handler._remove_connection(self._host, self._connection, close=1)
373
 
        self.close()
374
 
        
375
 
    def info(self):
376
 
        return self.msg
377
 
 
378
 
    def geturl(self):
379
 
        return self._url
380
 
 
381
 
    def read(self, amt=None):
382
 
        # the _rbuf test is only in this first if for speed.  It's not
383
 
        # logically necessary
384
 
        if self._rbuf and not amt is None:
385
 
            L = len(self._rbuf)
386
 
            if amt > L:
387
 
                amt -= L
388
 
            else:
389
 
                s = self._rbuf[:amt]
390
 
                self._rbuf = self._rbuf[amt:]
391
 
                return s
392
 
 
393
 
        s = self._rbuf + self._raw_read(amt)
394
 
        self._rbuf = ''
395
 
        return s
396
 
 
397
 
    def readline(self, limit=-1):
398
 
        data = ""
399
 
        i = self._rbuf.find('\n')
400
 
        while i < 0 and not (0 < limit <= len(self._rbuf)):
401
 
            new = self._raw_read(self._rbufsize)
402
 
            if not new: break
403
 
            i = new.find('\n')
404
 
            if i >= 0: i = i + len(self._rbuf)
405
 
            self._rbuf = self._rbuf + new
406
 
        if i < 0: i = len(self._rbuf)
407
 
        else: i = i+1
408
 
        if 0 <= limit < len(self._rbuf): i = limit
409
 
        data, self._rbuf = self._rbuf[:i], self._rbuf[i:]
410
 
        return data
411
 
 
412
 
    def readlines(self, sizehint = 0):
413
 
        total = 0
414
 
        list = []
415
 
        while 1:
416
 
            line = self.readline()
417
 
            if not line: break
418
 
            list.append(line)
419
 
            total += len(line)
420
 
            if sizehint and total >= sizehint:
421
 
                break
422
 
        return list
423
 
 
424
 
 
425
 
class HTTPConnection(httplib.HTTPConnection):
426
 
    # use the modified response class
427
 
    response_class = HTTPResponse
428
 
    
429
 
#########################################################################
430
 
#####   TEST FUNCTIONS
431
 
#########################################################################
432
 
 
433
 
def error_handler(url):
434
 
    global HANDLE_ERRORS
435
 
    orig = HANDLE_ERRORS
436
 
    keepalive_handler = HTTPHandler()
437
 
    opener = urllib2.build_opener(keepalive_handler)
438
 
    urllib2.install_opener(opener)
439
 
    pos = {0: 'off', 1: 'on'}
440
 
    for i in (0, 1):
441
 
        print "  fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i)
442
 
        HANDLE_ERRORS = i
443
 
        try:
444
 
            fo = urllib2.urlopen(url)
445
 
            foo = fo.read()
446
 
            fo.close()
447
 
            try: status, reason = fo.status, fo.reason
448
 
            except AttributeError: status, reason = None, None
449
 
        except IOError, e:
450
 
            print "  EXCEPTION: %s" % e
451
 
            raise
452
 
        else:
453
 
            print "  status = %s, reason = %s" % (status, reason)
454
 
    HANDLE_ERRORS = orig
455
 
    hosts = keepalive_handler.open_connections()
456
 
    print "open connections:", hosts
457
 
    keepalive_handler.close_all()
458
 
 
459
 
def continuity(url):
460
 
    import md5
461
 
    format = '%25s: %s'
462
 
    
463
 
    # first fetch the file with the normal http handler
464
 
    opener = urllib2.build_opener()
465
 
    urllib2.install_opener(opener)
466
 
    fo = urllib2.urlopen(url)
467
 
    foo = fo.read()
468
 
    fo.close()
469
 
    m = md5.new(foo)
470
 
    print format % ('normal urllib', m.hexdigest())
471
 
 
472
 
    # now install the keepalive handler and try again
473
 
    opener = urllib2.build_opener(HTTPHandler())
474
 
    urllib2.install_opener(opener)
475
 
 
476
 
    fo = urllib2.urlopen(url)
477
 
    foo = fo.read()
478
 
    fo.close()
479
 
    m = md5.new(foo)
480
 
    print format % ('keepalive read', m.hexdigest())
481
 
 
482
 
    fo = urllib2.urlopen(url)
483
 
    foo = ''
484
 
    while 1:
485
 
        f = fo.readline()
486
 
        if f: foo = foo + f
487
 
        else: break
488
 
    fo.close()
489
 
    m = md5.new(foo)
490
 
    print format % ('keepalive readline', m.hexdigest())
491
 
 
492
 
def comp(N, url):
493
 
    print '  making %i connections to:\n  %s' % (N, url)
494
 
 
495
 
    sys.stdout.write('  first using the normal urllib handlers')
496
 
    # first use normal opener
497
 
    opener = urllib2.build_opener()
498
 
    urllib2.install_opener(opener)
499
 
    t1 = fetch(N, url)
500
 
    print '  TIME: %.3f s' % t1
501
 
 
502
 
    sys.stdout.write('  now using the keepalive handler       ')
503
 
    # now install the keepalive handler and try again
504
 
    opener = urllib2.build_opener(HTTPHandler())
505
 
    urllib2.install_opener(opener)
506
 
    t2 = fetch(N, url)
507
 
    print '  TIME: %.3f s' % t2
508
 
    print '  improvement factor: %.2f' % (t1/t2, )
509
 
    
510
 
def fetch(N, url, delay=0):
511
 
    lens = []
512
 
    starttime = time.time()
513
 
    for i in range(N):
514
 
        if delay and i > 0: time.sleep(delay)
515
 
        fo = urllib2.urlopen(url)
516
 
        foo = fo.read()
517
 
        fo.close()
518
 
        lens.append(len(foo))
519
 
    diff = time.time() - starttime
520
 
 
521
 
    j = 0
522
 
    for i in lens[1:]:
523
 
        j = j + 1
524
 
        if not i == lens[0]:
525
 
            print "WARNING: inconsistent length on read %i: %i" % (j, i)
526
 
 
527
 
    return diff
528
 
 
529
 
def test_timeout(url):
530
 
    global DEBUG, DBPRINT
531
 
    dbp = DBPRINT
532
 
    def DBPRINT(*args): print '    ' + ' '.join(args)
533
 
    DEBUG=1
534
 
    print "  fetching the file to establish a connection"
535
 
    fo = urllib2.urlopen(url)
536
 
    data1 = fo.read()
537
 
    fo.close()
538
 
 
539
 
    i = 20
540
 
    print "  waiting %i seconds for the server to close the connection" % i
541
 
    while i > 0:
542
 
        sys.stdout.write('\r  %2i' % i)
543
 
        sys.stdout.flush()
544
 
        time.sleep(1)
545
 
        i -= 1
546
 
    sys.stderr.write('\r')
547
 
 
548
 
    print "  fetching the file a second time"
549
 
    fo = urllib2.urlopen(url)
550
 
    data2 = fo.read()
551
 
    fo.close()
552
 
 
553
 
    if data1 == data2:
554
 
        print '  data are identical'
555
 
    else:
556
 
        print '  ERROR: DATA DIFFER'
557
 
 
558
 
    DEBUG=0
559
 
    DBPRINT = dbp
560
 
 
561
 
    
562
 
def test(url, N=10):
563
 
    print "checking error hander (do this on a non-200)"
564
 
    try: error_handler(url)
565
 
    except IOError, e:
566
 
        print "exiting - exception will prevent further tests"
567
 
        sys.exit()
568
 
    print
569
 
    print "performing continuity test (making sure stuff isn't corrupted)"
570
 
    continuity(url)
571
 
    print
572
 
    print "performing speed comparison"
573
 
    comp(N, url)
574
 
    print
575
 
    print "performing dropped-connection check"
576
 
    test_timeout(url)
577
 
    
578
 
if __name__ == '__main__':
579
 
    import time
580
 
    import sys
581
 
    try:
582
 
        N = int(sys.argv[1])
583
 
        url = sys.argv[2]
584
 
    except:
585
 
        print "%s <integer> <url>" % sys.argv[0]
586
 
    else:
587
 
        test(url, N)