455
477
if not line.endswith('\n'):
456
478
# end of file encountered reading from server
457
479
raise errors.ConnectionReset(
458
"please check connectivity and permissions",
459
"(and try -Dhpss if further diagnosis is required)")
480
"Unexpected end of message. Please check connectivity "
481
"and permissions, and report a bug if problems persist.")
462
484
def _read_line(self):
463
485
"""Helper for SmartClientMediumRequest.read_line.
465
487
By default this forwards to self._medium._get_line because we are
466
488
operating on the medium's stream.
468
490
return self._medium._get_line()
493
class _DebugCounter(object):
494
"""An object that counts the HPSS calls made to each client medium.
496
When a medium is garbage-collected, or failing that when
497
bzrlib.global_state exits, the total number of calls made on that medium
498
are reported via trace.note.
502
self.counts = weakref.WeakKeyDictionary()
503
client._SmartClient.hooks.install_named_hook(
504
'call', self.increment_call_count, 'hpss call counter')
505
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
507
def track(self, medium):
508
"""Start tracking calls made to a medium.
510
This only keeps a weakref to the medium, so shouldn't affect the
513
medium_repr = repr(medium)
514
# Add this medium to the WeakKeyDictionary
515
self.counts[medium] = dict(count=0, vfs_count=0,
516
medium_repr=medium_repr)
517
# Weakref callbacks are fired in reverse order of their association
518
# with the referenced object. So we add a weakref *after* adding to
519
# the WeakKeyDict so that we can report the value from it before the
520
# entry is removed by the WeakKeyDict's own callback.
521
ref = weakref.ref(medium, self.done)
523
def increment_call_count(self, params):
524
# Increment the count in the WeakKeyDictionary
525
value = self.counts[params.medium]
528
request_method = request.request_handlers.get(params.method)
530
# A method we don't know about doesn't count as a VFS method.
532
if issubclass(request_method, vfs.VfsRequest):
533
value['vfs_count'] += 1
536
value = self.counts[ref]
537
count, vfs_count, medium_repr = (
538
value['count'], value['vfs_count'], value['medium_repr'])
539
# In case this callback is invoked for the same ref twice (by the
540
# weakref callback and by the atexit function), set the call count back
541
# to 0 so this item won't be reported twice.
543
value['vfs_count'] = 0
545
trace.note('HPSS calls: %d (%d vfs) %s',
546
count, vfs_count, medium_repr)
549
for ref in list(self.counts.keys()):
552
_debug_counter = None
471
555
class SmartClientMedium(SmartMedium):
472
556
"""Smart client is a medium for sending smart protocol requests over."""
632
733
def _read_bytes(self, count):
633
734
"""See SmartClientStreamMedium._read_bytes."""
634
return self._readable_pipe.read(count)
735
bytes_to_read = min(count, _MAX_READ_SIZE)
736
bytes = self._readable_pipe.read(bytes_to_read)
737
self._report_activity(len(bytes), 'read')
741
class SSHParams(object):
742
"""A set of parameters for starting a remote bzr via SSH."""
744
def __init__(self, host, port=None, username=None, password=None,
745
bzr_remote_path='bzr'):
748
self.username = username
749
self.password = password
750
self.bzr_remote_path = bzr_remote_path
637
753
class SmartSSHClientMedium(SmartClientStreamMedium):
638
"""A client medium using SSH."""
754
"""A client medium using SSH.
640
def __init__(self, host, port=None, username=None, password=None,
641
base=None, vendor=None, bzr_remote_path=None):
756
It delegates IO to a SmartClientSocketMedium or
757
SmartClientAlreadyConnectedSocketMedium (depending on platform).
760
def __init__(self, base, ssh_params, vendor=None):
642
761
"""Creates a client that will connect on the first use.
763
:param ssh_params: A SSHParams instance.
644
764
:param vendor: An optional override for the ssh vendor to use. See
645
765
bzrlib.transport.ssh for details on ssh vendors.
767
self._real_medium = None
768
self._ssh_params = ssh_params
769
# for the benefit of progress making a short description of this
771
self._scheme = 'bzr+ssh'
772
# SmartClientStreamMedium stores the repr of this object in its
773
# _DebugCounter so we have to store all the values used in our repr
774
# method before calling the super init.
647
775
SmartClientStreamMedium.__init__(self, base)
648
self._connected = False
650
self._password = password
652
self._username = username
653
self._read_from = None
776
self._vendor = vendor
654
777
self._ssh_connection = None
655
self._vendor = vendor
656
self._write_to = None
657
self._bzr_remote_path = bzr_remote_path
658
if self._bzr_remote_path is None:
659
symbol_versioning.warn(
660
'bzr_remote_path is required as of bzr 0.92',
661
DeprecationWarning, stacklevel=2)
662
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
780
if self._ssh_params.port is None:
783
maybe_port = ':%s' % self._ssh_params.port
784
return "%s(%s://%s@%s%s/)" % (
785
self.__class__.__name__,
787
self._ssh_params.username,
788
self._ssh_params.host,
664
791
def _accept_bytes(self, bytes):
665
792
"""See SmartClientStreamMedium.accept_bytes."""
666
793
self._ensure_connection()
667
self._write_to.write(bytes)
794
self._real_medium.accept_bytes(bytes)
669
796
def disconnect(self):
670
797
"""See SmartClientMedium.disconnect()."""
671
if not self._connected:
673
self._read_from.close()
674
self._write_to.close()
675
self._ssh_connection.close()
676
self._connected = False
798
if self._real_medium is not None:
799
self._real_medium.disconnect()
800
self._real_medium = None
801
if self._ssh_connection is not None:
802
self._ssh_connection.close()
803
self._ssh_connection = None
678
805
def _ensure_connection(self):
679
806
"""Connect this medium if not already connected."""
807
if self._real_medium is not None:
682
809
if self._vendor is None:
683
810
vendor = ssh._get_ssh_vendor()
685
812
vendor = self._vendor
686
self._ssh_connection = vendor.connect_ssh(self._username,
687
self._password, self._host, self._port,
688
command=[self._bzr_remote_path, 'serve', '--inet',
813
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
814
self._ssh_params.password, self._ssh_params.host,
815
self._ssh_params.port,
816
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
689
817
'--directory=/', '--allow-writes'])
690
self._read_from, self._write_to = \
691
self._ssh_connection.get_filelike_channels()
692
self._connected = True
818
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
819
if io_kind == 'socket':
820
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
821
self.base, io_object)
822
elif io_kind == 'pipes':
823
read_from, write_to = io_object
824
self._real_medium = SmartSimplePipesClientMedium(
825
read_from, write_to, self.base)
827
raise AssertionError(
828
"Unexpected io_kind %r from %r"
829
% (io_kind, self._ssh_connection))
694
831
def _flush(self):
695
832
"""See SmartClientStreamMedium._flush()."""
696
self._write_to.flush()
833
self._real_medium._flush()
698
835
def _read_bytes(self, count):
699
836
"""See SmartClientStreamMedium.read_bytes."""
700
if not self._connected:
837
if self._real_medium is None:
701
838
raise errors.MediumNotConnected(self)
702
bytes_to_read = min(count, _MAX_READ_SIZE)
703
return self._read_from.read(bytes_to_read)
839
return self._real_medium.read_bytes(count)
706
842
# Port 4155 is the default port for bzr://, registered with IANA.
707
BZR_DEFAULT_INTERFACE = '0.0.0.0'
843
BZR_DEFAULT_INTERFACE = None
708
844
BZR_DEFAULT_PORT = 4155
711
class SmartTCPClientMedium(SmartClientStreamMedium):
712
"""A client medium using TCP."""
847
class SmartClientSocketMedium(SmartClientStreamMedium):
848
"""A client medium using a socket.
850
This class isn't usable directly. Use one of its subclasses instead.
853
def __init__(self, base):
854
SmartClientStreamMedium.__init__(self, base)
856
self._connected = False
858
def _accept_bytes(self, bytes):
859
"""See SmartClientMedium.accept_bytes."""
860
self._ensure_connection()
861
osutils.send_all(self._socket, bytes, self._report_activity)
863
def _ensure_connection(self):
864
"""Connect this medium if not already connected."""
865
raise NotImplementedError(self._ensure_connection)
868
"""See SmartClientStreamMedium._flush().
870
For sockets we do no flushing. For TCP sockets we may want to turn off
871
TCP_NODELAY and add a means to do a flush, but that can be done in the
875
def _read_bytes(self, count):
876
"""See SmartClientMedium.read_bytes."""
877
if not self._connected:
878
raise errors.MediumNotConnected(self)
879
return osutils.read_bytes_from_socket(
880
self._socket, self._report_activity)
882
def disconnect(self):
883
"""See SmartClientMedium.disconnect()."""
884
if not self._connected:
888
self._connected = False
891
class SmartTCPClientMedium(SmartClientSocketMedium):
892
"""A client medium that creates a TCP connection."""
714
894
def __init__(self, host, port, base):
715
895
"""Creates a client that will connect on the first use."""
716
SmartClientStreamMedium.__init__(self, base)
717
self._connected = False
896
SmartClientSocketMedium.__init__(self, base)
718
897
self._host = host
719
898
self._port = port
722
def _accept_bytes(self, bytes):
723
"""See SmartClientMedium.accept_bytes."""
724
self._ensure_connection()
725
osutils.send_all(self._socket, bytes)
727
def disconnect(self):
728
"""See SmartClientMedium.disconnect()."""
729
if not self._connected:
733
self._connected = False
735
900
def _ensure_connection(self):
736
901
"""Connect this medium if not already connected."""
737
902
if self._connected:
739
self._socket = socket.socket()
740
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
741
904
if self._port is None:
742
905
port = BZR_DEFAULT_PORT
744
907
port = int(self._port)
746
self._socket.connect((self._host, port))
747
except socket.error, err:
909
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
910
socket.SOCK_STREAM, 0, 0)
911
except socket.gaierror, (err_num, err_msg):
912
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
913
(self._host, port, err_msg))
914
# Initialize err in case there are no addresses returned:
915
err = socket.error("no address found for %s" % self._host)
916
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
918
self._socket = socket.socket(family, socktype, proto)
919
self._socket.setsockopt(socket.IPPROTO_TCP,
920
socket.TCP_NODELAY, 1)
921
self._socket.connect(sockaddr)
922
except socket.error, err:
923
if self._socket is not None:
928
if self._socket is None:
748
929
# socket errors either have a (string) or (errno, string) as their
750
931
if type(err.args) is str: