~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Vincent Ladeuil
  • Date: 2010-02-10 15:46:03 UTC
  • mfrom: (4985.3.21 update)
  • mto: This revision was merged to the branch mainline in revision 5021.
  • Revision ID: v.ladeuil+lp@free.fr-20100210154603-k4no1gvfuqpzrw7p
Update performs two merges in a more logical order but stop on conflicts

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""The 'medium' layer for the smart servers and clients.
18
18
 
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
35
35
import atexit
 
36
import thread
36
37
import weakref
 
38
 
37
39
from bzrlib import (
38
40
    debug,
39
41
    errors,
40
 
    osutils,
41
42
    symbol_versioning,
42
43
    trace,
 
44
    ui,
43
45
    urlutils,
44
46
    )
45
 
from bzrlib.smart import client, protocol
 
47
from bzrlib.smart import client, protocol, request, vfs
46
48
from bzrlib.transport import ssh
47
49
""")
48
 
 
 
50
#usually already imported, and getting IllegalScoperReplacer on it here.
 
51
from bzrlib import osutils
49
52
 
50
53
# We must not read any more than 64k at a time so we don't risk "no buffer
51
54
# space available" errors on some platforms.  Windows in particular is likely
87
90
 
88
91
def _get_line(read_bytes_func):
89
92
    """Read bytes using read_bytes_func until a newline byte.
90
 
    
 
93
 
91
94
    This isn't particularly efficient, so should only be used when the
92
95
    expected size of the line is quite short.
93
 
    
 
96
 
94
97
    :returns: a tuple of two strs: (line, excess)
95
98
    """
96
99
    newline_pos = -1
112
115
 
113
116
    def __init__(self):
114
117
        self._push_back_buffer = None
115
 
        
 
118
 
116
119
    def _push_back(self, bytes):
117
120
        """Return unused bytes to the medium, because they belong to the next
118
121
        request(s).
152
155
 
153
156
    def _get_line(self):
154
157
        """Read bytes from this request's response until a newline byte.
155
 
        
 
158
 
156
159
        This isn't particularly efficient, so should only be used when the
157
160
        expected size of the line is quite short.
158
161
 
161
164
        line, excess = _get_line(self.read_bytes)
162
165
        self._push_back(excess)
163
166
        return line
164
 
 
 
167
 
 
168
    def _report_activity(self, bytes, direction):
 
169
        """Notify that this medium has activity.
 
170
 
 
171
        Implementations should call this from all methods that actually do IO.
 
172
        Be careful that it's not called twice, if one method is implemented on
 
173
        top of another.
 
174
 
 
175
        :param bytes: Number of bytes read or written.
 
176
        :param direction: 'read' or 'write' or None.
 
177
        """
 
178
        ui.ui_factory.report_transport_activity(self, bytes, direction)
 
179
 
165
180
 
166
181
class SmartServerStreamMedium(SmartMedium):
167
182
    """Handles smart commands coming over a stream.
172
187
    One instance is created for each connected client; it can serve multiple
173
188
    requests in the lifetime of the connection.
174
189
 
175
 
    The server passes requests through to an underlying backing transport, 
 
190
    The server passes requests through to an underlying backing transport,
176
191
    which will typically be a LocalTransport looking at the server's filesystem.
177
192
 
178
193
    :ivar _push_back_buffer: a str of bytes that have been read from the stream
223
238
 
224
239
    def _serve_one_request(self, protocol):
225
240
        """Read one request from input, process, send back a response.
226
 
        
 
241
 
227
242
        :param protocol: a SmartServerRequestProtocol.
228
243
        """
229
244
        try:
268
283
                self.finished = True
269
284
                return
270
285
            protocol.accept_bytes(bytes)
271
 
        
 
286
 
272
287
        self._push_back(protocol.unused_data)
273
288
 
274
289
    def _read_bytes(self, desired_count):
275
 
        # We ignore the desired_count because on sockets it's more efficient to
276
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
277
 
        return osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
 
290
        return _read_bytes_from_socket(
 
291
            self.socket.recv, desired_count, self._report_activity)
278
292
 
279
293
    def terminate_due_to_error(self):
280
294
        # TODO: This should log to a server log file, but no such thing
281
295
        # exists yet.  Andrew Bennetts 2006-09-29.
282
 
        self.socket.close()
 
296
        osutils.until_no_eintr(self.socket.close)
283
297
        self.finished = True
284
298
 
285
299
    def _write_out(self, bytes):
286
 
        osutils.send_all(self.socket, bytes)
 
300
        tstart = osutils.timer_func()
 
301
        osutils.send_all(self.socket, bytes, self._report_activity)
 
302
        if 'hpss' in debug.debug_flags:
 
303
            thread_id = thread.get_ident()
 
304
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
 
305
                         % ('wrote', thread_id, len(bytes),
 
306
                            osutils.timer_func() - tstart))
287
307
 
288
308
 
289
309
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
314
334
            bytes_to_read = protocol.next_read_size()
315
335
            if bytes_to_read == 0:
316
336
                # Finished serving this request.
317
 
                self._out.flush()
 
337
                osutils.until_no_eintr(self._out.flush)
318
338
                return
319
339
            bytes = self.read_bytes(bytes_to_read)
320
340
            if bytes == '':
321
341
                # Connection has been closed.
322
342
                self.finished = True
323
 
                self._out.flush()
 
343
                osutils.until_no_eintr(self._out.flush)
324
344
                return
325
345
            protocol.accept_bytes(bytes)
326
346
 
327
347
    def _read_bytes(self, desired_count):
328
 
        return self._in.read(desired_count)
 
348
        return osutils.until_no_eintr(self._in.read, desired_count)
329
349
 
330
350
    def terminate_due_to_error(self):
331
351
        # TODO: This should log to a server log file, but no such thing
332
352
        # exists yet.  Andrew Bennetts 2006-09-29.
333
 
        self._out.close()
 
353
        osutils.until_no_eintr(self._out.close)
334
354
        self.finished = True
335
355
 
336
356
    def _write_out(self, bytes):
337
 
        self._out.write(bytes)
 
357
        osutils.until_no_eintr(self._out.write, bytes)
338
358
 
339
359
 
340
360
class SmartClientMediumRequest(object):
350
370
    request.finished_reading()
351
371
 
352
372
    It is up to the individual SmartClientMedium whether multiple concurrent
353
 
    requests can exist. See SmartClientMedium.get_request to obtain instances 
354
 
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
373
    requests can exist. See SmartClientMedium.get_request to obtain instances
 
374
    of SmartClientMediumRequest, and the concrete Medium you are using for
355
375
    details on concurrency and pipelining.
356
376
    """
357
377
 
366
386
    def accept_bytes(self, bytes):
367
387
        """Accept bytes for inclusion in this request.
368
388
 
369
 
        This method may not be be called after finished_writing() has been
 
389
        This method may not be called after finished_writing() has been
370
390
        called.  It depends upon the Medium whether or not the bytes will be
371
391
        immediately transmitted. Message based Mediums will tend to buffer the
372
392
        bytes until finished_writing() is called.
403
423
    def _finished_reading(self):
404
424
        """Helper for finished_reading.
405
425
 
406
 
        finished_reading checks the state of the request to determine if 
 
426
        finished_reading checks the state of the request to determine if
407
427
        finished_reading is allowed, and if it is hands off to _finished_reading
408
428
        to perform the action.
409
429
        """
423
443
    def _finished_writing(self):
424
444
        """Helper for finished_writing.
425
445
 
426
 
        finished_writing checks the state of the request to determine if 
 
446
        finished_writing checks the state of the request to determine if
427
447
        finished_writing is allowed, and if it is hands off to _finished_writing
428
448
        to perform the action.
429
449
        """
449
469
        read_bytes checks the state of the request to determing if bytes
450
470
        should be read. After that it hands off to _read_bytes to do the
451
471
        actual read.
452
 
        
 
472
 
453
473
        By default this forwards to self._medium.read_bytes because we are
454
474
        operating on the medium's stream.
455
475
        """
460
480
        if not line.endswith('\n'):
461
481
            # end of file encountered reading from server
462
482
            raise errors.ConnectionReset(
463
 
                "please check connectivity and permissions",
464
 
                "(and try -Dhpss if further diagnosis is required)")
 
483
                "Unexpected end of message. Please check connectivity "
 
484
                "and permissions, and report a bug if problems persist.")
465
485
        return line
466
486
 
467
487
    def _read_line(self):
468
488
        """Helper for SmartClientMediumRequest.read_line.
469
 
        
 
489
 
470
490
        By default this forwards to self._medium._get_line because we are
471
491
        operating on the medium's stream.
472
492
        """
495
515
        """
496
516
        medium_repr = repr(medium)
497
517
        # Add this medium to the WeakKeyDictionary
498
 
        self.counts[medium] = [0, medium_repr]
 
518
        self.counts[medium] = dict(count=0, vfs_count=0,
 
519
                                   medium_repr=medium_repr)
499
520
        # Weakref callbacks are fired in reverse order of their association
500
521
        # with the referenced object.  So we add a weakref *after* adding to
501
522
        # the WeakKeyDict so that we can report the value from it before the
505
526
    def increment_call_count(self, params):
506
527
        # Increment the count in the WeakKeyDictionary
507
528
        value = self.counts[params.medium]
508
 
        value[0] += 1
 
529
        value['count'] += 1
 
530
        try:
 
531
            request_method = request.request_handlers.get(params.method)
 
532
        except KeyError:
 
533
            # A method we don't know about doesn't count as a VFS method.
 
534
            return
 
535
        if issubclass(request_method, vfs.VfsRequest):
 
536
            value['vfs_count'] += 1
509
537
 
510
538
    def done(self, ref):
511
539
        value = self.counts[ref]
512
 
        count, medium_repr = value
 
540
        count, vfs_count, medium_repr = (
 
541
            value['count'], value['vfs_count'], value['medium_repr'])
513
542
        # In case this callback is invoked for the same ref twice (by the
514
543
        # weakref callback and by the atexit function), set the call count back
515
544
        # to 0 so this item won't be reported twice.
516
 
        value[0] = 0
 
545
        value['count'] = 0
 
546
        value['vfs_count'] = 0
517
547
        if count != 0:
518
 
            trace.note('HPSS calls: %d %s', count, medium_repr)
519
 
        
 
548
            trace.note('HPSS calls: %d (%d vfs) %s',
 
549
                       count, vfs_count, medium_repr)
 
550
 
520
551
    def flush_all(self):
521
552
        for ref in list(self.counts.keys()):
522
553
            self.done(ref)
523
554
 
524
555
_debug_counter = None
525
 
  
526
 
  
 
556
 
 
557
 
527
558
class SmartClientMedium(SmartMedium):
528
559
    """Smart client is a medium for sending smart protocol requests over."""
529
560
 
574
605
        """
575
606
        if (self._remote_version_is_before is not None and
576
607
            version_tuple > self._remote_version_is_before):
 
608
            # We have been told that the remote side is older than some version
 
609
            # which is newer than a previously supplied older-than version.
 
610
            # This indicates that some smart verb call is not guarded
 
611
            # appropriately (it should simply not have been tried).
577
612
            raise AssertionError(
578
613
                "_remember_remote_is_before(%r) called, but "
579
614
                "_remember_remote_is_before(%r) was called previously."
617
652
 
618
653
    def disconnect(self):
619
654
        """If this medium maintains a persistent connection, close it.
620
 
        
 
655
 
621
656
        The default implementation does nothing.
622
657
        """
623
 
        
 
658
 
624
659
    def remote_path_from_transport(self, transport):
625
660
        """Convert transport into a path suitable for using in a request.
626
 
        
 
661
 
627
662
        Note that the resulting remote path doesn't encode the host name or
628
663
        anything but path, so it is only safe to use it in requests sent over
629
664
        the medium from the matching transport.
657
692
 
658
693
    def _flush(self):
659
694
        """Flush the output stream.
660
 
        
 
695
 
661
696
        This method is used by the SmartClientStreamMediumRequest to ensure that
662
697
        all data for a request is sent, to avoid long timeouts or deadlocks.
663
698
        """
674
709
 
675
710
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
676
711
    """A client medium using simple pipes.
677
 
    
 
712
 
678
713
    This client does not manage the pipes: it assumes they will always be open.
679
714
    """
680
715
 
685
720
 
686
721
    def _accept_bytes(self, bytes):
687
722
        """See SmartClientStreamMedium.accept_bytes."""
688
 
        self._writeable_pipe.write(bytes)
 
723
        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
 
724
        self._report_activity(len(bytes), 'write')
689
725
 
690
726
    def _flush(self):
691
727
        """See SmartClientStreamMedium._flush()."""
692
 
        self._writeable_pipe.flush()
 
728
        osutils.until_no_eintr(self._writeable_pipe.flush)
693
729
 
694
730
    def _read_bytes(self, count):
695
731
        """See SmartClientStreamMedium._read_bytes."""
696
 
        return self._readable_pipe.read(count)
 
732
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
 
733
        self._report_activity(len(bytes), 'read')
 
734
        return bytes
697
735
 
698
736
 
699
737
class SmartSSHClientMedium(SmartClientStreamMedium):
700
738
    """A client medium using SSH."""
701
 
    
 
739
 
702
740
    def __init__(self, host, port=None, username=None, password=None,
703
741
            base=None, vendor=None, bzr_remote_path=None):
704
742
        """Creates a client that will connect on the first use.
705
 
        
 
743
 
706
744
        :param vendor: An optional override for the ssh vendor to use. See
707
745
            bzrlib.transport.ssh for details on ssh vendors.
708
746
        """
709
 
        SmartClientStreamMedium.__init__(self, base)
710
747
        self._connected = False
711
748
        self._host = host
712
749
        self._password = password
713
750
        self._port = port
714
751
        self._username = username
 
752
        # for the benefit of progress making a short description of this
 
753
        # transport
 
754
        self._scheme = 'bzr+ssh'
 
755
        # SmartClientStreamMedium stores the repr of this object in its
 
756
        # _DebugCounter so we have to store all the values used in our repr
 
757
        # method before calling the super init.
 
758
        SmartClientStreamMedium.__init__(self, base)
715
759
        self._read_from = None
716
760
        self._ssh_connection = None
717
761
        self._vendor = vendor
718
762
        self._write_to = None
719
763
        self._bzr_remote_path = bzr_remote_path
720
 
        if self._bzr_remote_path is None:
721
 
            symbol_versioning.warn(
722
 
                'bzr_remote_path is required as of bzr 0.92',
723
 
                DeprecationWarning, stacklevel=2)
724
 
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
764
 
 
765
    def __repr__(self):
 
766
        if self._port is None:
 
767
            maybe_port = ''
 
768
        else:
 
769
            maybe_port = ':%s' % self._port
 
770
        return "%s(%s://%s@%s%s/)" % (
 
771
            self.__class__.__name__,
 
772
            self._scheme,
 
773
            self._username,
 
774
            self._host,
 
775
            maybe_port)
725
776
 
726
777
    def _accept_bytes(self, bytes):
727
778
        """See SmartClientStreamMedium.accept_bytes."""
728
779
        self._ensure_connection()
729
 
        self._write_to.write(bytes)
 
780
        osutils.until_no_eintr(self._write_to.write, bytes)
 
781
        self._report_activity(len(bytes), 'write')
730
782
 
731
783
    def disconnect(self):
732
784
        """See SmartClientMedium.disconnect()."""
733
785
        if not self._connected:
734
786
            return
735
 
        self._read_from.close()
736
 
        self._write_to.close()
 
787
        osutils.until_no_eintr(self._read_from.close)
 
788
        osutils.until_no_eintr(self._write_to.close)
737
789
        self._ssh_connection.close()
738
790
        self._connected = False
739
791
 
762
814
        if not self._connected:
763
815
            raise errors.MediumNotConnected(self)
764
816
        bytes_to_read = min(count, _MAX_READ_SIZE)
765
 
        return self._read_from.read(bytes_to_read)
 
817
        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
 
818
        self._report_activity(len(bytes), 'read')
 
819
        return bytes
766
820
 
767
821
 
768
822
# Port 4155 is the default port for bzr://, registered with IANA.
772
826
 
773
827
class SmartTCPClientMedium(SmartClientStreamMedium):
774
828
    """A client medium using TCP."""
775
 
    
 
829
 
776
830
    def __init__(self, host, port, base):
777
831
        """Creates a client that will connect on the first use."""
778
832
        SmartClientStreamMedium.__init__(self, base)
784
838
    def _accept_bytes(self, bytes):
785
839
        """See SmartClientMedium.accept_bytes."""
786
840
        self._ensure_connection()
787
 
        osutils.send_all(self._socket, bytes)
 
841
        osutils.send_all(self._socket, bytes, self._report_activity)
788
842
 
789
843
    def disconnect(self):
790
844
        """See SmartClientMedium.disconnect()."""
791
845
        if not self._connected:
792
846
            return
793
 
        self._socket.close()
 
847
        osutils.until_no_eintr(self._socket.close)
794
848
        self._socket = None
795
849
        self._connected = False
796
850
 
803
857
        else:
804
858
            port = int(self._port)
805
859
        try:
806
 
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC, 
 
860
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
807
861
                socket.SOCK_STREAM, 0, 0)
808
862
        except socket.gaierror, (err_num, err_msg):
809
863
            raise errors.ConnectionError("failed to lookup %s:%d: %s" %
813
867
        for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
814
868
            try:
815
869
                self._socket = socket.socket(family, socktype, proto)
816
 
                self._socket.setsockopt(socket.IPPROTO_TCP, 
 
870
                self._socket.setsockopt(socket.IPPROTO_TCP,
817
871
                                        socket.TCP_NODELAY, 1)
818
872
                self._socket.connect(sockaddr)
819
873
            except socket.error, err:
835
889
 
836
890
    def _flush(self):
837
891
        """See SmartClientStreamMedium._flush().
838
 
        
839
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
892
 
 
893
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
840
894
        add a means to do a flush, but that can be done in the future.
841
895
        """
842
896
 
844
898
        """See SmartClientMedium.read_bytes."""
845
899
        if not self._connected:
846
900
            raise errors.MediumNotConnected(self)
847
 
        # We ignore the desired_count because on sockets it's more efficient to
848
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
849
 
        try:
850
 
            return self._socket.recv(_MAX_READ_SIZE)
851
 
        except socket.error, e:
852
 
            if len(e.args) and e.args[0] == errno.ECONNRESET:
853
 
                # Callers expect an empty string in that case
854
 
                return ''
855
 
            else:
856
 
                raise
 
901
        return _read_bytes_from_socket(
 
902
            self._socket.recv, count, self._report_activity)
857
903
 
858
904
 
859
905
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
872
918
 
873
919
    def _accept_bytes(self, bytes):
874
920
        """See SmartClientMediumRequest._accept_bytes.
875
 
        
 
921
 
876
922
        This forwards to self._medium._accept_bytes because we are operating
877
923
        on the mediums stream.
878
924
        """
881
927
    def _finished_reading(self):
882
928
        """See SmartClientMediumRequest._finished_reading.
883
929
 
884
 
        This clears the _current_request on self._medium to allow a new 
 
930
        This clears the _current_request on self._medium to allow a new
885
931
        request to be created.
886
932
        """
887
933
        if self._medium._current_request is not self:
888
934
            raise AssertionError()
889
935
        self._medium._current_request = None
890
 
        
 
936
 
891
937
    def _finished_writing(self):
892
938
        """See SmartClientMediumRequest._finished_writing.
893
939
 
895
941
        """
896
942
        self._medium._flush()
897
943
 
 
944
 
 
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
 
946
    # We ignore the desired_count because on sockets it's more efficient to
 
947
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
948
    try:
 
949
        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
 
950
    except socket.error, e:
 
951
        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
 
952
            # The connection was closed by the other side.  Callers expect an
 
953
            # empty string to signal end-of-stream.
 
954
            bytes = ''
 
955
        else:
 
956
            raise
 
957
    else:
 
958
        report_activity(len(bytes), 'read')
 
959
    return bytes
 
960