~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Joe Julian
  • Date: 2010-01-10 02:25:31 UTC
  • mto: (4634.119.7 2.0)
  • mto: This revision was merged to the branch mainline in revision 4959.
  • Revision ID: joe@julianfamily.org-20100110022531-wqk61rsagz8xsiga
Added MANIFEST.in to allow bdist_rpm to have all the required include files and tools. bdist_rpm will still fail to build correctly on some distributions due to a disttools bug http://bugs.python.org/issue644744

Show diffs side-by-side

added added

removed removed

Lines of Context:
37
37
from bzrlib import (
38
38
    debug,
39
39
    errors,
40
 
    osutils,
41
40
    symbol_versioning,
42
41
    trace,
43
42
    ui,
44
43
    urlutils,
45
44
    )
46
 
from bzrlib.smart import client, protocol
 
45
from bzrlib.smart import client, protocol, request, vfs
47
46
from bzrlib.transport import ssh
48
47
""")
49
 
 
 
48
#usually already imported, and getting IllegalScoperReplacer on it here.
 
49
from bzrlib import osutils
50
50
 
51
51
# We must not read any more than 64k at a time so we don't risk "no buffer
52
52
# space available" errors on some platforms.  Windows in particular is likely
285
285
        self._push_back(protocol.unused_data)
286
286
 
287
287
    def _read_bytes(self, desired_count):
288
 
        # We ignore the desired_count because on sockets it's more efficient to
289
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
290
 
        bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
291
 
        self._report_activity(len(bytes), 'read')
292
 
        return bytes
 
288
        return _read_bytes_from_socket(
 
289
            self.socket.recv, desired_count, self._report_activity)
293
290
 
294
291
    def terminate_due_to_error(self):
295
292
        # TODO: This should log to a server log file, but no such thing
296
293
        # exists yet.  Andrew Bennetts 2006-09-29.
297
 
        self.socket.close()
 
294
        osutils.until_no_eintr(self.socket.close)
298
295
        self.finished = True
299
296
 
300
297
    def _write_out(self, bytes):
329
326
            bytes_to_read = protocol.next_read_size()
330
327
            if bytes_to_read == 0:
331
328
                # Finished serving this request.
332
 
                self._out.flush()
 
329
                osutils.until_no_eintr(self._out.flush)
333
330
                return
334
331
            bytes = self.read_bytes(bytes_to_read)
335
332
            if bytes == '':
336
333
                # Connection has been closed.
337
334
                self.finished = True
338
 
                self._out.flush()
 
335
                osutils.until_no_eintr(self._out.flush)
339
336
                return
340
337
            protocol.accept_bytes(bytes)
341
338
 
342
339
    def _read_bytes(self, desired_count):
343
 
        return self._in.read(desired_count)
 
340
        return osutils.until_no_eintr(self._in.read, desired_count)
344
341
 
345
342
    def terminate_due_to_error(self):
346
343
        # TODO: This should log to a server log file, but no such thing
347
344
        # exists yet.  Andrew Bennetts 2006-09-29.
348
 
        self._out.close()
 
345
        osutils.until_no_eintr(self._out.close)
349
346
        self.finished = True
350
347
 
351
348
    def _write_out(self, bytes):
352
 
        self._out.write(bytes)
 
349
        osutils.until_no_eintr(self._out.write, bytes)
353
350
 
354
351
 
355
352
class SmartClientMediumRequest(object):
381
378
    def accept_bytes(self, bytes):
382
379
        """Accept bytes for inclusion in this request.
383
380
 
384
 
        This method may not be be called after finished_writing() has been
 
381
        This method may not be called after finished_writing() has been
385
382
        called.  It depends upon the Medium whether or not the bytes will be
386
383
        immediately transmitted. Message based Mediums will tend to buffer the
387
384
        bytes until finished_writing() is called.
475
472
        if not line.endswith('\n'):
476
473
            # end of file encountered reading from server
477
474
            raise errors.ConnectionReset(
478
 
                "please check connectivity and permissions")
 
475
                "Unexpected end of message. Please check connectivity "
 
476
                "and permissions, and report a bug if problems persist.")
479
477
        return line
480
478
 
481
479
    def _read_line(self):
509
507
        """
510
508
        medium_repr = repr(medium)
511
509
        # Add this medium to the WeakKeyDictionary
512
 
        self.counts[medium] = [0, medium_repr]
 
510
        self.counts[medium] = dict(count=0, vfs_count=0,
 
511
                                   medium_repr=medium_repr)
513
512
        # Weakref callbacks are fired in reverse order of their association
514
513
        # with the referenced object.  So we add a weakref *after* adding to
515
514
        # the WeakKeyDict so that we can report the value from it before the
519
518
    def increment_call_count(self, params):
520
519
        # Increment the count in the WeakKeyDictionary
521
520
        value = self.counts[params.medium]
522
 
        value[0] += 1
 
521
        value['count'] += 1
 
522
        try:
 
523
            request_method = request.request_handlers.get(params.method)
 
524
        except KeyError:
 
525
            # A method we don't know about doesn't count as a VFS method.
 
526
            return
 
527
        if issubclass(request_method, vfs.VfsRequest):
 
528
            value['vfs_count'] += 1
523
529
 
524
530
    def done(self, ref):
525
531
        value = self.counts[ref]
526
 
        count, medium_repr = value
 
532
        count, vfs_count, medium_repr = (
 
533
            value['count'], value['vfs_count'], value['medium_repr'])
527
534
        # In case this callback is invoked for the same ref twice (by the
528
535
        # weakref callback and by the atexit function), set the call count back
529
536
        # to 0 so this item won't be reported twice.
530
 
        value[0] = 0
 
537
        value['count'] = 0
 
538
        value['vfs_count'] = 0
531
539
        if count != 0:
532
 
            trace.note('HPSS calls: %d %s', count, medium_repr)
 
540
            trace.note('HPSS calls: %d (%d vfs) %s',
 
541
                       count, vfs_count, medium_repr)
533
542
 
534
543
    def flush_all(self):
535
544
        for ref in list(self.counts.keys()):
703
712
 
704
713
    def _accept_bytes(self, bytes):
705
714
        """See SmartClientStreamMedium.accept_bytes."""
706
 
        self._writeable_pipe.write(bytes)
 
715
        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
707
716
        self._report_activity(len(bytes), 'write')
708
717
 
709
718
    def _flush(self):
710
719
        """See SmartClientStreamMedium._flush()."""
711
 
        self._writeable_pipe.flush()
 
720
        osutils.until_no_eintr(self._writeable_pipe.flush)
712
721
 
713
722
    def _read_bytes(self, count):
714
723
        """See SmartClientStreamMedium._read_bytes."""
715
 
        bytes = self._readable_pipe.read(count)
 
724
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
716
725
        self._report_activity(len(bytes), 'read')
717
726
        return bytes
718
727
 
756
765
    def _accept_bytes(self, bytes):
757
766
        """See SmartClientStreamMedium.accept_bytes."""
758
767
        self._ensure_connection()
759
 
        self._write_to.write(bytes)
 
768
        osutils.until_no_eintr(self._write_to.write, bytes)
760
769
        self._report_activity(len(bytes), 'write')
761
770
 
762
771
    def disconnect(self):
763
772
        """See SmartClientMedium.disconnect()."""
764
773
        if not self._connected:
765
774
            return
766
 
        self._read_from.close()
767
 
        self._write_to.close()
 
775
        osutils.until_no_eintr(self._read_from.close)
 
776
        osutils.until_no_eintr(self._write_to.close)
768
777
        self._ssh_connection.close()
769
778
        self._connected = False
770
779
 
793
802
        if not self._connected:
794
803
            raise errors.MediumNotConnected(self)
795
804
        bytes_to_read = min(count, _MAX_READ_SIZE)
796
 
        bytes = self._read_from.read(bytes_to_read)
 
805
        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
797
806
        self._report_activity(len(bytes), 'read')
798
807
        return bytes
799
808
 
823
832
        """See SmartClientMedium.disconnect()."""
824
833
        if not self._connected:
825
834
            return
826
 
        self._socket.close()
 
835
        osutils.until_no_eintr(self._socket.close)
827
836
        self._socket = None
828
837
        self._connected = False
829
838
 
877
886
        """See SmartClientMedium.read_bytes."""
878
887
        if not self._connected:
879
888
            raise errors.MediumNotConnected(self)
880
 
        # We ignore the desired_count because on sockets it's more efficient to
881
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
882
 
        try:
883
 
            bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
884
 
        except socket.error, e:
885
 
            if len(e.args) and e.args[0] == errno.ECONNRESET:
886
 
                # Callers expect an empty string in that case
887
 
                return ''
888
 
            else:
889
 
                raise
890
 
        else:
891
 
            self._report_activity(len(bytes), 'read')
892
 
            return bytes
 
889
        return _read_bytes_from_socket(
 
890
            self._socket.recv, count, self._report_activity)
893
891
 
894
892
 
895
893
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
931
929
        """
932
930
        self._medium._flush()
933
931
 
 
932
 
 
933
def _read_bytes_from_socket(sock, desired_count, report_activity):
 
934
    # We ignore the desired_count because on sockets it's more efficient to
 
935
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
936
    try:
 
937
        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
 
938
    except socket.error, e:
 
939
        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
 
940
            # The connection was closed by the other side.  Callers expect an
 
941
            # empty string to signal end-of-stream.
 
942
            bytes = ''
 
943
        else:
 
944
            raise
 
945
    else:
 
946
        report_activity(len(bytes), 'read')
 
947
    return bytes
 
948