~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Andrew Bennetts
  • Date: 2010-06-16 05:47:02 UTC
  • mto: This revision was merged to the branch mainline in revision 5299.
  • Revision ID: andrew.bennetts@canonical.com-20100616054702-hho3e72ebs7biki8
Use socketpairs (rather than pipes) for SSH subprocesses where possible, and formalise some internal APIs a little more.

Show diffs side-by-side

added added

removed removed

Lines of Context:
715
715
    """A client medium using simple pipes.
716
716
 
717
717
    This client does not manage the pipes: it assumes they will always be open.
718
 
 
719
 
    Note that if readable_pipe.read might raise IOError or OSError with errno
720
 
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
721
 
    (such as used for sys.stdin) are safe.
722
718
    """
723
719
 
724
720
    def __init__(self, readable_pipe, writeable_pipe, base):
737
733
 
738
734
    def _read_bytes(self, count):
739
735
        """See SmartClientStreamMedium._read_bytes."""
740
 
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
 
736
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
737
        bytes = self._readable_pipe.read(bytes_to_read)
741
738
        self._report_activity(len(bytes), 'read')
742
739
        return bytes
743
740
 
744
741
 
 
742
class SSHParams(object):
 
743
 
 
744
    def __init__(self, host, port=None, username=None, password=None,
 
745
            bzr_remote_path='bzr'):
 
746
        """Creates a client that will connect on the first use.
 
747
 
 
748
        """
 
749
        self.host = host
 
750
        self.port = port
 
751
        self.username = username
 
752
        self.password = password
 
753
        self.bzr_remote_path = bzr_remote_path
 
754
 
 
755
 
745
756
class SmartSSHClientMedium(SmartClientStreamMedium):
746
 
    """A client medium using SSH."""
 
757
    """A client medium using SSH.
 
758
    
 
759
    It delegates IO to a SmartClientSocketMedium or
 
760
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
 
761
    """
747
762
 
748
 
    def __init__(self, host, port=None, username=None, password=None,
749
 
            base=None, vendor=None, bzr_remote_path=None):
 
763
    def __init__(self, base, ssh_params, vendor=None):
750
764
        """Creates a client that will connect on the first use.
751
765
 
 
766
        :param ssh_params: A SSHParams instance.
752
767
        :param vendor: An optional override for the ssh vendor to use. See
753
768
            bzrlib.transport.ssh for details on ssh vendors.
754
769
        """
755
 
        self._connected = False
756
 
        self._host = host
757
 
        self._password = password
758
 
        self._port = port
759
 
        self._username = username
 
770
        self._real_medium = None
 
771
        self._ssh_params = ssh_params
760
772
        # for the benefit of progress making a short description of this
761
773
        # transport
762
774
        self._scheme = 'bzr+ssh'
764
776
        # _DebugCounter so we have to store all the values used in our repr
765
777
        # method before calling the super init.
766
778
        SmartClientStreamMedium.__init__(self, base)
767
 
        self._read_from = None
 
779
        self._vendor = vendor
768
780
        self._ssh_connection = None
769
 
        self._vendor = vendor
770
 
        self._write_to = None
771
 
        self._bzr_remote_path = bzr_remote_path
772
781
 
773
782
    def __repr__(self):
774
 
        if self._port is None:
 
783
        if self._ssh_params.port is None:
775
784
            maybe_port = ''
776
785
        else:
777
 
            maybe_port = ':%s' % self._port
 
786
            maybe_port = ':%s' % self._ssh_params.port
778
787
        return "%s(%s://%s@%s%s/)" % (
779
788
            self.__class__.__name__,
780
789
            self._scheme,
781
 
            self._username,
782
 
            self._host,
 
790
            self._ssh_params.username,
 
791
            self._ssh_params.host,
783
792
            maybe_port)
784
793
 
785
794
    def _accept_bytes(self, bytes):
786
795
        """See SmartClientStreamMedium.accept_bytes."""
787
796
        self._ensure_connection()
788
 
        self._write_to.write(bytes)
789
 
        self._report_activity(len(bytes), 'write')
 
797
        # XXX: Perhaps should use accept_bytes rather than _accept_bytes?
 
798
        self._real_medium._accept_bytes(bytes)
790
799
 
791
800
    def disconnect(self):
792
801
        """See SmartClientMedium.disconnect()."""
793
 
        if not self._connected:
794
 
            return
795
 
        self._read_from.close()
796
 
        self._write_to.close()
797
 
        self._ssh_connection.close()
798
 
        self._connected = False
 
802
        if self._real_medium is not None:
 
803
            self._real_medium.disconnect()
 
804
            self._real_medium = None
 
805
        if self._ssh_connection is not None:
 
806
            self._ssh_connection.close()
 
807
            self._ssh_connection = None
799
808
 
800
809
    def _ensure_connection(self):
801
810
        """Connect this medium if not already connected."""
802
 
        if self._connected:
 
811
        if self._real_medium is not None:
803
812
            return
804
813
        if self._vendor is None:
805
814
            vendor = ssh._get_ssh_vendor()
806
815
        else:
807
816
            vendor = self._vendor
808
 
        self._ssh_connection = vendor.connect_ssh(self._username,
809
 
                self._password, self._host, self._port,
810
 
                command=[self._bzr_remote_path, 'serve', '--inet',
 
817
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
 
818
                self._ssh_params.password, self._ssh_params.host,
 
819
                self._ssh_params.port,
 
820
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
811
821
                         '--directory=/', '--allow-writes'])
812
 
        self._read_from, self._write_to = \
813
 
            self._ssh_connection.get_filelike_channels()
814
 
        self._connected = True
 
822
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
 
823
        if io_kind == 'socket':
 
824
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
 
825
                self.base, io_object)
 
826
        elif io_kind == 'pipes':
 
827
            read_from, write_to = io_object
 
828
            self._real_medium = SmartSimplePipesClientMedium(
 
829
                read_from, write_to, self.base)
 
830
        else:
 
831
            raise AssertionError(
 
832
                "Unexpected io_kind %r from %r"
 
833
                % (io_kind, self._ssh_connection))
815
834
 
816
835
    def _flush(self):
817
836
        """See SmartClientStreamMedium._flush()."""
818
 
        self._write_to.flush()
 
837
        self._real_medium._flush()
819
838
 
820
839
    def _read_bytes(self, count):
821
840
        """See SmartClientStreamMedium.read_bytes."""
822
 
        if not self._connected:
 
841
        if self._real_medium is None:
823
842
            raise errors.MediumNotConnected(self)
824
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
825
 
        bytes = self._read_from.read(bytes_to_read)
826
 
        self._report_activity(len(bytes), 'read')
827
 
        return bytes
 
843
        # XXX: perhaps should delegate to read_bytes, not _read_bytes?
 
844
        return self._real_medium._read_bytes(count)
828
845
 
829
846
 
830
847
# Port 4155 is the default port for bzr://, registered with IANA.
832
849
BZR_DEFAULT_PORT = 4155
833
850
 
834
851
 
835
 
class SmartTCPClientMedium(SmartClientStreamMedium):
836
 
    """A client medium using TCP."""
 
852
class SmartClientSocketMedium(SmartClientStreamMedium):
 
853
    """A client medium using sockets."""
837
854
 
838
 
    def __init__(self, host, port, base):
 
855
    def __init__(self, base):
839
856
        """Creates a client that will connect on the first use."""
840
857
        SmartClientStreamMedium.__init__(self, base)
 
858
        self._socket = None
841
859
        self._connected = False
842
 
        self._host = host
843
 
        self._port = port
844
 
        self._socket = None
845
860
 
846
861
    def _accept_bytes(self, bytes):
847
862
        """See SmartClientMedium.accept_bytes."""
848
863
        self._ensure_connection()
849
864
        osutils.send_all(self._socket, bytes, self._report_activity)
850
865
 
 
866
    def _flush(self):
 
867
        """See SmartClientStreamMedium._flush().
 
868
 
 
869
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
 
870
        add a means to do a flush, but that can be done in the future.
 
871
        """
 
872
 
 
873
    def _read_bytes(self, count):
 
874
        """See SmartClientMedium.read_bytes."""
 
875
        if not self._connected:
 
876
            raise errors.MediumNotConnected(self)
 
877
        return osutils.read_bytes_from_socket(
 
878
            self._socket, self._report_activity)
 
879
 
851
880
    def disconnect(self):
852
881
        """See SmartClientMedium.disconnect()."""
853
882
        if not self._connected:
856
885
        self._socket = None
857
886
        self._connected = False
858
887
 
 
888
 
 
889
class SmartTCPClientMedium(SmartClientSocketMedium):
 
890
 
 
891
    def __init__(self, host, port, base):
 
892
        """Creates a client that will connect on the first use."""
 
893
        SmartClientSocketMedium.__init__(self, base)
 
894
        self._connected = False
 
895
        self._host = host
 
896
        self._port = port
 
897
        self._socket = None
 
898
 
859
899
    def _ensure_connection(self):
860
900
        """Connect this medium if not already connected."""
861
901
        if self._connected:
895
935
                    (self._host, port, err_msg))
896
936
        self._connected = True
897
937
 
898
 
    def _flush(self):
899
 
        """See SmartClientStreamMedium._flush().
900
 
 
901
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
902
 
        add a means to do a flush, but that can be done in the future.
903
 
        """
904
 
 
905
 
    def _read_bytes(self, count):
906
 
        """See SmartClientMedium.read_bytes."""
907
 
        if not self._connected:
908
 
            raise errors.MediumNotConnected(self)
909
 
        return osutils.read_bytes_from_socket(
910
 
            self._socket, self._report_activity)
 
938
 
 
939
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
 
940
    """A SmartClientSocketMedium for an already-connected socket.
 
941
    
 
942
    Note that this class will assume it "owns" the socket, so it will close it
 
943
    when its disconnect method is called.
 
944
    """
 
945
 
 
946
    def __init__(self, base, sock):
 
947
        SmartClientSocketMedium.__init__(self, base)
 
948
        self._socket = sock
 
949
        self._connected = True
 
950
 
 
951
    def _ensure_connection(self):
 
952
        # Already connected, by definition!  So nothing to do.
 
953
        pass
911
954
 
912
955
 
913
956
class SmartClientStreamMediumRequest(SmartClientMediumRequest):