~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: John Arbash Meinel
  • Date: 2009-03-06 20:42:40 UTC
  • mto: This revision was merged to the branch mainline in revision 4088.
  • Revision ID: john@arbash-meinel.com-20090306204240-mzjavv31z3gu1x7i
Fix a small bug in setup.py when an extension fails to build

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""The 'medium' layer for the smart servers and clients.
18
18
 
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
35
35
import atexit
36
 
import thread
37
36
import weakref
38
 
 
39
37
from bzrlib import (
40
38
    debug,
41
39
    errors,
 
40
    osutils,
42
41
    symbol_versioning,
43
42
    trace,
44
 
    ui,
45
43
    urlutils,
46
44
    )
47
 
from bzrlib.smart import client, protocol, request, vfs
 
45
from bzrlib.smart import client, protocol
48
46
from bzrlib.transport import ssh
49
47
""")
50
 
#usually already imported, and getting IllegalScoperReplacer on it here.
51
 
from bzrlib import osutils
 
48
 
52
49
 
53
50
# We must not read any more than 64k at a time so we don't risk "no buffer
54
51
# space available" errors on some platforms.  Windows in particular is likely
165
162
        self._push_back(excess)
166
163
        return line
167
164
 
168
 
    def _report_activity(self, bytes, direction):
169
 
        """Notify that this medium has activity.
170
 
 
171
 
        Implementations should call this from all methods that actually do IO.
172
 
        Be careful that it's not called twice, if one method is implemented on
173
 
        top of another.
174
 
 
175
 
        :param bytes: Number of bytes read or written.
176
 
        :param direction: 'read' or 'write' or None.
177
 
        """
178
 
        ui.ui_factory.report_transport_activity(self, bytes, direction)
179
 
 
180
165
 
181
166
class SmartServerStreamMedium(SmartMedium):
182
167
    """Handles smart commands coming over a stream.
287
272
        self._push_back(protocol.unused_data)
288
273
 
289
274
    def _read_bytes(self, desired_count):
290
 
        return _read_bytes_from_socket(
291
 
            self.socket.recv, desired_count, self._report_activity)
 
275
        # We ignore the desired_count because on sockets it's more efficient to
 
276
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
277
        return osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
292
278
 
293
279
    def terminate_due_to_error(self):
294
280
        # TODO: This should log to a server log file, but no such thing
295
281
        # exists yet.  Andrew Bennetts 2006-09-29.
296
 
        osutils.until_no_eintr(self.socket.close)
 
282
        self.socket.close()
297
283
        self.finished = True
298
284
 
299
285
    def _write_out(self, bytes):
300
 
        tstart = osutils.timer_func()
301
 
        osutils.send_all(self.socket, bytes, self._report_activity)
302
 
        if 'hpss' in debug.debug_flags:
303
 
            thread_id = thread.get_ident()
304
 
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
305
 
                         % ('wrote', thread_id, len(bytes),
306
 
                            osutils.timer_func() - tstart))
 
286
        osutils.send_all(self.socket, bytes)
307
287
 
308
288
 
309
289
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
334
314
            bytes_to_read = protocol.next_read_size()
335
315
            if bytes_to_read == 0:
336
316
                # Finished serving this request.
337
 
                osutils.until_no_eintr(self._out.flush)
 
317
                self._out.flush()
338
318
                return
339
319
            bytes = self.read_bytes(bytes_to_read)
340
320
            if bytes == '':
341
321
                # Connection has been closed.
342
322
                self.finished = True
343
 
                osutils.until_no_eintr(self._out.flush)
 
323
                self._out.flush()
344
324
                return
345
325
            protocol.accept_bytes(bytes)
346
326
 
347
327
    def _read_bytes(self, desired_count):
348
 
        return osutils.until_no_eintr(self._in.read, desired_count)
 
328
        return self._in.read(desired_count)
349
329
 
350
330
    def terminate_due_to_error(self):
351
331
        # TODO: This should log to a server log file, but no such thing
352
332
        # exists yet.  Andrew Bennetts 2006-09-29.
353
 
        osutils.until_no_eintr(self._out.close)
 
333
        self._out.close()
354
334
        self.finished = True
355
335
 
356
336
    def _write_out(self, bytes):
357
 
        osutils.until_no_eintr(self._out.write, bytes)
 
337
        self._out.write(bytes)
358
338
 
359
339
 
360
340
class SmartClientMediumRequest(object):
386
366
    def accept_bytes(self, bytes):
387
367
        """Accept bytes for inclusion in this request.
388
368
 
389
 
        This method may not be called after finished_writing() has been
 
369
        This method may not be be called after finished_writing() has been
390
370
        called.  It depends upon the Medium whether or not the bytes will be
391
371
        immediately transmitted. Message based Mediums will tend to buffer the
392
372
        bytes until finished_writing() is called.
480
460
        if not line.endswith('\n'):
481
461
            # end of file encountered reading from server
482
462
            raise errors.ConnectionReset(
483
 
                "Unexpected end of message. Please check connectivity "
484
 
                "and permissions, and report a bug if problems persist.")
 
463
                "please check connectivity and permissions")
485
464
        return line
486
465
 
487
466
    def _read_line(self):
515
494
        """
516
495
        medium_repr = repr(medium)
517
496
        # Add this medium to the WeakKeyDictionary
518
 
        self.counts[medium] = dict(count=0, vfs_count=0,
519
 
                                   medium_repr=medium_repr)
 
497
        self.counts[medium] = [0, medium_repr]
520
498
        # Weakref callbacks are fired in reverse order of their association
521
499
        # with the referenced object.  So we add a weakref *after* adding to
522
500
        # the WeakKeyDict so that we can report the value from it before the
526
504
    def increment_call_count(self, params):
527
505
        # Increment the count in the WeakKeyDictionary
528
506
        value = self.counts[params.medium]
529
 
        value['count'] += 1
530
 
        try:
531
 
            request_method = request.request_handlers.get(params.method)
532
 
        except KeyError:
533
 
            # A method we don't know about doesn't count as a VFS method.
534
 
            return
535
 
        if issubclass(request_method, vfs.VfsRequest):
536
 
            value['vfs_count'] += 1
 
507
        value[0] += 1
537
508
 
538
509
    def done(self, ref):
539
510
        value = self.counts[ref]
540
 
        count, vfs_count, medium_repr = (
541
 
            value['count'], value['vfs_count'], value['medium_repr'])
 
511
        count, medium_repr = value
542
512
        # In case this callback is invoked for the same ref twice (by the
543
513
        # weakref callback and by the atexit function), set the call count back
544
514
        # to 0 so this item won't be reported twice.
545
 
        value['count'] = 0
546
 
        value['vfs_count'] = 0
 
515
        value[0] = 0
547
516
        if count != 0:
548
 
            trace.note('HPSS calls: %d (%d vfs) %s',
549
 
                       count, vfs_count, medium_repr)
 
517
            trace.note('HPSS calls: %d %s', count, medium_repr)
550
518
 
551
519
    def flush_all(self):
552
520
        for ref in list(self.counts.keys()):
720
688
 
721
689
    def _accept_bytes(self, bytes):
722
690
        """See SmartClientStreamMedium.accept_bytes."""
723
 
        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
724
 
        self._report_activity(len(bytes), 'write')
 
691
        self._writeable_pipe.write(bytes)
725
692
 
726
693
    def _flush(self):
727
694
        """See SmartClientStreamMedium._flush()."""
728
 
        osutils.until_no_eintr(self._writeable_pipe.flush)
 
695
        self._writeable_pipe.flush()
729
696
 
730
697
    def _read_bytes(self, count):
731
698
        """See SmartClientStreamMedium._read_bytes."""
732
 
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
733
 
        self._report_activity(len(bytes), 'read')
734
 
        return bytes
 
699
        return self._readable_pipe.read(count)
735
700
 
736
701
 
737
702
class SmartSSHClientMedium(SmartClientStreamMedium):
744
709
        :param vendor: An optional override for the ssh vendor to use. See
745
710
            bzrlib.transport.ssh for details on ssh vendors.
746
711
        """
 
712
        SmartClientStreamMedium.__init__(self, base)
747
713
        self._connected = False
748
714
        self._host = host
749
715
        self._password = password
750
716
        self._port = port
751
717
        self._username = username
752
 
        # for the benefit of progress making a short description of this
753
 
        # transport
754
 
        self._scheme = 'bzr+ssh'
755
 
        # SmartClientStreamMedium stores the repr of this object in its
756
 
        # _DebugCounter so we have to store all the values used in our repr
757
 
        # method before calling the super init.
758
 
        SmartClientStreamMedium.__init__(self, base)
759
718
        self._read_from = None
760
719
        self._ssh_connection = None
761
720
        self._vendor = vendor
762
721
        self._write_to = None
763
722
        self._bzr_remote_path = bzr_remote_path
764
 
 
765
 
    def __repr__(self):
766
 
        if self._port is None:
767
 
            maybe_port = ''
768
 
        else:
769
 
            maybe_port = ':%s' % self._port
770
 
        return "%s(%s://%s@%s%s/)" % (
771
 
            self.__class__.__name__,
772
 
            self._scheme,
773
 
            self._username,
774
 
            self._host,
775
 
            maybe_port)
 
723
        if self._bzr_remote_path is None:
 
724
            symbol_versioning.warn(
 
725
                'bzr_remote_path is required as of bzr 0.92',
 
726
                DeprecationWarning, stacklevel=2)
 
727
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
776
728
 
777
729
    def _accept_bytes(self, bytes):
778
730
        """See SmartClientStreamMedium.accept_bytes."""
779
731
        self._ensure_connection()
780
 
        osutils.until_no_eintr(self._write_to.write, bytes)
781
 
        self._report_activity(len(bytes), 'write')
 
732
        self._write_to.write(bytes)
782
733
 
783
734
    def disconnect(self):
784
735
        """See SmartClientMedium.disconnect()."""
785
736
        if not self._connected:
786
737
            return
787
 
        osutils.until_no_eintr(self._read_from.close)
788
 
        osutils.until_no_eintr(self._write_to.close)
 
738
        self._read_from.close()
 
739
        self._write_to.close()
789
740
        self._ssh_connection.close()
790
741
        self._connected = False
791
742
 
814
765
        if not self._connected:
815
766
            raise errors.MediumNotConnected(self)
816
767
        bytes_to_read = min(count, _MAX_READ_SIZE)
817
 
        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
818
 
        self._report_activity(len(bytes), 'read')
819
 
        return bytes
 
768
        return self._read_from.read(bytes_to_read)
820
769
 
821
770
 
822
771
# Port 4155 is the default port for bzr://, registered with IANA.
838
787
    def _accept_bytes(self, bytes):
839
788
        """See SmartClientMedium.accept_bytes."""
840
789
        self._ensure_connection()
841
 
        osutils.send_all(self._socket, bytes, self._report_activity)
 
790
        osutils.send_all(self._socket, bytes)
842
791
 
843
792
    def disconnect(self):
844
793
        """See SmartClientMedium.disconnect()."""
845
794
        if not self._connected:
846
795
            return
847
 
        osutils.until_no_eintr(self._socket.close)
 
796
        self._socket.close()
848
797
        self._socket = None
849
798
        self._connected = False
850
799
 
898
847
        """See SmartClientMedium.read_bytes."""
899
848
        if not self._connected:
900
849
            raise errors.MediumNotConnected(self)
901
 
        return _read_bytes_from_socket(
902
 
            self._socket.recv, count, self._report_activity)
 
850
        # We ignore the desired_count because on sockets it's more efficient to
 
851
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
852
        try:
 
853
            return self._socket.recv(_MAX_READ_SIZE)
 
854
        except socket.error, e:
 
855
            if len(e.args) and e.args[0] == errno.ECONNRESET:
 
856
                # Callers expect an empty string in that case
 
857
                return ''
 
858
            else:
 
859
                raise
903
860
 
904
861
 
905
862
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
941
898
        """
942
899
        self._medium._flush()
943
900
 
944
 
 
945
 
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
 
    # We ignore the desired_count because on sockets it's more efficient to
947
 
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
948
 
    try:
949
 
        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
950
 
    except socket.error, e:
951
 
        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
 
            # The connection was closed by the other side.  Callers expect an
953
 
            # empty string to signal end-of-stream.
954
 
            bytes = ''
955
 
        else:
956
 
            raise
957
 
    else:
958
 
        report_activity(len(bytes), 'read')
959
 
    return bytes
960