738
734
def _read_bytes(self, count):
739
735
"""See SmartClientStreamMedium._read_bytes."""
740
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
736
bytes_to_read = min(count, _MAX_READ_SIZE)
737
bytes = self._readable_pipe.read(bytes_to_read)
741
738
self._report_activity(len(bytes), 'read')
742
class SSHParams(object):
744
def __init__(self, host, port=None, username=None, password=None,
745
bzr_remote_path='bzr'):
746
"""Creates a client that will connect on the first use.
751
self.username = username
752
self.password = password
753
self.bzr_remote_path = bzr_remote_path
745
756
class SmartSSHClientMedium(SmartClientStreamMedium):
746
"""A client medium using SSH."""
757
"""A client medium using SSH.
759
It delegates IO to a SmartClientSocketMedium or
760
SmartClientAlreadyConnectedSocketMedium (depending on platform).
748
def __init__(self, host, port=None, username=None, password=None,
749
base=None, vendor=None, bzr_remote_path=None):
763
def __init__(self, base, ssh_params, vendor=None):
750
764
"""Creates a client that will connect on the first use.
766
:param ssh_params: A SSHParams instance.
752
767
:param vendor: An optional override for the ssh vendor to use. See
753
768
bzrlib.transport.ssh for details on ssh vendors.
755
self._connected = False
757
self._password = password
759
self._username = username
770
self._real_medium = None
771
self._ssh_params = ssh_params
760
772
# for the benefit of progress making a short description of this
762
774
self._scheme = 'bzr+ssh'
764
776
# _DebugCounter so we have to store all the values used in our repr
765
777
# method before calling the super init.
766
778
SmartClientStreamMedium.__init__(self, base)
767
self._read_from = None
779
self._vendor = vendor
768
780
self._ssh_connection = None
769
self._vendor = vendor
770
self._write_to = None
771
self._bzr_remote_path = bzr_remote_path
773
782
def __repr__(self):
774
if self._port is None:
783
if self._ssh_params.port is None:
777
maybe_port = ':%s' % self._port
786
maybe_port = ':%s' % self._ssh_params.port
778
787
return "%s(%s://%s@%s%s/)" % (
779
788
self.__class__.__name__,
790
self._ssh_params.username,
791
self._ssh_params.host,
785
794
def _accept_bytes(self, bytes):
786
795
"""See SmartClientStreamMedium.accept_bytes."""
787
796
self._ensure_connection()
788
self._write_to.write(bytes)
789
self._report_activity(len(bytes), 'write')
797
# XXX: Perhaps should use accept_bytes rather than _accept_bytes?
798
self._real_medium._accept_bytes(bytes)
791
800
def disconnect(self):
792
801
"""See SmartClientMedium.disconnect()."""
793
if not self._connected:
795
self._read_from.close()
796
self._write_to.close()
797
self._ssh_connection.close()
798
self._connected = False
802
if self._real_medium is not None:
803
self._real_medium.disconnect()
804
self._real_medium = None
805
if self._ssh_connection is not None:
806
self._ssh_connection.close()
807
self._ssh_connection = None
800
809
def _ensure_connection(self):
801
810
"""Connect this medium if not already connected."""
811
if self._real_medium is not None:
804
813
if self._vendor is None:
805
814
vendor = ssh._get_ssh_vendor()
807
816
vendor = self._vendor
808
self._ssh_connection = vendor.connect_ssh(self._username,
809
self._password, self._host, self._port,
810
command=[self._bzr_remote_path, 'serve', '--inet',
817
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
818
self._ssh_params.password, self._ssh_params.host,
819
self._ssh_params.port,
820
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
811
821
'--directory=/', '--allow-writes'])
812
self._read_from, self._write_to = \
813
self._ssh_connection.get_filelike_channels()
814
self._connected = True
822
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
823
if io_kind == 'socket':
824
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
825
self.base, io_object)
826
elif io_kind == 'pipes':
827
read_from, write_to = io_object
828
self._real_medium = SmartSimplePipesClientMedium(
829
read_from, write_to, self.base)
831
raise AssertionError(
832
"Unexpected io_kind %r from %r"
833
% (io_kind, self._ssh_connection))
816
835
def _flush(self):
817
836
"""See SmartClientStreamMedium._flush()."""
818
self._write_to.flush()
837
self._real_medium._flush()
820
839
def _read_bytes(self, count):
821
840
"""See SmartClientStreamMedium.read_bytes."""
822
if not self._connected:
841
if self._real_medium is None:
823
842
raise errors.MediumNotConnected(self)
824
bytes_to_read = min(count, _MAX_READ_SIZE)
825
bytes = self._read_from.read(bytes_to_read)
826
self._report_activity(len(bytes), 'read')
843
# XXX: perhaps should delegate to read_bytes, not _read_bytes?
844
return self._real_medium._read_bytes(count)
830
847
# Port 4155 is the default port for bzr://, registered with IANA.
832
849
BZR_DEFAULT_PORT = 4155
835
class SmartTCPClientMedium(SmartClientStreamMedium):
836
"""A client medium using TCP."""
852
class SmartClientSocketMedium(SmartClientStreamMedium):
853
"""A client medium using sockets."""
838
def __init__(self, host, port, base):
855
def __init__(self, base):
839
856
"""Creates a client that will connect on the first use."""
840
857
SmartClientStreamMedium.__init__(self, base)
841
859
self._connected = False
846
861
def _accept_bytes(self, bytes):
847
862
"""See SmartClientMedium.accept_bytes."""
848
863
self._ensure_connection()
849
864
osutils.send_all(self._socket, bytes, self._report_activity)
867
"""See SmartClientStreamMedium._flush().
869
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
870
add a means to do a flush, but that can be done in the future.
873
def _read_bytes(self, count):
874
"""See SmartClientMedium.read_bytes."""
875
if not self._connected:
876
raise errors.MediumNotConnected(self)
877
return osutils.read_bytes_from_socket(
878
self._socket, self._report_activity)
851
880
def disconnect(self):
852
881
"""See SmartClientMedium.disconnect()."""
853
882
if not self._connected:
895
935
(self._host, port, err_msg))
896
936
self._connected = True
899
"""See SmartClientStreamMedium._flush().
901
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
902
add a means to do a flush, but that can be done in the future.
905
def _read_bytes(self, count):
906
"""See SmartClientMedium.read_bytes."""
907
if not self._connected:
908
raise errors.MediumNotConnected(self)
909
return osutils.read_bytes_from_socket(
910
self._socket, self._report_activity)
939
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
940
"""A SmartClientSocketMedium for an already-connected socket.
942
Note that this class will assume it "owns" the socket, so it will close it
943
when its disconnect method is called.
946
def __init__(self, base, sock):
947
SmartClientSocketMedium.__init__(self, base)
949
self._connected = True
951
def _ensure_connection(self):
952
# Already connected, by definition! So nothing to do.
913
956
class SmartClientStreamMediumRequest(SmartClientMediumRequest):