~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Jelmer Vernooij
  • Date: 2011-09-05 14:16:12 UTC
  • mto: (6123.1.5 +trunk)
  • mto: This revision was merged to the branch mainline in revision 6124.
  • Revision ID: jelmer@samba.org-20110905141612-o8t6zu2tjezh2vk3
Move flags to BranchFormat.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
28
28
import sys
29
29
import urllib
30
30
 
 
31
import bzrlib
31
32
from bzrlib.lazy_import import lazy_import
32
33
lazy_import(globals(), """
33
 
import atexit
34
34
import socket
35
35
import thread
36
36
import weakref
38
38
from bzrlib import (
39
39
    debug,
40
40
    errors,
41
 
    symbol_versioning,
42
41
    trace,
43
42
    ui,
44
43
    urlutils,
491
490
        return self._medium._get_line()
492
491
 
493
492
 
 
493
class _VfsRefuser(object):
 
494
    """An object that refuses all VFS requests.
 
495
 
 
496
    """
 
497
 
 
498
    def __init__(self):
 
499
        client._SmartClient.hooks.install_named_hook(
 
500
            'call', self.check_vfs, 'vfs refuser')
 
501
 
 
502
    def check_vfs(self, params):
 
503
        try:
 
504
            request_method = request.request_handlers.get(params.method)
 
505
        except KeyError:
 
506
            # A method we don't know about doesn't count as a VFS method.
 
507
            return
 
508
        if issubclass(request_method, vfs.VfsRequest):
 
509
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
 
510
 
 
511
 
494
512
class _DebugCounter(object):
495
513
    """An object that counts the HPSS calls made to each client medium.
496
514
 
497
 
    When a medium is garbage-collected, or failing that when atexit functions
498
 
    are run, the total number of calls made on that medium are reported via
499
 
    trace.note.
 
515
    When a medium is garbage-collected, or failing that when
 
516
    bzrlib.global_state exits, the total number of calls made on that medium
 
517
    are reported via trace.note.
500
518
    """
501
519
 
502
520
    def __init__(self):
503
521
        self.counts = weakref.WeakKeyDictionary()
504
522
        client._SmartClient.hooks.install_named_hook(
505
523
            'call', self.increment_call_count, 'hpss call counter')
506
 
        atexit.register(self.flush_all)
 
524
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
507
525
 
508
526
    def track(self, medium):
509
527
        """Start tracking calls made to a medium.
551
569
            self.done(ref)
552
570
 
553
571
_debug_counter = None
 
572
_vfs_refuser = None
554
573
 
555
574
 
556
575
class SmartClientMedium(SmartMedium):
573
592
            if _debug_counter is None:
574
593
                _debug_counter = _DebugCounter()
575
594
            _debug_counter.track(self)
 
595
        if 'hpss_client_no_vfs' in debug.debug_flags:
 
596
            global _vfs_refuser
 
597
            if _vfs_refuser is None:
 
598
                _vfs_refuser = _VfsRefuser()
576
599
 
577
600
    def _is_remote_before(self, version_tuple):
578
601
        """Is it possible the remote side supports RPCs for a given version?
715
738
    """A client medium using simple pipes.
716
739
 
717
740
    This client does not manage the pipes: it assumes they will always be open.
718
 
 
719
 
    Note that if readable_pipe.read might raise IOError or OSError with errno
720
 
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
721
 
    (such as used for sys.stdin) are safe.
722
741
    """
723
742
 
724
743
    def __init__(self, readable_pipe, writeable_pipe, base):
737
756
 
738
757
    def _read_bytes(self, count):
739
758
        """See SmartClientStreamMedium._read_bytes."""
740
 
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
 
759
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
760
        bytes = self._readable_pipe.read(bytes_to_read)
741
761
        self._report_activity(len(bytes), 'read')
742
762
        return bytes
743
763
 
744
764
 
 
765
class SSHParams(object):
 
766
    """A set of parameters for starting a remote bzr via SSH."""
 
767
 
 
768
    def __init__(self, host, port=None, username=None, password=None,
 
769
            bzr_remote_path='bzr'):
 
770
        self.host = host
 
771
        self.port = port
 
772
        self.username = username
 
773
        self.password = password
 
774
        self.bzr_remote_path = bzr_remote_path
 
775
 
 
776
 
745
777
class SmartSSHClientMedium(SmartClientStreamMedium):
746
 
    """A client medium using SSH."""
 
778
    """A client medium using SSH.
 
779
    
 
780
    It delegates IO to a SmartClientSocketMedium or
 
781
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
 
782
    """
747
783
 
748
 
    def __init__(self, host, port=None, username=None, password=None,
749
 
            base=None, vendor=None, bzr_remote_path=None):
 
784
    def __init__(self, base, ssh_params, vendor=None):
750
785
        """Creates a client that will connect on the first use.
751
786
 
 
787
        :param ssh_params: A SSHParams instance.
752
788
        :param vendor: An optional override for the ssh vendor to use. See
753
789
            bzrlib.transport.ssh for details on ssh vendors.
754
790
        """
755
 
        self._connected = False
756
 
        self._host = host
757
 
        self._password = password
758
 
        self._port = port
759
 
        self._username = username
 
791
        self._real_medium = None
 
792
        self._ssh_params = ssh_params
760
793
        # for the benefit of progress making a short description of this
761
794
        # transport
762
795
        self._scheme = 'bzr+ssh'
764
797
        # _DebugCounter so we have to store all the values used in our repr
765
798
        # method before calling the super init.
766
799
        SmartClientStreamMedium.__init__(self, base)
767
 
        self._read_from = None
 
800
        self._vendor = vendor
768
801
        self._ssh_connection = None
769
 
        self._vendor = vendor
770
 
        self._write_to = None
771
 
        self._bzr_remote_path = bzr_remote_path
772
802
 
773
803
    def __repr__(self):
774
 
        if self._port is None:
 
804
        if self._ssh_params.port is None:
775
805
            maybe_port = ''
776
806
        else:
777
 
            maybe_port = ':%s' % self._port
 
807
            maybe_port = ':%s' % self._ssh_params.port
778
808
        return "%s(%s://%s@%s%s/)" % (
779
809
            self.__class__.__name__,
780
810
            self._scheme,
781
 
            self._username,
782
 
            self._host,
 
811
            self._ssh_params.username,
 
812
            self._ssh_params.host,
783
813
            maybe_port)
784
814
 
785
815
    def _accept_bytes(self, bytes):
786
816
        """See SmartClientStreamMedium.accept_bytes."""
787
817
        self._ensure_connection()
788
 
        self._write_to.write(bytes)
789
 
        self._report_activity(len(bytes), 'write')
 
818
        self._real_medium.accept_bytes(bytes)
790
819
 
791
820
    def disconnect(self):
792
821
        """See SmartClientMedium.disconnect()."""
793
 
        if not self._connected:
794
 
            return
795
 
        self._read_from.close()
796
 
        self._write_to.close()
797
 
        self._ssh_connection.close()
798
 
        self._connected = False
 
822
        if self._real_medium is not None:
 
823
            self._real_medium.disconnect()
 
824
            self._real_medium = None
 
825
        if self._ssh_connection is not None:
 
826
            self._ssh_connection.close()
 
827
            self._ssh_connection = None
799
828
 
800
829
    def _ensure_connection(self):
801
830
        """Connect this medium if not already connected."""
802
 
        if self._connected:
 
831
        if self._real_medium is not None:
803
832
            return
804
833
        if self._vendor is None:
805
834
            vendor = ssh._get_ssh_vendor()
806
835
        else:
807
836
            vendor = self._vendor
808
 
        self._ssh_connection = vendor.connect_ssh(self._username,
809
 
                self._password, self._host, self._port,
810
 
                command=[self._bzr_remote_path, 'serve', '--inet',
 
837
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
 
838
                self._ssh_params.password, self._ssh_params.host,
 
839
                self._ssh_params.port,
 
840
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
811
841
                         '--directory=/', '--allow-writes'])
812
 
        self._read_from, self._write_to = \
813
 
            self._ssh_connection.get_filelike_channels()
814
 
        self._connected = True
 
842
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
 
843
        if io_kind == 'socket':
 
844
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
 
845
                self.base, io_object)
 
846
        elif io_kind == 'pipes':
 
847
            read_from, write_to = io_object
 
848
            self._real_medium = SmartSimplePipesClientMedium(
 
849
                read_from, write_to, self.base)
 
850
        else:
 
851
            raise AssertionError(
 
852
                "Unexpected io_kind %r from %r"
 
853
                % (io_kind, self._ssh_connection))
815
854
 
816
855
    def _flush(self):
817
856
        """See SmartClientStreamMedium._flush()."""
818
 
        self._write_to.flush()
 
857
        self._real_medium._flush()
819
858
 
820
859
    def _read_bytes(self, count):
821
860
        """See SmartClientStreamMedium.read_bytes."""
822
 
        if not self._connected:
 
861
        if self._real_medium is None:
823
862
            raise errors.MediumNotConnected(self)
824
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
825
 
        bytes = self._read_from.read(bytes_to_read)
826
 
        self._report_activity(len(bytes), 'read')
827
 
        return bytes
 
863
        return self._real_medium.read_bytes(count)
828
864
 
829
865
 
830
866
# Port 4155 is the default port for bzr://, registered with IANA.
832
868
BZR_DEFAULT_PORT = 4155
833
869
 
834
870
 
835
 
class SmartTCPClientMedium(SmartClientStreamMedium):
836
 
    """A client medium using TCP."""
 
871
class SmartClientSocketMedium(SmartClientStreamMedium):
 
872
    """A client medium using a socket.
 
873
    
 
874
    This class isn't usable directly.  Use one of its subclasses instead.
 
875
    """
837
876
 
838
 
    def __init__(self, host, port, base):
839
 
        """Creates a client that will connect on the first use."""
 
877
    def __init__(self, base):
840
878
        SmartClientStreamMedium.__init__(self, base)
 
879
        self._socket = None
841
880
        self._connected = False
842
 
        self._host = host
843
 
        self._port = port
844
 
        self._socket = None
845
881
 
846
882
    def _accept_bytes(self, bytes):
847
883
        """See SmartClientMedium.accept_bytes."""
848
884
        self._ensure_connection()
849
885
        osutils.send_all(self._socket, bytes, self._report_activity)
850
886
 
 
887
    def _ensure_connection(self):
 
888
        """Connect this medium if not already connected."""
 
889
        raise NotImplementedError(self._ensure_connection)
 
890
 
 
891
    def _flush(self):
 
892
        """See SmartClientStreamMedium._flush().
 
893
 
 
894
        For sockets we do no flushing. For TCP sockets we may want to turn off
 
895
        TCP_NODELAY and add a means to do a flush, but that can be done in the
 
896
        future.
 
897
        """
 
898
 
 
899
    def _read_bytes(self, count):
 
900
        """See SmartClientMedium.read_bytes."""
 
901
        if not self._connected:
 
902
            raise errors.MediumNotConnected(self)
 
903
        return osutils.read_bytes_from_socket(
 
904
            self._socket, self._report_activity)
 
905
 
851
906
    def disconnect(self):
852
907
        """See SmartClientMedium.disconnect()."""
853
908
        if not self._connected:
856
911
        self._socket = None
857
912
        self._connected = False
858
913
 
 
914
 
 
915
class SmartTCPClientMedium(SmartClientSocketMedium):
 
916
    """A client medium that creates a TCP connection."""
 
917
 
 
918
    def __init__(self, host, port, base):
 
919
        """Creates a client that will connect on the first use."""
 
920
        SmartClientSocketMedium.__init__(self, base)
 
921
        self._host = host
 
922
        self._port = port
 
923
 
859
924
    def _ensure_connection(self):
860
925
        """Connect this medium if not already connected."""
861
926
        if self._connected:
895
960
                    (self._host, port, err_msg))
896
961
        self._connected = True
897
962
 
898
 
    def _flush(self):
899
 
        """See SmartClientStreamMedium._flush().
900
 
 
901
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
902
 
        add a means to do a flush, but that can be done in the future.
903
 
        """
904
 
 
905
 
    def _read_bytes(self, count):
906
 
        """See SmartClientMedium.read_bytes."""
907
 
        if not self._connected:
908
 
            raise errors.MediumNotConnected(self)
909
 
        return osutils.read_bytes_from_socket(
910
 
            self._socket, self._report_activity)
 
963
 
 
964
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
 
965
    """A client medium for an already connected socket.
 
966
    
 
967
    Note that this class will assume it "owns" the socket, so it will close it
 
968
    when its disconnect method is called.
 
969
    """
 
970
 
 
971
    def __init__(self, base, sock):
 
972
        SmartClientSocketMedium.__init__(self, base)
 
973
        self._socket = sock
 
974
        self._connected = True
 
975
 
 
976
    def _ensure_connection(self):
 
977
        # Already connected, by definition!  So nothing to do.
 
978
        pass
911
979
 
912
980
 
913
981
class SmartClientStreamMediumRequest(SmartClientMediumRequest):