~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to urlgrabber/keepalive.py

  • Committer: Martin Pool
  • Date: 2005-04-28 10:01:44 UTC
  • Revision ID: mbp@sourcefrog.net-20050428100144-e9d4ccfe5fb236df
- new 'bzr ignored' command!

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#   This library is free software; you can redistribute it and/or
 
2
#   modify it under the terms of the GNU Lesser General Public
 
3
#   License as published by the Free Software Foundation; either
 
4
#   version 2.1 of the License, or (at your option) any later version.
 
5
#
 
6
#   This library is distributed in the hope that it will be useful,
 
7
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
8
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
9
#   Lesser General Public License for more details.
 
10
#
 
11
#   You should have received a copy of the GNU Lesser General Public
 
12
#   License along with this library; if not, write to the 
 
13
#      Free Software Foundation, Inc., 
 
14
#      59 Temple Place, Suite 330, 
 
15
#      Boston, MA  02111-1307  USA
 
16
 
 
17
# This file is part of urlgrabber, a high-level cross-protocol url-grabber
 
18
# Copyright 2002-2004 Michael D. Stenner, Ryan Tomayko
 
19
 
 
20
"""An HTTP handler for urllib2 that supports HTTP 1.1 and keepalive.
 
21
 
 
22
>>> import urllib2
 
23
>>> from keepalive import HTTPHandler
 
24
>>> keepalive_handler = HTTPHandler()
 
25
>>> opener = urllib2.build_opener(keepalive_handler)
 
26
>>> urllib2.install_opener(opener)
 
27
>>> 
 
28
>>> fo = urllib2.urlopen('http://www.python.org')
 
29
 
 
30
If a connection to a given host is requested, and all of the existing
 
31
connections are still in use, another connection will be opened.  If
 
32
the handler tries to use an existing connection but it fails in some
 
33
way, it will be closed and removed from the pool.
 
34
 
 
35
To remove the handler, simply re-run build_opener with no arguments, and
 
36
install that opener.
 
37
 
 
38
You can explicitly close connections by using the close_connection()
 
39
method of the returned file-like object (described below) or you can
 
40
use the handler methods:
 
41
 
 
42
  close_connection(host)
 
43
  close_all()
 
44
  open_connections()
 
45
 
 
46
NOTE: using the close_connection and close_all methods of the handler
 
47
should be done with care when using multiple threads.
 
48
  * there is nothing that prevents another thread from creating new
 
49
    connections immediately after connections are closed
 
50
  * no checks are done to prevent in-use connections from being closed
 
51
 
 
52
>>> keepalive_handler.close_all()
 
53
 
 
54
EXTRA ATTRIBUTES AND METHODS
 
55
 
 
56
  Upon a status of 200, the object returned has a few additional
 
57
  attributes and methods, which should not be used if you want to
 
58
  remain consistent with the normal urllib2-returned objects:
 
59
 
 
60
    close_connection()  -  close the connection to the host
 
61
    readlines()         -  you know, readlines()
 
62
    status              -  the return status (ie 404)
 
63
    reason              -  english translation of status (ie 'File not found')
 
64
 
 
65
  If you want the best of both worlds, use this inside an
 
66
  AttributeError-catching try:
 
67
 
 
68
  >>> try: status = fo.status
 
69
  >>> except AttributeError: status = None
 
70
 
 
71
  Unfortunately, these are ONLY there if status == 200, so it's not
 
72
  easy to distinguish between non-200 responses.  The reason is that
 
73
  urllib2 tries to do clever things with error codes 301, 302, 401,
 
74
  and 407, and it wraps the object upon return.
 
75
 
 
76
  For python versions earlier than 2.4, you can avoid this fancy error
 
77
  handling by setting the module-level global HANDLE_ERRORS to zero.
 
78
  You see, prior to 2.4, it's the HTTP Handler's job to determine what
 
79
  to handle specially, and what to just pass up.  HANDLE_ERRORS == 0
 
80
  means "pass everything up".  In python 2.4, however, this job no
 
81
  longer belongs to the HTTP Handler and is now done by a NEW handler,
 
82
  HTTPErrorProcessor.  Here's the bottom line:
 
83
 
 
84
    python version < 2.4
 
85
        HANDLE_ERRORS == 1  (default) pass up 200, treat the rest as
 
86
                            errors
 
87
        HANDLE_ERRORS == 0  pass everything up, error processing is
 
88
                            left to the calling code
 
89
    python version >= 2.4
 
90
        HANDLE_ERRORS == 1  pass up 200, treat the rest as errors
 
91
        HANDLE_ERRORS == 0  (default) pass everything up, let the
 
92
                            other handlers (specifically,
 
93
                            HTTPErrorProcessor) decide what to do
 
94
 
 
95
  In practice, setting the variable either way makes little difference
 
96
  in python 2.4, so for the most consistent behavior across versions,
 
97
  you probably just want to use the defaults, which will give you
 
98
  exceptions on errors.
 
99
 
 
100
"""
 
101
 
 
102
# $Id: keepalive.py,v 1.9 2005/02/14 21:55:07 mstenner Exp $
 
103
 
 
104
import urllib2
 
105
import httplib
 
106
import socket
 
107
import thread
 
108
 
 
109
DEBUG = 0
 
110
def DBPRINT(*args): print ' '.join(args)
 
111
 
 
112
import sys
 
113
_python_version = map(int, sys.version.split()[0].split('.'))
 
114
if _python_version < [2, 4]: HANDLE_ERRORS = 1
 
115
else: HANDLE_ERRORS = 0
 
116
    
 
117
class ConnectionManager:
 
118
    """
 
119
    The connection manager must be able to:
 
120
      * keep track of all existing
 
121
      """
 
122
    def __init__(self):
 
123
        self._lock = thread.allocate_lock()
 
124
        self._hostmap = {} # map hosts to a list of connections
 
125
        self._connmap = {} # map connections to host
 
126
        self._readymap = {} # map connection to ready state
 
127
 
 
128
    def add(self, host, connection, ready):
 
129
        self._lock.acquire()
 
130
        try:
 
131
            if not self._hostmap.has_key(host): self._hostmap[host] = []
 
132
            self._hostmap[host].append(connection)
 
133
            self._connmap[connection] = host
 
134
            self._readymap[connection] = ready
 
135
        finally:
 
136
            self._lock.release()
 
137
 
 
138
    def remove(self, connection):
 
139
        self._lock.acquire()
 
140
        try:
 
141
            try:
 
142
                host = self._connmap[connection]
 
143
            except KeyError:
 
144
                pass
 
145
            else:
 
146
                del self._connmap[connection]
 
147
                del self._readymap[connection]
 
148
                self._hostmap[host].remove(connection)
 
149
                if not self._hostmap[host]: del self._hostmap[host]
 
150
        finally:
 
151
            self._lock.release()
 
152
 
 
153
    def set_ready(self, connection, ready):
 
154
        try: self._readymap[connection] = ready
 
155
        except KeyError: pass
 
156
        
 
157
    def get_ready_conn(self, host):
 
158
        conn = None
 
159
        self._lock.acquire()
 
160
        try:
 
161
            if self._hostmap.has_key(host):
 
162
                for c in self._hostmap[host]:
 
163
                    if self._readymap[c]:
 
164
                        self._readymap[c] = 0
 
165
                        conn = c
 
166
                        break
 
167
        finally:
 
168
            self._lock.release()
 
169
        return conn
 
170
 
 
171
    def get_all(self, host=None):
 
172
        if host:
 
173
            return list(self._hostmap.get(host, []))
 
174
        else:
 
175
            return dict(self._hostmap)
 
176
 
 
177
class HTTPHandler(urllib2.HTTPHandler):
 
178
    def __init__(self):
 
179
        self._cm = ConnectionManager()
 
180
        
 
181
    #### Connection Management
 
182
    def open_connections(self):
 
183
        """return a list of connected hosts and the number of connections
 
184
        to each.  [('foo.com:80', 2), ('bar.org', 1)]"""
 
185
        return [(host, len(li)) for (host, li) in self._cm.get_all().items()]
 
186
 
 
187
    def close_connection(self, host):
 
188
        """close connection(s) to <host>
 
189
        host is the host:port spec, as in 'www.cnn.com:8080' as passed in.
 
190
        no error occurs if there is no connection to that host."""
 
191
        for h in self._cm.get_all(host):
 
192
            self._cm.remove(h)
 
193
            h.close()
 
194
        
 
195
    def close_all(self):
 
196
        """close all open connections"""
 
197
        for host, conns in self._cm.get_all().items():
 
198
            for h in conns:
 
199
                self._cm.remove(h)
 
200
                h.close()
 
201
        
 
202
    def _request_closed(self, request, host, connection):
 
203
        """tells us that this request is now closed and the the
 
204
        connection is ready for another request"""
 
205
        self._cm.set_ready(connection, 1)
 
206
 
 
207
    def _remove_connection(self, host, connection, close=0):
 
208
        if close: connection.close()
 
209
        self._cm.remove(connection)
 
210
        
 
211
    #### Transaction Execution
 
212
    def http_open(self, req):
 
213
        return self.do_open(HTTPConnection, req)
 
214
 
 
215
    def do_open(self, http_class, req):
 
216
        host = req.get_host()
 
217
        if not host:
 
218
            raise urllib2.URLError('no host given')
 
219
 
 
220
        try:
 
221
            h = self._cm.get_ready_conn(host)
 
222
            while h:
 
223
                r = self._reuse_connection(h, req, host)
 
224
 
 
225
                # if this response is non-None, then it worked and we're
 
226
                # done.  Break out, skipping the else block.
 
227
                if r: break
 
228
 
 
229
                # connection is bad - possibly closed by server
 
230
                # discard it and ask for the next free connection
 
231
                h.close()
 
232
                self._cm.remove(h)
 
233
                h = self._cm.get_ready_conn(host)
 
234
            else:
 
235
                # no (working) free connections were found.  Create a new one.
 
236
                h = http_class(host)
 
237
                if DEBUG: DBPRINT("creating new connection to %s (%d)" % \
 
238
                                  (host, id(h)))
 
239
                self._cm.add(host, h, 0)
 
240
                self._start_transaction(h, req)
 
241
                r = h.getresponse()
 
242
        except (socket.error, httplib.HTTPException), err:
 
243
            raise urllib2.URLError(err)
 
244
            
 
245
        # if not a persistent connection, don't try to reuse it
 
246
        if r.will_close: self._cm.remove(h)
 
247
 
 
248
        if DEBUG: DBPRINT("STATUS: %s, %s" % (r.status, r.reason))
 
249
        r._handler = self
 
250
        r._host = host
 
251
        r._url = req.get_full_url()
 
252
        r._connection = h
 
253
        r.code = r.status
 
254
        
 
255
        if r.status == 200 or not HANDLE_ERRORS:
 
256
            return r
 
257
        else:
 
258
            return self.parent.error('http', req, r, r.status, r.reason, r.msg)
 
259
 
 
260
 
 
261
    def _reuse_connection(self, h, req, host):
 
262
        """start the transaction with a re-used connection
 
263
        return a response object (r) upon success or None on failure.
 
264
        This DOES not close or remove bad connections in cases where
 
265
        it returns.  However, if an unexpected exception occurs, it
 
266
        will close and remove the connection before re-raising.
 
267
        """
 
268
        try:
 
269
            self._start_transaction(h, req)
 
270
            r = h.getresponse()
 
271
            # note: just because we got something back doesn't mean it
 
272
            # worked.  We'll check the version below, too.
 
273
        except (socket.error, httplib.HTTPException):
 
274
            r = None
 
275
        except:
 
276
            # adding this block just in case we've missed
 
277
            # something we will still raise the exception, but
 
278
            # lets try and close the connection and remove it
 
279
            # first.  We previously got into a nasty loop
 
280
            # where an exception was uncaught, and so the
 
281
            # connection stayed open.  On the next try, the
 
282
            # same exception was raised, etc.  The tradeoff is
 
283
            # that it's now possible this call will raise
 
284
            # a DIFFERENT exception
 
285
            if DEBUG: DBPRINT("unexpected exception - " \
 
286
                              "closing connection to %s (%d)" % (host, id(h)))
 
287
            self._cm.remove(h)
 
288
            h.close()
 
289
            raise
 
290
                    
 
291
        if r is None or r.version == 9:
 
292
            # httplib falls back to assuming HTTP 0.9 if it gets a
 
293
            # bad header back.  This is most likely to happen if
 
294
            # the socket has been closed by the server since we
 
295
            # last used the connection.
 
296
            if DEBUG: DBPRINT("failed to re-use connection to %s (%d)" \
 
297
                              % (host, id(h)))
 
298
            r = None
 
299
        else:
 
300
            if DEBUG: DBPRINT("re-using connection to %s (%d)" % (host, id(h)))
 
301
 
 
302
        return r
 
303
 
 
304
    def _start_transaction(self, h, req):
 
305
        try:
 
306
            if req.has_data():
 
307
                data = req.get_data()
 
308
                h.putrequest('POST', req.get_selector())
 
309
                if not req.headers.has_key('Content-type'):
 
310
                    h.putheader('Content-type',
 
311
                                'application/x-www-form-urlencoded')
 
312
                if not req.headers.has_key('Content-length'):
 
313
                    h.putheader('Content-length', '%d' % len(data))
 
314
            else:
 
315
                h.putrequest('GET', req.get_selector())
 
316
        except (socket.error, httplib.HTTPException), err:
 
317
            raise urllib2.URLError(err)
 
318
 
 
319
        for args in self.parent.addheaders:
 
320
            h.putheader(*args)
 
321
        for k, v in req.headers.items():
 
322
            h.putheader(k, v)
 
323
        h.endheaders()
 
324
        if req.has_data():
 
325
            h.send(data)
 
326
 
 
327
class HTTPResponse(httplib.HTTPResponse):
 
328
    # we need to subclass HTTPResponse in order to
 
329
    # 1) add readline() and readlines() methods
 
330
    # 2) add close_connection() methods
 
331
    # 3) add info() and geturl() methods
 
332
 
 
333
    # in order to add readline(), read must be modified to deal with a
 
334
    # buffer.  example: readline must read a buffer and then spit back
 
335
    # one line at a time.  The only real alternative is to read one
 
336
    # BYTE at a time (ick).  Once something has been read, it can't be
 
337
    # put back (ok, maybe it can, but that's even uglier than this),
 
338
    # so if you THEN do a normal read, you must first take stuff from
 
339
    # the buffer.
 
340
 
 
341
    # the read method wraps the original to accomodate buffering,
 
342
    # although read() never adds to the buffer.
 
343
    # Both readline and readlines have been stolen with almost no
 
344
    # modification from socket.py
 
345
    
 
346
 
 
347
    def __init__(self, sock, debuglevel=0, strict=0, method=None):
 
348
        if method: # the httplib in python 2.3 uses the method arg
 
349
            httplib.HTTPResponse.__init__(self, sock, debuglevel, method)
 
350
        else: # 2.2 doesn't
 
351
            httplib.HTTPResponse.__init__(self, sock, debuglevel)
 
352
        self.fileno = sock.fileno
 
353
        self.code = None
 
354
        self._rbuf = ''
 
355
        self._rbufsize = 8096
 
356
        self._handler = None # inserted by the handler later
 
357
        self._host = None    # (same)
 
358
        self._url = None     # (same)
 
359
        self._connection = None # (same)
 
360
 
 
361
    _raw_read = httplib.HTTPResponse.read
 
362
 
 
363
    def close(self):
 
364
        if self.fp:
 
365
            self.fp.close()
 
366
            self.fp = None
 
367
            if self._handler:
 
368
                self._handler._request_closed(self, self._host,
 
369
                                              self._connection)
 
370
 
 
371
    def close_connection(self):
 
372
        self._handler._remove_connection(self._host, self._connection, close=1)
 
373
        self.close()
 
374
        
 
375
    def info(self):
 
376
        return self.msg
 
377
 
 
378
    def geturl(self):
 
379
        return self._url
 
380
 
 
381
    def read(self, amt=None):
 
382
        # the _rbuf test is only in this first if for speed.  It's not
 
383
        # logically necessary
 
384
        if self._rbuf and not amt is None:
 
385
            L = len(self._rbuf)
 
386
            if amt > L:
 
387
                amt -= L
 
388
            else:
 
389
                s = self._rbuf[:amt]
 
390
                self._rbuf = self._rbuf[amt:]
 
391
                return s
 
392
 
 
393
        s = self._rbuf + self._raw_read(amt)
 
394
        self._rbuf = ''
 
395
        return s
 
396
 
 
397
    def readline(self, limit=-1):
 
398
        data = ""
 
399
        i = self._rbuf.find('\n')
 
400
        while i < 0 and not (0 < limit <= len(self._rbuf)):
 
401
            new = self._raw_read(self._rbufsize)
 
402
            if not new: break
 
403
            i = new.find('\n')
 
404
            if i >= 0: i = i + len(self._rbuf)
 
405
            self._rbuf = self._rbuf + new
 
406
        if i < 0: i = len(self._rbuf)
 
407
        else: i = i+1
 
408
        if 0 <= limit < len(self._rbuf): i = limit
 
409
        data, self._rbuf = self._rbuf[:i], self._rbuf[i:]
 
410
        return data
 
411
 
 
412
    def readlines(self, sizehint = 0):
 
413
        total = 0
 
414
        list = []
 
415
        while 1:
 
416
            line = self.readline()
 
417
            if not line: break
 
418
            list.append(line)
 
419
            total += len(line)
 
420
            if sizehint and total >= sizehint:
 
421
                break
 
422
        return list
 
423
 
 
424
 
 
425
class HTTPConnection(httplib.HTTPConnection):
 
426
    # use the modified response class
 
427
    response_class = HTTPResponse
 
428
    
 
429
#########################################################################
 
430
#####   TEST FUNCTIONS
 
431
#########################################################################
 
432
 
 
433
def error_handler(url):
 
434
    global HANDLE_ERRORS
 
435
    orig = HANDLE_ERRORS
 
436
    keepalive_handler = HTTPHandler()
 
437
    opener = urllib2.build_opener(keepalive_handler)
 
438
    urllib2.install_opener(opener)
 
439
    pos = {0: 'off', 1: 'on'}
 
440
    for i in (0, 1):
 
441
        print "  fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i)
 
442
        HANDLE_ERRORS = i
 
443
        try:
 
444
            fo = urllib2.urlopen(url)
 
445
            foo = fo.read()
 
446
            fo.close()
 
447
            try: status, reason = fo.status, fo.reason
 
448
            except AttributeError: status, reason = None, None
 
449
        except IOError, e:
 
450
            print "  EXCEPTION: %s" % e
 
451
            raise
 
452
        else:
 
453
            print "  status = %s, reason = %s" % (status, reason)
 
454
    HANDLE_ERRORS = orig
 
455
    hosts = keepalive_handler.open_connections()
 
456
    print "open connections:", hosts
 
457
    keepalive_handler.close_all()
 
458
 
 
459
def continuity(url):
 
460
    import md5
 
461
    format = '%25s: %s'
 
462
    
 
463
    # first fetch the file with the normal http handler
 
464
    opener = urllib2.build_opener()
 
465
    urllib2.install_opener(opener)
 
466
    fo = urllib2.urlopen(url)
 
467
    foo = fo.read()
 
468
    fo.close()
 
469
    m = md5.new(foo)
 
470
    print format % ('normal urllib', m.hexdigest())
 
471
 
 
472
    # now install the keepalive handler and try again
 
473
    opener = urllib2.build_opener(HTTPHandler())
 
474
    urllib2.install_opener(opener)
 
475
 
 
476
    fo = urllib2.urlopen(url)
 
477
    foo = fo.read()
 
478
    fo.close()
 
479
    m = md5.new(foo)
 
480
    print format % ('keepalive read', m.hexdigest())
 
481
 
 
482
    fo = urllib2.urlopen(url)
 
483
    foo = ''
 
484
    while 1:
 
485
        f = fo.readline()
 
486
        if f: foo = foo + f
 
487
        else: break
 
488
    fo.close()
 
489
    m = md5.new(foo)
 
490
    print format % ('keepalive readline', m.hexdigest())
 
491
 
 
492
def comp(N, url):
 
493
    print '  making %i connections to:\n  %s' % (N, url)
 
494
 
 
495
    sys.stdout.write('  first using the normal urllib handlers')
 
496
    # first use normal opener
 
497
    opener = urllib2.build_opener()
 
498
    urllib2.install_opener(opener)
 
499
    t1 = fetch(N, url)
 
500
    print '  TIME: %.3f s' % t1
 
501
 
 
502
    sys.stdout.write('  now using the keepalive handler       ')
 
503
    # now install the keepalive handler and try again
 
504
    opener = urllib2.build_opener(HTTPHandler())
 
505
    urllib2.install_opener(opener)
 
506
    t2 = fetch(N, url)
 
507
    print '  TIME: %.3f s' % t2
 
508
    print '  improvement factor: %.2f' % (t1/t2, )
 
509
    
 
510
def fetch(N, url, delay=0):
 
511
    lens = []
 
512
    starttime = time.time()
 
513
    for i in range(N):
 
514
        if delay and i > 0: time.sleep(delay)
 
515
        fo = urllib2.urlopen(url)
 
516
        foo = fo.read()
 
517
        fo.close()
 
518
        lens.append(len(foo))
 
519
    diff = time.time() - starttime
 
520
 
 
521
    j = 0
 
522
    for i in lens[1:]:
 
523
        j = j + 1
 
524
        if not i == lens[0]:
 
525
            print "WARNING: inconsistent length on read %i: %i" % (j, i)
 
526
 
 
527
    return diff
 
528
 
 
529
def test_timeout(url):
 
530
    global DEBUG, DBPRINT
 
531
    dbp = DBPRINT
 
532
    def DBPRINT(*args): print '    ' + ' '.join(args)
 
533
    DEBUG=1
 
534
    print "  fetching the file to establish a connection"
 
535
    fo = urllib2.urlopen(url)
 
536
    data1 = fo.read()
 
537
    fo.close()
 
538
 
 
539
    i = 20
 
540
    print "  waiting %i seconds for the server to close the connection" % i
 
541
    while i > 0:
 
542
        sys.stdout.write('\r  %2i' % i)
 
543
        sys.stdout.flush()
 
544
        time.sleep(1)
 
545
        i -= 1
 
546
    sys.stderr.write('\r')
 
547
 
 
548
    print "  fetching the file a second time"
 
549
    fo = urllib2.urlopen(url)
 
550
    data2 = fo.read()
 
551
    fo.close()
 
552
 
 
553
    if data1 == data2:
 
554
        print '  data are identical'
 
555
    else:
 
556
        print '  ERROR: DATA DIFFER'
 
557
 
 
558
    DEBUG=0
 
559
    DBPRINT = dbp
 
560
 
 
561
    
 
562
def test(url, N=10):
 
563
    print "checking error hander (do this on a non-200)"
 
564
    try: error_handler(url)
 
565
    except IOError, e:
 
566
        print "exiting - exception will prevent further tests"
 
567
        sys.exit()
 
568
    print
 
569
    print "performing continuity test (making sure stuff isn't corrupted)"
 
570
    continuity(url)
 
571
    print
 
572
    print "performing speed comparison"
 
573
    comp(N, url)
 
574
    print
 
575
    print "performing dropped-connection check"
 
576
    test_timeout(url)
 
577
    
 
578
if __name__ == '__main__':
 
579
    import time
 
580
    import sys
 
581
    try:
 
582
        N = int(sys.argv[1])
 
583
        url = sys.argv[2]
 
584
    except:
 
585
        print "%s <integer> <url>" % sys.argv[0]
 
586
    else:
 
587
        test(url, N)