~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-03-17 07:05:37 UTC
  • mfrom: (4152.1.2 branch.stacked.streams)
  • Revision ID: pqm@pqm.ubuntu.com-20090317070537-zaud24vjs2szna87
(robertc) Add client-side streaming from stacked branches (over
        bzr:// protocols) when the sort order is compatible with doing
        that. (Robert Collins, Andrew Bennetts)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""The 'medium' layer for the smart servers and clients.
18
18
 
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
 
27
import errno
27
28
import os
 
29
import socket
28
30
import sys
29
31
import urllib
30
32
 
31
 
import bzrlib
32
33
from bzrlib.lazy_import import lazy_import
33
34
lazy_import(globals(), """
34
 
import socket
35
 
import thread
 
35
import atexit
36
36
import weakref
37
 
 
38
37
from bzrlib import (
39
38
    debug,
40
39
    errors,
 
40
    osutils,
41
41
    symbol_versioning,
42
42
    trace,
43
43
    ui,
44
44
    urlutils,
45
45
    )
46
 
from bzrlib.smart import client, protocol, request, vfs
 
46
from bzrlib.smart import client, protocol
47
47
from bzrlib.transport import ssh
48
48
""")
49
 
from bzrlib import osutils
50
 
 
51
 
# Throughout this module buffer size parameters are either limited to be at
52
 
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
53
 
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
54
 
# from non-sockets as well.
55
 
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
 
49
 
 
50
 
 
51
# We must not read any more than 64k at a time so we don't risk "no buffer
 
52
# space available" errors on some platforms.  Windows in particular is likely
 
53
# to give error 10053 or 10055 if we read more than 64k from a socket.
 
54
_MAX_READ_SIZE = 64 * 1024
 
55
 
56
56
 
57
57
def _get_protocol_factory_for_bytes(bytes):
58
58
    """Determine the right protocol factory for 'bytes'.
274
274
    def _serve_one_request_unguarded(self, protocol):
275
275
        while protocol.next_read_size():
276
276
            # We can safely try to read large chunks.  If there is less data
277
 
            # than MAX_SOCKET_CHUNK ready, the socket will just return a
278
 
            # short read immediately rather than block.
279
 
            bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
 
277
            # than _MAX_READ_SIZE ready, the socket wil just return a short
 
278
            # read immediately rather than block.
 
279
            bytes = self.read_bytes(_MAX_READ_SIZE)
280
280
            if bytes == '':
281
281
                self.finished = True
282
282
                return
285
285
        self._push_back(protocol.unused_data)
286
286
 
287
287
    def _read_bytes(self, desired_count):
288
 
        return osutils.read_bytes_from_socket(
289
 
            self.socket, self._report_activity)
 
288
        # We ignore the desired_count because on sockets it's more efficient to
 
289
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
290
        bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
 
291
        self._report_activity(len(bytes), 'read')
 
292
        return bytes
290
293
 
291
294
    def terminate_due_to_error(self):
292
295
        # TODO: This should log to a server log file, but no such thing
295
298
        self.finished = True
296
299
 
297
300
    def _write_out(self, bytes):
298
 
        tstart = osutils.timer_func()
299
301
        osutils.send_all(self.socket, bytes, self._report_activity)
300
 
        if 'hpss' in debug.debug_flags:
301
 
            thread_id = thread.get_ident()
302
 
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
303
 
                         % ('wrote', thread_id, len(bytes),
304
 
                            osutils.timer_func() - tstart))
305
302
 
306
303
 
307
304
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
384
381
    def accept_bytes(self, bytes):
385
382
        """Accept bytes for inclusion in this request.
386
383
 
387
 
        This method may not be called after finished_writing() has been
 
384
        This method may not be be called after finished_writing() has been
388
385
        called.  It depends upon the Medium whether or not the bytes will be
389
386
        immediately transmitted. Message based Mediums will tend to buffer the
390
387
        bytes until finished_writing() is called.
478
475
        if not line.endswith('\n'):
479
476
            # end of file encountered reading from server
480
477
            raise errors.ConnectionReset(
481
 
                "Unexpected end of message. Please check connectivity "
482
 
                "and permissions, and report a bug if problems persist.")
 
478
                "please check connectivity and permissions")
483
479
        return line
484
480
 
485
481
    def _read_line(self):
494
490
class _DebugCounter(object):
495
491
    """An object that counts the HPSS calls made to each client medium.
496
492
 
497
 
    When a medium is garbage-collected, or failing that when
498
 
    bzrlib.global_state exits, the total number of calls made on that medium
499
 
    are reported via trace.note.
 
493
    When a medium is garbage-collected, or failing that when atexit functions
 
494
    are run, the total number of calls made on that medium are reported via
 
495
    trace.note.
500
496
    """
501
497
 
502
498
    def __init__(self):
503
499
        self.counts = weakref.WeakKeyDictionary()
504
500
        client._SmartClient.hooks.install_named_hook(
505
501
            'call', self.increment_call_count, 'hpss call counter')
506
 
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
 
502
        atexit.register(self.flush_all)
507
503
 
508
504
    def track(self, medium):
509
505
        """Start tracking calls made to a medium.
513
509
        """
514
510
        medium_repr = repr(medium)
515
511
        # Add this medium to the WeakKeyDictionary
516
 
        self.counts[medium] = dict(count=0, vfs_count=0,
517
 
                                   medium_repr=medium_repr)
 
512
        self.counts[medium] = [0, medium_repr]
518
513
        # Weakref callbacks are fired in reverse order of their association
519
514
        # with the referenced object.  So we add a weakref *after* adding to
520
515
        # the WeakKeyDict so that we can report the value from it before the
524
519
    def increment_call_count(self, params):
525
520
        # Increment the count in the WeakKeyDictionary
526
521
        value = self.counts[params.medium]
527
 
        value['count'] += 1
528
 
        try:
529
 
            request_method = request.request_handlers.get(params.method)
530
 
        except KeyError:
531
 
            # A method we don't know about doesn't count as a VFS method.
532
 
            return
533
 
        if issubclass(request_method, vfs.VfsRequest):
534
 
            value['vfs_count'] += 1
 
522
        value[0] += 1
535
523
 
536
524
    def done(self, ref):
537
525
        value = self.counts[ref]
538
 
        count, vfs_count, medium_repr = (
539
 
            value['count'], value['vfs_count'], value['medium_repr'])
 
526
        count, medium_repr = value
540
527
        # In case this callback is invoked for the same ref twice (by the
541
528
        # weakref callback and by the atexit function), set the call count back
542
529
        # to 0 so this item won't be reported twice.
543
 
        value['count'] = 0
544
 
        value['vfs_count'] = 0
 
530
        value[0] = 0
545
531
        if count != 0:
546
 
            trace.note('HPSS calls: %d (%d vfs) %s',
547
 
                       count, vfs_count, medium_repr)
 
532
            trace.note('HPSS calls: %d %s', count, medium_repr)
548
533
 
549
534
    def flush_all(self):
550
535
        for ref in list(self.counts.keys()):
607
592
            # which is newer than a previously supplied older-than version.
608
593
            # This indicates that some smart verb call is not guarded
609
594
            # appropriately (it should simply not have been tried).
610
 
            trace.mutter(
 
595
            raise AssertionError(
611
596
                "_remember_remote_is_before(%r) called, but "
612
597
                "_remember_remote_is_before(%r) was called previously."
613
 
                , version_tuple, self._remote_version_is_before)
614
 
            if 'hpss' in debug.debug_flags:
615
 
                ui.ui_factory.show_warning(
616
 
                    "_remember_remote_is_before(%r) called, but "
617
 
                    "_remember_remote_is_before(%r) was called previously."
618
 
                    % (version_tuple, self._remote_version_is_before))
619
 
            return
 
598
                % (version_tuple, self._remote_version_is_before))
620
599
        self._remote_version_is_before = version_tuple
621
600
 
622
601
    def protocol_version(self):
733
712
 
734
713
    def _read_bytes(self, count):
735
714
        """See SmartClientStreamMedium._read_bytes."""
736
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
737
 
        bytes = self._readable_pipe.read(bytes_to_read)
 
715
        bytes = self._readable_pipe.read(count)
738
716
        self._report_activity(len(bytes), 'read')
739
717
        return bytes
740
718
 
741
719
 
742
 
class SSHParams(object):
743
 
    """A set of parameters for starting a remote bzr via SSH."""
 
720
class SmartSSHClientMedium(SmartClientStreamMedium):
 
721
    """A client medium using SSH."""
744
722
 
745
723
    def __init__(self, host, port=None, username=None, password=None,
746
 
            bzr_remote_path='bzr'):
747
 
        self.host = host
748
 
        self.port = port
749
 
        self.username = username
750
 
        self.password = password
751
 
        self.bzr_remote_path = bzr_remote_path
752
 
 
753
 
 
754
 
class SmartSSHClientMedium(SmartClientStreamMedium):
755
 
    """A client medium using SSH.
756
 
    
757
 
    It delegates IO to a SmartClientSocketMedium or
758
 
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
759
 
    """
760
 
 
761
 
    def __init__(self, base, ssh_params, vendor=None):
 
724
            base=None, vendor=None, bzr_remote_path=None):
762
725
        """Creates a client that will connect on the first use.
763
726
 
764
 
        :param ssh_params: A SSHParams instance.
765
727
        :param vendor: An optional override for the ssh vendor to use. See
766
728
            bzrlib.transport.ssh for details on ssh vendors.
767
729
        """
768
 
        self._real_medium = None
769
 
        self._ssh_params = ssh_params
770
 
        # for the benefit of progress making a short description of this
771
 
        # transport
772
 
        self._scheme = 'bzr+ssh'
 
730
        self._connected = False
 
731
        self._host = host
 
732
        self._password = password
 
733
        self._port = port
 
734
        self._username = username
773
735
        # SmartClientStreamMedium stores the repr of this object in its
774
736
        # _DebugCounter so we have to store all the values used in our repr
775
737
        # method before calling the super init.
776
738
        SmartClientStreamMedium.__init__(self, base)
 
739
        self._read_from = None
 
740
        self._ssh_connection = None
777
741
        self._vendor = vendor
778
 
        self._ssh_connection = None
 
742
        self._write_to = None
 
743
        self._bzr_remote_path = bzr_remote_path
 
744
        # for the benefit of progress making a short description of this
 
745
        # transport
 
746
        self._scheme = 'bzr+ssh'
779
747
 
780
748
    def __repr__(self):
781
 
        if self._ssh_params.port is None:
782
 
            maybe_port = ''
783
 
        else:
784
 
            maybe_port = ':%s' % self._ssh_params.port
785
 
        return "%s(%s://%s@%s%s/)" % (
 
749
        return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
786
750
            self.__class__.__name__,
787
 
            self._scheme,
788
 
            self._ssh_params.username,
789
 
            self._ssh_params.host,
790
 
            maybe_port)
 
751
            self._connected,
 
752
            self._username,
 
753
            self._host,
 
754
            self._port)
791
755
 
792
756
    def _accept_bytes(self, bytes):
793
757
        """See SmartClientStreamMedium.accept_bytes."""
794
758
        self._ensure_connection()
795
 
        self._real_medium.accept_bytes(bytes)
 
759
        self._write_to.write(bytes)
 
760
        self._report_activity(len(bytes), 'write')
796
761
 
797
762
    def disconnect(self):
798
763
        """See SmartClientMedium.disconnect()."""
799
 
        if self._real_medium is not None:
800
 
            self._real_medium.disconnect()
801
 
            self._real_medium = None
802
 
        if self._ssh_connection is not None:
803
 
            self._ssh_connection.close()
804
 
            self._ssh_connection = None
 
764
        if not self._connected:
 
765
            return
 
766
        self._read_from.close()
 
767
        self._write_to.close()
 
768
        self._ssh_connection.close()
 
769
        self._connected = False
805
770
 
806
771
    def _ensure_connection(self):
807
772
        """Connect this medium if not already connected."""
808
 
        if self._real_medium is not None:
 
773
        if self._connected:
809
774
            return
810
775
        if self._vendor is None:
811
776
            vendor = ssh._get_ssh_vendor()
812
777
        else:
813
778
            vendor = self._vendor
814
 
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
815
 
                self._ssh_params.password, self._ssh_params.host,
816
 
                self._ssh_params.port,
817
 
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
 
779
        self._ssh_connection = vendor.connect_ssh(self._username,
 
780
                self._password, self._host, self._port,
 
781
                command=[self._bzr_remote_path, 'serve', '--inet',
818
782
                         '--directory=/', '--allow-writes'])
819
 
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
820
 
        if io_kind == 'socket':
821
 
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
822
 
                self.base, io_object)
823
 
        elif io_kind == 'pipes':
824
 
            read_from, write_to = io_object
825
 
            self._real_medium = SmartSimplePipesClientMedium(
826
 
                read_from, write_to, self.base)
827
 
        else:
828
 
            raise AssertionError(
829
 
                "Unexpected io_kind %r from %r"
830
 
                % (io_kind, self._ssh_connection))
 
783
        self._read_from, self._write_to = \
 
784
            self._ssh_connection.get_filelike_channels()
 
785
        self._connected = True
831
786
 
832
787
    def _flush(self):
833
788
        """See SmartClientStreamMedium._flush()."""
834
 
        self._real_medium._flush()
 
789
        self._write_to.flush()
835
790
 
836
791
    def _read_bytes(self, count):
837
792
        """See SmartClientStreamMedium.read_bytes."""
838
 
        if self._real_medium is None:
 
793
        if not self._connected:
839
794
            raise errors.MediumNotConnected(self)
840
 
        return self._real_medium.read_bytes(count)
 
795
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
796
        bytes = self._read_from.read(bytes_to_read)
 
797
        self._report_activity(len(bytes), 'read')
 
798
        return bytes
841
799
 
842
800
 
843
801
# Port 4155 is the default port for bzr://, registered with IANA.
845
803
BZR_DEFAULT_PORT = 4155
846
804
 
847
805
 
848
 
class SmartClientSocketMedium(SmartClientStreamMedium):
849
 
    """A client medium using a socket.
850
 
    
851
 
    This class isn't usable directly.  Use one of its subclasses instead.
852
 
    """
 
806
class SmartTCPClientMedium(SmartClientStreamMedium):
 
807
    """A client medium using TCP."""
853
808
 
854
 
    def __init__(self, base):
 
809
    def __init__(self, host, port, base):
 
810
        """Creates a client that will connect on the first use."""
855
811
        SmartClientStreamMedium.__init__(self, base)
 
812
        self._connected = False
 
813
        self._host = host
 
814
        self._port = port
856
815
        self._socket = None
857
 
        self._connected = False
858
816
 
859
817
    def _accept_bytes(self, bytes):
860
818
        """See SmartClientMedium.accept_bytes."""
861
819
        self._ensure_connection()
862
820
        osutils.send_all(self._socket, bytes, self._report_activity)
863
821
 
864
 
    def _ensure_connection(self):
865
 
        """Connect this medium if not already connected."""
866
 
        raise NotImplementedError(self._ensure_connection)
867
 
 
868
 
    def _flush(self):
869
 
        """See SmartClientStreamMedium._flush().
870
 
 
871
 
        For sockets we do no flushing. For TCP sockets we may want to turn off
872
 
        TCP_NODELAY and add a means to do a flush, but that can be done in the
873
 
        future.
874
 
        """
875
 
 
876
 
    def _read_bytes(self, count):
877
 
        """See SmartClientMedium.read_bytes."""
878
 
        if not self._connected:
879
 
            raise errors.MediumNotConnected(self)
880
 
        return osutils.read_bytes_from_socket(
881
 
            self._socket, self._report_activity)
882
 
 
883
822
    def disconnect(self):
884
823
        """See SmartClientMedium.disconnect()."""
885
824
        if not self._connected:
888
827
        self._socket = None
889
828
        self._connected = False
890
829
 
891
 
 
892
 
class SmartTCPClientMedium(SmartClientSocketMedium):
893
 
    """A client medium that creates a TCP connection."""
894
 
 
895
 
    def __init__(self, host, port, base):
896
 
        """Creates a client that will connect on the first use."""
897
 
        SmartClientSocketMedium.__init__(self, base)
898
 
        self._host = host
899
 
        self._port = port
900
 
 
901
830
    def _ensure_connection(self):
902
831
        """Connect this medium if not already connected."""
903
832
        if self._connected:
937
866
                    (self._host, port, err_msg))
938
867
        self._connected = True
939
868
 
940
 
 
941
 
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
942
 
    """A client medium for an already connected socket.
943
 
    
944
 
    Note that this class will assume it "owns" the socket, so it will close it
945
 
    when its disconnect method is called.
946
 
    """
947
 
 
948
 
    def __init__(self, base, sock):
949
 
        SmartClientSocketMedium.__init__(self, base)
950
 
        self._socket = sock
951
 
        self._connected = True
952
 
 
953
 
    def _ensure_connection(self):
954
 
        # Already connected, by definition!  So nothing to do.
955
 
        pass
 
869
    def _flush(self):
 
870
        """See SmartClientStreamMedium._flush().
 
871
 
 
872
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
 
873
        add a means to do a flush, but that can be done in the future.
 
874
        """
 
875
 
 
876
    def _read_bytes(self, count):
 
877
        """See SmartClientMedium.read_bytes."""
 
878
        if not self._connected:
 
879
            raise errors.MediumNotConnected(self)
 
880
        # We ignore the desired_count because on sockets it's more efficient to
 
881
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
882
        try:
 
883
            bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
 
884
        except socket.error, e:
 
885
            if len(e.args) and e.args[0] == errno.ECONNRESET:
 
886
                # Callers expect an empty string in that case
 
887
                return ''
 
888
            else:
 
889
                raise
 
890
        else:
 
891
            self._report_activity(len(bytes), 'read')
 
892
            return bytes
956
893
 
957
894
 
958
895
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
994
931
        """
995
932
        self._medium._flush()
996
933
 
997