~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Joe Julian
  • Date: 2010-01-10 02:25:31 UTC
  • mto: (4634.119.7 2.0)
  • mto: This revision was merged to the branch mainline in revision 4959.
  • Revision ID: joe@julianfamily.org-20100110022531-wqk61rsagz8xsiga
Added MANIFEST.in to allow bdist_rpm to have all the required include files and tools. bdist_rpm will still fail to build correctly on some distributions due to a disttools bug http://bugs.python.org/issue644744

Show diffs side-by-side

added added

removed removed

Lines of Context:
37
37
from bzrlib import (
38
38
    debug,
39
39
    errors,
40
 
    osutils,
41
40
    symbol_versioning,
42
41
    trace,
43
42
    ui,
44
43
    urlutils,
45
44
    )
46
 
from bzrlib.smart import client, protocol
 
45
from bzrlib.smart import client, protocol, request, vfs
47
46
from bzrlib.transport import ssh
48
47
""")
49
 
 
 
48
#usually already imported, and getting IllegalScoperReplacer on it here.
 
49
from bzrlib import osutils
50
50
 
51
51
# We must not read any more than 64k at a time so we don't risk "no buffer
52
52
# space available" errors on some platforms.  Windows in particular is likely
285
285
        self._push_back(protocol.unused_data)
286
286
 
287
287
    def _read_bytes(self, desired_count):
288
 
        # We ignore the desired_count because on sockets it's more efficient to
289
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
290
 
        bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
291
 
        self._report_activity(len(bytes), 'read')
292
 
        return bytes
 
288
        return _read_bytes_from_socket(
 
289
            self.socket.recv, desired_count, self._report_activity)
293
290
 
294
291
    def terminate_due_to_error(self):
295
292
        # TODO: This should log to a server log file, but no such thing
296
293
        # exists yet.  Andrew Bennetts 2006-09-29.
297
 
        self.socket.close()
 
294
        osutils.until_no_eintr(self.socket.close)
298
295
        self.finished = True
299
296
 
300
297
    def _write_out(self, bytes):
329
326
            bytes_to_read = protocol.next_read_size()
330
327
            if bytes_to_read == 0:
331
328
                # Finished serving this request.
332
 
                self._out.flush()
 
329
                osutils.until_no_eintr(self._out.flush)
333
330
                return
334
331
            bytes = self.read_bytes(bytes_to_read)
335
332
            if bytes == '':
336
333
                # Connection has been closed.
337
334
                self.finished = True
338
 
                self._out.flush()
 
335
                osutils.until_no_eintr(self._out.flush)
339
336
                return
340
337
            protocol.accept_bytes(bytes)
341
338
 
342
339
    def _read_bytes(self, desired_count):
343
 
        return self._in.read(desired_count)
 
340
        return osutils.until_no_eintr(self._in.read, desired_count)
344
341
 
345
342
    def terminate_due_to_error(self):
346
343
        # TODO: This should log to a server log file, but no such thing
347
344
        # exists yet.  Andrew Bennetts 2006-09-29.
348
 
        self._out.close()
 
345
        osutils.until_no_eintr(self._out.close)
349
346
        self.finished = True
350
347
 
351
348
    def _write_out(self, bytes):
352
 
        self._out.write(bytes)
 
349
        osutils.until_no_eintr(self._out.write, bytes)
353
350
 
354
351
 
355
352
class SmartClientMediumRequest(object):
475
472
        if not line.endswith('\n'):
476
473
            # end of file encountered reading from server
477
474
            raise errors.ConnectionReset(
478
 
                "please check connectivity and permissions")
 
475
                "Unexpected end of message. Please check connectivity "
 
476
                "and permissions, and report a bug if problems persist.")
479
477
        return line
480
478
 
481
479
    def _read_line(self):
509
507
        """
510
508
        medium_repr = repr(medium)
511
509
        # Add this medium to the WeakKeyDictionary
512
 
        self.counts[medium] = [0, medium_repr]
 
510
        self.counts[medium] = dict(count=0, vfs_count=0,
 
511
                                   medium_repr=medium_repr)
513
512
        # Weakref callbacks are fired in reverse order of their association
514
513
        # with the referenced object.  So we add a weakref *after* adding to
515
514
        # the WeakKeyDict so that we can report the value from it before the
519
518
    def increment_call_count(self, params):
520
519
        # Increment the count in the WeakKeyDictionary
521
520
        value = self.counts[params.medium]
522
 
        value[0] += 1
 
521
        value['count'] += 1
 
522
        try:
 
523
            request_method = request.request_handlers.get(params.method)
 
524
        except KeyError:
 
525
            # A method we don't know about doesn't count as a VFS method.
 
526
            return
 
527
        if issubclass(request_method, vfs.VfsRequest):
 
528
            value['vfs_count'] += 1
523
529
 
524
530
    def done(self, ref):
525
531
        value = self.counts[ref]
526
 
        count, medium_repr = value
 
532
        count, vfs_count, medium_repr = (
 
533
            value['count'], value['vfs_count'], value['medium_repr'])
527
534
        # In case this callback is invoked for the same ref twice (by the
528
535
        # weakref callback and by the atexit function), set the call count back
529
536
        # to 0 so this item won't be reported twice.
530
 
        value[0] = 0
 
537
        value['count'] = 0
 
538
        value['vfs_count'] = 0
531
539
        if count != 0:
532
 
            trace.note('HPSS calls: %d %s', count, medium_repr)
 
540
            trace.note('HPSS calls: %d (%d vfs) %s',
 
541
                       count, vfs_count, medium_repr)
533
542
 
534
543
    def flush_all(self):
535
544
        for ref in list(self.counts.keys()):
703
712
 
704
713
    def _accept_bytes(self, bytes):
705
714
        """See SmartClientStreamMedium.accept_bytes."""
706
 
        self._writeable_pipe.write(bytes)
 
715
        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
707
716
        self._report_activity(len(bytes), 'write')
708
717
 
709
718
    def _flush(self):
710
719
        """See SmartClientStreamMedium._flush()."""
711
 
        self._writeable_pipe.flush()
 
720
        osutils.until_no_eintr(self._writeable_pipe.flush)
712
721
 
713
722
    def _read_bytes(self, count):
714
723
        """See SmartClientStreamMedium._read_bytes."""
715
 
        bytes = self._readable_pipe.read(count)
 
724
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
716
725
        self._report_activity(len(bytes), 'read')
717
726
        return bytes
718
727
 
756
765
    def _accept_bytes(self, bytes):
757
766
        """See SmartClientStreamMedium.accept_bytes."""
758
767
        self._ensure_connection()
759
 
        self._write_to.write(bytes)
 
768
        osutils.until_no_eintr(self._write_to.write, bytes)
760
769
        self._report_activity(len(bytes), 'write')
761
770
 
762
771
    def disconnect(self):
763
772
        """See SmartClientMedium.disconnect()."""
764
773
        if not self._connected:
765
774
            return
766
 
        self._read_from.close()
767
 
        self._write_to.close()
 
775
        osutils.until_no_eintr(self._read_from.close)
 
776
        osutils.until_no_eintr(self._write_to.close)
768
777
        self._ssh_connection.close()
769
778
        self._connected = False
770
779
 
793
802
        if not self._connected:
794
803
            raise errors.MediumNotConnected(self)
795
804
        bytes_to_read = min(count, _MAX_READ_SIZE)
796
 
        bytes = self._read_from.read(bytes_to_read)
 
805
        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
797
806
        self._report_activity(len(bytes), 'read')
798
807
        return bytes
799
808
 
823
832
        """See SmartClientMedium.disconnect()."""
824
833
        if not self._connected:
825
834
            return
826
 
        self._socket.close()
 
835
        osutils.until_no_eintr(self._socket.close)
827
836
        self._socket = None
828
837
        self._connected = False
829
838
 
877
886
        """See SmartClientMedium.read_bytes."""
878
887
        if not self._connected:
879
888
            raise errors.MediumNotConnected(self)
880
 
        # We ignore the desired_count because on sockets it's more efficient to
881
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
882
 
        try:
883
 
            bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
884
 
        except socket.error, e:
885
 
            if len(e.args) and e.args[0] == errno.ECONNRESET:
886
 
                # Callers expect an empty string in that case
887
 
                return ''
888
 
            else:
889
 
                raise
890
 
        else:
891
 
            self._report_activity(len(bytes), 'read')
892
 
            return bytes
 
889
        return _read_bytes_from_socket(
 
890
            self._socket.recv, count, self._report_activity)
893
891
 
894
892
 
895
893
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
931
929
        """
932
930
        self._medium._flush()
933
931
 
 
932
 
 
933
def _read_bytes_from_socket(sock, desired_count, report_activity):
 
934
    # We ignore the desired_count because on sockets it's more efficient to
 
935
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
936
    try:
 
937
        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
 
938
    except socket.error, e:
 
939
        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
 
940
            # The connection was closed by the other side.  Callers expect an
 
941
            # empty string to signal end-of-stream.
 
942
            bytes = ''
 
943
        else:
 
944
            raise
 
945
    else:
 
946
        report_activity(len(bytes), 'read')
 
947
    return bytes
 
948