~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: INADA Naoki
  • Date: 2011-05-18 06:01:08 UTC
  • mto: This revision was merged to the branch mainline in revision 5894.
  • Revision ID: songofacandy@gmail.com-20110518060108-86t2kffcrzu0nf6i
Update Japanese docs.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
27
 
import errno
28
27
import os
29
 
import socket
30
28
import sys
31
29
import urllib
32
30
 
 
31
import bzrlib
33
32
from bzrlib.lazy_import import lazy_import
34
33
lazy_import(globals(), """
35
 
import atexit
 
34
import socket
 
35
import thread
36
36
import weakref
 
37
 
37
38
from bzrlib import (
38
39
    debug,
39
40
    errors,
40
 
    symbol_versioning,
41
41
    trace,
42
42
    ui,
43
43
    urlutils,
45
45
from bzrlib.smart import client, protocol, request, vfs
46
46
from bzrlib.transport import ssh
47
47
""")
48
 
#usually already imported, and getting IllegalScoperReplacer on it here.
49
48
from bzrlib import osutils
50
49
 
51
 
# We must not read any more than 64k at a time so we don't risk "no buffer
52
 
# space available" errors on some platforms.  Windows in particular is likely
53
 
# to give error 10053 or 10055 if we read more than 64k from a socket.
54
 
_MAX_READ_SIZE = 64 * 1024
55
 
 
 
50
# Throughout this module buffer size parameters are either limited to be at
 
51
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
 
52
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
 
53
# from non-sockets as well.
 
54
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
56
55
 
57
56
def _get_protocol_factory_for_bytes(bytes):
58
57
    """Determine the right protocol factory for 'bytes'.
274
273
    def _serve_one_request_unguarded(self, protocol):
275
274
        while protocol.next_read_size():
276
275
            # We can safely try to read large chunks.  If there is less data
277
 
            # than _MAX_READ_SIZE ready, the socket wil just return a short
278
 
            # read immediately rather than block.
279
 
            bytes = self.read_bytes(_MAX_READ_SIZE)
 
276
            # than MAX_SOCKET_CHUNK ready, the socket will just return a
 
277
            # short read immediately rather than block.
 
278
            bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
280
279
            if bytes == '':
281
280
                self.finished = True
282
281
                return
285
284
        self._push_back(protocol.unused_data)
286
285
 
287
286
    def _read_bytes(self, desired_count):
288
 
        return _read_bytes_from_socket(
289
 
            self.socket.recv, desired_count, self._report_activity)
 
287
        return osutils.read_bytes_from_socket(
 
288
            self.socket, self._report_activity)
290
289
 
291
290
    def terminate_due_to_error(self):
292
291
        # TODO: This should log to a server log file, but no such thing
295
294
        self.finished = True
296
295
 
297
296
    def _write_out(self, bytes):
 
297
        tstart = osutils.timer_func()
298
298
        osutils.send_all(self.socket, bytes, self._report_activity)
 
299
        if 'hpss' in debug.debug_flags:
 
300
            thread_id = thread.get_ident()
 
301
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
 
302
                         % ('wrote', thread_id, len(bytes),
 
303
                            osutils.timer_func() - tstart))
299
304
 
300
305
 
301
306
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
488
493
class _DebugCounter(object):
489
494
    """An object that counts the HPSS calls made to each client medium.
490
495
 
491
 
    When a medium is garbage-collected, or failing that when atexit functions
492
 
    are run, the total number of calls made on that medium are reported via
493
 
    trace.note.
 
496
    When a medium is garbage-collected, or failing that when
 
497
    bzrlib.global_state exits, the total number of calls made on that medium
 
498
    are reported via trace.note.
494
499
    """
495
500
 
496
501
    def __init__(self):
497
502
        self.counts = weakref.WeakKeyDictionary()
498
503
        client._SmartClient.hooks.install_named_hook(
499
504
            'call', self.increment_call_count, 'hpss call counter')
500
 
        atexit.register(self.flush_all)
 
505
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
501
506
 
502
507
    def track(self, medium):
503
508
        """Start tracking calls made to a medium.
601
606
            # which is newer than a previously supplied older-than version.
602
607
            # This indicates that some smart verb call is not guarded
603
608
            # appropriately (it should simply not have been tried).
604
 
            raise AssertionError(
 
609
            trace.mutter(
605
610
                "_remember_remote_is_before(%r) called, but "
606
611
                "_remember_remote_is_before(%r) was called previously."
607
 
                % (version_tuple, self._remote_version_is_before))
 
612
                , version_tuple, self._remote_version_is_before)
 
613
            if 'hpss' in debug.debug_flags:
 
614
                ui.ui_factory.show_warning(
 
615
                    "_remember_remote_is_before(%r) called, but "
 
616
                    "_remember_remote_is_before(%r) was called previously."
 
617
                    % (version_tuple, self._remote_version_is_before))
 
618
            return
608
619
        self._remote_version_is_before = version_tuple
609
620
 
610
621
    def protocol_version(self):
721
732
 
722
733
    def _read_bytes(self, count):
723
734
        """See SmartClientStreamMedium._read_bytes."""
724
 
        bytes = self._readable_pipe.read(count)
 
735
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
736
        bytes = self._readable_pipe.read(bytes_to_read)
725
737
        self._report_activity(len(bytes), 'read')
726
738
        return bytes
727
739
 
728
740
 
 
741
class SSHParams(object):
 
742
    """A set of parameters for starting a remote bzr via SSH."""
 
743
 
 
744
    def __init__(self, host, port=None, username=None, password=None,
 
745
            bzr_remote_path='bzr'):
 
746
        self.host = host
 
747
        self.port = port
 
748
        self.username = username
 
749
        self.password = password
 
750
        self.bzr_remote_path = bzr_remote_path
 
751
 
 
752
 
729
753
class SmartSSHClientMedium(SmartClientStreamMedium):
730
 
    """A client medium using SSH."""
 
754
    """A client medium using SSH.
 
755
    
 
756
    It delegates IO to a SmartClientSocketMedium or
 
757
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
 
758
    """
731
759
 
732
 
    def __init__(self, host, port=None, username=None, password=None,
733
 
            base=None, vendor=None, bzr_remote_path=None):
 
760
    def __init__(self, base, ssh_params, vendor=None):
734
761
        """Creates a client that will connect on the first use.
735
762
 
 
763
        :param ssh_params: A SSHParams instance.
736
764
        :param vendor: An optional override for the ssh vendor to use. See
737
765
            bzrlib.transport.ssh for details on ssh vendors.
738
766
        """
739
 
        self._connected = False
740
 
        self._host = host
741
 
        self._password = password
742
 
        self._port = port
743
 
        self._username = username
 
767
        self._real_medium = None
 
768
        self._ssh_params = ssh_params
 
769
        # for the benefit of progress making a short description of this
 
770
        # transport
 
771
        self._scheme = 'bzr+ssh'
744
772
        # SmartClientStreamMedium stores the repr of this object in its
745
773
        # _DebugCounter so we have to store all the values used in our repr
746
774
        # method before calling the super init.
747
775
        SmartClientStreamMedium.__init__(self, base)
748
 
        self._read_from = None
 
776
        self._vendor = vendor
749
777
        self._ssh_connection = None
750
 
        self._vendor = vendor
751
 
        self._write_to = None
752
 
        self._bzr_remote_path = bzr_remote_path
753
 
        # for the benefit of progress making a short description of this
754
 
        # transport
755
 
        self._scheme = 'bzr+ssh'
756
778
 
757
779
    def __repr__(self):
758
 
        return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
 
780
        if self._ssh_params.port is None:
 
781
            maybe_port = ''
 
782
        else:
 
783
            maybe_port = ':%s' % self._ssh_params.port
 
784
        return "%s(%s://%s@%s%s/)" % (
759
785
            self.__class__.__name__,
760
 
            self._connected,
761
 
            self._username,
762
 
            self._host,
763
 
            self._port)
 
786
            self._scheme,
 
787
            self._ssh_params.username,
 
788
            self._ssh_params.host,
 
789
            maybe_port)
764
790
 
765
791
    def _accept_bytes(self, bytes):
766
792
        """See SmartClientStreamMedium.accept_bytes."""
767
793
        self._ensure_connection()
768
 
        self._write_to.write(bytes)
769
 
        self._report_activity(len(bytes), 'write')
 
794
        self._real_medium.accept_bytes(bytes)
770
795
 
771
796
    def disconnect(self):
772
797
        """See SmartClientMedium.disconnect()."""
773
 
        if not self._connected:
774
 
            return
775
 
        self._read_from.close()
776
 
        self._write_to.close()
777
 
        self._ssh_connection.close()
778
 
        self._connected = False
 
798
        if self._real_medium is not None:
 
799
            self._real_medium.disconnect()
 
800
            self._real_medium = None
 
801
        if self._ssh_connection is not None:
 
802
            self._ssh_connection.close()
 
803
            self._ssh_connection = None
779
804
 
780
805
    def _ensure_connection(self):
781
806
        """Connect this medium if not already connected."""
782
 
        if self._connected:
 
807
        if self._real_medium is not None:
783
808
            return
784
809
        if self._vendor is None:
785
810
            vendor = ssh._get_ssh_vendor()
786
811
        else:
787
812
            vendor = self._vendor
788
 
        self._ssh_connection = vendor.connect_ssh(self._username,
789
 
                self._password, self._host, self._port,
790
 
                command=[self._bzr_remote_path, 'serve', '--inet',
 
813
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
 
814
                self._ssh_params.password, self._ssh_params.host,
 
815
                self._ssh_params.port,
 
816
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
791
817
                         '--directory=/', '--allow-writes'])
792
 
        self._read_from, self._write_to = \
793
 
            self._ssh_connection.get_filelike_channels()
794
 
        self._connected = True
 
818
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
 
819
        if io_kind == 'socket':
 
820
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
 
821
                self.base, io_object)
 
822
        elif io_kind == 'pipes':
 
823
            read_from, write_to = io_object
 
824
            self._real_medium = SmartSimplePipesClientMedium(
 
825
                read_from, write_to, self.base)
 
826
        else:
 
827
            raise AssertionError(
 
828
                "Unexpected io_kind %r from %r"
 
829
                % (io_kind, self._ssh_connection))
795
830
 
796
831
    def _flush(self):
797
832
        """See SmartClientStreamMedium._flush()."""
798
 
        self._write_to.flush()
 
833
        self._real_medium._flush()
799
834
 
800
835
    def _read_bytes(self, count):
801
836
        """See SmartClientStreamMedium.read_bytes."""
802
 
        if not self._connected:
 
837
        if self._real_medium is None:
803
838
            raise errors.MediumNotConnected(self)
804
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
805
 
        bytes = self._read_from.read(bytes_to_read)
806
 
        self._report_activity(len(bytes), 'read')
807
 
        return bytes
 
839
        return self._real_medium.read_bytes(count)
808
840
 
809
841
 
810
842
# Port 4155 is the default port for bzr://, registered with IANA.
812
844
BZR_DEFAULT_PORT = 4155
813
845
 
814
846
 
815
 
class SmartTCPClientMedium(SmartClientStreamMedium):
816
 
    """A client medium using TCP."""
 
847
class SmartClientSocketMedium(SmartClientStreamMedium):
 
848
    """A client medium using a socket.
 
849
    
 
850
    This class isn't usable directly.  Use one of its subclasses instead.
 
851
    """
817
852
 
818
 
    def __init__(self, host, port, base):
819
 
        """Creates a client that will connect on the first use."""
 
853
    def __init__(self, base):
820
854
        SmartClientStreamMedium.__init__(self, base)
 
855
        self._socket = None
821
856
        self._connected = False
822
 
        self._host = host
823
 
        self._port = port
824
 
        self._socket = None
825
857
 
826
858
    def _accept_bytes(self, bytes):
827
859
        """See SmartClientMedium.accept_bytes."""
828
860
        self._ensure_connection()
829
861
        osutils.send_all(self._socket, bytes, self._report_activity)
830
862
 
 
863
    def _ensure_connection(self):
 
864
        """Connect this medium if not already connected."""
 
865
        raise NotImplementedError(self._ensure_connection)
 
866
 
 
867
    def _flush(self):
 
868
        """See SmartClientStreamMedium._flush().
 
869
 
 
870
        For sockets we do no flushing. For TCP sockets we may want to turn off
 
871
        TCP_NODELAY and add a means to do a flush, but that can be done in the
 
872
        future.
 
873
        """
 
874
 
 
875
    def _read_bytes(self, count):
 
876
        """See SmartClientMedium.read_bytes."""
 
877
        if not self._connected:
 
878
            raise errors.MediumNotConnected(self)
 
879
        return osutils.read_bytes_from_socket(
 
880
            self._socket, self._report_activity)
 
881
 
831
882
    def disconnect(self):
832
883
        """See SmartClientMedium.disconnect()."""
833
884
        if not self._connected:
836
887
        self._socket = None
837
888
        self._connected = False
838
889
 
 
890
 
 
891
class SmartTCPClientMedium(SmartClientSocketMedium):
 
892
    """A client medium that creates a TCP connection."""
 
893
 
 
894
    def __init__(self, host, port, base):
 
895
        """Creates a client that will connect on the first use."""
 
896
        SmartClientSocketMedium.__init__(self, base)
 
897
        self._host = host
 
898
        self._port = port
 
899
 
839
900
    def _ensure_connection(self):
840
901
        """Connect this medium if not already connected."""
841
902
        if self._connected:
875
936
                    (self._host, port, err_msg))
876
937
        self._connected = True
877
938
 
878
 
    def _flush(self):
879
 
        """See SmartClientStreamMedium._flush().
880
 
 
881
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
882
 
        add a means to do a flush, but that can be done in the future.
883
 
        """
884
 
 
885
 
    def _read_bytes(self, count):
886
 
        """See SmartClientMedium.read_bytes."""
887
 
        if not self._connected:
888
 
            raise errors.MediumNotConnected(self)
889
 
        return _read_bytes_from_socket(
890
 
            self._socket.recv, count, self._report_activity)
 
939
 
 
940
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
 
941
    """A client medium for an already connected socket.
 
942
    
 
943
    Note that this class will assume it "owns" the socket, so it will close it
 
944
    when its disconnect method is called.
 
945
    """
 
946
 
 
947
    def __init__(self, base, sock):
 
948
        SmartClientSocketMedium.__init__(self, base)
 
949
        self._socket = sock
 
950
        self._connected = True
 
951
 
 
952
    def _ensure_connection(self):
 
953
        # Already connected, by definition!  So nothing to do.
 
954
        pass
891
955
 
892
956
 
893
957
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
930
994
        self._medium._flush()
931
995
 
932
996
 
933
 
def _read_bytes_from_socket(sock, desired_count, report_activity):
934
 
    # We ignore the desired_count because on sockets it's more efficient to
935
 
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
936
 
    try:
937
 
        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
938
 
    except socket.error, e:
939
 
        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
940
 
            # The connection was closed by the other side.  Callers expect an
941
 
            # empty string to signal end-of-stream.
942
 
            bytes = ''
943
 
        else:
944
 
            raise
945
 
    else:
946
 
        report_activity(len(bytes), 'read')
947
 
    return bytes
948