491
490
return self._medium._get_line()
493
class _VfsRefuser(object):
    """An object that refuses all VFS requests.

    On construction it installs ``check_vfs`` as a 'call' hook (named
    'vfs refuser') on ``client._SmartClient.hooks``.
    """

    def __init__(self):
        client._SmartClient.hooks.install_named_hook(
            'call', self.check_vfs, 'vfs refuser')

    def check_vfs(self, params):
        """Reject the call described by ``params`` if it is a VFS request.

        :param params: The hook parameters; ``params.method`` is looked up
            in ``request.request_handlers`` and ``params.args`` is reported
            in the error.
        :raises errors.HpssVfsRequestNotAllowed: if the handler registered
            for ``params.method`` is a subclass of ``vfs.VfsRequest``.
        """
        try:
            request_method = request.request_handlers.get(params.method)
        except KeyError:
            # A method we don't know about doesn't count as a VFS method.
            # NOTE(review): assumes the handler registry raises KeyError for
            # unregistered names -- confirm against the registry class.
            return
        if request_method is None:
            # Covers a dict-like registry whose get() returns None instead
            # of raising; issubclass(None, ...) would raise TypeError.
            return
        if issubclass(request_method, vfs.VfsRequest):
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
494
512
class _DebugCounter(object):
495
513
"""An object that counts the HPSS calls made to each client medium.
497
When a medium is garbage-collected, or failing that when atexit functions
498
are run, the total number of calls made on that medium are reported via
515
When a medium is garbage-collected, or failing that when
516
bzrlib.global_state exits, the total number of calls made on that medium
517
are reported via trace.note.
502
520
def __init__(self):
503
521
self.counts = weakref.WeakKeyDictionary()
504
522
client._SmartClient.hooks.install_named_hook(
505
523
'call', self.increment_call_count, 'hpss call counter')
506
atexit.register(self.flush_all)
524
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
508
526
def track(self, medium):
509
527
"""Start tracking calls made to a medium.
738
757
def _read_bytes(self, count):
    """See SmartClientStreamMedium._read_bytes.

    Performs exactly one read from the readable pipe, capped at
    ``_MAX_READ_SIZE`` bytes, reports the number of bytes read as 'read'
    activity and returns them.

    :param count: The maximum number of bytes the caller wants.
    :return: The bytes read; may be fewer than ``count``.
    """
    # Only one read per call: issuing a second read here would discard
    # whatever the first one consumed from the pipe.
    bytes_to_read = min(count, _MAX_READ_SIZE)
    data = self._readable_pipe.read(bytes_to_read)
    self._report_activity(len(data), 'read')
    return data
765
class SSHParams(object):
    """A set of parameters for starting a remote bzr via SSH.

    This is a plain value holder: every constructor argument is stored
    unchanged as an attribute of the same name.
    """

    def __init__(self, host, port=None, username=None, password=None,
            bzr_remote_path='bzr'):
        """Store the connection parameters.

        :param host: The host to connect to.
        :param port: The port to connect to, or None.
        :param username: The username to connect as, or None.
        :param password: The password to use, or None.
        :param bzr_remote_path: The bzr command to run on the remote side.
        """
        # host and port are read back by users of this object (for example
        # when building a repr or opening the connection), so they must be
        # stored alongside the other fields.
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.bzr_remote_path = bzr_remote_path
745
777
class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH.

    It delegates IO to a SmartClientAlreadyConnectedSocketMedium or a
    SmartSimplePipesClientMedium, depending on whether the SSH connection
    reports a socket or a pair of pipes from get_sock_or_pipes().
    """

    def __init__(self, base, ssh_params, vendor=None):
        """Creates a client that will connect on the first use.

        :param base: The base passed through to SmartClientStreamMedium and
            to the medium that IO is delegated to.
        :param ssh_params: A SSHParams instance.
        :param vendor: An optional override for the ssh vendor to use.  See
            bzrlib.transport.ssh for details on ssh vendors.
        """
        self._real_medium = None
        self._ssh_params = ssh_params
        # for the benefit of progress making a short description of this
        # transport
        self._scheme = 'bzr+ssh'
        # The repr of this object may be taken during the super init (for
        # _DebugCounter) so we have to store all the values used in our repr
        # method before calling the super init.
        SmartClientStreamMedium.__init__(self, base)
        self._vendor = vendor
        self._ssh_connection = None

    def __repr__(self):
        if self._ssh_params.port is None:
            maybe_port = ''
        else:
            maybe_port = ':%s' % self._ssh_params.port
        return "%s(%s://%s@%s%s/)" % (
            self.__class__.__name__,
            self._scheme,
            self._ssh_params.username,
            self._ssh_params.host,
            maybe_port)

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._real_medium.accept_bytes(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect().

        Safe to call when not connected: each resource is only released if
        it is present, and is reset to None afterwards.
        """
        if self._real_medium is not None:
            self._real_medium.disconnect()
            self._real_medium = None
        if self._ssh_connection is not None:
            self._ssh_connection.close()
            self._ssh_connection = None

    def _ensure_connection(self):
        """Connect this medium if not already connected.

        :raises AssertionError: if the SSH connection reports an IO kind
            other than 'socket' or 'pipes'.
        """
        if self._real_medium is not None:
            # Already connected.
            return
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
                self._ssh_params.password, self._ssh_params.host,
                self._ssh_params.port,
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
                         '--directory=/', '--allow-writes'])
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
        if io_kind == 'socket':
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
                self.base, io_object)
        elif io_kind == 'pipes':
            read_from, write_to = io_object
            self._real_medium = SmartSimplePipesClientMedium(
                read_from, write_to, self.base)
        else:
            raise AssertionError(
                "Unexpected io_kind %r from %r"
                % (io_kind, self._ssh_connection))

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._real_medium._flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes.

        :raises errors.MediumNotConnected: if no connection has been made.
        """
        if self._real_medium is None:
            raise errors.MediumNotConnected(self)
        return self._real_medium.read_bytes(count)
830
866
# Port 4155 is the default port for bzr://, registered with IANA.
BZR_DEFAULT_PORT = 4155
835
class SmartTCPClientMedium(SmartClientStreamMedium):
836
"""A client medium using TCP."""
871
class SmartClientSocketMedium(SmartClientStreamMedium):
872
"""A client medium using a socket.
874
This class isn't usable directly. Use one of its subclasses instead.
838
def __init__(self, host, port, base):
839
"""Creates a client that will connect on the first use."""
877
def __init__(self, base):
840
878
SmartClientStreamMedium.__init__(self, base)
841
880
self._connected = False
846
882
def _accept_bytes(self, bytes):
    """See SmartClientMedium.accept_bytes.

    Makes sure the medium is connected, then hands ``bytes`` to
    osutils.send_all together with this medium's activity reporter.
    """
    self._ensure_connection()
    # Look the socket up only after connecting, as connecting is what
    # makes it available.
    sock = self._socket
    report = self._report_activity
    osutils.send_all(sock, bytes, report)
887
def _ensure_connection(self):
    """Connect this medium if not already connected.

    This implementation only raises NotImplementedError; a usable medium
    must provide its own version.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError(self._ensure_connection)
892
"""See SmartClientStreamMedium._flush().
894
For sockets we do no flushing. For TCP sockets we may want to turn off
895
TCP_NODELAY and add a means to do a flush, but that can be done in the
899
def _read_bytes(self, count):
    """See SmartClientMedium.read_bytes.

    Note that ``count`` is not passed on: the amount read is whatever
    osutils.read_bytes_from_socket decides.

    :raises errors.MediumNotConnected: if the medium is not connected.
    """
    if self._connected:
        return osutils.read_bytes_from_socket(
            self._socket, self._report_activity)
    raise errors.MediumNotConnected(self)
851
906
def disconnect(self):
852
907
"""See SmartClientMedium.disconnect()."""
853
908
if not self._connected:
895
960
(self._host, port, err_msg))
896
961
self._connected = True
899
"""See SmartClientStreamMedium._flush().
901
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
902
add a means to do a flush, but that can be done in the future.
905
def _read_bytes(self, count):
906
"""See SmartClientMedium.read_bytes."""
907
if not self._connected:
908
raise errors.MediumNotConnected(self)
909
return osutils.read_bytes_from_socket(
910
self._socket, self._report_activity)
964
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
    """A client medium for an already connected socket.

    Note that this class will assume it "owns" the socket, so it will close it
    when its disconnect method is called.
    """

    def __init__(self, base, sock):
        """Wrap an already connected socket.

        :param base: Passed through to SmartClientSocketMedium.
        :param sock: The connected socket that this medium takes over.
        """
        SmartClientSocketMedium.__init__(self, base)
        # Set these after the base initialiser, which marks the medium as
        # not connected.  The inherited IO methods read self._socket.
        self._socket = sock
        self._connected = True

    def _ensure_connection(self):
        # Already connected, by definition!  So nothing to do.
        pass
913
981
class SmartClientStreamMediumRequest(SmartClientMediumRequest):