74
67
root_client_path. unused_bytes are any bytes that were not part of a
75
68
protocol version marker.
77
if bytes.startswith(protocol.MESSAGE_VERSION_THREE):
78
protocol_factory = protocol.build_server_protocol_three
79
bytes = bytes[len(protocol.MESSAGE_VERSION_THREE):]
80
elif bytes.startswith(protocol.REQUEST_VERSION_TWO):
81
protocol_factory = protocol.SmartServerRequestProtocolTwo
82
bytes = bytes[len(protocol.REQUEST_VERSION_TWO):]
70
if bytes.startswith(MESSAGE_VERSION_THREE):
71
protocol_factory = build_server_protocol_three
72
bytes = bytes[len(MESSAGE_VERSION_THREE):]
73
elif bytes.startswith(REQUEST_VERSION_TWO):
74
protocol_factory = SmartServerRequestProtocolTwo
75
bytes = bytes[len(REQUEST_VERSION_TWO):]
84
protocol_factory = protocol.SmartServerRequestProtocolOne
77
protocol_factory = SmartServerRequestProtocolOne
85
78
return protocol_factory, bytes
88
def _get_line(read_bytes_func):
89
"""Read bytes using read_bytes_func until a newline byte.
91
This isn't particularly efficient, so should only be used when the
92
expected size of the line is quite short.
94
:returns: a tuple of two strs: (line, excess)
98
while newline_pos == -1:
99
new_bytes = read_bytes_func(1)
102
# Ran out of bytes before receiving a complete line.
104
newline_pos = bytes.find('\n')
105
line = bytes[:newline_pos+1]
106
excess = bytes[newline_pos+1:]
110
class SmartMedium(object):
111
"""Base class for smart protocol media, both client- and server-side."""
114
self._push_back_buffer = None
116
def _push_back(self, bytes):
117
"""Return unused bytes to the medium, because they belong to the next
120
This sets the _push_back_buffer to the given bytes.
122
if self._push_back_buffer is not None:
123
raise AssertionError(
124
"_push_back called when self._push_back_buffer is %r"
125
% (self._push_back_buffer,))
128
self._push_back_buffer = bytes
130
def _get_push_back_buffer(self):
131
if self._push_back_buffer == '':
132
raise AssertionError(
133
'%s._push_back_buffer should never be the empty string, '
134
'which can be confused with EOF' % (self,))
135
bytes = self._push_back_buffer
136
self._push_back_buffer = None
139
def read_bytes(self, desired_count):
140
"""Read some bytes from this medium.
142
:returns: some bytes, possibly more or less than the number requested
143
in 'desired_count' depending on the medium.
145
if self._push_back_buffer is not None:
146
return self._get_push_back_buffer()
147
bytes_to_read = min(desired_count, _MAX_READ_SIZE)
148
return self._read_bytes(bytes_to_read)
150
def _read_bytes(self, count):
151
raise NotImplementedError(self._read_bytes)
154
"""Read bytes from this request's response until a newline byte.
156
This isn't particularly efficient, so should only be used when the
157
expected size of the line is quite short.
159
:returns: a string of bytes ending in a newline (byte 0x0A).
161
line, excess = _get_line(self.read_bytes)
162
self._push_back(excess)
165
def _report_activity(self, bytes, direction):
166
"""Notify that this medium has activity.
168
Implementations should call this from all methods that actually do IO.
169
Be careful that it's not called twice, if one method is implemented on
172
:param bytes: Number of bytes read or written.
173
:param direction: 'read' or 'write' or None.
175
ui.ui_factory.report_transport_activity(self, bytes, direction)
178
class SmartServerStreamMedium(SmartMedium):
81
class SmartServerStreamMedium(object):
179
82
"""Handles smart commands coming over a stream.
181
84
The stream may be a pipe connected to sshd, or a tcp socket, or an
461
401
return self._read_bytes(count)
463
403
def _read_bytes(self, count):
464
"""Helper for SmartClientMediumRequest.read_bytes.
404
"""Helper for read_bytes.
466
406
read_bytes checks the state of the request to determine if bytes
467
407
should be read. After that it hands off to _read_bytes to do the
470
By default this forwards to self._medium.read_bytes because we are
471
operating on the medium's stream.
473
return self._medium.read_bytes(count)
410
raise NotImplementedError(self._read_bytes)
475
412
def read_line(self):
476
line = self._read_line()
477
if not line.endswith('\n'):
478
# end of file encountered reading from server
479
raise errors.ConnectionReset(
480
"Unexpected end of message. Please check connectivity "
481
"and permissions, and report a bug if problems persist.")
413
"""Read bytes from this request's response until a newline byte.
415
This isn't particularly efficient, so should only be used when the
416
expected size of the line is quite short.
418
:returns: a string of bytes ending in a newline (byte 0x0A).
420
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
422
while not line or line[-1] != '\n':
423
new_char = self.read_bytes(1)
426
# end of file encountered reading from server
427
raise errors.ConnectionReset(
428
"please check connectivity and permissions",
429
"(and try -Dhpss if further diagnosis is required)")
484
def _read_line(self):
485
"""Helper for SmartClientMediumRequest.read_line.
487
By default this forwards to self._medium._get_line because we are
488
operating on the medium's stream.
490
return self._medium._get_line()
493
class _DebugCounter(object):
494
"""An object that counts the HPSS calls made to each client medium.
496
When a medium is garbage-collected, or failing that when
497
bzrlib.global_state exits, the total number of calls made on that medium
498
is reported via trace.note.
502
self.counts = weakref.WeakKeyDictionary()
503
client._SmartClient.hooks.install_named_hook(
504
'call', self.increment_call_count, 'hpss call counter')
505
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
507
def track(self, medium):
508
"""Start tracking calls made to a medium.
510
This only keeps a weakref to the medium, so shouldn't affect the
513
medium_repr = repr(medium)
514
# Add this medium to the WeakKeyDictionary
515
self.counts[medium] = dict(count=0, vfs_count=0,
516
medium_repr=medium_repr)
517
# Weakref callbacks are fired in reverse order of their association
518
# with the referenced object. So we add a weakref *after* adding to
519
# the WeakKeyDict so that we can report the value from it before the
520
# entry is removed by the WeakKeyDict's own callback.
521
ref = weakref.ref(medium, self.done)
523
def increment_call_count(self, params):
524
# Increment the count in the WeakKeyDictionary
525
value = self.counts[params.medium]
528
request_method = request.request_handlers.get(params.method)
530
# A method we don't know about doesn't count as a VFS method.
532
if issubclass(request_method, vfs.VfsRequest):
533
value['vfs_count'] += 1
536
value = self.counts[ref]
537
count, vfs_count, medium_repr = (
538
value['count'], value['vfs_count'], value['medium_repr'])
539
# In case this callback is invoked for the same ref twice (by the
540
# weakref callback and by the atexit function), set the call count back
541
# to 0 so this item won't be reported twice.
543
value['vfs_count'] = 0
545
trace.note('HPSS calls: %d (%d vfs) %s',
546
count, vfs_count, medium_repr)
549
for ref in list(self.counts.keys()):
552
_debug_counter = None
555
class SmartClientMedium(SmartMedium):
433
class SmartClientMedium(object):
556
434
"""Smart client is a medium for sending smart protocol requests over."""
558
436
def __init__(self, base):
733
597
def _read_bytes(self, count):
734
598
"""See SmartClientStreamMedium._read_bytes."""
735
bytes_to_read = min(count, _MAX_READ_SIZE)
736
bytes = self._readable_pipe.read(bytes_to_read)
737
self._report_activity(len(bytes), 'read')
741
class SSHParams(object):
742
"""A set of parameters for starting a remote bzr via SSH."""
599
return self._readable_pipe.read(count)
602
class SmartSSHClientMedium(SmartClientStreamMedium):
603
"""A client medium using SSH."""
744
605
def __init__(self, host, port=None, username=None, password=None,
745
bzr_remote_path='bzr'):
748
self.username = username
749
self.password = password
750
self.bzr_remote_path = bzr_remote_path
753
class SmartSSHClientMedium(SmartClientStreamMedium):
754
"""A client medium using SSH.
756
It delegates IO to a SmartSimplePipesClientMedium or
757
SmartClientAlreadyConnectedSocketMedium (depending on platform).
760
def __init__(self, base, ssh_params, vendor=None):
606
base=None, vendor=None, bzr_remote_path=None):
761
607
"""Creates a client that will connect on the first use.
763
:param ssh_params: A SSHParams instance.
764
609
:param vendor: An optional override for the ssh vendor to use. See
765
610
bzrlib.transport.ssh for details on ssh vendors.
767
self._real_medium = None
768
self._ssh_params = ssh_params
769
# for the benefit of progress making a short description of this
771
self._scheme = 'bzr+ssh'
772
# SmartClientStreamMedium stores the repr of this object in its
773
# _DebugCounter so we have to store all the values used in our repr
774
# method before calling the super init.
775
612
SmartClientStreamMedium.__init__(self, base)
613
self._connected = False
615
self._password = password
617
self._username = username
618
self._read_from = None
619
self._ssh_connection = None
776
620
self._vendor = vendor
777
self._ssh_connection = None
780
if self._ssh_params.port is None:
783
maybe_port = ':%s' % self._ssh_params.port
784
return "%s(%s://%s@%s%s/)" % (
785
self.__class__.__name__,
787
self._ssh_params.username,
788
self._ssh_params.host,
621
self._write_to = None
622
self._bzr_remote_path = bzr_remote_path
623
if self._bzr_remote_path is None:
624
symbol_versioning.warn(
625
'bzr_remote_path is required as of bzr 0.92',
626
DeprecationWarning, stacklevel=2)
627
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
791
629
def _accept_bytes(self, bytes):
792
630
"""See SmartClientStreamMedium.accept_bytes."""
793
631
self._ensure_connection()
794
self._real_medium.accept_bytes(bytes)
632
self._write_to.write(bytes)
796
634
def disconnect(self):
797
635
"""See SmartClientMedium.disconnect()."""
798
if self._real_medium is not None:
799
self._real_medium.disconnect()
800
self._real_medium = None
801
if self._ssh_connection is not None:
802
self._ssh_connection.close()
803
self._ssh_connection = None
636
if not self._connected:
638
self._read_from.close()
639
self._write_to.close()
640
self._ssh_connection.close()
641
self._connected = False
805
643
def _ensure_connection(self):
806
644
"""Connect this medium if not already connected."""
807
if self._real_medium is not None:
809
647
if self._vendor is None:
810
648
vendor = ssh._get_ssh_vendor()
812
650
vendor = self._vendor
813
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
814
self._ssh_params.password, self._ssh_params.host,
815
self._ssh_params.port,
816
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
651
self._ssh_connection = vendor.connect_ssh(self._username,
652
self._password, self._host, self._port,
653
command=[self._bzr_remote_path, 'serve', '--inet',
817
654
'--directory=/', '--allow-writes'])
818
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
819
if io_kind == 'socket':
820
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
821
self.base, io_object)
822
elif io_kind == 'pipes':
823
read_from, write_to = io_object
824
self._real_medium = SmartSimplePipesClientMedium(
825
read_from, write_to, self.base)
827
raise AssertionError(
828
"Unexpected io_kind %r from %r"
829
% (io_kind, self._ssh_connection))
655
self._read_from, self._write_to = \
656
self._ssh_connection.get_filelike_channels()
657
self._connected = True
831
659
def _flush(self):
832
660
"""See SmartClientStreamMedium._flush()."""
833
self._real_medium._flush()
661
self._write_to.flush()
835
663
def _read_bytes(self, count):
836
664
"""See SmartClientStreamMedium.read_bytes."""
837
if self._real_medium is None:
665
if not self._connected:
838
666
raise errors.MediumNotConnected(self)
839
return self._real_medium.read_bytes(count)
667
return self._read_from.read(count)
842
670
# Port 4155 is the default port for bzr://, registered with IANA.
843
BZR_DEFAULT_INTERFACE = None
671
BZR_DEFAULT_INTERFACE = '0.0.0.0'
844
672
BZR_DEFAULT_PORT = 4155
847
class SmartClientSocketMedium(SmartClientStreamMedium):
848
"""A client medium using a socket.
675
class SmartTCPClientMedium(SmartClientStreamMedium):
676
"""A client medium using TCP."""
850
This class isn't usable directly. Use one of its subclasses instead.
853
def __init__(self, base):
678
def __init__(self, host, port, base):
679
"""Creates a client that will connect on the first use."""
854
680
SmartClientStreamMedium.__init__(self, base)
681
self._connected = False
855
684
self._socket = None
856
self._connected = False
858
686
def _accept_bytes(self, bytes):
859
687
"""See SmartClientMedium.accept_bytes."""
860
688
self._ensure_connection()
861
osutils.send_all(self._socket, bytes, self._report_activity)
863
def _ensure_connection(self):
864
"""Connect this medium if not already connected."""
865
raise NotImplementedError(self._ensure_connection)
868
"""See SmartClientStreamMedium._flush().
870
For sockets we do no flushing. For TCP sockets we may want to turn off
871
TCP_NODELAY and add a means to do a flush, but that can be done in the
875
def _read_bytes(self, count):
876
"""See SmartClientMedium.read_bytes."""
877
if not self._connected:
878
raise errors.MediumNotConnected(self)
879
return osutils.read_bytes_from_socket(
880
self._socket, self._report_activity)
689
osutils.send_all(self._socket, bytes)
882
691
def disconnect(self):
883
692
"""See SmartClientMedium.disconnect()."""
887
696
self._socket = None
888
697
self._connected = False
891
class SmartTCPClientMedium(SmartClientSocketMedium):
892
"""A client medium that creates a TCP connection."""
894
def __init__(self, host, port, base):
895
"""Creates a client that will connect on the first use."""
896
SmartClientSocketMedium.__init__(self, base)
900
699
def _ensure_connection(self):
901
700
"""Connect this medium if not already connected."""
902
701
if self._connected:
703
self._socket = socket.socket()
704
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
904
705
if self._port is None:
905
706
port = BZR_DEFAULT_PORT
907
708
port = int(self._port)
909
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
910
socket.SOCK_STREAM, 0, 0)
911
except socket.gaierror, (err_num, err_msg):
912
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
913
(self._host, port, err_msg))
914
# Initialize err in case there are no addresses returned:
915
err = socket.error("no address found for %s" % self._host)
916
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
918
self._socket = socket.socket(family, socktype, proto)
919
self._socket.setsockopt(socket.IPPROTO_TCP,
920
socket.TCP_NODELAY, 1)
921
self._socket.connect(sockaddr)
922
except socket.error, err:
923
if self._socket is not None:
928
if self._socket is None:
710
self._socket.connect((self._host, port))
711
except socket.error, err:
929
712
# socket errors either have a (string) or (errno, string) as their
931
714
if type(err.args) is str: