24
24
bzrlib/transport/smart/__init__.py.
33
from bzrlib.lazy_import import lazy_import
34
lazy_import(globals(), """
32
37
from bzrlib import (
38
from bzrlib.smart.protocol import (
39
MESSAGE_VERSION_THREE,
41
SmartClientRequestProtocolOne,
42
SmartServerRequestProtocolOne,
43
SmartServerRequestProtocolTwo,
44
build_server_protocol_three
46
from bzrlib.smart import client, protocol, request, vfs
46
47
from bzrlib.transport import ssh
51
# We must not read any more than 64k at a time so we don't risk "no buffer
52
# space available" errors on some platforms. Windows in particular is likely
53
# to give error 10053 or 10055 if we read more than 64k from a socket.
54
_MAX_READ_SIZE = 64 * 1024
49
57
def _get_protocol_factory_for_bytes(bytes):
67
75
root_client_path. unused_bytes are any bytes that were not part of a
68
76
protocol version marker.
70
if bytes.startswith(MESSAGE_VERSION_THREE):
71
protocol_factory = build_server_protocol_three
72
bytes = bytes[len(MESSAGE_VERSION_THREE):]
73
elif bytes.startswith(REQUEST_VERSION_TWO):
74
protocol_factory = SmartServerRequestProtocolTwo
75
bytes = bytes[len(REQUEST_VERSION_TWO):]
78
if bytes.startswith(protocol.MESSAGE_VERSION_THREE):
79
protocol_factory = protocol.build_server_protocol_three
80
bytes = bytes[len(protocol.MESSAGE_VERSION_THREE):]
81
elif bytes.startswith(protocol.REQUEST_VERSION_TWO):
82
protocol_factory = protocol.SmartServerRequestProtocolTwo
83
bytes = bytes[len(protocol.REQUEST_VERSION_TWO):]
77
protocol_factory = SmartServerRequestProtocolOne
85
protocol_factory = protocol.SmartServerRequestProtocolOne
78
86
return protocol_factory, bytes
81
class SmartServerStreamMedium(object):
89
def _get_line(read_bytes_func):
90
"""Read bytes using read_bytes_func until a newline byte.
92
This isn't particularly efficient, so should only be used when the
93
expected size of the line is quite short.
95
:returns: a tuple of two strs: (line, excess)
99
while newline_pos == -1:
100
new_bytes = read_bytes_func(1)
103
# Ran out of bytes before receiving a complete line.
105
newline_pos = bytes.find('\n')
106
line = bytes[:newline_pos+1]
107
excess = bytes[newline_pos+1:]
111
class SmartMedium(object):
112
"""Base class for smart protocol media, both client- and server-side."""
115
self._push_back_buffer = None
117
def _push_back(self, bytes):
118
"""Return unused bytes to the medium, because they belong to the next
121
This sets the _push_back_buffer to the given bytes.
123
if self._push_back_buffer is not None:
124
raise AssertionError(
125
"_push_back called when self._push_back_buffer is %r"
126
% (self._push_back_buffer,))
129
self._push_back_buffer = bytes
131
def _get_push_back_buffer(self):
132
if self._push_back_buffer == '':
133
raise AssertionError(
134
'%s._push_back_buffer should never be the empty string, '
135
'which can be confused with EOF' % (self,))
136
bytes = self._push_back_buffer
137
self._push_back_buffer = None
140
def read_bytes(self, desired_count):
141
"""Read some bytes from this medium.
143
:returns: some bytes, possibly more or less than the number requested
144
in 'desired_count' depending on the medium.
146
if self._push_back_buffer is not None:
147
return self._get_push_back_buffer()
148
bytes_to_read = min(desired_count, _MAX_READ_SIZE)
149
return self._read_bytes(bytes_to_read)
151
def _read_bytes(self, count):
152
raise NotImplementedError(self._read_bytes)
155
"""Read bytes from this request's response until a newline byte.
157
This isn't particularly efficient, so should only be used when the
158
expected size of the line is quite short.
160
:returns: a string of bytes ending in a newline (byte 0x0A).
162
line, excess = _get_line(self.read_bytes)
163
self._push_back(excess)
166
def _report_activity(self, bytes, direction):
167
"""Notify that this medium has activity.
169
Implementations should call this from all methods that actually do IO.
170
Be careful that it's not called twice, if one method is implemented on
173
:param bytes: Number of bytes read or written.
174
:param direction: 'read' or 'write' or None.
176
ui.ui_factory.report_transport_activity(self, bytes, direction)
179
class SmartServerStreamMedium(SmartMedium):
82
180
"""Handles smart commands coming over a stream.
84
182
The stream may be a pipe connected to sshd, or a tcp socket, or an
105
203
self.backing_transport = backing_transport
106
204
self.root_client_path = root_client_path
107
205
self.finished = False
108
self._push_back_buffer = None
110
def _push_back(self, bytes):
111
"""Return unused bytes to the medium, because they belong to the next
114
This sets the _push_back_buffer to the given bytes.
116
if self._push_back_buffer is not None:
117
raise AssertionError(
118
"_push_back called when self._push_back_buffer is %r"
119
% (self._push_back_buffer,))
122
self._push_back_buffer = bytes
124
def _get_push_back_buffer(self):
125
if self._push_back_buffer == '':
126
raise AssertionError(
127
'%s._push_back_buffer should never be the empty string, '
128
'which can be confused with EOF' % (self,))
129
bytes = self._push_back_buffer
130
self._push_back_buffer = None
206
SmartMedium.__init__(self)
134
209
"""Serve requests until the client disconnects."""
175
250
"""Called when an unhandled exception from the protocol occurs."""
176
251
raise NotImplementedError(self.terminate_due_to_error)
178
def _get_bytes(self, desired_count):
253
def _read_bytes(self, desired_count):
179
254
"""Get some bytes from the medium.
181
256
:param desired_count: number of bytes we want to read.
183
raise NotImplementedError(self._get_bytes)
186
"""Read bytes from this request's response until a newline byte.
188
This isn't particularly efficient, so should only be used when the
189
expected size of the line is quite short.
191
:returns: a string of bytes ending in a newline (byte 0x0A).
195
while newline_pos == -1:
196
new_bytes = self._get_bytes(1)
199
# Ran out of bytes before receiving a complete line.
201
newline_pos = bytes.find('\n')
202
line = bytes[:newline_pos+1]
203
self._push_back(bytes[newline_pos+1:])
258
raise NotImplementedError(self._read_bytes)
207
261
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
220
274
def _serve_one_request_unguarded(self, protocol):
221
275
while protocol.next_read_size():
222
bytes = self._get_bytes(4096)
276
# We can safely try to read large chunks. If there is less data
277
# than _MAX_READ_SIZE ready, the socket will just return a short
278
# read immediately rather than block.
279
bytes = self.read_bytes(_MAX_READ_SIZE)
224
281
self.finished = True
226
283
protocol.accept_bytes(bytes)
228
285
self._push_back(protocol.unused_data)
230
def _get_bytes(self, desired_count):
231
if self._push_back_buffer is not None:
232
return self._get_push_back_buffer()
233
# We ignore the desired_count because on sockets it's more efficient to
235
return self.socket.recv(4096)
287
def _read_bytes(self, desired_count):
288
return _read_bytes_from_socket(
289
self.socket.recv, desired_count, self._report_activity)
237
291
def terminate_due_to_error(self):
238
292
# TODO: This should log to a server log file, but no such thing
239
293
# exists yet. Andrew Bennetts 2006-09-29.
401
456
return self._read_bytes(count)
403
458
def _read_bytes(self, count):
404
"""Helper for read_bytes.
459
"""Helper for SmartClientMediumRequest.read_bytes.
406
461
read_bytes checks the state of the request to determine if bytes
407
462
should be read. After that it hands off to _read_bytes to do the
465
By default this forwards to self._medium.read_bytes because we are
466
operating on the medium's stream.
410
raise NotImplementedError(self._read_bytes)
468
return self._medium.read_bytes(count)
412
470
def read_line(self):
413
"""Read bytes from this request's response until a newline byte.
415
This isn't particularly efficient, so should only be used when the
416
expected size of the line is quite short.
418
:returns: a string of bytes ending in a newline (byte 0x0A).
420
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
422
while not line or line[-1] != '\n':
423
new_char = self.read_bytes(1)
426
# end of file encountered reading from server
427
raise errors.ConnectionReset(
428
"please check connectivity and permissions",
429
"(and try -Dhpss if further diagnosis is required)")
471
line = self._read_line()
472
if not line.endswith('\n'):
473
# end of file encountered reading from server
474
raise errors.ConnectionReset(
475
"please check connectivity and permissions")
433
class SmartClientMedium(object):
478
def _read_line(self):
479
"""Helper for SmartClientMediumRequest.read_line.
481
By default this forwards to self._medium._get_line because we are
482
operating on the medium's stream.
484
return self._medium._get_line()
487
class _DebugCounter(object):
488
"""An object that counts the HPSS calls made to each client medium.
490
When a medium is garbage-collected, or failing that when atexit functions
491
are run, the total number of calls made on that medium is reported via
496
self.counts = weakref.WeakKeyDictionary()
497
client._SmartClient.hooks.install_named_hook(
498
'call', self.increment_call_count, 'hpss call counter')
499
atexit.register(self.flush_all)
501
def track(self, medium):
502
"""Start tracking calls made to a medium.
504
This only keeps a weakref to the medium, so shouldn't affect the
507
medium_repr = repr(medium)
508
# Add this medium to the WeakKeyDictionary
509
self.counts[medium] = dict(count=0, vfs_count=0,
510
medium_repr=medium_repr)
511
# Weakref callbacks are fired in reverse order of their association
512
# with the referenced object. So we add a weakref *after* adding to
513
# the WeakKeyDict so that we can report the value from it before the
514
# entry is removed by the WeakKeyDict's own callback.
515
ref = weakref.ref(medium, self.done)
517
def increment_call_count(self, params):
518
# Increment the count in the WeakKeyDictionary
519
value = self.counts[params.medium]
521
request_method = request.request_handlers.get(params.method)
522
if issubclass(request_method, vfs.VfsRequest):
523
value['vfs_count'] += 1
526
value = self.counts[ref]
527
count, vfs_count, medium_repr = (
528
value['count'], value['vfs_count'], value['medium_repr'])
529
# In case this callback is invoked for the same ref twice (by the
530
# weakref callback and by the atexit function), set the call count back
531
# to 0 so this item won't be reported twice.
533
value['vfs_count'] = 0
535
trace.note('HPSS calls: %d (%d vfs) %s',
536
count, vfs_count, medium_repr)
539
for ref in list(self.counts.keys()):
542
_debug_counter = None
545
class SmartClientMedium(SmartMedium):
434
546
"""Smart client is a medium for sending smart protocol requests over."""
436
548
def __init__(self, base):
439
551
self._protocol_version_error = None
440
552
self._protocol_version = None
441
553
self._done_hello = False
554
# Be optimistic: we assume the remote end can accept new remote
555
# requests until we get an error saying otherwise.
556
# _remote_version_is_before tracks the bzr version the remote side
557
# can be based on what we've seen so far.
558
self._remote_version_is_before = None
559
# Install debug hook function if debug flag is set.
560
if 'hpss' in debug.debug_flags:
561
global _debug_counter
562
if _debug_counter is None:
563
_debug_counter = _DebugCounter()
564
_debug_counter.track(self)
566
def _is_remote_before(self, version_tuple):
567
"""Is it possible the remote side supports RPCs for a given version?
571
needed_version = (1, 2)
572
if medium._is_remote_before(needed_version):
573
fallback_to_pre_1_2_rpc()
577
except UnknownSmartMethod:
578
medium._remember_remote_is_before(needed_version)
579
fallback_to_pre_1_2_rpc()
581
:seealso: _remember_remote_is_before
583
if self._remote_version_is_before is None:
584
# So far, the remote side seems to support everything
586
return version_tuple >= self._remote_version_is_before
588
def _remember_remote_is_before(self, version_tuple):
589
"""Tell this medium that the remote side is older than the given version.
591
:seealso: _is_remote_before
593
if (self._remote_version_is_before is not None and
594
version_tuple > self._remote_version_is_before):
595
# We have been told that the remote side is older than some version
596
# which is newer than a previously supplied older-than version.
597
# This indicates that some smart verb call is not guarded
598
# appropriately (it should simply not have been tried).
599
raise AssertionError(
600
"_remember_remote_is_before(%r) called, but "
601
"_remember_remote_is_before(%r) was called previously."
602
% (version_tuple, self._remote_version_is_before))
603
self._remote_version_is_before = version_tuple
443
605
def protocol_version(self):
444
606
"""Find out if 'hello' smart request works."""
561
717
def _read_bytes(self, count):
562
718
"""See SmartClientStreamMedium._read_bytes."""
563
return self._readable_pipe.read(count)
719
bytes = self._readable_pipe.read(count)
720
self._report_activity(len(bytes), 'read')
566
724
class SmartSSHClientMedium(SmartClientStreamMedium):
567
725
"""A client medium using SSH."""
569
727
def __init__(self, host, port=None, username=None, password=None,
570
728
base=None, vendor=None, bzr_remote_path=None):
571
729
"""Creates a client that will connect on the first use.
573
731
:param vendor: An optional override for the ssh vendor to use. See
574
732
bzrlib.transport.ssh for details on ssh vendors.
576
SmartClientStreamMedium.__init__(self, base)
577
734
self._connected = False
578
735
self._host = host
579
736
self._password = password
580
737
self._port = port
581
738
self._username = username
739
# SmartClientStreamMedium stores the repr of this object in its
740
# _DebugCounter so we have to store all the values used in our repr
741
# method before calling the super init.
742
SmartClientStreamMedium.__init__(self, base)
582
743
self._read_from = None
583
744
self._ssh_connection = None
584
745
self._vendor = vendor
585
746
self._write_to = None
586
747
self._bzr_remote_path = bzr_remote_path
587
if self._bzr_remote_path is None:
588
symbol_versioning.warn(
589
'bzr_remote_path is required as of bzr 0.92',
590
DeprecationWarning, stacklevel=2)
591
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
748
# for the benefit of progress making a short description of this
750
self._scheme = 'bzr+ssh'
753
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
754
self.__class__.__name__,
593
760
def _accept_bytes(self, bytes):
594
761
"""See SmartClientStreamMedium.accept_bytes."""
595
762
self._ensure_connection()
596
763
self._write_to.write(bytes)
764
self._report_activity(len(bytes), 'write')
598
766
def disconnect(self):
599
767
"""See SmartClientMedium.disconnect()."""
628
796
"""See SmartClientStreamMedium.read_bytes."""
629
797
if not self._connected:
630
798
raise errors.MediumNotConnected(self)
631
return self._read_from.read(count)
799
bytes_to_read = min(count, _MAX_READ_SIZE)
800
bytes = self._read_from.read(bytes_to_read)
801
self._report_activity(len(bytes), 'read')
634
805
# Port 4155 is the default port for bzr://, registered with IANA.
635
BZR_DEFAULT_INTERFACE = '0.0.0.0'
806
BZR_DEFAULT_INTERFACE = None
636
807
BZR_DEFAULT_PORT = 4155
639
810
class SmartTCPClientMedium(SmartClientStreamMedium):
640
811
"""A client medium using TCP."""
642
813
def __init__(self, host, port, base):
643
814
"""Creates a client that will connect on the first use."""
644
815
SmartClientStreamMedium.__init__(self, base)
664
835
"""Connect this medium if not already connected."""
665
836
if self._connected:
667
self._socket = socket.socket()
668
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
669
838
if self._port is None:
670
839
port = BZR_DEFAULT_PORT
672
841
port = int(self._port)
674
self._socket.connect((self._host, port))
675
except socket.error, err:
843
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
844
socket.SOCK_STREAM, 0, 0)
845
except socket.gaierror, (err_num, err_msg):
846
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
847
(self._host, port, err_msg))
848
# Initialize err in case there are no addresses returned:
849
err = socket.error("no address found for %s" % self._host)
850
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
852
self._socket = socket.socket(family, socktype, proto)
853
self._socket.setsockopt(socket.IPPROTO_TCP,
854
socket.TCP_NODELAY, 1)
855
self._socket.connect(sockaddr)
856
except socket.error, err:
857
if self._socket is not None:
862
if self._socket is None:
676
863
# socket errors either have a (string) or (errno, string) as their
678
865
if type(err.args) is str:
737
925
self._medium._flush()
739
def _read_bytes(self, count):
740
"""See SmartClientMediumRequest._read_bytes.
742
This forwards to self._medium._read_bytes because we are operating
743
on the medium's stream.
745
return self._medium._read_bytes(count)
928
def _read_bytes_from_socket(sock, desired_count, report_activity):
929
# We ignore the desired_count because on sockets it's more efficient to
930
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
932
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
933
except socket.error, e:
934
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
935
# The connection was closed by the other side. Callers expect an
936
# empty string to signal end-of-stream.
941
report_activity(len(bytes), 'read')