24
24
bzrlib/transport/smart/__init__.py.
32
33
from bzrlib.lazy_import import lazy_import
33
34
lazy_import(globals(), """
38
37
from bzrlib import (
45
from bzrlib.smart import client, protocol, request, vfs
46
from bzrlib.smart import client, protocol
46
47
from bzrlib.transport import ssh
48
from bzrlib import osutils
50
# Throughout this module buffer size parameters are either limited to be at
51
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
52
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
53
# from non-sockets as well.
54
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
51
# We must not read any more than 64k at a time so we don't risk "no buffer
52
# space available" errors on some platforms. Windows in particular is likely
53
# to give error 10053 or 10055 if we read more than 64k from a socket.
54
_MAX_READ_SIZE = 64 * 1024
56
57
def _get_protocol_factory_for_bytes(bytes):
57
58
"""Determine the right protocol factory for 'bytes'.
523
519
def increment_call_count(self, params):
524
520
# Increment the count in the WeakKeyDictionary
525
521
value = self.counts[params.medium]
528
request_method = request.request_handlers.get(params.method)
530
# A method we don't know about doesn't count as a VFS method.
532
if issubclass(request_method, vfs.VfsRequest):
533
value['vfs_count'] += 1
535
524
def done(self, ref):
536
525
value = self.counts[ref]
537
count, vfs_count, medium_repr = (
538
value['count'], value['vfs_count'], value['medium_repr'])
526
count, medium_repr = value
539
527
# In case this callback is invoked for the same ref twice (by the
540
528
# weakref callback and by the atexit function), set the call count back
541
529
# to 0 so this item won't be reported twice.
543
value['vfs_count'] = 0
545
trace.note('HPSS calls: %d (%d vfs) %s',
546
count, vfs_count, medium_repr)
532
trace.note('HPSS calls: %d %s', count, medium_repr)
548
534
def flush_all(self):
549
535
for ref in list(self.counts.keys()):
606
592
# which is newer than a previously supplied older-than version.
607
593
# This indicates that some smart verb call is not guarded
608
594
# appropriately (it should simply not have been tried).
595
raise AssertionError(
610
596
"_remember_remote_is_before(%r) called, but "
611
597
"_remember_remote_is_before(%r) was called previously."
612
, version_tuple, self._remote_version_is_before)
613
if 'hpss' in debug.debug_flags:
614
ui.ui_factory.show_warning(
615
"_remember_remote_is_before(%r) called, but "
616
"_remember_remote_is_before(%r) was called previously."
617
% (version_tuple, self._remote_version_is_before))
598
% (version_tuple, self._remote_version_is_before))
619
599
self._remote_version_is_before = version_tuple
621
601
def protocol_version(self):
733
713
def _read_bytes(self, count):
734
714
"""See SmartClientStreamMedium._read_bytes."""
735
bytes_to_read = min(count, _MAX_READ_SIZE)
736
bytes = self._readable_pipe.read(bytes_to_read)
715
bytes = self._readable_pipe.read(count)
737
716
self._report_activity(len(bytes), 'read')
741
class SSHParams(object):
742
"""A set of parameters for starting a remote bzr via SSH."""
720
class SmartSSHClientMedium(SmartClientStreamMedium):
721
"""A client medium using SSH."""
744
723
def __init__(self, host, port=None, username=None, password=None,
745
bzr_remote_path='bzr'):
748
self.username = username
749
self.password = password
750
self.bzr_remote_path = bzr_remote_path
753
class SmartSSHClientMedium(SmartClientStreamMedium):
754
"""A client medium using SSH.
756
It delegates IO to a SmartClientSocketMedium or
757
SmartClientAlreadyConnectedSocketMedium (depending on platform).
760
def __init__(self, base, ssh_params, vendor=None):
724
base=None, vendor=None, bzr_remote_path=None):
761
725
"""Creates a client that will connect on the first use.
763
:param ssh_params: A SSHParams instance.
764
727
:param vendor: An optional override for the ssh vendor to use. See
765
728
bzrlib.transport.ssh for details on ssh vendors.
767
self._real_medium = None
768
self._ssh_params = ssh_params
769
# for the benefit of progress making a short description of this
771
self._scheme = 'bzr+ssh'
730
self._connected = False
732
self._password = password
734
self._username = username
772
735
# SmartClientStreamMedium stores the repr of this object in its
773
736
# _DebugCounter so we have to store all the values used in our repr
774
737
# method before calling the super init.
775
738
SmartClientStreamMedium.__init__(self, base)
739
self._read_from = None
740
self._ssh_connection = None
776
741
self._vendor = vendor
777
self._ssh_connection = None
742
self._write_to = None
743
self._bzr_remote_path = bzr_remote_path
744
# for the benefit of progress making a short description of this
746
self._scheme = 'bzr+ssh'
779
748
def __repr__(self):
780
if self._ssh_params.port is None:
783
maybe_port = ':%s' % self._ssh_params.port
784
return "%s(%s://%s@%s%s/)" % (
749
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
785
750
self.__class__.__name__,
787
self._ssh_params.username,
788
self._ssh_params.host,
791
756
def _accept_bytes(self, bytes):
792
757
"""See SmartClientStreamMedium.accept_bytes."""
793
758
self._ensure_connection()
794
self._real_medium.accept_bytes(bytes)
759
self._write_to.write(bytes)
760
self._report_activity(len(bytes), 'write')
796
762
def disconnect(self):
797
763
"""See SmartClientMedium.disconnect()."""
798
if self._real_medium is not None:
799
self._real_medium.disconnect()
800
self._real_medium = None
801
if self._ssh_connection is not None:
802
self._ssh_connection.close()
803
self._ssh_connection = None
764
if not self._connected:
766
self._read_from.close()
767
self._write_to.close()
768
self._ssh_connection.close()
769
self._connected = False
805
771
def _ensure_connection(self):
806
772
"""Connect this medium if not already connected."""
807
if self._real_medium is not None:
809
775
if self._vendor is None:
810
776
vendor = ssh._get_ssh_vendor()
812
778
vendor = self._vendor
813
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
814
self._ssh_params.password, self._ssh_params.host,
815
self._ssh_params.port,
816
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
779
self._ssh_connection = vendor.connect_ssh(self._username,
780
self._password, self._host, self._port,
781
command=[self._bzr_remote_path, 'serve', '--inet',
817
782
'--directory=/', '--allow-writes'])
818
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
819
if io_kind == 'socket':
820
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
821
self.base, io_object)
822
elif io_kind == 'pipes':
823
read_from, write_to = io_object
824
self._real_medium = SmartSimplePipesClientMedium(
825
read_from, write_to, self.base)
827
raise AssertionError(
828
"Unexpected io_kind %r from %r"
829
% (io_kind, self._ssh_connection))
783
self._read_from, self._write_to = \
784
self._ssh_connection.get_filelike_channels()
785
self._connected = True
831
787
def _flush(self):
832
788
"""See SmartClientStreamMedium._flush()."""
833
self._real_medium._flush()
789
self._write_to.flush()
835
791
def _read_bytes(self, count):
836
792
"""See SmartClientStreamMedium.read_bytes."""
837
if self._real_medium is None:
793
if not self._connected:
838
794
raise errors.MediumNotConnected(self)
839
return self._real_medium.read_bytes(count)
795
bytes_to_read = min(count, _MAX_READ_SIZE)
796
bytes = self._read_from.read(bytes_to_read)
797
self._report_activity(len(bytes), 'read')
842
801
# Port 4155 is the default port for bzr://, registered with IANA.
844
803
BZR_DEFAULT_PORT = 4155
847
class SmartClientSocketMedium(SmartClientStreamMedium):
848
"""A client medium using a socket.
850
This class isn't usable directly. Use one of its subclasses instead.
806
class SmartTCPClientMedium(SmartClientStreamMedium):
807
"""A client medium using TCP."""
853
def __init__(self, base):
809
def __init__(self, host, port, base):
810
"""Creates a client that will connect on the first use."""
854
811
SmartClientStreamMedium.__init__(self, base)
812
self._connected = False
855
815
self._socket = None
856
self._connected = False
858
817
def _accept_bytes(self, bytes):
859
818
"""See SmartClientMedium.accept_bytes."""
860
819
self._ensure_connection()
861
820
osutils.send_all(self._socket, bytes, self._report_activity)
863
def _ensure_connection(self):
864
"""Connect this medium if not already connected."""
865
raise NotImplementedError(self._ensure_connection)
868
"""See SmartClientStreamMedium._flush().
870
For sockets we do no flushing. For TCP sockets we may want to turn off
871
TCP_NODELAY and add a means to do a flush, but that can be done in the
875
def _read_bytes(self, count):
876
"""See SmartClientMedium.read_bytes."""
877
if not self._connected:
878
raise errors.MediumNotConnected(self)
879
return osutils.read_bytes_from_socket(
880
self._socket, self._report_activity)
882
822
def disconnect(self):
883
823
"""See SmartClientMedium.disconnect()."""
884
824
if not self._connected:
936
866
(self._host, port, err_msg))
937
867
self._connected = True
940
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
941
"""A client medium for an already connected socket.
943
Note that this class will assume it "owns" the socket, so it will close it
944
when its disconnect method is called.
947
def __init__(self, base, sock):
948
SmartClientSocketMedium.__init__(self, base)
950
self._connected = True
952
def _ensure_connection(self):
953
# Already connected, by definition! So nothing to do.
870
"""See SmartClientStreamMedium._flush().
872
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
873
add a means to do a flush, but that can be done in the future.
876
def _read_bytes(self, count):
877
"""See SmartClientMedium.read_bytes."""
878
if not self._connected:
879
raise errors.MediumNotConnected(self)
880
# We ignore the desired_count because on sockets it's more efficient to
881
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
883
bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
884
except socket.error, e:
885
if len(e.args) and e.args[0] == errno.ECONNRESET:
886
# Callers expect an empty string in that case
891
self._report_activity(len(bytes), 'read')
957
895
class SmartClientStreamMediumRequest(SmartClientMediumRequest):