~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Matt Nordhoff
  • Date: 2009-04-04 02:50:01 UTC
  • mfrom: (4253 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4256.
  • Revision ID: mnordhoff@mattnordhoff.com-20090404025001-z1403k0tatmc8l91
Merge bzr.dev, fixing conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""The 'medium' layer for the smart servers and clients.
18
18
 
40
40
    osutils,
41
41
    symbol_versioning,
42
42
    trace,
 
43
    ui,
43
44
    urlutils,
44
45
    )
45
46
from bzrlib.smart import client, protocol
87
88
 
88
89
def _get_line(read_bytes_func):
89
90
    """Read bytes using read_bytes_func until a newline byte.
90
 
    
 
91
 
91
92
    This isn't particularly efficient, so should only be used when the
92
93
    expected size of the line is quite short.
93
 
    
 
94
 
94
95
    :returns: a tuple of two strs: (line, excess)
95
96
    """
96
97
    newline_pos = -1
112
113
 
113
114
    def __init__(self):
114
115
        self._push_back_buffer = None
115
 
        
 
116
 
116
117
    def _push_back(self, bytes):
117
118
        """Return unused bytes to the medium, because they belong to the next
118
119
        request(s).
152
153
 
153
154
    def _get_line(self):
154
155
        """Read bytes from this request's response until a newline byte.
155
 
        
 
156
 
156
157
        This isn't particularly efficient, so should only be used when the
157
158
        expected size of the line is quite short.
158
159
 
161
162
        line, excess = _get_line(self.read_bytes)
162
163
        self._push_back(excess)
163
164
        return line
164
 
 
 
165
 
 
166
    def _report_activity(self, bytes, direction):
 
167
        """Notify that this medium has activity.
 
168
 
 
169
        Implementations should call this from all methods that actually do IO.
 
170
        Be careful that it's not called twice, if one method is implemented on
 
171
        top of another.
 
172
 
 
173
        :param bytes: Number of bytes read or written.
 
174
        :param direction: 'read' or 'write' or None.
 
175
        """
 
176
        ui.ui_factory.report_transport_activity(self, bytes, direction)
 
177
 
165
178
 
166
179
class SmartServerStreamMedium(SmartMedium):
167
180
    """Handles smart commands coming over a stream.
172
185
    One instance is created for each connected client; it can serve multiple
173
186
    requests in the lifetime of the connection.
174
187
 
175
 
    The server passes requests through to an underlying backing transport, 
 
188
    The server passes requests through to an underlying backing transport,
176
189
    which will typically be a LocalTransport looking at the server's filesystem.
177
190
 
178
191
    :ivar _push_back_buffer: a str of bytes that have been read from the stream
223
236
 
224
237
    def _serve_one_request(self, protocol):
225
238
        """Read one request from input, process, send back a response.
226
 
        
 
239
 
227
240
        :param protocol: a SmartServerRequestProtocol.
228
241
        """
229
242
        try:
268
281
                self.finished = True
269
282
                return
270
283
            protocol.accept_bytes(bytes)
271
 
        
 
284
 
272
285
        self._push_back(protocol.unused_data)
273
286
 
274
287
    def _read_bytes(self, desired_count):
275
288
        # We ignore the desired_count because on sockets it's more efficient to
276
289
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
277
 
        return osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
 
290
        bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
 
291
        self._report_activity(len(bytes), 'read')
 
292
        return bytes
278
293
 
279
294
    def terminate_due_to_error(self):
280
295
        # TODO: This should log to a server log file, but no such thing
283
298
        self.finished = True
284
299
 
285
300
    def _write_out(self, bytes):
286
 
        osutils.send_all(self.socket, bytes)
 
301
        osutils.send_all(self.socket, bytes, self._report_activity)
287
302
 
288
303
 
289
304
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
350
365
    request.finished_reading()
351
366
 
352
367
    It is up to the individual SmartClientMedium whether multiple concurrent
353
 
    requests can exist. See SmartClientMedium.get_request to obtain instances 
354
 
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
368
    requests can exist. See SmartClientMedium.get_request to obtain instances
 
369
    of SmartClientMediumRequest, and the concrete Medium you are using for
355
370
    details on concurrency and pipelining.
356
371
    """
357
372
 
403
418
    def _finished_reading(self):
404
419
        """Helper for finished_reading.
405
420
 
406
 
        finished_reading checks the state of the request to determine if 
 
421
        finished_reading checks the state of the request to determine if
407
422
        finished_reading is allowed, and if it is hands off to _finished_reading
408
423
        to perform the action.
409
424
        """
423
438
    def _finished_writing(self):
424
439
        """Helper for finished_writing.
425
440
 
426
 
        finished_writing checks the state of the request to determine if 
 
441
        finished_writing checks the state of the request to determine if
427
442
        finished_writing is allowed, and if it is hands off to _finished_writing
428
443
        to perform the action.
429
444
        """
449
464
        read_bytes checks the state of the request to determing if bytes
450
465
        should be read. After that it hands off to _read_bytes to do the
451
466
        actual read.
452
 
        
 
467
 
453
468
        By default this forwards to self._medium.read_bytes because we are
454
469
        operating on the medium's stream.
455
470
        """
460
475
        if not line.endswith('\n'):
461
476
            # end of file encountered reading from server
462
477
            raise errors.ConnectionReset(
463
 
                "please check connectivity and permissions",
464
 
                "(and try -Dhpss if further diagnosis is required)")
 
478
                "please check connectivity and permissions")
465
479
        return line
466
480
 
467
481
    def _read_line(self):
468
482
        """Helper for SmartClientMediumRequest.read_line.
469
 
        
 
483
 
470
484
        By default this forwards to self._medium._get_line because we are
471
485
        operating on the medium's stream.
472
486
        """
516
530
        value[0] = 0
517
531
        if count != 0:
518
532
            trace.note('HPSS calls: %d %s', count, medium_repr)
519
 
        
 
533
 
520
534
    def flush_all(self):
521
535
        for ref in list(self.counts.keys()):
522
536
            self.done(ref)
523
537
 
524
538
_debug_counter = None
525
 
  
526
 
  
 
539
 
 
540
 
527
541
class SmartClientMedium(SmartMedium):
528
542
    """Smart client is a medium for sending smart protocol requests over."""
529
543
 
621
635
 
622
636
    def disconnect(self):
623
637
        """If this medium maintains a persistent connection, close it.
624
 
        
 
638
 
625
639
        The default implementation does nothing.
626
640
        """
627
 
        
 
641
 
628
642
    def remote_path_from_transport(self, transport):
629
643
        """Convert transport into a path suitable for using in a request.
630
 
        
 
644
 
631
645
        Note that the resulting remote path doesn't encode the host name or
632
646
        anything but path, so it is only safe to use it in requests sent over
633
647
        the medium from the matching transport.
661
675
 
662
676
    def _flush(self):
663
677
        """Flush the output stream.
664
 
        
 
678
 
665
679
        This method is used by the SmartClientStreamMediumRequest to ensure that
666
680
        all data for a request is sent, to avoid long timeouts or deadlocks.
667
681
        """
678
692
 
679
693
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
680
694
    """A client medium using simple pipes.
681
 
    
 
695
 
682
696
    This client does not manage the pipes: it assumes they will always be open.
683
697
    """
684
698
 
690
704
    def _accept_bytes(self, bytes):
691
705
        """See SmartClientStreamMedium.accept_bytes."""
692
706
        self._writeable_pipe.write(bytes)
 
707
        self._report_activity(len(bytes), 'write')
693
708
 
694
709
    def _flush(self):
695
710
        """See SmartClientStreamMedium._flush()."""
697
712
 
698
713
    def _read_bytes(self, count):
699
714
        """See SmartClientStreamMedium._read_bytes."""
700
 
        return self._readable_pipe.read(count)
 
715
        bytes = self._readable_pipe.read(count)
 
716
        self._report_activity(len(bytes), 'read')
 
717
        return bytes
701
718
 
702
719
 
703
720
class SmartSSHClientMedium(SmartClientStreamMedium):
704
721
    """A client medium using SSH."""
705
 
    
 
722
 
706
723
    def __init__(self, host, port=None, username=None, password=None,
707
724
            base=None, vendor=None, bzr_remote_path=None):
708
725
        """Creates a client that will connect on the first use.
709
 
        
 
726
 
710
727
        :param vendor: An optional override for the ssh vendor to use. See
711
728
            bzrlib.transport.ssh for details on ssh vendors.
712
729
        """
713
 
        SmartClientStreamMedium.__init__(self, base)
714
730
        self._connected = False
715
731
        self._host = host
716
732
        self._password = password
717
733
        self._port = port
718
734
        self._username = username
 
735
        # SmartClientStreamMedium stores the repr of this object in its
 
736
        # _DebugCounter so we have to store all the values used in our repr
 
737
        # method before calling the super init.
 
738
        SmartClientStreamMedium.__init__(self, base)
719
739
        self._read_from = None
720
740
        self._ssh_connection = None
721
741
        self._vendor = vendor
722
742
        self._write_to = None
723
743
        self._bzr_remote_path = bzr_remote_path
724
 
        if self._bzr_remote_path is None:
725
 
            symbol_versioning.warn(
726
 
                'bzr_remote_path is required as of bzr 0.92',
727
 
                DeprecationWarning, stacklevel=2)
728
 
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
744
        # for the benefit of progress making a short description of this
 
745
        # transport
 
746
        self._scheme = 'bzr+ssh'
 
747
 
 
748
    def __repr__(self):
 
749
        return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
 
750
            self.__class__.__name__,
 
751
            self._connected,
 
752
            self._username,
 
753
            self._host,
 
754
            self._port)
729
755
 
730
756
    def _accept_bytes(self, bytes):
731
757
        """See SmartClientStreamMedium.accept_bytes."""
732
758
        self._ensure_connection()
733
759
        self._write_to.write(bytes)
 
760
        self._report_activity(len(bytes), 'write')
734
761
 
735
762
    def disconnect(self):
736
763
        """See SmartClientMedium.disconnect()."""
766
793
        if not self._connected:
767
794
            raise errors.MediumNotConnected(self)
768
795
        bytes_to_read = min(count, _MAX_READ_SIZE)
769
 
        return self._read_from.read(bytes_to_read)
 
796
        bytes = self._read_from.read(bytes_to_read)
 
797
        self._report_activity(len(bytes), 'read')
 
798
        return bytes
770
799
 
771
800
 
772
801
# Port 4155 is the default port for bzr://, registered with IANA.
776
805
 
777
806
class SmartTCPClientMedium(SmartClientStreamMedium):
778
807
    """A client medium using TCP."""
779
 
    
 
808
 
780
809
    def __init__(self, host, port, base):
781
810
        """Creates a client that will connect on the first use."""
782
811
        SmartClientStreamMedium.__init__(self, base)
788
817
    def _accept_bytes(self, bytes):
789
818
        """See SmartClientMedium.accept_bytes."""
790
819
        self._ensure_connection()
791
 
        osutils.send_all(self._socket, bytes)
 
820
        osutils.send_all(self._socket, bytes, self._report_activity)
792
821
 
793
822
    def disconnect(self):
794
823
        """See SmartClientMedium.disconnect()."""
807
836
        else:
808
837
            port = int(self._port)
809
838
        try:
810
 
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC, 
 
839
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
811
840
                socket.SOCK_STREAM, 0, 0)
812
841
        except socket.gaierror, (err_num, err_msg):
813
842
            raise errors.ConnectionError("failed to lookup %s:%d: %s" %
817
846
        for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
818
847
            try:
819
848
                self._socket = socket.socket(family, socktype, proto)
820
 
                self._socket.setsockopt(socket.IPPROTO_TCP, 
 
849
                self._socket.setsockopt(socket.IPPROTO_TCP,
821
850
                                        socket.TCP_NODELAY, 1)
822
851
                self._socket.connect(sockaddr)
823
852
            except socket.error, err:
839
868
 
840
869
    def _flush(self):
841
870
        """See SmartClientStreamMedium._flush().
842
 
        
843
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
871
 
 
872
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
844
873
        add a means to do a flush, but that can be done in the future.
845
874
        """
846
875
 
851
880
        # We ignore the desired_count because on sockets it's more efficient to
852
881
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
853
882
        try:
854
 
            return self._socket.recv(_MAX_READ_SIZE)
 
883
            bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
855
884
        except socket.error, e:
856
885
            if len(e.args) and e.args[0] == errno.ECONNRESET:
857
886
                # Callers expect an empty string in that case
858
887
                return ''
859
888
            else:
860
889
                raise
 
890
        else:
 
891
            self._report_activity(len(bytes), 'read')
 
892
            return bytes
861
893
 
862
894
 
863
895
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
876
908
 
877
909
    def _accept_bytes(self, bytes):
878
910
        """See SmartClientMediumRequest._accept_bytes.
879
 
        
 
911
 
880
912
        This forwards to self._medium._accept_bytes because we are operating
881
913
        on the mediums stream.
882
914
        """
885
917
    def _finished_reading(self):
886
918
        """See SmartClientMediumRequest._finished_reading.
887
919
 
888
 
        This clears the _current_request on self._medium to allow a new 
 
920
        This clears the _current_request on self._medium to allow a new
889
921
        request to be created.
890
922
        """
891
923
        if self._medium._current_request is not self:
892
924
            raise AssertionError()
893
925
        self._medium._current_request = None
894
 
        
 
926
 
895
927
    def _finished_writing(self):
896
928
        """See SmartClientMediumRequest._finished_writing.
897
929