284
287
self._push_back(protocol.unused_data)
286
289
def _read_bytes(self, desired_count):
287
return osutils.read_bytes_from_socket(
288
self.socket, self._report_activity)
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
290
293
def terminate_due_to_error(self):
291
294
# TODO: This should log to a server log file, but no such thing
292
295
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
294
297
self.finished = True
296
299
def _write_out(self, bytes):
297
300
tstart = osutils.timer_func()
298
301
osutils.send_all(self.socket, bytes, self._report_activity)
299
302
if 'hpss' in debug.debug_flags:
300
thread_id = thread.get_ident()
303
thread_id = threading.currentThread().ident
301
304
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
302
305
% ('wrote', thread_id, len(bytes),
303
306
osutils.timer_func() - tstart))
724
721
def _accept_bytes(self, bytes):
725
722
"""See SmartClientStreamMedium.accept_bytes."""
726
self._writeable_pipe.write(bytes)
723
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
727
724
self._report_activity(len(bytes), 'write')
729
726
def _flush(self):
730
727
"""See SmartClientStreamMedium._flush()."""
731
self._writeable_pipe.flush()
728
osutils.until_no_eintr(self._writeable_pipe.flush)
733
730
def _read_bytes(self, count):
734
731
"""See SmartClientStreamMedium._read_bytes."""
735
bytes_to_read = min(count, _MAX_READ_SIZE)
736
bytes = self._readable_pipe.read(bytes_to_read)
732
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
737
733
self._report_activity(len(bytes), 'read')
741
class SSHParams(object):
742
"""A set of parameters for starting a remote bzr via SSH."""
737
class SmartSSHClientMedium(SmartClientStreamMedium):
738
"""A client medium using SSH."""
744
740
def __init__(self, host, port=None, username=None, password=None,
745
bzr_remote_path='bzr'):
748
self.username = username
749
self.password = password
750
self.bzr_remote_path = bzr_remote_path
753
class SmartSSHClientMedium(SmartClientStreamMedium):
754
"""A client medium using SSH.
756
It delegates IO to a SmartClientSocketMedium or
757
SmartClientAlreadyConnectedSocketMedium (depending on platform).
760
def __init__(self, base, ssh_params, vendor=None):
741
base=None, vendor=None, bzr_remote_path=None):
761
742
"""Creates a client that will connect on the first use.
763
:param ssh_params: A SSHParams instance.
764
744
:param vendor: An optional override for the ssh vendor to use. See
765
745
bzrlib.transport.ssh for details on ssh vendors.
767
self._real_medium = None
768
self._ssh_params = ssh_params
769
# for the benefit of progress making a short description of this
771
self._scheme = 'bzr+ssh'
747
self._connected = False
749
self._password = password
751
self._username = username
772
752
# SmartClientStreamMedium stores the repr of this object in its
773
753
# _DebugCounter so we have to store all the values used in our repr
774
754
# method before calling the super init.
775
755
SmartClientStreamMedium.__init__(self, base)
756
self._read_from = None
757
self._ssh_connection = None
776
758
self._vendor = vendor
777
self._ssh_connection = None
759
self._write_to = None
760
self._bzr_remote_path = bzr_remote_path
761
# for the benefit of progress making a short description of this
763
self._scheme = 'bzr+ssh'
779
765
def __repr__(self):
780
if self._ssh_params.port is None:
783
maybe_port = ':%s' % self._ssh_params.port
784
return "%s(%s://%s@%s%s/)" % (
766
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
785
767
self.__class__.__name__,
787
self._ssh_params.username,
788
self._ssh_params.host,
791
773
def _accept_bytes(self, bytes):
792
774
"""See SmartClientStreamMedium.accept_bytes."""
793
775
self._ensure_connection()
794
self._real_medium.accept_bytes(bytes)
776
osutils.until_no_eintr(self._write_to.write, bytes)
777
self._report_activity(len(bytes), 'write')
796
779
def disconnect(self):
797
780
"""See SmartClientMedium.disconnect()."""
798
if self._real_medium is not None:
799
self._real_medium.disconnect()
800
self._real_medium = None
801
if self._ssh_connection is not None:
802
self._ssh_connection.close()
803
self._ssh_connection = None
781
if not self._connected:
783
osutils.until_no_eintr(self._read_from.close)
784
osutils.until_no_eintr(self._write_to.close)
785
self._ssh_connection.close()
786
self._connected = False
805
788
def _ensure_connection(self):
806
789
"""Connect this medium if not already connected."""
807
if self._real_medium is not None:
809
792
if self._vendor is None:
810
793
vendor = ssh._get_ssh_vendor()
812
795
vendor = self._vendor
813
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
814
self._ssh_params.password, self._ssh_params.host,
815
self._ssh_params.port,
816
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
796
self._ssh_connection = vendor.connect_ssh(self._username,
797
self._password, self._host, self._port,
798
command=[self._bzr_remote_path, 'serve', '--inet',
817
799
'--directory=/', '--allow-writes'])
818
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
819
if io_kind == 'socket':
820
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
821
self.base, io_object)
822
elif io_kind == 'pipes':
823
read_from, write_to = io_object
824
self._real_medium = SmartSimplePipesClientMedium(
825
read_from, write_to, self.base)
827
raise AssertionError(
828
"Unexpected io_kind %r from %r"
829
% (io_kind, self._ssh_connection))
800
self._read_from, self._write_to = \
801
self._ssh_connection.get_filelike_channels()
802
self._connected = True
831
804
def _flush(self):
832
805
"""See SmartClientStreamMedium._flush()."""
833
self._real_medium._flush()
806
self._write_to.flush()
835
808
def _read_bytes(self, count):
836
809
"""See SmartClientStreamMedium.read_bytes."""
837
if self._real_medium is None:
810
if not self._connected:
838
811
raise errors.MediumNotConnected(self)
839
return self._real_medium.read_bytes(count)
812
bytes_to_read = min(count, _MAX_READ_SIZE)
813
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
814
self._report_activity(len(bytes), 'read')
842
818
# Port 4155 is the default port for bzr://, registered with IANA.
844
820
BZR_DEFAULT_PORT = 4155
847
class SmartClientSocketMedium(SmartClientStreamMedium):
848
"""A client medium using a socket.
850
This class isn't usable directly. Use one of its subclasses instead.
823
class SmartTCPClientMedium(SmartClientStreamMedium):
824
"""A client medium using TCP."""
853
def __init__(self, base):
826
def __init__(self, host, port, base):
827
"""Creates a client that will connect on the first use."""
854
828
SmartClientStreamMedium.__init__(self, base)
829
self._connected = False
855
832
self._socket = None
856
self._connected = False
858
834
def _accept_bytes(self, bytes):
859
835
"""See SmartClientMedium.accept_bytes."""
860
836
self._ensure_connection()
861
837
osutils.send_all(self._socket, bytes, self._report_activity)
863
def _ensure_connection(self):
864
"""Connect this medium if not already connected."""
865
raise NotImplementedError(self._ensure_connection)
868
"""See SmartClientStreamMedium._flush().
870
For sockets we do no flushing. For TCP sockets we may want to turn off
871
TCP_NODELAY and add a means to do a flush, but that can be done in the
875
def _read_bytes(self, count):
876
"""See SmartClientMedium.read_bytes."""
877
if not self._connected:
878
raise errors.MediumNotConnected(self)
879
return osutils.read_bytes_from_socket(
880
self._socket, self._report_activity)
882
839
def disconnect(self):
883
840
"""See SmartClientMedium.disconnect()."""
884
841
if not self._connected:
843
osutils.until_no_eintr(self._socket.close)
887
844
self._socket = None
888
845
self._connected = False
891
class SmartTCPClientMedium(SmartClientSocketMedium):
892
"""A client medium that creates a TCP connection."""
894
def __init__(self, host, port, base):
895
"""Creates a client that will connect on the first use."""
896
SmartClientSocketMedium.__init__(self, base)
900
847
def _ensure_connection(self):
901
848
"""Connect this medium if not already connected."""
902
849
if self._connected:
936
883
(self._host, port, err_msg))
937
884
self._connected = True
940
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
941
"""A client medium for an already connected socket.
943
Note that this class will assume it "owns" the socket, so it will close it
944
when its disconnect method is called.
947
def __init__(self, base, sock):
948
SmartClientSocketMedium.__init__(self, base)
950
self._connected = True
952
def _ensure_connection(self):
953
# Already connected, by definition! So nothing to do.
887
"""See SmartClientStreamMedium._flush().
889
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
890
add a means to do a flush, but that can be done in the future.
893
def _read_bytes(self, count):
894
"""See SmartClientMedium.read_bytes."""
895
if not self._connected:
896
raise errors.MediumNotConnected(self)
897
return _read_bytes_from_socket(
898
self._socket.recv, count, self._report_activity)
957
901
class SmartClientStreamMediumRequest(SmartClientMediumRequest):