47
46
from bzrlib.smart import client, protocol, request, vfs
48
47
from bzrlib.transport import ssh
50
#usually already imported, and getting IllegalScoperReplacer on it here.
51
49
from bzrlib import osutils
53
# We must not read any more than 64k at a time so we don't risk "no buffer
54
# space available" errors on some platforms. Windows in particular is likely
55
# to give error 10053 or 10055 if we read more than 64k from a socket.
56
_MAX_READ_SIZE = 64 * 1024
51
# Throughout this module buffer size parameters are either limited to be at
52
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
53
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
54
# from non-sockets as well.
55
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
59
57
def _get_protocol_factory_for_bytes(bytes):
60
58
"""Determine the right protocol factory for 'bytes'.
276
274
def _serve_one_request_unguarded(self, protocol):
277
275
while protocol.next_read_size():
278
276
# We can safely try to read large chunks. If there is less data
279
# than _MAX_READ_SIZE ready, the socket will just return a short
280
# read immediately rather than block.
281
bytes = self.read_bytes(_MAX_READ_SIZE)
277
# than MAX_SOCKET_CHUNK ready, the socket will just return a
278
# short read immediately rather than block.
279
bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
283
281
self.finished = True
287
285
self._push_back(protocol.unused_data)
289
287
def _read_bytes(self, desired_count):
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
288
return osutils.read_bytes_from_socket(
289
self.socket, self._report_activity)
293
291
def terminate_due_to_error(self):
294
292
# TODO: This should log to a server log file, but no such thing
295
293
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
297
295
self.finished = True
299
297
def _write_out(self, bytes):
334
332
bytes_to_read = protocol.next_read_size()
335
333
if bytes_to_read == 0:
336
334
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
339
337
bytes = self.read_bytes(bytes_to_read)
341
339
# Connection has been closed.
342
340
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
345
343
protocol.accept_bytes(bytes)
347
345
def _read_bytes(self, desired_count):
348
return osutils.until_no_eintr(self._in.read, desired_count)
346
return self._in.read(desired_count)
350
348
def terminate_due_to_error(self):
351
349
# TODO: This should log to a server log file, but no such thing
352
350
# exists yet. Andrew Bennetts 2006-09-29.
353
osutils.until_no_eintr(self._out.close)
354
352
self.finished = True
356
354
def _write_out(self, bytes):
357
osutils.until_no_eintr(self._out.write, bytes)
355
self._out.write(bytes)
360
358
class SmartClientMediumRequest(object):
496
494
class _DebugCounter(object):
497
495
"""An object that counts the HPSS calls made to each client medium.
499
When a medium is garbage-collected, or failing that when atexit functions
500
are run, the total number of calls made on that medium are reported via
497
When a medium is garbage-collected, or failing that when
498
bzrlib.global_state exits, the total number of calls made on that medium
499
are reported via trace.note.
504
502
def __init__(self):
505
503
self.counts = weakref.WeakKeyDictionary()
506
504
client._SmartClient.hooks.install_named_hook(
507
505
'call', self.increment_call_count, 'hpss call counter')
508
atexit.register(self.flush_all)
506
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
510
508
def track(self, medium):
511
509
"""Start tracking calls made to a medium.
609
607
# which is newer than a previously supplied older-than version.
610
608
# This indicates that some smart verb call is not guarded
611
609
# appropriately (it should simply not have been tried).
612
raise AssertionError(
613
611
"_remember_remote_is_before(%r) called, but "
614
612
"_remember_remote_is_before(%r) was called previously."
615
% (version_tuple, self._remote_version_is_before))
613
, version_tuple, self._remote_version_is_before)
614
if 'hpss' in debug.debug_flags:
615
ui.ui_factory.show_warning(
616
"_remember_remote_is_before(%r) called, but "
617
"_remember_remote_is_before(%r) was called previously."
618
% (version_tuple, self._remote_version_is_before))
616
620
self._remote_version_is_before = version_tuple
618
622
def protocol_version(self):
721
725
def _accept_bytes(self, bytes):
722
726
"""See SmartClientStreamMedium.accept_bytes."""
723
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
727
self._writeable_pipe.write(bytes)
724
728
self._report_activity(len(bytes), 'write')
726
730
def _flush(self):
727
731
"""See SmartClientStreamMedium._flush()."""
728
osutils.until_no_eintr(self._writeable_pipe.flush)
732
self._writeable_pipe.flush()
730
734
def _read_bytes(self, count):
731
735
"""See SmartClientStreamMedium._read_bytes."""
732
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
736
bytes_to_read = min(count, _MAX_READ_SIZE)
737
bytes = self._readable_pipe.read(bytes_to_read)
733
738
self._report_activity(len(bytes), 'read')
742
class SSHParams(object):
743
"""A set of parameters for starting a remote bzr via SSH."""
745
def __init__(self, host, port=None, username=None, password=None,
746
bzr_remote_path='bzr'):
749
self.username = username
750
self.password = password
751
self.bzr_remote_path = bzr_remote_path
737
754
class SmartSSHClientMedium(SmartClientStreamMedium):
738
"""A client medium using SSH."""
755
"""A client medium using SSH.
757
It delegates IO to a SmartClientSocketMedium or
758
SmartClientAlreadyConnectedSocketMedium (depending on platform).
740
def __init__(self, host, port=None, username=None, password=None,
741
base=None, vendor=None, bzr_remote_path=None):
761
def __init__(self, base, ssh_params, vendor=None):
742
762
"""Creates a client that will connect on the first use.
764
:param ssh_params: An SSHParams instance.
744
765
:param vendor: An optional override for the ssh vendor to use. See
745
766
bzrlib.transport.ssh for details on ssh vendors.
747
self._connected = False
749
self._password = password
751
self._username = username
768
self._real_medium = None
769
self._ssh_params = ssh_params
752
770
# for the benefit of progress making a short description of this
754
772
self._scheme = 'bzr+ssh'
756
774
# _DebugCounter so we have to store all the values used in our repr
757
775
# method before calling the super init.
758
776
SmartClientStreamMedium.__init__(self, base)
759
self._read_from = None
777
self._vendor = vendor
760
778
self._ssh_connection = None
761
self._vendor = vendor
762
self._write_to = None
763
self._bzr_remote_path = bzr_remote_path
765
780
def __repr__(self):
766
if self._port is None:
781
if self._ssh_params.port is None:
769
maybe_port = ':%s' % self._port
784
maybe_port = ':%s' % self._ssh_params.port
770
785
return "%s(%s://%s@%s%s/)" % (
771
786
self.__class__.__name__,
788
self._ssh_params.username,
789
self._ssh_params.host,
777
792
def _accept_bytes(self, bytes):
778
793
"""See SmartClientStreamMedium.accept_bytes."""
779
794
self._ensure_connection()
780
osutils.until_no_eintr(self._write_to.write, bytes)
781
self._report_activity(len(bytes), 'write')
795
self._real_medium.accept_bytes(bytes)
783
797
def disconnect(self):
784
798
"""See SmartClientMedium.disconnect()."""
785
if not self._connected:
787
osutils.until_no_eintr(self._read_from.close)
788
osutils.until_no_eintr(self._write_to.close)
789
self._ssh_connection.close()
790
self._connected = False
799
if self._real_medium is not None:
800
self._real_medium.disconnect()
801
self._real_medium = None
802
if self._ssh_connection is not None:
803
self._ssh_connection.close()
804
self._ssh_connection = None
792
806
def _ensure_connection(self):
793
807
"""Connect this medium if not already connected."""
808
if self._real_medium is not None:
796
810
if self._vendor is None:
797
811
vendor = ssh._get_ssh_vendor()
799
813
vendor = self._vendor
800
self._ssh_connection = vendor.connect_ssh(self._username,
801
self._password, self._host, self._port,
802
command=[self._bzr_remote_path, 'serve', '--inet',
814
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
815
self._ssh_params.password, self._ssh_params.host,
816
self._ssh_params.port,
817
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
803
818
'--directory=/', '--allow-writes'])
804
self._read_from, self._write_to = \
805
self._ssh_connection.get_filelike_channels()
806
self._connected = True
819
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
820
if io_kind == 'socket':
821
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
822
self.base, io_object)
823
elif io_kind == 'pipes':
824
read_from, write_to = io_object
825
self._real_medium = SmartSimplePipesClientMedium(
826
read_from, write_to, self.base)
828
raise AssertionError(
829
"Unexpected io_kind %r from %r"
830
% (io_kind, self._ssh_connection))
808
832
def _flush(self):
809
833
"""See SmartClientStreamMedium._flush()."""
810
self._write_to.flush()
834
self._real_medium._flush()
812
836
def _read_bytes(self, count):
813
837
"""See SmartClientStreamMedium.read_bytes."""
814
if not self._connected:
838
if self._real_medium is None:
815
839
raise errors.MediumNotConnected(self)
816
bytes_to_read = min(count, _MAX_READ_SIZE)
817
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
818
self._report_activity(len(bytes), 'read')
840
return self._real_medium.read_bytes(count)
822
843
# Port 4155 is the default port for bzr://, registered with IANA.
824
845
BZR_DEFAULT_PORT = 4155
827
class SmartTCPClientMedium(SmartClientStreamMedium):
828
"""A client medium using TCP."""
848
class SmartClientSocketMedium(SmartClientStreamMedium):
849
"""A client medium using a socket.
851
This class isn't usable directly. Use one of its subclasses instead.
854
def __init__(self, base):
855
SmartClientStreamMedium.__init__(self, base)
857
self._connected = False
859
def _accept_bytes(self, bytes):
860
"""See SmartClientMedium.accept_bytes."""
861
self._ensure_connection()
862
osutils.send_all(self._socket, bytes, self._report_activity)
864
def _ensure_connection(self):
865
"""Connect this medium if not already connected."""
866
raise NotImplementedError(self._ensure_connection)
869
"""See SmartClientStreamMedium._flush().
871
For sockets we do no flushing. For TCP sockets we may want to turn off
872
TCP_NODELAY and add a means to do a flush, but that can be done in the
876
def _read_bytes(self, count):
877
"""See SmartClientMedium.read_bytes."""
878
if not self._connected:
879
raise errors.MediumNotConnected(self)
880
return osutils.read_bytes_from_socket(
881
self._socket, self._report_activity)
883
def disconnect(self):
884
"""See SmartClientMedium.disconnect()."""
885
if not self._connected:
889
self._connected = False
892
class SmartTCPClientMedium(SmartClientSocketMedium):
893
"""A client medium that creates a TCP connection."""
830
895
def __init__(self, host, port, base):
831
896
"""Creates a client that will connect on the first use."""
832
SmartClientStreamMedium.__init__(self, base)
833
self._connected = False
897
SmartClientSocketMedium.__init__(self, base)
834
898
self._host = host
835
899
self._port = port
838
def _accept_bytes(self, bytes):
839
"""See SmartClientMedium.accept_bytes."""
840
self._ensure_connection()
841
osutils.send_all(self._socket, bytes, self._report_activity)
843
def disconnect(self):
844
"""See SmartClientMedium.disconnect()."""
845
if not self._connected:
847
osutils.until_no_eintr(self._socket.close)
849
self._connected = False
851
901
def _ensure_connection(self):
852
902
"""Connect this medium if not already connected."""
887
937
(self._host, port, err_msg))
888
938
self._connected = True
891
"""See SmartClientStreamMedium._flush().
893
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
894
add a means to do a flush, but that can be done in the future.
897
def _read_bytes(self, count):
898
"""See SmartClientMedium.read_bytes."""
899
if not self._connected:
900
raise errors.MediumNotConnected(self)
901
return _read_bytes_from_socket(
902
self._socket.recv, count, self._report_activity)
941
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
942
"""A client medium for an already connected socket.
944
Note that this class will assume it "owns" the socket, so it will close it
945
when its disconnect method is called.
948
def __init__(self, base, sock):
949
SmartClientSocketMedium.__init__(self, base)
951
self._connected = True
953
def _ensure_connection(self):
954
# Already connected, by definition! So nothing to do.
905
958
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
942
995
self._medium._flush()
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
# We ignore the desired_count because on sockets it's more efficient to
947
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
949
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
except socket.error, e:
951
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
# The connection was closed by the other side. Callers expect an
953
# empty string to signal end-of-stream.
958
report_activity(len(bytes), 'read')