~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to urlgrabber/keepalive.py

  • Committer: Martin Pool
  • Date: 2005-07-04 08:06:51 UTC
  • Revision ID: mbp@sourcefrog.net-20050704080651-6ecec49164359e48
- track pending-merges

- unit tests for this

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#   This library is free software; you can redistribute it and/or
 
2
#   modify it under the terms of the GNU Lesser General Public
 
3
#   License as published by the Free Software Foundation; either
 
4
#   version 2.1 of the License, or (at your option) any later version.
 
5
#
 
6
#   This library is distributed in the hope that it will be useful,
 
7
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
8
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
9
#   Lesser General Public License for more details.
 
10
#
 
11
#   You should have received a copy of the GNU Lesser General Public
 
12
#   License along with this library; if not, write to the 
 
13
#      Free Software Foundation, Inc., 
 
14
#      59 Temple Place, Suite 330, 
 
15
#      Boston, MA  02111-1307  USA
 
16
 
 
17
# This file is part of urlgrabber, a high-level cross-protocol url-grabber
 
18
# Copyright 2002-2004 Michael D. Stenner, Ryan Tomayko
 
19
 
 
20
"""An HTTP handler for urllib2 that supports HTTP 1.1 and keepalive.
 
21
 
 
22
>>> import urllib2
 
23
>>> from keepalive import HTTPHandler
 
24
>>> keepalive_handler = HTTPHandler()
 
25
>>> opener = urllib2.build_opener(keepalive_handler)
 
26
>>> urllib2.install_opener(opener)
 
27
>>> 
 
28
>>> fo = urllib2.urlopen('http://www.python.org')
 
29
 
 
30
If a connection to a given host is requested, and all of the existing
 
31
connections are still in use, another connection will be opened.  If
 
32
the handler tries to use an existing connection but it fails in some
 
33
way, it will be closed and removed from the pool.
 
34
 
 
35
To remove the handler, simply re-run build_opener with no arguments, and
 
36
install that opener.
 
37
 
 
38
You can explicitly close connections by using the close_connection()
 
39
method of the returned file-like object (described below) or you can
 
40
use the handler methods:
 
41
 
 
42
  close_connection(host)
 
43
  close_all()
 
44
  open_connections()
 
45
 
 
46
NOTE: using the close_connection and close_all methods of the handler
 
47
should be done with care when using multiple threads.
 
48
  * there is nothing that prevents another thread from creating new
 
49
    connections immediately after connections are closed
 
50
  * no checks are done to prevent in-use connections from being closed
 
51
 
 
52
>>> keepalive_handler.close_all()
 
53
 
 
54
EXTRA ATTRIBUTES AND METHODS
 
55
 
 
56
  Upon a status of 200, the object returned has a few additional
 
57
  attributes and methods, which should not be used if you want to
 
58
  remain consistent with the normal urllib2-returned objects:
 
59
 
 
60
    close_connection()  -  close the connection to the host
 
61
    readlines()         -  you know, readlines()
 
62
    status              -  the return status (ie 404)
 
63
    reason              -  english translation of status (ie 'File not found')
 
64
 
 
65
  If you want the best of both worlds, use this inside an
 
66
  AttributeError-catching try:
 
67
 
 
68
  >>> try: status = fo.status
 
69
  >>> except AttributeError: status = None
 
70
 
 
71
  Unfortunately, these are ONLY there if status == 200, so it's not
 
72
  easy to distinguish between non-200 responses.  The reason is that
 
73
  urllib2 tries to do clever things with error codes 301, 302, 401,
 
74
  and 407, and it wraps the object upon return.
 
75
 
 
76
  For python versions earlier than 2.4, you can avoid this fancy error
 
77
  handling by setting the module-level global HANDLE_ERRORS to zero.
 
78
  You see, prior to 2.4, it's the HTTP Handler's job to determine what
 
79
  to handle specially, and what to just pass up.  HANDLE_ERRORS == 0
 
80
  means "pass everything up".  In python 2.4, however, this job no
 
81
  longer belongs to the HTTP Handler and is now done by a NEW handler,
 
82
  HTTPErrorProcessor.  Here's the bottom line:
 
83
 
 
84
    python version < 2.4
 
85
        HANDLE_ERRORS == 1  (default) pass up 200, treat the rest as
 
86
                            errors
 
87
        HANDLE_ERRORS == 0  pass everything up, error processing is
 
88
                            left to the calling code
 
89
    python version >= 2.4
 
90
        HANDLE_ERRORS == 1  pass up 200, treat the rest as errors
 
91
        HANDLE_ERRORS == 0  (default) pass everything up, let the
 
92
                            other handlers (specifically,
 
93
                            HTTPErrorProcessor) decide what to do
 
94
 
 
95
  In practice, setting the variable either way makes little difference
 
96
  in python 2.4, so for the most consistent behavior across versions,
 
97
  you probably just want to use the defaults, which will give you
 
98
  exceptions on errors.
 
99
 
 
100
"""
 
101
 
 
102
# $Id: keepalive.py,v 1.9 2005/02/14 21:55:07 mstenner Exp $
 
103
 
 
104
import urllib2
 
105
import httplib
 
106
import socket
 
107
import thread
 
108
 
 
109
DEBUG = 0
 
110
def DBPRINT(*args): print ' '.join(args)
 
111
 
 
112
import sys
 
113
if hasattr(sys, 'version_info'):
 
114
    _python_version = sys.version_info
 
115
else:
 
116
    _python_version = map(int, sys.version.split()[0].split('.'))
 
117
if _python_version < [2, 4]: HANDLE_ERRORS = 1
 
118
else: HANDLE_ERRORS = 0
 
119
    
 
120
class ConnectionManager:
 
121
    """
 
122
    The connection manager must be able to:
 
123
      * keep track of all existing
 
124
      """
 
125
    def __init__(self):
 
126
        self._lock = thread.allocate_lock()
 
127
        self._hostmap = {} # map hosts to a list of connections
 
128
        self._connmap = {} # map connections to host
 
129
        self._readymap = {} # map connection to ready state
 
130
 
 
131
    def add(self, host, connection, ready):
 
132
        self._lock.acquire()
 
133
        try:
 
134
            if not self._hostmap.has_key(host): self._hostmap[host] = []
 
135
            self._hostmap[host].append(connection)
 
136
            self._connmap[connection] = host
 
137
            self._readymap[connection] = ready
 
138
        finally:
 
139
            self._lock.release()
 
140
 
 
141
    def remove(self, connection):
 
142
        self._lock.acquire()
 
143
        try:
 
144
            try:
 
145
                host = self._connmap[connection]
 
146
            except KeyError:
 
147
                pass
 
148
            else:
 
149
                del self._connmap[connection]
 
150
                del self._readymap[connection]
 
151
                self._hostmap[host].remove(connection)
 
152
                if not self._hostmap[host]: del self._hostmap[host]
 
153
        finally:
 
154
            self._lock.release()
 
155
 
 
156
    def set_ready(self, connection, ready):
 
157
        try: self._readymap[connection] = ready
 
158
        except KeyError: pass
 
159
        
 
160
    def get_ready_conn(self, host):
 
161
        conn = None
 
162
        self._lock.acquire()
 
163
        try:
 
164
            if self._hostmap.has_key(host):
 
165
                for c in self._hostmap[host]:
 
166
                    if self._readymap[c]:
 
167
                        self._readymap[c] = 0
 
168
                        conn = c
 
169
                        break
 
170
        finally:
 
171
            self._lock.release()
 
172
        return conn
 
173
 
 
174
    def get_all(self, host=None):
 
175
        if host:
 
176
            return list(self._hostmap.get(host, []))
 
177
        else:
 
178
            return dict(self._hostmap)
 
179
 
 
180
class HTTPHandler(urllib2.HTTPHandler):
 
181
    def __init__(self):
 
182
        self._cm = ConnectionManager()
 
183
        
 
184
    #### Connection Management
 
185
    def open_connections(self):
 
186
        """return a list of connected hosts and the number of connections
 
187
        to each.  [('foo.com:80', 2), ('bar.org', 1)]"""
 
188
        return [(host, len(li)) for (host, li) in self._cm.get_all().items()]
 
189
 
 
190
    def close_connection(self, host):
 
191
        """close connection(s) to <host>
 
192
        host is the host:port spec, as in 'www.cnn.com:8080' as passed in.
 
193
        no error occurs if there is no connection to that host."""
 
194
        for h in self._cm.get_all(host):
 
195
            self._cm.remove(h)
 
196
            h.close()
 
197
        
 
198
    def close_all(self):
 
199
        """close all open connections"""
 
200
        for host, conns in self._cm.get_all().items():
 
201
            for h in conns:
 
202
                self._cm.remove(h)
 
203
                h.close()
 
204
        
 
205
    def _request_closed(self, request, host, connection):
 
206
        """tells us that this request is now closed and the the
 
207
        connection is ready for another request"""
 
208
        self._cm.set_ready(connection, 1)
 
209
 
 
210
    def _remove_connection(self, host, connection, close=0):
 
211
        if close: connection.close()
 
212
        self._cm.remove(connection)
 
213
        
 
214
    #### Transaction Execution
 
215
    def http_open(self, req):
 
216
        return self.do_open(HTTPConnection, req)
 
217
 
 
218
    def do_open(self, http_class, req):
 
219
        host = req.get_host()
 
220
        if not host:
 
221
            raise urllib2.URLError('no host given')
 
222
 
 
223
        try:
 
224
            h = self._cm.get_ready_conn(host)
 
225
            while h:
 
226
                r = self._reuse_connection(h, req, host)
 
227
 
 
228
                # if this response is non-None, then it worked and we're
 
229
                # done.  Break out, skipping the else block.
 
230
                if r: break
 
231
 
 
232
                # connection is bad - possibly closed by server
 
233
                # discard it and ask for the next free connection
 
234
                h.close()
 
235
                self._cm.remove(h)
 
236
                h = self._cm.get_ready_conn(host)
 
237
            else:
 
238
                # no (working) free connections were found.  Create a new one.
 
239
                h = http_class(host)
 
240
                if DEBUG: DBPRINT("creating new connection to %s (%d)" % \
 
241
                                  (host, id(h)))
 
242
                self._cm.add(host, h, 0)
 
243
                self._start_transaction(h, req)
 
244
                r = h.getresponse()
 
245
        except (socket.error, httplib.HTTPException), err:
 
246
            raise urllib2.URLError(err)
 
247
            
 
248
        # if not a persistent connection, don't try to reuse it
 
249
        if r.will_close: self._cm.remove(h)
 
250
 
 
251
        if DEBUG: DBPRINT("STATUS: %s, %s" % (r.status, r.reason))
 
252
        r._handler = self
 
253
        r._host = host
 
254
        r._url = req.get_full_url()
 
255
        r._connection = h
 
256
        r.code = r.status
 
257
        
 
258
        if r.status == 200 or not HANDLE_ERRORS:
 
259
            return r
 
260
        else:
 
261
            return self.parent.error('http', req, r, r.status, r.reason, r.msg)
 
262
 
 
263
 
 
264
    def _reuse_connection(self, h, req, host):
 
265
        """start the transaction with a re-used connection
 
266
        return a response object (r) upon success or None on failure.
 
267
        This DOES not close or remove bad connections in cases where
 
268
        it returns.  However, if an unexpected exception occurs, it
 
269
        will close and remove the connection before re-raising.
 
270
        """
 
271
        try:
 
272
            self._start_transaction(h, req)
 
273
            r = h.getresponse()
 
274
            # note: just because we got something back doesn't mean it
 
275
            # worked.  We'll check the version below, too.
 
276
        except (socket.error, httplib.HTTPException):
 
277
            r = None
 
278
        except:
 
279
            # adding this block just in case we've missed
 
280
            # something we will still raise the exception, but
 
281
            # lets try and close the connection and remove it
 
282
            # first.  We previously got into a nasty loop
 
283
            # where an exception was uncaught, and so the
 
284
            # connection stayed open.  On the next try, the
 
285
            # same exception was raised, etc.  The tradeoff is
 
286
            # that it's now possible this call will raise
 
287
            # a DIFFERENT exception
 
288
            if DEBUG: DBPRINT("unexpected exception - " \
 
289
                              "closing connection to %s (%d)" % (host, id(h)))
 
290
            self._cm.remove(h)
 
291
            h.close()
 
292
            raise
 
293
                    
 
294
        if r is None or r.version == 9:
 
295
            # httplib falls back to assuming HTTP 0.9 if it gets a
 
296
            # bad header back.  This is most likely to happen if
 
297
            # the socket has been closed by the server since we
 
298
            # last used the connection.
 
299
            if DEBUG: DBPRINT("failed to re-use connection to %s (%d)" \
 
300
                              % (host, id(h)))
 
301
            r = None
 
302
        else:
 
303
            if DEBUG: DBPRINT("re-using connection to %s (%d)" % (host, id(h)))
 
304
 
 
305
        return r
 
306
 
 
307
    def _start_transaction(self, h, req):
 
308
        try:
 
309
            if req.has_data():
 
310
                data = req.get_data()
 
311
                h.putrequest('POST', req.get_selector())
 
312
                if not req.headers.has_key('Content-type'):
 
313
                    h.putheader('Content-type',
 
314
                                'application/x-www-form-urlencoded')
 
315
                if not req.headers.has_key('Content-length'):
 
316
                    h.putheader('Content-length', '%d' % len(data))
 
317
            else:
 
318
                h.putrequest('GET', req.get_selector())
 
319
        except (socket.error, httplib.HTTPException), err:
 
320
            raise urllib2.URLError(err)
 
321
 
 
322
        for args in self.parent.addheaders:
 
323
            h.putheader(*args)
 
324
        for k, v in req.headers.items():
 
325
            h.putheader(k, v)
 
326
        h.endheaders()
 
327
        if req.has_data():
 
328
            h.send(data)
 
329
 
 
330
class HTTPResponse(httplib.HTTPResponse):
 
331
    # we need to subclass HTTPResponse in order to
 
332
    # 1) add readline() and readlines() methods
 
333
    # 2) add close_connection() methods
 
334
    # 3) add info() and geturl() methods
 
335
 
 
336
    # in order to add readline(), read must be modified to deal with a
 
337
    # buffer.  example: readline must read a buffer and then spit back
 
338
    # one line at a time.  The only real alternative is to read one
 
339
    # BYTE at a time (ick).  Once something has been read, it can't be
 
340
    # put back (ok, maybe it can, but that's even uglier than this),
 
341
    # so if you THEN do a normal read, you must first take stuff from
 
342
    # the buffer.
 
343
 
 
344
    # the read method wraps the original to accomodate buffering,
 
345
    # although read() never adds to the buffer.
 
346
    # Both readline and readlines have been stolen with almost no
 
347
    # modification from socket.py
 
348
    
 
349
 
 
350
    def __init__(self, sock, debuglevel=0, strict=0, method=None):
 
351
        if method: # the httplib in python 2.3 uses the method arg
 
352
            httplib.HTTPResponse.__init__(self, sock, debuglevel, method)
 
353
        else: # 2.2 doesn't
 
354
            httplib.HTTPResponse.__init__(self, sock, debuglevel)
 
355
        self.fileno = sock.fileno
 
356
        self.code = None
 
357
        self._rbuf = ''
 
358
        self._rbufsize = 8096
 
359
        self._handler = None # inserted by the handler later
 
360
        self._host = None    # (same)
 
361
        self._url = None     # (same)
 
362
        self._connection = None # (same)
 
363
 
 
364
    _raw_read = httplib.HTTPResponse.read
 
365
 
 
366
    def close(self):
 
367
        if self.fp:
 
368
            self.fp.close()
 
369
            self.fp = None
 
370
            if self._handler:
 
371
                self._handler._request_closed(self, self._host,
 
372
                                              self._connection)
 
373
 
 
374
    def close_connection(self):
 
375
        self._handler._remove_connection(self._host, self._connection, close=1)
 
376
        self.close()
 
377
        
 
378
    def info(self):
 
379
        return self.msg
 
380
 
 
381
    def geturl(self):
 
382
        return self._url
 
383
 
 
384
    def read(self, amt=None):
 
385
        # the _rbuf test is only in this first if for speed.  It's not
 
386
        # logically necessary
 
387
        if self._rbuf and not amt is None:
 
388
            L = len(self._rbuf)
 
389
            if amt > L:
 
390
                amt -= L
 
391
            else:
 
392
                s = self._rbuf[:amt]
 
393
                self._rbuf = self._rbuf[amt:]
 
394
                return s
 
395
 
 
396
        s = self._rbuf + self._raw_read(amt)
 
397
        self._rbuf = ''
 
398
        return s
 
399
 
 
400
    def readline(self, limit=-1):
 
401
        data = ""
 
402
        i = self._rbuf.find('\n')
 
403
        while i < 0 and not (0 < limit <= len(self._rbuf)):
 
404
            new = self._raw_read(self._rbufsize)
 
405
            if not new: break
 
406
            i = new.find('\n')
 
407
            if i >= 0: i = i + len(self._rbuf)
 
408
            self._rbuf = self._rbuf + new
 
409
        if i < 0: i = len(self._rbuf)
 
410
        else: i = i+1
 
411
        if 0 <= limit < len(self._rbuf): i = limit
 
412
        data, self._rbuf = self._rbuf[:i], self._rbuf[i:]
 
413
        return data
 
414
 
 
415
    def readlines(self, sizehint = 0):
 
416
        total = 0
 
417
        list = []
 
418
        while 1:
 
419
            line = self.readline()
 
420
            if not line: break
 
421
            list.append(line)
 
422
            total += len(line)
 
423
            if sizehint and total >= sizehint:
 
424
                break
 
425
        return list
 
426
 
 
427
 
 
428
class HTTPConnection(httplib.HTTPConnection):
 
429
    # use the modified response class
 
430
    response_class = HTTPResponse
 
431
    
 
432
#########################################################################
 
433
#####   TEST FUNCTIONS
 
434
#########################################################################
 
435
 
 
436
def error_handler(url):
 
437
    global HANDLE_ERRORS
 
438
    orig = HANDLE_ERRORS
 
439
    keepalive_handler = HTTPHandler()
 
440
    opener = urllib2.build_opener(keepalive_handler)
 
441
    urllib2.install_opener(opener)
 
442
    pos = {0: 'off', 1: 'on'}
 
443
    for i in (0, 1):
 
444
        print "  fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i)
 
445
        HANDLE_ERRORS = i
 
446
        try:
 
447
            fo = urllib2.urlopen(url)
 
448
            foo = fo.read()
 
449
            fo.close()
 
450
            try: status, reason = fo.status, fo.reason
 
451
            except AttributeError: status, reason = None, None
 
452
        except IOError, e:
 
453
            print "  EXCEPTION: %s" % e
 
454
            raise
 
455
        else:
 
456
            print "  status = %s, reason = %s" % (status, reason)
 
457
    HANDLE_ERRORS = orig
 
458
    hosts = keepalive_handler.open_connections()
 
459
    print "open connections:", hosts
 
460
    keepalive_handler.close_all()
 
461
 
 
462
def continuity(url):
 
463
    import md5
 
464
    format = '%25s: %s'
 
465
    
 
466
    # first fetch the file with the normal http handler
 
467
    opener = urllib2.build_opener()
 
468
    urllib2.install_opener(opener)
 
469
    fo = urllib2.urlopen(url)
 
470
    foo = fo.read()
 
471
    fo.close()
 
472
    m = md5.new(foo)
 
473
    print format % ('normal urllib', m.hexdigest())
 
474
 
 
475
    # now install the keepalive handler and try again
 
476
    opener = urllib2.build_opener(HTTPHandler())
 
477
    urllib2.install_opener(opener)
 
478
 
 
479
    fo = urllib2.urlopen(url)
 
480
    foo = fo.read()
 
481
    fo.close()
 
482
    m = md5.new(foo)
 
483
    print format % ('keepalive read', m.hexdigest())
 
484
 
 
485
    fo = urllib2.urlopen(url)
 
486
    foo = ''
 
487
    while 1:
 
488
        f = fo.readline()
 
489
        if f: foo = foo + f
 
490
        else: break
 
491
    fo.close()
 
492
    m = md5.new(foo)
 
493
    print format % ('keepalive readline', m.hexdigest())
 
494
 
 
495
def comp(N, url):
 
496
    print '  making %i connections to:\n  %s' % (N, url)
 
497
 
 
498
    sys.stdout.write('  first using the normal urllib handlers')
 
499
    # first use normal opener
 
500
    opener = urllib2.build_opener()
 
501
    urllib2.install_opener(opener)
 
502
    t1 = fetch(N, url)
 
503
    print '  TIME: %.3f s' % t1
 
504
 
 
505
    sys.stdout.write('  now using the keepalive handler       ')
 
506
    # now install the keepalive handler and try again
 
507
    opener = urllib2.build_opener(HTTPHandler())
 
508
    urllib2.install_opener(opener)
 
509
    t2 = fetch(N, url)
 
510
    print '  TIME: %.3f s' % t2
 
511
    print '  improvement factor: %.2f' % (t1/t2, )
 
512
    
 
513
def fetch(N, url, delay=0):
 
514
    lens = []
 
515
    starttime = time.time()
 
516
    for i in range(N):
 
517
        if delay and i > 0: time.sleep(delay)
 
518
        fo = urllib2.urlopen(url)
 
519
        foo = fo.read()
 
520
        fo.close()
 
521
        lens.append(len(foo))
 
522
    diff = time.time() - starttime
 
523
 
 
524
    j = 0
 
525
    for i in lens[1:]:
 
526
        j = j + 1
 
527
        if not i == lens[0]:
 
528
            print "WARNING: inconsistent length on read %i: %i" % (j, i)
 
529
 
 
530
    return diff
 
531
 
 
532
def test_timeout(url):
 
533
    global DEBUG, DBPRINT
 
534
    dbp = DBPRINT
 
535
    def DBPRINT(*args): print '    ' + ' '.join(args)
 
536
    DEBUG=1
 
537
    print "  fetching the file to establish a connection"
 
538
    fo = urllib2.urlopen(url)
 
539
    data1 = fo.read()
 
540
    fo.close()
 
541
 
 
542
    i = 20
 
543
    print "  waiting %i seconds for the server to close the connection" % i
 
544
    while i > 0:
 
545
        sys.stdout.write('\r  %2i' % i)
 
546
        sys.stdout.flush()
 
547
        time.sleep(1)
 
548
        i -= 1
 
549
    sys.stderr.write('\r')
 
550
 
 
551
    print "  fetching the file a second time"
 
552
    fo = urllib2.urlopen(url)
 
553
    data2 = fo.read()
 
554
    fo.close()
 
555
 
 
556
    if data1 == data2:
 
557
        print '  data are identical'
 
558
    else:
 
559
        print '  ERROR: DATA DIFFER'
 
560
 
 
561
    DEBUG=0
 
562
    DBPRINT = dbp
 
563
 
 
564
    
 
565
def test(url, N=10):
 
566
    print "checking error hander (do this on a non-200)"
 
567
    try: error_handler(url)
 
568
    except IOError, e:
 
569
        print "exiting - exception will prevent further tests"
 
570
        sys.exit()
 
571
    print
 
572
    print "performing continuity test (making sure stuff isn't corrupted)"
 
573
    continuity(url)
 
574
    print
 
575
    print "performing speed comparison"
 
576
    comp(N, url)
 
577
    print
 
578
    print "performing dropped-connection check"
 
579
    test_timeout(url)
 
580
    
 
581
if __name__ == '__main__':
 
582
    import time
 
583
    import sys
 
584
    try:
 
585
        N = int(sys.argv[1])
 
586
        url = sys.argv[2]
 
587
    except:
 
588
        print "%s <integer> <url>" % sys.argv[0]
 
589
    else:
 
590
        test(url, N)