~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: INADA Naoki
  • Date: 2011-05-05 09:15:34 UTC
  • mto: (5830.3.3 i18n-msgfmt)
  • mto: This revision was merged to the branch mainline in revision 5873.
  • Revision ID: songofacandy@gmail.com-20110505091534-7sv835xpofwrmpt4
Add update-pot command to Makefile and tools/bzrgettext script that
extracts help text from bzr commands.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
27
 
import errno
28
27
import os
29
 
import socket
30
28
import sys
31
29
import urllib
32
30
 
 
31
import bzrlib
33
32
from bzrlib.lazy_import import lazy_import
34
33
lazy_import(globals(), """
35
 
import atexit
 
34
import socket
36
35
import thread
37
36
import weakref
38
37
 
39
38
from bzrlib import (
40
39
    debug,
41
40
    errors,
42
 
    symbol_versioning,
43
41
    trace,
44
42
    ui,
45
43
    urlutils,
47
45
from bzrlib.smart import client, protocol, request, vfs
48
46
from bzrlib.transport import ssh
49
47
""")
50
 
#usually already imported, and getting IllegalScoperReplacer on it here.
51
48
from bzrlib import osutils
52
49
 
53
 
# We must not read any more than 64k at a time so we don't risk "no buffer
54
 
# space available" errors on some platforms.  Windows in particular is likely
55
 
# to give error 10053 or 10055 if we read more than 64k from a socket.
56
 
_MAX_READ_SIZE = 64 * 1024
57
 
 
 
50
# Throughout this module buffer size parameters are either limited to be at
 
51
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
 
52
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
 
53
# from non-sockets as well.
 
54
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
58
55
 
59
56
def _get_protocol_factory_for_bytes(bytes):
60
57
    """Determine the right protocol factory for 'bytes'.
276
273
    def _serve_one_request_unguarded(self, protocol):
277
274
        while protocol.next_read_size():
278
275
            # We can safely try to read large chunks.  If there is less data
279
 
            # than _MAX_READ_SIZE ready, the socket wil just return a short
280
 
            # read immediately rather than block.
281
 
            bytes = self.read_bytes(_MAX_READ_SIZE)
 
276
            # than MAX_SOCKET_CHUNK ready, the socket will just return a
 
277
            # short read immediately rather than block.
 
278
            bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
282
279
            if bytes == '':
283
280
                self.finished = True
284
281
                return
287
284
        self._push_back(protocol.unused_data)
288
285
 
289
286
    def _read_bytes(self, desired_count):
290
 
        return _read_bytes_from_socket(
291
 
            self.socket.recv, desired_count, self._report_activity)
 
287
        return osutils.read_bytes_from_socket(
 
288
            self.socket, self._report_activity)
292
289
 
293
290
    def terminate_due_to_error(self):
294
291
        # TODO: This should log to a server log file, but no such thing
295
292
        # exists yet.  Andrew Bennetts 2006-09-29.
296
 
        osutils.until_no_eintr(self.socket.close)
 
293
        self.socket.close()
297
294
        self.finished = True
298
295
 
299
296
    def _write_out(self, bytes):
334
331
            bytes_to_read = protocol.next_read_size()
335
332
            if bytes_to_read == 0:
336
333
                # Finished serving this request.
337
 
                osutils.until_no_eintr(self._out.flush)
 
334
                self._out.flush()
338
335
                return
339
336
            bytes = self.read_bytes(bytes_to_read)
340
337
            if bytes == '':
341
338
                # Connection has been closed.
342
339
                self.finished = True
343
 
                osutils.until_no_eintr(self._out.flush)
 
340
                self._out.flush()
344
341
                return
345
342
            protocol.accept_bytes(bytes)
346
343
 
347
344
    def _read_bytes(self, desired_count):
348
 
        return osutils.until_no_eintr(self._in.read, desired_count)
 
345
        return self._in.read(desired_count)
349
346
 
350
347
    def terminate_due_to_error(self):
351
348
        # TODO: This should log to a server log file, but no such thing
352
349
        # exists yet.  Andrew Bennetts 2006-09-29.
353
 
        osutils.until_no_eintr(self._out.close)
 
350
        self._out.close()
354
351
        self.finished = True
355
352
 
356
353
    def _write_out(self, bytes):
357
 
        osutils.until_no_eintr(self._out.write, bytes)
 
354
        self._out.write(bytes)
358
355
 
359
356
 
360
357
class SmartClientMediumRequest(object):
496
493
class _DebugCounter(object):
497
494
    """An object that counts the HPSS calls made to each client medium.
498
495
 
499
 
    When a medium is garbage-collected, or failing that when atexit functions
500
 
    are run, the total number of calls made on that medium are reported via
501
 
    trace.note.
 
496
    When a medium is garbage-collected, or failing that when
 
497
    bzrlib.global_state exits, the total number of calls made on that medium
 
498
    are reported via trace.note.
502
499
    """
503
500
 
504
501
    def __init__(self):
505
502
        self.counts = weakref.WeakKeyDictionary()
506
503
        client._SmartClient.hooks.install_named_hook(
507
504
            'call', self.increment_call_count, 'hpss call counter')
508
 
        atexit.register(self.flush_all)
 
505
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
509
506
 
510
507
    def track(self, medium):
511
508
        """Start tracking calls made to a medium.
609
606
            # which is newer than a previously supplied older-than version.
610
607
            # This indicates that some smart verb call is not guarded
611
608
            # appropriately (it should simply not have been tried).
612
 
            raise AssertionError(
 
609
            trace.mutter(
613
610
                "_remember_remote_is_before(%r) called, but "
614
611
                "_remember_remote_is_before(%r) was called previously."
615
 
                % (version_tuple, self._remote_version_is_before))
 
612
                , version_tuple, self._remote_version_is_before)
 
613
            if 'hpss' in debug.debug_flags:
 
614
                ui.ui_factory.show_warning(
 
615
                    "_remember_remote_is_before(%r) called, but "
 
616
                    "_remember_remote_is_before(%r) was called previously."
 
617
                    % (version_tuple, self._remote_version_is_before))
 
618
            return
616
619
        self._remote_version_is_before = version_tuple
617
620
 
618
621
    def protocol_version(self):
720
723
 
721
724
    def _accept_bytes(self, bytes):
722
725
        """See SmartClientStreamMedium.accept_bytes."""
723
 
        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
 
726
        self._writeable_pipe.write(bytes)
724
727
        self._report_activity(len(bytes), 'write')
725
728
 
726
729
    def _flush(self):
727
730
        """See SmartClientStreamMedium._flush()."""
728
 
        osutils.until_no_eintr(self._writeable_pipe.flush)
 
731
        self._writeable_pipe.flush()
729
732
 
730
733
    def _read_bytes(self, count):
731
734
        """See SmartClientStreamMedium._read_bytes."""
732
 
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
 
735
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
736
        bytes = self._readable_pipe.read(bytes_to_read)
733
737
        self._report_activity(len(bytes), 'read')
734
738
        return bytes
735
739
 
736
740
 
 
741
class SSHParams(object):
 
742
    """A set of parameters for starting a remote bzr via SSH."""
 
743
 
 
744
    def __init__(self, host, port=None, username=None, password=None,
 
745
            bzr_remote_path='bzr'):
 
746
        self.host = host
 
747
        self.port = port
 
748
        self.username = username
 
749
        self.password = password
 
750
        self.bzr_remote_path = bzr_remote_path
 
751
 
 
752
 
737
753
class SmartSSHClientMedium(SmartClientStreamMedium):
738
 
    """A client medium using SSH."""
 
754
    """A client medium using SSH.
 
755
    
 
756
    It delegates IO to a SmartClientSocketMedium or
 
757
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
 
758
    """
739
759
 
740
 
    def __init__(self, host, port=None, username=None, password=None,
741
 
            base=None, vendor=None, bzr_remote_path=None):
 
760
    def __init__(self, base, ssh_params, vendor=None):
742
761
        """Creates a client that will connect on the first use.
743
762
 
 
763
        :param ssh_params: A SSHParams instance.
744
764
        :param vendor: An optional override for the ssh vendor to use. See
745
765
            bzrlib.transport.ssh for details on ssh vendors.
746
766
        """
747
 
        self._connected = False
748
 
        self._host = host
749
 
        self._password = password
750
 
        self._port = port
751
 
        self._username = username
 
767
        self._real_medium = None
 
768
        self._ssh_params = ssh_params
752
769
        # for the benefit of progress making a short description of this
753
770
        # transport
754
771
        self._scheme = 'bzr+ssh'
756
773
        # _DebugCounter so we have to store all the values used in our repr
757
774
        # method before calling the super init.
758
775
        SmartClientStreamMedium.__init__(self, base)
759
 
        self._read_from = None
 
776
        self._vendor = vendor
760
777
        self._ssh_connection = None
761
 
        self._vendor = vendor
762
 
        self._write_to = None
763
 
        self._bzr_remote_path = bzr_remote_path
764
778
 
765
779
    def __repr__(self):
766
 
        if self._port is None:
 
780
        if self._ssh_params.port is None:
767
781
            maybe_port = ''
768
782
        else:
769
 
            maybe_port = ':%s' % self._port
 
783
            maybe_port = ':%s' % self._ssh_params.port
770
784
        return "%s(%s://%s@%s%s/)" % (
771
785
            self.__class__.__name__,
772
786
            self._scheme,
773
 
            self._username,
774
 
            self._host,
 
787
            self._ssh_params.username,
 
788
            self._ssh_params.host,
775
789
            maybe_port)
776
790
 
777
791
    def _accept_bytes(self, bytes):
778
792
        """See SmartClientStreamMedium.accept_bytes."""
779
793
        self._ensure_connection()
780
 
        osutils.until_no_eintr(self._write_to.write, bytes)
781
 
        self._report_activity(len(bytes), 'write')
 
794
        self._real_medium.accept_bytes(bytes)
782
795
 
783
796
    def disconnect(self):
784
797
        """See SmartClientMedium.disconnect()."""
785
 
        if not self._connected:
786
 
            return
787
 
        osutils.until_no_eintr(self._read_from.close)
788
 
        osutils.until_no_eintr(self._write_to.close)
789
 
        self._ssh_connection.close()
790
 
        self._connected = False
 
798
        if self._real_medium is not None:
 
799
            self._real_medium.disconnect()
 
800
            self._real_medium = None
 
801
        if self._ssh_connection is not None:
 
802
            self._ssh_connection.close()
 
803
            self._ssh_connection = None
791
804
 
792
805
    def _ensure_connection(self):
793
806
        """Connect this medium if not already connected."""
794
 
        if self._connected:
 
807
        if self._real_medium is not None:
795
808
            return
796
809
        if self._vendor is None:
797
810
            vendor = ssh._get_ssh_vendor()
798
811
        else:
799
812
            vendor = self._vendor
800
 
        self._ssh_connection = vendor.connect_ssh(self._username,
801
 
                self._password, self._host, self._port,
802
 
                command=[self._bzr_remote_path, 'serve', '--inet',
 
813
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
 
814
                self._ssh_params.password, self._ssh_params.host,
 
815
                self._ssh_params.port,
 
816
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
803
817
                         '--directory=/', '--allow-writes'])
804
 
        self._read_from, self._write_to = \
805
 
            self._ssh_connection.get_filelike_channels()
806
 
        self._connected = True
 
818
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
 
819
        if io_kind == 'socket':
 
820
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
 
821
                self.base, io_object)
 
822
        elif io_kind == 'pipes':
 
823
            read_from, write_to = io_object
 
824
            self._real_medium = SmartSimplePipesClientMedium(
 
825
                read_from, write_to, self.base)
 
826
        else:
 
827
            raise AssertionError(
 
828
                "Unexpected io_kind %r from %r"
 
829
                % (io_kind, self._ssh_connection))
807
830
 
808
831
    def _flush(self):
809
832
        """See SmartClientStreamMedium._flush()."""
810
 
        self._write_to.flush()
 
833
        self._real_medium._flush()
811
834
 
812
835
    def _read_bytes(self, count):
813
836
        """See SmartClientStreamMedium.read_bytes."""
814
 
        if not self._connected:
 
837
        if self._real_medium is None:
815
838
            raise errors.MediumNotConnected(self)
816
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
817
 
        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
818
 
        self._report_activity(len(bytes), 'read')
819
 
        return bytes
 
839
        return self._real_medium.read_bytes(count)
820
840
 
821
841
 
822
842
# Port 4155 is the default port for bzr://, registered with IANA.
824
844
BZR_DEFAULT_PORT = 4155
825
845
 
826
846
 
827
 
class SmartTCPClientMedium(SmartClientStreamMedium):
828
 
    """A client medium using TCP."""
 
847
class SmartClientSocketMedium(SmartClientStreamMedium):
 
848
    """A client medium using a socket.
 
849
    
 
850
    This class isn't usable directly.  Use one of its subclasses instead.
 
851
    """
 
852
 
 
853
    def __init__(self, base):
 
854
        SmartClientStreamMedium.__init__(self, base)
 
855
        self._socket = None
 
856
        self._connected = False
 
857
 
 
858
    def _accept_bytes(self, bytes):
 
859
        """See SmartClientMedium.accept_bytes."""
 
860
        self._ensure_connection()
 
861
        osutils.send_all(self._socket, bytes, self._report_activity)
 
862
 
 
863
    def _ensure_connection(self):
 
864
        """Connect this medium if not already connected."""
 
865
        raise NotImplementedError(self._ensure_connection)
 
866
 
 
867
    def _flush(self):
 
868
        """See SmartClientStreamMedium._flush().
 
869
 
 
870
        For sockets we do no flushing. For TCP sockets we may want to turn off
 
871
        TCP_NODELAY and add a means to do a flush, but that can be done in the
 
872
        future.
 
873
        """
 
874
 
 
875
    def _read_bytes(self, count):
 
876
        """See SmartClientMedium.read_bytes."""
 
877
        if not self._connected:
 
878
            raise errors.MediumNotConnected(self)
 
879
        return osutils.read_bytes_from_socket(
 
880
            self._socket, self._report_activity)
 
881
 
 
882
    def disconnect(self):
 
883
        """See SmartClientMedium.disconnect()."""
 
884
        if not self._connected:
 
885
            return
 
886
        self._socket.close()
 
887
        self._socket = None
 
888
        self._connected = False
 
889
 
 
890
 
 
891
class SmartTCPClientMedium(SmartClientSocketMedium):
 
892
    """A client medium that creates a TCP connection."""
829
893
 
830
894
    def __init__(self, host, port, base):
831
895
        """Creates a client that will connect on the first use."""
832
 
        SmartClientStreamMedium.__init__(self, base)
833
 
        self._connected = False
 
896
        SmartClientSocketMedium.__init__(self, base)
834
897
        self._host = host
835
898
        self._port = port
836
 
        self._socket = None
837
 
 
838
 
    def _accept_bytes(self, bytes):
839
 
        """See SmartClientMedium.accept_bytes."""
840
 
        self._ensure_connection()
841
 
        osutils.send_all(self._socket, bytes, self._report_activity)
842
 
 
843
 
    def disconnect(self):
844
 
        """See SmartClientMedium.disconnect()."""
845
 
        if not self._connected:
846
 
            return
847
 
        osutils.until_no_eintr(self._socket.close)
848
 
        self._socket = None
849
 
        self._connected = False
850
899
 
851
900
    def _ensure_connection(self):
852
901
        """Connect this medium if not already connected."""
887
936
                    (self._host, port, err_msg))
888
937
        self._connected = True
889
938
 
890
 
    def _flush(self):
891
 
        """See SmartClientStreamMedium._flush().
892
 
 
893
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
894
 
        add a means to do a flush, but that can be done in the future.
895
 
        """
896
 
 
897
 
    def _read_bytes(self, count):
898
 
        """See SmartClientMedium.read_bytes."""
899
 
        if not self._connected:
900
 
            raise errors.MediumNotConnected(self)
901
 
        return _read_bytes_from_socket(
902
 
            self._socket.recv, count, self._report_activity)
 
939
 
 
940
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
 
941
    """A client medium for an already connected socket.
 
942
    
 
943
    Note that this class will assume it "owns" the socket, so it will close it
 
944
    when its disconnect method is called.
 
945
    """
 
946
 
 
947
    def __init__(self, base, sock):
 
948
        SmartClientSocketMedium.__init__(self, base)
 
949
        self._socket = sock
 
950
        self._connected = True
 
951
 
 
952
    def _ensure_connection(self):
 
953
        # Already connected, by definition!  So nothing to do.
 
954
        pass
903
955
 
904
956
 
905
957
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
942
994
        self._medium._flush()
943
995
 
944
996
 
945
 
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
 
    # We ignore the desired_count because on sockets it's more efficient to
947
 
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
948
 
    try:
949
 
        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
 
    except socket.error, e:
951
 
        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
 
            # The connection was closed by the other side.  Callers expect an
953
 
            # empty string to signal end-of-stream.
954
 
            bytes = ''
955
 
        else:
956
 
            raise
957
 
    else:
958
 
        report_activity(len(bytes), 'read')
959
 
    return bytes
960