33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
39
37
from bzrlib import (
47
from bzrlib.smart import client, protocol, request, vfs
46
from bzrlib.smart import client, protocol
48
47
from bzrlib.transport import ssh
50
#usually already imported, and getting IllegalScoperReplacer on it here.
51
from bzrlib import osutils
53
51
# We must not read any more than 64k at a time so we don't risk "no buffer
54
52
# space available" errors on some platforms. Windows in particular is likely
287
285
self._push_back(protocol.unused_data)
289
287
def _read_bytes(self, desired_count):
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
288
# We ignore the desired_count because on sockets it's more efficient to
289
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
290
bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
291
self._report_activity(len(bytes), 'read')
293
294
def terminate_due_to_error(self):
294
295
# TODO: This should log to a server log file, but no such thing
295
296
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
297
298
self.finished = True
299
300
def _write_out(self, bytes):
300
tstart = osutils.timer_func()
301
301
osutils.send_all(self.socket, bytes, self._report_activity)
302
if 'hpss' in debug.debug_flags:
303
thread_id = thread.get_ident()
304
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
% ('wrote', thread_id, len(bytes),
306
osutils.timer_func() - tstart))
309
304
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
334
329
bytes_to_read = protocol.next_read_size()
335
330
if bytes_to_read == 0:
336
331
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
339
334
bytes = self.read_bytes(bytes_to_read)
341
336
# Connection has been closed.
342
337
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
345
340
protocol.accept_bytes(bytes)
347
342
def _read_bytes(self, desired_count):
348
return osutils.until_no_eintr(self._in.read, desired_count)
343
return self._in.read(desired_count)
350
345
def terminate_due_to_error(self):
351
346
# TODO: This should log to a server log file, but no such thing
352
347
# exists yet. Andrew Bennetts 2006-09-29.
353
osutils.until_no_eintr(self._out.close)
354
349
self.finished = True
356
351
def _write_out(self, bytes):
357
osutils.until_no_eintr(self._out.write, bytes)
352
self._out.write(bytes)
360
355
class SmartClientMediumRequest(object):
480
475
if not line.endswith('\n'):
481
476
# end of file encountered reading from server
482
477
raise errors.ConnectionReset(
483
"Unexpected end of message. Please check connectivity "
484
"and permissions, and report a bug if problems persist.")
478
"please check connectivity and permissions")
487
481
def _read_line(self):
516
510
medium_repr = repr(medium)
517
511
# Add this medium to the WeakKeyDictionary
518
self.counts[medium] = dict(count=0, vfs_count=0,
519
medium_repr=medium_repr)
512
self.counts[medium] = [0, medium_repr]
520
513
# Weakref callbacks are fired in reverse order of their association
521
514
# with the referenced object. So we add a weakref *after* adding to
522
515
# the WeakKeyDict so that we can report the value from it before the
526
519
def increment_call_count(self, params):
527
520
# Increment the count in the WeakKeyDictionary
528
521
value = self.counts[params.medium]
531
request_method = request.request_handlers.get(params.method)
533
# A method we don't know about doesn't count as a VFS method.
535
if issubclass(request_method, vfs.VfsRequest):
536
value['vfs_count'] += 1
538
524
def done(self, ref):
539
525
value = self.counts[ref]
540
count, vfs_count, medium_repr = (
541
value['count'], value['vfs_count'], value['medium_repr'])
526
count, medium_repr = value
542
527
# In case this callback is invoked for the same ref twice (by the
543
528
# weakref callback and by the atexit function), set the call count back
544
529
# to 0 so this item won't be reported twice.
546
value['vfs_count'] = 0
548
trace.note('HPSS calls: %d (%d vfs) %s',
549
count, vfs_count, medium_repr)
532
trace.note('HPSS calls: %d %s', count, medium_repr)
551
534
def flush_all(self):
552
535
for ref in list(self.counts.keys()):
721
704
def _accept_bytes(self, bytes):
722
705
"""See SmartClientStreamMedium.accept_bytes."""
723
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
706
self._writeable_pipe.write(bytes)
724
707
self._report_activity(len(bytes), 'write')
726
709
def _flush(self):
727
710
"""See SmartClientStreamMedium._flush()."""
728
osutils.until_no_eintr(self._writeable_pipe.flush)
711
self._writeable_pipe.flush()
730
713
def _read_bytes(self, count):
731
714
"""See SmartClientStreamMedium._read_bytes."""
732
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
715
bytes = self._readable_pipe.read(count)
733
716
self._report_activity(len(bytes), 'read')
749
732
self._password = password
750
733
self._port = port
751
734
self._username = username
752
# for the benefit of progress making a short description of this
754
self._scheme = 'bzr+ssh'
755
735
# SmartClientStreamMedium stores the repr of this object in its
756
736
# _DebugCounter so we have to store all the values used in our repr
757
737
# method before calling the super init.
761
741
self._vendor = vendor
762
742
self._write_to = None
763
743
self._bzr_remote_path = bzr_remote_path
744
# for the benefit of progress making a short description of this
746
self._scheme = 'bzr+ssh'
765
748
def __repr__(self):
766
if self._port is None:
769
maybe_port = ':%s' % self._port
770
return "%s(%s://%s@%s%s/)" % (
749
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
771
750
self.__class__.__name__,
777
756
def _accept_bytes(self, bytes):
778
757
"""See SmartClientStreamMedium.accept_bytes."""
779
758
self._ensure_connection()
780
osutils.until_no_eintr(self._write_to.write, bytes)
759
self._write_to.write(bytes)
781
760
self._report_activity(len(bytes), 'write')
783
762
def disconnect(self):
784
763
"""See SmartClientMedium.disconnect()."""
785
764
if not self._connected:
787
osutils.until_no_eintr(self._read_from.close)
788
osutils.until_no_eintr(self._write_to.close)
766
self._read_from.close()
767
self._write_to.close()
789
768
self._ssh_connection.close()
790
769
self._connected = False
814
793
if not self._connected:
815
794
raise errors.MediumNotConnected(self)
816
795
bytes_to_read = min(count, _MAX_READ_SIZE)
817
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
796
bytes = self._read_from.read(bytes_to_read)
818
797
self._report_activity(len(bytes), 'read')
898
877
"""See SmartClientMedium.read_bytes."""
899
878
if not self._connected:
900
879
raise errors.MediumNotConnected(self)
901
return _read_bytes_from_socket(
902
self._socket.recv, count, self._report_activity)
880
# We ignore the desired_count because on sockets it's more efficient to
881
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
883
bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
884
except socket.error, e:
885
if len(e.args) and e.args[0] == errno.ECONNRESET:
886
# Callers expect an empty string in that case
891
self._report_activity(len(bytes), 'read')
905
895
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
942
932
self._medium._flush()
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
# We ignore the desired_count because on sockets it's more efficient to
947
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
949
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
except socket.error, e:
951
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
# The connection was closed by the other side. Callers expect an
953
# empty string to signal end-of-stream.
958
report_activity(len(bytes), 'read')