32
from bzrlib.lazy_import import lazy_import
33
lazy_import(globals(), """
34
31
from bzrlib import (
40
from bzrlib.smart import protocol
41
from bzrlib.transport import ssh
45
# We must not read any more than 64k at a time so we don't risk "no buffer
46
# space available" errors on some platforms. Windows in particular is likely
47
# to give error 10053 or 10055 if we read more than 64k from a socket.
48
_MAX_READ_SIZE = 64 * 1024
51
def _get_protocol_factory_for_bytes(bytes):
52
"""Determine the right protocol factory for 'bytes'.
54
This will return an appropriate protocol factory depending on the version
55
of the protocol being used, as determined by inspecting the given bytes.
56
The bytes should have at least one newline byte (i.e. be a whole line),
57
otherwise it's possible that a request will be incorrectly identified as
60
Typical use would be::
62
factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
63
server_protocol = factory(transport, write_func, root_client_path)
64
server_protocol.accept_bytes(unused_bytes)
66
:param bytes: a str of bytes of the start of the request.
67
:returns: 2-tuple of (protocol_factory, unused_bytes). protocol_factory is
68
a callable that takes three args: transport, write_func,
69
root_client_path. unused_bytes are any bytes that were not part of a
70
protocol version marker.
72
if bytes.startswith(protocol.MESSAGE_VERSION_THREE):
73
protocol_factory = protocol.build_server_protocol_three
74
bytes = bytes[len(protocol.MESSAGE_VERSION_THREE):]
75
elif bytes.startswith(protocol.REQUEST_VERSION_TWO):
76
protocol_factory = protocol.SmartServerRequestProtocolTwo
77
bytes = bytes[len(protocol.REQUEST_VERSION_TWO):]
79
protocol_factory = protocol.SmartServerRequestProtocolOne
80
return protocol_factory, bytes
83
def _get_line(read_bytes_func):
84
"""Read bytes using read_bytes_func until a newline byte.
86
This isn't particularly efficient, so should only be used when the
87
expected size of the line is quite short.
89
:returns: a tuple of two strs: (line, excess)
93
while newline_pos == -1:
94
new_bytes = read_bytes_func(1)
97
# Ran out of bytes before receiving a complete line.
99
newline_pos = bytes.find('\n')
100
line = bytes[:newline_pos+1]
101
excess = bytes[newline_pos+1:]
105
class SmartMedium(object):
106
"""Base class for smart protocol media, both client- and server-side."""
109
self._push_back_buffer = None
111
def _push_back(self, bytes):
112
"""Return unused bytes to the medium, because they belong to the next
115
This sets the _push_back_buffer to the given bytes.
117
if self._push_back_buffer is not None:
118
raise AssertionError(
119
"_push_back called when self._push_back_buffer is %r"
120
% (self._push_back_buffer,))
123
self._push_back_buffer = bytes
125
def _get_push_back_buffer(self):
126
if self._push_back_buffer == '':
127
raise AssertionError(
128
'%s._push_back_buffer should never be the empty string, '
129
'which can be confused with EOF' % (self,))
130
bytes = self._push_back_buffer
131
self._push_back_buffer = None
134
def read_bytes(self, desired_count):
135
"""Read some bytes from this medium.
137
:returns: some bytes, possibly more or less than the number requested
138
in 'desired_count' depending on the medium.
140
if self._push_back_buffer is not None:
141
return self._get_push_back_buffer()
142
bytes_to_read = min(desired_count, _MAX_READ_SIZE)
143
return self._read_bytes(bytes_to_read)
145
def _read_bytes(self, count):
146
raise NotImplementedError(self._read_bytes)
149
"""Read bytes from this request's response until a newline byte.
151
This isn't particularly efficient, so should only be used when the
152
expected size of the line is quite short.
154
:returns: a string of bytes ending in a newline (byte 0x0A).
156
line, excess = _get_line(self.read_bytes)
157
self._push_back(excess)
161
class SmartServerStreamMedium(SmartMedium):
35
from bzrlib.smart.protocol import (
37
SmartServerRequestProtocolOne,
38
SmartServerRequestProtocolTwo,
42
from bzrlib.transport import ssh
43
except errors.ParamikoNotPresent:
44
# no paramiko. SmartSSHClientMedium will break.
48
class SmartServerStreamMedium(object):
162
49
"""Handles smart commands coming over a stream.
164
51
The stream may be a pipe connected to sshd, or a tcp socket, or an
232
116
"""Called when an unhandled exception from the protocol occurs."""
233
117
raise NotImplementedError(self.terminate_due_to_error)
235
def _read_bytes(self, desired_count):
119
def _get_bytes(self, desired_count):
236
120
"""Get some bytes from the medium.
238
122
:param desired_count: number of bytes we want to read.
240
raise NotImplementedError(self._read_bytes)
124
raise NotImplementedError(self._get_bytes)
127
"""Read bytes from this request's response until a newline byte.
129
This isn't particularly efficient, so should only be used when the
130
expected size of the line is quite short.
132
:returns: a string of bytes ending in a newline (byte 0x0A).
134
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
136
while not line or line[-1] != '\n':
137
new_char = self._get_bytes(1)
140
# Ran out of bytes before receiving a complete line.
243
145
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
245
def __init__(self, sock, backing_transport, root_client_path='/'):
147
def __init__(self, sock, backing_transport):
248
150
:param sock: the socket the server will read from. It will be put
249
151
into blocking mode.
251
SmartServerStreamMedium.__init__(
252
self, backing_transport, root_client_path=root_client_path)
153
SmartServerStreamMedium.__init__(self, backing_transport)
253
155
sock.setblocking(True)
254
156
self.socket = sock
256
158
def _serve_one_request_unguarded(self, protocol):
257
159
while protocol.next_read_size():
258
# We can safely try to read large chunks. If there is less data
259
# than _MAX_READ_SIZE ready, the socket wil just return a short
260
# read immediately rather than block.
261
bytes = self.read_bytes(_MAX_READ_SIZE)
265
protocol.accept_bytes(bytes)
161
protocol.accept_bytes(self.push_back)
164
bytes = self._get_bytes(4096)
168
protocol.accept_bytes(bytes)
267
self._push_back(protocol.unused_data)
170
self.push_back = protocol.excess_buffer
269
def _read_bytes(self, desired_count):
172
def _get_bytes(self, desired_count):
270
173
# We ignore the desired_count because on sockets it's more efficient to
271
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
272
return self.socket.recv(_MAX_READ_SIZE)
175
return self.socket.recv(4096)
274
177
def terminate_due_to_error(self):
178
"""Called when an unhandled exception from the protocol occurs."""
275
179
# TODO: This should log to a server log file, but no such thing
276
180
# exists yet. Andrew Bennetts 2006-09-29.
277
181
self.socket.close()
278
182
self.finished = True
280
184
def _write_out(self, bytes):
281
osutils.send_all(self.socket, bytes)
185
self.socket.sendall(bytes)
284
188
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
439
340
return self._read_bytes(count)
441
342
def _read_bytes(self, count):
442
"""Helper for SmartClientMediumRequest.read_bytes.
343
"""Helper for read_bytes.
444
345
read_bytes checks the state of the request to determing if bytes
445
346
should be read. After that it hands off to _read_bytes to do the
448
By default this forwards to self._medium.read_bytes because we are
449
operating on the medium's stream.
451
return self._medium.read_bytes(count)
349
raise NotImplementedError(self._read_bytes)
453
351
def read_line(self):
454
line = self._read_line()
455
if not line.endswith('\n'):
456
# end of file encountered reading from server
457
raise errors.ConnectionReset(
458
"please check connectivity and permissions",
459
"(and try -Dhpss if further diagnosis is required)")
462
def _read_line(self):
463
"""Helper for SmartClientMediumRequest.read_line.
352
"""Read bytes from this request's response until a newline byte.
465
By default this forwards to self._medium._get_line because we are
466
operating on the medium's stream.
354
This isn't particularly efficient, so should only be used when the
355
expected size of the line is quite short.
357
:returns: a string of bytes ending in a newline (byte 0x0A).
468
return self._medium._get_line()
471
class SmartClientMedium(SmartMedium):
359
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
361
while not line or line[-1] != '\n':
362
new_char = self.read_bytes(1)
365
raise errors.SmartProtocolError(
366
'unexpected end of file reading from server')
370
class SmartClientMedium(object):
472
371
"""Smart client is a medium for sending smart protocol requests over."""
474
def __init__(self, base):
475
super(SmartClientMedium, self).__init__()
477
self._protocol_version_error = None
478
self._protocol_version = None
479
self._done_hello = False
480
# Be optimistic: we assume the remote end can accept new remote
481
# requests until we get an error saying otherwise.
482
# _remote_version_is_before tracks the bzr version the remote side
483
# can be based on what we've seen so far.
484
self._remote_version_is_before = None
486
def _is_remote_before(self, version_tuple):
487
"""Is it possible the remote side supports RPCs for a given version?
491
needed_version = (1, 2)
492
if medium._is_remote_before(needed_version):
493
fallback_to_pre_1_2_rpc()
497
except UnknownSmartMethod:
498
medium._remember_remote_is_before(needed_version)
499
fallback_to_pre_1_2_rpc()
501
:seealso: _remember_remote_is_before
503
if self._remote_version_is_before is None:
504
# So far, the remote side seems to support everything
506
return version_tuple >= self._remote_version_is_before
508
def _remember_remote_is_before(self, version_tuple):
509
"""Tell this medium that the remote side is older the given version.
511
:seealso: _is_remote_before
513
if (self._remote_version_is_before is not None and
514
version_tuple > self._remote_version_is_before):
515
raise AssertionError(
516
"_remember_remote_is_before(%r) called, but "
517
"_remember_remote_is_before(%r) was called previously."
518
% (version_tuple, self._remote_version_is_before))
519
self._remote_version_is_before = version_tuple
521
def protocol_version(self):
522
"""Find out if 'hello' smart request works."""
523
if self._protocol_version_error is not None:
524
raise self._protocol_version_error
525
if not self._done_hello:
527
medium_request = self.get_request()
528
# Send a 'hello' request in protocol version one, for maximum
529
# backwards compatibility.
530
client_protocol = protocol.SmartClientRequestProtocolOne(medium_request)
531
client_protocol.query_version()
532
self._done_hello = True
533
except errors.SmartProtocolError, e:
534
# Cache the error, just like we would cache a successful
536
self._protocol_version_error = e
540
def should_probe(self):
541
"""Should RemoteBzrDirFormat.probe_transport send a smart request on
544
Some transports are unambiguously smart-only; there's no need to check
545
if the transport is able to carry smart requests, because that's all
546
it is for. In those cases, this method should return False.
548
But some HTTP transports can sometimes fail to carry smart requests,
549
but still be usuable for accessing remote bzrdirs via plain file
550
accesses. So for those transports, their media should return True here
551
so that RemoteBzrDirFormat can determine if it is appropriate for that
556
373
def disconnect(self):
557
374
"""If this medium maintains a persistent connection, close it.
559
376
The default implementation does nothing.
562
def remote_path_from_transport(self, transport):
563
"""Convert transport into a path suitable for using in a request.
565
Note that the resulting remote path doesn't encode the host name or
566
anything but path, so it is only safe to use it in requests sent over
567
the medium from the matching transport.
569
medium_base = urlutils.join(self.base, '/')
570
rel_url = urlutils.relative_url(medium_base, transport.base)
571
return urllib.unquote(rel_url)
574
380
class SmartClientStreamMedium(SmartClientMedium):
575
381
"""Stream based medium common class.
699
507
"""See SmartClientStreamMedium.read_bytes."""
700
508
if not self._connected:
701
509
raise errors.MediumNotConnected(self)
702
bytes_to_read = min(count, _MAX_READ_SIZE)
703
return self._read_from.read(bytes_to_read)
706
# Port 4155 is the default port for bzr://, registered with IANA.
707
BZR_DEFAULT_INTERFACE = '0.0.0.0'
708
BZR_DEFAULT_PORT = 4155
510
return self._read_from.read(count)
711
513
class SmartTCPClientMedium(SmartClientStreamMedium):
712
514
"""A client medium using TCP."""
714
def __init__(self, host, port, base):
516
def __init__(self, host, port):
715
517
"""Creates a client that will connect on the first use."""
716
SmartClientStreamMedium.__init__(self, base)
518
SmartClientStreamMedium.__init__(self)
717
519
self._connected = False
718
520
self._host = host
719
521
self._port = port