13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""The 'medium' layer for the smart servers and clients.
156
162
line, excess = _get_line(self.read_bytes)
157
163
self._push_back(excess)
166
def _report_activity(self, bytes, direction):
167
"""Notify that this medium has activity.
169
Implementations should call this from all methods that actually do IO.
170
Be careful that it's not called twice, if one method is implemented on
173
:param bytes: Number of bytes read or written.
174
:param direction: 'read' or 'write' or None.
176
ui.ui_factory.report_transport_activity(self, bytes, direction)
161
179
class SmartServerStreamMedium(SmartMedium):
162
180
"""Handles smart commands coming over a stream.
167
185
One instance is created for each connected client; it can serve multiple
168
186
requests in the lifetime of the connection.
170
The server passes requests through to an underlying backing transport,
188
The server passes requests through to an underlying backing transport,
171
189
which will typically be a LocalTransport looking at the server's filesystem.
173
191
:ivar _push_back_buffer: a str of bytes that have been read from the stream
263
281
self.finished = True
265
283
protocol.accept_bytes(bytes)
267
285
self._push_back(protocol.unused_data)
269
287
def _read_bytes(self, desired_count):
270
# We ignore the desired_count because on sockets it's more efficient to
271
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
272
return self.socket.recv(_MAX_READ_SIZE)
288
return _read_bytes_from_socket(
289
self.socket.recv, desired_count, self._report_activity)
274
291
def terminate_due_to_error(self):
275
292
# TODO: This should log to a server log file, but no such thing
345
362
request.finished_reading()
347
364
It is up to the individual SmartClientMedium whether multiple concurrent
348
requests can exist. See SmartClientMedium.get_request to obtain instances
349
of SmartClientMediumRequest, and the concrete Medium you are using for
365
requests can exist. See SmartClientMedium.get_request to obtain instances
366
of SmartClientMediumRequest, and the concrete Medium you are using for
350
367
details on concurrency and pipelining.
361
378
def accept_bytes(self, bytes):
362
379
"""Accept bytes for inclusion in this request.
364
This method may not be be called after finished_writing() has been
381
This method may not be called after finished_writing() has been
365
382
called. It depends upon the Medium whether or not the bytes will be
366
383
immediately transmitted. Message based Mediums will tend to buffer the
367
384
bytes until finished_writing() is called.
455
472
if not line.endswith('\n'):
456
473
# end of file encountered reading from server
457
474
raise errors.ConnectionReset(
458
"please check connectivity and permissions",
459
"(and try -Dhpss if further diagnosis is required)")
475
"please check connectivity and permissions")
462
478
def _read_line(self):
463
479
"""Helper for SmartClientMediumRequest.read_line.
465
481
By default this forwards to self._medium._get_line because we are
466
482
operating on the medium's stream.
468
484
return self._medium._get_line()
487
class _DebugCounter(object):
488
"""An object that counts the HPSS calls made to each client medium.
490
When a medium is garbage-collected, or failing that when atexit functions
491
are run, the total number of calls made on that medium is reported via
496
self.counts = weakref.WeakKeyDictionary()
497
client._SmartClient.hooks.install_named_hook(
498
'call', self.increment_call_count, 'hpss call counter')
499
atexit.register(self.flush_all)
501
def track(self, medium):
502
"""Start tracking calls made to a medium.
504
This only keeps a weakref to the medium, so shouldn't affect the
507
medium_repr = repr(medium)
508
# Add this medium to the WeakKeyDictionary
509
self.counts[medium] = dict(count=0, vfs_count=0,
510
medium_repr=medium_repr)
511
# Weakref callbacks are fired in reverse order of their association
512
# with the referenced object. So we add a weakref *after* adding to
513
# the WeakKeyDict so that we can report the value from it before the
514
# entry is removed by the WeakKeyDict's own callback.
515
ref = weakref.ref(medium, self.done)
517
def increment_call_count(self, params):
518
# Increment the count in the WeakKeyDictionary
519
value = self.counts[params.medium]
521
request_method = request.request_handlers.get(params.method)
522
if issubclass(request_method, vfs.VfsRequest):
523
value['vfs_count'] += 1
526
value = self.counts[ref]
527
count, vfs_count, medium_repr = (
528
value['count'], value['vfs_count'], value['medium_repr'])
529
# In case this callback is invoked for the same ref twice (by the
530
# weakref callback and by the atexit function), set the call count back
531
# to 0 so this item won't be reported twice.
533
value['vfs_count'] = 0
535
trace.note('HPSS calls: %d (%d vfs) %s',
536
count, vfs_count, medium_repr)
539
for ref in list(self.counts.keys()):
542
_debug_counter = None
471
545
class SmartClientMedium(SmartMedium):
472
546
"""Smart client is a medium for sending smart protocol requests over."""
482
556
# _remote_version_is_before records the bzr version that the remote side
483
557
# is known to be older than, based on what we've seen so far.
484
558
self._remote_version_is_before = None
559
# Install debug hook function if debug flag is set.
560
if 'hpss' in debug.debug_flags:
561
global _debug_counter
562
if _debug_counter is None:
563
_debug_counter = _DebugCounter()
564
_debug_counter.track(self)
486
566
def _is_remote_before(self, version_tuple):
487
567
"""Is it possible the remote side supports RPCs for a given version?
513
593
if (self._remote_version_is_before is not None and
514
594
version_tuple > self._remote_version_is_before):
595
# We have been told that the remote side is older than some version
596
# which is newer than a previously supplied older-than version.
597
# This indicates that some smart verb call is not guarded
598
# appropriately (it should simply not have been tried).
515
599
raise AssertionError(
516
600
"_remember_remote_is_before(%r) called, but "
517
601
"_remember_remote_is_before(%r) was called previously."
556
640
def disconnect(self):
557
641
"""If this medium maintains a persistent connection, close it.
559
643
The default implementation does nothing.
562
646
def remote_path_from_transport(self, transport):
563
647
"""Convert transport into a path suitable for using in a request.
565
649
Note that the resulting remote path doesn't encode the host name or
566
650
anything but path, so it is only safe to use it in requests sent over
567
651
the medium from the matching transport.
632
717
def _read_bytes(self, count):
633
718
"""See SmartClientStreamMedium._read_bytes."""
634
return self._readable_pipe.read(count)
719
bytes = self._readable_pipe.read(count)
720
self._report_activity(len(bytes), 'read')
637
724
class SmartSSHClientMedium(SmartClientStreamMedium):
638
725
"""A client medium using SSH."""
640
727
def __init__(self, host, port=None, username=None, password=None,
641
728
base=None, vendor=None, bzr_remote_path=None):
642
729
"""Creates a client that will connect on the first use.
644
731
:param vendor: An optional override for the ssh vendor to use. See
645
732
bzrlib.transport.ssh for details on ssh vendors.
647
SmartClientStreamMedium.__init__(self, base)
648
734
self._connected = False
649
735
self._host = host
650
736
self._password = password
651
737
self._port = port
652
738
self._username = username
739
# SmartClientStreamMedium stores the repr of this object in its
740
# _DebugCounter so we have to store all the values used in our repr
741
# method before calling the super init.
742
SmartClientStreamMedium.__init__(self, base)
653
743
self._read_from = None
654
744
self._ssh_connection = None
655
745
self._vendor = vendor
656
746
self._write_to = None
657
747
self._bzr_remote_path = bzr_remote_path
658
if self._bzr_remote_path is None:
659
symbol_versioning.warn(
660
'bzr_remote_path is required as of bzr 0.92',
661
DeprecationWarning, stacklevel=2)
662
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
748
# for the benefit of progress making a short description of this
750
self._scheme = 'bzr+ssh'
753
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
754
self.__class__.__name__,
664
760
def _accept_bytes(self, bytes):
665
761
"""See SmartClientStreamMedium.accept_bytes."""
666
762
self._ensure_connection()
667
763
self._write_to.write(bytes)
764
self._report_activity(len(bytes), 'write')
669
766
def disconnect(self):
670
767
"""See SmartClientMedium.disconnect()."""
700
797
if not self._connected:
701
798
raise errors.MediumNotConnected(self)
702
799
bytes_to_read = min(count, _MAX_READ_SIZE)
703
return self._read_from.read(bytes_to_read)
800
bytes = self._read_from.read(bytes_to_read)
801
self._report_activity(len(bytes), 'read')
706
805
# Port 4155 is the default port for bzr://, registered with IANA.
707
BZR_DEFAULT_INTERFACE = '0.0.0.0'
806
BZR_DEFAULT_INTERFACE = None
708
807
BZR_DEFAULT_PORT = 4155
711
810
class SmartTCPClientMedium(SmartClientStreamMedium):
712
811
"""A client medium using TCP."""
714
813
def __init__(self, host, port, base):
715
814
"""Creates a client that will connect on the first use."""
716
815
SmartClientStreamMedium.__init__(self, base)
722
821
def _accept_bytes(self, bytes):
723
822
"""See SmartClientMedium.accept_bytes."""
724
823
self._ensure_connection()
725
osutils.send_all(self._socket, bytes)
824
osutils.send_all(self._socket, bytes, self._report_activity)
727
826
def disconnect(self):
728
827
"""See SmartClientMedium.disconnect()."""
736
835
"""Connect this medium if not already connected."""
737
836
if self._connected:
739
self._socket = socket.socket()
740
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
741
838
if self._port is None:
742
839
port = BZR_DEFAULT_PORT
744
841
port = int(self._port)
746
self._socket.connect((self._host, port))
747
except socket.error, err:
843
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
844
socket.SOCK_STREAM, 0, 0)
845
except socket.gaierror, (err_num, err_msg):
846
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
847
(self._host, port, err_msg))
848
# Initialize err in case there are no addresses returned:
849
err = socket.error("no address found for %s" % self._host)
850
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
852
self._socket = socket.socket(family, socktype, proto)
853
self._socket.setsockopt(socket.IPPROTO_TCP,
854
socket.TCP_NODELAY, 1)
855
self._socket.connect(sockaddr)
856
except socket.error, err:
857
if self._socket is not None:
862
if self._socket is None:
748
863
# socket errors either have a (string) or (errno, string) as their
750
865
if type(err.args) is str:
766
881
"""See SmartClientMedium.read_bytes."""
767
882
if not self._connected:
768
883
raise errors.MediumNotConnected(self)
769
# We ignore the desired_count because on sockets it's more efficient to
770
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
771
return self._socket.recv(_MAX_READ_SIZE)
884
return _read_bytes_from_socket(
885
self._socket.recv, count, self._report_activity)
774
888
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
796
910
def _finished_reading(self):
797
911
"""See SmartClientMediumRequest._finished_reading.
799
This clears the _current_request on self._medium to allow a new
913
This clears the _current_request on self._medium to allow a new
800
914
request to be created.
802
916
if self._medium._current_request is not self:
803
917
raise AssertionError()
804
918
self._medium._current_request = None
806
920
def _finished_writing(self):
807
921
"""See SmartClientMediumRequest._finished_writing.
811
925
self._medium._flush()
928
def _read_bytes_from_socket(sock, desired_count, report_activity):
929
# We ignore the desired_count because on sockets it's more efficient to
930
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
932
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
933
except socket.error, e:
934
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
935
# The connection was closed by the other side. Callers expect an
936
# empty string to signal end-of-stream.
941
report_activity(len(bytes), 'read')