47
45
from bzrlib.smart import client, protocol, request, vfs
48
46
from bzrlib.transport import ssh
50
#usually already imported, and getting IllegalScoperReplacer on it here.
51
48
from bzrlib import osutils
53
# We must not read any more than 64k at a time so we don't risk "no buffer
# space available" errors on some platforms.  Windows in particular is likely
# to give error 10053 or 10055 if we read more than 64k from a socket.
#
# Throughout this module buffer size parameters are either limited to be at
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
# from non-sockets as well.
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
59
56
def _get_protocol_factory_for_bytes(bytes):
60
57
"""Determine the right protocol factory for 'bytes'.
276
273
def _serve_one_request_unguarded(self, protocol):
277
274
while protocol.next_read_size():
278
275
# We can safely try to read large chunks. If there is less data
279
# than _MAX_READ_SIZE ready, the socket will just return a short
280
# read immediately rather than block.
281
bytes = self.read_bytes(_MAX_READ_SIZE)
276
# than MAX_SOCKET_CHUNK ready, the socket will just return a
277
# short read immediately rather than block.
278
bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
283
280
self.finished = True
287
284
self._push_back(protocol.unused_data)
289
286
def _read_bytes(self, desired_count):
    """Read the next chunk of bytes from this medium's socket.

    :param desired_count: Accepted for interface compatibility but not
        forwarded; osutils.read_bytes_from_socket is called without a size.
    :return: Whatever osutils.read_bytes_from_socket returns for
        self.socket.
    """
    # self._report_activity is passed along as the second argument, the same
    # way the client-side socket medium in this module calls this helper.
    return osutils.read_bytes_from_socket(
        self.socket, self._report_activity)
293
290
def terminate_due_to_error(self):
    """Close the socket and mark this medium as finished."""
    # TODO: This should log to a server log file, but no such thing
    # exists yet.  Andrew Bennetts 2006-09-29.
    # NOTE(review): until_no_eintr presumably retries the close if it is
    # interrupted by a signal -- confirm against osutils.
    osutils.until_no_eintr(self.socket.close)
    self.finished = True
299
296
def _write_out(self, bytes):
334
331
bytes_to_read = protocol.next_read_size()
335
332
if bytes_to_read == 0:
336
333
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
339
336
bytes = self.read_bytes(bytes_to_read)
341
338
# Connection has been closed.
342
339
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
345
342
protocol.accept_bytes(bytes)
347
344
def _read_bytes(self, desired_count):
    """Read from the input stream self._in.

    :param desired_count: The size passed straight to self._in.read.
    :return: The result of that single read call.
    """
    return self._in.read(desired_count)
350
347
def terminate_due_to_error(self):
    """Close the output stream and mark this medium as finished."""
    # TODO: This should log to a server log file, but no such thing
    # exists yet.  Andrew Bennetts 2006-09-29.
    # NOTE(review): until_no_eintr presumably retries the close if it is
    # interrupted by a signal -- confirm against osutils.
    osutils.until_no_eintr(self._out.close)
    self.finished = True
356
353
def _write_out(self, bytes):
    """Write bytes to the output stream self._out.

    The payload is written exactly once; emitting it a second time would
    duplicate it on the stream.

    :param bytes: The data to write.
    """
    self._out.write(bytes)
360
357
class SmartClientMediumRequest(object):
496
493
class _DebugCounter(object):
497
494
"""An object that counts the HPSS calls made to each client medium.
499
When a medium is garbage-collected, or failing that when atexit functions
500
are run, the total number of calls made on that medium are reported via
496
When a medium is garbage-collected, or failing that when
497
bzrlib.global_state exits, the total number of calls made on that medium
498
are reported via trace.note.
504
501
def __init__(self):
    """Set up call counting and schedule the final report.

    Creates the weak-keyed ``counts`` mapping, installs
    self.increment_call_count as the 'call' hook on client._SmartClient, and
    registers self.flush_all as a bzrlib.global_state cleanup.
    """
    # Weak keys: an entry does not keep its key object alive.
    self.counts = weakref.WeakKeyDictionary()
    client._SmartClient.hooks.install_named_hook(
        'call', self.increment_call_count, 'hpss call counter')
    # Register flush_all in one place only, so it is not queued twice.
    bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
510
507
def track(self, medium):
511
508
"""Start tracking calls made to a medium.
609
606
# which is newer than a previously supplied older-than version.
610
607
# This indicates that some smart verb call is not guarded
611
608
# appropriately (it should simply not have been tried).
612
raise AssertionError(
613
610
"_remember_remote_is_before(%r) called, but "
614
611
"_remember_remote_is_before(%r) was called previously."
615
% (version_tuple, self._remote_version_is_before))
612
, version_tuple, self._remote_version_is_before)
613
if 'hpss' in debug.debug_flags:
614
ui.ui_factory.show_warning(
615
"_remember_remote_is_before(%r) called, but "
616
"_remember_remote_is_before(%r) was called previously."
617
% (version_tuple, self._remote_version_is_before))
616
619
self._remote_version_is_before = version_tuple
618
621
def protocol_version(self):
721
724
def _accept_bytes(self, bytes):
    """See SmartClientStreamMedium.accept_bytes.

    Writes bytes once to self._writeable_pipe and then reports the number
    of bytes written as 'write' activity.
    """
    self._writeable_pipe.write(bytes)
    self._report_activity(len(bytes), 'write')
726
729
def _flush(self):
    """See SmartClientStreamMedium._flush().

    Flushes self._writeable_pipe once.
    """
    self._writeable_pipe.flush()
730
733
def _read_bytes(self, count):
    """See SmartClientStreamMedium._read_bytes.

    Performs a single read of at most min(count, _MAX_READ_SIZE) bytes from
    self._readable_pipe, reports the number of bytes obtained as 'read'
    activity, and returns them.

    :param count: Upper bound requested by the caller.
    :return: The bytes returned by the pipe's read call.
    """
    # Only one read is issued: a second read would consume data from the
    # pipe that is never handed back to the caller.
    bytes_to_read = min(count, _MAX_READ_SIZE)
    bytes = self._readable_pipe.read(bytes_to_read)
    self._report_activity(len(bytes), 'read')
    return bytes
741
class SSHParams(object):
    """A set of parameters for starting a remote bzr via SSH."""

    def __init__(self, host, port=None, username=None, password=None,
            bzr_remote_path='bzr'):
        """Record the connection parameters as plain attributes.

        :param host: Stored as ``self.host``.
        :param port: Stored as ``self.port``; defaults to None.
        :param username: Stored as ``self.username``; defaults to None.
        :param password: Stored as ``self.password``; defaults to None.
        :param bzr_remote_path: Stored as ``self.bzr_remote_path``;
            defaults to 'bzr'.
        """
        # host and port must be kept too: SmartSSHClientMedium reads
        # ssh_params.host and ssh_params.port.
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.bzr_remote_path = bzr_remote_path
737
753
class SmartSSHClientMedium(SmartClientStreamMedium):
738
"""A client medium using SSH."""
754
"""A client medium using SSH.
756
It delegates IO to a SmartClientSocketMedium or
757
SmartClientAlreadyConnectedSocketMedium (depending on platform).
740
def __init__(self, base, ssh_params, vendor=None):
    """Creates a client that will connect on the first use.

    :param base: Passed through to SmartClientStreamMedium.__init__.
    :param ssh_params: A SSHParams instance.
    :param vendor: An optional override for the ssh vendor to use. See
        bzrlib.transport.ssh for details on ssh vendors.
    """
    self._real_medium = None
    self._ssh_params = ssh_params
    # for the benefit of progress making a short description of this
    # transport
    self._scheme = 'bzr+ssh'
    # Our repr is wanted by _DebugCounter, so we have to store all the
    # values used in our repr method before calling the super init.
    SmartClientStreamMedium.__init__(self, base)
    self._vendor = vendor
    self._ssh_connection = None
765
779
def __repr__(self):
    """Return ``ClassName(scheme://username@host[:port]/)``.

    The ``:port`` part is left out when self._ssh_params.port is None.
    """
    if self._ssh_params.port is None:
        maybe_port = ''
    else:
        maybe_port = ':%s' % self._ssh_params.port
    # Five placeholders, five values: class name, scheme, username, host
    # and the optional port suffix.
    return "%s(%s://%s@%s%s/)" % (
        self.__class__.__name__,
        self._scheme,
        self._ssh_params.username,
        self._ssh_params.host,
        maybe_port)
777
791
def _accept_bytes(self, bytes):
    """See SmartClientStreamMedium.accept_bytes.

    Makes sure the connection exists, then hands bytes to the delegate
    medium self._real_medium.  The payload is sent through the delegate
    only.
    """
    self._ensure_connection()
    self._real_medium.accept_bytes(bytes)
783
796
def disconnect(self):
    """See SmartClientMedium.disconnect().

    Disconnects the delegate medium and closes the SSH connection, each
    only if present, and resets both attributes to None so that a repeated
    call does nothing.
    """
    if self._real_medium is not None:
        self._real_medium.disconnect()
        self._real_medium = None
    if self._ssh_connection is not None:
        self._ssh_connection.close()
        self._ssh_connection = None
792
805
def _ensure_connection(self):
    """Connect this medium if not already connected.

    Does nothing when a delegate medium already exists.  Otherwise starts
    ``bzr serve --inet`` on the remote side through the ssh vendor and
    wraps whatever the connection hands back (a socket or a pair of pipes)
    in the matching client medium, stored as self._real_medium.

    :raises AssertionError: if the connection reports an io kind other
        than 'socket' or 'pipes'.
    """
    if self._real_medium is not None:
        return
    if self._vendor is None:
        vendor = ssh._get_ssh_vendor()
    else:
        vendor = self._vendor
    self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
            self._ssh_params.password, self._ssh_params.host,
            self._ssh_params.port,
            command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
                     '--directory=/', '--allow-writes'])
    io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
    if io_kind == 'socket':
        self._real_medium = SmartClientAlreadyConnectedSocketMedium(
            self.base, io_object)
    elif io_kind == 'pipes':
        read_from, write_to = io_object
        self._real_medium = SmartSimplePipesClientMedium(
            read_from, write_to, self.base)
    else:
        raise AssertionError(
            "Unexpected io_kind %r from %r"
            % (io_kind, self._ssh_connection))
808
831
def _flush(self):
    """See SmartClientStreamMedium._flush().

    Delegates to the _flush method of self._real_medium.
    """
    self._real_medium._flush()
812
835
def _read_bytes(self, count):
    """See SmartClientStreamMedium.read_bytes.

    :param count: Passed unchanged to the delegate medium's read_bytes.
    :return: The delegate medium's result.
    :raises errors.MediumNotConnected: if no delegate medium has been set
        up yet (self._real_medium is None).
    """
    if self._real_medium is None:
        raise errors.MediumNotConnected(self)
    return self._real_medium.read_bytes(count)
822
842
# Port 4155 is the default port for bzr://, registered with IANA.
824
844
BZR_DEFAULT_PORT = 4155
827
class SmartTCPClientMedium(SmartClientStreamMedium):
828
"""A client medium using TCP."""
847
class SmartClientSocketMedium(SmartClientStreamMedium):
848
"""A client medium using a socket.
850
This class isn't usable directly. Use one of its subclasses instead.
853
def __init__(self, base):
    """Initialise the medium in the not-connected state.

    :param base: Passed through to SmartClientStreamMedium.__init__.
    """
    SmartClientStreamMedium.__init__(self, base)
    # Give the attribute the other methods read a defined value before any
    # connection is made.
    self._socket = None
    self._connected = False
858
def _accept_bytes(self, bytes):
    """See SmartClientMedium.accept_bytes.

    Ensures the connection exists, then passes self._socket, bytes and
    self._report_activity to osutils.send_all.
    """
    self._ensure_connection()
    osutils.send_all(self._socket, bytes, self._report_activity)
863
def _ensure_connection(self):
    """Connect this medium if not already connected.

    Not implemented at this level.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError(self._ensure_connection)
868
"""See SmartClientStreamMedium._flush().
870
For sockets we do no flushing. For TCP sockets we may want to turn off
871
TCP_NODELAY and add a means to do a flush, but that can be done in the
875
def _read_bytes(self, count):
    """See SmartClientMedium.read_bytes.

    :param count: Not forwarded; osutils.read_bytes_from_socket is called
        without a size.
    :return: Whatever osutils.read_bytes_from_socket returns for
        self._socket.
    :raises errors.MediumNotConnected: if self._connected is false.
    """
    if not self._connected:
        raise errors.MediumNotConnected(self)
    return osutils.read_bytes_from_socket(
        self._socket, self._report_activity)
882
def disconnect(self):
    """See SmartClientMedium.disconnect().

    Does nothing when not connected.  Otherwise closes self._socket, drops
    the reference to it and marks the medium as not connected.
    """
    if not self._connected:
        return
    self._socket.close()
    self._socket = None
    self._connected = False
891
class SmartTCPClientMedium(SmartClientSocketMedium):
892
"""A client medium that creates a TCP connection."""
830
894
def __init__(self, host, port, base):
    """Creates a client that will connect on the first use.

    :param host: Stored as ``self._host``.
    :param port: Stored as ``self._port``.
    :param base: Passed through to SmartClientSocketMedium.__init__.
    """
    # Only the immediate base class is initialised here, so the
    # stream-medium initialiser is not run a second time directly.
    SmartClientSocketMedium.__init__(self, base)
    self._host = host
    self._port = port
838
def _accept_bytes(self, bytes):
839
"""See SmartClientMedium.accept_bytes."""
840
self._ensure_connection()
841
osutils.send_all(self._socket, bytes, self._report_activity)
843
def disconnect(self):
844
"""See SmartClientMedium.disconnect()."""
845
if not self._connected:
847
osutils.until_no_eintr(self._socket.close)
849
self._connected = False
851
900
def _ensure_connection(self):
852
901
"""Connect this medium if not already connected."""
887
936
(self._host, port, err_msg))
888
937
self._connected = True
891
"""See SmartClientStreamMedium._flush().
893
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
894
add a means to do a flush, but that can be done in the future.
897
def _read_bytes(self, count):
898
"""See SmartClientMedium.read_bytes."""
899
if not self._connected:
900
raise errors.MediumNotConnected(self)
901
return _read_bytes_from_socket(
902
self._socket.recv, count, self._report_activity)
940
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
    """A client medium for an already connected socket.

    Note that this class will assume it "owns" the socket, so it will close it
    when its disconnect method is called.
    """

    def __init__(self, base, sock):
        """Wrap an existing socket and mark the medium as connected.

        :param base: Passed through to SmartClientSocketMedium.__init__.
        :param sock: The connected socket; kept as ``self._socket``, the
            attribute the inherited read and write methods use.
        """
        SmartClientSocketMedium.__init__(self, base)
        self._socket = sock
        self._connected = True

    def _ensure_connection(self):
        # Already connected, by definition!  So nothing to do.
        pass
905
957
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
942
994
self._medium._flush()
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
# We ignore the desired_count because on sockets it's more efficient to
947
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
949
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
except socket.error, e:
951
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
# The connection was closed by the other side. Callers expect an
953
# empty string to signal end-of-stream.
958
report_activity(len(bytes), 'read')