493
494
class _DebugCounter(object):
494
495
"""An object that counts the HPSS calls made to each client medium.
496
When a medium is garbage-collected, or failing that when
497
bzrlib.global_state exits, the total number of calls made on that medium
498
are reported via trace.note.
497
When a medium is garbage-collected, or failing that when atexit functions
498
are run, the total number of calls made on that medium are reported via
501
502
def __init__(self):
502
503
self.counts = weakref.WeakKeyDictionary()
503
504
client._SmartClient.hooks.install_named_hook(
504
505
'call', self.increment_call_count, 'hpss call counter')
505
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
506
atexit.register(self.flush_all)
507
508
def track(self, medium):
508
509
"""Start tracking calls made to a medium.
733
738
def _read_bytes(self, count):
734
739
"""See SmartClientStreamMedium._read_bytes."""
735
bytes_to_read = min(count, _MAX_READ_SIZE)
736
bytes = self._readable_pipe.read(bytes_to_read)
740
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
737
741
self._report_activity(len(bytes), 'read')
741
class SSHParams(object):
742
"""A set of parameters for starting a remote bzr via SSH."""
745
class SmartSSHClientMedium(SmartClientStreamMedium):
746
"""A client medium using SSH."""
744
748
def __init__(self, host, port=None, username=None, password=None,
745
bzr_remote_path='bzr'):
748
self.username = username
749
self.password = password
750
self.bzr_remote_path = bzr_remote_path
753
class SmartSSHClientMedium(SmartClientStreamMedium):
754
"""A client medium using SSH.
756
It delegates IO to a SmartClientSocketMedium or
757
SmartClientAlreadyConnectedSocketMedium (depending on platform).
760
def __init__(self, base, ssh_params, vendor=None):
749
base=None, vendor=None, bzr_remote_path=None):
761
750
"""Creates a client that will connect on the first use.
763
:param ssh_params: A SSHParams instance.
764
752
:param vendor: An optional override for the ssh vendor to use. See
765
753
bzrlib.transport.ssh for details on ssh vendors.
767
self._real_medium = None
768
self._ssh_params = ssh_params
755
self._connected = False
757
self._password = password
759
self._username = username
769
760
# for the benefit of progress making a short description of this
771
762
self._scheme = 'bzr+ssh'
773
764
# _DebugCounter so we have to store all the values used in our repr
774
765
# method before calling the super init.
775
766
SmartClientStreamMedium.__init__(self, base)
767
self._read_from = None
768
self._ssh_connection = None
776
769
self._vendor = vendor
777
self._ssh_connection = None
770
self._write_to = None
771
self._bzr_remote_path = bzr_remote_path
779
773
def __repr__(self):
780
if self._ssh_params.port is None:
774
if self._port is None:
783
maybe_port = ':%s' % self._ssh_params.port
777
maybe_port = ':%s' % self._port
784
778
return "%s(%s://%s@%s%s/)" % (
785
779
self.__class__.__name__,
787
self._ssh_params.username,
788
self._ssh_params.host,
791
785
def _accept_bytes(self, bytes):
792
786
"""See SmartClientStreamMedium.accept_bytes."""
793
787
self._ensure_connection()
794
self._real_medium.accept_bytes(bytes)
788
self._write_to.write(bytes)
789
self._report_activity(len(bytes), 'write')
796
791
def disconnect(self):
797
792
"""See SmartClientMedium.disconnect()."""
798
if self._real_medium is not None:
799
self._real_medium.disconnect()
800
self._real_medium = None
801
if self._ssh_connection is not None:
802
self._ssh_connection.close()
803
self._ssh_connection = None
793
if not self._connected:
795
self._read_from.close()
796
self._write_to.close()
797
self._ssh_connection.close()
798
self._connected = False
805
800
def _ensure_connection(self):
806
801
"""Connect this medium if not already connected."""
807
if self._real_medium is not None:
809
804
if self._vendor is None:
810
805
vendor = ssh._get_ssh_vendor()
812
807
vendor = self._vendor
813
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
814
self._ssh_params.password, self._ssh_params.host,
815
self._ssh_params.port,
816
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
808
self._ssh_connection = vendor.connect_ssh(self._username,
809
self._password, self._host, self._port,
810
command=[self._bzr_remote_path, 'serve', '--inet',
817
811
'--directory=/', '--allow-writes'])
818
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
819
if io_kind == 'socket':
820
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
821
self.base, io_object)
822
elif io_kind == 'pipes':
823
read_from, write_to = io_object
824
self._real_medium = SmartSimplePipesClientMedium(
825
read_from, write_to, self.base)
827
raise AssertionError(
828
"Unexpected io_kind %r from %r"
829
% (io_kind, self._ssh_connection))
812
self._read_from, self._write_to = \
813
self._ssh_connection.get_filelike_channels()
814
self._connected = True
831
816
def _flush(self):
832
817
"""See SmartClientStreamMedium._flush()."""
833
self._real_medium._flush()
818
self._write_to.flush()
835
820
def _read_bytes(self, count):
836
821
"""See SmartClientStreamMedium.read_bytes."""
837
if self._real_medium is None:
822
if not self._connected:
838
823
raise errors.MediumNotConnected(self)
839
return self._real_medium.read_bytes(count)
824
bytes_to_read = min(count, _MAX_READ_SIZE)
825
bytes = self._read_from.read(bytes_to_read)
826
self._report_activity(len(bytes), 'read')
842
830
# Port 4155 is the default port for bzr://, registered with IANA.
844
832
BZR_DEFAULT_PORT = 4155
847
class SmartClientSocketMedium(SmartClientStreamMedium):
848
"""A client medium using a socket.
850
This class isn't usable directly. Use one of its subclasses instead.
835
class SmartTCPClientMedium(SmartClientStreamMedium):
836
"""A client medium using TCP."""
853
def __init__(self, base):
838
def __init__(self, host, port, base):
839
"""Creates a client that will connect on the first use."""
854
840
SmartClientStreamMedium.__init__(self, base)
841
self._connected = False
855
844
self._socket = None
856
self._connected = False
858
846
def _accept_bytes(self, bytes):
859
847
"""See SmartClientMedium.accept_bytes."""
860
848
self._ensure_connection()
861
849
osutils.send_all(self._socket, bytes, self._report_activity)
863
def _ensure_connection(self):
864
"""Connect this medium if not already connected."""
865
raise NotImplementedError(self._ensure_connection)
868
"""See SmartClientStreamMedium._flush().
870
For sockets we do no flushing. For TCP sockets we may want to turn off
871
TCP_NODELAY and add a means to do a flush, but that can be done in the
875
def _read_bytes(self, count):
876
"""See SmartClientMedium.read_bytes."""
877
if not self._connected:
878
raise errors.MediumNotConnected(self)
879
return osutils.read_bytes_from_socket(
880
self._socket, self._report_activity)
882
851
def disconnect(self):
883
852
"""See SmartClientMedium.disconnect()."""
884
853
if not self._connected:
936
895
(self._host, port, err_msg))
937
896
self._connected = True
940
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
941
"""A client medium for an already connected socket.
943
Note that this class will assume it "owns" the socket, so it will close it
944
when its disconnect method is called.
947
def __init__(self, base, sock):
948
SmartClientSocketMedium.__init__(self, base)
950
self._connected = True
952
def _ensure_connection(self):
953
# Already connected, by definition! So nothing to do.
899
"""See SmartClientStreamMedium._flush().
901
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
902
add a means to do a flush, but that can be done in the future.
905
def _read_bytes(self, count):
906
"""See SmartClientMedium.read_bytes."""
907
if not self._connected:
908
raise errors.MediumNotConnected(self)
909
return osutils.read_bytes_from_socket(
910
self._socket, self._report_activity)
957
913
class SmartClientStreamMediumRequest(SmartClientMediumRequest):