~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: INADA Naoki
  • Date: 2011-05-05 09:15:34 UTC
  • mto: (5830.3.3 i18n-msgfmt)
  • mto: This revision was merged to the branch mainline in revision 5873.
  • Revision ID: songofacandy@gmail.com-20110505091534-7sv835xpofwrmpt4
Add update-pot command to Makefile and tools/bzrgettext script that
extracts help text from bzr commands.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
27
 
import errno
28
27
import os
29
 
import socket
30
28
import sys
31
29
import urllib
32
30
 
 
31
import bzrlib
33
32
from bzrlib.lazy_import import lazy_import
34
33
lazy_import(globals(), """
35
 
import atexit
 
34
import socket
 
35
import thread
36
36
import weakref
 
37
 
37
38
from bzrlib import (
38
39
    debug,
39
40
    errors,
40
 
    osutils,
41
 
    symbol_versioning,
42
41
    trace,
43
42
    ui,
44
43
    urlutils,
45
44
    )
46
 
from bzrlib.smart import client, protocol
 
45
from bzrlib.smart import client, protocol, request, vfs
47
46
from bzrlib.transport import ssh
48
47
""")
49
 
 
50
 
 
51
 
# We must not read any more than 64k at a time so we don't risk "no buffer
52
 
# space available" errors on some platforms.  Windows in particular is likely
53
 
# to give error 10053 or 10055 if we read more than 64k from a socket.
54
 
_MAX_READ_SIZE = 64 * 1024
55
 
 
 
48
from bzrlib import osutils
 
49
 
 
50
# Throughout this module buffer size parameters are either limited to be at
 
51
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
 
52
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
 
53
# from non-sockets as well.
 
54
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
56
55
 
57
56
def _get_protocol_factory_for_bytes(bytes):
58
57
    """Determine the right protocol factory for 'bytes'.
274
273
    def _serve_one_request_unguarded(self, protocol):
275
274
        while protocol.next_read_size():
276
275
            # We can safely try to read large chunks.  If there is less data
277
 
            # than _MAX_READ_SIZE ready, the socket wil just return a short
278
 
            # read immediately rather than block.
279
 
            bytes = self.read_bytes(_MAX_READ_SIZE)
 
276
            # than MAX_SOCKET_CHUNK ready, the socket will just return a
 
277
            # short read immediately rather than block.
 
278
            bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
280
279
            if bytes == '':
281
280
                self.finished = True
282
281
                return
285
284
        self._push_back(protocol.unused_data)
286
285
 
287
286
    def _read_bytes(self, desired_count):
288
 
        # We ignore the desired_count because on sockets it's more efficient to
289
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
290
 
        bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
291
 
        self._report_activity(len(bytes), 'read')
292
 
        return bytes
 
287
        return osutils.read_bytes_from_socket(
 
288
            self.socket, self._report_activity)
293
289
 
294
290
    def terminate_due_to_error(self):
295
291
        # TODO: This should log to a server log file, but no such thing
298
294
        self.finished = True
299
295
 
300
296
    def _write_out(self, bytes):
 
297
        tstart = osutils.timer_func()
301
298
        osutils.send_all(self.socket, bytes, self._report_activity)
 
299
        if 'hpss' in debug.debug_flags:
 
300
            thread_id = thread.get_ident()
 
301
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
 
302
                         % ('wrote', thread_id, len(bytes),
 
303
                            osutils.timer_func() - tstart))
302
304
 
303
305
 
304
306
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
381
383
    def accept_bytes(self, bytes):
382
384
        """Accept bytes for inclusion in this request.
383
385
 
384
 
        This method may not be be called after finished_writing() has been
 
386
        This method may not be called after finished_writing() has been
385
387
        called.  It depends upon the Medium whether or not the bytes will be
386
388
        immediately transmitted. Message based Mediums will tend to buffer the
387
389
        bytes until finished_writing() is called.
475
477
        if not line.endswith('\n'):
476
478
            # end of file encountered reading from server
477
479
            raise errors.ConnectionReset(
478
 
                "please check connectivity and permissions")
 
480
                "Unexpected end of message. Please check connectivity "
 
481
                "and permissions, and report a bug if problems persist.")
479
482
        return line
480
483
 
481
484
    def _read_line(self):
490
493
class _DebugCounter(object):
491
494
    """An object that counts the HPSS calls made to each client medium.
492
495
 
493
 
    When a medium is garbage-collected, or failing that when atexit functions
494
 
    are run, the total number of calls made on that medium are reported via
495
 
    trace.note.
 
496
    When a medium is garbage-collected, or failing that when
 
497
    bzrlib.global_state exits, the total number of calls made on that medium
 
498
    are reported via trace.note.
496
499
    """
497
500
 
498
501
    def __init__(self):
499
502
        self.counts = weakref.WeakKeyDictionary()
500
503
        client._SmartClient.hooks.install_named_hook(
501
504
            'call', self.increment_call_count, 'hpss call counter')
502
 
        atexit.register(self.flush_all)
 
505
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
503
506
 
504
507
    def track(self, medium):
505
508
        """Start tracking calls made to a medium.
509
512
        """
510
513
        medium_repr = repr(medium)
511
514
        # Add this medium to the WeakKeyDictionary
512
 
        self.counts[medium] = [0, medium_repr]
 
515
        self.counts[medium] = dict(count=0, vfs_count=0,
 
516
                                   medium_repr=medium_repr)
513
517
        # Weakref callbacks are fired in reverse order of their association
514
518
        # with the referenced object.  So we add a weakref *after* adding to
515
519
        # the WeakKeyDict so that we can report the value from it before the
519
523
    def increment_call_count(self, params):
520
524
        # Increment the count in the WeakKeyDictionary
521
525
        value = self.counts[params.medium]
522
 
        value[0] += 1
 
526
        value['count'] += 1
 
527
        try:
 
528
            request_method = request.request_handlers.get(params.method)
 
529
        except KeyError:
 
530
            # A method we don't know about doesn't count as a VFS method.
 
531
            return
 
532
        if issubclass(request_method, vfs.VfsRequest):
 
533
            value['vfs_count'] += 1
523
534
 
524
535
    def done(self, ref):
525
536
        value = self.counts[ref]
526
 
        count, medium_repr = value
 
537
        count, vfs_count, medium_repr = (
 
538
            value['count'], value['vfs_count'], value['medium_repr'])
527
539
        # In case this callback is invoked for the same ref twice (by the
528
540
        # weakref callback and by the atexit function), set the call count back
529
541
        # to 0 so this item won't be reported twice.
530
 
        value[0] = 0
 
542
        value['count'] = 0
 
543
        value['vfs_count'] = 0
531
544
        if count != 0:
532
 
            trace.note('HPSS calls: %d %s', count, medium_repr)
 
545
            trace.note('HPSS calls: %d (%d vfs) %s',
 
546
                       count, vfs_count, medium_repr)
533
547
 
534
548
    def flush_all(self):
535
549
        for ref in list(self.counts.keys()):
592
606
            # which is newer than a previously supplied older-than version.
593
607
            # This indicates that some smart verb call is not guarded
594
608
            # appropriately (it should simply not have been tried).
595
 
            raise AssertionError(
 
609
            trace.mutter(
596
610
                "_remember_remote_is_before(%r) called, but "
597
611
                "_remember_remote_is_before(%r) was called previously."
598
 
                % (version_tuple, self._remote_version_is_before))
 
612
                , version_tuple, self._remote_version_is_before)
 
613
            if 'hpss' in debug.debug_flags:
 
614
                ui.ui_factory.show_warning(
 
615
                    "_remember_remote_is_before(%r) called, but "
 
616
                    "_remember_remote_is_before(%r) was called previously."
 
617
                    % (version_tuple, self._remote_version_is_before))
 
618
            return
599
619
        self._remote_version_is_before = version_tuple
600
620
 
601
621
    def protocol_version(self):
712
732
 
713
733
    def _read_bytes(self, count):
714
734
        """See SmartClientStreamMedium._read_bytes."""
715
 
        bytes = self._readable_pipe.read(count)
 
735
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
736
        bytes = self._readable_pipe.read(bytes_to_read)
716
737
        self._report_activity(len(bytes), 'read')
717
738
        return bytes
718
739
 
719
740
 
 
741
class SSHParams(object):
 
742
    """A set of parameters for starting a remote bzr via SSH."""
 
743
 
 
744
    def __init__(self, host, port=None, username=None, password=None,
 
745
            bzr_remote_path='bzr'):
 
746
        self.host = host
 
747
        self.port = port
 
748
        self.username = username
 
749
        self.password = password
 
750
        self.bzr_remote_path = bzr_remote_path
 
751
 
 
752
 
720
753
class SmartSSHClientMedium(SmartClientStreamMedium):
721
 
    """A client medium using SSH."""
 
754
    """A client medium using SSH.
 
755
    
 
756
    It delegates IO to a SmartClientSocketMedium or
 
757
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
 
758
    """
722
759
 
723
 
    def __init__(self, host, port=None, username=None, password=None,
724
 
            base=None, vendor=None, bzr_remote_path=None):
 
760
    def __init__(self, base, ssh_params, vendor=None):
725
761
        """Creates a client that will connect on the first use.
726
762
 
 
763
        :param ssh_params: A SSHParams instance.
727
764
        :param vendor: An optional override for the ssh vendor to use. See
728
765
            bzrlib.transport.ssh for details on ssh vendors.
729
766
        """
730
 
        self._connected = False
731
 
        self._host = host
732
 
        self._password = password
733
 
        self._port = port
734
 
        self._username = username
 
767
        self._real_medium = None
 
768
        self._ssh_params = ssh_params
 
769
        # for the benefit of progress making a short description of this
 
770
        # transport
 
771
        self._scheme = 'bzr+ssh'
735
772
        # SmartClientStreamMedium stores the repr of this object in its
736
773
        # _DebugCounter so we have to store all the values used in our repr
737
774
        # method before calling the super init.
738
775
        SmartClientStreamMedium.__init__(self, base)
739
 
        self._read_from = None
 
776
        self._vendor = vendor
740
777
        self._ssh_connection = None
741
 
        self._vendor = vendor
742
 
        self._write_to = None
743
 
        self._bzr_remote_path = bzr_remote_path
744
 
        # for the benefit of progress making a short description of this
745
 
        # transport
746
 
        self._scheme = 'bzr+ssh'
747
778
 
748
779
    def __repr__(self):
749
 
        return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
 
780
        if self._ssh_params.port is None:
 
781
            maybe_port = ''
 
782
        else:
 
783
            maybe_port = ':%s' % self._ssh_params.port
 
784
        return "%s(%s://%s@%s%s/)" % (
750
785
            self.__class__.__name__,
751
 
            self._connected,
752
 
            self._username,
753
 
            self._host,
754
 
            self._port)
 
786
            self._scheme,
 
787
            self._ssh_params.username,
 
788
            self._ssh_params.host,
 
789
            maybe_port)
755
790
 
756
791
    def _accept_bytes(self, bytes):
757
792
        """See SmartClientStreamMedium.accept_bytes."""
758
793
        self._ensure_connection()
759
 
        self._write_to.write(bytes)
760
 
        self._report_activity(len(bytes), 'write')
 
794
        self._real_medium.accept_bytes(bytes)
761
795
 
762
796
    def disconnect(self):
763
797
        """See SmartClientMedium.disconnect()."""
764
 
        if not self._connected:
765
 
            return
766
 
        self._read_from.close()
767
 
        self._write_to.close()
768
 
        self._ssh_connection.close()
769
 
        self._connected = False
 
798
        if self._real_medium is not None:
 
799
            self._real_medium.disconnect()
 
800
            self._real_medium = None
 
801
        if self._ssh_connection is not None:
 
802
            self._ssh_connection.close()
 
803
            self._ssh_connection = None
770
804
 
771
805
    def _ensure_connection(self):
772
806
        """Connect this medium if not already connected."""
773
 
        if self._connected:
 
807
        if self._real_medium is not None:
774
808
            return
775
809
        if self._vendor is None:
776
810
            vendor = ssh._get_ssh_vendor()
777
811
        else:
778
812
            vendor = self._vendor
779
 
        self._ssh_connection = vendor.connect_ssh(self._username,
780
 
                self._password, self._host, self._port,
781
 
                command=[self._bzr_remote_path, 'serve', '--inet',
 
813
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
 
814
                self._ssh_params.password, self._ssh_params.host,
 
815
                self._ssh_params.port,
 
816
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
782
817
                         '--directory=/', '--allow-writes'])
783
 
        self._read_from, self._write_to = \
784
 
            self._ssh_connection.get_filelike_channels()
785
 
        self._connected = True
 
818
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
 
819
        if io_kind == 'socket':
 
820
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
 
821
                self.base, io_object)
 
822
        elif io_kind == 'pipes':
 
823
            read_from, write_to = io_object
 
824
            self._real_medium = SmartSimplePipesClientMedium(
 
825
                read_from, write_to, self.base)
 
826
        else:
 
827
            raise AssertionError(
 
828
                "Unexpected io_kind %r from %r"
 
829
                % (io_kind, self._ssh_connection))
786
830
 
787
831
    def _flush(self):
788
832
        """See SmartClientStreamMedium._flush()."""
789
 
        self._write_to.flush()
 
833
        self._real_medium._flush()
790
834
 
791
835
    def _read_bytes(self, count):
792
836
        """See SmartClientStreamMedium.read_bytes."""
793
 
        if not self._connected:
 
837
        if self._real_medium is None:
794
838
            raise errors.MediumNotConnected(self)
795
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
796
 
        bytes = self._read_from.read(bytes_to_read)
797
 
        self._report_activity(len(bytes), 'read')
798
 
        return bytes
 
839
        return self._real_medium.read_bytes(count)
799
840
 
800
841
 
801
842
# Port 4155 is the default port for bzr://, registered with IANA.
803
844
BZR_DEFAULT_PORT = 4155
804
845
 
805
846
 
806
 
class SmartTCPClientMedium(SmartClientStreamMedium):
807
 
    """A client medium using TCP."""
 
847
class SmartClientSocketMedium(SmartClientStreamMedium):
 
848
    """A client medium using a socket.
 
849
    
 
850
    This class isn't usable directly.  Use one of its subclasses instead.
 
851
    """
808
852
 
809
 
    def __init__(self, host, port, base):
810
 
        """Creates a client that will connect on the first use."""
 
853
    def __init__(self, base):
811
854
        SmartClientStreamMedium.__init__(self, base)
 
855
        self._socket = None
812
856
        self._connected = False
813
 
        self._host = host
814
 
        self._port = port
815
 
        self._socket = None
816
857
 
817
858
    def _accept_bytes(self, bytes):
818
859
        """See SmartClientMedium.accept_bytes."""
819
860
        self._ensure_connection()
820
861
        osutils.send_all(self._socket, bytes, self._report_activity)
821
862
 
 
863
    def _ensure_connection(self):
 
864
        """Connect this medium if not already connected."""
 
865
        raise NotImplementedError(self._ensure_connection)
 
866
 
 
867
    def _flush(self):
 
868
        """See SmartClientStreamMedium._flush().
 
869
 
 
870
        For sockets we do no flushing. For TCP sockets we may want to turn off
 
871
        TCP_NODELAY and add a means to do a flush, but that can be done in the
 
872
        future.
 
873
        """
 
874
 
 
875
    def _read_bytes(self, count):
 
876
        """See SmartClientMedium.read_bytes."""
 
877
        if not self._connected:
 
878
            raise errors.MediumNotConnected(self)
 
879
        return osutils.read_bytes_from_socket(
 
880
            self._socket, self._report_activity)
 
881
 
822
882
    def disconnect(self):
823
883
        """See SmartClientMedium.disconnect()."""
824
884
        if not self._connected:
827
887
        self._socket = None
828
888
        self._connected = False
829
889
 
 
890
 
 
891
class SmartTCPClientMedium(SmartClientSocketMedium):
 
892
    """A client medium that creates a TCP connection."""
 
893
 
 
894
    def __init__(self, host, port, base):
 
895
        """Creates a client that will connect on the first use."""
 
896
        SmartClientSocketMedium.__init__(self, base)
 
897
        self._host = host
 
898
        self._port = port
 
899
 
830
900
    def _ensure_connection(self):
831
901
        """Connect this medium if not already connected."""
832
902
        if self._connected:
866
936
                    (self._host, port, err_msg))
867
937
        self._connected = True
868
938
 
869
 
    def _flush(self):
870
 
        """See SmartClientStreamMedium._flush().
871
 
 
872
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
873
 
        add a means to do a flush, but that can be done in the future.
874
 
        """
875
 
 
876
 
    def _read_bytes(self, count):
877
 
        """See SmartClientMedium.read_bytes."""
878
 
        if not self._connected:
879
 
            raise errors.MediumNotConnected(self)
880
 
        # We ignore the desired_count because on sockets it's more efficient to
881
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
882
 
        try:
883
 
            bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
884
 
        except socket.error, e:
885
 
            if len(e.args) and e.args[0] == errno.ECONNRESET:
886
 
                # Callers expect an empty string in that case
887
 
                return ''
888
 
            else:
889
 
                raise
890
 
        else:
891
 
            self._report_activity(len(bytes), 'read')
892
 
            return bytes
 
939
 
 
940
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
 
941
    """A client medium for an already connected socket.
 
942
    
 
943
    Note that this class will assume it "owns" the socket, so it will close it
 
944
    when its disconnect method is called.
 
945
    """
 
946
 
 
947
    def __init__(self, base, sock):
 
948
        SmartClientSocketMedium.__init__(self, base)
 
949
        self._socket = sock
 
950
        self._connected = True
 
951
 
 
952
    def _ensure_connection(self):
 
953
        # Already connected, by definition!  So nothing to do.
 
954
        pass
893
955
 
894
956
 
895
957
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
931
993
        """
932
994
        self._medium._flush()
933
995
 
 
996