32
from bzrlib.lazy_import import lazy_import
33
lazy_import(globals(), """
31
from bzrlib import errors
32
from bzrlib.smart.protocol import (
34
SmartServerRequestProtocolOne,
35
SmartServerRequestProtocolTwo,
40
from bzrlib.smart import protocol
41
from bzrlib.transport import ssh
45
# We must not read any more than 64k at a time so we don't risk "no buffer
46
# space available" errors on some platforms. Windows in particular is likely
47
# to give error 10053 or 10055 if we read more than 64k from a socket.
48
_MAX_READ_SIZE = 64 * 1024
51
def _get_protocol_factory_for_bytes(bytes):
52
"""Determine the right protocol factory for 'bytes'.
54
This will return an appropriate protocol factory depending on the version
55
of the protocol being used, as determined by inspecting the given bytes.
56
The bytes should have at least one newline byte (i.e. be a whole line),
57
otherwise it's possible that a request will be incorrectly identified as
60
Typical use would be::
62
factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
63
server_protocol = factory(transport, write_func, root_client_path)
64
server_protocol.accept_bytes(unused_bytes)
66
:param bytes: a str of bytes of the start of the request.
67
:returns: 2-tuple of (protocol_factory, unused_bytes). protocol_factory is
68
a callable that takes three args: transport, write_func,
69
root_client_path. unused_bytes are any bytes that were not part of a
70
protocol version marker.
72
if bytes.startswith(protocol.MESSAGE_VERSION_THREE):
73
protocol_factory = protocol.build_server_protocol_three
74
bytes = bytes[len(protocol.MESSAGE_VERSION_THREE):]
75
elif bytes.startswith(protocol.REQUEST_VERSION_TWO):
76
protocol_factory = protocol.SmartServerRequestProtocolTwo
77
bytes = bytes[len(protocol.REQUEST_VERSION_TWO):]
79
protocol_factory = protocol.SmartServerRequestProtocolOne
80
return protocol_factory, bytes
83
def _get_line(read_bytes_func):
84
"""Read bytes using read_bytes_func until a newline byte.
86
This isn't particularly efficient, so should only be used when the
87
expected size of the line is quite short.
89
:returns: a tuple of two strs: (line, excess)
93
while newline_pos == -1:
94
new_bytes = read_bytes_func(1)
97
# Ran out of bytes before receiving a complete line.
99
newline_pos = bytes.find('\n')
100
line = bytes[:newline_pos+1]
101
excess = bytes[newline_pos+1:]
105
class SmartMedium(object):
106
"""Base class for smart protocol media, both client- and server-side."""
109
self._push_back_buffer = None
111
def _push_back(self, bytes):
112
"""Return unused bytes to the medium, because they belong to the next
115
This sets the _push_back_buffer to the given bytes.
117
if self._push_back_buffer is not None:
118
raise AssertionError(
119
"_push_back called when self._push_back_buffer is %r"
120
% (self._push_back_buffer,))
123
self._push_back_buffer = bytes
125
def _get_push_back_buffer(self):
126
if self._push_back_buffer == '':
127
raise AssertionError(
128
'%s._push_back_buffer should never be the empty string, '
129
'which can be confused with EOF' % (self,))
130
bytes = self._push_back_buffer
131
self._push_back_buffer = None
134
def read_bytes(self, desired_count):
135
"""Read some bytes from this medium.
137
:returns: some bytes, possibly more or less than the number requested
138
in 'desired_count' depending on the medium.
140
if self._push_back_buffer is not None:
141
return self._get_push_back_buffer()
142
bytes_to_read = min(desired_count, _MAX_READ_SIZE)
143
return self._read_bytes(bytes_to_read)
145
def _read_bytes(self, count):
146
raise NotImplementedError(self._read_bytes)
149
"""Read bytes from this request's response until a newline byte.
151
This isn't particularly efficient, so should only be used when the
152
expected size of the line is quite short.
154
:returns: a string of bytes ending in a newline (byte 0x0A).
156
line, excess = _get_line(self.read_bytes)
157
self._push_back(excess)
161
class SmartServerStreamMedium(SmartMedium):
39
from bzrlib.transport import ssh
40
except errors.ParamikoNotPresent:
41
# no paramiko. SmartSSHClientMedium will break.
45
class SmartServerStreamMedium(object):
162
46
"""Handles smart commands coming over a stream.
164
48
The stream may be a pipe connected to sshd, or a tcp socket, or an
232
113
"""Called when an unhandled exception from the protocol occurs."""
233
114
raise NotImplementedError(self.terminate_due_to_error)
235
def _read_bytes(self, desired_count):
116
def _get_bytes(self, desired_count):
236
117
"""Get some bytes from the medium.
238
119
:param desired_count: number of bytes we want to read.
240
raise NotImplementedError(self._read_bytes)
121
raise NotImplementedError(self._get_bytes)
124
"""Read bytes from this request's response until a newline byte.
126
This isn't particularly efficient, so should only be used when the
127
expected size of the line is quite short.
129
:returns: a string of bytes ending in a newline (byte 0x0A).
131
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
133
while not line or line[-1] != '\n':
134
new_char = self._get_bytes(1)
137
# Ran out of bytes before receiving a complete line.
243
142
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
245
def __init__(self, sock, backing_transport, root_client_path='/'):
144
def __init__(self, sock, backing_transport):
248
147
:param sock: the socket the server will read from. It will be put
249
148
into blocking mode.
251
SmartServerStreamMedium.__init__(
252
self, backing_transport, root_client_path=root_client_path)
150
SmartServerStreamMedium.__init__(self, backing_transport)
253
152
sock.setblocking(True)
254
153
self.socket = sock
256
155
def _serve_one_request_unguarded(self, protocol):
257
156
while protocol.next_read_size():
258
# We can safely try to read large chunks. If there is less data
259
# than _MAX_READ_SIZE ready, the socket will just return a short
260
# read immediately rather than block.
261
bytes = self.read_bytes(_MAX_READ_SIZE)
265
protocol.accept_bytes(bytes)
158
protocol.accept_bytes(self.push_back)
161
bytes = self._get_bytes(4096)
165
protocol.accept_bytes(bytes)
267
self._push_back(protocol.unused_data)
167
self.push_back = protocol.excess_buffer
269
def _read_bytes(self, desired_count):
169
def _get_bytes(self, desired_count):
270
170
# We ignore the desired_count because on sockets it's more efficient to
271
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
272
return self.socket.recv(_MAX_READ_SIZE)
172
return self.socket.recv(4096)
274
174
def terminate_due_to_error(self):
175
"""Called when an unhandled exception from the protocol occurs."""
275
176
# TODO: This should log to a server log file, but no such thing
276
177
# exists yet. Andrew Bennetts 2006-09-29.
277
178
self.socket.close()
278
179
self.finished = True
280
181
def _write_out(self, bytes):
281
osutils.send_all(self.socket, bytes)
182
self.socket.sendall(bytes)
284
185
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
439
337
return self._read_bytes(count)
441
339
def _read_bytes(self, count):
442
"""Helper for SmartClientMediumRequest.read_bytes.
340
"""Helper for read_bytes.
444
342
read_bytes checks the state of the request to determine if bytes
445
343
should be read. After that it hands off to _read_bytes to do the
448
By default this forwards to self._medium.read_bytes because we are
449
operating on the medium's stream.
451
return self._medium.read_bytes(count)
346
raise NotImplementedError(self._read_bytes)
453
348
def read_line(self):
454
line = self._read_line()
455
if not line.endswith('\n'):
456
# end of file encountered reading from server
457
raise errors.ConnectionReset(
458
"please check connectivity and permissions",
459
"(and try -Dhpss if further diagnosis is required)")
462
def _read_line(self):
463
"""Helper for SmartClientMediumRequest.read_line.
349
"""Read bytes from this request's response until a newline byte.
465
By default this forwards to self._medium._get_line because we are
466
operating on the medium's stream.
351
This isn't particularly efficient, so should only be used when the
352
expected size of the line is quite short.
354
:returns: a string of bytes ending in a newline (byte 0x0A).
468
return self._medium._get_line()
471
class SmartClientMedium(SmartMedium):
356
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
358
while not line or line[-1] != '\n':
359
new_char = self.read_bytes(1)
362
raise errors.SmartProtocolError(
363
'unexpected end of file reading from server')
367
class SmartClientMedium(object):
472
368
"""Smart client is a medium for sending smart protocol requests over."""
474
def __init__(self, base):
475
super(SmartClientMedium, self).__init__()
477
self._protocol_version_error = None
478
self._protocol_version = None
479
self._done_hello = False
480
# Be optimistic: we assume the remote end can accept new remote
481
# requests until we get an error saying otherwise.
482
# _remote_version_is_before tracks the bzr version the remote side
483
# can be based on what we've seen so far.
484
self._remote_version_is_before = None
486
def _is_remote_before(self, version_tuple):
487
"""Is it possible the remote side supports RPCs for a given version?
491
needed_version = (1, 2)
492
if medium._is_remote_before(needed_version):
493
fallback_to_pre_1_2_rpc()
497
except UnknownSmartMethod:
498
medium._remember_remote_is_before(needed_version)
499
fallback_to_pre_1_2_rpc()
501
:seealso: _remember_remote_is_before
503
if self._remote_version_is_before is None:
504
# So far, the remote side seems to support everything
506
return version_tuple >= self._remote_version_is_before
508
def _remember_remote_is_before(self, version_tuple):
509
"""Tell this medium that the remote side is older than the given version.
511
:seealso: _is_remote_before
513
if (self._remote_version_is_before is not None and
514
version_tuple > self._remote_version_is_before):
515
raise AssertionError(
516
"_remember_remote_is_before(%r) called, but "
517
"_remember_remote_is_before(%r) was called previously."
518
% (version_tuple, self._remote_version_is_before))
519
self._remote_version_is_before = version_tuple
521
def protocol_version(self):
522
"""Find out if 'hello' smart request works."""
523
if self._protocol_version_error is not None:
524
raise self._protocol_version_error
525
if not self._done_hello:
527
medium_request = self.get_request()
528
# Send a 'hello' request in protocol version one, for maximum
529
# backwards compatibility.
530
client_protocol = protocol.SmartClientRequestProtocolOne(medium_request)
531
client_protocol.query_version()
532
self._done_hello = True
533
except errors.SmartProtocolError, e:
534
# Cache the error, just like we would cache a successful
536
self._protocol_version_error = e
540
def should_probe(self):
541
"""Should RemoteBzrDirFormat.probe_transport send a smart request on
544
Some transports are unambiguously smart-only; there's no need to check
545
if the transport is able to carry smart requests, because that's all
546
it is for. In those cases, this method should return False.
548
But some HTTP transports can sometimes fail to carry smart requests,
549
but still be usable for accessing remote bzrdirs via plain file
550
accesses. So for those transports, their media should return True here
551
so that RemoteBzrDirFormat can determine if it is appropriate for that
556
370
def disconnect(self):
557
371
"""If this medium maintains a persistent connection, close it.
559
373
The default implementation does nothing.
562
def remote_path_from_transport(self, transport):
563
"""Convert transport into a path suitable for using in a request.
565
Note that the resulting remote path doesn't encode the host name or
566
anything but path, so it is only safe to use it in requests sent over
567
the medium from the matching transport.
569
medium_base = urlutils.join(self.base, '/')
570
rel_url = urlutils.relative_url(medium_base, transport.base)
571
return urllib.unquote(rel_url)
574
377
class SmartClientStreamMedium(SmartClientMedium):
575
378
"""Stream based medium common class.
699
499
"""See SmartClientStreamMedium.read_bytes."""
700
500
if not self._connected:
701
501
raise errors.MediumNotConnected(self)
702
bytes_to_read = min(count, _MAX_READ_SIZE)
703
return self._read_from.read(bytes_to_read)
706
# Port 4155 is the default port for bzr://, registered with IANA.
707
BZR_DEFAULT_INTERFACE = '0.0.0.0'
708
BZR_DEFAULT_PORT = 4155
502
return self._read_from.read(count)
711
505
class SmartTCPClientMedium(SmartClientStreamMedium):
712
506
"""A client medium using TCP."""
714
def __init__(self, host, port, base):
508
def __init__(self, host, port):
715
509
"""Creates a client that will connect on the first use."""
716
SmartClientStreamMedium.__init__(self, base)
510
SmartClientStreamMedium.__init__(self)
717
511
self._connected = False
718
512
self._host = host
719
513
self._port = port