13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""The 'medium' layer for the smart servers and clients.
33
32
from bzrlib.lazy_import import lazy_import
34
33
lazy_import(globals(), """
39
34
from bzrlib import (
47
from bzrlib.smart import client, protocol, request, vfs
40
from bzrlib.smart import protocol
48
41
from bzrlib.transport import ssh
50
#usually already imported, and getting IllegalScoperReplacer on it here.
51
from bzrlib import osutils
53
45
# We must not read any more than 64k at a time so we don't risk "no buffer
54
46
# space available" errors on some platforms. Windows in particular is likely
164
156
line, excess = _get_line(self.read_bytes)
165
157
self._push_back(excess)
168
def _report_activity(self, bytes, direction):
169
"""Notify that this medium has activity.
171
Implementations should call this from all methods that actually do IO.
172
Be careful that it's not called twice, if one method is implemented on
175
:param bytes: Number of bytes read or written.
176
:param direction: 'read' or 'write' or None.
178
ui.ui_factory.report_transport_activity(self, bytes, direction)
181
161
class SmartServerStreamMedium(SmartMedium):
182
162
"""Handles smart commands coming over a stream.
283
263
self.finished = True
285
265
protocol.accept_bytes(bytes)
287
267
self._push_back(protocol.unused_data)
289
269
def _read_bytes(self, desired_count):
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
270
# We ignore the desired_count because on sockets it's more efficient to
271
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
272
return self.socket.recv(_MAX_READ_SIZE)
293
274
def terminate_due_to_error(self):
294
275
# TODO: This should log to a server log file, but no such thing
295
276
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
297
278
self.finished = True
299
280
def _write_out(self, bytes):
300
tstart = osutils.timer_func()
301
osutils.send_all(self.socket, bytes, self._report_activity)
302
if 'hpss' in debug.debug_flags:
303
thread_id = thread.get_ident()
304
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
% ('wrote', thread_id, len(bytes),
306
osutils.timer_func() - tstart))
281
osutils.send_all(self.socket, bytes)
309
284
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
334
309
bytes_to_read = protocol.next_read_size()
335
310
if bytes_to_read == 0:
336
311
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
339
314
bytes = self.read_bytes(bytes_to_read)
341
316
# Connection has been closed.
342
317
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
345
320
protocol.accept_bytes(bytes)
347
322
def _read_bytes(self, desired_count):
348
return osutils.until_no_eintr(self._in.read, desired_count)
323
return self._in.read(desired_count)
350
325
def terminate_due_to_error(self):
351
326
# TODO: This should log to a server log file, but no such thing
352
327
# exists yet. Andrew Bennetts 2006-09-29.
353
osutils.until_no_eintr(self._out.close)
354
329
self.finished = True
356
331
def _write_out(self, bytes):
357
osutils.until_no_eintr(self._out.write, bytes)
332
self._out.write(bytes)
360
335
class SmartClientMediumRequest(object):
370
345
request.finished_reading()
372
347
It is up to the individual SmartClientMedium whether multiple concurrent
373
requests can exist. See SmartClientMedium.get_request to obtain instances
374
of SmartClientMediumRequest, and the concrete Medium you are using for
348
requests can exist. See SmartClientMedium.get_request to obtain instances
349
of SmartClientMediumRequest, and the concrete Medium you are using for
375
350
details on concurrency and pipelining.
386
361
def accept_bytes(self, bytes):
387
362
"""Accept bytes for inclusion in this request.
389
This method may not be called after finished_writing() has been
364
This method may not be be called after finished_writing() has been
390
365
called. It depends upon the Medium whether or not the bytes will be
391
366
immediately transmitted. Message based Mediums will tend to buffer the
392
367
bytes until finished_writing() is called.
480
455
if not line.endswith('\n'):
481
456
# end of file encountered reading from server
482
457
raise errors.ConnectionReset(
483
"Unexpected end of message. Please check connectivity "
484
"and permissions, and report a bug if problems persist.")
458
"please check connectivity and permissions",
459
"(and try -Dhpss if further diagnosis is required)")
487
462
def _read_line(self):
488
463
"""Helper for SmartClientMediumRequest.read_line.
490
465
By default this forwards to self._medium._get_line because we are
491
466
operating on the medium's stream.
493
468
return self._medium._get_line()
496
class _DebugCounter(object):
497
"""An object that counts the HPSS calls made to each client medium.
499
When a medium is garbage-collected, or failing that when atexit functions
500
are run, the total number of calls made on that medium are reported via
505
self.counts = weakref.WeakKeyDictionary()
506
client._SmartClient.hooks.install_named_hook(
507
'call', self.increment_call_count, 'hpss call counter')
508
atexit.register(self.flush_all)
510
def track(self, medium):
511
"""Start tracking calls made to a medium.
513
This only keeps a weakref to the medium, so shouldn't affect the
516
medium_repr = repr(medium)
517
# Add this medium to the WeakKeyDictionary
518
self.counts[medium] = dict(count=0, vfs_count=0,
519
medium_repr=medium_repr)
520
# Weakref callbacks are fired in reverse order of their association
521
# with the referenced object. So we add a weakref *after* adding to
522
# the WeakKeyDict so that we can report the value from it before the
523
# entry is removed by the WeakKeyDict's own callback.
524
ref = weakref.ref(medium, self.done)
526
def increment_call_count(self, params):
527
# Increment the count in the WeakKeyDictionary
528
value = self.counts[params.medium]
531
request_method = request.request_handlers.get(params.method)
533
# A method we don't know about doesn't count as a VFS method.
535
if issubclass(request_method, vfs.VfsRequest):
536
value['vfs_count'] += 1
539
value = self.counts[ref]
540
count, vfs_count, medium_repr = (
541
value['count'], value['vfs_count'], value['medium_repr'])
542
# In case this callback is invoked for the same ref twice (by the
543
# weakref callback and by the atexit function), set the call count back
544
# to 0 so this item won't be reported twice.
546
value['vfs_count'] = 0
548
trace.note('HPSS calls: %d (%d vfs) %s',
549
count, vfs_count, medium_repr)
552
for ref in list(self.counts.keys()):
555
_debug_counter = None
558
471
class SmartClientMedium(SmartMedium):
559
472
"""Smart client is a medium for sending smart protocol requests over."""
569
482
# _remote_version_is_before tracks the bzr version the remote side
570
483
# can be based on what we've seen so far.
571
484
self._remote_version_is_before = None
572
# Install debug hook function if debug flag is set.
573
if 'hpss' in debug.debug_flags:
574
global _debug_counter
575
if _debug_counter is None:
576
_debug_counter = _DebugCounter()
577
_debug_counter.track(self)
579
486
def _is_remote_before(self, version_tuple):
580
487
"""Is it possible the remote side supports RPCs for a given version?
606
513
if (self._remote_version_is_before is not None and
607
514
version_tuple > self._remote_version_is_before):
608
# We have been told that the remote side is older than some version
609
# which is newer than a previously supplied older-than version.
610
# This indicates that some smart verb call is not guarded
611
# appropriately (it should simply not have been tried).
612
515
raise AssertionError(
613
516
"_remember_remote_is_before(%r) called, but "
614
517
"_remember_remote_is_before(%r) was called previously."
653
556
def disconnect(self):
654
557
"""If this medium maintains a persistent connection, close it.
656
559
The default implementation does nothing.
659
562
def remote_path_from_transport(self, transport):
660
563
"""Convert transport into a path suitable for using in a request.
662
565
Note that the resulting remote path doesn't encode the host name or
663
566
anything but path, so it is only safe to use it in requests sent over
664
567
the medium from the matching transport.
721
624
def _accept_bytes(self, bytes):
722
625
"""See SmartClientStreamMedium.accept_bytes."""
723
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
724
self._report_activity(len(bytes), 'write')
626
self._writeable_pipe.write(bytes)
726
628
def _flush(self):
727
629
"""See SmartClientStreamMedium._flush()."""
728
osutils.until_no_eintr(self._writeable_pipe.flush)
630
self._writeable_pipe.flush()
730
632
def _read_bytes(self, count):
731
633
"""See SmartClientStreamMedium._read_bytes."""
732
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
733
self._report_activity(len(bytes), 'read')
634
return self._readable_pipe.read(count)
737
637
class SmartSSHClientMedium(SmartClientStreamMedium):
738
638
"""A client medium using SSH."""
740
640
def __init__(self, host, port=None, username=None, password=None,
741
641
base=None, vendor=None, bzr_remote_path=None):
742
642
"""Creates a client that will connect on the first use.
744
644
:param vendor: An optional override for the ssh vendor to use. See
745
645
bzrlib.transport.ssh for details on ssh vendors.
647
SmartClientStreamMedium.__init__(self, base)
747
648
self._connected = False
748
649
self._host = host
749
650
self._password = password
750
651
self._port = port
751
652
self._username = username
752
# for the benefit of progress making a short description of this
754
self._scheme = 'bzr+ssh'
755
# SmartClientStreamMedium stores the repr of this object in its
756
# _DebugCounter so we have to store all the values used in our repr
757
# method before calling the super init.
758
SmartClientStreamMedium.__init__(self, base)
759
653
self._read_from = None
760
654
self._ssh_connection = None
761
655
self._vendor = vendor
762
656
self._write_to = None
763
657
self._bzr_remote_path = bzr_remote_path
766
if self._port is None:
769
maybe_port = ':%s' % self._port
770
return "%s(%s://%s@%s%s/)" % (
771
self.__class__.__name__,
658
if self._bzr_remote_path is None:
659
symbol_versioning.warn(
660
'bzr_remote_path is required as of bzr 0.92',
661
DeprecationWarning, stacklevel=2)
662
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
777
664
def _accept_bytes(self, bytes):
778
665
"""See SmartClientStreamMedium.accept_bytes."""
779
666
self._ensure_connection()
780
osutils.until_no_eintr(self._write_to.write, bytes)
781
self._report_activity(len(bytes), 'write')
667
self._write_to.write(bytes)
783
669
def disconnect(self):
784
670
"""See SmartClientMedium.disconnect()."""
785
671
if not self._connected:
787
osutils.until_no_eintr(self._read_from.close)
788
osutils.until_no_eintr(self._write_to.close)
673
self._read_from.close()
674
self._write_to.close()
789
675
self._ssh_connection.close()
790
676
self._connected = False
814
700
if not self._connected:
815
701
raise errors.MediumNotConnected(self)
816
702
bytes_to_read = min(count, _MAX_READ_SIZE)
817
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
818
self._report_activity(len(bytes), 'read')
703
return self._read_from.read(bytes_to_read)
822
706
# Port 4155 is the default port for bzr://, registered with IANA.
823
BZR_DEFAULT_INTERFACE = None
707
BZR_DEFAULT_INTERFACE = '0.0.0.0'
824
708
BZR_DEFAULT_PORT = 4155
827
711
class SmartTCPClientMedium(SmartClientStreamMedium):
828
712
"""A client medium using TCP."""
830
714
def __init__(self, host, port, base):
831
715
"""Creates a client that will connect on the first use."""
832
716
SmartClientStreamMedium.__init__(self, base)
838
722
def _accept_bytes(self, bytes):
839
723
"""See SmartClientMedium.accept_bytes."""
840
724
self._ensure_connection()
841
osutils.send_all(self._socket, bytes, self._report_activity)
725
osutils.send_all(self._socket, bytes)
843
727
def disconnect(self):
844
728
"""See SmartClientMedium.disconnect()."""
845
729
if not self._connected:
847
osutils.until_no_eintr(self._socket.close)
848
732
self._socket = None
849
733
self._connected = False
852
736
"""Connect this medium if not already connected."""
853
737
if self._connected:
739
self._socket = socket.socket()
740
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
855
741
if self._port is None:
856
742
port = BZR_DEFAULT_PORT
858
744
port = int(self._port)
860
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
861
socket.SOCK_STREAM, 0, 0)
862
except socket.gaierror, (err_num, err_msg):
863
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
864
(self._host, port, err_msg))
865
# Initialize err in case there are no addresses returned:
866
err = socket.error("no address found for %s" % self._host)
867
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
869
self._socket = socket.socket(family, socktype, proto)
870
self._socket.setsockopt(socket.IPPROTO_TCP,
871
socket.TCP_NODELAY, 1)
872
self._socket.connect(sockaddr)
873
except socket.error, err:
874
if self._socket is not None:
879
if self._socket is None:
746
self._socket.connect((self._host, port))
747
except socket.error, err:
880
748
# socket errors either have a (string) or (errno, string) as their
882
750
if type(err.args) is str:
898
766
"""See SmartClientMedium.read_bytes."""
899
767
if not self._connected:
900
768
raise errors.MediumNotConnected(self)
901
return _read_bytes_from_socket(
902
self._socket.recv, count, self._report_activity)
769
# We ignore the desired_count because on sockets it's more efficient to
770
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
771
return self._socket.recv(_MAX_READ_SIZE)
905
774
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
927
796
def _finished_reading(self):
928
797
"""See SmartClientMediumRequest._finished_reading.
930
This clears the _current_request on self._medium to allow a new
799
This clears the _current_request on self._medium to allow a new
931
800
request to be created.
933
802
if self._medium._current_request is not self:
934
803
raise AssertionError()
935
804
self._medium._current_request = None
937
806
def _finished_writing(self):
938
807
"""See SmartClientMediumRequest._finished_writing.
942
811
self._medium._flush()
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
# We ignore the desired_count because on sockets it's more efficient to
947
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
949
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
except socket.error, e:
951
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
# The connection was closed by the other side. Callers expect an
953
# empty string to signal end-of-stream.
958
report_activity(len(bytes), 'read')