24
24
bzrlib/transport/smart/__init__.py.
33
32
from bzrlib.lazy_import import lazy_import
34
33
lazy_import(globals(), """
37
38
from bzrlib import (
45
from bzrlib.smart import client, protocol
45
from bzrlib.smart import client, protocol, request, vfs
46
46
from bzrlib.transport import ssh
50
# We must not read any more than 64k at a time so we don't risk "no buffer
51
# space available" errors on some platforms. Windows in particular is likely
52
# to give error 10053 or 10055 if we read more than 64k from a socket.
53
_MAX_READ_SIZE = 64 * 1024
48
from bzrlib import osutils
50
# Throughout this module buffer size parameters are either limited to be at
51
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
52
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
53
# from non-sockets as well.
54
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
56
56
def _get_protocol_factory_for_bytes(bytes):
57
57
"""Determine the right protocol factory for 'bytes'.
261
273
def _serve_one_request_unguarded(self, protocol):
262
274
while protocol.next_read_size():
263
275
# We can safely try to read large chunks. If there is less data
264
# than _MAX_READ_SIZE ready, the socket will just return a short
265
# read immediately rather than block.
266
bytes = self.read_bytes(_MAX_READ_SIZE)
276
# than MAX_SOCKET_CHUNK ready, the socket will just return a
277
# short read immediately rather than block.
278
bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
268
280
self.finished = True
270
282
protocol.accept_bytes(bytes)
272
284
self._push_back(protocol.unused_data)
274
286
def _read_bytes(self, desired_count):
275
# We ignore the desired_count because on sockets it's more efficient to
276
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
277
return self.socket.recv(_MAX_READ_SIZE)
287
return osutils.read_bytes_from_socket(
288
self.socket, self._report_activity)
279
290
def terminate_due_to_error(self):
280
291
# TODO: This should log to a server log file, but no such thing
505
523
def increment_call_count(self, params):
506
524
# Increment the count in the WeakKeyDictionary
507
525
value = self.counts[params.medium]
528
request_method = request.request_handlers.get(params.method)
530
# A method we don't know about doesn't count as a VFS method.
532
if issubclass(request_method, vfs.VfsRequest):
533
value['vfs_count'] += 1
510
535
def done(self, ref):
511
536
value = self.counts[ref]
512
count, medium_repr = value
537
count, vfs_count, medium_repr = (
538
value['count'], value['vfs_count'], value['medium_repr'])
513
539
# In case this callback is invoked for the same ref twice (by the
514
540
# weakref callback and by the atexit function), set the call count back
515
541
# to 0 so this item won't be reported twice.
543
value['vfs_count'] = 0
518
trace.note('HPSS calls: %d %s', count, medium_repr)
545
trace.note('HPSS calls: %d (%d vfs) %s',
546
count, vfs_count, medium_repr)
520
548
def flush_all(self):
521
549
for ref in list(self.counts.keys()):
524
552
_debug_counter = None
527
555
class SmartClientMedium(SmartMedium):
528
556
"""Smart client is a medium for sending smart protocol requests over."""
575
603
if (self._remote_version_is_before is not None and
576
604
version_tuple > self._remote_version_is_before):
577
raise AssertionError(
605
# We have been told that the remote side is older than some version
606
# which is newer than a previously supplied older-than version.
607
# This indicates that some smart verb call is not guarded
608
# appropriately (it should simply not have been tried).
578
610
"_remember_remote_is_before(%r) called, but "
579
611
"_remember_remote_is_before(%r) was called previously."
580
% (version_tuple, self._remote_version_is_before))
612
, version_tuple, self._remote_version_is_before)
613
if 'hpss' in debug.debug_flags:
614
ui.ui_factory.show_warning(
615
"_remember_remote_is_before(%r) called, but "
616
"_remember_remote_is_before(%r) was called previously."
617
% (version_tuple, self._remote_version_is_before))
581
619
self._remote_version_is_before = version_tuple
583
621
def protocol_version(self):
694
733
def _read_bytes(self, count):
695
734
"""See SmartClientStreamMedium._read_bytes."""
696
return self._readable_pipe.read(count)
735
bytes_to_read = min(count, _MAX_READ_SIZE)
736
bytes = self._readable_pipe.read(bytes_to_read)
737
self._report_activity(len(bytes), 'read')
741
class SSHParams(object):
742
"""A set of parameters for starting a remote bzr via SSH."""
744
def __init__(self, host, port=None, username=None, password=None,
745
bzr_remote_path='bzr'):
748
self.username = username
749
self.password = password
750
self.bzr_remote_path = bzr_remote_path
699
753
class SmartSSHClientMedium(SmartClientStreamMedium):
700
"""A client medium using SSH."""
754
"""A client medium using SSH.
702
def __init__(self, host, port=None, username=None, password=None,
703
base=None, vendor=None, bzr_remote_path=None):
756
It delegates IO to a SmartSimplePipesClientMedium or
757
SmartClientAlreadyConnectedSocketMedium (depending on platform).
760
def __init__(self, base, ssh_params, vendor=None):
704
761
"""Creates a client that will connect on the first use.
763
:param ssh_params: An SSHParams instance.
706
764
:param vendor: An optional override for the ssh vendor to use. See
707
765
bzrlib.transport.ssh for details on ssh vendors.
767
self._real_medium = None
768
self._ssh_params = ssh_params
769
# for the benefit of progress making a short description of this
771
self._scheme = 'bzr+ssh'
772
# SmartClientStreamMedium stores the repr of this object in its
773
# _DebugCounter so we have to store all the values used in our repr
774
# method before calling the super init.
709
775
SmartClientStreamMedium.__init__(self, base)
710
self._connected = False
712
self._password = password
714
self._username = username
715
self._read_from = None
776
self._vendor = vendor
716
777
self._ssh_connection = None
717
self._vendor = vendor
718
self._write_to = None
719
self._bzr_remote_path = bzr_remote_path
720
if self._bzr_remote_path is None:
721
symbol_versioning.warn(
722
'bzr_remote_path is required as of bzr 0.92',
723
DeprecationWarning, stacklevel=2)
724
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
780
if self._ssh_params.port is None:
783
maybe_port = ':%s' % self._ssh_params.port
784
return "%s(%s://%s@%s%s/)" % (
785
self.__class__.__name__,
787
self._ssh_params.username,
788
self._ssh_params.host,
726
791
def _accept_bytes(self, bytes):
727
792
"""See SmartClientStreamMedium.accept_bytes."""
728
793
self._ensure_connection()
729
self._write_to.write(bytes)
794
self._real_medium.accept_bytes(bytes)
731
796
def disconnect(self):
732
797
"""See SmartClientMedium.disconnect()."""
733
if not self._connected:
735
self._read_from.close()
736
self._write_to.close()
737
self._ssh_connection.close()
738
self._connected = False
798
if self._real_medium is not None:
799
self._real_medium.disconnect()
800
self._real_medium = None
801
if self._ssh_connection is not None:
802
self._ssh_connection.close()
803
self._ssh_connection = None
740
805
def _ensure_connection(self):
741
806
"""Connect this medium if not already connected."""
807
if self._real_medium is not None:
744
809
if self._vendor is None:
745
810
vendor = ssh._get_ssh_vendor()
747
812
vendor = self._vendor
748
self._ssh_connection = vendor.connect_ssh(self._username,
749
self._password, self._host, self._port,
750
command=[self._bzr_remote_path, 'serve', '--inet',
813
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
814
self._ssh_params.password, self._ssh_params.host,
815
self._ssh_params.port,
816
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
751
817
'--directory=/', '--allow-writes'])
752
self._read_from, self._write_to = \
753
self._ssh_connection.get_filelike_channels()
754
self._connected = True
818
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
819
if io_kind == 'socket':
820
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
821
self.base, io_object)
822
elif io_kind == 'pipes':
823
read_from, write_to = io_object
824
self._real_medium = SmartSimplePipesClientMedium(
825
read_from, write_to, self.base)
827
raise AssertionError(
828
"Unexpected io_kind %r from %r"
829
% (io_kind, self._ssh_connection))
756
831
def _flush(self):
757
832
"""See SmartClientStreamMedium._flush()."""
758
self._write_to.flush()
833
self._real_medium._flush()
760
835
def _read_bytes(self, count):
761
836
"""See SmartClientStreamMedium.read_bytes."""
762
if not self._connected:
837
if self._real_medium is None:
763
838
raise errors.MediumNotConnected(self)
764
bytes_to_read = min(count, _MAX_READ_SIZE)
765
return self._read_from.read(bytes_to_read)
839
return self._real_medium.read_bytes(count)
768
842
# Port 4155 is the default port for bzr://, registered with IANA.
770
844
BZR_DEFAULT_PORT = 4155
773
class SmartTCPClientMedium(SmartClientStreamMedium):
774
"""A client medium using TCP."""
847
class SmartClientSocketMedium(SmartClientStreamMedium):
848
"""A client medium using a socket.
850
This class isn't usable directly. Use one of its subclasses instead.
853
def __init__(self, base):
854
SmartClientStreamMedium.__init__(self, base)
856
self._connected = False
858
def _accept_bytes(self, bytes):
859
"""See SmartClientMedium.accept_bytes."""
860
self._ensure_connection()
861
osutils.send_all(self._socket, bytes, self._report_activity)
863
def _ensure_connection(self):
864
"""Connect this medium if not already connected."""
865
raise NotImplementedError(self._ensure_connection)
868
"""See SmartClientStreamMedium._flush().
870
For sockets we do no flushing. For TCP sockets we may want to turn off
871
TCP_NODELAY and add a means to do a flush, but that can be done in the
875
def _read_bytes(self, count):
876
"""See SmartClientMedium.read_bytes."""
877
if not self._connected:
878
raise errors.MediumNotConnected(self)
879
return osutils.read_bytes_from_socket(
880
self._socket, self._report_activity)
882
def disconnect(self):
883
"""See SmartClientMedium.disconnect()."""
884
if not self._connected:
888
self._connected = False
891
class SmartTCPClientMedium(SmartClientSocketMedium):
892
"""A client medium that creates a TCP connection."""
776
894
def __init__(self, host, port, base):
777
895
"""Creates a client that will connect on the first use."""
778
SmartClientStreamMedium.__init__(self, base)
779
self._connected = False
896
SmartClientSocketMedium.__init__(self, base)
780
897
self._host = host
781
898
self._port = port
784
def _accept_bytes(self, bytes):
785
"""See SmartClientMedium.accept_bytes."""
786
self._ensure_connection()
787
osutils.send_all(self._socket, bytes)
789
def disconnect(self):
790
"""See SmartClientMedium.disconnect()."""
791
if not self._connected:
795
self._connected = False
797
900
def _ensure_connection(self):
798
901
"""Connect this medium if not already connected."""
833
936
(self._host, port, err_msg))
834
937
self._connected = True
837
"""See SmartClientStreamMedium._flush().
839
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
840
add a means to do a flush, but that can be done in the future.
843
def _read_bytes(self, count):
844
"""See SmartClientMedium.read_bytes."""
845
if not self._connected:
846
raise errors.MediumNotConnected(self)
847
# We ignore the desired_count because on sockets it's more efficient to
848
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
850
return self._socket.recv(_MAX_READ_SIZE)
851
except socket.error, e:
852
if len(e.args) and e.args[0] == errno.ECONNRESET:
853
# Callers expect an empty string in that case
940
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
941
"""A client medium for an already connected socket.
943
Note that this class will assume it "owns" the socket, so it will close it
944
when its disconnect method is called.
947
def __init__(self, base, sock):
948
SmartClientSocketMedium.__init__(self, base)
950
self._connected = True
952
def _ensure_connection(self):
953
# Already connected, by definition! So nothing to do.
859
957
class SmartClientStreamMediumRequest(SmartClientMediumRequest):