13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""The 'medium' layer for the smart servers and clients.
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
39
37
from bzrlib import (
47
from bzrlib.smart import client, protocol, request, vfs
45
from bzrlib.smart import client, protocol
48
46
from bzrlib.transport import ssh
50
#usually already imported, and getting IllegalScoperReplacer on it here.
51
from bzrlib import osutils
53
50
# We must not read any more than 64k at a time so we don't risk "no buffer
54
51
# space available" errors on some platforms. Windows in particular is likely
165
162
self._push_back(excess)
168
def _report_activity(self, bytes, direction):
169
"""Notify that this medium has activity.
171
Implementations should call this from all methods that actually do IO.
172
Be careful that it's not called twice, if one method is implemented on
175
:param bytes: Number of bytes read or written.
176
:param direction: 'read' or 'write' or None.
178
ui.ui_factory.report_transport_activity(self, bytes, direction)
181
166
class SmartServerStreamMedium(SmartMedium):
182
167
"""Handles smart commands coming over a stream.
287
272
self._push_back(protocol.unused_data)
289
274
def _read_bytes(self, desired_count):
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
275
# We ignore the desired_count because on sockets it's more efficient to
276
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
277
return osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
293
279
def terminate_due_to_error(self):
294
280
# TODO: This should log to a server log file, but no such thing
295
281
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
297
283
self.finished = True
299
285
def _write_out(self, bytes):
300
tstart = osutils.timer_func()
301
osutils.send_all(self.socket, bytes, self._report_activity)
302
if 'hpss' in debug.debug_flags:
303
thread_id = thread.get_ident()
304
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
% ('wrote', thread_id, len(bytes),
306
osutils.timer_func() - tstart))
286
osutils.send_all(self.socket, bytes)
309
289
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
334
314
bytes_to_read = protocol.next_read_size()
335
315
if bytes_to_read == 0:
336
316
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
339
319
bytes = self.read_bytes(bytes_to_read)
341
321
# Connection has been closed.
342
322
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
345
325
protocol.accept_bytes(bytes)
347
327
def _read_bytes(self, desired_count):
348
return osutils.until_no_eintr(self._in.read, desired_count)
328
return self._in.read(desired_count)
350
330
def terminate_due_to_error(self):
351
331
# TODO: This should log to a server log file, but no such thing
352
332
# exists yet. Andrew Bennetts 2006-09-29.
353
osutils.until_no_eintr(self._out.close)
354
334
self.finished = True
356
336
def _write_out(self, bytes):
357
osutils.until_no_eintr(self._out.write, bytes)
337
self._out.write(bytes)
360
340
class SmartClientMediumRequest(object):
386
366
def accept_bytes(self, bytes):
387
367
"""Accept bytes for inclusion in this request.
389
This method may not be called after finished_writing() has been
369
This method may not be be called after finished_writing() has been
390
370
called. It depends upon the Medium whether or not the bytes will be
391
371
immediately transmitted. Message based Mediums will tend to buffer the
392
372
bytes until finished_writing() is called.
480
460
if not line.endswith('\n'):
481
461
# end of file encountered reading from server
482
462
raise errors.ConnectionReset(
483
"Unexpected end of message. Please check connectivity "
484
"and permissions, and report a bug if problems persist.")
463
"please check connectivity and permissions")
487
466
def _read_line(self):
516
495
medium_repr = repr(medium)
517
496
# Add this medium to the WeakKeyDictionary
518
self.counts[medium] = dict(count=0, vfs_count=0,
519
medium_repr=medium_repr)
497
self.counts[medium] = [0, medium_repr]
520
498
# Weakref callbacks are fired in reverse order of their association
521
499
# with the referenced object. So we add a weakref *after* adding to
522
500
# the WeakKeyDict so that we can report the value from it before the
526
504
def increment_call_count(self, params):
527
505
# Increment the count in the WeakKeyDictionary
528
506
value = self.counts[params.medium]
531
request_method = request.request_handlers.get(params.method)
533
# A method we don't know about doesn't count as a VFS method.
535
if issubclass(request_method, vfs.VfsRequest):
536
value['vfs_count'] += 1
538
509
def done(self, ref):
539
510
value = self.counts[ref]
540
count, vfs_count, medium_repr = (
541
value['count'], value['vfs_count'], value['medium_repr'])
511
count, medium_repr = value
542
512
# In case this callback is invoked for the same ref twice (by the
543
513
# weakref callback and by the atexit function), set the call count back
544
514
# to 0 so this item won't be reported twice.
546
value['vfs_count'] = 0
548
trace.note('HPSS calls: %d (%d vfs) %s',
549
count, vfs_count, medium_repr)
517
trace.note('HPSS calls: %d %s', count, medium_repr)
551
519
def flush_all(self):
552
520
for ref in list(self.counts.keys()):
721
689
def _accept_bytes(self, bytes):
722
690
"""See SmartClientStreamMedium.accept_bytes."""
723
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
724
self._report_activity(len(bytes), 'write')
691
self._writeable_pipe.write(bytes)
726
693
def _flush(self):
727
694
"""See SmartClientStreamMedium._flush()."""
728
osutils.until_no_eintr(self._writeable_pipe.flush)
695
self._writeable_pipe.flush()
730
697
def _read_bytes(self, count):
731
698
"""See SmartClientStreamMedium._read_bytes."""
732
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
733
self._report_activity(len(bytes), 'read')
699
return self._readable_pipe.read(count)
737
702
class SmartSSHClientMedium(SmartClientStreamMedium):
744
709
:param vendor: An optional override for the ssh vendor to use. See
745
710
bzrlib.transport.ssh for details on ssh vendors.
712
SmartClientStreamMedium.__init__(self, base)
747
713
self._connected = False
748
714
self._host = host
749
715
self._password = password
750
716
self._port = port
751
717
self._username = username
752
# for the benefit of progress making a short description of this
754
self._scheme = 'bzr+ssh'
755
# SmartClientStreamMedium stores the repr of this object in its
756
# _DebugCounter so we have to store all the values used in our repr
757
# method before calling the super init.
758
SmartClientStreamMedium.__init__(self, base)
759
718
self._read_from = None
760
719
self._ssh_connection = None
761
720
self._vendor = vendor
762
721
self._write_to = None
763
722
self._bzr_remote_path = bzr_remote_path
766
if self._port is None:
769
maybe_port = ':%s' % self._port
770
return "%s(%s://%s@%s%s/)" % (
771
self.__class__.__name__,
723
if self._bzr_remote_path is None:
724
symbol_versioning.warn(
725
'bzr_remote_path is required as of bzr 0.92',
726
DeprecationWarning, stacklevel=2)
727
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
777
729
def _accept_bytes(self, bytes):
778
730
"""See SmartClientStreamMedium.accept_bytes."""
779
731
self._ensure_connection()
780
osutils.until_no_eintr(self._write_to.write, bytes)
781
self._report_activity(len(bytes), 'write')
732
self._write_to.write(bytes)
783
734
def disconnect(self):
784
735
"""See SmartClientMedium.disconnect()."""
785
736
if not self._connected:
787
osutils.until_no_eintr(self._read_from.close)
788
osutils.until_no_eintr(self._write_to.close)
738
self._read_from.close()
739
self._write_to.close()
789
740
self._ssh_connection.close()
790
741
self._connected = False
814
765
if not self._connected:
815
766
raise errors.MediumNotConnected(self)
816
767
bytes_to_read = min(count, _MAX_READ_SIZE)
817
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
818
self._report_activity(len(bytes), 'read')
768
return self._read_from.read(bytes_to_read)
822
771
# Port 4155 is the default port for bzr://, registered with IANA.
838
787
def _accept_bytes(self, bytes):
839
788
"""See SmartClientMedium.accept_bytes."""
840
789
self._ensure_connection()
841
osutils.send_all(self._socket, bytes, self._report_activity)
790
osutils.send_all(self._socket, bytes)
843
792
def disconnect(self):
844
793
"""See SmartClientMedium.disconnect()."""
845
794
if not self._connected:
847
osutils.until_no_eintr(self._socket.close)
848
797
self._socket = None
849
798
self._connected = False
898
847
"""See SmartClientMedium.read_bytes."""
899
848
if not self._connected:
900
849
raise errors.MediumNotConnected(self)
901
return _read_bytes_from_socket(
902
self._socket.recv, count, self._report_activity)
850
# We ignore the desired_count because on sockets it's more efficient to
851
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
853
return self._socket.recv(_MAX_READ_SIZE)
854
except socket.error, e:
855
if len(e.args) and e.args[0] == errno.ECONNRESET:
856
# Callers expect an empty string in that case
905
862
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
942
899
self._medium._flush()
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
# We ignore the desired_count because on sockets it's more efficient to
947
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
949
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
except socket.error, e:
951
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
# The connection was closed by the other side. Callers expect an
953
# empty string to signal end-of-stream.
958
report_activity(len(bytes), 'read')