~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: John Arbash Meinel
  • Date: 2008-08-25 21:50:11 UTC
  • mfrom: (0.11.3 tools)
  • mto: This revision was merged to the branch mainline in revision 3659.
  • Revision ID: john@arbash-meinel.com-20080825215011-de9esmzgkue3e522
Merge in Lukáš's helper scripts.
Update the packaging documents to describe how to do the releases
using bzr-builddeb to package all distro platforms
simultaneously.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""The 'medium' layer for the smart servers and clients.
18
18
 
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
27
 
import errno
28
27
import os
29
28
import socket
30
29
import sys
32
31
 
33
32
from bzrlib.lazy_import import lazy_import
34
33
lazy_import(globals(), """
35
 
import atexit
36
 
import thread
37
 
import weakref
38
 
 
39
34
from bzrlib import (
40
 
    debug,
41
35
    errors,
 
36
    osutils,
42
37
    symbol_versioning,
43
 
    trace,
44
 
    ui,
45
38
    urlutils,
46
39
    )
47
 
from bzrlib.smart import client, protocol, request, vfs
 
40
from bzrlib.smart import protocol
48
41
from bzrlib.transport import ssh
49
42
""")
50
 
#usually already imported, and getting IllegalScoperReplacer on it here.
51
 
from bzrlib import osutils
 
43
 
52
44
 
53
45
# We must not read any more than 64k at a time so we don't risk "no buffer
54
46
# space available" errors on some platforms.  Windows in particular is likely
90
82
 
91
83
def _get_line(read_bytes_func):
92
84
    """Read bytes using read_bytes_func until a newline byte.
93
 
 
 
85
    
94
86
    This isn't particularly efficient, so should only be used when the
95
87
    expected size of the line is quite short.
96
 
 
 
88
    
97
89
    :returns: a tuple of two strs: (line, excess)
98
90
    """
99
91
    newline_pos = -1
115
107
 
116
108
    def __init__(self):
117
109
        self._push_back_buffer = None
118
 
 
 
110
        
119
111
    def _push_back(self, bytes):
120
112
        """Return unused bytes to the medium, because they belong to the next
121
113
        request(s).
155
147
 
156
148
    def _get_line(self):
157
149
        """Read bytes from this request's response until a newline byte.
158
 
 
 
150
        
159
151
        This isn't particularly efficient, so should only be used when the
160
152
        expected size of the line is quite short.
161
153
 
164
156
        line, excess = _get_line(self.read_bytes)
165
157
        self._push_back(excess)
166
158
        return line
167
 
 
168
 
    def _report_activity(self, bytes, direction):
169
 
        """Notify that this medium has activity.
170
 
 
171
 
        Implementations should call this from all methods that actually do IO.
172
 
        Be careful that it's not called twice, if one method is implemented on
173
 
        top of another.
174
 
 
175
 
        :param bytes: Number of bytes read or written.
176
 
        :param direction: 'read' or 'write' or None.
177
 
        """
178
 
        ui.ui_factory.report_transport_activity(self, bytes, direction)
179
 
 
 
159
 
180
160
 
181
161
class SmartServerStreamMedium(SmartMedium):
182
162
    """Handles smart commands coming over a stream.
187
167
    One instance is created for each connected client; it can serve multiple
188
168
    requests in the lifetime of the connection.
189
169
 
190
 
    The server passes requests through to an underlying backing transport,
 
170
    The server passes requests through to an underlying backing transport, 
191
171
    which will typically be a LocalTransport looking at the server's filesystem.
192
172
 
193
173
    :ivar _push_back_buffer: a str of bytes that have been read from the stream
238
218
 
239
219
    def _serve_one_request(self, protocol):
240
220
        """Read one request from input, process, send back a response.
241
 
 
 
221
        
242
222
        :param protocol: a SmartServerRequestProtocol.
243
223
        """
244
224
        try:
283
263
                self.finished = True
284
264
                return
285
265
            protocol.accept_bytes(bytes)
286
 
 
 
266
        
287
267
        self._push_back(protocol.unused_data)
288
268
 
289
269
    def _read_bytes(self, desired_count):
290
 
        return _read_bytes_from_socket(
291
 
            self.socket.recv, desired_count, self._report_activity)
 
270
        # We ignore the desired_count because on sockets it's more efficient to
 
271
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
272
        return self.socket.recv(_MAX_READ_SIZE)
292
273
 
293
274
    def terminate_due_to_error(self):
294
275
        # TODO: This should log to a server log file, but no such thing
295
276
        # exists yet.  Andrew Bennetts 2006-09-29.
296
 
        osutils.until_no_eintr(self.socket.close)
 
277
        self.socket.close()
297
278
        self.finished = True
298
279
 
299
280
    def _write_out(self, bytes):
300
 
        tstart = osutils.timer_func()
301
 
        osutils.send_all(self.socket, bytes, self._report_activity)
302
 
        if 'hpss' in debug.debug_flags:
303
 
            thread_id = thread.get_ident()
304
 
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
 
                         % ('wrote', thread_id, len(bytes),
306
 
                            osutils.timer_func() - tstart))
 
281
        osutils.send_all(self.socket, bytes)
307
282
 
308
283
 
309
284
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
334
309
            bytes_to_read = protocol.next_read_size()
335
310
            if bytes_to_read == 0:
336
311
                # Finished serving this request.
337
 
                osutils.until_no_eintr(self._out.flush)
 
312
                self._out.flush()
338
313
                return
339
314
            bytes = self.read_bytes(bytes_to_read)
340
315
            if bytes == '':
341
316
                # Connection has been closed.
342
317
                self.finished = True
343
 
                osutils.until_no_eintr(self._out.flush)
 
318
                self._out.flush()
344
319
                return
345
320
            protocol.accept_bytes(bytes)
346
321
 
347
322
    def _read_bytes(self, desired_count):
348
 
        return osutils.until_no_eintr(self._in.read, desired_count)
 
323
        return self._in.read(desired_count)
349
324
 
350
325
    def terminate_due_to_error(self):
351
326
        # TODO: This should log to a server log file, but no such thing
352
327
        # exists yet.  Andrew Bennetts 2006-09-29.
353
 
        osutils.until_no_eintr(self._out.close)
 
328
        self._out.close()
354
329
        self.finished = True
355
330
 
356
331
    def _write_out(self, bytes):
357
 
        osutils.until_no_eintr(self._out.write, bytes)
 
332
        self._out.write(bytes)
358
333
 
359
334
 
360
335
class SmartClientMediumRequest(object):
370
345
    request.finished_reading()
371
346
 
372
347
    It is up to the individual SmartClientMedium whether multiple concurrent
373
 
    requests can exist. See SmartClientMedium.get_request to obtain instances
374
 
    of SmartClientMediumRequest, and the concrete Medium you are using for
 
348
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
349
    of SmartClientMediumRequest, and the concrete Medium you are using for 
375
350
    details on concurrency and pipelining.
376
351
    """
377
352
 
386
361
    def accept_bytes(self, bytes):
387
362
        """Accept bytes for inclusion in this request.
388
363
 
389
 
        This method may not be called after finished_writing() has been
 
364
        This method may not be be called after finished_writing() has been
390
365
        called.  It depends upon the Medium whether or not the bytes will be
391
366
        immediately transmitted. Message based Mediums will tend to buffer the
392
367
        bytes until finished_writing() is called.
423
398
    def _finished_reading(self):
424
399
        """Helper for finished_reading.
425
400
 
426
 
        finished_reading checks the state of the request to determine if
 
401
        finished_reading checks the state of the request to determine if 
427
402
        finished_reading is allowed, and if it is hands off to _finished_reading
428
403
        to perform the action.
429
404
        """
443
418
    def _finished_writing(self):
444
419
        """Helper for finished_writing.
445
420
 
446
 
        finished_writing checks the state of the request to determine if
 
421
        finished_writing checks the state of the request to determine if 
447
422
        finished_writing is allowed, and if it is hands off to _finished_writing
448
423
        to perform the action.
449
424
        """
469
444
        read_bytes checks the state of the request to determing if bytes
470
445
        should be read. After that it hands off to _read_bytes to do the
471
446
        actual read.
472
 
 
 
447
        
473
448
        By default this forwards to self._medium.read_bytes because we are
474
449
        operating on the medium's stream.
475
450
        """
480
455
        if not line.endswith('\n'):
481
456
            # end of file encountered reading from server
482
457
            raise errors.ConnectionReset(
483
 
                "Unexpected end of message. Please check connectivity "
484
 
                "and permissions, and report a bug if problems persist.")
 
458
                "please check connectivity and permissions",
 
459
                "(and try -Dhpss if further diagnosis is required)")
485
460
        return line
486
461
 
487
462
    def _read_line(self):
488
463
        """Helper for SmartClientMediumRequest.read_line.
489
 
 
 
464
        
490
465
        By default this forwards to self._medium._get_line because we are
491
466
        operating on the medium's stream.
492
467
        """
493
468
        return self._medium._get_line()
494
469
 
495
470
 
496
 
class _DebugCounter(object):
497
 
    """An object that counts the HPSS calls made to each client medium.
498
 
 
499
 
    When a medium is garbage-collected, or failing that when atexit functions
500
 
    are run, the total number of calls made on that medium are reported via
501
 
    trace.note.
502
 
    """
503
 
 
504
 
    def __init__(self):
505
 
        self.counts = weakref.WeakKeyDictionary()
506
 
        client._SmartClient.hooks.install_named_hook(
507
 
            'call', self.increment_call_count, 'hpss call counter')
508
 
        atexit.register(self.flush_all)
509
 
 
510
 
    def track(self, medium):
511
 
        """Start tracking calls made to a medium.
512
 
 
513
 
        This only keeps a weakref to the medium, so shouldn't affect the
514
 
        medium's lifetime.
515
 
        """
516
 
        medium_repr = repr(medium)
517
 
        # Add this medium to the WeakKeyDictionary
518
 
        self.counts[medium] = dict(count=0, vfs_count=0,
519
 
                                   medium_repr=medium_repr)
520
 
        # Weakref callbacks are fired in reverse order of their association
521
 
        # with the referenced object.  So we add a weakref *after* adding to
522
 
        # the WeakKeyDict so that we can report the value from it before the
523
 
        # entry is removed by the WeakKeyDict's own callback.
524
 
        ref = weakref.ref(medium, self.done)
525
 
 
526
 
    def increment_call_count(self, params):
527
 
        # Increment the count in the WeakKeyDictionary
528
 
        value = self.counts[params.medium]
529
 
        value['count'] += 1
530
 
        try:
531
 
            request_method = request.request_handlers.get(params.method)
532
 
        except KeyError:
533
 
            # A method we don't know about doesn't count as a VFS method.
534
 
            return
535
 
        if issubclass(request_method, vfs.VfsRequest):
536
 
            value['vfs_count'] += 1
537
 
 
538
 
    def done(self, ref):
539
 
        value = self.counts[ref]
540
 
        count, vfs_count, medium_repr = (
541
 
            value['count'], value['vfs_count'], value['medium_repr'])
542
 
        # In case this callback is invoked for the same ref twice (by the
543
 
        # weakref callback and by the atexit function), set the call count back
544
 
        # to 0 so this item won't be reported twice.
545
 
        value['count'] = 0
546
 
        value['vfs_count'] = 0
547
 
        if count != 0:
548
 
            trace.note('HPSS calls: %d (%d vfs) %s',
549
 
                       count, vfs_count, medium_repr)
550
 
 
551
 
    def flush_all(self):
552
 
        for ref in list(self.counts.keys()):
553
 
            self.done(ref)
554
 
 
555
 
_debug_counter = None
556
 
 
557
 
 
558
471
class SmartClientMedium(SmartMedium):
559
472
    """Smart client is a medium for sending smart protocol requests over."""
560
473
 
569
482
        # _remote_version_is_before tracks the bzr version the remote side
570
483
        # can be based on what we've seen so far.
571
484
        self._remote_version_is_before = None
572
 
        # Install debug hook function if debug flag is set.
573
 
        if 'hpss' in debug.debug_flags:
574
 
            global _debug_counter
575
 
            if _debug_counter is None:
576
 
                _debug_counter = _DebugCounter()
577
 
            _debug_counter.track(self)
578
485
 
579
486
    def _is_remote_before(self, version_tuple):
580
487
        """Is it possible the remote side supports RPCs for a given version?
605
512
        """
606
513
        if (self._remote_version_is_before is not None and
607
514
            version_tuple > self._remote_version_is_before):
608
 
            # We have been told that the remote side is older than some version
609
 
            # which is newer than a previously supplied older-than version.
610
 
            # This indicates that some smart verb call is not guarded
611
 
            # appropriately (it should simply not have been tried).
612
515
            raise AssertionError(
613
516
                "_remember_remote_is_before(%r) called, but "
614
517
                "_remember_remote_is_before(%r) was called previously."
652
555
 
653
556
    def disconnect(self):
654
557
        """If this medium maintains a persistent connection, close it.
655
 
 
 
558
        
656
559
        The default implementation does nothing.
657
560
        """
658
 
 
 
561
        
659
562
    def remote_path_from_transport(self, transport):
660
563
        """Convert transport into a path suitable for using in a request.
661
 
 
 
564
        
662
565
        Note that the resulting remote path doesn't encode the host name or
663
566
        anything but path, so it is only safe to use it in requests sent over
664
567
        the medium from the matching transport.
692
595
 
693
596
    def _flush(self):
694
597
        """Flush the output stream.
695
 
 
 
598
        
696
599
        This method is used by the SmartClientStreamMediumRequest to ensure that
697
600
        all data for a request is sent, to avoid long timeouts or deadlocks.
698
601
        """
709
612
 
710
613
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
711
614
    """A client medium using simple pipes.
712
 
 
 
615
    
713
616
    This client does not manage the pipes: it assumes they will always be open.
714
617
    """
715
618
 
720
623
 
721
624
    def _accept_bytes(self, bytes):
722
625
        """See SmartClientStreamMedium.accept_bytes."""
723
 
        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
724
 
        self._report_activity(len(bytes), 'write')
 
626
        self._writeable_pipe.write(bytes)
725
627
 
726
628
    def _flush(self):
727
629
        """See SmartClientStreamMedium._flush()."""
728
 
        osutils.until_no_eintr(self._writeable_pipe.flush)
 
630
        self._writeable_pipe.flush()
729
631
 
730
632
    def _read_bytes(self, count):
731
633
        """See SmartClientStreamMedium._read_bytes."""
732
 
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
733
 
        self._report_activity(len(bytes), 'read')
734
 
        return bytes
 
634
        return self._readable_pipe.read(count)
735
635
 
736
636
 
737
637
class SmartSSHClientMedium(SmartClientStreamMedium):
738
638
    """A client medium using SSH."""
739
 
 
 
639
    
740
640
    def __init__(self, host, port=None, username=None, password=None,
741
641
            base=None, vendor=None, bzr_remote_path=None):
742
642
        """Creates a client that will connect on the first use.
743
 
 
 
643
        
744
644
        :param vendor: An optional override for the ssh vendor to use. See
745
645
            bzrlib.transport.ssh for details on ssh vendors.
746
646
        """
 
647
        SmartClientStreamMedium.__init__(self, base)
747
648
        self._connected = False
748
649
        self._host = host
749
650
        self._password = password
750
651
        self._port = port
751
652
        self._username = username
752
 
        # for the benefit of progress making a short description of this
753
 
        # transport
754
 
        self._scheme = 'bzr+ssh'
755
 
        # SmartClientStreamMedium stores the repr of this object in its
756
 
        # _DebugCounter so we have to store all the values used in our repr
757
 
        # method before calling the super init.
758
 
        SmartClientStreamMedium.__init__(self, base)
759
653
        self._read_from = None
760
654
        self._ssh_connection = None
761
655
        self._vendor = vendor
762
656
        self._write_to = None
763
657
        self._bzr_remote_path = bzr_remote_path
764
 
 
765
 
    def __repr__(self):
766
 
        if self._port is None:
767
 
            maybe_port = ''
768
 
        else:
769
 
            maybe_port = ':%s' % self._port
770
 
        return "%s(%s://%s@%s%s/)" % (
771
 
            self.__class__.__name__,
772
 
            self._scheme,
773
 
            self._username,
774
 
            self._host,
775
 
            maybe_port)
 
658
        if self._bzr_remote_path is None:
 
659
            symbol_versioning.warn(
 
660
                'bzr_remote_path is required as of bzr 0.92',
 
661
                DeprecationWarning, stacklevel=2)
 
662
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
776
663
 
777
664
    def _accept_bytes(self, bytes):
778
665
        """See SmartClientStreamMedium.accept_bytes."""
779
666
        self._ensure_connection()
780
 
        osutils.until_no_eintr(self._write_to.write, bytes)
781
 
        self._report_activity(len(bytes), 'write')
 
667
        self._write_to.write(bytes)
782
668
 
783
669
    def disconnect(self):
784
670
        """See SmartClientMedium.disconnect()."""
785
671
        if not self._connected:
786
672
            return
787
 
        osutils.until_no_eintr(self._read_from.close)
788
 
        osutils.until_no_eintr(self._write_to.close)
 
673
        self._read_from.close()
 
674
        self._write_to.close()
789
675
        self._ssh_connection.close()
790
676
        self._connected = False
791
677
 
814
700
        if not self._connected:
815
701
            raise errors.MediumNotConnected(self)
816
702
        bytes_to_read = min(count, _MAX_READ_SIZE)
817
 
        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
818
 
        self._report_activity(len(bytes), 'read')
819
 
        return bytes
 
703
        return self._read_from.read(bytes_to_read)
820
704
 
821
705
 
822
706
# Port 4155 is the default port for bzr://, registered with IANA.
823
 
BZR_DEFAULT_INTERFACE = None
 
707
BZR_DEFAULT_INTERFACE = '0.0.0.0'
824
708
BZR_DEFAULT_PORT = 4155
825
709
 
826
710
 
827
711
class SmartTCPClientMedium(SmartClientStreamMedium):
828
712
    """A client medium using TCP."""
829
 
 
 
713
    
830
714
    def __init__(self, host, port, base):
831
715
        """Creates a client that will connect on the first use."""
832
716
        SmartClientStreamMedium.__init__(self, base)
838
722
    def _accept_bytes(self, bytes):
839
723
        """See SmartClientMedium.accept_bytes."""
840
724
        self._ensure_connection()
841
 
        osutils.send_all(self._socket, bytes, self._report_activity)
 
725
        osutils.send_all(self._socket, bytes)
842
726
 
843
727
    def disconnect(self):
844
728
        """See SmartClientMedium.disconnect()."""
845
729
        if not self._connected:
846
730
            return
847
 
        osutils.until_no_eintr(self._socket.close)
 
731
        self._socket.close()
848
732
        self._socket = None
849
733
        self._connected = False
850
734
 
852
736
        """Connect this medium if not already connected."""
853
737
        if self._connected:
854
738
            return
 
739
        self._socket = socket.socket()
 
740
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
855
741
        if self._port is None:
856
742
            port = BZR_DEFAULT_PORT
857
743
        else:
858
744
            port = int(self._port)
859
745
        try:
860
 
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
861
 
                socket.SOCK_STREAM, 0, 0)
862
 
        except socket.gaierror, (err_num, err_msg):
863
 
            raise errors.ConnectionError("failed to lookup %s:%d: %s" %
864
 
                    (self._host, port, err_msg))
865
 
        # Initialize err in case there are no addresses returned:
866
 
        err = socket.error("no address found for %s" % self._host)
867
 
        for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
868
 
            try:
869
 
                self._socket = socket.socket(family, socktype, proto)
870
 
                self._socket.setsockopt(socket.IPPROTO_TCP,
871
 
                                        socket.TCP_NODELAY, 1)
872
 
                self._socket.connect(sockaddr)
873
 
            except socket.error, err:
874
 
                if self._socket is not None:
875
 
                    self._socket.close()
876
 
                self._socket = None
877
 
                continue
878
 
            break
879
 
        if self._socket is None:
 
746
            self._socket.connect((self._host, port))
 
747
        except socket.error, err:
880
748
            # socket errors either have a (string) or (errno, string) as their
881
749
            # args.
882
750
            if type(err.args) is str:
889
757
 
890
758
    def _flush(self):
891
759
        """See SmartClientStreamMedium._flush().
892
 
 
893
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
 
760
        
 
761
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
894
762
        add a means to do a flush, but that can be done in the future.
895
763
        """
896
764
 
898
766
        """See SmartClientMedium.read_bytes."""
899
767
        if not self._connected:
900
768
            raise errors.MediumNotConnected(self)
901
 
        return _read_bytes_from_socket(
902
 
            self._socket.recv, count, self._report_activity)
 
769
        # We ignore the desired_count because on sockets it's more efficient to
 
770
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
771
        return self._socket.recv(_MAX_READ_SIZE)
903
772
 
904
773
 
905
774
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
918
787
 
919
788
    def _accept_bytes(self, bytes):
920
789
        """See SmartClientMediumRequest._accept_bytes.
921
 
 
 
790
        
922
791
        This forwards to self._medium._accept_bytes because we are operating
923
792
        on the mediums stream.
924
793
        """
927
796
    def _finished_reading(self):
928
797
        """See SmartClientMediumRequest._finished_reading.
929
798
 
930
 
        This clears the _current_request on self._medium to allow a new
 
799
        This clears the _current_request on self._medium to allow a new 
931
800
        request to be created.
932
801
        """
933
802
        if self._medium._current_request is not self:
934
803
            raise AssertionError()
935
804
        self._medium._current_request = None
936
 
 
 
805
        
937
806
    def _finished_writing(self):
938
807
        """See SmartClientMediumRequest._finished_writing.
939
808
 
941
810
        """
942
811
        self._medium._flush()
943
812
 
944
 
 
945
 
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
 
    # We ignore the desired_count because on sockets it's more efficient to
947
 
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
948
 
    try:
949
 
        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
 
    except socket.error, e:
951
 
        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
 
            # The connection was closed by the other side.  Callers expect an
953
 
            # empty string to signal end-of-stream.
954
 
            bytes = ''
955
 
        else:
956
 
            raise
957
 
    else:
958
 
        report_activity(len(bytes), 'read')
959
 
    return bytes
960