~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Robert Collins
  • Date: 2009-04-27 03:27:46 UTC
  • mto: This revision was merged to the branch mainline in revision 4304.
  • Revision ID: robertc@robertcollins.net-20090427032746-vqmcsfbsbvbm04sk
Fixup tests broken by cleaning up the layering.

Show diffs side-by-side

added added

removed removed

Lines of Context:
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
35
35
import atexit
36
 
import thread
37
36
import weakref
38
 
 
39
37
from bzrlib import (
40
38
    debug,
41
39
    errors,
 
40
    osutils,
42
41
    symbol_versioning,
43
42
    trace,
44
43
    ui,
45
44
    urlutils,
46
45
    )
47
 
from bzrlib.smart import client, protocol, request, vfs
 
46
from bzrlib.smart import client, protocol
48
47
from bzrlib.transport import ssh
49
48
""")
50
 
#usually already imported, and getting IllegalScoperReplacer on it here.
51
 
from bzrlib import osutils
 
49
 
52
50
 
53
51
# We must not read any more than 64k at a time so we don't risk "no buffer
54
52
# space available" errors on some platforms.  Windows in particular is likely
287
285
        self._push_back(protocol.unused_data)
288
286
 
289
287
    def _read_bytes(self, desired_count):
290
 
        return _read_bytes_from_socket(
291
 
            self.socket.recv, desired_count, self._report_activity)
 
288
        # We ignore the desired_count because on sockets it's more efficient to
 
289
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
290
        bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
 
291
        self._report_activity(len(bytes), 'read')
 
292
        return bytes
292
293
 
293
294
    def terminate_due_to_error(self):
294
295
        # TODO: This should log to a server log file, but no such thing
295
296
        # exists yet.  Andrew Bennetts 2006-09-29.
296
 
        osutils.until_no_eintr(self.socket.close)
 
297
        self.socket.close()
297
298
        self.finished = True
298
299
 
299
300
    def _write_out(self, bytes):
300
 
        tstart = osutils.timer_func()
301
301
        osutils.send_all(self.socket, bytes, self._report_activity)
302
 
        if 'hpss' in debug.debug_flags:
303
 
            thread_id = thread.get_ident()
304
 
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
 
                         % ('wrote', thread_id, len(bytes),
306
 
                            osutils.timer_func() - tstart))
307
302
 
308
303
 
309
304
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
334
329
            bytes_to_read = protocol.next_read_size()
335
330
            if bytes_to_read == 0:
336
331
                # Finished serving this request.
337
 
                osutils.until_no_eintr(self._out.flush)
 
332
                self._out.flush()
338
333
                return
339
334
            bytes = self.read_bytes(bytes_to_read)
340
335
            if bytes == '':
341
336
                # Connection has been closed.
342
337
                self.finished = True
343
 
                osutils.until_no_eintr(self._out.flush)
 
338
                self._out.flush()
344
339
                return
345
340
            protocol.accept_bytes(bytes)
346
341
 
347
342
    def _read_bytes(self, desired_count):
348
 
        return osutils.until_no_eintr(self._in.read, desired_count)
 
343
        return self._in.read(desired_count)
349
344
 
350
345
    def terminate_due_to_error(self):
351
346
        # TODO: This should log to a server log file, but no such thing
352
347
        # exists yet.  Andrew Bennetts 2006-09-29.
353
 
        osutils.until_no_eintr(self._out.close)
 
348
        self._out.close()
354
349
        self.finished = True
355
350
 
356
351
    def _write_out(self, bytes):
357
 
        osutils.until_no_eintr(self._out.write, bytes)
 
352
        self._out.write(bytes)
358
353
 
359
354
 
360
355
class SmartClientMediumRequest(object):
480
475
        if not line.endswith('\n'):
481
476
            # end of file encountered reading from server
482
477
            raise errors.ConnectionReset(
483
 
                "Unexpected end of message. Please check connectivity "
484
 
                "and permissions, and report a bug if problems persist.")
 
478
                "please check connectivity and permissions")
485
479
        return line
486
480
 
487
481
    def _read_line(self):
515
509
        """
516
510
        medium_repr = repr(medium)
517
511
        # Add this medium to the WeakKeyDictionary
518
 
        self.counts[medium] = dict(count=0, vfs_count=0,
519
 
                                   medium_repr=medium_repr)
 
512
        self.counts[medium] = [0, medium_repr]
520
513
        # Weakref callbacks are fired in reverse order of their association
521
514
        # with the referenced object.  So we add a weakref *after* adding to
522
515
        # the WeakKeyDict so that we can report the value from it before the
526
519
    def increment_call_count(self, params):
527
520
        # Increment the count in the WeakKeyDictionary
528
521
        value = self.counts[params.medium]
529
 
        value['count'] += 1
530
 
        try:
531
 
            request_method = request.request_handlers.get(params.method)
532
 
        except KeyError:
533
 
            # A method we don't know about doesn't count as a VFS method.
534
 
            return
535
 
        if issubclass(request_method, vfs.VfsRequest):
536
 
            value['vfs_count'] += 1
 
522
        value[0] += 1
537
523
 
538
524
    def done(self, ref):
539
525
        value = self.counts[ref]
540
 
        count, vfs_count, medium_repr = (
541
 
            value['count'], value['vfs_count'], value['medium_repr'])
 
526
        count, medium_repr = value
542
527
        # In case this callback is invoked for the same ref twice (by the
543
528
        # weakref callback and by the atexit function), set the call count back
544
529
        # to 0 so this item won't be reported twice.
545
 
        value['count'] = 0
546
 
        value['vfs_count'] = 0
 
530
        value[0] = 0
547
531
        if count != 0:
548
 
            trace.note('HPSS calls: %d (%d vfs) %s',
549
 
                       count, vfs_count, medium_repr)
 
532
            trace.note('HPSS calls: %d %s', count, medium_repr)
550
533
 
551
534
    def flush_all(self):
552
535
        for ref in list(self.counts.keys()):
720
703
 
721
704
    def _accept_bytes(self, bytes):
722
705
        """See SmartClientStreamMedium.accept_bytes."""
723
 
        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
 
706
        self._writeable_pipe.write(bytes)
724
707
        self._report_activity(len(bytes), 'write')
725
708
 
726
709
    def _flush(self):
727
710
        """See SmartClientStreamMedium._flush()."""
728
 
        osutils.until_no_eintr(self._writeable_pipe.flush)
 
711
        self._writeable_pipe.flush()
729
712
 
730
713
    def _read_bytes(self, count):
731
714
        """See SmartClientStreamMedium._read_bytes."""
732
 
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
 
715
        bytes = self._readable_pipe.read(count)
733
716
        self._report_activity(len(bytes), 'read')
734
717
        return bytes
735
718
 
749
732
        self._password = password
750
733
        self._port = port
751
734
        self._username = username
752
 
        # for the benefit of progress making a short description of this
753
 
        # transport
754
 
        self._scheme = 'bzr+ssh'
755
735
        # SmartClientStreamMedium stores the repr of this object in its
756
736
        # _DebugCounter so we have to store all the values used in our repr
757
737
        # method before calling the super init.
761
741
        self._vendor = vendor
762
742
        self._write_to = None
763
743
        self._bzr_remote_path = bzr_remote_path
 
744
        # for the benefit of progress making a short description of this
 
745
        # transport
 
746
        self._scheme = 'bzr+ssh'
764
747
 
765
748
    def __repr__(self):
766
 
        if self._port is None:
767
 
            maybe_port = ''
768
 
        else:
769
 
            maybe_port = ':%s' % self._port
770
 
        return "%s(%s://%s@%s%s/)" % (
 
749
        return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
771
750
            self.__class__.__name__,
772
 
            self._scheme,
 
751
            self._connected,
773
752
            self._username,
774
753
            self._host,
775
 
            maybe_port)
 
754
            self._port)
776
755
 
777
756
    def _accept_bytes(self, bytes):
778
757
        """See SmartClientStreamMedium.accept_bytes."""
779
758
        self._ensure_connection()
780
 
        osutils.until_no_eintr(self._write_to.write, bytes)
 
759
        self._write_to.write(bytes)
781
760
        self._report_activity(len(bytes), 'write')
782
761
 
783
762
    def disconnect(self):
784
763
        """See SmartClientMedium.disconnect()."""
785
764
        if not self._connected:
786
765
            return
787
 
        osutils.until_no_eintr(self._read_from.close)
788
 
        osutils.until_no_eintr(self._write_to.close)
 
766
        self._read_from.close()
 
767
        self._write_to.close()
789
768
        self._ssh_connection.close()
790
769
        self._connected = False
791
770
 
814
793
        if not self._connected:
815
794
            raise errors.MediumNotConnected(self)
816
795
        bytes_to_read = min(count, _MAX_READ_SIZE)
817
 
        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
 
796
        bytes = self._read_from.read(bytes_to_read)
818
797
        self._report_activity(len(bytes), 'read')
819
798
        return bytes
820
799
 
844
823
        """See SmartClientMedium.disconnect()."""
845
824
        if not self._connected:
846
825
            return
847
 
        osutils.until_no_eintr(self._socket.close)
 
826
        self._socket.close()
848
827
        self._socket = None
849
828
        self._connected = False
850
829
 
898
877
        """See SmartClientMedium.read_bytes."""
899
878
        if not self._connected:
900
879
            raise errors.MediumNotConnected(self)
901
 
        return _read_bytes_from_socket(
902
 
            self._socket.recv, count, self._report_activity)
 
880
        # We ignore the desired_count because on sockets it's more efficient to
 
881
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
882
        try:
 
883
            bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
 
884
        except socket.error, e:
 
885
            if len(e.args) and e.args[0] == errno.ECONNRESET:
 
886
                # Callers expect an empty string in that case
 
887
                return ''
 
888
            else:
 
889
                raise
 
890
        else:
 
891
            self._report_activity(len(bytes), 'read')
 
892
            return bytes
903
893
 
904
894
 
905
895
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
941
931
        """
942
932
        self._medium._flush()
943
933
 
944
 
 
945
 
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
 
    # We ignore the desired_count because on sockets it's more efficient to
947
 
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
948
 
    try:
949
 
        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
 
    except socket.error, e:
951
 
        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
 
            # The connection was closed by the other side.  Callers expect an
953
 
            # empty string to signal end-of-stream.
954
 
            bytes = ''
955
 
        else:
956
 
            raise
957
 
    else:
958
 
        report_activity(len(bytes), 'read')
959
 
    return bytes
960