~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

Merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
client and server.
19
19
"""
20
20
 
21
 
 
 
21
import collections
22
22
from cStringIO import StringIO
 
23
import time
23
24
 
 
25
from bzrlib import debug
24
26
from bzrlib import errors
25
27
from bzrlib.smart import request
 
28
from bzrlib.trace import log_exception_quietly, mutter
26
29
 
27
30
 
28
31
# Protocol version strings.  These are sent as prefixes of bzr requests and
108
111
                raise
109
112
            except Exception, exception:
110
113
                # everything else: pass to client, flush, and quit
 
114
                log_exception_quietly()
111
115
                self._send_response(request.FailedSmartServerResponse(
112
116
                    ('error', str(exception))))
113
117
                return
194
198
        """
195
199
        self._write_func(RESPONSE_VERSION_TWO)
196
200
 
197
 
 
198
 
class LengthPrefixedBodyDecoder(object):
199
 
    """Decodes the length-prefixed bulk data."""
200
 
    
 
201
    def _send_response(self, response):
 
202
        """Send a smart server response down the output stream."""
 
203
        assert not self._finished, 'response already sent'
 
204
        self._finished = True
 
205
        self._write_protocol_version()
 
206
        self._write_success_or_failure_prefix(response)
 
207
        self._write_func(_encode_tuple(response.args))
 
208
        if response.body is not None:
 
209
            assert isinstance(response.body, str), 'body must be a str'
 
210
            assert response.body_stream is None, (
 
211
                'body_stream and body cannot both be set')
 
212
            bytes = self._encode_bulk_data(response.body)
 
213
            self._write_func(bytes)
 
214
        elif response.body_stream is not None:
 
215
            _send_stream(response.body_stream, self._write_func)
 
216
 
 
217
 
 
218
def _send_stream(stream, write_func):
 
219
    write_func('chunked\n')
 
220
    _send_chunks(stream, write_func)
 
221
    write_func('END\n')
 
222
 
 
223
 
 
224
def _send_chunks(stream, write_func):
 
225
    for chunk in stream:
 
226
        if isinstance(chunk, str):
 
227
            bytes = "%x\n%s" % (len(chunk), chunk)
 
228
            write_func(bytes)
 
229
        elif isinstance(chunk, request.FailedSmartServerResponse):
 
230
            write_func('ERR\n')
 
231
            _send_chunks(chunk.args, write_func)
 
232
            return
 
233
        else:
 
234
            raise errors.BzrError(
 
235
                'Chunks must be str or FailedSmartServerResponse, got %r'
 
236
                % chunk)
 
237
 
 
238
 
 
239
class _StatefulDecoder(object):
 
240
 
201
241
    def __init__(self):
202
 
        self.bytes_left = None
203
242
        self.finished_reading = False
204
243
        self.unused_data = ''
205
 
        self.state_accept = self._state_accept_expecting_length
206
 
        self.state_read = self._state_read_no_data
207
 
        self._in_buffer = ''
208
 
        self._trailer_buffer = ''
209
 
    
 
244
        self.bytes_left = None
 
245
 
210
246
    def accept_bytes(self, bytes):
211
247
        """Decode as much of bytes as possible.
212
248
 
223
259
            current_state = self.state_accept
224
260
            self.state_accept('')
225
261
 
 
262
 
 
263
class ChunkedBodyDecoder(_StatefulDecoder):
 
264
    """Decoder for chunked body data.
 
265
 
 
266
    This is very similar the HTTP's chunked encoding.  See the description of
 
267
    streamed body data in `doc/developers/network-protocol.txt` for details.
 
268
    """
 
269
 
 
270
    def __init__(self):
 
271
        _StatefulDecoder.__init__(self)
 
272
        self.state_accept = self._state_accept_expecting_header
 
273
        self._in_buffer = ''
 
274
        self.chunk_in_progress = None
 
275
        self.chunks = collections.deque()
 
276
        self.error = False
 
277
        self.error_in_progress = None
 
278
    
 
279
    def next_read_size(self):
 
280
        # Note: the shortest possible chunk is 2 bytes: '0\n', and the
 
281
        # end-of-body marker is 4 bytes: 'END\n'.
 
282
        if self.state_accept == self._state_accept_reading_chunk:
 
283
            # We're expecting more chunk content.  So we're expecting at least
 
284
            # the rest of this chunk plus an END chunk.
 
285
            return self.bytes_left + 4
 
286
        elif self.state_accept == self._state_accept_expecting_length:
 
287
            if self._in_buffer == '':
 
288
                # We're expecting a chunk length.  There's at least two bytes
 
289
                # left: a digit plus '\n'.
 
290
                return 2
 
291
            else:
 
292
                # We're in the middle of reading a chunk length.  So there's at
 
293
                # least one byte left, the '\n' that terminates the length.
 
294
                return 1
 
295
        elif self.state_accept == self._state_accept_reading_unused:
 
296
            return 1
 
297
        elif self.state_accept == self._state_accept_expecting_header:
 
298
            return max(0, len('chunked\n') - len(self._in_buffer))
 
299
        else:
 
300
            raise AssertionError("Impossible state: %r" % (self.state_accept,))
 
301
 
 
302
    def read_next_chunk(self):
 
303
        try:
 
304
            return self.chunks.popleft()
 
305
        except IndexError:
 
306
            return None
 
307
 
 
308
    def _extract_line(self):
 
309
        pos = self._in_buffer.find('\n')
 
310
        if pos == -1:
 
311
            # We haven't read a complete length prefix yet, so there's nothing
 
312
            # to do.
 
313
            return None
 
314
        line = self._in_buffer[:pos]
 
315
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
 
316
        self._in_buffer = self._in_buffer[pos+1:]
 
317
        return line
 
318
 
 
319
    def _finished(self):
 
320
        self.unused_data = self._in_buffer
 
321
        self._in_buffer = None
 
322
        self.state_accept = self._state_accept_reading_unused
 
323
        if self.error:
 
324
            error_args = tuple(self.error_in_progress)
 
325
            self.chunks.append(request.FailedSmartServerResponse(error_args))
 
326
            self.error_in_progress = None
 
327
        self.finished_reading = True
 
328
 
 
329
    def _state_accept_expecting_header(self, bytes):
 
330
        self._in_buffer += bytes
 
331
        prefix = self._extract_line()
 
332
        if prefix is None:
 
333
            # We haven't read a complete length prefix yet, so there's nothing
 
334
            # to do.
 
335
            return
 
336
        elif prefix == 'chunked':
 
337
            self.state_accept = self._state_accept_expecting_length
 
338
        else:
 
339
            raise errors.SmartProtocolError(
 
340
                'Bad chunked body header: "%s"' % (prefix,))
 
341
 
 
342
    def _state_accept_expecting_length(self, bytes):
 
343
        self._in_buffer += bytes
 
344
        prefix = self._extract_line()
 
345
        if prefix is None:
 
346
            # We haven't read a complete length prefix yet, so there's nothing
 
347
            # to do.
 
348
            return
 
349
        elif prefix == 'ERR':
 
350
            self.error = True
 
351
            self.error_in_progress = []
 
352
            self._state_accept_expecting_length('')
 
353
            return
 
354
        elif prefix == 'END':
 
355
            # We've read the end-of-body marker.
 
356
            # Any further bytes are unused data, including the bytes left in
 
357
            # the _in_buffer.
 
358
            self._finished()
 
359
            return
 
360
        else:
 
361
            self.bytes_left = int(prefix, 16)
 
362
            self.chunk_in_progress = ''
 
363
            self.state_accept = self._state_accept_reading_chunk
 
364
 
 
365
    def _state_accept_reading_chunk(self, bytes):
 
366
        self._in_buffer += bytes
 
367
        in_buffer_len = len(self._in_buffer)
 
368
        self.chunk_in_progress += self._in_buffer[:self.bytes_left]
 
369
        self._in_buffer = self._in_buffer[self.bytes_left:]
 
370
        self.bytes_left -= in_buffer_len
 
371
        if self.bytes_left <= 0:
 
372
            # Finished with chunk
 
373
            self.bytes_left = None
 
374
            if self.error:
 
375
                self.error_in_progress.append(self.chunk_in_progress)
 
376
            else:
 
377
                self.chunks.append(self.chunk_in_progress)
 
378
            self.chunk_in_progress = None
 
379
            self.state_accept = self._state_accept_expecting_length
 
380
        
 
381
    def _state_accept_reading_unused(self, bytes):
 
382
        self.unused_data += bytes
 
383
 
 
384
 
 
385
class LengthPrefixedBodyDecoder(_StatefulDecoder):
 
386
    """Decodes the length-prefixed bulk data."""
 
387
    
 
388
    def __init__(self):
 
389
        _StatefulDecoder.__init__(self)
 
390
        self.state_accept = self._state_accept_expecting_length
 
391
        self.state_read = self._state_read_no_data
 
392
        self._in_buffer = ''
 
393
        self._trailer_buffer = ''
 
394
    
226
395
    def next_read_size(self):
227
396
        if self.bytes_left is not None:
228
397
            # Ideally we want to read all the remainder of the body and the
297
466
        """
298
467
        self._request = request
299
468
        self._body_buffer = None
 
469
        self._request_start_time = None
300
470
 
301
471
    def call(self, *args):
 
472
        if 'hpss' in debug.debug_flags:
 
473
            mutter('hpss call:   %s', repr(args)[1:-1])
 
474
            if getattr(self._request._medium, 'base', None) is not None:
 
475
                mutter('             (to %s)', self._request._medium.base)
 
476
            self._request_start_time = time.time()
302
477
        self._write_args(args)
303
478
        self._request.finished_writing()
304
479
 
307
482
 
308
483
        After calling this, call read_response_tuple to find the result out.
309
484
        """
 
485
        if 'hpss' in debug.debug_flags:
 
486
            mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
 
487
            if getattr(self._request._medium, '_path', None) is not None:
 
488
                mutter('                  (to %s)', self._request._medium._path)
 
489
            mutter('              %d bytes', len(body))
 
490
            self._request_start_time = time.time()
 
491
            if 'hpssdetail' in debug.debug_flags:
 
492
                mutter('hpss body content: %s', body)
310
493
        self._write_args(args)
311
494
        bytes = self._encode_bulk_data(body)
312
495
        self._request.accept_bytes(bytes)
318
501
        The body is encoded with one line per readv offset pair. The numbers in
319
502
        each pair are separated by a comma, and no trailing \n is emitted.
320
503
        """
 
504
        if 'hpss' in debug.debug_flags:
 
505
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
 
506
            if getattr(self._request._medium, '_path', None) is not None:
 
507
                mutter('                  (to %s)', self._request._medium._path)
 
508
            self._request_start_time = time.time()
321
509
        self._write_args(args)
322
510
        readv_bytes = self._serialise_offsets(body)
323
511
        bytes = self._encode_bulk_data(readv_bytes)
324
512
        self._request.accept_bytes(bytes)
325
513
        self._request.finished_writing()
 
514
        if 'hpss' in debug.debug_flags:
 
515
            mutter('              %d bytes in readv request', len(readv_bytes))
326
516
 
327
517
    def cancel_read_body(self):
328
518
        """After expecting a body, a response code may indicate one otherwise.
339
529
        This should only be called once.
340
530
        """
341
531
        result = self._recv_tuple()
 
532
        if 'hpss' in debug.debug_flags:
 
533
            if self._request_start_time is not None:
 
534
                mutter('   result:   %6.3fs  %s',
 
535
                       time.time() - self._request_start_time,
 
536
                       repr(result)[1:-1])
 
537
                self._request_start_time = None
 
538
            else:
 
539
                mutter('   result:   %s', repr(result)[1:-1])
342
540
        if not expect_body:
343
541
            self._request.finished_reading()
344
542
        return result
353
551
            return self._body_buffer.read(count)
354
552
        _body_decoder = LengthPrefixedBodyDecoder()
355
553
 
 
554
        # Read no more than 64k at a time so that we don't risk error 10055 (no
 
555
        # buffer space available) on Windows.
 
556
        max_read = 64 * 1024
356
557
        while not _body_decoder.finished_reading:
357
 
            bytes_wanted = _body_decoder.next_read_size()
 
558
            bytes_wanted = min(_body_decoder.next_read_size(), max_read)
358
559
            bytes = self._request.read_bytes(bytes_wanted)
359
560
            _body_decoder.accept_bytes(bytes)
360
561
        self._request.finished_reading()
361
562
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
362
563
        # XXX: TODO check the trailer result.
 
564
        if 'hpss' in debug.debug_flags:
 
565
            mutter('              %d body bytes read',
 
566
                   len(self._body_buffer.getvalue()))
363
567
        return self._body_buffer.read(count)
364
568
 
365
569
    def _recv_tuple(self):
372
576
        while not line or line[-1] != '\n':
373
577
            # TODO: this is inefficient - but tuples are short.
374
578
            new_char = self._request.read_bytes(1)
 
579
            if new_char == '':
 
580
                # end of file encountered reading from server
 
581
                raise errors.ConnectionReset(
 
582
                    "please check connectivity and permissions",
 
583
                    "(and try -Dhpss if further diagnosis is required)")
375
584
            line += new_char
376
 
            assert new_char != '', "end of file reading from server."
377
585
        return line
378
586
 
379
587
    def query_version(self):
421
629
        return SmartClientRequestProtocolOne.read_response_tuple(self, expect_body)
422
630
 
423
631
    def _write_protocol_version(self):
424
 
        r"""Write any prefixes this protocol requires.
 
632
        """Write any prefixes this protocol requires.
425
633
        
426
634
        Version two sends the value of REQUEST_VERSION_TWO.
427
635
        """
428
636
        self._request.accept_bytes(REQUEST_VERSION_TWO)
429
637
 
 
638
    def read_streamed_body(self):
 
639
        """Read bytes from the body, decoding into a byte stream.
 
640
        """
 
641
        # Read no more than 64k at a time so that we don't risk error 10055 (no
 
642
        # buffer space available) on Windows.
 
643
        max_read = 64 * 1024
 
644
        _body_decoder = ChunkedBodyDecoder()
 
645
        while not _body_decoder.finished_reading:
 
646
            bytes_wanted = min(_body_decoder.next_read_size(), max_read)
 
647
            bytes = self._request.read_bytes(bytes_wanted)
 
648
            _body_decoder.accept_bytes(bytes)
 
649
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
 
650
                if 'hpss' in debug.debug_flags:
 
651
                    mutter('              %d byte chunk read',
 
652
                           len(body_bytes))
 
653
                yield body_bytes
 
654
        self._request.finished_reading()
 
655