13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""The 'medium' layer for the smart servers and clients.
88
89
def _get_line(read_bytes_func):
89
90
"""Read bytes using read_bytes_func until a newline byte.
91
92
This isn't particularly efficient, so should only be used when the
92
93
expected size of the line is quite short.
94
95
:returns: a tuple of two strs: (line, excess)
161
162
line, excess = _get_line(self.read_bytes)
162
163
self._push_back(excess)
166
def _report_activity(self, bytes, direction):
167
"""Notify that this medium has activity.
169
Implementations should call this from all methods that actually do IO.
170
Be careful that it's not called twice, if one method is implemented on
173
:param bytes: Number of bytes read or written.
174
:param direction: 'read' or 'write' or None.
176
ui.ui_factory.report_transport_activity(self, bytes, direction)
166
179
class SmartServerStreamMedium(SmartMedium):
167
180
"""Handles smart commands coming over a stream.
172
185
One instance is created for each connected client; it can serve multiple
173
186
requests in the lifetime of the connection.
175
The server passes requests through to an underlying backing transport,
188
The server passes requests through to an underlying backing transport,
176
189
which will typically be a LocalTransport looking at the server's filesystem.
178
191
:ivar _push_back_buffer: a str of bytes that have been read from the stream
268
281
self.finished = True
270
283
protocol.accept_bytes(bytes)
272
285
self._push_back(protocol.unused_data)
274
287
def _read_bytes(self, desired_count):
275
288
# We ignore the desired_count because on sockets it's more efficient to
276
289
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
277
return osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
290
bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
291
self._report_activity(len(bytes), 'read')
279
294
def terminate_due_to_error(self):
280
295
# TODO: This should log to a server log file, but no such thing
283
298
self.finished = True
285
300
def _write_out(self, bytes):
286
osutils.send_all(self.socket, bytes)
301
osutils.send_all(self.socket, bytes, self._report_activity)
289
304
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
350
365
request.finished_reading()
352
367
It is up to the individual SmartClientMedium whether multiple concurrent
353
requests can exist. See SmartClientMedium.get_request to obtain instances
354
of SmartClientMediumRequest, and the concrete Medium you are using for
368
requests can exist. See SmartClientMedium.get_request to obtain instances
369
of SmartClientMediumRequest, and the concrete Medium you are using for
355
370
details on concurrency and pipelining.
403
418
def _finished_reading(self):
404
419
"""Helper for finished_reading.
406
finished_reading checks the state of the request to determine if
421
finished_reading checks the state of the request to determine if
407
422
finished_reading is allowed, and if it is hands off to _finished_reading
408
423
to perform the action.
423
438
def _finished_writing(self):
424
439
"""Helper for finished_writing.
426
finished_writing checks the state of the request to determine if
441
finished_writing checks the state of the request to determine if
427
442
finished_writing is allowed, and if it is hands off to _finished_writing
428
443
to perform the action.
460
475
if not line.endswith('\n'):
461
476
# end of file encountered reading from server
462
477
raise errors.ConnectionReset(
463
"please check connectivity and permissions",
464
"(and try -Dhpss if further diagnosis is required)")
478
"please check connectivity and permissions")
467
481
def _read_line(self):
468
482
"""Helper for SmartClientMediumRequest.read_line.
470
484
By default this forwards to self._medium._get_line because we are
471
485
operating on the medium's stream.
518
532
trace.note('HPSS calls: %d %s', count, medium_repr)
520
534
def flush_all(self):
521
535
for ref in list(self.counts.keys()):
524
538
_debug_counter = None
527
541
class SmartClientMedium(SmartMedium):
528
542
"""Smart client is a medium for sending smart protocol requests over."""
622
636
def disconnect(self):
623
637
"""If this medium maintains a persistent connection, close it.
625
639
The default implementation does nothing.
628
642
def remote_path_from_transport(self, transport):
629
643
"""Convert transport into a path suitable for using in a request.
631
645
Note that the resulting remote path doesn't encode the host name or
632
646
anything but path, so it is only safe to use it in requests sent over
633
647
the medium from the matching transport.
690
704
def _accept_bytes(self, bytes):
691
705
"""See SmartClientStreamMedium.accept_bytes."""
692
706
self._writeable_pipe.write(bytes)
707
self._report_activity(len(bytes), 'write')
694
709
def _flush(self):
695
710
"""See SmartClientStreamMedium._flush()."""
698
713
def _read_bytes(self, count):
699
714
"""See SmartClientStreamMedium._read_bytes."""
700
return self._readable_pipe.read(count)
715
bytes = self._readable_pipe.read(count)
716
self._report_activity(len(bytes), 'read')
703
720
class SmartSSHClientMedium(SmartClientStreamMedium):
704
721
"""A client medium using SSH."""
706
723
def __init__(self, host, port=None, username=None, password=None,
707
724
base=None, vendor=None, bzr_remote_path=None):
708
725
"""Creates a client that will connect on the first use.
710
727
:param vendor: An optional override for the ssh vendor to use. See
711
728
bzrlib.transport.ssh for details on ssh vendors.
713
SmartClientStreamMedium.__init__(self, base)
714
730
self._connected = False
715
731
self._host = host
716
732
self._password = password
717
733
self._port = port
718
734
self._username = username
735
# SmartClientStreamMedium stores the repr of this object in its
736
# _DebugCounter so we have to store all the values used in our repr
737
# method before calling the super init.
738
SmartClientStreamMedium.__init__(self, base)
719
739
self._read_from = None
720
740
self._ssh_connection = None
721
741
self._vendor = vendor
722
742
self._write_to = None
723
743
self._bzr_remote_path = bzr_remote_path
724
if self._bzr_remote_path is None:
725
symbol_versioning.warn(
726
'bzr_remote_path is required as of bzr 0.92',
727
DeprecationWarning, stacklevel=2)
728
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
744
# for the benefit of progress making a short description of this
746
self._scheme = 'bzr+ssh'
749
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
750
self.__class__.__name__,
730
756
def _accept_bytes(self, bytes):
731
757
"""See SmartClientStreamMedium.accept_bytes."""
732
758
self._ensure_connection()
733
759
self._write_to.write(bytes)
760
self._report_activity(len(bytes), 'write')
735
762
def disconnect(self):
736
763
"""See SmartClientMedium.disconnect()."""
766
793
if not self._connected:
767
794
raise errors.MediumNotConnected(self)
768
795
bytes_to_read = min(count, _MAX_READ_SIZE)
769
return self._read_from.read(bytes_to_read)
796
bytes = self._read_from.read(bytes_to_read)
797
self._report_activity(len(bytes), 'read')
772
801
# Port 4155 is the default port for bzr://, registered with IANA.
777
806
class SmartTCPClientMedium(SmartClientStreamMedium):
778
807
"""A client medium using TCP."""
780
809
def __init__(self, host, port, base):
781
810
"""Creates a client that will connect on the first use."""
782
811
SmartClientStreamMedium.__init__(self, base)
788
817
def _accept_bytes(self, bytes):
789
818
"""See SmartClientMedium.accept_bytes."""
790
819
self._ensure_connection()
791
osutils.send_all(self._socket, bytes)
820
osutils.send_all(self._socket, bytes, self._report_activity)
793
822
def disconnect(self):
794
823
"""See SmartClientMedium.disconnect()."""
808
837
port = int(self._port)
810
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
839
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
811
840
socket.SOCK_STREAM, 0, 0)
812
841
except socket.gaierror, (err_num, err_msg):
813
842
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
817
846
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
819
848
self._socket = socket.socket(family, socktype, proto)
820
self._socket.setsockopt(socket.IPPROTO_TCP,
849
self._socket.setsockopt(socket.IPPROTO_TCP,
821
850
socket.TCP_NODELAY, 1)
822
851
self._socket.connect(sockaddr)
823
852
except socket.error, err:
851
880
# We ignore the desired_count because on sockets it's more efficient to
852
881
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
854
return self._socket.recv(_MAX_READ_SIZE)
883
bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
855
884
except socket.error, e:
856
885
if len(e.args) and e.args[0] == errno.ECONNRESET:
857
886
# Callers expect an empty string in that case
891
self._report_activity(len(bytes), 'read')
863
895
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
885
917
def _finished_reading(self):
886
918
"""See SmartClientMediumRequest._finished_reading.
888
This clears the _current_request on self._medium to allow a new
920
This clears the _current_request on self._medium to allow a new
889
921
request to be created.
891
923
if self._medium._current_request is not self:
892
924
raise AssertionError()
893
925
self._medium._current_request = None
895
927
def _finished_writing(self):
896
928
"""See SmartClientMediumRequest._finished_writing.