#! /usr/bin/python

# $Id: http_client.py 271 2004-10-09 10:50:59Z fredrik $
# a simple asynchronous http client (based on SimpleAsyncHTTP.py from
# "Python Standard Library" by Fredrik Lundh, O'Reilly 2001)
#
# HTTP/1.1 and GZIP support added in January 2003 by Fredrik Lundh.
#
# changes:
# 2004-08-26 fl   unified http callback
# 2004-10-09 fl   factored out gzip_consumer support
# 2005-07-08 mbp  experimental support for keepalive connections
#
# Copyright (c) 2001-2004 by Fredrik Lundh.  All rights reserved.
#



"""async/pipelined http client

Use
===

Users of this library pass in URLs they want to see, and consumer
objects that will receive the results at some point in the future.
Any number of requests may be queued up, and more may be added while
the download is in progress.

Requests can be both superscalar and superpipelined.  That is to say,
for each server there can be multiple sockets open, and each socket
may have more than one request in flight.

Design
======

There is a single DownloadManager, and a connection object for each
open socket.

Request/consumer pairs are maintained in queues.  Each connection has
a list of transmitted requests whose response has not yet been
received.  There is also a per-server list of requests that have not
yet been submitted.

When a connection is ready to transmit a new request, it takes one
from the unsubmitted list, sends the request, and adds the request to
its unfulfilled list.  This should happen when the connection has
space for more transmissions or when a new request is added by the
user.  If the connection terminates with unfulfilled requests they are
put back onto the unsubmitted list, to be retried elsewhere.

Because responses come back precisely in order, the connection always
knows what it should expect next: the response for the next
unfulfilled request.
"""

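# Example of the consumer protocol this module expects (inferred from
# DummyConsumer below): a consumer provides feed(data), error(err_info)
# and content_complete().  A minimal sketch:
#
#     class BufferConsumer:
#         def __init__(self):
#             self.chunks = []
#         def feed(self, data):
#             self.chunks.append(data)
#         def error(self, err_info):
#             pass
#         def content_complete(self):
#             print 'got %d bytes' % len(''.join(self.chunks))
#
#     mgr = DownloadManager()
#     mgr.enqueue('http://www.bazaar-ng.org/', BufferConsumer())
#     mgr.run()
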
# Note that (as of ubuntu python 2.4.1) every socket.connect() call
# with a hostname does a remote DNS resolution, which is pretty sucky.
# Shouldn't there be a cache in glibc?  We should probably cache the
# address in, say, the DownloadManager.

# TODO: A default consumer operation that writes the received data
# into a file; by default the file is named the same as the last
# component of the URL.

# TODO: A utility function that is given a list of URLs, and downloads
# them all parallel/pipelined.  If any fail, it raises an exception
# (and discards the rest), or perhaps can be told to continue anyhow.
# The content is written into temporary files.  It returns a list of
# readable file objects.
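
# A hedged sketch of that utility (the fetch_all and TempFileConsumer
# names are hypothetical, and error handling is reduced to raising on
# any failure):
#
#     import tempfile
#
#     class TempFileConsumer:
#         def __init__(self):
#             self.outf = tempfile.TemporaryFile()
#             self.failed = False
#         def feed(self, data):
#             self.outf.write(data)
#         def error(self, err_info):
#             self.failed = True
#         def content_complete(self):
#             self.outf.seek(0)
#
#     def fetch_all(urls):
#         mgr = DownloadManager()
#         consumers = [TempFileConsumer() for url in urls]
#         for url, consumer in zip(urls, consumers):
#             mgr.enqueue(url, consumer)
#         mgr.run()
#         if [c for c in consumers if c.failed]:
#             raise IOError('one or more downloads failed')
#         return [c.outf for c in consumers]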

# TODO: If we try pipelining or keepalive and the connection drops out,
# retry the request on a new connection; eventually we should perhaps
# learn that a given host or network just won't allow keepalive.


import asyncore
import socket, string, time, sys
import StringIO
import mimetools, urlparse, urllib
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(message)s',
                    filename='/tmp/http_client.log',
                    filemode='w')

logger = logging.getLogger('bzr.http_client')
debug = logger.debug
info = logger.info
error = logger.error
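
# A minimal sketch of the address cache suggested in the note above;
# the _cached_resolve name is hypothetical.  Resolving each host at
# most once avoids a remote DNS lookup on every socket.connect().
_address_cache = {}

def _cached_resolve(host):
    """Return the IP address for host, resolving at most once."""
    try:
        return _address_cache[host]
    except KeyError:
        addr = _address_cache[host] = socket.gethostbyname(host)
        return addr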


##
# Close connection.   Request handlers can raise this exception to
# indicate that the connection should be closed.

class CloseConnection(Exception):
    pass

##
# Redirect connection.  Request handlers can raise this exception to
# indicate that a new request should be issued.

class Redirect(CloseConnection):
    def __init__(self, location):
        self.location = location


class DownloadManager(object):
    """Handles pipelined/overlapped downloads.

    Pass in a series of URLs with handlers to receive the response.
    This object will spread the requests over however many sockets
    seem useful.

    queued_requests
        Requests not yet assigned to any channel.

    Requests currently assigned to a channel are tracked in that
    channel's own sent_requests list.
    """
    def __init__(self):
        self.queued_requests = []
        # self.channel = HttpChannel('localhost', 8000, self)
        self.channels = []
        self.try_pipelined = False
        self.try_keepalive = False
        self.max_channels = 5


    def enqueue(self, url, consumer):
        self.queued_requests.append((url, consumer))
        self._wake_up_channel()


    def _channel_closed(self, channel):
        """Called by the channel when its socket closes.
        """
        self.channels.remove(channel)
        if self.queued_requests:
            # might recreate one
            self._wake_up_channel()


    def _make_channel(self):
        # proxy2 203.17.154.69
        # return HttpChannel('82.211.81.161', 80, self)         # bazaar-ng.org 
        # return HttpChannel('203.17.154.69', 8080, self)
        return HttpChannel('127.0.0.1', 8000, self)  # forwarded
            

    def _wake_up_channel(self):
        """Try to wake up one channel to send the newly-added request.

        There may be more than one request pending, and this may cause
        more than one channel to take requests.  That's OK; some of
        them may be frustrated.
        """
        from random import choice
        
        # first, wake up any idle channels
        done = False
        for ch in self.channels:
            if not ch.sent_requests:
                ch.take_one()
                done = True
        if done:
            debug("woke existing idle channel(s)")
            return

        if len(self.channels) < self.max_channels:
            newch = self._make_channel()
            self.channels.append(newch)
            newch.take_one()
            debug("created new channel")
            return

        if self.try_pipelined:
            # ask an existing busy channel to take the request as well
            debug("woke busy channel")
            choice(self.channels).take_one()
        # otherwise the request stays queued until a channel becomes idle
        



    def run(self):
        """Run until all outstanding requests have been served."""
        #while self.running_requests or self.queued_requests \
        #          or not self.channel.is_idle():
        #    asyncore.loop(count=1)
        asyncore.loop()



class Response(object):
    """Holds in-flight response."""



def _parse_response_http10(header):
    from cStringIO import StringIO

    fp = StringIO(header)
    r = Response()

    # status line split into [version, code, reason],
    # e.g. ['HTTP/1.0', '200', 'OK\r\n']
    r.status = fp.readline().split(" ", 2)
    r.headers = mimetools.Message(fp)

    # we can only(?) expect to do keepalive if we got either a
    # content-length or a chunked transfer encoding; otherwise there is
    # no way to know when the content ends other than by the connection
    # closing
    r.content_type = r.headers.get("content-type")
    try:
        r.content_length = int(r.headers.get("content-length"))
    except (ValueError, TypeError):
        r.content_length = None
    debug("seen content length of %r" % r.content_length)

    r.transfer_encoding = r.headers.get("transfer-encoding")
    r.content_encoding = r.headers.get("content-encoding")
    r.connection_reply = r.headers.get("connection")

    # TODO: pass status code to consumer?

    if r.transfer_encoding:
        raise NotImplementedError()

    if int(r.status[1]) != 200:
        debug("can't handle response status %r" % r.status)
        raise NotImplementedError()

    if r.content_length is None:
        raise NotImplementedError()

    if r.content_length == 0:
        raise NotImplementedError()

    r.content_remaining = r.content_length                

    return r
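
# A doctest-style illustration of what _parse_response_http10 consumes:
# the status line plus headers (the values below are made up):
#
#     >>> hdr = ("HTTP/1.0 200 OK\r\n"
#     ...        "Content-Type: text/plain\r\n"
#     ...        "Content-Length: 5\r\n")
#     >>> r = _parse_response_http10(hdr)
#     >>> r.content_length
#     5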


class HttpChannel(asyncore.dispatcher_with_send):
    """One http socket, pipelining if possible."""
    # asynchronous http client

    user_agent = "http_client.py 1.3ka (based on effbot)"

    proxies = urllib.getproxies()

    def __init__(self, ip_host, ip_port, manager):
        asyncore.dispatcher_with_send.__init__(self)
        self.manager = manager

        # if a response header has been seen, this holds it
        self.response = None
        
        self.data = ""

        self.chunk_size = None

        self.timestamp = time.time()

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        debug('connecting...')
        self.connect((ip_host, ip_port))

        # sent_requests holds (url, consumer) 
        self.sent_requests = []

        self._outbuf = ''


    def __repr__(self):
        return 'HttpChannel(local_addr=%r)' % (self.getsockname(),)


    def is_idle(self):
        return (not self.sent_requests)


    def handle_connect(self):
        debug("connected")
        self.take_one()


    def take_one(self):
        """Accept one request from the manager if possible."""
        if self.manager.try_pipelined:
            # allow a handful of requests in flight on one connection
            if len(self.sent_requests) > 4:
                return
        else:
            if len(self.sent_requests) > 0:
                return
        
        try:
            url, consumer = self.manager.queued_requests.pop(0)
            debug('request accepted by channel')
        except IndexError:
            return
        
        # TODO: If there are too many already in flight, don't take one.
        # TODO: If the socket's not writable (tx buffer full), don't take.
        self._push_request_http10(url, consumer)



    def _push_request_http10(self, url, consumer):
        """Send a request, and add it to the outstanding queue."""
        # TODO: check the url requested is appropriate for this connection

        # TODO: If there are too many requests outstanding or (less likely) the 
        # connection fails, queue it for later use.

        # TODO: Keep track of requests that have been sent but not yet fulfilled,
        # because we might need to retransmit them if the connection fails. (Or
        # should the caller do that?)

        request = self._form_request_http10(url)
        debug('send request for %s from %r' % (url, self))

        # dispatcher_with_send handles buffering the data until it can
        # be written, and hooks handle_write.

        self.send(request)

        self.sent_requests.append((url, consumer))


    def _form_request_http10(self, url):
        # TODO: get right vhost name
        request = [
            "GET %s HTTP/1.0" % (url),
            "Host: www.bazaar-ng.org",
            ]

        if self.manager.try_keepalive or self.manager.try_pipelined:
            request.extend([
                "Keep-Alive: 60", 
                "Connection: keep-alive",
                ])

        # make sure to include a user agent
        for header in request:
            if string.lower(header).startswith("user-agent:"):
                break
        else:
            request.append("User-Agent: %s" % self.user_agent)

        return string.join(request, "\r\n") + "\r\n\r\n"
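
    # With keepalive and pipelining off, _form_request_http10('/x')
    # produces roughly:
    #
    #     GET /x HTTP/1.0
    #     Host: www.bazaar-ng.org
    #     User-Agent: http_client.py 1.3ka (based on effbot)
    #
    # followed by the blank CRLF line that ends the header block.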


    def handle_read(self):
        # handle incoming data
        data = self.recv(2048)

        self.data = self.data + data

        if data:
            debug('got %d bytes from socket' % len(data))
        else:
            debug('server closed connection')

        while self.data:
            consumer = self.sent_requests[0][1]
            if not self.response:
                # do not have a full response header yet

                # check if we've seen a full header
                debug('getting header for %s' % self.sent_requests[0][0])

                parts = self.data.split("\r\n\r\n", 1)
                if len(parts) < 2:
                    # don't have the complete header yet; wait for more
                    return
                header, self.data = parts

                self.response = _parse_response_http10(header)
                self.content_remaining = self.response.content_length

            if not self.data:
                return

            # we now know how many (more) content bytes we expect, and
            # how much is in the data buffer.  either there is too much
            # data (the excess stays behind, holding the next response
            # headers), too little, or exactly the right amount.

            want = self.content_remaining
            if want > 0:
                got_data = self.data[:want]
                self.data = self.data[want:]
                
                assert got_data

                self.content_remaining -= len(got_data)

                debug('pass back %d bytes of %s, %d remain'
                      % (len(got_data),
                         self.sent_requests[0][0],
                         self.content_remaining))
                # feed only the bytes that belong to this response, not
                # the whole recv buffer
                consumer.feed(got_data)

            if self.content_remaining == 0:
                del self.sent_requests[0]

                debug('content complete')
                consumer.content_complete()
                
                # reset lots of things and try to get the next response header
                if self.response.connection_reply == 'close':
                    debug('server requested close')
                    self.manager._channel_closed(self)
                    self.close()
                elif not self.manager.try_keepalive:
                    debug('no keepalive for this socket')
                    self.manager._channel_closed(self)
                    self.close()
                else:
                    debug("ready for next header...")
                    self.take_one()
                self.response = None



    def handle_close(self):
        debug('async told us of close on %r' % self)
        # if there are outstanding requests should probably reopen and 
        # retransmit, but if we're not making any progress then give up
        self.manager._channel_closed(self)
        self.close()


class DummyConsumer:
    def __init__(self, url, pb):
        self.url = url
        self.outf = None
        self._pb = pb

    def feed(self, data):
        # print "feed", repr(data)
        # print "feed", repr(data[:20]), repr(data[-20:]), len(data)
        if not self.outf:
            # name the file after the last component of the URL
            # (assumes the /tmp/download/ directory already exists)
            base = self.url[self.url.rindex('/')+1:]
            self.outf = file('/tmp/download/' + base, 'wb')
        self.outf.write(data)

    def error(self, err_info):
        import traceback
        error('error reported to consumer')
        traceback.print_exception(err_info[0], err_info[1], err_info[2])
        sys.exit(1)

    def content_complete(self):
        info('content complete from %s' % self.url)
        self.outf.close()
        self.outf = None
        # using last_cnt is cheating
        self._pb.update('downloading inventory',
                        self._pb.last_cnt+1,
                        self._pb.last_total)



if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    mgr = DownloadManager()

    from bzrlib.branch import Branch
    from bzrlib.progress import ProgressBar

    pb = ProgressBar()
    revs = Branch('/home/mbp/work/bzr').revision_history()
    pb.update('downloading inventories', 0, len(revs))

    for rev in revs:
        url = 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/inventory-store/' \
              + rev + '.gz'
        mgr.enqueue(url, DummyConsumer(url, pb))

    mgr.run()


#     for url in ['http://www.bazaar-ng.org/',
#                 'http://www.bazaar-ng.org/tutorial.html',
#                 'http://www.bazaar-ng.org/download.html',
#                 'http://www.bazaar-ng.org/bzr/bzr.dev/.bzr/revision-store/mbp@hope-20050415013653-3b3c9c3d33fae0a6.gz',
#                 ]: