46
47
from bzrlib.smart import client, protocol, request, vfs
47
48
from bzrlib.transport import ssh
50
#usually already imported, and getting IllegalScoperReplacer on it here.
49
51
from bzrlib import osutils
51
# Throughout this module buffer size parameters are either limited to be at
52
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
53
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
54
# from non-sockets as well.
55
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
53
# We must not read any more than 64k at a time so we don't risk "no buffer
54
# space available" errors on some platforms. Windows in particular is likely
55
# to give error 10053 or 10055 if we read more than 64k from a socket.
56
_MAX_READ_SIZE = 64 * 1024
57
59
def _get_protocol_factory_for_bytes(bytes):
58
60
"""Determine the right protocol factory for 'bytes'.
285
287
self._push_back(protocol.unused_data)
287
289
def _read_bytes(self, desired_count):
288
return osutils.read_bytes_from_socket(
289
self.socket, self._report_activity)
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
291
293
def terminate_due_to_error(self):
292
294
# TODO: This should log to a server log file, but no such thing
293
295
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
295
297
self.finished = True
297
299
def _write_out(self, bytes):
332
334
bytes_to_read = protocol.next_read_size()
333
335
if bytes_to_read == 0:
334
336
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
337
339
bytes = self.read_bytes(bytes_to_read)
339
341
# Connection has been closed.
340
342
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
343
345
protocol.accept_bytes(bytes)
345
347
def _read_bytes(self, desired_count):
346
return self._in.read(desired_count)
348
return osutils.until_no_eintr(self._in.read, desired_count)
348
350
def terminate_due_to_error(self):
349
351
# TODO: This should log to a server log file, but no such thing
350
352
# exists yet. Andrew Bennetts 2006-09-29.
353
osutils.until_no_eintr(self._out.close)
352
354
self.finished = True
354
356
def _write_out(self, bytes):
355
self._out.write(bytes)
357
osutils.until_no_eintr(self._out.write, bytes)
358
360
class SmartClientMediumRequest(object):
494
496
class _DebugCounter(object):
495
497
"""An object that counts the HPSS calls made to each client medium.
497
When a medium is garbage-collected, or failing that when
498
bzrlib.global_state exits, the total number of calls made on that medium
499
are reported via trace.note.
499
When a medium is garbage-collected, or failing that when atexit functions
500
are run, the total number of calls made on that medium are reported via
502
504
def __init__(self):
503
505
self.counts = weakref.WeakKeyDictionary()
504
506
client._SmartClient.hooks.install_named_hook(
505
507
'call', self.increment_call_count, 'hpss call counter')
506
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
508
atexit.register(self.flush_all)
508
510
def track(self, medium):
509
511
"""Start tracking calls made to a medium.
607
609
# which is newer than a previously supplied older-than version.
608
610
# This indicates that some smart verb call is not guarded
609
611
# appropriately (it should simply not have been tried).
612
raise AssertionError(
611
613
"_remember_remote_is_before(%r) called, but "
612
614
"_remember_remote_is_before(%r) was called previously."
613
, version_tuple, self._remote_version_is_before)
614
if 'hpss' in debug.debug_flags:
615
ui.ui_factory.show_warning(
616
"_remember_remote_is_before(%r) called, but "
617
"_remember_remote_is_before(%r) was called previously."
618
% (version_tuple, self._remote_version_is_before))
615
% (version_tuple, self._remote_version_is_before))
620
616
self._remote_version_is_before = version_tuple
622
618
def protocol_version(self):
725
721
def _accept_bytes(self, bytes):
726
722
"""See SmartClientStreamMedium.accept_bytes."""
727
self._writeable_pipe.write(bytes)
723
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
728
724
self._report_activity(len(bytes), 'write')
730
726
def _flush(self):
731
727
"""See SmartClientStreamMedium._flush()."""
732
self._writeable_pipe.flush()
728
osutils.until_no_eintr(self._writeable_pipe.flush)
734
730
def _read_bytes(self, count):
735
731
"""See SmartClientStreamMedium._read_bytes."""
736
bytes_to_read = min(count, _MAX_READ_SIZE)
737
bytes = self._readable_pipe.read(bytes_to_read)
732
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
738
733
self._report_activity(len(bytes), 'read')
742
class SSHParams(object):
743
"""A set of parameters for starting a remote bzr via SSH."""
737
class SmartSSHClientMedium(SmartClientStreamMedium):
738
"""A client medium using SSH."""
745
740
def __init__(self, host, port=None, username=None, password=None,
746
bzr_remote_path='bzr'):
749
self.username = username
750
self.password = password
751
self.bzr_remote_path = bzr_remote_path
754
class SmartSSHClientMedium(SmartClientStreamMedium):
755
"""A client medium using SSH.
757
It delegates IO to a SmartClientSocketMedium or
758
SmartClientAlreadyConnectedSocketMedium (depending on platform).
761
def __init__(self, base, ssh_params, vendor=None):
741
base=None, vendor=None, bzr_remote_path=None):
762
742
"""Creates a client that will connect on the first use.
764
:param ssh_params: A SSHParams instance.
765
744
:param vendor: An optional override for the ssh vendor to use. See
766
745
bzrlib.transport.ssh for details on ssh vendors.
768
self._real_medium = None
769
self._ssh_params = ssh_params
747
self._connected = False
749
self._password = password
751
self._username = username
770
752
# for the benefit of progress making a short description of this
772
754
self._scheme = 'bzr+ssh'
774
756
# _DebugCounter so we have to store all the values used in our repr
775
757
# method before calling the super init.
776
758
SmartClientStreamMedium.__init__(self, base)
759
self._read_from = None
760
self._ssh_connection = None
777
761
self._vendor = vendor
778
self._ssh_connection = None
762
self._write_to = None
763
self._bzr_remote_path = bzr_remote_path
780
765
def __repr__(self):
781
if self._ssh_params.port is None:
766
if self._port is None:
784
maybe_port = ':%s' % self._ssh_params.port
769
maybe_port = ':%s' % self._port
785
770
return "%s(%s://%s@%s%s/)" % (
786
771
self.__class__.__name__,
788
self._ssh_params.username,
789
self._ssh_params.host,
792
777
def _accept_bytes(self, bytes):
793
778
"""See SmartClientStreamMedium.accept_bytes."""
794
779
self._ensure_connection()
795
self._real_medium.accept_bytes(bytes)
780
osutils.until_no_eintr(self._write_to.write, bytes)
781
self._report_activity(len(bytes), 'write')
797
783
def disconnect(self):
798
784
"""See SmartClientMedium.disconnect()."""
799
if self._real_medium is not None:
800
self._real_medium.disconnect()
801
self._real_medium = None
802
if self._ssh_connection is not None:
803
self._ssh_connection.close()
804
self._ssh_connection = None
785
if not self._connected:
787
osutils.until_no_eintr(self._read_from.close)
788
osutils.until_no_eintr(self._write_to.close)
789
self._ssh_connection.close()
790
self._connected = False
806
792
def _ensure_connection(self):
807
793
"""Connect this medium if not already connected."""
808
if self._real_medium is not None:
810
796
if self._vendor is None:
811
797
vendor = ssh._get_ssh_vendor()
813
799
vendor = self._vendor
814
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
815
self._ssh_params.password, self._ssh_params.host,
816
self._ssh_params.port,
817
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
800
self._ssh_connection = vendor.connect_ssh(self._username,
801
self._password, self._host, self._port,
802
command=[self._bzr_remote_path, 'serve', '--inet',
818
803
'--directory=/', '--allow-writes'])
819
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
820
if io_kind == 'socket':
821
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
822
self.base, io_object)
823
elif io_kind == 'pipes':
824
read_from, write_to = io_object
825
self._real_medium = SmartSimplePipesClientMedium(
826
read_from, write_to, self.base)
828
raise AssertionError(
829
"Unexpected io_kind %r from %r"
830
% (io_kind, self._ssh_connection))
804
self._read_from, self._write_to = \
805
self._ssh_connection.get_filelike_channels()
806
self._connected = True
832
808
def _flush(self):
833
809
"""See SmartClientStreamMedium._flush()."""
834
self._real_medium._flush()
810
self._write_to.flush()
836
812
def _read_bytes(self, count):
837
813
"""See SmartClientStreamMedium.read_bytes."""
838
if self._real_medium is None:
814
if not self._connected:
839
815
raise errors.MediumNotConnected(self)
840
return self._real_medium.read_bytes(count)
816
bytes_to_read = min(count, _MAX_READ_SIZE)
817
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
818
self._report_activity(len(bytes), 'read')
843
822
# Port 4155 is the default port for bzr://, registered with IANA.
845
824
BZR_DEFAULT_PORT = 4155
848
class SmartClientSocketMedium(SmartClientStreamMedium):
849
"""A client medium using a socket.
851
This class isn't usable directly. Use one of its subclasses instead.
827
class SmartTCPClientMedium(SmartClientStreamMedium):
828
"""A client medium using TCP."""
854
def __init__(self, base):
830
def __init__(self, host, port, base):
831
"""Creates a client that will connect on the first use."""
855
832
SmartClientStreamMedium.__init__(self, base)
833
self._connected = False
856
836
self._socket = None
857
self._connected = False
859
838
def _accept_bytes(self, bytes):
860
839
"""See SmartClientMedium.accept_bytes."""
861
840
self._ensure_connection()
862
841
osutils.send_all(self._socket, bytes, self._report_activity)
864
def _ensure_connection(self):
865
"""Connect this medium if not already connected."""
866
raise NotImplementedError(self._ensure_connection)
869
"""See SmartClientStreamMedium._flush().
871
For sockets we do no flushing. For TCP sockets we may want to turn off
872
TCP_NODELAY and add a means to do a flush, but that can be done in the
876
def _read_bytes(self, count):
877
"""See SmartClientMedium.read_bytes."""
878
if not self._connected:
879
raise errors.MediumNotConnected(self)
880
return osutils.read_bytes_from_socket(
881
self._socket, self._report_activity)
883
843
def disconnect(self):
884
844
"""See SmartClientMedium.disconnect()."""
885
845
if not self._connected:
847
osutils.until_no_eintr(self._socket.close)
888
848
self._socket = None
889
849
self._connected = False
892
class SmartTCPClientMedium(SmartClientSocketMedium):
893
"""A client medium that creates a TCP connection."""
895
def __init__(self, host, port, base):
896
"""Creates a client that will connect on the first use."""
897
SmartClientSocketMedium.__init__(self, base)
901
851
def _ensure_connection(self):
902
852
"""Connect this medium if not already connected."""
903
853
if self._connected:
937
887
(self._host, port, err_msg))
938
888
self._connected = True
941
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
942
"""A client medium for an already connected socket.
944
Note that this class will assume it "owns" the socket, so it will close it
945
when its disconnect method is called.
948
def __init__(self, base, sock):
949
SmartClientSocketMedium.__init__(self, base)
951
self._connected = True
953
def _ensure_connection(self):
954
# Already connected, by definition! So nothing to do.
891
"""See SmartClientStreamMedium._flush().
893
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
894
add a means to do a flush, but that can be done in the future.
897
def _read_bytes(self, count):
898
"""See SmartClientMedium.read_bytes."""
899
if not self._connected:
900
raise errors.MediumNotConnected(self)
901
return _read_bytes_from_socket(
902
self._socket.recv, count, self._report_activity)
958
905
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
995
942
self._medium._flush()
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
# We ignore the desired_count because on sockets it's more efficient to
947
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
949
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
except socket.error, e:
951
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
# The connection was closed by the other side. Callers expect an
953
# empty string to signal end-of-stream.
958
report_activity(len(bytes), 'read')