~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Robert Collins
  • Date: 2010-02-27 12:27:33 UTC
  • mto: This revision was merged to the branch mainline in revision 5061.
  • Revision ID: robertc@robertcollins.net-20100227122733-2o3me3fkk3pk36ns
``bzrlib.branchbuilder.BranchBuilder.build_snapshot`` now accepts a
``message_callback`` in the same way that commit does. (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
33
33
from bzrlib.lazy_import import lazy_import
34
34
lazy_import(globals(), """
35
35
import atexit
 
36
import thread
36
37
import weakref
 
38
 
37
39
from bzrlib import (
38
40
    debug,
39
41
    errors,
40
 
    osutils,
41
42
    symbol_versioning,
42
43
    trace,
43
44
    ui,
44
45
    urlutils,
45
46
    )
46
 
from bzrlib.smart import client, protocol
 
47
from bzrlib.smart import client, protocol, request, vfs
47
48
from bzrlib.transport import ssh
48
49
""")
49
 
 
 
50
#usually already imported, and getting IllegalScoperReplacer on it here.
 
51
from bzrlib import osutils
50
52
 
51
53
# We must not read any more than 64k at a time so we don't risk "no buffer
52
54
# space available" errors on some platforms.  Windows in particular is likely
285
287
        self._push_back(protocol.unused_data)
286
288
 
287
289
    def _read_bytes(self, desired_count):
288
 
        # We ignore the desired_count because on sockets it's more efficient to
289
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
290
 
        bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
291
 
        self._report_activity(len(bytes), 'read')
292
 
        return bytes
 
290
        return _read_bytes_from_socket(
 
291
            self.socket.recv, desired_count, self._report_activity)
293
292
 
294
293
    def terminate_due_to_error(self):
295
294
        # TODO: This should log to a server log file, but no such thing
296
295
        # exists yet.  Andrew Bennetts 2006-09-29.
297
 
        self.socket.close()
 
296
        osutils.until_no_eintr(self.socket.close)
298
297
        self.finished = True
299
298
 
300
299
    def _write_out(self, bytes):
 
300
        tstart = osutils.timer_func()
301
301
        osutils.send_all(self.socket, bytes, self._report_activity)
 
302
        if 'hpss' in debug.debug_flags:
 
303
            thread_id = thread.get_ident()
 
304
            trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
 
305
                         % ('wrote', thread_id, len(bytes),
 
306
                            osutils.timer_func() - tstart))
302
307
 
303
308
 
304
309
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
329
334
            bytes_to_read = protocol.next_read_size()
330
335
            if bytes_to_read == 0:
331
336
                # Finished serving this request.
332
 
                self._out.flush()
 
337
                osutils.until_no_eintr(self._out.flush)
333
338
                return
334
339
            bytes = self.read_bytes(bytes_to_read)
335
340
            if bytes == '':
336
341
                # Connection has been closed.
337
342
                self.finished = True
338
 
                self._out.flush()
 
343
                osutils.until_no_eintr(self._out.flush)
339
344
                return
340
345
            protocol.accept_bytes(bytes)
341
346
 
342
347
    def _read_bytes(self, desired_count):
343
 
        return self._in.read(desired_count)
 
348
        return osutils.until_no_eintr(self._in.read, desired_count)
344
349
 
345
350
    def terminate_due_to_error(self):
346
351
        # TODO: This should log to a server log file, but no such thing
347
352
        # exists yet.  Andrew Bennetts 2006-09-29.
348
 
        self._out.close()
 
353
        osutils.until_no_eintr(self._out.close)
349
354
        self.finished = True
350
355
 
351
356
    def _write_out(self, bytes):
352
 
        self._out.write(bytes)
 
357
        osutils.until_no_eintr(self._out.write, bytes)
353
358
 
354
359
 
355
360
class SmartClientMediumRequest(object):
381
386
    def accept_bytes(self, bytes):
382
387
        """Accept bytes for inclusion in this request.
383
388
 
384
 
        This method may not be be called after finished_writing() has been
 
389
        This method may not be called after finished_writing() has been
385
390
        called.  It depends upon the Medium whether or not the bytes will be
386
391
        immediately transmitted. Message based Mediums will tend to buffer the
387
392
        bytes until finished_writing() is called.
475
480
        if not line.endswith('\n'):
476
481
            # end of file encountered reading from server
477
482
            raise errors.ConnectionReset(
478
 
                "please check connectivity and permissions")
 
483
                "Unexpected end of message. Please check connectivity "
 
484
                "and permissions, and report a bug if problems persist.")
479
485
        return line
480
486
 
481
487
    def _read_line(self):
509
515
        """
510
516
        medium_repr = repr(medium)
511
517
        # Add this medium to the WeakKeyDictionary
512
 
        self.counts[medium] = [0, medium_repr]
 
518
        self.counts[medium] = dict(count=0, vfs_count=0,
 
519
                                   medium_repr=medium_repr)
513
520
        # Weakref callbacks are fired in reverse order of their association
514
521
        # with the referenced object.  So we add a weakref *after* adding to
515
522
        # the WeakKeyDict so that we can report the value from it before the
519
526
    def increment_call_count(self, params):
520
527
        # Increment the count in the WeakKeyDictionary
521
528
        value = self.counts[params.medium]
522
 
        value[0] += 1
 
529
        value['count'] += 1
 
530
        try:
 
531
            request_method = request.request_handlers.get(params.method)
 
532
        except KeyError:
 
533
            # A method we don't know about doesn't count as a VFS method.
 
534
            return
 
535
        if issubclass(request_method, vfs.VfsRequest):
 
536
            value['vfs_count'] += 1
523
537
 
524
538
    def done(self, ref):
525
539
        value = self.counts[ref]
526
 
        count, medium_repr = value
 
540
        count, vfs_count, medium_repr = (
 
541
            value['count'], value['vfs_count'], value['medium_repr'])
527
542
        # In case this callback is invoked for the same ref twice (by the
528
543
        # weakref callback and by the atexit function), set the call count back
529
544
        # to 0 so this item won't be reported twice.
530
 
        value[0] = 0
 
545
        value['count'] = 0
 
546
        value['vfs_count'] = 0
531
547
        if count != 0:
532
 
            trace.note('HPSS calls: %d %s', count, medium_repr)
 
548
            trace.note('HPSS calls: %d (%d vfs) %s',
 
549
                       count, vfs_count, medium_repr)
533
550
 
534
551
    def flush_all(self):
535
552
        for ref in list(self.counts.keys()):
703
720
 
704
721
    def _accept_bytes(self, bytes):
705
722
        """See SmartClientStreamMedium.accept_bytes."""
706
 
        self._writeable_pipe.write(bytes)
 
723
        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
707
724
        self._report_activity(len(bytes), 'write')
708
725
 
709
726
    def _flush(self):
710
727
        """See SmartClientStreamMedium._flush()."""
711
 
        self._writeable_pipe.flush()
 
728
        osutils.until_no_eintr(self._writeable_pipe.flush)
712
729
 
713
730
    def _read_bytes(self, count):
714
731
        """See SmartClientStreamMedium._read_bytes."""
715
 
        bytes = self._readable_pipe.read(count)
 
732
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
716
733
        self._report_activity(len(bytes), 'read')
717
734
        return bytes
718
735
 
732
749
        self._password = password
733
750
        self._port = port
734
751
        self._username = username
 
752
        # for the benefit of progress making a short description of this
 
753
        # transport
 
754
        self._scheme = 'bzr+ssh'
735
755
        # SmartClientStreamMedium stores the repr of this object in its
736
756
        # _DebugCounter so we have to store all the values used in our repr
737
757
        # method before calling the super init.
741
761
        self._vendor = vendor
742
762
        self._write_to = None
743
763
        self._bzr_remote_path = bzr_remote_path
744
 
        # for the benefit of progress making a short description of this
745
 
        # transport
746
 
        self._scheme = 'bzr+ssh'
747
764
 
748
765
    def __repr__(self):
749
 
        return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
 
766
        if self._port is None:
 
767
            maybe_port = ''
 
768
        else:
 
769
            maybe_port = ':%s' % self._port
 
770
        return "%s(%s://%s@%s%s/)" % (
750
771
            self.__class__.__name__,
751
 
            self._connected,
 
772
            self._scheme,
752
773
            self._username,
753
774
            self._host,
754
 
            self._port)
 
775
            maybe_port)
755
776
 
756
777
    def _accept_bytes(self, bytes):
757
778
        """See SmartClientStreamMedium.accept_bytes."""
758
779
        self._ensure_connection()
759
 
        self._write_to.write(bytes)
 
780
        osutils.until_no_eintr(self._write_to.write, bytes)
760
781
        self._report_activity(len(bytes), 'write')
761
782
 
762
783
    def disconnect(self):
763
784
        """See SmartClientMedium.disconnect()."""
764
785
        if not self._connected:
765
786
            return
766
 
        self._read_from.close()
767
 
        self._write_to.close()
 
787
        osutils.until_no_eintr(self._read_from.close)
 
788
        osutils.until_no_eintr(self._write_to.close)
768
789
        self._ssh_connection.close()
769
790
        self._connected = False
770
791
 
793
814
        if not self._connected:
794
815
            raise errors.MediumNotConnected(self)
795
816
        bytes_to_read = min(count, _MAX_READ_SIZE)
796
 
        bytes = self._read_from.read(bytes_to_read)
 
817
        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
797
818
        self._report_activity(len(bytes), 'read')
798
819
        return bytes
799
820
 
823
844
        """See SmartClientMedium.disconnect()."""
824
845
        if not self._connected:
825
846
            return
826
 
        self._socket.close()
 
847
        osutils.until_no_eintr(self._socket.close)
827
848
        self._socket = None
828
849
        self._connected = False
829
850
 
877
898
        """See SmartClientMedium.read_bytes."""
878
899
        if not self._connected:
879
900
            raise errors.MediumNotConnected(self)
880
 
        # We ignore the desired_count because on sockets it's more efficient to
881
 
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
882
 
        try:
883
 
            bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
884
 
        except socket.error, e:
885
 
            if len(e.args) and e.args[0] == errno.ECONNRESET:
886
 
                # Callers expect an empty string in that case
887
 
                return ''
888
 
            else:
889
 
                raise
890
 
        else:
891
 
            self._report_activity(len(bytes), 'read')
892
 
            return bytes
 
901
        return _read_bytes_from_socket(
 
902
            self._socket.recv, count, self._report_activity)
893
903
 
894
904
 
895
905
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
931
941
        """
932
942
        self._medium._flush()
933
943
 
 
944
 
 
945
def _read_bytes_from_socket(sock, desired_count, report_activity):
 
946
    # We ignore the desired_count because on sockets it's more efficient to
 
947
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
 
948
    try:
 
949
        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
 
950
    except socket.error, e:
 
951
        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
 
952
            # The connection was closed by the other side.  Callers expect an
 
953
            # empty string to signal end-of-stream.
 
954
            bytes = ''
 
955
        else:
 
956
            raise
 
957
    else:
 
958
        report_activity(len(bytes), 'read')
 
959
    return bytes
 
960