~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-06-18 05:22:35 UTC
  • mfrom: (1551.15.27 Aaron's mergeable stuff)
  • Revision ID: pqm@pqm.ubuntu.com-20070618052235-mvns8j28szyzscy0
Turn list-weave into list-versionedfile

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
client and server.
19
19
"""
20
20
 
21
 
import collections
 
21
 
22
22
from cStringIO import StringIO
23
 
import time
24
23
 
25
 
from bzrlib import debug
26
24
from bzrlib import errors
27
25
from bzrlib.smart import request
28
 
from bzrlib.trace import log_exception_quietly, mutter
29
26
 
30
27
 
31
28
# Protocol version strings.  These are sent as prefixes of bzr requests and
111
108
                raise
112
109
            except Exception, exception:
113
110
                # everything else: pass to client, flush, and quit
114
 
                log_exception_quietly()
115
111
                self._send_response(request.FailedSmartServerResponse(
116
112
                    ('error', str(exception))))
117
113
                return
198
194
        """
199
195
        self._write_func(RESPONSE_VERSION_TWO)
200
196
 
201
 
    def _send_response(self, response):
202
 
        """Send a smart server response down the output stream."""
203
 
        assert not self._finished, 'response already sent'
204
 
        self._finished = True
205
 
        self._write_protocol_version()
206
 
        self._write_success_or_failure_prefix(response)
207
 
        self._write_func(_encode_tuple(response.args))
208
 
        if response.body is not None:
209
 
            assert isinstance(response.body, str), 'body must be a str'
210
 
            assert response.body_stream is None, (
211
 
                'body_stream and body cannot both be set')
212
 
            bytes = self._encode_bulk_data(response.body)
213
 
            self._write_func(bytes)
214
 
        elif response.body_stream is not None:
215
 
            _send_stream(response.body_stream, self._write_func)
216
 
 
217
 
 
218
 
def _send_stream(stream, write_func):
219
 
    write_func('chunked\n')
220
 
    _send_chunks(stream, write_func)
221
 
    write_func('END\n')
222
 
 
223
 
 
224
 
def _send_chunks(stream, write_func):
225
 
    for chunk in stream:
226
 
        if isinstance(chunk, str):
227
 
            bytes = "%x\n%s" % (len(chunk), chunk)
228
 
            write_func(bytes)
229
 
        elif isinstance(chunk, request.FailedSmartServerResponse):
230
 
            write_func('ERR\n')
231
 
            _send_chunks(chunk.args, write_func)
232
 
            return
233
 
        else:
234
 
            raise errors.BzrError(
235
 
                'Chunks must be str or FailedSmartServerResponse, got %r'
236
 
                % chunk)
237
 
 
238
 
 
239
 
class _StatefulDecoder(object):
240
 
 
 
197
 
 
198
class LengthPrefixedBodyDecoder(object):
 
199
    """Decodes the length-prefixed bulk data."""
 
200
    
241
201
    def __init__(self):
 
202
        self.bytes_left = None
242
203
        self.finished_reading = False
243
204
        self.unused_data = ''
244
 
        self.bytes_left = None
245
 
 
 
205
        self.state_accept = self._state_accept_expecting_length
 
206
        self.state_read = self._state_read_no_data
 
207
        self._in_buffer = ''
 
208
        self._trailer_buffer = ''
 
209
    
246
210
    def accept_bytes(self, bytes):
247
211
        """Decode as much of bytes as possible.
248
212
 
259
223
            current_state = self.state_accept
260
224
            self.state_accept('')
261
225
 
262
 
 
263
 
class ChunkedBodyDecoder(_StatefulDecoder):
264
 
    """Decoder for chunked body data.
265
 
 
266
 
    This is very similar the HTTP's chunked encoding.  See the description of
267
 
    streamed body data in `doc/developers/network-protocol.txt` for details.
268
 
    """
269
 
 
270
 
    def __init__(self):
271
 
        _StatefulDecoder.__init__(self)
272
 
        self.state_accept = self._state_accept_expecting_header
273
 
        self._in_buffer = ''
274
 
        self.chunk_in_progress = None
275
 
        self.chunks = collections.deque()
276
 
        self.error = False
277
 
        self.error_in_progress = None
278
 
    
279
 
    def next_read_size(self):
280
 
        # Note: the shortest possible chunk is 2 bytes: '0\n', and the
281
 
        # end-of-body marker is 4 bytes: 'END\n'.
282
 
        if self.state_accept == self._state_accept_reading_chunk:
283
 
            # We're expecting more chunk content.  So we're expecting at least
284
 
            # the rest of this chunk plus an END chunk.
285
 
            return self.bytes_left + 4
286
 
        elif self.state_accept == self._state_accept_expecting_length:
287
 
            if self._in_buffer == '':
288
 
                # We're expecting a chunk length.  There's at least two bytes
289
 
                # left: a digit plus '\n'.
290
 
                return 2
291
 
            else:
292
 
                # We're in the middle of reading a chunk length.  So there's at
293
 
                # least one byte left, the '\n' that terminates the length.
294
 
                return 1
295
 
        elif self.state_accept == self._state_accept_reading_unused:
296
 
            return 1
297
 
        elif self.state_accept == self._state_accept_expecting_header:
298
 
            return max(0, len('chunked\n') - len(self._in_buffer))
299
 
        else:
300
 
            raise AssertionError("Impossible state: %r" % (self.state_accept,))
301
 
 
302
 
    def read_next_chunk(self):
303
 
        try:
304
 
            return self.chunks.popleft()
305
 
        except IndexError:
306
 
            return None
307
 
 
308
 
    def _extract_line(self):
309
 
        pos = self._in_buffer.find('\n')
310
 
        if pos == -1:
311
 
            # We haven't read a complete length prefix yet, so there's nothing
312
 
            # to do.
313
 
            return None
314
 
        line = self._in_buffer[:pos]
315
 
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
316
 
        self._in_buffer = self._in_buffer[pos+1:]
317
 
        return line
318
 
 
319
 
    def _finished(self):
320
 
        self.unused_data = self._in_buffer
321
 
        self._in_buffer = None
322
 
        self.state_accept = self._state_accept_reading_unused
323
 
        if self.error:
324
 
            error_args = tuple(self.error_in_progress)
325
 
            self.chunks.append(request.FailedSmartServerResponse(error_args))
326
 
            self.error_in_progress = None
327
 
        self.finished_reading = True
328
 
 
329
 
    def _state_accept_expecting_header(self, bytes):
330
 
        self._in_buffer += bytes
331
 
        prefix = self._extract_line()
332
 
        if prefix is None:
333
 
            # We haven't read a complete length prefix yet, so there's nothing
334
 
            # to do.
335
 
            return
336
 
        elif prefix == 'chunked':
337
 
            self.state_accept = self._state_accept_expecting_length
338
 
        else:
339
 
            raise errors.SmartProtocolError(
340
 
                'Bad chunked body header: "%s"' % (prefix,))
341
 
 
342
 
    def _state_accept_expecting_length(self, bytes):
343
 
        self._in_buffer += bytes
344
 
        prefix = self._extract_line()
345
 
        if prefix is None:
346
 
            # We haven't read a complete length prefix yet, so there's nothing
347
 
            # to do.
348
 
            return
349
 
        elif prefix == 'ERR':
350
 
            self.error = True
351
 
            self.error_in_progress = []
352
 
            self._state_accept_expecting_length('')
353
 
            return
354
 
        elif prefix == 'END':
355
 
            # We've read the end-of-body marker.
356
 
            # Any further bytes are unused data, including the bytes left in
357
 
            # the _in_buffer.
358
 
            self._finished()
359
 
            return
360
 
        else:
361
 
            self.bytes_left = int(prefix, 16)
362
 
            self.chunk_in_progress = ''
363
 
            self.state_accept = self._state_accept_reading_chunk
364
 
 
365
 
    def _state_accept_reading_chunk(self, bytes):
366
 
        self._in_buffer += bytes
367
 
        in_buffer_len = len(self._in_buffer)
368
 
        self.chunk_in_progress += self._in_buffer[:self.bytes_left]
369
 
        self._in_buffer = self._in_buffer[self.bytes_left:]
370
 
        self.bytes_left -= in_buffer_len
371
 
        if self.bytes_left <= 0:
372
 
            # Finished with chunk
373
 
            self.bytes_left = None
374
 
            if self.error:
375
 
                self.error_in_progress.append(self.chunk_in_progress)
376
 
            else:
377
 
                self.chunks.append(self.chunk_in_progress)
378
 
            self.chunk_in_progress = None
379
 
            self.state_accept = self._state_accept_expecting_length
380
 
        
381
 
    def _state_accept_reading_unused(self, bytes):
382
 
        self.unused_data += bytes
383
 
 
384
 
 
385
 
class LengthPrefixedBodyDecoder(_StatefulDecoder):
386
 
    """Decodes the length-prefixed bulk data."""
387
 
    
388
 
    def __init__(self):
389
 
        _StatefulDecoder.__init__(self)
390
 
        self.state_accept = self._state_accept_expecting_length
391
 
        self.state_read = self._state_read_no_data
392
 
        self._in_buffer = ''
393
 
        self._trailer_buffer = ''
394
 
    
395
226
    def next_read_size(self):
396
227
        if self.bytes_left is not None:
397
228
            # Ideally we want to read all the remainder of the body and the
466
297
        """
467
298
        self._request = request
468
299
        self._body_buffer = None
469
 
        self._request_start_time = None
470
300
 
471
301
    def call(self, *args):
472
 
        if 'hpss' in debug.debug_flags:
473
 
            mutter('hpss call:   %s', repr(args)[1:-1])
474
 
            self._request_start_time = time.time()
475
302
        self._write_args(args)
476
303
        self._request.finished_writing()
477
304
 
480
307
 
481
308
        After calling this, call read_response_tuple to find the result out.
482
309
        """
483
 
        if 'hpss' in debug.debug_flags:
484
 
            mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
485
 
            mutter('              %d bytes', len(body))
486
 
            self._request_start_time = time.time()
487
310
        self._write_args(args)
488
311
        bytes = self._encode_bulk_data(body)
489
312
        self._request.accept_bytes(bytes)
495
318
        The body is encoded with one line per readv offset pair. The numbers in
496
319
        each pair are separated by a comma, and no trailing \n is emitted.
497
320
        """
498
 
        if 'hpss' in debug.debug_flags:
499
 
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
500
 
            self._request_start_time = time.time()
501
321
        self._write_args(args)
502
322
        readv_bytes = self._serialise_offsets(body)
503
323
        bytes = self._encode_bulk_data(readv_bytes)
504
324
        self._request.accept_bytes(bytes)
505
325
        self._request.finished_writing()
506
 
        if 'hpss' in debug.debug_flags:
507
 
            mutter('              %d bytes in readv request', len(readv_bytes))
508
326
 
509
327
    def cancel_read_body(self):
510
328
        """After expecting a body, a response code may indicate one otherwise.
521
339
        This should only be called once.
522
340
        """
523
341
        result = self._recv_tuple()
524
 
        if 'hpss' in debug.debug_flags:
525
 
            if self._request_start_time is not None:
526
 
                mutter('   result:   %6.3fs  %s',
527
 
                       time.time() - self._request_start_time,
528
 
                       repr(result)[1:-1])
529
 
                self._request_start_time = None
530
 
            else:
531
 
                mutter('   result:   %s', repr(result)[1:-1])
532
342
        if not expect_body:
533
343
            self._request.finished_reading()
534
344
        return result
550
360
        self._request.finished_reading()
551
361
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
552
362
        # XXX: TODO check the trailer result.
553
 
        if 'hpss' in debug.debug_flags:
554
 
            mutter('              %d body bytes read',
555
 
                   len(self._body_buffer.getvalue()))
556
363
        return self._body_buffer.read(count)
557
364
 
558
365
    def _recv_tuple(self):
565
372
        while not line or line[-1] != '\n':
566
373
            # TODO: this is inefficient - but tuples are short.
567
374
            new_char = self._request.read_bytes(1)
568
 
            if new_char == '':
569
 
                # end of file encountered reading from server
570
 
                raise errors.ConnectionReset(
571
 
                    "please check connectivity and permissions",
572
 
                    "(and try -Dhpss if further diagnosis is required)")
573
375
            line += new_char
 
376
            assert new_char != '', "end of file reading from server."
574
377
        return line
575
378
 
576
379
    def query_version(self):
618
421
        return SmartClientRequestProtocolOne.read_response_tuple(self, expect_body)
619
422
 
620
423
    def _write_protocol_version(self):
621
 
        """Write any prefixes this protocol requires.
 
424
        r"""Write any prefixes this protocol requires.
622
425
        
623
426
        Version two sends the value of REQUEST_VERSION_TWO.
624
427
        """
625
428
        self._request.accept_bytes(REQUEST_VERSION_TWO)
626
429
 
627
 
    def read_streamed_body(self):
628
 
        """Read bytes from the body, decoding into a byte stream.
629
 
        """
630
 
        _body_decoder = ChunkedBodyDecoder()
631
 
        while not _body_decoder.finished_reading:
632
 
            bytes_wanted = _body_decoder.next_read_size()
633
 
            bytes = self._request.read_bytes(bytes_wanted)
634
 
            _body_decoder.accept_bytes(bytes)
635
 
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
636
 
                if 'hpss' in debug.debug_flags:
637
 
                    mutter('              %d byte chunk read',
638
 
                           len(body_bytes))
639
 
                yield body_bytes
640
 
        self._request.finished_reading()
641