1
# Copyright (C) 2006 Canonical Ltd
1
# Copyright (C) 2006-2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
37
39
from bzrlib import (
46
from bzrlib.smart import client, protocol
47
from bzrlib.smart import client, protocol, request, vfs
47
48
from bzrlib.transport import ssh
50
#usually already imported, and getting IllegalScoperReplacer on it here.
51
from bzrlib import osutils
51
53
# We must not read any more than 64k at a time so we don't risk "no buffer
52
54
# space available" errors on some platforms. Windows in particular is likely
285
287
self._push_back(protocol.unused_data)
287
289
def _read_bytes(self, desired_count):
288
# We ignore the desired_count because on sockets it's more efficient to
289
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
290
bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
291
self._report_activity(len(bytes), 'read')
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
294
293
def terminate_due_to_error(self):
295
294
# TODO: This should log to a server log file, but no such thing
296
295
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
298
297
self.finished = True
300
299
def _write_out(self, bytes):
300
tstart = osutils.timer_func()
301
301
osutils.send_all(self.socket, bytes, self._report_activity)
302
if 'hpss' in debug.debug_flags:
303
thread_id = thread.get_ident()
304
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
% ('wrote', thread_id, len(bytes),
306
osutils.timer_func() - tstart))
304
309
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
329
334
bytes_to_read = protocol.next_read_size()
330
335
if bytes_to_read == 0:
331
336
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
334
339
bytes = self.read_bytes(bytes_to_read)
336
341
# Connection has been closed.
337
342
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
340
345
protocol.accept_bytes(bytes)
342
347
def _read_bytes(self, desired_count):
343
return self._in.read(desired_count)
348
return osutils.until_no_eintr(self._in.read, desired_count)
345
350
def terminate_due_to_error(self):
346
351
# TODO: This should log to a server log file, but no such thing
347
352
# exists yet. Andrew Bennetts 2006-09-29.
353
osutils.until_no_eintr(self._out.close)
349
354
self.finished = True
351
356
def _write_out(self, bytes):
352
self._out.write(bytes)
357
osutils.until_no_eintr(self._out.write, bytes)
355
360
class SmartClientMediumRequest(object):
381
386
def accept_bytes(self, bytes):
382
387
"""Accept bytes for inclusion in this request.
384
This method may not be be called after finished_writing() has been
389
This method may not be called after finished_writing() has been
385
390
called. It depends upon the Medium whether or not the bytes will be
386
391
immediately transmitted. Message based Mediums will tend to buffer the
387
392
bytes until finished_writing() is called.
475
480
if not line.endswith('\n'):
476
481
# end of file encountered reading from server
477
482
raise errors.ConnectionReset(
478
"please check connectivity and permissions")
483
"Unexpected end of message. Please check connectivity "
484
"and permissions, and report a bug if problems persist.")
481
487
def _read_line(self):
510
516
medium_repr = repr(medium)
511
517
# Add this medium to the WeakKeyDictionary
512
self.counts[medium] = [0, medium_repr]
518
self.counts[medium] = dict(count=0, vfs_count=0,
519
medium_repr=medium_repr)
513
520
# Weakref callbacks are fired in reverse order of their association
514
521
# with the referenced object. So we add a weakref *after* adding to
515
522
# the WeakKeyDict so that we can report the value from it before the
519
526
def increment_call_count(self, params):
520
527
# Increment the count in the WeakKeyDictionary
521
528
value = self.counts[params.medium]
531
request_method = request.request_handlers.get(params.method)
533
# A method we don't know about doesn't count as a VFS method.
535
if issubclass(request_method, vfs.VfsRequest):
536
value['vfs_count'] += 1
524
538
def done(self, ref):
525
539
value = self.counts[ref]
526
count, medium_repr = value
540
count, vfs_count, medium_repr = (
541
value['count'], value['vfs_count'], value['medium_repr'])
527
542
# In case this callback is invoked for the same ref twice (by the
528
543
# weakref callback and by the atexit function), set the call count back
529
544
# to 0 so this item won't be reported twice.
546
value['vfs_count'] = 0
532
trace.note('HPSS calls: %d %s', count, medium_repr)
548
trace.note('HPSS calls: %d (%d vfs) %s',
549
count, vfs_count, medium_repr)
534
551
def flush_all(self):
535
552
for ref in list(self.counts.keys()):
704
721
def _accept_bytes(self, bytes):
705
722
"""See SmartClientStreamMedium.accept_bytes."""
706
self._writeable_pipe.write(bytes)
723
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
707
724
self._report_activity(len(bytes), 'write')
709
726
def _flush(self):
710
727
"""See SmartClientStreamMedium._flush()."""
711
self._writeable_pipe.flush()
728
osutils.until_no_eintr(self._writeable_pipe.flush)
713
730
def _read_bytes(self, count):
714
731
"""See SmartClientStreamMedium._read_bytes."""
715
bytes = self._readable_pipe.read(count)
732
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
716
733
self._report_activity(len(bytes), 'read')
732
749
self._password = password
733
750
self._port = port
734
751
self._username = username
752
# for the benefit of progress making a short description of this
754
self._scheme = 'bzr+ssh'
735
755
# SmartClientStreamMedium stores the repr of this object in its
736
756
# _DebugCounter so we have to store all the values used in our repr
737
757
# method before calling the super init.
741
761
self._vendor = vendor
742
762
self._write_to = None
743
763
self._bzr_remote_path = bzr_remote_path
744
# for the benefit of progress making a short description of this
746
self._scheme = 'bzr+ssh'
748
765
def __repr__(self):
749
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
766
if self._port is None:
769
maybe_port = ':%s' % self._port
770
return "%s(%s://%s@%s%s/)" % (
750
771
self.__class__.__name__,
756
777
def _accept_bytes(self, bytes):
757
778
"""See SmartClientStreamMedium.accept_bytes."""
758
779
self._ensure_connection()
759
self._write_to.write(bytes)
780
osutils.until_no_eintr(self._write_to.write, bytes)
760
781
self._report_activity(len(bytes), 'write')
762
783
def disconnect(self):
763
784
"""See SmartClientMedium.disconnect()."""
764
785
if not self._connected:
766
self._read_from.close()
767
self._write_to.close()
787
osutils.until_no_eintr(self._read_from.close)
788
osutils.until_no_eintr(self._write_to.close)
768
789
self._ssh_connection.close()
769
790
self._connected = False
793
814
if not self._connected:
794
815
raise errors.MediumNotConnected(self)
795
816
bytes_to_read = min(count, _MAX_READ_SIZE)
796
bytes = self._read_from.read(bytes_to_read)
817
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
797
818
self._report_activity(len(bytes), 'read')
877
898
"""See SmartClientMedium.read_bytes."""
878
899
if not self._connected:
879
900
raise errors.MediumNotConnected(self)
880
# We ignore the desired_count because on sockets it's more efficient to
881
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
883
bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
884
except socket.error, e:
885
if len(e.args) and e.args[0] == errno.ECONNRESET:
886
# Callers expect an empty string in that case
891
self._report_activity(len(bytes), 'read')
901
return _read_bytes_from_socket(
902
self._socket.recv, count, self._report_activity)
895
905
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
932
942
self._medium._flush()
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
# We ignore the desired_count because on sockets it's more efficient to
947
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
949
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
except socket.error, e:
951
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
# The connection was closed by the other side. Callers expect an
953
# empty string to signal end-of-stream.
958
report_activity(len(bytes), 'read')