13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""The 'medium' layer for the smart servers and clients.
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
37
39
from bzrlib import (
45
from bzrlib.smart import client, protocol
47
from bzrlib.smart import client, protocol, request, vfs
46
48
from bzrlib.transport import ssh
50
#usually already imported, and getting IllegalScoperReplacer on it here.
51
from bzrlib import osutils
50
53
# We must not read any more than 64k at a time so we don't risk "no buffer
51
54
# space available" errors on some platforms. Windows in particular is likely
161
164
line, excess = _get_line(self.read_bytes)
162
165
self._push_back(excess)
168
def _report_activity(self, bytes, direction):
169
"""Notify that this medium has activity.
171
Implementations should call this from all methods that actually do IO.
172
Be careful that it's not called twice, if one method is implemented on
175
:param bytes: Number of bytes read or written.
176
:param direction: 'read' or 'write' or None.
178
ui.ui_factory.report_transport_activity(self, bytes, direction)
166
181
class SmartServerStreamMedium(SmartMedium):
167
182
"""Handles smart commands coming over a stream.
268
283
self.finished = True
270
285
protocol.accept_bytes(bytes)
272
287
self._push_back(protocol.unused_data)
274
289
def _read_bytes(self, desired_count):
275
# We ignore the desired_count because on sockets it's more efficient to
276
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
277
return osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
279
293
def terminate_due_to_error(self):
280
294
# TODO: This should log to a server log file, but no such thing
281
295
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
283
297
self.finished = True
285
299
def _write_out(self, bytes):
286
osutils.send_all(self.socket, bytes)
300
tstart = osutils.timer_func()
301
osutils.send_all(self.socket, bytes, self._report_activity)
302
if 'hpss' in debug.debug_flags:
303
thread_id = thread.get_ident()
304
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
% ('wrote', thread_id, len(bytes),
306
osutils.timer_func() - tstart))
289
309
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
314
334
bytes_to_read = protocol.next_read_size()
315
335
if bytes_to_read == 0:
316
336
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
319
339
bytes = self.read_bytes(bytes_to_read)
321
341
# Connection has been closed.
322
342
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
325
345
protocol.accept_bytes(bytes)
327
347
def _read_bytes(self, desired_count):
328
return self._in.read(desired_count)
348
return osutils.until_no_eintr(self._in.read, desired_count)
330
350
def terminate_due_to_error(self):
331
351
# TODO: This should log to a server log file, but no such thing
332
352
# exists yet. Andrew Bennetts 2006-09-29.
353
osutils.until_no_eintr(self._out.close)
334
354
self.finished = True
336
356
def _write_out(self, bytes):
337
self._out.write(bytes)
357
osutils.until_no_eintr(self._out.write, bytes)
340
360
class SmartClientMediumRequest(object):
350
370
request.finished_reading()
352
372
It is up to the individual SmartClientMedium whether multiple concurrent
353
requests can exist. See SmartClientMedium.get_request to obtain instances
354
of SmartClientMediumRequest, and the concrete Medium you are using for
373
requests can exist. See SmartClientMedium.get_request to obtain instances
374
of SmartClientMediumRequest, and the concrete Medium you are using for
355
375
details on concurrency and pipelining.
366
386
def accept_bytes(self, bytes):
367
387
"""Accept bytes for inclusion in this request.
369
This method may not be be called after finished_writing() has been
389
This method may not be called after finished_writing() has been
370
390
called. It depends upon the Medium whether or not the bytes will be
371
391
immediately transmitted. Message based Mediums will tend to buffer the
372
392
bytes until finished_writing() is called.
460
480
if not line.endswith('\n'):
461
481
# end of file encountered reading from server
462
482
raise errors.ConnectionReset(
463
"please check connectivity and permissions",
464
"(and try -Dhpss if further diagnosis is required)")
483
"Unexpected end of message. Please check connectivity "
484
"and permissions, and report a bug if problems persist.")
467
487
def _read_line(self):
468
488
"""Helper for SmartClientMediumRequest.read_line.
470
490
By default this forwards to self._medium._get_line because we are
471
491
operating on the medium's stream.
496
516
medium_repr = repr(medium)
497
517
# Add this medium to the WeakKeyDictionary
498
self.counts[medium] = [0, medium_repr]
518
self.counts[medium] = dict(count=0, vfs_count=0,
519
medium_repr=medium_repr)
499
520
# Weakref callbacks are fired in reverse order of their association
500
521
# with the referenced object. So we add a weakref *after* adding to
501
522
# the WeakKeyDict so that we can report the value from it before the
505
526
def increment_call_count(self, params):
506
527
# Increment the count in the WeakKeyDictionary
507
528
value = self.counts[params.medium]
531
request_method = request.request_handlers.get(params.method)
533
# A method we don't know about doesn't count as a VFS method.
535
if issubclass(request_method, vfs.VfsRequest):
536
value['vfs_count'] += 1
510
538
def done(self, ref):
511
539
value = self.counts[ref]
512
count, medium_repr = value
540
count, vfs_count, medium_repr = (
541
value['count'], value['vfs_count'], value['medium_repr'])
513
542
# In case this callback is invoked for the same ref twice (by the
514
543
# weakref callback and by the atexit function), set the call count back
515
544
# to 0 so this item won't be reported twice.
546
value['vfs_count'] = 0
518
trace.note('HPSS calls: %d %s', count, medium_repr)
548
trace.note('HPSS calls: %d (%d vfs) %s',
549
count, vfs_count, medium_repr)
520
551
def flush_all(self):
521
552
for ref in list(self.counts.keys()):
524
555
_debug_counter = None
527
558
class SmartClientMedium(SmartMedium):
528
559
"""Smart client is a medium for sending smart protocol requests over."""
575
606
if (self._remote_version_is_before is not None and
576
607
version_tuple > self._remote_version_is_before):
608
# We have been told that the remote side is older than some version
609
# which is newer than a previously supplied older-than version.
610
# This indicates that some smart verb call is not guarded
611
# appropriately (it should simply not have been tried).
577
612
raise AssertionError(
578
613
"_remember_remote_is_before(%r) called, but "
579
614
"_remember_remote_is_before(%r) was called previously."
618
653
def disconnect(self):
619
654
"""If this medium maintains a persistent connection, close it.
621
656
The default implementation does nothing.
624
659
def remote_path_from_transport(self, transport):
625
660
"""Convert transport into a path suitable for using in a request.
627
662
Note that the resulting remote path doesn't encode the host name or
628
663
anything but path, so it is only safe to use it in requests sent over
629
664
the medium from the matching transport.
686
721
def _accept_bytes(self, bytes):
687
722
"""See SmartClientStreamMedium.accept_bytes."""
688
self._writeable_pipe.write(bytes)
723
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
724
self._report_activity(len(bytes), 'write')
690
726
def _flush(self):
691
727
"""See SmartClientStreamMedium._flush()."""
692
self._writeable_pipe.flush()
728
osutils.until_no_eintr(self._writeable_pipe.flush)
694
730
def _read_bytes(self, count):
695
731
"""See SmartClientStreamMedium._read_bytes."""
696
return self._readable_pipe.read(count)
732
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
733
self._report_activity(len(bytes), 'read')
699
737
class SmartSSHClientMedium(SmartClientStreamMedium):
700
738
"""A client medium using SSH."""
702
740
def __init__(self, host, port=None, username=None, password=None,
703
741
base=None, vendor=None, bzr_remote_path=None):
704
742
"""Creates a client that will connect on the first use.
706
744
:param vendor: An optional override for the ssh vendor to use. See
707
745
bzrlib.transport.ssh for details on ssh vendors.
709
SmartClientStreamMedium.__init__(self, base)
710
747
self._connected = False
711
748
self._host = host
712
749
self._password = password
713
750
self._port = port
714
751
self._username = username
752
# SmartClientStreamMedium stores the repr of this object in its
753
# _DebugCounter so we have to store all the values used in our repr
754
# method before calling the super init.
755
SmartClientStreamMedium.__init__(self, base)
715
756
self._read_from = None
716
757
self._ssh_connection = None
717
758
self._vendor = vendor
718
759
self._write_to = None
719
760
self._bzr_remote_path = bzr_remote_path
720
if self._bzr_remote_path is None:
721
symbol_versioning.warn(
722
'bzr_remote_path is required as of bzr 0.92',
723
DeprecationWarning, stacklevel=2)
724
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
761
# for the benefit of progress making a short description of this
763
self._scheme = 'bzr+ssh'
766
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
767
self.__class__.__name__,
726
773
def _accept_bytes(self, bytes):
727
774
"""See SmartClientStreamMedium.accept_bytes."""
728
775
self._ensure_connection()
729
self._write_to.write(bytes)
776
osutils.until_no_eintr(self._write_to.write, bytes)
777
self._report_activity(len(bytes), 'write')
731
779
def disconnect(self):
732
780
"""See SmartClientMedium.disconnect()."""
733
781
if not self._connected:
735
self._read_from.close()
736
self._write_to.close()
783
osutils.until_no_eintr(self._read_from.close)
784
osutils.until_no_eintr(self._write_to.close)
737
785
self._ssh_connection.close()
738
786
self._connected = False
762
810
if not self._connected:
763
811
raise errors.MediumNotConnected(self)
764
812
bytes_to_read = min(count, _MAX_READ_SIZE)
765
return self._read_from.read(bytes_to_read)
813
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
814
self._report_activity(len(bytes), 'read')
768
818
# Port 4155 is the default port for bzr://, registered with IANA.
784
834
def _accept_bytes(self, bytes):
785
835
"""See SmartClientMedium.accept_bytes."""
786
836
self._ensure_connection()
787
osutils.send_all(self._socket, bytes)
837
osutils.send_all(self._socket, bytes, self._report_activity)
789
839
def disconnect(self):
790
840
"""See SmartClientMedium.disconnect()."""
791
841
if not self._connected:
843
osutils.until_no_eintr(self._socket.close)
794
844
self._socket = None
795
845
self._connected = False
804
854
port = int(self._port)
806
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
856
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
807
857
socket.SOCK_STREAM, 0, 0)
808
858
except socket.gaierror, (err_num, err_msg):
809
859
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
813
863
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
815
865
self._socket = socket.socket(family, socktype, proto)
816
self._socket.setsockopt(socket.IPPROTO_TCP,
866
self._socket.setsockopt(socket.IPPROTO_TCP,
817
867
socket.TCP_NODELAY, 1)
818
868
self._socket.connect(sockaddr)
819
869
except socket.error, err:
844
894
"""See SmartClientMedium.read_bytes."""
845
895
if not self._connected:
846
896
raise errors.MediumNotConnected(self)
847
# We ignore the desired_count because on sockets it's more efficient to
848
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
850
return self._socket.recv(_MAX_READ_SIZE)
851
except socket.error, e:
852
if len(e.args) and e.args[0] == errno.ECONNRESET:
853
# Callers expect an empty string in that case
897
return _read_bytes_from_socket(
898
self._socket.recv, count, self._report_activity)
859
901
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
881
923
def _finished_reading(self):
882
924
"""See SmartClientMediumRequest._finished_reading.
884
This clears the _current_request on self._medium to allow a new
926
This clears the _current_request on self._medium to allow a new
885
927
request to be created.
887
929
if self._medium._current_request is not self:
888
930
raise AssertionError()
889
931
self._medium._current_request = None
891
933
def _finished_writing(self):
892
934
"""See SmartClientMediumRequest._finished_writing.
896
938
self._medium._flush()
941
def _read_bytes_from_socket(sock, desired_count, report_activity):
942
# We ignore the desired_count because on sockets it's more efficient to
943
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
945
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
946
except socket.error, e:
947
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
948
# The connection was closed by the other side. Callers expect an
949
# empty string to signal end-of-stream.
954
report_activity(len(bytes), 'read')