~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Alexander Belchenko
  • Date: 2007-11-19 22:54:30 UTC
  • mfrom: (3006 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3008.
  • Revision ID: bialix@ukr.net-20071119225430-x0ewosrsagis0yno
merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
client and server.
19
19
"""
20
20
 
 
21
import collections
21
22
from cStringIO import StringIO
22
23
import time
23
24
 
197
198
        """
198
199
        self._write_func(RESPONSE_VERSION_TWO)
199
200
 
200
 
 
201
 
class LengthPrefixedBodyDecoder(object):
202
 
    """Decodes the length-prefixed bulk data."""
203
 
    
 
201
    def _send_response(self, response):
 
202
        """Send a smart server response down the output stream."""
 
203
        assert not self._finished, 'response already sent'
 
204
        self._finished = True
 
205
        self._write_protocol_version()
 
206
        self._write_success_or_failure_prefix(response)
 
207
        self._write_func(_encode_tuple(response.args))
 
208
        if response.body is not None:
 
209
            assert isinstance(response.body, str), 'body must be a str'
 
210
            assert response.body_stream is None, (
 
211
                'body_stream and body cannot both be set')
 
212
            bytes = self._encode_bulk_data(response.body)
 
213
            self._write_func(bytes)
 
214
        elif response.body_stream is not None:
 
215
            _send_stream(response.body_stream, self._write_func)
 
216
 
 
217
 
 
218
def _send_stream(stream, write_func):
    """Write ``stream`` through ``write_func`` as a chunked body.

    The output is the ``chunked`` header line, then whatever _send_chunks
    writes for ``stream``, then the ``END`` terminator line.
    """
    header, terminator = 'chunked\n', 'END\n'
    write_func(header)
    _send_chunks(stream, write_func)
    write_func(terminator)
 
222
 
 
223
 
 
224
def _send_chunks(stream, write_func):
 
225
    for chunk in stream:
 
226
        if isinstance(chunk, str):
 
227
            bytes = "%x\n%s" % (len(chunk), chunk)
 
228
            write_func(bytes)
 
229
        elif isinstance(chunk, request.FailedSmartServerResponse):
 
230
            write_func('ERR\n')
 
231
            _send_chunks(chunk.args, write_func)
 
232
            return
 
233
        else:
 
234
            raise errors.BzrError(
 
235
                'Chunks must be str or FailedSmartServerResponse, got %r'
 
236
                % chunk)
 
237
 
 
238
 
 
239
class _StatefulDecoder(object):
 
240
 
204
241
    def __init__(self):
205
 
        self.bytes_left = None
206
242
        self.finished_reading = False
207
243
        self.unused_data = ''
208
 
        self.state_accept = self._state_accept_expecting_length
209
 
        self.state_read = self._state_read_no_data
210
 
        self._in_buffer = ''
211
 
        self._trailer_buffer = ''
212
 
    
 
244
        self.bytes_left = None
 
245
 
213
246
    def accept_bytes(self, bytes):
214
247
        """Decode as much of bytes as possible.
215
248
 
226
259
            current_state = self.state_accept
227
260
            self.state_accept('')
228
261
 
 
262
 
 
263
class ChunkedBodyDecoder(_StatefulDecoder):
    """Decoder for chunked body data.

    This is very similar to HTTP's chunked encoding.  See the description of
    streamed body data in `doc/developers/network-protocol.txt` for details.

    Received bytes are handed to the current ``state_accept`` method.
    Completed chunks are queued on ``self.chunks`` and are retrieved with
    read_next_chunk().
    """

    def __init__(self):
        _StatefulDecoder.__init__(self)
        self.state_accept = self._state_accept_expecting_header
        # Bytes received but not yet consumed by the state machine.
        self._in_buffer = ''
        # Content of the chunk currently being read; None between chunks.
        self.chunk_in_progress = None
        # Completed chunks waiting to be returned by read_next_chunk().
        self.chunks = collections.deque()
        # Set to True once an 'ERR' line is seen; chunks read after that are
        # collected in error_in_progress instead of being queued on chunks.
        self.error = False
        self.error_in_progress = None

    def next_read_size(self):
        """Return how many bytes the caller should read next.

        :raises AssertionError: if state_accept is not one of this class's
            known states.
        """
        # Note: the shortest possible chunk is 2 bytes: '0\n', and the
        # end-of-body marker is 4 bytes: 'END\n'.
        if self.state_accept == self._state_accept_reading_chunk:
            # We're expecting more chunk content.  So we're expecting at least
            # the rest of this chunk plus an END chunk.
            return self.bytes_left + 4
        elif self.state_accept == self._state_accept_expecting_length:
            if self._in_buffer == '':
                # We're expecting a chunk length.  There's at least two bytes
                # left: a digit plus '\n'.
                return 2
            else:
                # We're in the middle of reading a chunk length.  So there's at
                # least one byte left, the '\n' that terminates the length.
                return 1
        elif self.state_accept == self._state_accept_reading_unused:
            return 1
        elif self.state_accept == self._state_accept_expecting_header:
            # Ask for the remainder of the 'chunked\n' header, but never for
            # zero bytes.  If 8 or more bytes are buffered and none of them is
            # a newline, the header is malformed; a zero-byte read could never
            # deliver the newline that lets _state_accept_expecting_header
            # report the error, so keep reading one byte at a time.
            return max(1, len('chunked\n') - len(self._in_buffer))
        else:
            raise AssertionError("Impossible state: %r" % (self.state_accept,))

    def read_next_chunk(self):
        """Return the oldest queued chunk, or None if none is queued."""
        try:
            return self.chunks.popleft()
        except IndexError:
            return None

    def _extract_line(self):
        """Remove and return the first line of _in_buffer, without its '\\n'.

        Returns None, leaving the buffer untouched, if the buffer does not
        contain a newline yet.
        """
        pos = self._in_buffer.find('\n')
        if pos == -1:
            # We haven't read a complete length prefix yet, so there's nothing
            # to do.
            return None
        line = self._in_buffer[:pos]
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
        self._in_buffer = self._in_buffer[pos+1:]
        return line

    def _finished(self):
        """Finish decoding after the end-of-body marker was read.

        Any bytes still buffered become unused_data.  If an error was being
        read, its collected chunks are queued as a single
        FailedSmartServerResponse.
        """
        self.unused_data = self._in_buffer
        self._in_buffer = None
        self.state_accept = self._state_accept_reading_unused
        if self.error:
            error_args = tuple(self.error_in_progress)
            self.chunks.append(request.FailedSmartServerResponse(error_args))
            self.error_in_progress = None
        self.finished_reading = True

    def _state_accept_expecting_header(self, bytes):
        """Accept bytes while waiting for the 'chunked' header line.

        :raises errors.SmartProtocolError: if the first line is not 'chunked'.
        """
        self._in_buffer += bytes
        prefix = self._extract_line()
        if prefix is None:
            # We haven't read a complete length prefix yet, so there's nothing
            # to do.
            return
        elif prefix == 'chunked':
            self.state_accept = self._state_accept_expecting_length
        else:
            raise errors.SmartProtocolError(
                'Bad chunked body header: "%s"' % (prefix,))

    def _state_accept_expecting_length(self, bytes):
        """Accept bytes while waiting for a chunk length, 'ERR' or 'END' line.

        Any other line is parsed as a hexadecimal chunk length; int() raises
        ValueError if it is not valid hexadecimal.
        """
        self._in_buffer += bytes
        prefix = self._extract_line()
        if prefix is None:
            # We haven't read a complete length prefix yet, so there's nothing
            # to do.
            return
        elif prefix == 'ERR':
            self.error = True
            self.error_in_progress = []
            # Process whatever already follows the ERR line in the buffer.
            self._state_accept_expecting_length('')
            return
        elif prefix == 'END':
            # We've read the end-of-body marker.
            # Any further bytes are unused data, including the bytes left in
            # the _in_buffer.
            self._finished()
            return
        else:
            self.bytes_left = int(prefix, 16)
            self.chunk_in_progress = ''
            self.state_accept = self._state_accept_reading_chunk

    def _state_accept_reading_chunk(self, bytes):
        """Accept bytes that belong to the chunk currently being read.

        When the chunk is complete it is appended to error_in_progress if an
        error is being read, and to chunks otherwise.
        """
        self._in_buffer += bytes
        in_buffer_len = len(self._in_buffer)
        self.chunk_in_progress += self._in_buffer[:self.bytes_left]
        self._in_buffer = self._in_buffer[self.bytes_left:]
        self.bytes_left -= in_buffer_len
        if self.bytes_left <= 0:
            # Finished with chunk
            self.bytes_left = None
            if self.error:
                self.error_in_progress.append(self.chunk_in_progress)
            else:
                self.chunks.append(self.chunk_in_progress)
            self.chunk_in_progress = None
            self.state_accept = self._state_accept_expecting_length

    def _state_accept_reading_unused(self, bytes):
        """Accumulate bytes received after the end of the body."""
        self.unused_data += bytes
 
383
 
 
384
 
 
385
class LengthPrefixedBodyDecoder(_StatefulDecoder):
 
386
    """Decodes the length-prefixed bulk data."""
 
387
    
 
388
    def __init__(self):
 
389
        _StatefulDecoder.__init__(self)
 
390
        self.state_accept = self._state_accept_expecting_length
 
391
        self.state_read = self._state_read_no_data
 
392
        self._in_buffer = ''
 
393
        self._trailer_buffer = ''
 
394
    
229
395
    def next_read_size(self):
230
396
        if self.bytes_left is not None:
231
397
            # Ideally we want to read all the remainder of the body and the
452
618
        return SmartClientRequestProtocolOne.read_response_tuple(self, expect_body)
453
619
 
454
620
    def _write_protocol_version(self):
455
 
        r"""Write any prefixes this protocol requires.
 
621
        """Write any prefixes this protocol requires.
456
622
        
457
623
        Version two sends the value of REQUEST_VERSION_TWO.
458
624
        """
459
625
        self._request.accept_bytes(REQUEST_VERSION_TWO)
460
626
 
 
627
    def read_streamed_body(self):
 
628
        """Read bytes from the body, decoding into a byte stream.
 
629
        """
 
630
        _body_decoder = ChunkedBodyDecoder()
 
631
        while not _body_decoder.finished_reading:
 
632
            bytes_wanted = _body_decoder.next_read_size()
 
633
            bytes = self._request.read_bytes(bytes_wanted)
 
634
            _body_decoder.accept_bytes(bytes)
 
635
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
 
636
                if 'hpss' in debug.debug_flags:
 
637
                    mutter('              %d byte chunk read',
 
638
                           len(body_bytes))
 
639
                yield body_bytes
 
640
        self._request.finished_reading()
 
641