13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""The 'medium' layer for the smart servers and clients.
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
39
37
from bzrlib import (
47
from bzrlib.smart import client, protocol, request, vfs
45
from bzrlib.smart import client, protocol
48
46
from bzrlib.transport import ssh
50
# osutils is usually already imported, and lazy-importing it here triggers IllegalUseOfScopeReplacer.
51
from bzrlib import osutils
53
50
# We must not read any more than 64k at a time so we don't risk "no buffer
54
51
# space available" errors on some platforms. Windows in particular is likely
164
161
line, excess = _get_line(self.read_bytes)
165
162
self._push_back(excess)
168
def _report_activity(self, bytes, direction):
169
"""Notify that this medium has activity.
171
Implementations should call this from all methods that actually do IO.
172
Be careful that it's not called twice, if one method is implemented on
175
:param bytes: Number of bytes read or written.
176
:param direction: 'read' or 'write' or None.
178
ui.ui_factory.report_transport_activity(self, bytes, direction)
181
166
class SmartServerStreamMedium(SmartMedium):
182
167
"""Handles smart commands coming over a stream.
283
268
self.finished = True
285
270
protocol.accept_bytes(bytes)
287
272
self._push_back(protocol.unused_data)
289
274
def _read_bytes(self, desired_count):
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
275
# We ignore the desired_count because on sockets it's more efficient to
276
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
277
return self.socket.recv(_MAX_READ_SIZE)
293
279
def terminate_due_to_error(self):
294
280
# TODO: This should log to a server log file, but no such thing
295
281
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
297
283
self.finished = True
299
285
def _write_out(self, bytes):
300
tstart = osutils.timer_func()
301
osutils.send_all(self.socket, bytes, self._report_activity)
302
if 'hpss' in debug.debug_flags:
303
thread_id = thread.get_ident()
304
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
% ('wrote', thread_id, len(bytes),
306
osutils.timer_func() - tstart))
286
osutils.send_all(self.socket, bytes)
309
289
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
334
314
bytes_to_read = protocol.next_read_size()
335
315
if bytes_to_read == 0:
336
316
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
339
319
bytes = self.read_bytes(bytes_to_read)
341
321
# Connection has been closed.
342
322
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
345
325
protocol.accept_bytes(bytes)
347
327
def _read_bytes(self, desired_count):
348
return osutils.until_no_eintr(self._in.read, desired_count)
328
return self._in.read(desired_count)
350
330
def terminate_due_to_error(self):
351
331
# TODO: This should log to a server log file, but no such thing
352
332
# exists yet. Andrew Bennetts 2006-09-29.
353
osutils.until_no_eintr(self._out.close)
354
334
self.finished = True
356
336
def _write_out(self, bytes):
357
osutils.until_no_eintr(self._out.write, bytes)
337
self._out.write(bytes)
360
340
class SmartClientMediumRequest(object):
370
350
request.finished_reading()
372
352
It is up to the individual SmartClientMedium whether multiple concurrent
373
requests can exist. See SmartClientMedium.get_request to obtain instances
374
of SmartClientMediumRequest, and the concrete Medium you are using for
353
requests can exist. See SmartClientMedium.get_request to obtain instances
354
of SmartClientMediumRequest, and the concrete Medium you are using for
375
355
details on concurrency and pipelining.
386
366
def accept_bytes(self, bytes):
387
367
"""Accept bytes for inclusion in this request.
389
This method may not be called after finished_writing() has been
369
This method may not be be called after finished_writing() has been
390
370
called. It depends upon the Medium whether or not the bytes will be
391
371
immediately transmitted. Message based Mediums will tend to buffer the
392
372
bytes until finished_writing() is called.
480
460
if not line.endswith('\n'):
481
461
# end of file encountered reading from server
482
462
raise errors.ConnectionReset(
483
"Unexpected end of message. Please check connectivity "
484
"and permissions, and report a bug if problems persist.")
463
"please check connectivity and permissions",
464
"(and try -Dhpss if further diagnosis is required)")
487
467
def _read_line(self):
488
468
"""Helper for SmartClientMediumRequest.read_line.
490
470
By default this forwards to self._medium._get_line because we are
491
471
operating on the medium's stream.
516
496
medium_repr = repr(medium)
517
497
# Add this medium to the WeakKeyDictionary
518
self.counts[medium] = dict(count=0, vfs_count=0,
519
medium_repr=medium_repr)
498
self.counts[medium] = [0, medium_repr]
520
499
# Weakref callbacks are fired in reverse order of their association
521
500
# with the referenced object. So we add a weakref *after* adding to
522
501
# the WeakKeyDict so that we can report the value from it before the
526
505
def increment_call_count(self, params):
527
506
# Increment the count in the WeakKeyDictionary
528
507
value = self.counts[params.medium]
531
request_method = request.request_handlers.get(params.method)
533
# A method we don't know about doesn't count as a VFS method.
535
if issubclass(request_method, vfs.VfsRequest):
536
value['vfs_count'] += 1
538
510
def done(self, ref):
539
511
value = self.counts[ref]
540
count, vfs_count, medium_repr = (
541
value['count'], value['vfs_count'], value['medium_repr'])
512
count, medium_repr = value
542
513
# In case this callback is invoked for the same ref twice (by the
543
514
# weakref callback and by the atexit function), set the call count back
544
515
# to 0 so this item won't be reported twice.
546
value['vfs_count'] = 0
548
trace.note('HPSS calls: %d (%d vfs) %s',
549
count, vfs_count, medium_repr)
518
trace.note('HPSS calls: %d %s', count, medium_repr)
551
520
def flush_all(self):
552
521
for ref in list(self.counts.keys()):
555
524
_debug_counter = None
558
527
class SmartClientMedium(SmartMedium):
559
528
"""Smart client is a medium for sending smart protocol requests over."""
606
575
if (self._remote_version_is_before is not None and
607
576
version_tuple > self._remote_version_is_before):
608
# We have been told that the remote side is older than some version
609
# which is newer than a previously supplied older-than version.
610
# This indicates that some smart verb call is not guarded
611
# appropriately (it should simply not have been tried).
612
577
raise AssertionError(
613
578
"_remember_remote_is_before(%r) called, but "
614
579
"_remember_remote_is_before(%r) was called previously."
653
618
def disconnect(self):
654
619
"""If this medium maintains a persistent connection, close it.
656
621
The default implementation does nothing.
659
624
def remote_path_from_transport(self, transport):
660
625
"""Convert transport into a path suitable for using in a request.
662
627
Note that the resulting remote path doesn't encode the host name or
663
628
anything but path, so it is only safe to use it in requests sent over
664
629
the medium from the matching transport.
721
686
def _accept_bytes(self, bytes):
722
687
"""See SmartClientStreamMedium.accept_bytes."""
723
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
724
self._report_activity(len(bytes), 'write')
688
self._writeable_pipe.write(bytes)
726
690
def _flush(self):
727
691
"""See SmartClientStreamMedium._flush()."""
728
osutils.until_no_eintr(self._writeable_pipe.flush)
692
self._writeable_pipe.flush()
730
694
def _read_bytes(self, count):
731
695
"""See SmartClientStreamMedium._read_bytes."""
732
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
733
self._report_activity(len(bytes), 'read')
696
return self._readable_pipe.read(count)
737
699
class SmartSSHClientMedium(SmartClientStreamMedium):
738
700
"""A client medium using SSH."""
740
702
def __init__(self, host, port=None, username=None, password=None,
741
703
base=None, vendor=None, bzr_remote_path=None):
742
704
"""Creates a client that will connect on the first use.
744
706
:param vendor: An optional override for the ssh vendor to use. See
745
707
bzrlib.transport.ssh for details on ssh vendors.
709
SmartClientStreamMedium.__init__(self, base)
747
710
self._connected = False
748
711
self._host = host
749
712
self._password = password
750
713
self._port = port
751
714
self._username = username
752
# for the benefit of progress making a short description of this
754
self._scheme = 'bzr+ssh'
755
# SmartClientStreamMedium stores the repr of this object in its
756
# _DebugCounter so we have to store all the values used in our repr
757
# method before calling the super init.
758
SmartClientStreamMedium.__init__(self, base)
759
715
self._read_from = None
760
716
self._ssh_connection = None
761
717
self._vendor = vendor
762
718
self._write_to = None
763
719
self._bzr_remote_path = bzr_remote_path
766
if self._port is None:
769
maybe_port = ':%s' % self._port
770
return "%s(%s://%s@%s%s/)" % (
771
self.__class__.__name__,
720
if self._bzr_remote_path is None:
721
symbol_versioning.warn(
722
'bzr_remote_path is required as of bzr 0.92',
723
DeprecationWarning, stacklevel=2)
724
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
777
726
def _accept_bytes(self, bytes):
778
727
"""See SmartClientStreamMedium.accept_bytes."""
779
728
self._ensure_connection()
780
osutils.until_no_eintr(self._write_to.write, bytes)
781
self._report_activity(len(bytes), 'write')
729
self._write_to.write(bytes)
783
731
def disconnect(self):
784
732
"""See SmartClientMedium.disconnect()."""
785
733
if not self._connected:
787
osutils.until_no_eintr(self._read_from.close)
788
osutils.until_no_eintr(self._write_to.close)
735
self._read_from.close()
736
self._write_to.close()
789
737
self._ssh_connection.close()
790
738
self._connected = False
814
762
if not self._connected:
815
763
raise errors.MediumNotConnected(self)
816
764
bytes_to_read = min(count, _MAX_READ_SIZE)
817
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
818
self._report_activity(len(bytes), 'read')
765
return self._read_from.read(bytes_to_read)
822
768
# Port 4155 is the default port for bzr://, registered with IANA.
838
784
def _accept_bytes(self, bytes):
839
785
"""See SmartClientMedium.accept_bytes."""
840
786
self._ensure_connection()
841
osutils.send_all(self._socket, bytes, self._report_activity)
787
osutils.send_all(self._socket, bytes)
843
789
def disconnect(self):
844
790
"""See SmartClientMedium.disconnect()."""
845
791
if not self._connected:
847
osutils.until_no_eintr(self._socket.close)
848
794
self._socket = None
849
795
self._connected = False
858
804
port = int(self._port)
860
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
806
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
861
807
socket.SOCK_STREAM, 0, 0)
862
808
except socket.gaierror, (err_num, err_msg):
863
809
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
867
813
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
869
815
self._socket = socket.socket(family, socktype, proto)
870
self._socket.setsockopt(socket.IPPROTO_TCP,
816
self._socket.setsockopt(socket.IPPROTO_TCP,
871
817
socket.TCP_NODELAY, 1)
872
818
self._socket.connect(sockaddr)
873
819
except socket.error, err:
898
844
"""See SmartClientMedium.read_bytes."""
899
845
if not self._connected:
900
846
raise errors.MediumNotConnected(self)
901
return _read_bytes_from_socket(
902
self._socket.recv, count, self._report_activity)
847
# We ignore the desired_count because on sockets it's more efficient to
848
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
850
return self._socket.recv(_MAX_READ_SIZE)
851
except socket.error, e:
852
if len(e.args) and e.args[0] == errno.ECONNRESET:
853
# Callers expect an empty string in that case
905
859
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
927
881
def _finished_reading(self):
928
882
"""See SmartClientMediumRequest._finished_reading.
930
This clears the _current_request on self._medium to allow a new
884
This clears the _current_request on self._medium to allow a new
931
885
request to be created.
933
887
if self._medium._current_request is not self:
934
888
raise AssertionError()
935
889
self._medium._current_request = None
937
891
def _finished_writing(self):
938
892
"""See SmartClientMediumRequest._finished_writing.
942
896
self._medium._flush()
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
# We ignore the desired_count because on sockets it's more efficient to
947
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
949
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
except socket.error, e:
951
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
# The connection was closed by the other side. Callers expect an
953
# empty string to signal end-of-stream.
958
report_activity(len(bytes), 'read')