~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: John Arbash Meinel
  • Date: 2008-07-11 21:41:24 UTC
  • mto: This revision was merged to the branch mainline in revision 3543.
  • Revision ID: john@arbash-meinel.com-20080711214124-qi09irlj7pd5cuzg
Shortcut the case when one revision is in the ancestry of the other.

At the cost of a heads() check, when one parent supersedes the other, we don't have to extract
the text of the superseded parent. Changes merge time from 3m37s => 3m21s. Using a
CachingParentsProvider would drop the time down to 3m11s.

Show diffs side-by-side

added added

removed removed

Lines of Context:
29
29
from bzrlib import errors
30
30
from bzrlib.smart import message, request
31
31
from bzrlib.trace import log_exception_quietly, mutter
32
 
from bzrlib.util.bencode import bdecode_as_tuple, bencode
 
32
from bzrlib.util.bencode import bdecode, bencode
33
33
 
34
34
 
35
35
# Protocol version strings.  These are sent as prefixes of bzr requests and
323
323
 
324
324
    def __init__(self):
325
325
        self.finished_reading = False
326
 
        self._in_buffer_list = []
327
 
        self._in_buffer_len = 0
 
326
        self._in_buffer = ''
328
327
        self.unused_data = ''
329
328
        self.bytes_left = None
330
329
        self._number_needed_bytes = None
331
330
 
332
 
    def _get_in_buffer(self):
333
 
        if len(self._in_buffer_list) == 1:
334
 
            return self._in_buffer_list[0]
335
 
        in_buffer = ''.join(self._in_buffer_list)
336
 
        if len(in_buffer) != self._in_buffer_len:
337
 
            raise AssertionError(
338
 
                "Length of buffer did not match expected value: %s != %s"
339
 
                % self._in_buffer_len, len(in_buffer))
340
 
        self._in_buffer_list = [in_buffer]
341
 
        return in_buffer
342
 
 
343
 
    def _get_in_bytes(self, count):
344
 
        """Grab X bytes from the input_buffer.
345
 
 
346
 
        Callers should have already checked that self._in_buffer_len is >
347
 
        count. Note, this does not consume the bytes from the buffer. The
348
 
        caller will still need to call _get_in_buffer() and then
349
 
        _set_in_buffer() if they actually need to consume the bytes.
350
 
        """
351
 
        # check if we can yield the bytes from just the first entry in our list
352
 
        if len(self._in_buffer_list) == 0:
353
 
            raise AssertionError('Callers must be sure we have buffered bytes'
354
 
                ' before calling _get_in_bytes')
355
 
        if len(self._in_buffer_list[0]) > count:
356
 
            return self._in_buffer_list[0][:count]
357
 
        # We can't yield it from the first buffer, so collapse all buffers, and
358
 
        # yield it from that
359
 
        in_buf = self._get_in_buffer()
360
 
        return in_buf[:count]
361
 
 
362
 
    def _set_in_buffer(self, new_buf):
363
 
        if new_buf is not None:
364
 
            self._in_buffer_list = [new_buf]
365
 
            self._in_buffer_len = len(new_buf)
366
 
        else:
367
 
            self._in_buffer_list = []
368
 
            self._in_buffer_len = 0
369
 
 
370
331
    def accept_bytes(self, bytes):
371
332
        """Decode as much of bytes as possible.
372
333
 
377
338
        data will be appended to self.unused_data.
378
339
        """
379
340
        # accept_bytes is allowed to change the state
 
341
        current_state = self.state_accept
380
342
        self._number_needed_bytes = None
381
 
        # lsprof puts a very large amount of time on this specific call for
382
 
        # large readv arrays
383
 
        self._in_buffer_list.append(bytes)
384
 
        self._in_buffer_len += len(bytes)
 
343
        self._in_buffer += bytes
385
344
        try:
386
345
            # Run the function for the current state.
387
 
            current_state = self.state_accept
388
346
            self.state_accept()
389
347
            while current_state != self.state_accept:
390
348
                # The current state has changed.  Run the function for the new
421
379
            # the rest of this chunk plus an END chunk.
422
380
            return self.bytes_left + 4
423
381
        elif self.state_accept == self._state_accept_expecting_length:
424
 
            if self._in_buffer_len == 0:
 
382
            if self._in_buffer == '':
425
383
                # We're expecting a chunk length.  There's at least two bytes
426
384
                # left: a digit plus '\n'.
427
385
                return 2
432
390
        elif self.state_accept == self._state_accept_reading_unused:
433
391
            return 1
434
392
        elif self.state_accept == self._state_accept_expecting_header:
435
 
            return max(0, len('chunked\n') - self._in_buffer_len)
 
393
            return max(0, len('chunked\n') - len(self._in_buffer))
436
394
        else:
437
395
            raise AssertionError("Impossible state: %r" % (self.state_accept,))
438
396
 
443
401
            return None
444
402
 
445
403
    def _extract_line(self):
446
 
        in_buf = self._get_in_buffer()
447
 
        pos = in_buf.find('\n')
 
404
        pos = self._in_buffer.find('\n')
448
405
        if pos == -1:
449
406
            # We haven't read a complete line yet, so request more bytes before
450
407
            # we continue.
451
408
            raise _NeedMoreBytes(1)
452
 
        line = in_buf[:pos]
 
409
        line = self._in_buffer[:pos]
453
410
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
454
 
        self._set_in_buffer(in_buf[pos+1:])
 
411
        self._in_buffer = self._in_buffer[pos+1:]
455
412
        return line
456
413
 
457
414
    def _finished(self):
458
 
        self.unused_data = self._get_in_buffer()
459
 
        self._in_buffer_list = []
460
 
        self._in_buffer_len = 0
 
415
        self.unused_data = self._in_buffer
 
416
        self._in_buffer = ''
461
417
        self.state_accept = self._state_accept_reading_unused
462
418
        if self.error:
463
419
            error_args = tuple(self.error_in_progress)
492
448
            self.state_accept = self._state_accept_reading_chunk
493
449
 
494
450
    def _state_accept_reading_chunk(self):
495
 
        in_buf = self._get_in_buffer()
496
 
        in_buffer_len = len(in_buf)
497
 
        self.chunk_in_progress += in_buf[:self.bytes_left]
498
 
        self._set_in_buffer(in_buf[self.bytes_left:])
 
451
        in_buffer_len = len(self._in_buffer)
 
452
        self.chunk_in_progress += self._in_buffer[:self.bytes_left]
 
453
        self._in_buffer = self._in_buffer[self.bytes_left:]
499
454
        self.bytes_left -= in_buffer_len
500
455
        if self.bytes_left <= 0:
501
456
            # Finished with chunk
508
463
            self.state_accept = self._state_accept_expecting_length
509
464
        
510
465
    def _state_accept_reading_unused(self):
511
 
        self.unused_data += self._get_in_buffer()
512
 
        self._in_buffer_list = []
 
466
        self.unused_data += self._in_buffer
 
467
        self._in_buffer = ''
513
468
 
514
469
 
515
470
class LengthPrefixedBodyDecoder(_StatefulDecoder):
543
498
        return self.state_read()
544
499
 
545
500
    def _state_accept_expecting_length(self):
546
 
        in_buf = self._get_in_buffer()
547
 
        pos = in_buf.find('\n')
 
501
        pos = self._in_buffer.find('\n')
548
502
        if pos == -1:
549
503
            return
550
 
        self.bytes_left = int(in_buf[:pos])
551
 
        self._set_in_buffer(in_buf[pos+1:])
 
504
        self.bytes_left = int(self._in_buffer[:pos])
 
505
        self._in_buffer = self._in_buffer[pos+1:]
552
506
        self.state_accept = self._state_accept_reading_body
553
507
        self.state_read = self._state_read_body_buffer
554
508
 
555
509
    def _state_accept_reading_body(self):
556
 
        in_buf = self._get_in_buffer()
557
 
        self._body += in_buf
558
 
        self.bytes_left -= len(in_buf)
559
 
        self._set_in_buffer(None)
 
510
        self._body += self._in_buffer
 
511
        self.bytes_left -= len(self._in_buffer)
 
512
        self._in_buffer = ''
560
513
        if self.bytes_left <= 0:
561
514
            # Finished with body
562
515
            if self.bytes_left != 0:
566
519
            self.state_accept = self._state_accept_reading_trailer
567
520
        
568
521
    def _state_accept_reading_trailer(self):
569
 
        self._trailer_buffer += self._get_in_buffer()
570
 
        self._set_in_buffer(None)
 
522
        self._trailer_buffer += self._in_buffer
 
523
        self._in_buffer = ''
571
524
        # TODO: what if the trailer does not match "done\n"?  Should this raise
572
525
        # a ProtocolViolation exception?
573
526
        if self._trailer_buffer.startswith('done\n'):
576
529
            self.finished_reading = True
577
530
    
578
531
    def _state_accept_reading_unused(self):
579
 
        self.unused_data += self._get_in_buffer()
580
 
        self._set_in_buffer(None)
 
532
        self.unused_data += self._in_buffer
 
533
        self._in_buffer = ''
581
534
 
582
535
    def _state_read_no_data(self):
583
536
        return ''
655
608
        if 'hpss' in debug.debug_flags:
656
609
            mutter('              %d bytes in readv request', len(readv_bytes))
657
610
        self._last_verb = args[0]
658
 
    
659
 
    def call_with_body_stream(self, args, stream):
660
 
        # Protocols v1 and v2 don't support body streams.  So it's safe to
661
 
        # assume that a v1/v2 server doesn't support whatever method we're
662
 
        # trying to call with a body stream.
663
 
        self._request.finished_writing()
664
 
        self._request.finished_reading()
665
 
        raise errors.UnknownSmartMethod(args[0])
666
611
 
667
612
    def cancel_read_body(self):
668
613
        """After expecting a body, a response code may indicate one otherwise.
753
698
            return self._body_buffer.read(count)
754
699
        _body_decoder = LengthPrefixedBodyDecoder()
755
700
 
 
701
        # Read no more than 64k at a time so that we don't risk error 10055 (no
 
702
        # buffer space available) on Windows.
 
703
        max_read = 64 * 1024
756
704
        while not _body_decoder.finished_reading:
757
 
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
 
705
            bytes_wanted = min(_body_decoder.next_read_size(), max_read)
 
706
            bytes = self._request.read_bytes(bytes_wanted)
758
707
            if bytes == '':
759
708
                # end of file encountered reading from server
760
709
                raise errors.ConnectionReset(
770
719
 
771
720
    def _recv_tuple(self):
772
721
        """Receive a tuple from the medium request."""
773
 
        return _decode_tuple(self._request.read_line())
 
722
        return _decode_tuple(self._recv_line())
 
723
 
 
724
    def _recv_line(self):
 
725
        """Read an entire line from the medium request."""
 
726
        line = ''
 
727
        while not line or line[-1] != '\n':
 
728
            # TODO: this is inefficient - but tuples are short.
 
729
            new_char = self._request.read_bytes(1)
 
730
            if new_char == '':
 
731
                # end of file encountered reading from server
 
732
                raise errors.ConnectionReset(
 
733
                    "please check connectivity and permissions",
 
734
                    "(and try -Dhpss if further diagnosis is required)")
 
735
            line += new_char
 
736
        return line
774
737
 
775
738
    def query_version(self):
776
739
        """Return protocol version number of the server."""
813
776
        if version != self.response_marker:
814
777
            self._request.finished_reading()
815
778
            raise errors.UnexpectedProtocolVersionMarker(version)
816
 
        response_status = self._request.read_line()
 
779
        response_status = self._recv_line()
817
780
        result = SmartClientRequestProtocolOne._read_response_tuple(self)
818
781
        self._response_is_unknown_method(result)
819
782
        if response_status == 'success\n':
841
804
        """
842
805
        # Read no more than 64k at a time so that we don't risk error 10055 (no
843
806
        # buffer space available) on Windows.
 
807
        max_read = 64 * 1024
844
808
        _body_decoder = ChunkedBodyDecoder()
845
809
        while not _body_decoder.finished_reading:
846
 
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
 
810
            bytes_wanted = min(_body_decoder.next_read_size(), max_read)
 
811
            bytes = self._request.read_bytes(bytes_wanted)
847
812
            if bytes == '':
848
813
                # end of file encountered reading from server
849
814
                raise errors.ConnectionReset(
920
885
            self.message_handler.protocol_error(exception)
921
886
 
922
887
    def _extract_length_prefixed_bytes(self):
923
 
        if self._in_buffer_len < 4:
 
888
        if len(self._in_buffer) < 4:
924
889
            # A length prefix by itself is 4 bytes, and we don't even have that
925
890
            # many yet.
926
891
            raise _NeedMoreBytes(4)
927
 
        (length,) = struct.unpack('!L', self._get_in_bytes(4))
 
892
        (length,) = struct.unpack('!L', self._in_buffer[:4])
928
893
        end_of_bytes = 4 + length
929
 
        if self._in_buffer_len < end_of_bytes:
 
894
        if len(self._in_buffer) < end_of_bytes:
930
895
            # We haven't yet read as many bytes as the length-prefix says there
931
896
            # are.
932
897
            raise _NeedMoreBytes(end_of_bytes)
933
898
        # Extract the bytes from the buffer.
934
 
        in_buf = self._get_in_buffer()
935
 
        bytes = in_buf[4:end_of_bytes]
936
 
        self._set_in_buffer(in_buf[end_of_bytes:])
 
899
        bytes = self._in_buffer[4:end_of_bytes]
 
900
        self._in_buffer = self._in_buffer[end_of_bytes:]
937
901
        return bytes
938
902
 
939
903
    def _extract_prefixed_bencoded_data(self):
940
904
        prefixed_bytes = self._extract_length_prefixed_bytes()
941
905
        try:
942
 
            decoded = bdecode_as_tuple(prefixed_bytes)
 
906
            decoded = bdecode(prefixed_bytes)
943
907
        except ValueError:
944
908
            raise errors.SmartProtocolError(
945
909
                'Bytes %r not bencoded' % (prefixed_bytes,))
946
910
        return decoded
947
911
 
948
912
    def _extract_single_byte(self):
949
 
        if self._in_buffer_len == 0:
 
913
        if self._in_buffer == '':
950
914
            # The buffer is empty
951
915
            raise _NeedMoreBytes(1)
952
 
        in_buf = self._get_in_buffer()
953
 
        one_byte = in_buf[0]
954
 
        self._set_in_buffer(in_buf[1:])
 
916
        one_byte = self._in_buffer[0]
 
917
        self._in_buffer = self._in_buffer[1:]
955
918
        return one_byte
956
919
 
957
920
    def _state_accept_expecting_protocol_version(self):
958
 
        needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
959
 
        in_buf = self._get_in_buffer()
 
921
        needed_bytes = len(MESSAGE_VERSION_THREE) - len(self._in_buffer)
960
922
        if needed_bytes > 0:
961
923
            # We don't have enough bytes to check if the protocol version
962
924
            # marker is right.  But we can check if it is already wrong by
966
928
            # len(MESSAGE_VERSION_THREE) bytes.  So if the bytes we have so far
967
929
            # are wrong then we should just raise immediately rather than
968
930
            # stall.]
969
 
            if not MESSAGE_VERSION_THREE.startswith(in_buf):
 
931
            if not MESSAGE_VERSION_THREE.startswith(self._in_buffer):
970
932
                # We have enough bytes to know the protocol version is wrong
971
 
                raise errors.UnexpectedProtocolVersionMarker(in_buf)
 
933
                raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
972
934
            raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
973
 
        if not in_buf.startswith(MESSAGE_VERSION_THREE):
974
 
            raise errors.UnexpectedProtocolVersionMarker(in_buf)
975
 
        self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
 
935
        if not self._in_buffer.startswith(MESSAGE_VERSION_THREE):
 
936
            raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
 
937
        self._in_buffer = self._in_buffer[len(MESSAGE_VERSION_THREE):]
976
938
        self.state_accept = self._state_accept_expecting_headers
977
939
 
978
940
    def _state_accept_expecting_headers(self):
1027
989
            raise errors.SmartMessageHandlerError(sys.exc_info())
1028
990
 
1029
991
    def done(self):
1030
 
        self.unused_data = self._get_in_buffer()
1031
 
        self._set_in_buffer(None)
 
992
        self.unused_data = self._in_buffer
 
993
        self._in_buffer = ''
1032
994
        self.state_accept = self._state_accept_reading_unused
1033
995
        try:
1034
996
            self.message_handler.end_received()
1036
998
            raise errors.SmartMessageHandlerError(sys.exc_info())
1037
999
 
1038
1000
    def _state_accept_reading_unused(self):
1039
 
        self.unused_data = self._get_in_buffer()
1040
 
        self._set_in_buffer(None)
 
1001
        self.unused_data += self._in_buffer
 
1002
        self._in_buffer = ''
1041
1003
 
1042
1004
    def next_read_size(self):
1043
1005
        if self.state_accept == self._state_accept_reading_unused:
1050
1012
            return 0
1051
1013
        else:
1052
1014
            if self._number_needed_bytes is not None:
1053
 
                return self._number_needed_bytes - self._in_buffer_len
 
1015
                return self._number_needed_bytes - len(self._in_buffer)
1054
1016
            else:
1055
1017
                raise AssertionError("don't know how many bytes are expected!")
1056
1018
 
1108
1070
        self._write_func(struct.pack('!L', len(bytes)))
1109
1071
        self._write_func(bytes)
1110
1072
 
1111
 
    def _write_chunked_body_start(self):
1112
 
        self._write_func('oC')
1113
 
 
1114
1073
    def _write_error_status(self):
1115
1074
        self._write_func('oE')
1116
1075
 
1228
1187
        self._write_end()
1229
1188
        self._medium_request.finished_writing()
1230
1189
 
1231
 
    def call_with_body_stream(self, args, stream):
1232
 
        if 'hpss' in debug.debug_flags:
1233
 
            mutter('hpss call w/body stream: %r', args)
1234
 
            path = getattr(self._medium_request._medium, '_path', None)
1235
 
            if path is not None:
1236
 
                mutter('                  (to %s)', path)
1237
 
            self._request_start_time = time.time()
1238
 
        self._write_protocol_version()
1239
 
        self._write_headers(self._headers)
1240
 
        self._write_structure(args)
1241
 
        # TODO: notice if the server has sent an early error reply before we
1242
 
        #       have finished sending the stream.  We would notice at the end
1243
 
        #       anyway, but if the medium can deliver it early then it's good
1244
 
        #       to short-circuit the whole request...
1245
 
        try:
1246
 
            for part in stream:
1247
 
                self._write_prefixed_body(part)
1248
 
                self.flush()
1249
 
        except Exception:
1250
 
            # Iterating the stream failed.  Cleanly abort the request.
1251
 
            self._write_error_status()
1252
 
            # Currently the client unconditionally sends ('error',) as the
1253
 
            # error args.
1254
 
            self._write_structure(('error',))
1255
 
        self._write_end()
1256
 
        self._medium_request.finished_writing()
1257