13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""The 'medium' layer for the smart servers and clients.
32
33
from bzrlib.lazy_import import lazy_import
33
34
lazy_import(globals(), """
34
39
from bzrlib import (
40
from bzrlib.smart import protocol
47
from bzrlib.smart import client, protocol, request, vfs
41
48
from bzrlib.transport import ssh
50
#usually already imported, and getting IllegalScoperReplacer on it here.
51
from bzrlib import osutils
45
53
# We must not read any more than 64k at a time so we don't risk "no buffer
46
54
# space available" errors on some platforms. Windows in particular is likely
156
164
line, excess = _get_line(self.read_bytes)
157
165
self._push_back(excess)
168
def _report_activity(self, bytes, direction):
169
"""Notify that this medium has activity.
171
Implementations should call this from all methods that actually do IO.
172
Be careful that it's not called twice, if one method is implemented on
175
:param bytes: Number of bytes read or written.
176
:param direction: 'read' or 'write' or None.
178
ui.ui_factory.report_transport_activity(self, bytes, direction)
161
181
class SmartServerStreamMedium(SmartMedium):
162
182
"""Handles smart commands coming over a stream.
263
283
self.finished = True
265
285
protocol.accept_bytes(bytes)
267
287
self._push_back(protocol.unused_data)
269
289
def _read_bytes(self, desired_count):
270
# We ignore the desired_count because on sockets it's more efficient to
271
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
272
return self.socket.recv(_MAX_READ_SIZE)
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
274
293
def terminate_due_to_error(self):
275
294
# TODO: This should log to a server log file, but no such thing
276
295
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
278
297
self.finished = True
280
299
def _write_out(self, bytes):
281
osutils.send_all(self.socket, bytes)
300
tstart = osutils.timer_func()
301
osutils.send_all(self.socket, bytes, self._report_activity)
302
if 'hpss' in debug.debug_flags:
303
thread_id = thread.get_ident()
304
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
% ('wrote', thread_id, len(bytes),
306
osutils.timer_func() - tstart))
284
309
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
309
334
bytes_to_read = protocol.next_read_size()
310
335
if bytes_to_read == 0:
311
336
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
314
339
bytes = self.read_bytes(bytes_to_read)
316
341
# Connection has been closed.
317
342
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
320
345
protocol.accept_bytes(bytes)
322
347
def _read_bytes(self, desired_count):
323
return self._in.read(desired_count)
348
return osutils.until_no_eintr(self._in.read, desired_count)
325
350
def terminate_due_to_error(self):
326
351
# TODO: This should log to a server log file, but no such thing
327
352
# exists yet. Andrew Bennetts 2006-09-29.
353
osutils.until_no_eintr(self._out.close)
329
354
self.finished = True
331
356
def _write_out(self, bytes):
332
self._out.write(bytes)
357
osutils.until_no_eintr(self._out.write, bytes)
335
360
class SmartClientMediumRequest(object):
345
370
request.finished_reading()
347
372
It is up to the individual SmartClientMedium whether multiple concurrent
348
requests can exist. See SmartClientMedium.get_request to obtain instances
349
of SmartClientMediumRequest, and the concrete Medium you are using for
373
requests can exist. See SmartClientMedium.get_request to obtain instances
374
of SmartClientMediumRequest, and the concrete Medium you are using for
350
375
details on concurrency and pipelining.
361
386
def accept_bytes(self, bytes):
362
387
"""Accept bytes for inclusion in this request.
364
This method may not be be called after finished_writing() has been
389
This method may not be called after finished_writing() has been
365
390
called. It depends upon the Medium whether or not the bytes will be
366
391
immediately transmitted. Message based Mediums will tend to buffer the
367
392
bytes until finished_writing() is called.
455
480
if not line.endswith('\n'):
456
481
# end of file encountered reading from server
457
482
raise errors.ConnectionReset(
458
"please check connectivity and permissions",
459
"(and try -Dhpss if further diagnosis is required)")
483
"Unexpected end of message. Please check connectivity "
484
"and permissions, and report a bug if problems persist.")
462
487
def _read_line(self):
463
488
"""Helper for SmartClientMediumRequest.read_line.
465
490
By default this forwards to self._medium._get_line because we are
466
491
operating on the medium's stream.
468
493
return self._medium._get_line()
496
class _DebugCounter(object):
497
"""An object that counts the HPSS calls made to each client medium.
499
When a medium is garbage-collected, or failing that when atexit functions
500
are run, the total number of calls made on that medium are reported via
505
self.counts = weakref.WeakKeyDictionary()
506
client._SmartClient.hooks.install_named_hook(
507
'call', self.increment_call_count, 'hpss call counter')
508
atexit.register(self.flush_all)
510
def track(self, medium):
511
"""Start tracking calls made to a medium.
513
This only keeps a weakref to the medium, so shouldn't affect the
516
medium_repr = repr(medium)
517
# Add this medium to the WeakKeyDictionary
518
self.counts[medium] = dict(count=0, vfs_count=0,
519
medium_repr=medium_repr)
520
# Weakref callbacks are fired in reverse order of their association
521
# with the referenced object. So we add a weakref *after* adding to
522
# the WeakKeyDict so that we can report the value from it before the
523
# entry is removed by the WeakKeyDict's own callback.
524
ref = weakref.ref(medium, self.done)
526
def increment_call_count(self, params):
527
# Increment the count in the WeakKeyDictionary
528
value = self.counts[params.medium]
531
request_method = request.request_handlers.get(params.method)
533
# A method we don't know about doesn't count as a VFS method.
535
if issubclass(request_method, vfs.VfsRequest):
536
value['vfs_count'] += 1
539
value = self.counts[ref]
540
count, vfs_count, medium_repr = (
541
value['count'], value['vfs_count'], value['medium_repr'])
542
# In case this callback is invoked for the same ref twice (by the
543
# weakref callback and by the atexit function), set the call count back
544
# to 0 so this item won't be reported twice.
546
value['vfs_count'] = 0
548
trace.note('HPSS calls: %d (%d vfs) %s',
549
count, vfs_count, medium_repr)
552
for ref in list(self.counts.keys()):
555
_debug_counter = None
471
558
class SmartClientMedium(SmartMedium):
472
559
"""Smart client is a medium for sending smart protocol requests over."""
482
569
# _remote_version_is_before tracks the bzr version the remote side
483
570
# can be based on what we've seen so far.
484
571
self._remote_version_is_before = None
572
# Install debug hook function if debug flag is set.
573
if 'hpss' in debug.debug_flags:
574
global _debug_counter
575
if _debug_counter is None:
576
_debug_counter = _DebugCounter()
577
_debug_counter.track(self)
486
579
def _is_remote_before(self, version_tuple):
487
580
"""Is it possible the remote side supports RPCs for a given version?
513
606
if (self._remote_version_is_before is not None and
514
607
version_tuple > self._remote_version_is_before):
608
# We have been told that the remote side is older than some version
609
# which is newer than a previously supplied older-than version.
610
# This indicates that some smart verb call is not guarded
611
# appropriately (it should simply not have been tried).
515
612
raise AssertionError(
516
613
"_remember_remote_is_before(%r) called, but "
517
614
"_remember_remote_is_before(%r) was called previously."
556
653
def disconnect(self):
557
654
"""If this medium maintains a persistent connection, close it.
559
656
The default implementation does nothing.
562
659
def remote_path_from_transport(self, transport):
563
660
"""Convert transport into a path suitable for using in a request.
565
662
Note that the resulting remote path doesn't encode the host name or
566
663
anything but path, so it is only safe to use it in requests sent over
567
664
the medium from the matching transport.
624
721
def _accept_bytes(self, bytes):
625
722
"""See SmartClientStreamMedium.accept_bytes."""
626
self._writeable_pipe.write(bytes)
723
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
724
self._report_activity(len(bytes), 'write')
628
726
def _flush(self):
629
727
"""See SmartClientStreamMedium._flush()."""
630
self._writeable_pipe.flush()
728
osutils.until_no_eintr(self._writeable_pipe.flush)
632
730
def _read_bytes(self, count):
633
731
"""See SmartClientStreamMedium._read_bytes."""
634
return self._readable_pipe.read(count)
732
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
733
self._report_activity(len(bytes), 'read')
637
737
class SmartSSHClientMedium(SmartClientStreamMedium):
638
738
"""A client medium using SSH."""
640
740
def __init__(self, host, port=None, username=None, password=None,
641
741
base=None, vendor=None, bzr_remote_path=None):
642
742
"""Creates a client that will connect on the first use.
644
744
:param vendor: An optional override for the ssh vendor to use. See
645
745
bzrlib.transport.ssh for details on ssh vendors.
647
SmartClientStreamMedium.__init__(self, base)
648
747
self._connected = False
649
748
self._host = host
650
749
self._password = password
651
750
self._port = port
652
751
self._username = username
752
# for the benefit of progress making a short description of this
754
self._scheme = 'bzr+ssh'
755
# SmartClientStreamMedium stores the repr of this object in its
756
# _DebugCounter so we have to store all the values used in our repr
757
# method before calling the super init.
758
SmartClientStreamMedium.__init__(self, base)
653
759
self._read_from = None
654
760
self._ssh_connection = None
655
761
self._vendor = vendor
656
762
self._write_to = None
657
763
self._bzr_remote_path = bzr_remote_path
658
if self._bzr_remote_path is None:
659
symbol_versioning.warn(
660
'bzr_remote_path is required as of bzr 0.92',
661
DeprecationWarning, stacklevel=2)
662
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
766
if self._port is None:
769
maybe_port = ':%s' % self._port
770
return "%s(%s://%s@%s%s/)" % (
771
self.__class__.__name__,
664
777
def _accept_bytes(self, bytes):
665
778
"""See SmartClientStreamMedium.accept_bytes."""
666
779
self._ensure_connection()
667
self._write_to.write(bytes)
780
osutils.until_no_eintr(self._write_to.write, bytes)
781
self._report_activity(len(bytes), 'write')
669
783
def disconnect(self):
670
784
"""See SmartClientMedium.disconnect()."""
671
785
if not self._connected:
673
self._read_from.close()
674
self._write_to.close()
787
osutils.until_no_eintr(self._read_from.close)
788
osutils.until_no_eintr(self._write_to.close)
675
789
self._ssh_connection.close()
676
790
self._connected = False
700
814
if not self._connected:
701
815
raise errors.MediumNotConnected(self)
702
816
bytes_to_read = min(count, _MAX_READ_SIZE)
703
return self._read_from.read(bytes_to_read)
817
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
818
self._report_activity(len(bytes), 'read')
706
822
# Port 4155 is the default port for bzr://, registered with IANA.
707
BZR_DEFAULT_INTERFACE = '0.0.0.0'
823
BZR_DEFAULT_INTERFACE = None
708
824
BZR_DEFAULT_PORT = 4155
711
827
class SmartTCPClientMedium(SmartClientStreamMedium):
712
828
"""A client medium using TCP."""
714
830
def __init__(self, host, port, base):
715
831
"""Creates a client that will connect on the first use."""
716
832
SmartClientStreamMedium.__init__(self, base)
722
838
def _accept_bytes(self, bytes):
723
839
"""See SmartClientMedium.accept_bytes."""
724
840
self._ensure_connection()
725
osutils.send_all(self._socket, bytes)
841
osutils.send_all(self._socket, bytes, self._report_activity)
727
843
def disconnect(self):
728
844
"""See SmartClientMedium.disconnect()."""
729
845
if not self._connected:
847
osutils.until_no_eintr(self._socket.close)
732
848
self._socket = None
733
849
self._connected = False
736
852
"""Connect this medium if not already connected."""
737
853
if self._connected:
739
self._socket = socket.socket()
740
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
741
855
if self._port is None:
742
856
port = BZR_DEFAULT_PORT
744
858
port = int(self._port)
746
self._socket.connect((self._host, port))
747
except socket.error, err:
860
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
861
socket.SOCK_STREAM, 0, 0)
862
except socket.gaierror, (err_num, err_msg):
863
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
864
(self._host, port, err_msg))
865
# Initialize err in case there are no addresses returned:
866
err = socket.error("no address found for %s" % self._host)
867
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
869
self._socket = socket.socket(family, socktype, proto)
870
self._socket.setsockopt(socket.IPPROTO_TCP,
871
socket.TCP_NODELAY, 1)
872
self._socket.connect(sockaddr)
873
except socket.error, err:
874
if self._socket is not None:
879
if self._socket is None:
748
880
# socket errors either have a (string) or (errno, string) as their
750
882
if type(err.args) is str:
766
898
"""See SmartClientMedium.read_bytes."""
767
899
if not self._connected:
768
900
raise errors.MediumNotConnected(self)
769
# We ignore the desired_count because on sockets it's more efficient to
770
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
771
return self._socket.recv(_MAX_READ_SIZE)
901
return _read_bytes_from_socket(
902
self._socket.recv, count, self._report_activity)
774
905
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
796
927
def _finished_reading(self):
797
928
"""See SmartClientMediumRequest._finished_reading.
799
This clears the _current_request on self._medium to allow a new
930
This clears the _current_request on self._medium to allow a new
800
931
request to be created.
802
933
if self._medium._current_request is not self:
803
934
raise AssertionError()
804
935
self._medium._current_request = None
806
937
def _finished_writing(self):
807
938
"""See SmartClientMediumRequest._finished_writing.
811
942
self._medium._flush()
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
# We ignore the desired_count because on sockets it's more efficient to
947
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
949
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
except socket.error, e:
951
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
# The connection was closed by the other side. Callers expect an
953
# empty string to signal end-of-stream.
958
report_activity(len(bytes), 'read')