13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""The 'medium' layer for the smart servers and clients.
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
37
39
from bzrlib import (
45
from bzrlib.smart import client, protocol
47
from bzrlib.smart import client, protocol, request, vfs
46
48
from bzrlib.transport import ssh
50
# osutils is usually already imported, and we were getting IllegalScoperReplacer on it here, so import it directly.
51
from bzrlib import osutils
50
53
# We must not read any more than 64k at a time so we don't risk "no buffer
51
54
# space available" errors on some platforms. Windows in particular is likely
161
164
line, excess = _get_line(self.read_bytes)
162
165
self._push_back(excess)
168
def _report_activity(self, bytes, direction):
169
"""Notify that this medium has activity.
171
Implementations should call this from all methods that actually do IO.
172
Be careful that it's not called twice, if one method is implemented on
175
:param bytes: Number of bytes read or written.
176
:param direction: 'read' or 'write' or None.
178
ui.ui_factory.report_transport_activity(self, bytes, direction)
166
181
class SmartServerStreamMedium(SmartMedium):
167
182
"""Handles smart commands coming over a stream.
268
283
self.finished = True
270
285
protocol.accept_bytes(bytes)
272
287
self._push_back(protocol.unused_data)
274
289
def _read_bytes(self, desired_count):
275
# We ignore the desired_count because on sockets it's more efficient to
276
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
277
return osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
279
293
def terminate_due_to_error(self):
280
294
# TODO: This should log to a server log file, but no such thing
281
295
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
283
297
self.finished = True
285
299
def _write_out(self, bytes):
286
osutils.send_all(self.socket, bytes)
300
tstart = osutils.timer_func()
301
osutils.send_all(self.socket, bytes, self._report_activity)
302
if 'hpss' in debug.debug_flags:
303
thread_id = thread.get_ident()
304
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
% ('wrote', thread_id, len(bytes),
306
osutils.timer_func() - tstart))
289
309
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
314
334
bytes_to_read = protocol.next_read_size()
315
335
if bytes_to_read == 0:
316
336
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
319
339
bytes = self.read_bytes(bytes_to_read)
321
341
# Connection has been closed.
322
342
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
325
345
protocol.accept_bytes(bytes)
327
347
def _read_bytes(self, desired_count):
328
return self._in.read(desired_count)
348
return osutils.until_no_eintr(self._in.read, desired_count)
330
350
def terminate_due_to_error(self):
331
351
# TODO: This should log to a server log file, but no such thing
332
352
# exists yet. Andrew Bennetts 2006-09-29.
353
osutils.until_no_eintr(self._out.close)
334
354
self.finished = True
336
356
def _write_out(self, bytes):
337
self._out.write(bytes)
357
osutils.until_no_eintr(self._out.write, bytes)
340
360
class SmartClientMediumRequest(object):
350
370
request.finished_reading()
352
372
It is up to the individual SmartClientMedium whether multiple concurrent
353
requests can exist. See SmartClientMedium.get_request to obtain instances
354
of SmartClientMediumRequest, and the concrete Medium you are using for
373
requests can exist. See SmartClientMedium.get_request to obtain instances
374
of SmartClientMediumRequest, and the concrete Medium you are using for
355
375
details on concurrency and pipelining.
366
386
def accept_bytes(self, bytes):
367
387
"""Accept bytes for inclusion in this request.
369
This method may not be be called after finished_writing() has been
389
This method may not be called after finished_writing() has been
370
390
called. It depends upon the Medium whether or not the bytes will be
371
391
immediately transmitted. Message based Mediums will tend to buffer the
372
392
bytes until finished_writing() is called.
460
480
if not line.endswith('\n'):
461
481
# end of file encountered reading from server
462
482
raise errors.ConnectionReset(
463
"please check connectivity and permissions",
464
"(and try -Dhpss if further diagnosis is required)")
483
"Unexpected end of message. Please check connectivity "
484
"and permissions, and report a bug if problems persist.")
467
487
def _read_line(self):
468
488
"""Helper for SmartClientMediumRequest.read_line.
470
490
By default this forwards to self._medium._get_line because we are
471
491
operating on the medium's stream.
496
516
medium_repr = repr(medium)
497
517
# Add this medium to the WeakKeyDictionary
498
self.counts[medium] = [0, medium_repr]
518
self.counts[medium] = dict(count=0, vfs_count=0,
519
medium_repr=medium_repr)
499
520
# Weakref callbacks are fired in reverse order of their association
500
521
# with the referenced object. So we add a weakref *after* adding to
501
522
# the WeakKeyDict so that we can report the value from it before the
505
526
def increment_call_count(self, params):
506
527
# Increment the count in the WeakKeyDictionary
507
528
value = self.counts[params.medium]
531
request_method = request.request_handlers.get(params.method)
533
# A method we don't know about doesn't count as a VFS method.
535
if issubclass(request_method, vfs.VfsRequest):
536
value['vfs_count'] += 1
510
538
def done(self, ref):
511
539
value = self.counts[ref]
512
count, medium_repr = value
540
count, vfs_count, medium_repr = (
541
value['count'], value['vfs_count'], value['medium_repr'])
513
542
# In case this callback is invoked for the same ref twice (by the
514
543
# weakref callback and by the atexit function), set the call count back
515
544
# to 0 so this item won't be reported twice.
546
value['vfs_count'] = 0
518
trace.note('HPSS calls: %d %s', count, medium_repr)
548
trace.note('HPSS calls: %d (%d vfs) %s',
549
count, vfs_count, medium_repr)
520
551
def flush_all(self):
521
552
for ref in list(self.counts.keys()):
524
555
_debug_counter = None
527
558
class SmartClientMedium(SmartMedium):
528
559
"""Smart client is a medium for sending smart protocol requests over."""
575
606
if (self._remote_version_is_before is not None and
576
607
version_tuple > self._remote_version_is_before):
608
# We have been told that the remote side is older than some version
609
# which is newer than a previously supplied older-than version.
610
# This indicates that some smart verb call is not guarded
611
# appropriately (it should simply not have been tried).
577
612
raise AssertionError(
578
613
"_remember_remote_is_before(%r) called, but "
579
614
"_remember_remote_is_before(%r) was called previously."
618
653
def disconnect(self):
619
654
"""If this medium maintains a persistent connection, close it.
621
656
The default implementation does nothing.
624
659
def remote_path_from_transport(self, transport):
625
660
"""Convert transport into a path suitable for using in a request.
627
662
Note that the resulting remote path doesn't encode the host name or
628
663
anything but path, so it is only safe to use it in requests sent over
629
664
the medium from the matching transport.
686
721
def _accept_bytes(self, bytes):
687
722
"""See SmartClientStreamMedium.accept_bytes."""
688
self._writeable_pipe.write(bytes)
723
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
724
self._report_activity(len(bytes), 'write')
690
726
def _flush(self):
691
727
"""See SmartClientStreamMedium._flush()."""
692
self._writeable_pipe.flush()
728
osutils.until_no_eintr(self._writeable_pipe.flush)
694
730
def _read_bytes(self, count):
695
731
"""See SmartClientStreamMedium._read_bytes."""
696
return self._readable_pipe.read(count)
732
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
733
self._report_activity(len(bytes), 'read')
699
737
class SmartSSHClientMedium(SmartClientStreamMedium):
700
738
"""A client medium using SSH."""
702
740
def __init__(self, host, port=None, username=None, password=None,
703
741
base=None, vendor=None, bzr_remote_path=None):
704
742
"""Creates a client that will connect on the first use.
706
744
:param vendor: An optional override for the ssh vendor to use. See
707
745
bzrlib.transport.ssh for details on ssh vendors.
709
SmartClientStreamMedium.__init__(self, base)
710
747
self._connected = False
711
748
self._host = host
712
749
self._password = password
713
750
self._port = port
714
751
self._username = username
752
# for the benefit of progress making a short description of this
754
self._scheme = 'bzr+ssh'
755
# SmartClientStreamMedium stores the repr of this object in its
756
# _DebugCounter so we have to store all the values used in our repr
757
# method before calling the super init.
758
SmartClientStreamMedium.__init__(self, base)
715
759
self._read_from = None
716
760
self._ssh_connection = None
717
761
self._vendor = vendor
718
762
self._write_to = None
719
763
self._bzr_remote_path = bzr_remote_path
720
if self._bzr_remote_path is None:
721
symbol_versioning.warn(
722
'bzr_remote_path is required as of bzr 0.92',
723
DeprecationWarning, stacklevel=2)
724
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
766
if self._port is None:
769
maybe_port = ':%s' % self._port
770
return "%s(%s://%s@%s%s/)" % (
771
self.__class__.__name__,
726
777
def _accept_bytes(self, bytes):
727
778
"""See SmartClientStreamMedium.accept_bytes."""
728
779
self._ensure_connection()
729
self._write_to.write(bytes)
780
osutils.until_no_eintr(self._write_to.write, bytes)
781
self._report_activity(len(bytes), 'write')
731
783
def disconnect(self):
732
784
"""See SmartClientMedium.disconnect()."""
733
785
if not self._connected:
735
self._read_from.close()
736
self._write_to.close()
787
osutils.until_no_eintr(self._read_from.close)
788
osutils.until_no_eintr(self._write_to.close)
737
789
self._ssh_connection.close()
738
790
self._connected = False
762
814
if not self._connected:
763
815
raise errors.MediumNotConnected(self)
764
816
bytes_to_read = min(count, _MAX_READ_SIZE)
765
return self._read_from.read(bytes_to_read)
817
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
818
self._report_activity(len(bytes), 'read')
768
822
# Port 4155 is the default port for bzr://, registered with IANA.
784
838
def _accept_bytes(self, bytes):
785
839
"""See SmartClientMedium.accept_bytes."""
786
840
self._ensure_connection()
787
osutils.send_all(self._socket, bytes)
841
osutils.send_all(self._socket, bytes, self._report_activity)
789
843
def disconnect(self):
790
844
"""See SmartClientMedium.disconnect()."""
791
845
if not self._connected:
847
osutils.until_no_eintr(self._socket.close)
794
848
self._socket = None
795
849
self._connected = False
804
858
port = int(self._port)
806
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
860
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
807
861
socket.SOCK_STREAM, 0, 0)
808
862
except socket.gaierror, (err_num, err_msg):
809
863
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
813
867
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
815
869
self._socket = socket.socket(family, socktype, proto)
816
self._socket.setsockopt(socket.IPPROTO_TCP,
870
self._socket.setsockopt(socket.IPPROTO_TCP,
817
871
socket.TCP_NODELAY, 1)
818
872
self._socket.connect(sockaddr)
819
873
except socket.error, err:
844
898
"""See SmartClientMedium.read_bytes."""
845
899
if not self._connected:
846
900
raise errors.MediumNotConnected(self)
847
# We ignore the desired_count because on sockets it's more efficient to
848
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
850
return self._socket.recv(_MAX_READ_SIZE)
851
except socket.error, e:
852
if len(e.args) and e.args[0] == errno.ECONNRESET:
853
# Callers expect an empty string in that case
901
return _read_bytes_from_socket(
902
self._socket.recv, count, self._report_activity)
859
905
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
881
927
def _finished_reading(self):
882
928
"""See SmartClientMediumRequest._finished_reading.
884
This clears the _current_request on self._medium to allow a new
930
This clears the _current_request on self._medium to allow a new
885
931
request to be created.
887
933
if self._medium._current_request is not self:
888
934
raise AssertionError()
889
935
self._medium._current_request = None
891
937
def _finished_writing(self):
892
938
"""See SmartClientMediumRequest._finished_writing.
896
942
self._medium._flush()
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
# We ignore the desired_count because on sockets it's more efficient to
947
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
949
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
except socket.error, e:
951
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
# The connection was closed by the other side. Callers expect an
953
# empty string to signal end-of-stream.
958
report_activity(len(bytes), 'read')