~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Robert Collins
  • Date: 2010-05-11 08:36:16 UTC
  • mto: This revision was merged to the branch mainline in revision 5223.
  • Revision ID: robertc@robertcollins.net-20100511083616-b8fjb19zomwupid0
Make all lock methods return Result objects, rather than lock_read returning self, as per John's review.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2011 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
28
28
import sys
29
29
import urllib
30
30
 
31
 
import bzrlib
32
31
from bzrlib.lazy_import import lazy_import
33
32
lazy_import(globals(), """
 
33
import atexit
34
34
import socket
35
35
import thread
36
36
import weakref
38
38
from bzrlib import (
39
39
    debug,
40
40
    errors,
 
41
    symbol_versioning,
41
42
    trace,
42
43
    ui,
43
44
    urlutils,
493
494
class _DebugCounter(object):
494
495
    """An object that counts the HPSS calls made to each client medium.
495
496
 
496
 
    When a medium is garbage-collected, or failing that when
497
 
    bzrlib.global_state exits, the total number of calls made on that medium
498
 
    are reported via trace.note.
 
497
    When a medium is garbage-collected, or failing that when atexit functions
 
498
    are run, the total number of calls made on that medium are reported via
 
499
    trace.note.
499
500
    """
500
501
 
501
502
    def __init__(self):
502
503
        self.counts = weakref.WeakKeyDictionary()
503
504
        client._SmartClient.hooks.install_named_hook(
504
505
            'call', self.increment_call_count, 'hpss call counter')
505
 
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
 
506
        atexit.register(self.flush_all)
506
507
 
507
508
    def track(self, medium):
508
509
        """Start tracking calls made to a medium.
714
715
    """A client medium using simple pipes.
715
716
 
716
717
    This client does not manage the pipes: it assumes they will always be open.
 
718
 
 
719
    Note that if readable_pipe.read might raise IOError or OSError with errno
 
720
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
 
721
    (such as used for sys.stdin) are safe.
717
722
    """
718
723
 
719
724
    def __init__(self, readable_pipe, writeable_pipe, base):
732
737
 
733
738
    def _read_bytes(self, count):
734
739
        """See SmartClientStreamMedium._read_bytes."""
735
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
736
 
        bytes = self._readable_pipe.read(bytes_to_read)
 
740
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
737
741
        self._report_activity(len(bytes), 'read')
738
742
        return bytes
739
743
 
740
744
 
741
 
class SSHParams(object):
742
 
    """A set of parameters for starting a remote bzr via SSH."""
 
745
class SmartSSHClientMedium(SmartClientStreamMedium):
 
746
    """A client medium using SSH."""
743
747
 
744
748
    def __init__(self, host, port=None, username=None, password=None,
745
 
            bzr_remote_path='bzr'):
746
 
        self.host = host
747
 
        self.port = port
748
 
        self.username = username
749
 
        self.password = password
750
 
        self.bzr_remote_path = bzr_remote_path
751
 
 
752
 
 
753
 
class SmartSSHClientMedium(SmartClientStreamMedium):
754
 
    """A client medium using SSH.
755
 
    
756
 
    It delegates IO to a SmartClientSocketMedium or
757
 
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
758
 
    """
759
 
 
760
 
    def __init__(self, base, ssh_params, vendor=None):
 
749
            base=None, vendor=None, bzr_remote_path=None):
761
750
        """Creates a client that will connect on the first use.
762
751
 
763
 
        :param ssh_params: A SSHParams instance.
764
752
        :param vendor: An optional override for the ssh vendor to use. See
765
753
            bzrlib.transport.ssh for details on ssh vendors.
766
754
        """
767
 
        self._real_medium = None
768
 
        self._ssh_params = ssh_params
 
755
        self._connected = False
 
756
        self._host = host
 
757
        self._password = password
 
758
        self._port = port
 
759
        self._username = username
769
760
        # for the benefit of progress making a short description of this
770
761
        # transport
771
762
        self._scheme = 'bzr+ssh'
773
764
        # _DebugCounter so we have to store all the values used in our repr
774
765
        # method before calling the super init.
775
766
        SmartClientStreamMedium.__init__(self, base)
 
767
        self._read_from = None
 
768
        self._ssh_connection = None
776
769
        self._vendor = vendor
777
 
        self._ssh_connection = None
 
770
        self._write_to = None
 
771
        self._bzr_remote_path = bzr_remote_path
778
772
 
779
773
    def __repr__(self):
780
 
        if self._ssh_params.port is None:
 
774
        if self._port is None:
781
775
            maybe_port = ''
782
776
        else:
783
 
            maybe_port = ':%s' % self._ssh_params.port
 
777
            maybe_port = ':%s' % self._port
784
778
        return "%s(%s://%s@%s%s/)" % (
785
779
            self.__class__.__name__,
786
780
            self._scheme,
787
 
            self._ssh_params.username,
788
 
            self._ssh_params.host,
 
781
            self._username,
 
782
            self._host,
789
783
            maybe_port)
790
784
 
791
785
    def _accept_bytes(self, bytes):
792
786
        """See SmartClientStreamMedium.accept_bytes."""
793
787
        self._ensure_connection()
794
 
        self._real_medium.accept_bytes(bytes)
 
788
        self._write_to.write(bytes)
 
789
        self._report_activity(len(bytes), 'write')
795
790
 
796
791
    def disconnect(self):
797
792
        """See SmartClientMedium.disconnect()."""
798
 
        if self._real_medium is not None:
799
 
            self._real_medium.disconnect()
800
 
            self._real_medium = None
801
 
        if self._ssh_connection is not None:
802
 
            self._ssh_connection.close()
803
 
            self._ssh_connection = None
 
793
        if not self._connected:
 
794
            return
 
795
        self._read_from.close()
 
796
        self._write_to.close()
 
797
        self._ssh_connection.close()
 
798
        self._connected = False
804
799
 
805
800
    def _ensure_connection(self):
806
801
        """Connect this medium if not already connected."""
807
 
        if self._real_medium is not None:
 
802
        if self._connected:
808
803
            return
809
804
        if self._vendor is None:
810
805
            vendor = ssh._get_ssh_vendor()
811
806
        else:
812
807
            vendor = self._vendor
813
 
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
814
 
                self._ssh_params.password, self._ssh_params.host,
815
 
                self._ssh_params.port,
816
 
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
 
808
        self._ssh_connection = vendor.connect_ssh(self._username,
 
809
                self._password, self._host, self._port,
 
810
                command=[self._bzr_remote_path, 'serve', '--inet',
817
811
                         '--directory=/', '--allow-writes'])
818
 
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
819
 
        if io_kind == 'socket':
820
 
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
821
 
                self.base, io_object)
822
 
        elif io_kind == 'pipes':
823
 
            read_from, write_to = io_object
824
 
            self._real_medium = SmartSimplePipesClientMedium(
825
 
                read_from, write_to, self.base)
826
 
        else:
827
 
            raise AssertionError(
828
 
                "Unexpected io_kind %r from %r"
829
 
                % (io_kind, self._ssh_connection))
 
812
        self._read_from, self._write_to = \
 
813
            self._ssh_connection.get_filelike_channels()
 
814
        self._connected = True
830
815
 
831
816
    def _flush(self):
832
817
        """See SmartClientStreamMedium._flush()."""
833
 
        self._real_medium._flush()
 
818
        self._write_to.flush()
834
819
 
835
820
    def _read_bytes(self, count):
836
821
        """See SmartClientStreamMedium.read_bytes."""
837
 
        if self._real_medium is None:
 
822
        if not self._connected:
838
823
            raise errors.MediumNotConnected(self)
839
 
        return self._real_medium.read_bytes(count)
 
824
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
825
        bytes = self._read_from.read(bytes_to_read)
 
826
        self._report_activity(len(bytes), 'read')
 
827
        return bytes
840
828
 
841
829
 
842
830
# Port 4155 is the default port for bzr://, registered with IANA.
844
832
BZR_DEFAULT_PORT = 4155
845
833
 
846
834
 
847
 
class SmartClientSocketMedium(SmartClientStreamMedium):
848
 
    """A client medium using a socket.
849
 
    
850
 
    This class isn't usable directly.  Use one of its subclasses instead.
851
 
    """
 
835
class SmartTCPClientMedium(SmartClientStreamMedium):
 
836
    """A client medium using TCP."""
852
837
 
853
 
    def __init__(self, base):
 
838
    def __init__(self, host, port, base):
 
839
        """Creates a client that will connect on the first use."""
854
840
        SmartClientStreamMedium.__init__(self, base)
 
841
        self._connected = False
 
842
        self._host = host
 
843
        self._port = port
855
844
        self._socket = None
856
 
        self._connected = False
857
845
 
858
846
    def _accept_bytes(self, bytes):
859
847
        """See SmartClientMedium.accept_bytes."""
860
848
        self._ensure_connection()
861
849
        osutils.send_all(self._socket, bytes, self._report_activity)
862
850
 
863
 
    def _ensure_connection(self):
864
 
        """Connect this medium if not already connected."""
865
 
        raise NotImplementedError(self._ensure_connection)
866
 
 
867
 
    def _flush(self):
868
 
        """See SmartClientStreamMedium._flush().
869
 
 
870
 
        For sockets we do no flushing. For TCP sockets we may want to turn off
871
 
        TCP_NODELAY and add a means to do a flush, but that can be done in the
872
 
        future.
873
 
        """
874
 
 
875
 
    def _read_bytes(self, count):
876
 
        """See SmartClientMedium.read_bytes."""
877
 
        if not self._connected:
878
 
            raise errors.MediumNotConnected(self)
879
 
        return osutils.read_bytes_from_socket(
880
 
            self._socket, self._report_activity)
881
 
 
882
851
    def disconnect(self):
883
852
        """See SmartClientMedium.disconnect()."""
884
853
        if not self._connected:
887
856
        self._socket = None
888
857
        self._connected = False
889
858
 
890
 
 
891
 
class SmartTCPClientMedium(SmartClientSocketMedium):
892
 
    """A client medium that creates a TCP connection."""
893
 
 
894
 
    def __init__(self, host, port, base):
895
 
        """Creates a client that will connect on the first use."""
896
 
        SmartClientSocketMedium.__init__(self, base)
897
 
        self._host = host
898
 
        self._port = port
899
 
 
900
859
    def _ensure_connection(self):
901
860
        """Connect this medium if not already connected."""
902
861
        if self._connected:
936
895
                    (self._host, port, err_msg))
937
896
        self._connected = True
938
897
 
939
 
 
940
 
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
941
 
    """A client medium for an already connected socket.
942
 
    
943
 
    Note that this class will assume it "owns" the socket, so it will close it
944
 
    when its disconnect method is called.
945
 
    """
946
 
 
947
 
    def __init__(self, base, sock):
948
 
        SmartClientSocketMedium.__init__(self, base)
949
 
        self._socket = sock
950
 
        self._connected = True
951
 
 
952
 
    def _ensure_connection(self):
953
 
        # Already connected, by definition!  So nothing to do.
954
 
        pass
 
898
    def _flush(self):
 
899
        """See SmartClientStreamMedium._flush().
 
900
 
 
901
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
 
902
        add a means to do a flush, but that can be done in the future.
 
903
        """
 
904
 
 
905
    def _read_bytes(self, count):
 
906
        """See SmartClientMedium.read_bytes."""
 
907
        if not self._connected:
 
908
            raise errors.MediumNotConnected(self)
 
909
        return osutils.read_bytes_from_socket(
 
910
            self._socket, self._report_activity)
955
911
 
956
912
 
957
913
class SmartClientStreamMediumRequest(SmartClientMediumRequest):