~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Andrew Bennetts
  • Date: 2010-01-12 03:53:21 UTC
  • mfrom: (4948 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4964.
  • Revision ID: andrew.bennetts@canonical.com-20100112035321-hofpz5p10224ryj3
Merge lp:bzr, resolving conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""The 'medium' layer for the smart servers and clients.
18
18
 
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
35
35
import atexit
 
36
import thread
36
37
import weakref
 
38
 
37
39
from bzrlib import (
38
40
    debug,
39
41
    errors,
40
 
    osutils,
41
42
    symbol_versioning,
42
43
    trace,
 
44
    ui,
43
45
    urlutils,
44
46
    )
45
 
from bzrlib.smart import client, protocol
 
47
from bzrlib.smart import client, protocol, request, vfs
46
48
from bzrlib.transport import ssh
47
49
""")
48
 
 
 
50
#usually already imported, and getting IllegalScoperReplacer on it here.
 
51
from bzrlib import osutils
49
52
 
50
53
# We must not read any more than 64k at a time so we don't risk "no buffer
51
54
# space available" errors on some platforms.  Windows in particular is likely
87
90
 
88
91
def _get_line(read_bytes_func):
89
92
    """Read bytes using read_bytes_func until a newline byte.
90
 
    
 
93
 
91
94
    This isn't particularly efficient, so should only be used when the
92
95
    expected size of the line is quite short.
93
 
    
 
96
 
94
97
    :returns: a tuple of two strs: (line, excess)
95
98
    """
96
99
    newline_pos = -1
112
115
 
113
116
    def __init__(self):
114
117
        self._push_back_buffer = None
115
 
        
 
118
 
116
119
    def _push_back(self, bytes):
117
120
        """Return unused bytes to the medium, because they belong to the next
118
121
        request(s).
152
155
 
153
156
    def _get_line(self):
154
157
        """Read bytes from this request's response until a newline byte.
155
 
        
 
158
 
156
159
        This isn't particularly efficient, so should only be used when the
157
160
        expected size of the line is quite short.
158
161
 
161
164
        line, excess = _get_line(self.read_bytes)
162
165
        self._push_back(excess)
163
166
        return line
164
 
 
 
167
 
 
168
    def _report_activity(self, bytes, direction):
 
169
        """Notify that this medium has activity.
 
170
 
 
171
        Implementations should call this from all methods that actually do IO.
 
172
        Be careful that it's not called twice, if one method is implemented on
 
173
        top of another.
 
174
 
 
175
        :param bytes: Number of bytes read or written.
 
176
        :param direction: 'read' or 'write' or None.
 
177
        """
 
178
        ui.ui_factory.report_transport_activity(self, bytes, direction)
 
179
 
165
180
 
166
181
class SmartServerStreamMedium(SmartMedium):
167
182
    """Handles smart commands coming over a stream.
172
187
    One instance is created for each connected client; it can serve multiple
173
188
    requests in the lifetime of the connection.
174
189
 
175
 
    The server passes requests through to an underlying backing transport, 
 
190
    The server passes requests through to an underlying backing transport,
176
191
    which will typically be a LocalTransport looking at the server's filesystem.
177
192
 
178
193
    :ivar _push_back_buffer: a str of bytes that have been read from the stream
223
238
 
224
239
    def _serve_one_request(self, protocol):
225
240
        """Read one request from input, process, send back a response.
226
 
        
 
241
 
227
242
        :param protocol: a SmartServerRequestProtocol.
228
243
        """
229
244
        try:
268
283
                self.finished = True
269
284
                return
270
285
            protocol.accept_bytes(bytes)
271
 
        
 
286
 
272
287
        self._push_back(protocol.unused_data)
273
288
 
274
289
    def _read_bytes(self, desired_count):
275
 
        # We ignore the desired_count because on sockets it's more efficient to
276
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
277
 
        return osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
 
290
        return _read_bytes_from_socket(
 
291
            self.socket.recv, desired_count, self._report_activity)
278
292
 
279
293
    def terminate_due_to_error(self):
280
294
        # TODO: This should log to a server log file, but no such thing
281
295
        # exists yet.  Andrew Bennetts 2006-09-29.
282
 
        self.socket.close()
 
296
        osutils.until_no_eintr(self.socket.close)
283
297
        self.finished = True
284
298
 
285
299
    def _write_out(self, bytes):
286
 
        osutils.send_all(self.socket, bytes)
 
300
        tstart = osutils.timer_func()
 
301
        osutils.send_all(self.socket, bytes, self._report_activity)
 
302
        if 'hpss' in debug.debug_flags:
 
303
            thread_id = thread.get_ident()
 
304
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
 
305
                         % ('wrote', thread_id, len(bytes),
 
306
                            osutils.timer_func() - tstart))
287
307
 
288
308
 
289
309
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
314
334
            bytes_to_read = protocol.next_read_size()
315
335
            if bytes_to_read == 0:
316
336
                # Finished serving this request.
317
 
                self._out.flush()
 
337
                osutils.until_no_eintr(self._out.flush)
318
338
                return
319
339
            bytes = self.read_bytes(bytes_to_read)
320
340
            if bytes == '':
321
341
                # Connection has been closed.
322
342
                self.finished = True
323
 
                self._out.flush()
 
343
                osutils.until_no_eintr(self._out.flush)
324
344
                return
325
345
            protocol.accept_bytes(bytes)
326
346
 
327
347
    def _read_bytes(self, desired_count):
328
 
        return self._in.read(desired_count)
 
348
        return osutils.until_no_eintr(self._in.read, desired_count)
329
349
 
330
350
    def terminate_due_to_error(self):
331
351
        # TODO: This should log to a server log file, but no such thing
332
352
        # exists yet.  Andrew Bennetts 2006-09-29.
333
 
        self._out.close()
 
353
        osutils.until_no_eintr(self._out.close)
334
354
        self.finished = True
335
355
 
336
356
    def _write_out(self, bytes):
337
 
        self._out.write(bytes)
 
357
        osutils.until_no_eintr(self._out.write, bytes)
338
358
 
339
359
 
340
360
class SmartClientMediumRequest(object):
350
370
    request.finished_reading()
351
371
 
352
372
    It is up to the individual SmartClientMedium whether multiple concurrent
353
 
    requests can exist. See SmartClientMedium.get_request to obtain instances 
354
 
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
373
    requests can exist. See SmartClientMedium.get_request to obtain instances
 
374
    of SmartClientMediumRequest, and the concrete Medium you are using for
355
375
    details on concurrency and pipelining.
356
376
    """
357
377
 
366
386
    def accept_bytes(self, bytes):
367
387
        """Accept bytes for inclusion in this request.
368
388
 
369
 
        This method may not be be called after finished_writing() has been
 
389
        This method may not be called after finished_writing() has been
370
390
        called.  It depends upon the Medium whether or not the bytes will be
371
391
        immediately transmitted. Message based Mediums will tend to buffer the
372
392
        bytes until finished_writing() is called.
403
423
    def _finished_reading(self):
404
424
        """Helper for finished_reading.
405
425
 
406
 
        finished_reading checks the state of the request to determine if 
 
426
        finished_reading checks the state of the request to determine if
407
427
        finished_reading is allowed, and if it is hands off to _finished_reading
408
428
        to perform the action.
409
429
        """
423
443
    def _finished_writing(self):
424
444
        """Helper for finished_writing.
425
445
 
426
 
        finished_writing checks the state of the request to determine if 
 
446
        finished_writing checks the state of the request to determine if
427
447
        finished_writing is allowed, and if it is hands off to _finished_writing
428
448
        to perform the action.
429
449
        """
449
469
        read_bytes checks the state of the request to determing if bytes
450
470
        should be read. After that it hands off to _read_bytes to do the
451
471
        actual read.
452
 
        
 
472
 
453
473
        By default this forwards to self._medium.read_bytes because we are
454
474
        operating on the medium's stream.
455
475
        """
460
480
        if not line.endswith('\n'):
461
481
            # end of file encountered reading from server
462
482
            raise errors.ConnectionReset(
463
 
                "please check connectivity and permissions",
464
 
                "(and try -Dhpss if further diagnosis is required)")
 
483
                "Unexpected end of message. Please check connectivity "
 
484
                "and permissions, and report a bug if problems persist.")
465
485
        return line
466
486
 
467
487
    def _read_line(self):
468
488
        """Helper for SmartClientMediumRequest.read_line.
469
 
        
 
489
 
470
490
        By default this forwards to self._medium._get_line because we are
471
491
        operating on the medium's stream.
472
492
        """
495
515
        """
496
516
        medium_repr = repr(medium)
497
517
        # Add this medium to the WeakKeyDictionary
498
 
        self.counts[medium] = [0, medium_repr]
 
518
        self.counts[medium] = dict(count=0, vfs_count=0,
 
519
                                   medium_repr=medium_repr)
499
520
        # Weakref callbacks are fired in reverse order of their association
500
521
        # with the referenced object.  So we add a weakref *after* adding to
501
522
        # the WeakKeyDict so that we can report the value from it before the
505
526
    def increment_call_count(self, params):
506
527
        # Increment the count in the WeakKeyDictionary
507
528
        value = self.counts[params.medium]
508
 
        value[0] += 1
 
529
        value['count'] += 1
 
530
        try:
 
531
            request_method = request.request_handlers.get(params.method)
 
532
        except KeyError:
 
533
            # A method we don't know about doesn't count as a VFS method.
 
534
            return
 
535
        if issubclass(request_method, vfs.VfsRequest):
 
536
            value['vfs_count'] += 1
509
537
 
510
538
    def done(self, ref):
511
539
        value = self.counts[ref]
512
 
        count, medium_repr = value
 
540
        count, vfs_count, medium_repr = (
 
541
            value['count'], value['vfs_count'], value['medium_repr'])
513
542
        # In case this callback is invoked for the same ref twice (by the
514
543
        # weakref callback and by the atexit function), set the call count back
515
544
        # to 0 so this item won't be reported twice.
516
 
        value[0] = 0
 
545
        value['count'] = 0
 
546
        value['vfs_count'] = 0
517
547
        if count != 0:
518
 
            trace.note('HPSS calls: %d %s', count, medium_repr)
519
 
        
 
548
            trace.note('HPSS calls: %d (%d vfs) %s',
 
549
                       count, vfs_count, medium_repr)
 
550
 
520
551
    def flush_all(self):
521
552
        for ref in list(self.counts.keys()):
522
553
            self.done(ref)
523
554
 
524
555
_debug_counter = None
525
 
  
526
 
  
 
556
 
 
557
 
527
558
class SmartClientMedium(SmartMedium):
528
559
    """Smart client is a medium for sending smart protocol requests over."""
529
560
 
574
605
        """
575
606
        if (self._remote_version_is_before is not None and
576
607
            version_tuple > self._remote_version_is_before):
 
608
            # We have been told that the remote side is older than some version
 
609
            # which is newer than a previously supplied older-than version.
 
610
            # This indicates that some smart verb call is not guarded
 
611
            # appropriately (it should simply not have been tried).
577
612
            raise AssertionError(
578
613
                "_remember_remote_is_before(%r) called, but "
579
614
                "_remember_remote_is_before(%r) was called previously."
617
652
 
618
653
    def disconnect(self):
619
654
        """If this medium maintains a persistent connection, close it.
620
 
        
 
655
 
621
656
        The default implementation does nothing.
622
657
        """
623
 
        
 
658
 
624
659
    def remote_path_from_transport(self, transport):
625
660
        """Convert transport into a path suitable for using in a request.
626
 
        
 
661
 
627
662
        Note that the resulting remote path doesn't encode the host name or
628
663
        anything but path, so it is only safe to use it in requests sent over
629
664
        the medium from the matching transport.
657
692
 
658
693
    def _flush(self):
659
694
        """Flush the output stream.
660
 
        
 
695
 
661
696
        This method is used by the SmartClientStreamMediumRequest to ensure that
662
697
        all data for a request is sent, to avoid long timeouts or deadlocks.
663
698
        """
674
709
 
675
710
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
676
711
    """A client medium using simple pipes.
677
 
    
 
712
 
678
713
    This client does not manage the pipes: it assumes they will always be open.
679
714
    """
680
715
 
685
720
 
686
721
    def _accept_bytes(self, bytes):
687
722
        """See SmartClientStreamMedium.accept_bytes."""
688
 
        self._writeable_pipe.write(bytes)
 
723
        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
 
724
        self._report_activity(len(bytes), 'write')
689
725
 
690
726
    def _flush(self):
691
727
        """See SmartClientStreamMedium._flush()."""
692
 
        self._writeable_pipe.flush()
 
728
        osutils.until_no_eintr(self._writeable_pipe.flush)
693
729
 
694
730
    def _read_bytes(self, count):
695
731
        """See SmartClientStreamMedium._read_bytes."""
696
 
        return self._readable_pipe.read(count)
 
732
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
 
733
        self._report_activity(len(bytes), 'read')
 
734
        return bytes
697
735
 
698
736
 
699
737
class SmartSSHClientMedium(SmartClientStreamMedium):
700
738
    """A client medium using SSH."""
701
 
    
 
739
 
702
740
    def __init__(self, host, port=None, username=None, password=None,
703
741
            base=None, vendor=None, bzr_remote_path=None):
704
742
        """Creates a client that will connect on the first use.
705
 
        
 
743
 
706
744
        :param vendor: An optional override for the ssh vendor to use. See
707
745
            bzrlib.transport.ssh for details on ssh vendors.
708
746
        """
709
 
        SmartClientStreamMedium.__init__(self, base)
710
747
        self._connected = False
711
748
        self._host = host
712
749
        self._password = password
713
750
        self._port = port
714
751
        self._username = username
 
752
        # SmartClientStreamMedium stores the repr of this object in its
 
753
        # _DebugCounter so we have to store all the values used in our repr
 
754
        # method before calling the super init.
 
755
        SmartClientStreamMedium.__init__(self, base)
715
756
        self._read_from = None
716
757
        self._ssh_connection = None
717
758
        self._vendor = vendor
718
759
        self._write_to = None
719
760
        self._bzr_remote_path = bzr_remote_path
720
 
        if self._bzr_remote_path is None:
721
 
            symbol_versioning.warn(
722
 
                'bzr_remote_path is required as of bzr 0.92',
723
 
                DeprecationWarning, stacklevel=2)
724
 
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
761
        # for the benefit of progress making a short description of this
 
762
        # transport
 
763
        self._scheme = 'bzr+ssh'
 
764
 
 
765
    def __repr__(self):
 
766
        return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
 
767
            self.__class__.__name__,
 
768
            self._connected,
 
769
            self._username,
 
770
            self._host,
 
771
            self._port)
725
772
 
726
773
    def _accept_bytes(self, bytes):
727
774
        """See SmartClientStreamMedium.accept_bytes."""
728
775
        self._ensure_connection()
729
 
        self._write_to.write(bytes)
 
776
        osutils.until_no_eintr(self._write_to.write, bytes)
 
777
        self._report_activity(len(bytes), 'write')
730
778
 
731
779
    def disconnect(self):
732
780
        """See SmartClientMedium.disconnect()."""
733
781
        if not self._connected:
734
782
            return
735
 
        self._read_from.close()
736
 
        self._write_to.close()
 
783
        osutils.until_no_eintr(self._read_from.close)
 
784
        osutils.until_no_eintr(self._write_to.close)
737
785
        self._ssh_connection.close()
738
786
        self._connected = False
739
787
 
762
810
        if not self._connected:
763
811
            raise errors.MediumNotConnected(self)
764
812
        bytes_to_read = min(count, _MAX_READ_SIZE)
765
 
        return self._read_from.read(bytes_to_read)
 
813
        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
 
814
        self._report_activity(len(bytes), 'read')
 
815
        return bytes
766
816
 
767
817
 
768
818
# Port 4155 is the default port for bzr://, registered with IANA.
772
822
 
773
823
class SmartTCPClientMedium(SmartClientStreamMedium):
774
824
    """A client medium using TCP."""
775
 
    
 
825
 
776
826
    def __init__(self, host, port, base):
777
827
        """Creates a client that will connect on the first use."""
778
828
        SmartClientStreamMedium.__init__(self, base)
784
834
    def _accept_bytes(self, bytes):
785
835
        """See SmartClientMedium.accept_bytes."""
786
836
        self._ensure_connection()
787
 
        osutils.send_all(self._socket, bytes)
 
837
        osutils.send_all(self._socket, bytes, self._report_activity)
788
838
 
789
839
    def disconnect(self):
790
840
        """See SmartClientMedium.disconnect()."""
791
841
        if not self._connected:
792
842
            return
793
 
        self._socket.close()
 
843
        osutils.until_no_eintr(self._socket.close)
794
844
        self._socket = None
795
845
        self._connected = False
796
846
 
803
853
        else:
804
854
            port = int(self._port)
805
855
        try:
806
 
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC, 
 
856
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
807
857
                socket.SOCK_STREAM, 0, 0)
808
858
        except socket.gaierror, (err_num, err_msg):
809
859
            raise errors.ConnectionError("failed to lookup %s:%d: %s" %
813
863
        for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
814
864
            try:
815
865
                self._socket = socket.socket(family, socktype, proto)
816
 
                self._socket.setsockopt(socket.IPPROTO_TCP, 
 
866
                self._socket.setsockopt(socket.IPPROTO_TCP,
817
867
                                        socket.TCP_NODELAY, 1)
818
868
                self._socket.connect(sockaddr)
819
869
            except socket.error, err:
835
885
 
836
886
    def _flush(self):
837
887
        """See SmartClientStreamMedium._flush().
838
 
        
839
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
888
 
 
889
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
840
890
        add a means to do a flush, but that can be done in the future.
841
891
        """
842
892
 
844
894
        """See SmartClientMedium.read_bytes."""
845
895
        if not self._connected:
846
896
            raise errors.MediumNotConnected(self)
847
 
        # We ignore the desired_count because on sockets it's more efficient to
848
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
849
 
        try:
850
 
            return self._socket.recv(_MAX_READ_SIZE)
851
 
        except socket.error, e:
852
 
            if len(e.args) and e.args[0] == errno.ECONNRESET:
853
 
                # Callers expect an empty string in that case
854
 
                return ''
855
 
            else:
856
 
                raise
 
897
        return _read_bytes_from_socket(
 
898
            self._socket.recv, count, self._report_activity)
857
899
 
858
900
 
859
901
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
872
914
 
873
915
    def _accept_bytes(self, bytes):
874
916
        """See SmartClientMediumRequest._accept_bytes.
875
 
        
 
917
 
876
918
        This forwards to self._medium._accept_bytes because we are operating
877
919
        on the mediums stream.
878
920
        """
881
923
    def _finished_reading(self):
882
924
        """See SmartClientMediumRequest._finished_reading.
883
925
 
884
 
        This clears the _current_request on self._medium to allow a new 
 
926
        This clears the _current_request on self._medium to allow a new
885
927
        request to be created.
886
928
        """
887
929
        if self._medium._current_request is not self:
888
930
            raise AssertionError()
889
931
        self._medium._current_request = None
890
 
        
 
932
 
891
933
    def _finished_writing(self):
892
934
        """See SmartClientMediumRequest._finished_writing.
893
935
 
895
937
        """
896
938
        self._medium._flush()
897
939
 
 
940
 
 
941
def _read_bytes_from_socket(sock, desired_count, report_activity):
 
942
    # We ignore the desired_count because on sockets it's more efficient to
 
943
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
944
    try:
 
945
        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
 
946
    except socket.error, e:
 
947
        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
 
948
            # The connection was closed by the other side.  Callers expect an
 
949
            # empty string to signal end-of-stream.
 
950
            bytes = ''
 
951
        else:
 
952
            raise
 
953
    else:
 
954
        report_activity(len(bytes), 'read')
 
955
    return bytes
 
956