13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""The 'medium' layer for the smart servers and clients.
89
83
def _get_line(read_bytes_func):
90
84
"""Read bytes using read_bytes_func until a newline byte.
92
86
This isn't particularly efficient, so should only be used when the
93
87
expected size of the line is quite short.
95
89
:returns: a tuple of two strs: (line, excess)
162
156
line, excess = _get_line(self.read_bytes)
163
157
self._push_back(excess)
166
def _report_activity(self, bytes, direction):
167
"""Notify that this medium has activity.
169
Implementations should call this from all methods that actually do IO.
170
Be careful that it's not called twice, if one method is implemented on
173
:param bytes: Number of bytes read or written.
174
:param direction: 'read' or 'write' or None.
176
ui.ui_factory.report_transport_activity(self, bytes, direction)
179
161
class SmartServerStreamMedium(SmartMedium):
180
162
"""Handles smart commands coming over a stream.
185
167
One instance is created for each connected client; it can serve multiple
186
168
requests in the lifetime of the connection.
188
The server passes requests through to an underlying backing transport,
170
The server passes requests through to an underlying backing transport,
189
171
which will typically be a LocalTransport looking at the server's filesystem.
191
173
:ivar _push_back_buffer: a str of bytes that have been read from the stream
281
263
self.finished = True
283
265
protocol.accept_bytes(bytes)
285
267
self._push_back(protocol.unused_data)
287
269
def _read_bytes(self, desired_count):
288
270
# We ignore the desired_count because on sockets it's more efficient to
289
271
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
290
bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
291
self._report_activity(len(bytes), 'read')
272
return self.socket.recv(_MAX_READ_SIZE)
294
274
def terminate_due_to_error(self):
295
275
# TODO: This should log to a server log file, but no such thing
365
345
request.finished_reading()
367
347
It is up to the individual SmartClientMedium whether multiple concurrent
368
requests can exist. See SmartClientMedium.get_request to obtain instances
369
of SmartClientMediumRequest, and the concrete Medium you are using for
348
requests can exist. See SmartClientMedium.get_request to obtain instances
349
of SmartClientMediumRequest, and the concrete Medium you are using for
370
350
details on concurrency and pipelining.
475
455
if not line.endswith('\n'):
476
456
# end of file encountered reading from server
477
457
raise errors.ConnectionReset(
478
"please check connectivity and permissions")
458
"please check connectivity and permissions",
459
"(and try -Dhpss if further diagnosis is required)")
481
462
def _read_line(self):
482
463
"""Helper for SmartClientMediumRequest.read_line.
484
465
By default this forwards to self._medium._get_line because we are
485
466
operating on the medium's stream.
487
468
return self._medium._get_line()
490
class _DebugCounter(object):
491
"""An object that counts the HPSS calls made to each client medium.
493
When a medium is garbage-collected, or failing that when atexit functions
494
are run, the total number of calls made on that medium are reported via
499
self.counts = weakref.WeakKeyDictionary()
500
client._SmartClient.hooks.install_named_hook(
501
'call', self.increment_call_count, 'hpss call counter')
502
atexit.register(self.flush_all)
504
def track(self, medium):
505
"""Start tracking calls made to a medium.
507
This only keeps a weakref to the medium, so shouldn't affect the
510
medium_repr = repr(medium)
511
# Add this medium to the WeakKeyDictionary
512
self.counts[medium] = [0, medium_repr]
513
# Weakref callbacks are fired in reverse order of their association
514
# with the referenced object. So we add a weakref *after* adding to
515
# the WeakKeyDict so that we can report the value from it before the
516
# entry is removed by the WeakKeyDict's own callback.
517
ref = weakref.ref(medium, self.done)
519
def increment_call_count(self, params):
520
# Increment the count in the WeakKeyDictionary
521
value = self.counts[params.medium]
525
value = self.counts[ref]
526
count, medium_repr = value
527
# In case this callback is invoked for the same ref twice (by the
528
# weakref callback and by the atexit function), set the call count back
529
# to 0 so this item won't be reported twice.
532
trace.note('HPSS calls: %d %s', count, medium_repr)
535
for ref in list(self.counts.keys()):
538
_debug_counter = None
541
471
class SmartClientMedium(SmartMedium):
542
472
"""Smart client is a medium for sending smart protocol requests over."""
552
482
# _remote_version_is_before tracks the bzr version the remote side
553
483
# can be based on what we've seen so far.
554
484
self._remote_version_is_before = None
555
# Install debug hook function if debug flag is set.
556
if 'hpss' in debug.debug_flags:
557
global _debug_counter
558
if _debug_counter is None:
559
_debug_counter = _DebugCounter()
560
_debug_counter.track(self)
562
486
def _is_remote_before(self, version_tuple):
563
487
"""Is it possible the remote side supports RPCs for a given version?
589
513
if (self._remote_version_is_before is not None and
590
514
version_tuple > self._remote_version_is_before):
591
# We have been told that the remote side is older than some version
592
# which is newer than a previously supplied older-than version.
593
# This indicates that some smart verb call is not guarded
594
# appropriately (it should simply not have been tried).
595
515
raise AssertionError(
596
516
"_remember_remote_is_before(%r) called, but "
597
517
"_remember_remote_is_before(%r) was called previously."
636
556
def disconnect(self):
637
557
"""If this medium maintains a persistent connection, close it.
639
559
The default implementation does nothing.
642
562
def remote_path_from_transport(self, transport):
643
563
"""Convert transport into a path suitable for using in a request.
645
565
Note that the resulting remote path doesn't encode the host name or
646
566
anything but path, so it is only safe to use it in requests sent over
647
567
the medium from the matching transport.
704
624
def _accept_bytes(self, bytes):
705
625
"""See SmartClientStreamMedium.accept_bytes."""
706
626
self._writeable_pipe.write(bytes)
707
self._report_activity(len(bytes), 'write')
709
628
def _flush(self):
710
629
"""See SmartClientStreamMedium._flush()."""
713
632
def _read_bytes(self, count):
714
633
"""See SmartClientStreamMedium._read_bytes."""
715
bytes = self._readable_pipe.read(count)
716
self._report_activity(len(bytes), 'read')
634
return self._readable_pipe.read(count)
720
637
class SmartSSHClientMedium(SmartClientStreamMedium):
721
638
"""A client medium using SSH."""
723
640
def __init__(self, host, port=None, username=None, password=None,
724
641
base=None, vendor=None, bzr_remote_path=None):
725
642
"""Creates a client that will connect on the first use.
727
644
:param vendor: An optional override for the ssh vendor to use. See
728
645
bzrlib.transport.ssh for details on ssh vendors.
647
SmartClientStreamMedium.__init__(self, base)
730
648
self._connected = False
731
649
self._host = host
732
650
self._password = password
733
651
self._port = port
734
652
self._username = username
735
# SmartClientStreamMedium stores the repr of this object in its
736
# _DebugCounter so we have to store all the values used in our repr
737
# method before calling the super init.
738
SmartClientStreamMedium.__init__(self, base)
739
653
self._read_from = None
740
654
self._ssh_connection = None
741
655
self._vendor = vendor
742
656
self._write_to = None
743
657
self._bzr_remote_path = bzr_remote_path
744
# for the benefit of progress making a short description of this
746
self._scheme = 'bzr+ssh'
749
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
750
self.__class__.__name__,
658
if self._bzr_remote_path is None:
659
symbol_versioning.warn(
660
'bzr_remote_path is required as of bzr 0.92',
661
DeprecationWarning, stacklevel=2)
662
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
756
664
def _accept_bytes(self, bytes):
757
665
"""See SmartClientStreamMedium.accept_bytes."""
758
666
self._ensure_connection()
759
667
self._write_to.write(bytes)
760
self._report_activity(len(bytes), 'write')
762
669
def disconnect(self):
763
670
"""See SmartClientMedium.disconnect()."""
793
700
if not self._connected:
794
701
raise errors.MediumNotConnected(self)
795
702
bytes_to_read = min(count, _MAX_READ_SIZE)
796
bytes = self._read_from.read(bytes_to_read)
797
self._report_activity(len(bytes), 'read')
703
return self._read_from.read(bytes_to_read)
801
706
# Port 4155 is the default port for bzr://, registered with IANA.
802
BZR_DEFAULT_INTERFACE = None
707
BZR_DEFAULT_INTERFACE = '0.0.0.0'
803
708
BZR_DEFAULT_PORT = 4155
806
711
class SmartTCPClientMedium(SmartClientStreamMedium):
807
712
"""A client medium using TCP."""
809
714
def __init__(self, host, port, base):
810
715
"""Creates a client that will connect on the first use."""
811
716
SmartClientStreamMedium.__init__(self, base)
817
722
def _accept_bytes(self, bytes):
818
723
"""See SmartClientMedium.accept_bytes."""
819
724
self._ensure_connection()
820
osutils.send_all(self._socket, bytes, self._report_activity)
725
osutils.send_all(self._socket, bytes)
822
727
def disconnect(self):
823
728
"""See SmartClientMedium.disconnect()."""
831
736
"""Connect this medium if not already connected."""
832
737
if self._connected:
739
self._socket = socket.socket()
740
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
834
741
if self._port is None:
835
742
port = BZR_DEFAULT_PORT
837
744
port = int(self._port)
839
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
840
socket.SOCK_STREAM, 0, 0)
841
except socket.gaierror, (err_num, err_msg):
842
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
843
(self._host, port, err_msg))
844
# Initialize err in case there are no addresses returned:
845
err = socket.error("no address found for %s" % self._host)
846
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
848
self._socket = socket.socket(family, socktype, proto)
849
self._socket.setsockopt(socket.IPPROTO_TCP,
850
socket.TCP_NODELAY, 1)
851
self._socket.connect(sockaddr)
852
except socket.error, err:
853
if self._socket is not None:
858
if self._socket is None:
746
self._socket.connect((self._host, port))
747
except socket.error, err:
859
748
# socket errors either have a (string) or (errno, string) as their
861
750
if type(err.args) is str:
879
768
raise errors.MediumNotConnected(self)
880
769
# We ignore the desired_count because on sockets it's more efficient to
881
770
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
883
bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
884
except socket.error, e:
885
if len(e.args) and e.args[0] == errno.ECONNRESET:
886
# Callers expect an empty string in that case
891
self._report_activity(len(bytes), 'read')
771
return self._socket.recv(_MAX_READ_SIZE)
895
774
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
917
796
def _finished_reading(self):
918
797
"""See SmartClientMediumRequest._finished_reading.
920
This clears the _current_request on self._medium to allow a new
799
This clears the _current_request on self._medium to allow a new
921
800
request to be created.
923
802
if self._medium._current_request is not self:
924
803
raise AssertionError()
925
804
self._medium._current_request = None
927
806
def _finished_writing(self):
928
807
"""See SmartClientMediumRequest._finished_writing.