~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Gordon Tyler
  • Date: 2010-01-14 15:24:04 UTC
  • mto: (5037.3.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5046.
  • Revision ID: gordon@doxxx.net-20100114152404-d64ik2afl96lcml0
Reverted changes to test_rules since the original should work now.

Show diffs side-by-side

added

removed

Lines of Context:
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
35
35
import atexit
 
36
import thread
36
37
import weakref
 
38
 
37
39
from bzrlib import (
38
40
    debug,
39
41
    errors,
40
 
    osutils,
41
42
    symbol_versioning,
42
43
    trace,
43
44
    ui,
46
47
from bzrlib.smart import client, protocol, request, vfs
47
48
from bzrlib.transport import ssh
48
49
""")
49
 
 
 
50
#usually already imported, and getting IllegalScoperReplacer on it here.
 
51
from bzrlib import osutils
50
52
 
51
53
# We must not read any more than 64k at a time so we don't risk "no buffer
52
54
# space available" errors on some platforms.  Windows in particular is likely
291
293
    def terminate_due_to_error(self):
292
294
        # TODO: This should log to a server log file, but no such thing
293
295
        # exists yet.  Andrew Bennetts 2006-09-29.
294
 
        self.socket.close()
 
296
        osutils.until_no_eintr(self.socket.close)
295
297
        self.finished = True
296
298
 
297
299
    def _write_out(self, bytes):
 
300
        tstart = osutils.timer_func()
298
301
        osutils.send_all(self.socket, bytes, self._report_activity)
 
302
        if 'hpss' in debug.debug_flags:
 
303
            thread_id = thread.get_ident()
 
304
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
 
305
                         % ('wrote', thread_id, len(bytes),
 
306
                            osutils.timer_func() - tstart))
299
307
 
300
308
 
301
309
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
326
334
            bytes_to_read = protocol.next_read_size()
327
335
            if bytes_to_read == 0:
328
336
                # Finished serving this request.
329
 
                self._out.flush()
 
337
                osutils.until_no_eintr(self._out.flush)
330
338
                return
331
339
            bytes = self.read_bytes(bytes_to_read)
332
340
            if bytes == '':
333
341
                # Connection has been closed.
334
342
                self.finished = True
335
 
                self._out.flush()
 
343
                osutils.until_no_eintr(self._out.flush)
336
344
                return
337
345
            protocol.accept_bytes(bytes)
338
346
 
339
347
    def _read_bytes(self, desired_count):
340
 
        return self._in.read(desired_count)
 
348
        return osutils.until_no_eintr(self._in.read, desired_count)
341
349
 
342
350
    def terminate_due_to_error(self):
343
351
        # TODO: This should log to a server log file, but no such thing
344
352
        # exists yet.  Andrew Bennetts 2006-09-29.
345
 
        self._out.close()
 
353
        osutils.until_no_eintr(self._out.close)
346
354
        self.finished = True
347
355
 
348
356
    def _write_out(self, bytes):
349
 
        self._out.write(bytes)
 
357
        osutils.until_no_eintr(self._out.write, bytes)
350
358
 
351
359
 
352
360
class SmartClientMediumRequest(object):
472
480
        if not line.endswith('\n'):
473
481
            # end of file encountered reading from server
474
482
            raise errors.ConnectionReset(
475
 
                "please check connectivity and permissions")
 
483
                "Unexpected end of message. Please check connectivity "
 
484
                "and permissions, and report a bug if problems persist.")
476
485
        return line
477
486
 
478
487
    def _read_line(self):
518
527
        # Increment the count in the WeakKeyDictionary
519
528
        value = self.counts[params.medium]
520
529
        value['count'] += 1
521
 
        request_method = request.request_handlers.get(params.method)
 
530
        try:
 
531
            request_method = request.request_handlers.get(params.method)
 
532
        except KeyError:
 
533
            # A method we don't know about doesn't count as a VFS method.
 
534
            return
522
535
        if issubclass(request_method, vfs.VfsRequest):
523
536
            value['vfs_count'] += 1
524
537
 
707
720
 
708
721
    def _accept_bytes(self, bytes):
709
722
        """See SmartClientStreamMedium.accept_bytes."""
710
 
        self._writeable_pipe.write(bytes)
 
723
        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
711
724
        self._report_activity(len(bytes), 'write')
712
725
 
713
726
    def _flush(self):
714
727
        """See SmartClientStreamMedium._flush()."""
715
 
        self._writeable_pipe.flush()
 
728
        osutils.until_no_eintr(self._writeable_pipe.flush)
716
729
 
717
730
    def _read_bytes(self, count):
718
731
        """See SmartClientStreamMedium._read_bytes."""
719
 
        bytes = self._readable_pipe.read(count)
 
732
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
720
733
        self._report_activity(len(bytes), 'read')
721
734
        return bytes
722
735
 
760
773
    def _accept_bytes(self, bytes):
761
774
        """See SmartClientStreamMedium.accept_bytes."""
762
775
        self._ensure_connection()
763
 
        self._write_to.write(bytes)
 
776
        osutils.until_no_eintr(self._write_to.write, bytes)
764
777
        self._report_activity(len(bytes), 'write')
765
778
 
766
779
    def disconnect(self):
767
780
        """See SmartClientMedium.disconnect()."""
768
781
        if not self._connected:
769
782
            return
770
 
        self._read_from.close()
771
 
        self._write_to.close()
 
783
        osutils.until_no_eintr(self._read_from.close)
 
784
        osutils.until_no_eintr(self._write_to.close)
772
785
        self._ssh_connection.close()
773
786
        self._connected = False
774
787
 
797
810
        if not self._connected:
798
811
            raise errors.MediumNotConnected(self)
799
812
        bytes_to_read = min(count, _MAX_READ_SIZE)
800
 
        bytes = self._read_from.read(bytes_to_read)
 
813
        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
801
814
        self._report_activity(len(bytes), 'read')
802
815
        return bytes
803
816
 
827
840
        """See SmartClientMedium.disconnect()."""
828
841
        if not self._connected:
829
842
            return
830
 
        self._socket.close()
 
843
        osutils.until_no_eintr(self._socket.close)
831
844
        self._socket = None
832
845
        self._connected = False
833
846