162
163
self._push_back(excess)
166
def _report_activity(self, bytes, direction):
    """Notify that this medium has activity.

    Implementations should call this from all methods that actually do IO.
    Be careful that it's not called twice when one IO method is implemented
    in terms of another.

    :param bytes: Number of bytes read or written.
    :param direction: 'read' or 'write' or None.
    """
    # Forward the event, tagged with this medium, to the active UI factory.
    ui.ui_factory.report_transport_activity(self, bytes, direction)
166
179
class SmartServerStreamMedium(SmartMedium):
167
180
"""Handles smart commands coming over a stream.
274
287
def _read_bytes(self, desired_count):
    """Read a chunk from the socket and report it as 'read' activity.

    :param desired_count: Ignored; see the comment below.
    :return: Whatever ``self.socket.recv`` returned for this call.
    """
    # We ignore the desired_count because on sockets it's more efficient to
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
    # NOTE(review): until_no_eintr presumably retries the call when it is
    # interrupted by a signal (EINTR) -- confirm against osutils.
    data = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
    # Report before returning: a return placed ahead of this call would
    # make the activity notification unreachable.
    self._report_activity(len(data), 'read')
    return data
279
294
def terminate_due_to_error(self):
280
295
# TODO: This should log to a server log file, but no such thing
283
298
self.finished = True
285
300
def _write_out(self, bytes):
    """Send ``bytes`` over ``self.socket``, reporting write activity.

    :param bytes: The data to send.
    """
    # Send exactly once. ``self._report_activity`` is handed to send_all as
    # its third argument -- presumably invoked as data goes out; confirm
    # against osutils.send_all.
    osutils.send_all(self.socket, bytes, self._report_activity)
289
304
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
460
475
if not line.endswith('\n'):
461
476
# end of file encountered reading from server
462
477
raise errors.ConnectionReset(
463
"please check connectivity and permissions",
464
"(and try -Dhpss if further diagnosis is required)")
478
"please check connectivity and permissions")
467
481
def _read_line(self):
690
704
def _accept_bytes(self, bytes):
691
705
"""See SmartClientStreamMedium.accept_bytes."""
692
706
self._writeable_pipe.write(bytes)
707
self._report_activity(len(bytes), 'write')
694
709
def _flush(self):
695
710
"""See SmartClientStreamMedium._flush()."""
698
713
def _read_bytes(self, count):
699
714
"""See SmartClientStreamMedium._read_bytes."""
700
return self._readable_pipe.read(count)
715
bytes = self._readable_pipe.read(count)
716
self._report_activity(len(bytes), 'read')
703
720
class SmartSSHClientMedium(SmartClientStreamMedium):
710
727
:param vendor: An optional override for the ssh vendor to use. See
711
728
bzrlib.transport.ssh for details on ssh vendors.
713
SmartClientStreamMedium.__init__(self, base)
714
730
self._connected = False
715
731
self._host = host
716
732
self._password = password
717
733
self._port = port
718
734
self._username = username
735
# SmartClientStreamMedium stores the repr of this object in its
736
# _DebugCounter so we have to store all the values used in our repr
737
# method before calling the super init.
738
SmartClientStreamMedium.__init__(self, base)
719
739
self._read_from = None
720
740
self._ssh_connection = None
721
741
self._vendor = vendor
722
742
self._write_to = None
723
743
self._bzr_remote_path = bzr_remote_path
724
if self._bzr_remote_path is None:
725
symbol_versioning.warn(
726
'bzr_remote_path is required as of bzr 0.92',
727
DeprecationWarning, stacklevel=2)
728
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
746
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
747
self.__class__.__name__,
730
753
def _accept_bytes(self, bytes):
731
754
"""See SmartClientStreamMedium.accept_bytes."""
732
755
self._ensure_connection()
733
756
self._write_to.write(bytes)
757
self._report_activity(len(bytes), 'write')
735
759
def disconnect(self):
736
760
"""See SmartClientMedium.disconnect()."""
766
790
if not self._connected:
767
791
raise errors.MediumNotConnected(self)
768
792
bytes_to_read = min(count, _MAX_READ_SIZE)
769
return self._read_from.read(bytes_to_read)
793
bytes = self._read_from.read(bytes_to_read)
794
self._report_activity(len(bytes), 'read')
772
798
# Port 4155 is the default port for bzr://, registered with IANA.
788
814
def _accept_bytes(self, bytes):
    """See SmartClientMedium.accept_bytes.

    Calls ``self._ensure_connection()`` and then sends ``bytes`` over
    ``self._socket``.

    :param bytes: The data to send.
    """
    self._ensure_connection()
    # Send exactly once. ``self._report_activity`` is handed to send_all as
    # its third argument -- presumably invoked as data goes out; confirm
    # against osutils.send_all.
    osutils.send_all(self._socket, bytes, self._report_activity)
793
819
def disconnect(self):
794
820
"""See SmartClientMedium.disconnect()."""
851
877
# We ignore the desired_count because on sockets it's more efficient to
852
878
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
854
return self._socket.recv(_MAX_READ_SIZE)
880
bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
855
881
except socket.error, e:
856
882
if len(e.args) and e.args[0] == errno.ECONNRESET:
857
883
# Callers expect an empty string in that case
888
self._report_activity(len(bytes), 'read')
863
892
class SmartClientStreamMediumRequest(SmartClientMediumRequest):