~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: John Arbash Meinel
  • Date: 2011-05-11 11:35:28 UTC
  • mto: This revision was merged to the branch mainline in revision 5851.
  • Revision ID: john@arbash-meinel.com-20110511113528-qepibuwxicjrbb2h
Break compatibility with python <2.6.

This includes auditing the code for places where we were doing
explicit 'sys.version' checks and removing them as appropriate.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
28
28
import sys
29
29
import urllib
30
30
 
 
31
import bzrlib
31
32
from bzrlib.lazy_import import lazy_import
32
33
lazy_import(globals(), """
33
 
import atexit
34
34
import socket
35
35
import thread
36
36
import weakref
38
38
from bzrlib import (
39
39
    debug,
40
40
    errors,
41
 
    symbol_versioning,
42
41
    trace,
43
42
    ui,
44
43
    urlutils,
494
493
class _DebugCounter(object):
495
494
    """An object that counts the HPSS calls made to each client medium.
496
495
 
497
 
    When a medium is garbage-collected, or failing that when atexit functions
498
 
    are run, the total number of calls made on that medium are reported via
499
 
    trace.note.
 
496
    When a medium is garbage-collected, or failing that when
 
497
    bzrlib.global_state exits, the total number of calls made on that medium
 
498
    are reported via trace.note.
500
499
    """
501
500
 
502
501
    def __init__(self):
503
502
        self.counts = weakref.WeakKeyDictionary()
504
503
        client._SmartClient.hooks.install_named_hook(
505
504
            'call', self.increment_call_count, 'hpss call counter')
506
 
        atexit.register(self.flush_all)
 
505
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
507
506
 
508
507
    def track(self, medium):
509
508
        """Start tracking calls made to a medium.
715
714
    """A client medium using simple pipes.
716
715
 
717
716
    This client does not manage the pipes: it assumes they will always be open.
718
 
 
719
 
    Note that if readable_pipe.read might raise IOError or OSError with errno
720
 
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
721
 
    (such as used for sys.stdin) are safe.
722
717
    """
723
718
 
724
719
    def __init__(self, readable_pipe, writeable_pipe, base):
737
732
 
738
733
    def _read_bytes(self, count):
739
734
        """See SmartClientStreamMedium._read_bytes."""
740
 
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
 
735
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
736
        bytes = self._readable_pipe.read(bytes_to_read)
741
737
        self._report_activity(len(bytes), 'read')
742
738
        return bytes
743
739
 
744
740
 
 
741
class SSHParams(object):
 
742
    """A set of parameters for starting a remote bzr via SSH."""
 
743
 
 
744
    def __init__(self, host, port=None, username=None, password=None,
 
745
            bzr_remote_path='bzr'):
 
746
        self.host = host
 
747
        self.port = port
 
748
        self.username = username
 
749
        self.password = password
 
750
        self.bzr_remote_path = bzr_remote_path
 
751
 
 
752
 
745
753
class SmartSSHClientMedium(SmartClientStreamMedium):
746
 
    """A client medium using SSH."""
 
754
    """A client medium using SSH.
 
755
    
 
756
    It delegates IO to a SmartClientSocketMedium or
 
757
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
 
758
    """
747
759
 
748
 
    def __init__(self, host, port=None, username=None, password=None,
749
 
            base=None, vendor=None, bzr_remote_path=None):
 
760
    def __init__(self, base, ssh_params, vendor=None):
750
761
        """Creates a client that will connect on the first use.
751
762
 
 
763
        :param ssh_params: A SSHParams instance.
752
764
        :param vendor: An optional override for the ssh vendor to use. See
753
765
            bzrlib.transport.ssh for details on ssh vendors.
754
766
        """
755
 
        self._connected = False
756
 
        self._host = host
757
 
        self._password = password
758
 
        self._port = port
759
 
        self._username = username
 
767
        self._real_medium = None
 
768
        self._ssh_params = ssh_params
760
769
        # for the benefit of progress making a short description of this
761
770
        # transport
762
771
        self._scheme = 'bzr+ssh'
764
773
        # _DebugCounter so we have to store all the values used in our repr
765
774
        # method before calling the super init.
766
775
        SmartClientStreamMedium.__init__(self, base)
767
 
        self._read_from = None
 
776
        self._vendor = vendor
768
777
        self._ssh_connection = None
769
 
        self._vendor = vendor
770
 
        self._write_to = None
771
 
        self._bzr_remote_path = bzr_remote_path
772
778
 
773
779
    def __repr__(self):
774
 
        if self._port is None:
 
780
        if self._ssh_params.port is None:
775
781
            maybe_port = ''
776
782
        else:
777
 
            maybe_port = ':%s' % self._port
 
783
            maybe_port = ':%s' % self._ssh_params.port
778
784
        return "%s(%s://%s@%s%s/)" % (
779
785
            self.__class__.__name__,
780
786
            self._scheme,
781
 
            self._username,
782
 
            self._host,
 
787
            self._ssh_params.username,
 
788
            self._ssh_params.host,
783
789
            maybe_port)
784
790
 
785
791
    def _accept_bytes(self, bytes):
786
792
        """See SmartClientStreamMedium.accept_bytes."""
787
793
        self._ensure_connection()
788
 
        self._write_to.write(bytes)
789
 
        self._report_activity(len(bytes), 'write')
 
794
        self._real_medium.accept_bytes(bytes)
790
795
 
791
796
    def disconnect(self):
792
797
        """See SmartClientMedium.disconnect()."""
793
 
        if not self._connected:
794
 
            return
795
 
        self._read_from.close()
796
 
        self._write_to.close()
797
 
        self._ssh_connection.close()
798
 
        self._connected = False
 
798
        if self._real_medium is not None:
 
799
            self._real_medium.disconnect()
 
800
            self._real_medium = None
 
801
        if self._ssh_connection is not None:
 
802
            self._ssh_connection.close()
 
803
            self._ssh_connection = None
799
804
 
800
805
    def _ensure_connection(self):
801
806
        """Connect this medium if not already connected."""
802
 
        if self._connected:
 
807
        if self._real_medium is not None:
803
808
            return
804
809
        if self._vendor is None:
805
810
            vendor = ssh._get_ssh_vendor()
806
811
        else:
807
812
            vendor = self._vendor
808
 
        self._ssh_connection = vendor.connect_ssh(self._username,
809
 
                self._password, self._host, self._port,
810
 
                command=[self._bzr_remote_path, 'serve', '--inet',
 
813
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
 
814
                self._ssh_params.password, self._ssh_params.host,
 
815
                self._ssh_params.port,
 
816
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
811
817
                         '--directory=/', '--allow-writes'])
812
 
        self._read_from, self._write_to = \
813
 
            self._ssh_connection.get_filelike_channels()
814
 
        self._connected = True
 
818
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
 
819
        if io_kind == 'socket':
 
820
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
 
821
                self.base, io_object)
 
822
        elif io_kind == 'pipes':
 
823
            read_from, write_to = io_object
 
824
            self._real_medium = SmartSimplePipesClientMedium(
 
825
                read_from, write_to, self.base)
 
826
        else:
 
827
            raise AssertionError(
 
828
                "Unexpected io_kind %r from %r"
 
829
                % (io_kind, self._ssh_connection))
815
830
 
816
831
    def _flush(self):
817
832
        """See SmartClientStreamMedium._flush()."""
818
 
        self._write_to.flush()
 
833
        self._real_medium._flush()
819
834
 
820
835
    def _read_bytes(self, count):
821
836
        """See SmartClientStreamMedium.read_bytes."""
822
 
        if not self._connected:
 
837
        if self._real_medium is None:
823
838
            raise errors.MediumNotConnected(self)
824
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
825
 
        bytes = self._read_from.read(bytes_to_read)
826
 
        self._report_activity(len(bytes), 'read')
827
 
        return bytes
 
839
        return self._real_medium.read_bytes(count)
828
840
 
829
841
 
830
842
# Port 4155 is the default port for bzr://, registered with IANA.
832
844
BZR_DEFAULT_PORT = 4155
833
845
 
834
846
 
835
 
class SmartTCPClientMedium(SmartClientStreamMedium):
836
 
    """A client medium using TCP."""
 
847
class SmartClientSocketMedium(SmartClientStreamMedium):
 
848
    """A client medium using a socket.
 
849
    
 
850
    This class isn't usable directly.  Use one of its subclasses instead.
 
851
    """
837
852
 
838
 
    def __init__(self, host, port, base):
839
 
        """Creates a client that will connect on the first use."""
 
853
    def __init__(self, base):
840
854
        SmartClientStreamMedium.__init__(self, base)
 
855
        self._socket = None
841
856
        self._connected = False
842
 
        self._host = host
843
 
        self._port = port
844
 
        self._socket = None
845
857
 
846
858
    def _accept_bytes(self, bytes):
847
859
        """See SmartClientMedium.accept_bytes."""
848
860
        self._ensure_connection()
849
861
        osutils.send_all(self._socket, bytes, self._report_activity)
850
862
 
 
863
    def _ensure_connection(self):
 
864
        """Connect this medium if not already connected."""
 
865
        raise NotImplementedError(self._ensure_connection)
 
866
 
 
867
    def _flush(self):
 
868
        """See SmartClientStreamMedium._flush().
 
869
 
 
870
        For sockets we do no flushing. For TCP sockets we may want to turn off
 
871
        TCP_NODELAY and add a means to do a flush, but that can be done in the
 
872
        future.
 
873
        """
 
874
 
 
875
    def _read_bytes(self, count):
 
876
        """See SmartClientMedium.read_bytes."""
 
877
        if not self._connected:
 
878
            raise errors.MediumNotConnected(self)
 
879
        return osutils.read_bytes_from_socket(
 
880
            self._socket, self._report_activity)
 
881
 
851
882
    def disconnect(self):
852
883
        """See SmartClientMedium.disconnect()."""
853
884
        if not self._connected:
856
887
        self._socket = None
857
888
        self._connected = False
858
889
 
 
890
 
 
891
class SmartTCPClientMedium(SmartClientSocketMedium):
 
892
    """A client medium that creates a TCP connection."""
 
893
 
 
894
    def __init__(self, host, port, base):
 
895
        """Creates a client that will connect on the first use."""
 
896
        SmartClientSocketMedium.__init__(self, base)
 
897
        self._host = host
 
898
        self._port = port
 
899
 
859
900
    def _ensure_connection(self):
860
901
        """Connect this medium if not already connected."""
861
902
        if self._connected:
895
936
                    (self._host, port, err_msg))
896
937
        self._connected = True
897
938
 
898
 
    def _flush(self):
899
 
        """See SmartClientStreamMedium._flush().
900
 
 
901
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
902
 
        add a means to do a flush, but that can be done in the future.
903
 
        """
904
 
 
905
 
    def _read_bytes(self, count):
906
 
        """See SmartClientMedium.read_bytes."""
907
 
        if not self._connected:
908
 
            raise errors.MediumNotConnected(self)
909
 
        return osutils.read_bytes_from_socket(
910
 
            self._socket, self._report_activity)
 
939
 
 
940
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
 
941
    """A client medium for an already connected socket.
 
942
    
 
943
    Note that this class will assume it "owns" the socket, so it will close it
 
944
    when its disconnect method is called.
 
945
    """
 
946
 
 
947
    def __init__(self, base, sock):
 
948
        SmartClientSocketMedium.__init__(self, base)
 
949
        self._socket = sock
 
950
        self._connected = True
 
951
 
 
952
    def _ensure_connection(self):
 
953
        # Already connected, by definition!  So nothing to do.
 
954
        pass
911
955
 
912
956
 
913
957
class SmartClientStreamMediumRequest(SmartClientMediumRequest):