~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: John Arbash Meinel
  • Date: 2008-11-25 17:15:26 UTC
  • mto: This revision was merged to the branch mainline in revision 3851.
  • Revision ID: john@arbash-meinel.com-20081125171526-pi2g4m1w70pkie1f
Add a bit of help text when supplying --help.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""The 'medium' layer for the smart servers and clients.
18
18
 
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
35
35
import atexit
36
 
import thread
37
36
import weakref
38
 
 
39
37
from bzrlib import (
40
38
    debug,
41
39
    errors,
 
40
    osutils,
42
41
    symbol_versioning,
43
42
    trace,
44
 
    ui,
45
43
    urlutils,
46
44
    )
47
 
from bzrlib.smart import client, protocol, request, vfs
 
45
from bzrlib.smart import client, protocol
48
46
from bzrlib.transport import ssh
49
47
""")
50
 
#usually already imported, and getting IllegalScoperReplacer on it here.
51
 
from bzrlib import osutils
 
48
 
52
49
 
53
50
# We must not read any more than 64k at a time so we don't risk "no buffer
54
51
# space available" errors on some platforms.  Windows in particular is likely
90
87
 
91
88
def _get_line(read_bytes_func):
92
89
    """Read bytes using read_bytes_func until a newline byte.
93
 
 
 
90
    
94
91
    This isn't particularly efficient, so should only be used when the
95
92
    expected size of the line is quite short.
96
 
 
 
93
    
97
94
    :returns: a tuple of two strs: (line, excess)
98
95
    """
99
96
    newline_pos = -1
115
112
 
116
113
    def __init__(self):
117
114
        self._push_back_buffer = None
118
 
 
 
115
        
119
116
    def _push_back(self, bytes):
120
117
        """Return unused bytes to the medium, because they belong to the next
121
118
        request(s).
155
152
 
156
153
    def _get_line(self):
157
154
        """Read bytes from this request's response until a newline byte.
158
 
 
 
155
        
159
156
        This isn't particularly efficient, so should only be used when the
160
157
        expected size of the line is quite short.
161
158
 
164
161
        line, excess = _get_line(self.read_bytes)
165
162
        self._push_back(excess)
166
163
        return line
167
 
 
168
 
    def _report_activity(self, bytes, direction):
169
 
        """Notify that this medium has activity.
170
 
 
171
 
        Implementations should call this from all methods that actually do IO.
172
 
        Be careful that it's not called twice, if one method is implemented on
173
 
        top of another.
174
 
 
175
 
        :param bytes: Number of bytes read or written.
176
 
        :param direction: 'read' or 'write' or None.
177
 
        """
178
 
        ui.ui_factory.report_transport_activity(self, bytes, direction)
179
 
 
 
164
 
180
165
 
181
166
class SmartServerStreamMedium(SmartMedium):
182
167
    """Handles smart commands coming over a stream.
187
172
    One instance is created for each connected client; it can serve multiple
188
173
    requests in the lifetime of the connection.
189
174
 
190
 
    The server passes requests through to an underlying backing transport,
 
175
    The server passes requests through to an underlying backing transport, 
191
176
    which will typically be a LocalTransport looking at the server's filesystem.
192
177
 
193
178
    :ivar _push_back_buffer: a str of bytes that have been read from the stream
238
223
 
239
224
    def _serve_one_request(self, protocol):
240
225
        """Read one request from input, process, send back a response.
241
 
 
 
226
        
242
227
        :param protocol: a SmartServerRequestProtocol.
243
228
        """
244
229
        try:
283
268
                self.finished = True
284
269
                return
285
270
            protocol.accept_bytes(bytes)
286
 
 
 
271
        
287
272
        self._push_back(protocol.unused_data)
288
273
 
289
274
    def _read_bytes(self, desired_count):
290
 
        return _read_bytes_from_socket(
291
 
            self.socket.recv, desired_count, self._report_activity)
 
275
        # We ignore the desired_count because on sockets it's more efficient to
 
276
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
277
        return self.socket.recv(_MAX_READ_SIZE)
292
278
 
293
279
    def terminate_due_to_error(self):
294
280
        # TODO: This should log to a server log file, but no such thing
295
281
        # exists yet.  Andrew Bennetts 2006-09-29.
296
 
        osutils.until_no_eintr(self.socket.close)
 
282
        self.socket.close()
297
283
        self.finished = True
298
284
 
299
285
    def _write_out(self, bytes):
300
 
        tstart = osutils.timer_func()
301
 
        osutils.send_all(self.socket, bytes, self._report_activity)
302
 
        if 'hpss' in debug.debug_flags:
303
 
            thread_id = thread.get_ident()
304
 
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
 
                         % ('wrote', thread_id, len(bytes),
306
 
                            osutils.timer_func() - tstart))
 
286
        osutils.send_all(self.socket, bytes)
307
287
 
308
288
 
309
289
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
334
314
            bytes_to_read = protocol.next_read_size()
335
315
            if bytes_to_read == 0:
336
316
                # Finished serving this request.
337
 
                osutils.until_no_eintr(self._out.flush)
 
317
                self._out.flush()
338
318
                return
339
319
            bytes = self.read_bytes(bytes_to_read)
340
320
            if bytes == '':
341
321
                # Connection has been closed.
342
322
                self.finished = True
343
 
                osutils.until_no_eintr(self._out.flush)
 
323
                self._out.flush()
344
324
                return
345
325
            protocol.accept_bytes(bytes)
346
326
 
347
327
    def _read_bytes(self, desired_count):
348
 
        return osutils.until_no_eintr(self._in.read, desired_count)
 
328
        return self._in.read(desired_count)
349
329
 
350
330
    def terminate_due_to_error(self):
351
331
        # TODO: This should log to a server log file, but no such thing
352
332
        # exists yet.  Andrew Bennetts 2006-09-29.
353
 
        osutils.until_no_eintr(self._out.close)
 
333
        self._out.close()
354
334
        self.finished = True
355
335
 
356
336
    def _write_out(self, bytes):
357
 
        osutils.until_no_eintr(self._out.write, bytes)
 
337
        self._out.write(bytes)
358
338
 
359
339
 
360
340
class SmartClientMediumRequest(object):
370
350
    request.finished_reading()
371
351
 
372
352
    It is up to the individual SmartClientMedium whether multiple concurrent
373
 
    requests can exist. See SmartClientMedium.get_request to obtain instances
374
 
    of SmartClientMediumRequest, and the concrete Medium you are using for
 
353
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
354
    of SmartClientMediumRequest, and the concrete Medium you are using for 
375
355
    details on concurrency and pipelining.
376
356
    """
377
357
 
386
366
    def accept_bytes(self, bytes):
387
367
        """Accept bytes for inclusion in this request.
388
368
 
389
 
        This method may not be called after finished_writing() has been
 
369
        This method may not be be called after finished_writing() has been
390
370
        called.  It depends upon the Medium whether or not the bytes will be
391
371
        immediately transmitted. Message based Mediums will tend to buffer the
392
372
        bytes until finished_writing() is called.
423
403
    def _finished_reading(self):
424
404
        """Helper for finished_reading.
425
405
 
426
 
        finished_reading checks the state of the request to determine if
 
406
        finished_reading checks the state of the request to determine if 
427
407
        finished_reading is allowed, and if it is hands off to _finished_reading
428
408
        to perform the action.
429
409
        """
443
423
    def _finished_writing(self):
444
424
        """Helper for finished_writing.
445
425
 
446
 
        finished_writing checks the state of the request to determine if
 
426
        finished_writing checks the state of the request to determine if 
447
427
        finished_writing is allowed, and if it is hands off to _finished_writing
448
428
        to perform the action.
449
429
        """
469
449
        read_bytes checks the state of the request to determing if bytes
470
450
        should be read. After that it hands off to _read_bytes to do the
471
451
        actual read.
472
 
 
 
452
        
473
453
        By default this forwards to self._medium.read_bytes because we are
474
454
        operating on the medium's stream.
475
455
        """
480
460
        if not line.endswith('\n'):
481
461
            # end of file encountered reading from server
482
462
            raise errors.ConnectionReset(
483
 
                "Unexpected end of message. Please check connectivity "
484
 
                "and permissions, and report a bug if problems persist.")
 
463
                "please check connectivity and permissions",
 
464
                "(and try -Dhpss if further diagnosis is required)")
485
465
        return line
486
466
 
487
467
    def _read_line(self):
488
468
        """Helper for SmartClientMediumRequest.read_line.
489
 
 
 
469
        
490
470
        By default this forwards to self._medium._get_line because we are
491
471
        operating on the medium's stream.
492
472
        """
515
495
        """
516
496
        medium_repr = repr(medium)
517
497
        # Add this medium to the WeakKeyDictionary
518
 
        self.counts[medium] = dict(count=0, vfs_count=0,
519
 
                                   medium_repr=medium_repr)
 
498
        self.counts[medium] = [0, medium_repr]
520
499
        # Weakref callbacks are fired in reverse order of their association
521
500
        # with the referenced object.  So we add a weakref *after* adding to
522
501
        # the WeakKeyDict so that we can report the value from it before the
526
505
    def increment_call_count(self, params):
527
506
        # Increment the count in the WeakKeyDictionary
528
507
        value = self.counts[params.medium]
529
 
        value['count'] += 1
530
 
        try:
531
 
            request_method = request.request_handlers.get(params.method)
532
 
        except KeyError:
533
 
            # A method we don't know about doesn't count as a VFS method.
534
 
            return
535
 
        if issubclass(request_method, vfs.VfsRequest):
536
 
            value['vfs_count'] += 1
 
508
        value[0] += 1
537
509
 
538
510
    def done(self, ref):
539
511
        value = self.counts[ref]
540
 
        count, vfs_count, medium_repr = (
541
 
            value['count'], value['vfs_count'], value['medium_repr'])
 
512
        count, medium_repr = value
542
513
        # In case this callback is invoked for the same ref twice (by the
543
514
        # weakref callback and by the atexit function), set the call count back
544
515
        # to 0 so this item won't be reported twice.
545
 
        value['count'] = 0
546
 
        value['vfs_count'] = 0
 
516
        value[0] = 0
547
517
        if count != 0:
548
 
            trace.note('HPSS calls: %d (%d vfs) %s',
549
 
                       count, vfs_count, medium_repr)
550
 
 
 
518
            trace.note('HPSS calls: %d %s', count, medium_repr)
 
519
        
551
520
    def flush_all(self):
552
521
        for ref in list(self.counts.keys()):
553
522
            self.done(ref)
554
523
 
555
524
_debug_counter = None
556
 
 
557
 
 
 
525
  
 
526
  
558
527
class SmartClientMedium(SmartMedium):
559
528
    """Smart client is a medium for sending smart protocol requests over."""
560
529
 
605
574
        """
606
575
        if (self._remote_version_is_before is not None and
607
576
            version_tuple > self._remote_version_is_before):
608
 
            # We have been told that the remote side is older than some version
609
 
            # which is newer than a previously supplied older-than version.
610
 
            # This indicates that some smart verb call is not guarded
611
 
            # appropriately (it should simply not have been tried).
612
577
            raise AssertionError(
613
578
                "_remember_remote_is_before(%r) called, but "
614
579
                "_remember_remote_is_before(%r) was called previously."
652
617
 
653
618
    def disconnect(self):
654
619
        """If this medium maintains a persistent connection, close it.
655
 
 
 
620
        
656
621
        The default implementation does nothing.
657
622
        """
658
 
 
 
623
        
659
624
    def remote_path_from_transport(self, transport):
660
625
        """Convert transport into a path suitable for using in a request.
661
 
 
 
626
        
662
627
        Note that the resulting remote path doesn't encode the host name or
663
628
        anything but path, so it is only safe to use it in requests sent over
664
629
        the medium from the matching transport.
692
657
 
693
658
    def _flush(self):
694
659
        """Flush the output stream.
695
 
 
 
660
        
696
661
        This method is used by the SmartClientStreamMediumRequest to ensure that
697
662
        all data for a request is sent, to avoid long timeouts or deadlocks.
698
663
        """
709
674
 
710
675
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
711
676
    """A client medium using simple pipes.
712
 
 
 
677
    
713
678
    This client does not manage the pipes: it assumes they will always be open.
714
679
    """
715
680
 
720
685
 
721
686
    def _accept_bytes(self, bytes):
722
687
        """See SmartClientStreamMedium.accept_bytes."""
723
 
        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
724
 
        self._report_activity(len(bytes), 'write')
 
688
        self._writeable_pipe.write(bytes)
725
689
 
726
690
    def _flush(self):
727
691
        """See SmartClientStreamMedium._flush()."""
728
 
        osutils.until_no_eintr(self._writeable_pipe.flush)
 
692
        self._writeable_pipe.flush()
729
693
 
730
694
    def _read_bytes(self, count):
731
695
        """See SmartClientStreamMedium._read_bytes."""
732
 
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
733
 
        self._report_activity(len(bytes), 'read')
734
 
        return bytes
 
696
        return self._readable_pipe.read(count)
735
697
 
736
698
 
737
699
class SmartSSHClientMedium(SmartClientStreamMedium):
738
700
    """A client medium using SSH."""
739
 
 
 
701
    
740
702
    def __init__(self, host, port=None, username=None, password=None,
741
703
            base=None, vendor=None, bzr_remote_path=None):
742
704
        """Creates a client that will connect on the first use.
743
 
 
 
705
        
744
706
        :param vendor: An optional override for the ssh vendor to use. See
745
707
            bzrlib.transport.ssh for details on ssh vendors.
746
708
        """
 
709
        SmartClientStreamMedium.__init__(self, base)
747
710
        self._connected = False
748
711
        self._host = host
749
712
        self._password = password
750
713
        self._port = port
751
714
        self._username = username
752
 
        # for the benefit of progress making a short description of this
753
 
        # transport
754
 
        self._scheme = 'bzr+ssh'
755
 
        # SmartClientStreamMedium stores the repr of this object in its
756
 
        # _DebugCounter so we have to store all the values used in our repr
757
 
        # method before calling the super init.
758
 
        SmartClientStreamMedium.__init__(self, base)
759
715
        self._read_from = None
760
716
        self._ssh_connection = None
761
717
        self._vendor = vendor
762
718
        self._write_to = None
763
719
        self._bzr_remote_path = bzr_remote_path
764
 
 
765
 
    def __repr__(self):
766
 
        if self._port is None:
767
 
            maybe_port = ''
768
 
        else:
769
 
            maybe_port = ':%s' % self._port
770
 
        return "%s(%s://%s@%s%s/)" % (
771
 
            self.__class__.__name__,
772
 
            self._scheme,
773
 
            self._username,
774
 
            self._host,
775
 
            maybe_port)
 
720
        if self._bzr_remote_path is None:
 
721
            symbol_versioning.warn(
 
722
                'bzr_remote_path is required as of bzr 0.92',
 
723
                DeprecationWarning, stacklevel=2)
 
724
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
776
725
 
777
726
    def _accept_bytes(self, bytes):
778
727
        """See SmartClientStreamMedium.accept_bytes."""
779
728
        self._ensure_connection()
780
 
        osutils.until_no_eintr(self._write_to.write, bytes)
781
 
        self._report_activity(len(bytes), 'write')
 
729
        self._write_to.write(bytes)
782
730
 
783
731
    def disconnect(self):
784
732
        """See SmartClientMedium.disconnect()."""
785
733
        if not self._connected:
786
734
            return
787
 
        osutils.until_no_eintr(self._read_from.close)
788
 
        osutils.until_no_eintr(self._write_to.close)
 
735
        self._read_from.close()
 
736
        self._write_to.close()
789
737
        self._ssh_connection.close()
790
738
        self._connected = False
791
739
 
814
762
        if not self._connected:
815
763
            raise errors.MediumNotConnected(self)
816
764
        bytes_to_read = min(count, _MAX_READ_SIZE)
817
 
        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
818
 
        self._report_activity(len(bytes), 'read')
819
 
        return bytes
 
765
        return self._read_from.read(bytes_to_read)
820
766
 
821
767
 
822
768
# Port 4155 is the default port for bzr://, registered with IANA.
826
772
 
827
773
class SmartTCPClientMedium(SmartClientStreamMedium):
828
774
    """A client medium using TCP."""
829
 
 
 
775
    
830
776
    def __init__(self, host, port, base):
831
777
        """Creates a client that will connect on the first use."""
832
778
        SmartClientStreamMedium.__init__(self, base)
838
784
    def _accept_bytes(self, bytes):
839
785
        """See SmartClientMedium.accept_bytes."""
840
786
        self._ensure_connection()
841
 
        osutils.send_all(self._socket, bytes, self._report_activity)
 
787
        osutils.send_all(self._socket, bytes)
842
788
 
843
789
    def disconnect(self):
844
790
        """See SmartClientMedium.disconnect()."""
845
791
        if not self._connected:
846
792
            return
847
 
        osutils.until_no_eintr(self._socket.close)
 
793
        self._socket.close()
848
794
        self._socket = None
849
795
        self._connected = False
850
796
 
857
803
        else:
858
804
            port = int(self._port)
859
805
        try:
860
 
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
 
806
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC, 
861
807
                socket.SOCK_STREAM, 0, 0)
862
808
        except socket.gaierror, (err_num, err_msg):
863
809
            raise errors.ConnectionError("failed to lookup %s:%d: %s" %
867
813
        for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
868
814
            try:
869
815
                self._socket = socket.socket(family, socktype, proto)
870
 
                self._socket.setsockopt(socket.IPPROTO_TCP,
 
816
                self._socket.setsockopt(socket.IPPROTO_TCP, 
871
817
                                        socket.TCP_NODELAY, 1)
872
818
                self._socket.connect(sockaddr)
873
819
            except socket.error, err:
889
835
 
890
836
    def _flush(self):
891
837
        """See SmartClientStreamMedium._flush().
892
 
 
893
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
 
838
        
 
839
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
894
840
        add a means to do a flush, but that can be done in the future.
895
841
        """
896
842
 
898
844
        """See SmartClientMedium.read_bytes."""
899
845
        if not self._connected:
900
846
            raise errors.MediumNotConnected(self)
901
 
        return _read_bytes_from_socket(
902
 
            self._socket.recv, count, self._report_activity)
 
847
        # We ignore the desired_count because on sockets it's more efficient to
 
848
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
849
        try:
 
850
            return self._socket.recv(_MAX_READ_SIZE)
 
851
        except socket.error, e:
 
852
            if len(e.args) and e.args[0] == errno.ECONNRESET:
 
853
                # Callers expect an empty string in that case
 
854
                return ''
 
855
            else:
 
856
                raise
903
857
 
904
858
 
905
859
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
918
872
 
919
873
    def _accept_bytes(self, bytes):
920
874
        """See SmartClientMediumRequest._accept_bytes.
921
 
 
 
875
        
922
876
        This forwards to self._medium._accept_bytes because we are operating
923
877
        on the mediums stream.
924
878
        """
927
881
    def _finished_reading(self):
928
882
        """See SmartClientMediumRequest._finished_reading.
929
883
 
930
 
        This clears the _current_request on self._medium to allow a new
 
884
        This clears the _current_request on self._medium to allow a new 
931
885
        request to be created.
932
886
        """
933
887
        if self._medium._current_request is not self:
934
888
            raise AssertionError()
935
889
        self._medium._current_request = None
936
 
 
 
890
        
937
891
    def _finished_writing(self):
938
892
        """See SmartClientMediumRequest._finished_writing.
939
893
 
941
895
        """
942
896
        self._medium._flush()
943
897
 
944
 
 
945
 
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
 
    # We ignore the desired_count because on sockets it's more efficient to
947
 
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
948
 
    try:
949
 
        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
 
    except socket.error, e:
951
 
        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
 
            # The connection was closed by the other side.  Callers expect an
953
 
            # empty string to signal end-of-stream.
954
 
            bytes = ''
955
 
        else:
956
 
            raise
957
 
    else:
958
 
        report_activity(len(bytes), 'read')
959
 
    return bytes
960