32
from bzrlib.lazy_import import lazy_import
33
lazy_import(globals(), """
31
34
from bzrlib import (
35
from bzrlib.smart.protocol import (
37
SmartServerRequestProtocolOne,
38
SmartServerRequestProtocolTwo,
42
from bzrlib.transport import ssh
43
except errors.ParamikoNotPresent:
44
# no paramiko. SmartSSHClientMedium will break.
48
class SmartServerStreamMedium(object):
40
from bzrlib.smart import protocol
41
from bzrlib.transport import ssh
45
# We must not read any more than 64k at a time so we don't risk "no buffer
46
# space available" errors on some platforms. Windows in particular is likely
47
# to give error 10053 or 10055 if we read more than 64k from a socket.
48
_MAX_READ_SIZE = 64 * 1024
51
def _get_protocol_factory_for_bytes(bytes):
52
"""Determine the right protocol factory for 'bytes'.
54
This will return an appropriate protocol factory depending on the version
55
of the protocol being used, as determined by inspecting the given bytes.
56
The bytes should have at least one newline byte (i.e. be a whole line),
57
otherwise it's possible that a request will be incorrectly identified as
60
Typical use would be::
62
factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
63
server_protocol = factory(transport, write_func, root_client_path)
64
server_protocol.accept_bytes(unused_bytes)
66
:param bytes: a str of bytes of the start of the request.
67
:returns: 2-tuple of (protocol_factory, unused_bytes). protocol_factory is
68
a callable that takes three args: transport, write_func,
69
root_client_path. unused_bytes are any bytes that were not part of a
70
protocol version marker.
72
if bytes.startswith(protocol.MESSAGE_VERSION_THREE):
73
protocol_factory = protocol.build_server_protocol_three
74
bytes = bytes[len(protocol.MESSAGE_VERSION_THREE):]
75
elif bytes.startswith(protocol.REQUEST_VERSION_TWO):
76
protocol_factory = protocol.SmartServerRequestProtocolTwo
77
bytes = bytes[len(protocol.REQUEST_VERSION_TWO):]
79
protocol_factory = protocol.SmartServerRequestProtocolOne
80
return protocol_factory, bytes
83
def _get_line(read_bytes_func):
84
"""Read bytes using read_bytes_func until a newline byte.
86
This isn't particularly efficient, so should only be used when the
87
expected size of the line is quite short.
89
:returns: a tuple of two strs: (line, excess)
93
while newline_pos == -1:
94
new_bytes = read_bytes_func(1)
97
# Ran out of bytes before receiving a complete line.
99
newline_pos = bytes.find('\n')
100
line = bytes[:newline_pos+1]
101
excess = bytes[newline_pos+1:]
105
class SmartMedium(object):
106
"""Base class for smart protocol media, both client- and server-side."""
109
self._push_back_buffer = None
111
def _push_back(self, bytes):
112
"""Return unused bytes to the medium, because they belong to the next
115
This sets the _push_back_buffer to the given bytes.
117
if self._push_back_buffer is not None:
118
raise AssertionError(
119
"_push_back called when self._push_back_buffer is %r"
120
% (self._push_back_buffer,))
123
self._push_back_buffer = bytes
125
def _get_push_back_buffer(self):
126
if self._push_back_buffer == '':
127
raise AssertionError(
128
'%s._push_back_buffer should never be the empty string, '
129
'which can be confused with EOF' % (self,))
130
bytes = self._push_back_buffer
131
self._push_back_buffer = None
134
def read_bytes(self, desired_count):
135
"""Read some bytes from this medium.
137
:returns: some bytes, possibly more or less than the number requested
138
in 'desired_count' depending on the medium.
140
if self._push_back_buffer is not None:
141
return self._get_push_back_buffer()
142
bytes_to_read = min(desired_count, _MAX_READ_SIZE)
143
return self._read_bytes(bytes_to_read)
145
def _read_bytes(self, count):
146
raise NotImplementedError(self._read_bytes)
149
"""Read bytes from this request's response until a newline byte.
151
This isn't particularly efficient, so should only be used when the
152
expected size of the line is quite short.
154
:returns: a string of bytes ending in a newline (byte 0x0A).
156
line, excess = _get_line(self.read_bytes)
157
self._push_back(excess)
161
class SmartServerStreamMedium(SmartMedium):
49
162
"""Handles smart commands coming over a stream.
51
164
The stream may be a pipe connected to sshd, or a tcp socket, or an
116
232
"""Called when an unhandled exception from the protocol occurs."""
117
233
raise NotImplementedError(self.terminate_due_to_error)
119
def _get_bytes(self, desired_count):
235
def _read_bytes(self, desired_count):
120
236
"""Get some bytes from the medium.
122
238
:param desired_count: number of bytes we want to read.
124
raise NotImplementedError(self._get_bytes)
127
"""Read bytes from this request's response until a newline byte.
129
This isn't particularly efficient, so should only be used when the
130
expected size of the line is quite short.
132
:returns: a string of bytes ending in a newline (byte 0x0A).
134
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
136
while not line or line[-1] != '\n':
137
new_char = self._get_bytes(1)
140
# Ran out of bytes before receiving a complete line.
240
raise NotImplementedError(self._read_bytes)
145
243
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
147
def __init__(self, sock, backing_transport):
245
def __init__(self, sock, backing_transport, root_client_path='/'):
150
248
:param sock: the socket the server will read from. It will be put
151
249
into blocking mode.
153
SmartServerStreamMedium.__init__(self, backing_transport)
251
SmartServerStreamMedium.__init__(
252
self, backing_transport, root_client_path=root_client_path)
155
253
sock.setblocking(True)
156
254
self.socket = sock
158
256
def _serve_one_request_unguarded(self, protocol):
159
257
while protocol.next_read_size():
161
protocol.accept_bytes(self.push_back)
164
bytes = self._get_bytes(4096)
168
protocol.accept_bytes(bytes)
258
# We can safely try to read large chunks. If there is less data
259
# than _MAX_READ_SIZE ready, the socket wil just return a short
260
# read immediately rather than block.
261
bytes = self.read_bytes(_MAX_READ_SIZE)
265
protocol.accept_bytes(bytes)
170
self.push_back = protocol.excess_buffer
267
self._push_back(protocol.unused_data)
172
def _get_bytes(self, desired_count):
269
def _read_bytes(self, desired_count):
173
270
# We ignore the desired_count because on sockets it's more efficient to
175
return self.socket.recv(4096)
271
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
272
return self.socket.recv(_MAX_READ_SIZE)
177
274
def terminate_due_to_error(self):
178
"""Called when an unhandled exception from the protocol occurs."""
179
275
# TODO: This should log to a server log file, but no such thing
180
276
# exists yet. Andrew Bennetts 2006-09-29.
181
277
self.socket.close()
182
278
self.finished = True
184
280
def _write_out(self, bytes):
185
self.socket.sendall(bytes)
281
osutils.send_all(self.socket, bytes)
188
284
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
340
439
return self._read_bytes(count)
342
441
def _read_bytes(self, count):
343
"""Helper for read_bytes.
442
"""Helper for SmartClientMediumRequest.read_bytes.
345
444
read_bytes checks the state of the request to determing if bytes
346
445
should be read. After that it hands off to _read_bytes to do the
448
By default this forwards to self._medium.read_bytes because we are
449
operating on the medium's stream.
349
raise NotImplementedError(self._read_bytes)
451
return self._medium.read_bytes(count)
351
453
def read_line(self):
352
"""Read bytes from this request's response until a newline byte.
454
line = self._read_line()
455
if not line.endswith('\n'):
456
# end of file encountered reading from server
457
raise errors.ConnectionReset(
458
"please check connectivity and permissions",
459
"(and try -Dhpss if further diagnosis is required)")
462
def _read_line(self):
463
"""Helper for SmartClientMediumRequest.read_line.
354
This isn't particularly efficient, so should only be used when the
355
expected size of the line is quite short.
357
:returns: a string of bytes ending in a newline (byte 0x0A).
465
By default this forwards to self._medium._get_line because we are
466
operating on the medium's stream.
359
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
361
while not line or line[-1] != '\n':
362
new_char = self.read_bytes(1)
365
raise errors.SmartProtocolError(
366
'unexpected end of file reading from server')
370
class SmartClientMedium(object):
468
return self._medium._get_line()
471
class SmartClientMedium(SmartMedium):
371
472
"""Smart client is a medium for sending smart protocol requests over."""
474
def __init__(self, base):
475
super(SmartClientMedium, self).__init__()
477
self._protocol_version_error = None
478
self._protocol_version = None
479
self._done_hello = False
480
# Be optimistic: we assume the remote end can accept new remote
481
# requests until we get an error saying otherwise.
482
# _remote_version_is_before tracks the bzr version the remote side
483
# can be based on what we've seen so far.
484
self._remote_version_is_before = None
486
def _is_remote_before(self, version_tuple):
487
"""Is it possible the remote side supports RPCs for a given version?
491
needed_version = (1, 2)
492
if medium._is_remote_before(needed_version):
493
fallback_to_pre_1_2_rpc()
497
except UnknownSmartMethod:
498
medium._remember_remote_is_before(needed_version)
499
fallback_to_pre_1_2_rpc()
501
:seealso: _remember_remote_is_before
503
if self._remote_version_is_before is None:
504
# So far, the remote side seems to support everything
506
return version_tuple >= self._remote_version_is_before
508
def _remember_remote_is_before(self, version_tuple):
509
"""Tell this medium that the remote side is older the given version.
511
:seealso: _is_remote_before
513
if (self._remote_version_is_before is not None and
514
version_tuple > self._remote_version_is_before):
515
raise AssertionError(
516
"_remember_remote_is_before(%r) called, but "
517
"_remember_remote_is_before(%r) was called previously."
518
% (version_tuple, self._remote_version_is_before))
519
self._remote_version_is_before = version_tuple
521
def protocol_version(self):
522
"""Find out if 'hello' smart request works."""
523
if self._protocol_version_error is not None:
524
raise self._protocol_version_error
525
if not self._done_hello:
527
medium_request = self.get_request()
528
# Send a 'hello' request in protocol version one, for maximum
529
# backwards compatibility.
530
client_protocol = protocol.SmartClientRequestProtocolOne(medium_request)
531
client_protocol.query_version()
532
self._done_hello = True
533
except errors.SmartProtocolError, e:
534
# Cache the error, just like we would cache a successful
536
self._protocol_version_error = e
540
def should_probe(self):
541
"""Should RemoteBzrDirFormat.probe_transport send a smart request on
544
Some transports are unambiguously smart-only; there's no need to check
545
if the transport is able to carry smart requests, because that's all
546
it is for. In those cases, this method should return False.
548
But some HTTP transports can sometimes fail to carry smart requests,
549
but still be usuable for accessing remote bzrdirs via plain file
550
accesses. So for those transports, their media should return True here
551
so that RemoteBzrDirFormat can determine if it is appropriate for that
373
556
def disconnect(self):
374
557
"""If this medium maintains a persistent connection, close it.
376
559
The default implementation does nothing.
562
def remote_path_from_transport(self, transport):
563
"""Convert transport into a path suitable for using in a request.
565
Note that the resulting remote path doesn't encode the host name or
566
anything but path, so it is only safe to use it in requests sent over
567
the medium from the matching transport.
569
medium_base = urlutils.join(self.base, '/')
570
rel_url = urlutils.relative_url(medium_base, transport.base)
571
return urllib.unquote(rel_url)
380
574
class SmartClientStreamMedium(SmartClientMedium):
381
575
"""Stream based medium common class.
507
699
"""See SmartClientStreamMedium.read_bytes."""
508
700
if not self._connected:
509
701
raise errors.MediumNotConnected(self)
510
return self._read_from.read(count)
702
bytes_to_read = min(count, _MAX_READ_SIZE)
703
return self._read_from.read(bytes_to_read)
706
# Port 4155 is the default port for bzr://, registered with IANA.
707
BZR_DEFAULT_INTERFACE = '0.0.0.0'
708
BZR_DEFAULT_PORT = 4155
513
711
class SmartTCPClientMedium(SmartClientStreamMedium):
514
712
"""A client medium using TCP."""
516
def __init__(self, host, port):
714
def __init__(self, host, port, base):
517
715
"""Creates a client that will connect on the first use."""
518
SmartClientStreamMedium.__init__(self)
716
SmartClientStreamMedium.__init__(self, base)
519
717
self._connected = False
520
718
self._host = host
521
719
self._port = port