31
31
from bzrlib import (
35
36
from bzrlib.smart.protocol import (
36
37
REQUEST_VERSION_TWO,
38
SmartClientRequestProtocolOne,
37
39
SmartServerRequestProtocolOne,
38
40
SmartServerRequestProtocolTwo,
42
from bzrlib.transport import ssh
43
except errors.ParamikoNotPresent:
44
# no paramiko. SmartSSHClientMedium will break.
42
from bzrlib.transport import ssh
48
45
class SmartServerStreamMedium(object):
57
54
The server passes requests through to an underlying backing transport,
58
55
which will typically be a LocalTransport looking at the server's filesystem.
57
:ivar _push_back_buffer: a str of bytes that have been read from the stream
58
but not used yet, or None if there are no buffered bytes. Subclasses
59
should make sure to exhaust this buffer before reading more bytes from
60
the stream. See also the _push_back method.
61
def __init__(self, backing_transport):
63
def __init__(self, backing_transport, root_client_path='/'):
62
64
"""Construct new server.
64
66
:param backing_transport: Transport for the directory served.
66
68
# backing_transport could be passed to serve instead of __init__
67
69
self.backing_transport = backing_transport
70
self.root_client_path = root_client_path
68
71
self.finished = False
72
self._push_back_buffer = None
74
def _push_back(self, bytes):
75
"""Return unused bytes to the medium, because they belong to the next
78
This sets the _push_back_buffer to the given bytes.
80
assert self._push_back_buffer is None, (
81
"_push_back called when self._push_back_buffer is %r"
82
% (self._push_back_buffer,))
85
self._push_back_buffer = bytes
87
def _get_push_back_buffer(self):
88
assert self._push_back_buffer != '', (
89
'%s._push_back_buffer should never be the empty string, '
90
'which can be confused with EOF' % (self,))
91
bytes = self._push_back_buffer
92
self._push_back_buffer = None
71
96
"""Serve requests until the client disconnects."""
96
121
bytes = bytes[len(REQUEST_VERSION_TWO):]
98
123
protocol_class = SmartServerRequestProtocolOne
99
protocol = protocol_class(self.backing_transport, self._write_out)
124
protocol = protocol_class(
125
self.backing_transport, self._write_out, self.root_client_path)
100
126
protocol.accept_bytes(bytes)
132
158
:returns: a string of bytes ending in a newline (byte 0x0A).
134
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
136
while not line or line[-1] != '\n':
137
new_char = self._get_bytes(1)
162
while newline_pos == -1:
163
new_bytes = self._get_bytes(1)
140
166
# Ran out of bytes before receiving a complete line.
168
newline_pos = bytes.find('\n')
169
line = bytes[:newline_pos+1]
170
self._push_back(bytes[newline_pos+1:])
145
174
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
147
def __init__(self, sock, backing_transport):
176
def __init__(self, sock, backing_transport, root_client_path='/'):
150
179
:param sock: the socket the server will read from. It will be put
151
180
into blocking mode.
153
SmartServerStreamMedium.__init__(self, backing_transport)
182
SmartServerStreamMedium.__init__(
183
self, backing_transport, root_client_path=root_client_path)
155
184
sock.setblocking(True)
156
185
self.socket = sock
158
187
def _serve_one_request_unguarded(self, protocol):
159
188
while protocol.next_read_size():
161
protocol.accept_bytes(self.push_back)
164
bytes = self._get_bytes(4096)
168
protocol.accept_bytes(bytes)
189
bytes = self._get_bytes(4096)
193
protocol.accept_bytes(bytes)
170
self.push_back = protocol.excess_buffer
195
self._push_back(protocol.excess_buffer)
172
197
def _get_bytes(self, desired_count):
198
if self._push_back_buffer is not None:
199
return self._get_push_back_buffer()
173
200
# We ignore the desired_count because on sockets it's more efficient to
174
201
# read 4k at a time.
175
202
return self.socket.recv(4096)
182
209
self.finished = True
184
211
def _write_out(self, bytes):
185
self.socket.sendall(bytes)
212
osutils.send_all(self.socket, bytes)
188
215
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
221
248
protocol.accept_bytes(bytes)
223
250
def _get_bytes(self, desired_count):
251
if self._push_back_buffer is not None:
252
return self._get_push_back_buffer()
224
253
return self._in.read(desired_count)
226
255
def terminate_due_to_error(self):
362
391
new_char = self.read_bytes(1)
364
393
if new_char == '':
365
raise errors.SmartProtocolError(
366
'unexpected end of file reading from server')
394
# end of file encountered reading from server
395
raise errors.ConnectionReset(
396
"please check connectivity and permissions",
397
"(and try -Dhpss if further diagnosis is required)")
370
401
class SmartClientMedium(object):
371
402
"""Smart client is a medium for sending smart protocol requests over."""
405
super(SmartClientMedium, self).__init__()
406
self._protocol_version_error = None
407
self._protocol_version = None
409
def protocol_version(self):
410
"""Find out the best protocol version to use."""
411
if self._protocol_version_error is not None:
412
raise self._protocol_version_error
413
if self._protocol_version is None:
415
medium_request = self.get_request()
416
# Send a 'hello' request in protocol version one, for maximum
417
# backwards compatibility.
418
client_protocol = SmartClientRequestProtocolOne(medium_request)
419
self._protocol_version = client_protocol.query_version()
420
except errors.SmartProtocolError, e:
421
# Cache the error, just like we would cache a successful
423
self._protocol_version_error = e
425
return self._protocol_version
373
427
def disconnect(self):
374
428
"""If this medium maintains a persistent connection, close it.
389
443
def __init__(self):
444
SmartClientMedium.__init__(self)
390
445
self._current_request = None
446
# Be optimistic: we assume the remote end can accept new remote
447
# requests until we get an error saying otherwise. (1.2 adds some
448
# requests that send bodies, which confuses older servers.)
449
self._remote_is_at_least_1_2 = True
392
451
def accept_bytes(self, bytes):
393
452
self._accept_bytes(bytes)
510
569
return self._read_from.read(count)
572
# Port 4155 is the default port for bzr://, registered with IANA.
573
BZR_DEFAULT_INTERFACE = '0.0.0.0'
574
BZR_DEFAULT_PORT = 4155
513
577
class SmartTCPClientMedium(SmartClientStreamMedium):
514
578
"""A client medium using TCP."""
524
588
def _accept_bytes(self, bytes):
525
589
"""See SmartClientMedium.accept_bytes."""
526
590
self._ensure_connection()
527
self._socket.sendall(bytes)
591
osutils.send_all(self._socket, bytes)
529
593
def disconnect(self):
530
594
"""See SmartClientMedium.disconnect()."""
541
605
self._socket = socket.socket()
542
606
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
543
result = self._socket.connect_ex((self._host, int(self._port)))
607
if self._port is None:
608
port = BZR_DEFAULT_PORT
610
port = int(self._port)
612
self._socket.connect((self._host, port))
613
except socket.error, err:
614
# socket errors either have a (string) or (errno, string) as their
616
if type(err.args) is str:
619
err_msg = err.args[1]
545
620
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
546
(self._host, self._port, os.strerror(result)))
621
(self._host, port, err_msg))
547
622
self._connected = True
549
624
def _flush(self):