~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Martin Pool
  • Date: 2010-06-02 05:03:31 UTC
  • mto: This revision was merged to the branch mainline in revision 5279.
  • Revision ID: mbp@canonical.com-20100602050331-n2p1qt8hfsahspnv
Correct more sloppy use of the term 'Linux'

Show diffs side-by-side

added added

removed removed

Lines of Context:
28
28
import sys
29
29
import urllib
30
30
 
31
 
import bzrlib
32
31
from bzrlib.lazy_import import lazy_import
33
32
lazy_import(globals(), """
 
33
import atexit
34
34
import socket
35
35
import thread
36
36
import weakref
494
494
class _DebugCounter(object):
495
495
    """An object that counts the HPSS calls made to each client medium.
496
496
 
497
 
    When a medium is garbage-collected, or failing that when
498
 
    bzrlib.global_state exits, the total number of calls made on that medium
499
 
    are reported via trace.note.
 
497
    When a medium is garbage-collected, or failing that when atexit functions
 
498
    are run, the total number of calls made on that medium are reported via
 
499
    trace.note.
500
500
    """
501
501
 
502
502
    def __init__(self):
503
503
        self.counts = weakref.WeakKeyDictionary()
504
504
        client._SmartClient.hooks.install_named_hook(
505
505
            'call', self.increment_call_count, 'hpss call counter')
506
 
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
 
506
        atexit.register(self.flush_all)
507
507
 
508
508
    def track(self, medium):
509
509
        """Start tracking calls made to a medium.
715
715
    """A client medium using simple pipes.
716
716
 
717
717
    This client does not manage the pipes: it assumes they will always be open.
 
718
 
 
719
    Note that if readable_pipe.read might raise IOError or OSError with errno
 
720
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
 
721
    (such as used for sys.stdin) are safe.
718
722
    """
719
723
 
720
724
    def __init__(self, readable_pipe, writeable_pipe, base):
733
737
 
734
738
    def _read_bytes(self, count):
735
739
        """See SmartClientStreamMedium._read_bytes."""
736
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
737
 
        bytes = self._readable_pipe.read(bytes_to_read)
 
740
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
738
741
        self._report_activity(len(bytes), 'read')
739
742
        return bytes
740
743
 
741
744
 
742
 
class SSHParams(object):
743
 
    """A set of parameters for starting a remote bzr via SSH."""
 
745
class SmartSSHClientMedium(SmartClientStreamMedium):
 
746
    """A client medium using SSH."""
744
747
 
745
748
    def __init__(self, host, port=None, username=None, password=None,
746
 
            bzr_remote_path='bzr'):
747
 
        self.host = host
748
 
        self.port = port
749
 
        self.username = username
750
 
        self.password = password
751
 
        self.bzr_remote_path = bzr_remote_path
752
 
 
753
 
 
754
 
class SmartSSHClientMedium(SmartClientStreamMedium):
755
 
    """A client medium using SSH.
756
 
    
757
 
    It delegates IO to a SmartClientSocketMedium or
758
 
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
759
 
    """
760
 
 
761
 
    def __init__(self, base, ssh_params, vendor=None):
 
749
            base=None, vendor=None, bzr_remote_path=None):
762
750
        """Creates a client that will connect on the first use.
763
751
 
764
 
        :param ssh_params: A SSHParams instance.
765
752
        :param vendor: An optional override for the ssh vendor to use. See
766
753
            bzrlib.transport.ssh for details on ssh vendors.
767
754
        """
768
 
        self._real_medium = None
769
 
        self._ssh_params = ssh_params
 
755
        self._connected = False
 
756
        self._host = host
 
757
        self._password = password
 
758
        self._port = port
 
759
        self._username = username
770
760
        # for the benefit of progress making a short description of this
771
761
        # transport
772
762
        self._scheme = 'bzr+ssh'
774
764
        # _DebugCounter so we have to store all the values used in our repr
775
765
        # method before calling the super init.
776
766
        SmartClientStreamMedium.__init__(self, base)
 
767
        self._read_from = None
 
768
        self._ssh_connection = None
777
769
        self._vendor = vendor
778
 
        self._ssh_connection = None
 
770
        self._write_to = None
 
771
        self._bzr_remote_path = bzr_remote_path
779
772
 
780
773
    def __repr__(self):
781
 
        if self._ssh_params.port is None:
 
774
        if self._port is None:
782
775
            maybe_port = ''
783
776
        else:
784
 
            maybe_port = ':%s' % self._ssh_params.port
 
777
            maybe_port = ':%s' % self._port
785
778
        return "%s(%s://%s@%s%s/)" % (
786
779
            self.__class__.__name__,
787
780
            self._scheme,
788
 
            self._ssh_params.username,
789
 
            self._ssh_params.host,
 
781
            self._username,
 
782
            self._host,
790
783
            maybe_port)
791
784
 
792
785
    def _accept_bytes(self, bytes):
793
786
        """See SmartClientStreamMedium.accept_bytes."""
794
787
        self._ensure_connection()
795
 
        self._real_medium.accept_bytes(bytes)
 
788
        self._write_to.write(bytes)
 
789
        self._report_activity(len(bytes), 'write')
796
790
 
797
791
    def disconnect(self):
798
792
        """See SmartClientMedium.disconnect()."""
799
 
        if self._real_medium is not None:
800
 
            self._real_medium.disconnect()
801
 
            self._real_medium = None
802
 
        if self._ssh_connection is not None:
803
 
            self._ssh_connection.close()
804
 
            self._ssh_connection = None
 
793
        if not self._connected:
 
794
            return
 
795
        self._read_from.close()
 
796
        self._write_to.close()
 
797
        self._ssh_connection.close()
 
798
        self._connected = False
805
799
 
806
800
    def _ensure_connection(self):
807
801
        """Connect this medium if not already connected."""
808
 
        if self._real_medium is not None:
 
802
        if self._connected:
809
803
            return
810
804
        if self._vendor is None:
811
805
            vendor = ssh._get_ssh_vendor()
812
806
        else:
813
807
            vendor = self._vendor
814
 
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
815
 
                self._ssh_params.password, self._ssh_params.host,
816
 
                self._ssh_params.port,
817
 
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
 
808
        self._ssh_connection = vendor.connect_ssh(self._username,
 
809
                self._password, self._host, self._port,
 
810
                command=[self._bzr_remote_path, 'serve', '--inet',
818
811
                         '--directory=/', '--allow-writes'])
819
 
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
820
 
        if io_kind == 'socket':
821
 
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
822
 
                self.base, io_object)
823
 
        elif io_kind == 'pipes':
824
 
            read_from, write_to = io_object
825
 
            self._real_medium = SmartSimplePipesClientMedium(
826
 
                read_from, write_to, self.base)
827
 
        else:
828
 
            raise AssertionError(
829
 
                "Unexpected io_kind %r from %r"
830
 
                % (io_kind, self._ssh_connection))
 
812
        self._read_from, self._write_to = \
 
813
            self._ssh_connection.get_filelike_channels()
 
814
        self._connected = True
831
815
 
832
816
    def _flush(self):
833
817
        """See SmartClientStreamMedium._flush()."""
834
 
        self._real_medium._flush()
 
818
        self._write_to.flush()
835
819
 
836
820
    def _read_bytes(self, count):
837
821
        """See SmartClientStreamMedium.read_bytes."""
838
 
        if self._real_medium is None:
 
822
        if not self._connected:
839
823
            raise errors.MediumNotConnected(self)
840
 
        return self._real_medium.read_bytes(count)
 
824
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
825
        bytes = self._read_from.read(bytes_to_read)
 
826
        self._report_activity(len(bytes), 'read')
 
827
        return bytes
841
828
 
842
829
 
843
830
# Port 4155 is the default port for bzr://, registered with IANA.
845
832
BZR_DEFAULT_PORT = 4155
846
833
 
847
834
 
848
 
class SmartClientSocketMedium(SmartClientStreamMedium):
849
 
    """A client medium using a socket.
850
 
    
851
 
    This class isn't usable directly.  Use one of its subclasses instead.
852
 
    """
 
835
class SmartTCPClientMedium(SmartClientStreamMedium):
 
836
    """A client medium using TCP."""
853
837
 
854
 
    def __init__(self, base):
 
838
    def __init__(self, host, port, base):
 
839
        """Creates a client that will connect on the first use."""
855
840
        SmartClientStreamMedium.__init__(self, base)
 
841
        self._connected = False
 
842
        self._host = host
 
843
        self._port = port
856
844
        self._socket = None
857
 
        self._connected = False
858
845
 
859
846
    def _accept_bytes(self, bytes):
860
847
        """See SmartClientMedium.accept_bytes."""
861
848
        self._ensure_connection()
862
849
        osutils.send_all(self._socket, bytes, self._report_activity)
863
850
 
864
 
    def _ensure_connection(self):
865
 
        """Connect this medium if not already connected."""
866
 
        raise NotImplementedError(self._ensure_connection)
867
 
 
868
 
    def _flush(self):
869
 
        """See SmartClientStreamMedium._flush().
870
 
 
871
 
        For sockets we do no flushing. For TCP sockets we may want to turn off
872
 
        TCP_NODELAY and add a means to do a flush, but that can be done in the
873
 
        future.
874
 
        """
875
 
 
876
 
    def _read_bytes(self, count):
877
 
        """See SmartClientMedium.read_bytes."""
878
 
        if not self._connected:
879
 
            raise errors.MediumNotConnected(self)
880
 
        return osutils.read_bytes_from_socket(
881
 
            self._socket, self._report_activity)
882
 
 
883
851
    def disconnect(self):
884
852
        """See SmartClientMedium.disconnect()."""
885
853
        if not self._connected:
888
856
        self._socket = None
889
857
        self._connected = False
890
858
 
891
 
 
892
 
class SmartTCPClientMedium(SmartClientSocketMedium):
893
 
    """A client medium that creates a TCP connection."""
894
 
 
895
 
    def __init__(self, host, port, base):
896
 
        """Creates a client that will connect on the first use."""
897
 
        SmartClientSocketMedium.__init__(self, base)
898
 
        self._host = host
899
 
        self._port = port
900
 
 
901
859
    def _ensure_connection(self):
902
860
        """Connect this medium if not already connected."""
903
861
        if self._connected:
937
895
                    (self._host, port, err_msg))
938
896
        self._connected = True
939
897
 
940
 
 
941
 
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
942
 
    """A client medium for an already connected socket.
943
 
    
944
 
    Note that this class will assume it "owns" the socket, so it will close it
945
 
    when its disconnect method is called.
946
 
    """
947
 
 
948
 
    def __init__(self, base, sock):
949
 
        SmartClientSocketMedium.__init__(self, base)
950
 
        self._socket = sock
951
 
        self._connected = True
952
 
 
953
 
    def _ensure_connection(self):
954
 
        # Already connected, by definition!  So nothing to do.
955
 
        pass
 
898
    def _flush(self):
 
899
        """See SmartClientStreamMedium._flush().
 
900
 
 
901
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
 
902
        add a means to do a flush, but that can be done in the future.
 
903
        """
 
904
 
 
905
    def _read_bytes(self, count):
 
906
        """See SmartClientMedium.read_bytes."""
 
907
        if not self._connected:
 
908
            raise errors.MediumNotConnected(self)
 
909
        return osutils.read_bytes_from_socket(
 
910
            self._socket, self._report_activity)
956
911
 
957
912
 
958
913
class SmartClientStreamMediumRequest(SmartClientMediumRequest):