494
494
class _DebugCounter(object):
495
495
"""An object that counts the HPSS calls made to each client medium.
497
When a medium is garbage-collected, or failing that when atexit functions
498
are run, the total number of calls made on that medium are reported via
497
When a medium is garbage-collected, or failing that when
498
bzrlib.global_state exits, the total number of calls made on that medium
499
are reported via trace.note.
502
502
def __init__(self):
503
503
self.counts = weakref.WeakKeyDictionary()
504
504
client._SmartClient.hooks.install_named_hook(
505
505
'call', self.increment_call_count, 'hpss call counter')
506
atexit.register(self.flush_all)
506
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
508
508
def track(self, medium):
509
509
"""Start tracking calls made to a medium.
607
607
# which is newer than a previously supplied older-than version.
608
608
# This indicates that some smart verb call is not guarded
609
609
# appropriately (it should simply not have been tried).
610
raise AssertionError(
611
611
"_remember_remote_is_before(%r) called, but "
612
612
"_remember_remote_is_before(%r) was called previously."
613
% (version_tuple, self._remote_version_is_before))
613
, version_tuple, self._remote_version_is_before)
614
if 'hpss' in debug.debug_flags:
615
ui.ui_factory.show_warning(
616
"_remember_remote_is_before(%r) called, but "
617
"_remember_remote_is_before(%r) was called previously."
618
% (version_tuple, self._remote_version_is_before))
614
620
self._remote_version_is_before = version_tuple
616
622
def protocol_version(self):
732
734
def _read_bytes(self, count):
733
735
"""See SmartClientStreamMedium._read_bytes."""
734
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
736
bytes_to_read = min(count, _MAX_READ_SIZE)
737
bytes = self._readable_pipe.read(bytes_to_read)
735
738
self._report_activity(len(bytes), 'read')
742
class SSHParams(object):
743
"""A set of parameters for starting a remote bzr via SSH."""
745
def __init__(self, host, port=None, username=None, password=None,
746
bzr_remote_path='bzr'):
749
self.username = username
750
self.password = password
751
self.bzr_remote_path = bzr_remote_path
739
754
class SmartSSHClientMedium(SmartClientStreamMedium):
740
"""A client medium using SSH."""
755
"""A client medium using SSH.
757
It delegates IO to a SmartClientSocketMedium or
758
SmartClientAlreadyConnectedSocketMedium (depending on platform).
742
def __init__(self, host, port=None, username=None, password=None,
743
base=None, vendor=None, bzr_remote_path=None):
761
def __init__(self, base, ssh_params, vendor=None):
744
762
"""Creates a client that will connect on the first use.
764
:param ssh_params: A SSHParams instance.
746
765
:param vendor: An optional override for the ssh vendor to use. See
747
766
bzrlib.transport.ssh for details on ssh vendors.
749
self._connected = False
751
self._password = password
753
self._username = username
768
self._real_medium = None
769
self._ssh_params = ssh_params
754
770
# for the benefit of progress making a short description of this
756
772
self._scheme = 'bzr+ssh'
758
774
# _DebugCounter so we have to store all the values used in our repr
759
775
# method before calling the super init.
760
776
SmartClientStreamMedium.__init__(self, base)
761
self._read_from = None
777
self._vendor = vendor
762
778
self._ssh_connection = None
763
self._vendor = vendor
764
self._write_to = None
765
self._bzr_remote_path = bzr_remote_path
767
780
def __repr__(self):
768
if self._port is None:
781
if self._ssh_params.port is None:
771
maybe_port = ':%s' % self._port
784
maybe_port = ':%s' % self._ssh_params.port
772
785
return "%s(%s://%s@%s%s/)" % (
773
786
self.__class__.__name__,
788
self._ssh_params.username,
789
self._ssh_params.host,
779
792
def _accept_bytes(self, bytes):
780
793
"""See SmartClientStreamMedium.accept_bytes."""
781
794
self._ensure_connection()
782
self._write_to.write(bytes)
783
self._report_activity(len(bytes), 'write')
795
self._real_medium.accept_bytes(bytes)
785
797
def disconnect(self):
786
798
"""See SmartClientMedium.disconnect()."""
787
if not self._connected:
789
self._read_from.close()
790
self._write_to.close()
791
self._ssh_connection.close()
792
self._connected = False
799
if self._real_medium is not None:
800
self._real_medium.disconnect()
801
self._real_medium = None
802
if self._ssh_connection is not None:
803
self._ssh_connection.close()
804
self._ssh_connection = None
794
806
def _ensure_connection(self):
795
807
"""Connect this medium if not already connected."""
808
if self._real_medium is not None:
798
810
if self._vendor is None:
799
811
vendor = ssh._get_ssh_vendor()
801
813
vendor = self._vendor
802
self._ssh_connection = vendor.connect_ssh(self._username,
803
self._password, self._host, self._port,
804
command=[self._bzr_remote_path, 'serve', '--inet',
814
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
815
self._ssh_params.password, self._ssh_params.host,
816
self._ssh_params.port,
817
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
805
818
'--directory=/', '--allow-writes'])
806
self._read_from, self._write_to = \
807
self._ssh_connection.get_filelike_channels()
808
self._connected = True
819
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
820
if io_kind == 'socket':
821
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
822
self.base, io_object)
823
elif io_kind == 'pipes':
824
read_from, write_to = io_object
825
self._real_medium = SmartSimplePipesClientMedium(
826
read_from, write_to, self.base)
828
raise AssertionError(
829
"Unexpected io_kind %r from %r"
830
% (io_kind, self._ssh_connection))
810
832
def _flush(self):
811
833
"""See SmartClientStreamMedium._flush()."""
812
self._write_to.flush()
834
self._real_medium._flush()
814
836
def _read_bytes(self, count):
815
837
"""See SmartClientStreamMedium.read_bytes."""
816
if not self._connected:
838
if self._real_medium is None:
817
839
raise errors.MediumNotConnected(self)
818
bytes_to_read = min(count, _MAX_READ_SIZE)
819
bytes = self._read_from.read(bytes_to_read)
820
self._report_activity(len(bytes), 'read')
840
return self._real_medium.read_bytes(count)
824
843
# Port 4155 is the default port for bzr://, registered with IANA.
826
845
BZR_DEFAULT_PORT = 4155
829
class SmartTCPClientMedium(SmartClientStreamMedium):
830
"""A client medium using TCP."""
848
class SmartClientSocketMedium(SmartClientStreamMedium):
849
"""A client medium using a socket.
851
This class isn't usable directly. Use one of its subclasses instead.
832
def __init__(self, host, port, base):
833
"""Creates a client that will connect on the first use."""
854
def __init__(self, base):
834
855
SmartClientStreamMedium.__init__(self, base)
835
857
self._connected = False
840
859
def _accept_bytes(self, bytes):
841
860
"""See SmartClientMedium.accept_bytes."""
842
861
self._ensure_connection()
843
862
osutils.send_all(self._socket, bytes, self._report_activity)
864
def _ensure_connection(self):
865
"""Connect this medium if not already connected."""
866
raise NotImplementedError(self._ensure_connection)
869
"""See SmartClientStreamMedium._flush().
871
For sockets we do no flushing. For TCP sockets we may want to turn off
872
TCP_NODELAY and add a means to do a flush, but that can be done in the
876
def _read_bytes(self, count):
877
"""See SmartClientMedium.read_bytes."""
878
if not self._connected:
879
raise errors.MediumNotConnected(self)
880
return osutils.read_bytes_from_socket(
881
self._socket, self._report_activity)
845
883
def disconnect(self):
846
884
"""See SmartClientMedium.disconnect()."""
847
885
if not self._connected:
889
937
(self._host, port, err_msg))
890
938
self._connected = True
893
"""See SmartClientStreamMedium._flush().
895
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
896
add a means to do a flush, but that can be done in the future.
899
def _read_bytes(self, count):
900
"""See SmartClientMedium.read_bytes."""
901
if not self._connected:
902
raise errors.MediumNotConnected(self)
903
return osutils.read_bytes_from_socket(
904
self._socket, self._report_activity)
941
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
942
"""A client medium for an already connected socket.
944
Note that this class will assume it "owns" the socket, so it will close it
945
when its disconnect method is called.
948
def __init__(self, base, sock):
949
SmartClientSocketMedium.__init__(self, base)
951
self._connected = True
953
def _ensure_connection(self):
954
# Already connected, by definition! So nothing to do.
907
958
class SmartClientStreamMediumRequest(SmartClientMediumRequest):