24
24
bzrlib/transport/smart/__init__.py.
33
32
from bzrlib.lazy_import import lazy_import
34
33
lazy_import(globals(), """
37
38
from bzrlib import (
46
from bzrlib.smart import client, protocol
45
from bzrlib.smart import client, protocol, request, vfs
47
46
from bzrlib.transport import ssh
51
# We must not read any more than 64k at a time so we don't risk "no buffer
52
# space available" errors on some platforms. Windows in particular is likely
53
# to give error 10053 or 10055 if we read more than 64k from a socket.
54
_MAX_READ_SIZE = 64 * 1024
48
from bzrlib import osutils
50
# Throughout this module buffer size parameters are either limited to be at
51
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
52
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
53
# from non-sockets as well.
54
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
57
56
def _get_protocol_factory_for_bytes(bytes):
58
57
"""Determine the right protocol factory for 'bytes'.
519
523
def increment_call_count(self, params):
520
524
# Increment the count in the WeakKeyDictionary
521
525
value = self.counts[params.medium]
528
request_method = request.request_handlers.get(params.method)
530
# A method we don't know about doesn't count as a VFS method.
532
if issubclass(request_method, vfs.VfsRequest):
533
value['vfs_count'] += 1
524
535
def done(self, ref):
525
536
value = self.counts[ref]
526
count, medium_repr = value
537
count, vfs_count, medium_repr = (
538
value['count'], value['vfs_count'], value['medium_repr'])
527
539
# In case this callback is invoked for the same ref twice (by the
528
540
# weakref callback and by the atexit function), set the call count back
529
541
# to 0 so this item won't be reported twice.
543
value['vfs_count'] = 0
532
trace.note('HPSS calls: %d %s', count, medium_repr)
545
trace.note('HPSS calls: %d (%d vfs) %s',
546
count, vfs_count, medium_repr)
534
548
def flush_all(self):
535
549
for ref in list(self.counts.keys()):
713
733
def _read_bytes(self, count):
714
734
"""See SmartClientStreamMedium._read_bytes."""
715
bytes = self._readable_pipe.read(count)
735
bytes_to_read = min(count, _MAX_READ_SIZE)
736
bytes = self._readable_pipe.read(bytes_to_read)
716
737
self._report_activity(len(bytes), 'read')
741
class SSHParams(object):
742
"""A set of parameters for starting a remote bzr via SSH."""
744
def __init__(self, host, port=None, username=None, password=None,
745
bzr_remote_path='bzr'):
748
self.username = username
749
self.password = password
750
self.bzr_remote_path = bzr_remote_path
720
753
class SmartSSHClientMedium(SmartClientStreamMedium):
721
"""A client medium using SSH."""
754
"""A client medium using SSH.
756
It delegates IO to a SmartClientSocketMedium or
757
SmartClientAlreadyConnectedSocketMedium (depending on platform).
723
def __init__(self, host, port=None, username=None, password=None,
724
base=None, vendor=None, bzr_remote_path=None):
760
def __init__(self, base, ssh_params, vendor=None):
725
761
"""Creates a client that will connect on the first use.
763
:param ssh_params: An SSHParams instance.
727
764
:param vendor: An optional override for the ssh vendor to use. See
728
765
bzrlib.transport.ssh for details on ssh vendors.
730
self._connected = False
732
self._password = password
734
self._username = username
767
self._real_medium = None
768
self._ssh_params = ssh_params
769
# for the benefit of progress making a short description of this
771
self._scheme = 'bzr+ssh'
735
772
# SmartClientStreamMedium stores the repr of this object in its
736
773
# _DebugCounter so we have to store all the values used in our repr
737
774
# method before calling the super init.
738
775
SmartClientStreamMedium.__init__(self, base)
739
self._read_from = None
776
self._vendor = vendor
740
777
self._ssh_connection = None
741
self._vendor = vendor
742
self._write_to = None
743
self._bzr_remote_path = bzr_remote_path
744
# for the benefit of progress making a short description of this
746
self._scheme = 'bzr+ssh'
748
779
def __repr__(self):
749
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
780
if self._ssh_params.port is None:
783
maybe_port = ':%s' % self._ssh_params.port
784
return "%s(%s://%s@%s%s/)" % (
750
785
self.__class__.__name__,
787
self._ssh_params.username,
788
self._ssh_params.host,
756
791
def _accept_bytes(self, bytes):
757
792
"""See SmartClientStreamMedium.accept_bytes."""
758
793
self._ensure_connection()
759
self._write_to.write(bytes)
760
self._report_activity(len(bytes), 'write')
794
self._real_medium.accept_bytes(bytes)
762
796
def disconnect(self):
763
797
"""See SmartClientMedium.disconnect()."""
764
if not self._connected:
766
self._read_from.close()
767
self._write_to.close()
768
self._ssh_connection.close()
769
self._connected = False
798
if self._real_medium is not None:
799
self._real_medium.disconnect()
800
self._real_medium = None
801
if self._ssh_connection is not None:
802
self._ssh_connection.close()
803
self._ssh_connection = None
771
805
def _ensure_connection(self):
772
806
"""Connect this medium if not already connected."""
807
if self._real_medium is not None:
775
809
if self._vendor is None:
776
810
vendor = ssh._get_ssh_vendor()
778
812
vendor = self._vendor
779
self._ssh_connection = vendor.connect_ssh(self._username,
780
self._password, self._host, self._port,
781
command=[self._bzr_remote_path, 'serve', '--inet',
813
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
814
self._ssh_params.password, self._ssh_params.host,
815
self._ssh_params.port,
816
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
782
817
'--directory=/', '--allow-writes'])
783
self._read_from, self._write_to = \
784
self._ssh_connection.get_filelike_channels()
785
self._connected = True
818
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
819
if io_kind == 'socket':
820
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
821
self.base, io_object)
822
elif io_kind == 'pipes':
823
read_from, write_to = io_object
824
self._real_medium = SmartSimplePipesClientMedium(
825
read_from, write_to, self.base)
827
raise AssertionError(
828
"Unexpected io_kind %r from %r"
829
% (io_kind, self._ssh_connection))
787
831
def _flush(self):
788
832
"""See SmartClientStreamMedium._flush()."""
789
self._write_to.flush()
833
self._real_medium._flush()
791
835
def _read_bytes(self, count):
792
836
"""See SmartClientStreamMedium.read_bytes."""
793
if not self._connected:
837
if self._real_medium is None:
794
838
raise errors.MediumNotConnected(self)
795
bytes_to_read = min(count, _MAX_READ_SIZE)
796
bytes = self._read_from.read(bytes_to_read)
797
self._report_activity(len(bytes), 'read')
839
return self._real_medium.read_bytes(count)
801
842
# Port 4155 is the default port for bzr://, registered with IANA.
803
844
BZR_DEFAULT_PORT = 4155
806
class SmartTCPClientMedium(SmartClientStreamMedium):
807
"""A client medium using TCP."""
847
class SmartClientSocketMedium(SmartClientStreamMedium):
848
"""A client medium using a socket.
850
This class isn't usable directly. Use one of its subclasses instead.
809
def __init__(self, host, port, base):
810
"""Creates a client that will connect on the first use."""
853
def __init__(self, base):
811
854
SmartClientStreamMedium.__init__(self, base)
812
856
self._connected = False
817
858
def _accept_bytes(self, bytes):
818
859
"""See SmartClientMedium.accept_bytes."""
819
860
self._ensure_connection()
820
861
osutils.send_all(self._socket, bytes, self._report_activity)
863
def _ensure_connection(self):
864
"""Connect this medium if not already connected."""
865
raise NotImplementedError(self._ensure_connection)
868
"""See SmartClientStreamMedium._flush().
870
For sockets we do no flushing. For TCP sockets we may want to turn off
871
TCP_NODELAY and add a means to do a flush, but that can be done in the
875
def _read_bytes(self, count):
876
"""See SmartClientMedium.read_bytes."""
877
if not self._connected:
878
raise errors.MediumNotConnected(self)
879
return osutils.read_bytes_from_socket(
880
self._socket, self._report_activity)
822
882
def disconnect(self):
823
883
"""See SmartClientMedium.disconnect()."""
824
884
if not self._connected:
866
936
(self._host, port, err_msg))
867
937
self._connected = True
870
"""See SmartClientStreamMedium._flush().
872
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
873
add a means to do a flush, but that can be done in the future.
876
def _read_bytes(self, count):
877
"""See SmartClientMedium.read_bytes."""
878
if not self._connected:
879
raise errors.MediumNotConnected(self)
880
# We ignore the requested count because on sockets it's more efficient to
881
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
883
bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
884
except socket.error, e:
885
if len(e.args) and e.args[0] == errno.ECONNRESET:
886
# Callers expect an empty string in that case
891
self._report_activity(len(bytes), 'read')
940
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
941
"""A client medium for an already connected socket.
943
Note that this class will assume it "owns" the socket, so it will close it
944
when its disconnect method is called.
947
def __init__(self, base, sock):
948
SmartClientSocketMedium.__init__(self, base)
950
self._connected = True
952
def _ensure_connection(self):
953
# Already connected, by definition! So nothing to do.
895
957
class SmartClientStreamMediumRequest(SmartClientMediumRequest):