~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to urlgrabber/keepalive.py

Review comments.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#   This library is free software; you can redistribute it and/or
2
 
#   modify it under the terms of the GNU Lesser General Public
3
 
#   License as published by the Free Software Foundation; either
4
 
#   version 2.1 of the License, or (at your option) any later version.
5
 
#
6
 
#   This library is distributed in the hope that it will be useful,
7
 
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
8
 
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
9
 
#   Lesser General Public License for more details.
10
 
#
11
 
#   You should have received a copy of the GNU Lesser General Public
12
 
#   License along with this library; if not, write to the 
13
 
#      Free Software Foundation, Inc., 
14
 
#      59 Temple Place, Suite 330, 
15
 
#      Boston, MA  02111-1307  USA
16
 
 
17
 
# This file is part of urlgrabber, a high-level cross-protocol url-grabber
18
 
# Copyright 2002-2004 Michael D. Stenner, Ryan Tomayko
19
 
 
20
 
"""An HTTP handler for urllib2 that supports HTTP 1.1 and keepalive.
21
 
 
22
 
>>> import urllib2
23
 
>>> from keepalive import HTTPHandler
24
 
>>> keepalive_handler = HTTPHandler()
25
 
>>> opener = urllib2.build_opener(keepalive_handler)
26
 
>>> urllib2.install_opener(opener)
27
 
>>> 
28
 
>>> fo = urllib2.urlopen('http://www.python.org')
29
 
 
30
 
If a connection to a given host is requested, and all of the existing
31
 
connections are still in use, another connection will be opened.  If
32
 
the handler tries to use an existing connection but it fails in some
33
 
way, it will be closed and removed from the pool.
34
 
 
35
 
To remove the handler, simply re-run build_opener with no arguments, and
36
 
install that opener.
37
 
 
38
 
You can explicitly close connections by using the close_connection()
39
 
method of the returned file-like object (described below) or you can
40
 
use the handler methods:
41
 
 
42
 
  close_connection(host)
43
 
  close_all()
44
 
  open_connections()
45
 
 
46
 
NOTE: using the close_connection and close_all methods of the handler
47
 
should be done with care when using multiple threads.
48
 
  * there is nothing that prevents another thread from creating new
49
 
    connections immediately after connections are closed
50
 
  * no checks are done to prevent in-use connections from being closed
51
 
 
52
 
>>> keepalive_handler.close_all()
53
 
 
54
 
EXTRA ATTRIBUTES AND METHODS
55
 
 
56
 
  Upon a status of 200, the object returned has a few additional
57
 
  attributes and methods, which should not be used if you want to
58
 
  remain consistent with the normal urllib2-returned objects:
59
 
 
60
 
    close_connection()  -  close the connection to the host
61
 
    readlines()         -  you know, readlines()
62
 
    status              -  the return status (ie 404)
63
 
    reason              -  english translation of status (ie 'File not found')
64
 
 
65
 
  If you want the best of both worlds, use this inside an
66
 
  AttributeError-catching try:
67
 
 
68
 
  >>> try: status = fo.status
69
 
  >>> except AttributeError: status = None
70
 
 
71
 
  Unfortunately, these are ONLY there if status == 200, so it's not
72
 
  easy to distinguish between non-200 responses.  The reason is that
73
 
  urllib2 tries to do clever things with error codes 301, 302, 401,
74
 
  and 407, and it wraps the object upon return.
75
 
 
76
 
  For python versions earlier than 2.4, you can avoid this fancy error
77
 
  handling by setting the module-level global HANDLE_ERRORS to zero.
78
 
  You see, prior to 2.4, it's the HTTP Handler's job to determine what
79
 
  to handle specially, and what to just pass up.  HANDLE_ERRORS == 0
80
 
  means "pass everything up".  In python 2.4, however, this job no
81
 
  longer belongs to the HTTP Handler and is now done by a NEW handler,
82
 
  HTTPErrorProcessor.  Here's the bottom line:
83
 
 
84
 
    python version < 2.4
85
 
        HANDLE_ERRORS == 1  (default) pass up 200, treat the rest as
86
 
                            errors
87
 
        HANDLE_ERRORS == 0  pass everything up, error processing is
88
 
                            left to the calling code
89
 
    python version >= 2.4
90
 
        HANDLE_ERRORS == 1  pass up 200, treat the rest as errors
91
 
        HANDLE_ERRORS == 0  (default) pass everything up, let the
92
 
                            other handlers (specifically,
93
 
                            HTTPErrorProcessor) decide what to do
94
 
 
95
 
  In practice, setting the variable either way makes little difference
96
 
  in python 2.4, so for the most consistent behavior across versions,
97
 
  you probably just want to use the defaults, which will give you
98
 
  exceptions on errors.
99
 
 
100
 
"""
101
 
 
102
 
# $Id: keepalive.py,v 1.9 2005/02/14 21:55:07 mstenner Exp $
103
 
 
104
 
import urllib2
105
 
import httplib
106
 
import socket
107
 
import thread
108
 
 
109
 
DEBUG = 0
110
 
def DBPRINT(*args): print ' '.join(args)
111
 
 
112
 
import sys
113
 
if hasattr(sys, 'version_info'):
114
 
    _python_version = sys.version_info
115
 
else:
116
 
    _python_version = map(int, sys.version.split()[0].split('.'))
117
 
if _python_version < [2, 4]: HANDLE_ERRORS = 1
118
 
else: HANDLE_ERRORS = 0
119
 
    
120
 
class ConnectionManager:
121
 
    """
122
 
    The connection manager must be able to:
123
 
      * keep track of all existing
124
 
      """
125
 
    def __init__(self):
126
 
        self._lock = thread.allocate_lock()
127
 
        self._hostmap = {} # map hosts to a list of connections
128
 
        self._connmap = {} # map connections to host
129
 
        self._readymap = {} # map connection to ready state
130
 
 
131
 
    def add(self, host, connection, ready):
132
 
        self._lock.acquire()
133
 
        try:
134
 
            if not self._hostmap.has_key(host): self._hostmap[host] = []
135
 
            self._hostmap[host].append(connection)
136
 
            self._connmap[connection] = host
137
 
            self._readymap[connection] = ready
138
 
        finally:
139
 
            self._lock.release()
140
 
 
141
 
    def remove(self, connection):
142
 
        self._lock.acquire()
143
 
        try:
144
 
            try:
145
 
                host = self._connmap[connection]
146
 
            except KeyError:
147
 
                pass
148
 
            else:
149
 
                del self._connmap[connection]
150
 
                del self._readymap[connection]
151
 
                self._hostmap[host].remove(connection)
152
 
                if not self._hostmap[host]: del self._hostmap[host]
153
 
        finally:
154
 
            self._lock.release()
155
 
 
156
 
    def set_ready(self, connection, ready):
157
 
        try: self._readymap[connection] = ready
158
 
        except KeyError: pass
159
 
        
160
 
    def get_ready_conn(self, host):
161
 
        conn = None
162
 
        self._lock.acquire()
163
 
        try:
164
 
            if self._hostmap.has_key(host):
165
 
                for c in self._hostmap[host]:
166
 
                    if self._readymap[c]:
167
 
                        self._readymap[c] = 0
168
 
                        conn = c
169
 
                        break
170
 
        finally:
171
 
            self._lock.release()
172
 
        return conn
173
 
 
174
 
    def get_all(self, host=None):
175
 
        if host:
176
 
            return list(self._hostmap.get(host, []))
177
 
        else:
178
 
            return dict(self._hostmap)
179
 
 
180
 
class HTTPHandler(urllib2.HTTPHandler):
181
 
    def __init__(self):
182
 
        self._cm = ConnectionManager()
183
 
        
184
 
    #### Connection Management
185
 
    def open_connections(self):
186
 
        """return a list of connected hosts and the number of connections
187
 
        to each.  [('foo.com:80', 2), ('bar.org', 1)]"""
188
 
        return [(host, len(li)) for (host, li) in self._cm.get_all().items()]
189
 
 
190
 
    def close_connection(self, host):
191
 
        """close connection(s) to <host>
192
 
        host is the host:port spec, as in 'www.cnn.com:8080' as passed in.
193
 
        no error occurs if there is no connection to that host."""
194
 
        for h in self._cm.get_all(host):
195
 
            self._cm.remove(h)
196
 
            h.close()
197
 
        
198
 
    def close_all(self):
199
 
        """close all open connections"""
200
 
        for host, conns in self._cm.get_all().items():
201
 
            for h in conns:
202
 
                self._cm.remove(h)
203
 
                h.close()
204
 
        
205
 
    def _request_closed(self, request, host, connection):
206
 
        """tells us that this request is now closed and the the
207
 
        connection is ready for another request"""
208
 
        self._cm.set_ready(connection, 1)
209
 
 
210
 
    def _remove_connection(self, host, connection, close=0):
211
 
        if close: connection.close()
212
 
        self._cm.remove(connection)
213
 
        
214
 
    #### Transaction Execution
215
 
    def http_open(self, req):
216
 
        return self.do_open(HTTPConnection, req)
217
 
 
218
 
    def do_open(self, http_class, req):
219
 
        host = req.get_host()
220
 
        if not host:
221
 
            raise urllib2.URLError('no host given')
222
 
 
223
 
        try:
224
 
            h = self._cm.get_ready_conn(host)
225
 
            while h:
226
 
                r = self._reuse_connection(h, req, host)
227
 
 
228
 
                # if this response is non-None, then it worked and we're
229
 
                # done.  Break out, skipping the else block.
230
 
                if r: break
231
 
 
232
 
                # connection is bad - possibly closed by server
233
 
                # discard it and ask for the next free connection
234
 
                h.close()
235
 
                self._cm.remove(h)
236
 
                h = self._cm.get_ready_conn(host)
237
 
            else:
238
 
                # no (working) free connections were found.  Create a new one.
239
 
                h = http_class(host)
240
 
                if DEBUG: DBPRINT("creating new connection to %s (%d)" % \
241
 
                                  (host, id(h)))
242
 
                self._cm.add(host, h, 0)
243
 
                self._start_transaction(h, req)
244
 
                r = h.getresponse()
245
 
        except (socket.error, httplib.HTTPException), err:
246
 
            raise urllib2.URLError(err)
247
 
            
248
 
        # if not a persistent connection, don't try to reuse it
249
 
        if r.will_close: self._cm.remove(h)
250
 
 
251
 
        if DEBUG: DBPRINT("STATUS: %s, %s" % (r.status, r.reason))
252
 
        r._handler = self
253
 
        r._host = host
254
 
        r._url = req.get_full_url()
255
 
        r._connection = h
256
 
        r.code = r.status
257
 
        
258
 
        if r.status == 200 or not HANDLE_ERRORS:
259
 
            return r
260
 
        else:
261
 
            return self.parent.error('http', req, r, r.status, r.reason, r.msg)
262
 
 
263
 
 
264
 
    def _reuse_connection(self, h, req, host):
265
 
        """start the transaction with a re-used connection
266
 
        return a response object (r) upon success or None on failure.
267
 
        This DOES not close or remove bad connections in cases where
268
 
        it returns.  However, if an unexpected exception occurs, it
269
 
        will close and remove the connection before re-raising.
270
 
        """
271
 
        try:
272
 
            self._start_transaction(h, req)
273
 
            r = h.getresponse()
274
 
            # note: just because we got something back doesn't mean it
275
 
            # worked.  We'll check the version below, too.
276
 
        except (socket.error, httplib.HTTPException):
277
 
            r = None
278
 
        except:
279
 
            # adding this block just in case we've missed
280
 
            # something we will still raise the exception, but
281
 
            # lets try and close the connection and remove it
282
 
            # first.  We previously got into a nasty loop
283
 
            # where an exception was uncaught, and so the
284
 
            # connection stayed open.  On the next try, the
285
 
            # same exception was raised, etc.  The tradeoff is
286
 
            # that it's now possible this call will raise
287
 
            # a DIFFERENT exception
288
 
            if DEBUG: DBPRINT("unexpected exception - " \
289
 
                              "closing connection to %s (%d)" % (host, id(h)))
290
 
            self._cm.remove(h)
291
 
            h.close()
292
 
            raise
293
 
                    
294
 
        if r is None or r.version == 9:
295
 
            # httplib falls back to assuming HTTP 0.9 if it gets a
296
 
            # bad header back.  This is most likely to happen if
297
 
            # the socket has been closed by the server since we
298
 
            # last used the connection.
299
 
            if DEBUG: DBPRINT("failed to re-use connection to %s (%d)" \
300
 
                              % (host, id(h)))
301
 
            r = None
302
 
        else:
303
 
            if DEBUG: DBPRINT("re-using connection to %s (%d)" % (host, id(h)))
304
 
 
305
 
        return r
306
 
 
307
 
    def _start_transaction(self, h, req):
308
 
        try:
309
 
            if req.has_data():
310
 
                data = req.get_data()
311
 
                h.putrequest('POST', req.get_selector())
312
 
                if not req.headers.has_key('Content-type'):
313
 
                    h.putheader('Content-type',
314
 
                                'application/x-www-form-urlencoded')
315
 
                if not req.headers.has_key('Content-length'):
316
 
                    h.putheader('Content-length', '%d' % len(data))
317
 
            else:
318
 
                h.putrequest('GET', req.get_selector())
319
 
        except (socket.error, httplib.HTTPException), err:
320
 
            raise urllib2.URLError(err)
321
 
 
322
 
        for args in self.parent.addheaders:
323
 
            h.putheader(*args)
324
 
        for k, v in req.headers.items():
325
 
            h.putheader(k, v)
326
 
        h.endheaders()
327
 
        if req.has_data():
328
 
            h.send(data)
329
 
 
330
 
class HTTPResponse(httplib.HTTPResponse):
331
 
    # we need to subclass HTTPResponse in order to
332
 
    # 1) add readline() and readlines() methods
333
 
    # 2) add close_connection() methods
334
 
    # 3) add info() and geturl() methods
335
 
 
336
 
    # in order to add readline(), read must be modified to deal with a
337
 
    # buffer.  example: readline must read a buffer and then spit back
338
 
    # one line at a time.  The only real alternative is to read one
339
 
    # BYTE at a time (ick).  Once something has been read, it can't be
340
 
    # put back (ok, maybe it can, but that's even uglier than this),
341
 
    # so if you THEN do a normal read, you must first take stuff from
342
 
    # the buffer.
343
 
 
344
 
    # the read method wraps the original to accomodate buffering,
345
 
    # although read() never adds to the buffer.
346
 
    # Both readline and readlines have been stolen with almost no
347
 
    # modification from socket.py
348
 
    
349
 
 
350
 
    def __init__(self, sock, debuglevel=0, strict=0, method=None):
351
 
        if method: # the httplib in python 2.3 uses the method arg
352
 
            httplib.HTTPResponse.__init__(self, sock, debuglevel, method)
353
 
        else: # 2.2 doesn't
354
 
            httplib.HTTPResponse.__init__(self, sock, debuglevel)
355
 
        self.fileno = sock.fileno
356
 
        self.code = None
357
 
        self._rbuf = ''
358
 
        self._rbufsize = 8096
359
 
        self._handler = None # inserted by the handler later
360
 
        self._host = None    # (same)
361
 
        self._url = None     # (same)
362
 
        self._connection = None # (same)
363
 
 
364
 
    _raw_read = httplib.HTTPResponse.read
365
 
 
366
 
    def close(self):
367
 
        if self.fp:
368
 
            self.fp.close()
369
 
            self.fp = None
370
 
            if self._handler:
371
 
                self._handler._request_closed(self, self._host,
372
 
                                              self._connection)
373
 
 
374
 
    def close_connection(self):
375
 
        self._handler._remove_connection(self._host, self._connection, close=1)
376
 
        self.close()
377
 
        
378
 
    def info(self):
379
 
        return self.msg
380
 
 
381
 
    def geturl(self):
382
 
        return self._url
383
 
 
384
 
    def read(self, amt=None):
385
 
        # the _rbuf test is only in this first if for speed.  It's not
386
 
        # logically necessary
387
 
        if self._rbuf and not amt is None:
388
 
            L = len(self._rbuf)
389
 
            if amt > L:
390
 
                amt -= L
391
 
            else:
392
 
                s = self._rbuf[:amt]
393
 
                self._rbuf = self._rbuf[amt:]
394
 
                return s
395
 
 
396
 
        s = self._rbuf + self._raw_read(amt)
397
 
        self._rbuf = ''
398
 
        return s
399
 
 
400
 
    def readline(self, limit=-1):
401
 
        data = ""
402
 
        i = self._rbuf.find('\n')
403
 
        while i < 0 and not (0 < limit <= len(self._rbuf)):
404
 
            new = self._raw_read(self._rbufsize)
405
 
            if not new: break
406
 
            i = new.find('\n')
407
 
            if i >= 0: i = i + len(self._rbuf)
408
 
            self._rbuf = self._rbuf + new
409
 
        if i < 0: i = len(self._rbuf)
410
 
        else: i = i+1
411
 
        if 0 <= limit < len(self._rbuf): i = limit
412
 
        data, self._rbuf = self._rbuf[:i], self._rbuf[i:]
413
 
        return data
414
 
 
415
 
    def readlines(self, sizehint = 0):
416
 
        total = 0
417
 
        list = []
418
 
        while 1:
419
 
            line = self.readline()
420
 
            if not line: break
421
 
            list.append(line)
422
 
            total += len(line)
423
 
            if sizehint and total >= sizehint:
424
 
                break
425
 
        return list
426
 
 
427
 
 
428
 
class HTTPConnection(httplib.HTTPConnection):
429
 
    # use the modified response class
430
 
    response_class = HTTPResponse
431
 
    
432
 
#########################################################################
433
 
#####   TEST FUNCTIONS
434
 
#########################################################################
435
 
 
436
 
def error_handler(url):
437
 
    global HANDLE_ERRORS
438
 
    orig = HANDLE_ERRORS
439
 
    keepalive_handler = HTTPHandler()
440
 
    opener = urllib2.build_opener(keepalive_handler)
441
 
    urllib2.install_opener(opener)
442
 
    pos = {0: 'off', 1: 'on'}
443
 
    for i in (0, 1):
444
 
        print "  fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i)
445
 
        HANDLE_ERRORS = i
446
 
        try:
447
 
            fo = urllib2.urlopen(url)
448
 
            foo = fo.read()
449
 
            fo.close()
450
 
            try: status, reason = fo.status, fo.reason
451
 
            except AttributeError: status, reason = None, None
452
 
        except IOError, e:
453
 
            print "  EXCEPTION: %s" % e
454
 
            raise
455
 
        else:
456
 
            print "  status = %s, reason = %s" % (status, reason)
457
 
    HANDLE_ERRORS = orig
458
 
    hosts = keepalive_handler.open_connections()
459
 
    print "open connections:", hosts
460
 
    keepalive_handler.close_all()
461
 
 
462
 
def continuity(url):
463
 
    import md5
464
 
    format = '%25s: %s'
465
 
    
466
 
    # first fetch the file with the normal http handler
467
 
    opener = urllib2.build_opener()
468
 
    urllib2.install_opener(opener)
469
 
    fo = urllib2.urlopen(url)
470
 
    foo = fo.read()
471
 
    fo.close()
472
 
    m = md5.new(foo)
473
 
    print format % ('normal urllib', m.hexdigest())
474
 
 
475
 
    # now install the keepalive handler and try again
476
 
    opener = urllib2.build_opener(HTTPHandler())
477
 
    urllib2.install_opener(opener)
478
 
 
479
 
    fo = urllib2.urlopen(url)
480
 
    foo = fo.read()
481
 
    fo.close()
482
 
    m = md5.new(foo)
483
 
    print format % ('keepalive read', m.hexdigest())
484
 
 
485
 
    fo = urllib2.urlopen(url)
486
 
    foo = ''
487
 
    while 1:
488
 
        f = fo.readline()
489
 
        if f: foo = foo + f
490
 
        else: break
491
 
    fo.close()
492
 
    m = md5.new(foo)
493
 
    print format % ('keepalive readline', m.hexdigest())
494
 
 
495
 
def comp(N, url):
496
 
    print '  making %i connections to:\n  %s' % (N, url)
497
 
 
498
 
    sys.stdout.write('  first using the normal urllib handlers')
499
 
    # first use normal opener
500
 
    opener = urllib2.build_opener()
501
 
    urllib2.install_opener(opener)
502
 
    t1 = fetch(N, url)
503
 
    print '  TIME: %.3f s' % t1
504
 
 
505
 
    sys.stdout.write('  now using the keepalive handler       ')
506
 
    # now install the keepalive handler and try again
507
 
    opener = urllib2.build_opener(HTTPHandler())
508
 
    urllib2.install_opener(opener)
509
 
    t2 = fetch(N, url)
510
 
    print '  TIME: %.3f s' % t2
511
 
    print '  improvement factor: %.2f' % (t1/t2, )
512
 
    
513
 
def fetch(N, url, delay=0):
514
 
    lens = []
515
 
    starttime = time.time()
516
 
    for i in range(N):
517
 
        if delay and i > 0: time.sleep(delay)
518
 
        fo = urllib2.urlopen(url)
519
 
        foo = fo.read()
520
 
        fo.close()
521
 
        lens.append(len(foo))
522
 
    diff = time.time() - starttime
523
 
 
524
 
    j = 0
525
 
    for i in lens[1:]:
526
 
        j = j + 1
527
 
        if not i == lens[0]:
528
 
            print "WARNING: inconsistent length on read %i: %i" % (j, i)
529
 
 
530
 
    return diff
531
 
 
532
 
def test_timeout(url):
533
 
    global DEBUG, DBPRINT
534
 
    dbp = DBPRINT
535
 
    def DBPRINT(*args): print '    ' + ' '.join(args)
536
 
    DEBUG=1
537
 
    print "  fetching the file to establish a connection"
538
 
    fo = urllib2.urlopen(url)
539
 
    data1 = fo.read()
540
 
    fo.close()
541
 
 
542
 
    i = 20
543
 
    print "  waiting %i seconds for the server to close the connection" % i
544
 
    while i > 0:
545
 
        sys.stdout.write('\r  %2i' % i)
546
 
        sys.stdout.flush()
547
 
        time.sleep(1)
548
 
        i -= 1
549
 
    sys.stderr.write('\r')
550
 
 
551
 
    print "  fetching the file a second time"
552
 
    fo = urllib2.urlopen(url)
553
 
    data2 = fo.read()
554
 
    fo.close()
555
 
 
556
 
    if data1 == data2:
557
 
        print '  data are identical'
558
 
    else:
559
 
        print '  ERROR: DATA DIFFER'
560
 
 
561
 
    DEBUG=0
562
 
    DBPRINT = dbp
563
 
 
564
 
    
565
 
def test(url, N=10):
566
 
    print "checking error hander (do this on a non-200)"
567
 
    try: error_handler(url)
568
 
    except IOError, e:
569
 
        print "exiting - exception will prevent further tests"
570
 
        sys.exit()
571
 
    print
572
 
    print "performing continuity test (making sure stuff isn't corrupted)"
573
 
    continuity(url)
574
 
    print
575
 
    print "performing speed comparison"
576
 
    comp(N, url)
577
 
    print
578
 
    print "performing dropped-connection check"
579
 
    test_timeout(url)
580
 
    
581
 
if __name__ == '__main__':
582
 
    import time
583
 
    import sys
584
 
    try:
585
 
        N = int(sys.argv[1])
586
 
        url = sys.argv[2]
587
 
    except:
588
 
        print "%s <integer> <url>" % sys.argv[0]
589
 
    else:
590
 
        test(url, N)