45
48
from bzrlib.i18n import gettext
46
from bzrlib.smart import client, protocol, request, vfs
49
from bzrlib.smart import client, protocol, request, signals, vfs
47
50
from bzrlib.transport import ssh
49
52
from bzrlib import osutils
176
179
ui.ui_factory.report_transport_activity(self, bytes, direction)
182
_bad_file_descriptor = (errno.EBADF,)
183
if sys.platform == 'win32':
184
# Given on Windows if you pass a closed socket to select.select. Probably
185
# also given if you pass a file handle to select.
187
_bad_file_descriptor += (WSAENOTSOCK,)
179
190
class SmartServerStreamMedium(SmartMedium):
180
191
"""Handles smart commands coming over a stream.
194
205
the stream. See also the _push_back method.
197
def __init__(self, backing_transport, root_client_path='/'):
210
def __init__(self, backing_transport, root_client_path='/', timeout=None):
198
211
"""Construct new server.
200
213
:param backing_transport: Transport for the directory served.
203
216
self.backing_transport = backing_transport
204
217
self.root_client_path = root_client_path
205
218
self.finished = False
220
raise AssertionError('You must supply a timeout.')
221
self._client_timeout = timeout
222
self._client_poll_timeout = min(timeout / 10.0, 1.0)
206
223
SmartMedium.__init__(self)
214
231
while not self.finished:
215
232
server_protocol = self._build_protocol()
233
# TODO: This seems inelegant:
234
if server_protocol is None:
235
# We could 'continue' only to notice that self.finished is
216
238
self._serve_one_request(server_protocol)
239
except errors.ConnectionTimeout, e:
240
trace.note('%s' % (e,))
241
trace.log_exception_quietly()
242
self._disconnect_client()
243
# We reported it, no reason to make a big fuss.
217
245
except Exception, e:
218
246
stderr.write("%s terminating on exception %s\n" % (self, e))
248
self._disconnect_client()
250
def _stop_gracefully(self):
251
"""When we finish this message, stop looking for more."""
252
trace.mutter('Stopping %s' % (self,))
255
def _disconnect_client(self):
256
"""Close the current connection. We stopped due to a timeout/etc."""
257
# The default implementation is a no-op, because that is all we used to
258
# do when disconnecting from a client. I suppose we never had the
259
# *server* initiate a disconnect, before
261
def _wait_for_bytes_with_timeout(self, timeout_seconds):
262
"""Wait for more bytes to be read, but timeout if none available.
264
This allows us to detect idle connections, and stop trying to read from
265
them, without setting the socket itself to non-blocking. This also
266
allows us to specify when we watch for idle timeouts.
268
:return: Did we timeout? (True if we timed out, False if there is data
271
raise NotImplementedError(self._wait_for_bytes_with_timeout)
221
273
def _build_protocol(self):
222
274
"""Identifies the version of the incoming request, and returns an
228
280
:returns: a SmartServerRequestProtocol.
282
self._wait_for_bytes_with_timeout(self._client_timeout)
284
# We're stopping, so don't try to do any more work
230
286
bytes = self._get_line()
231
287
protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
232
288
protocol = protocol_factory(
234
290
protocol.accept_bytes(unused_bytes)
293
def _wait_on_descriptor(self, fd, timeout_seconds):
294
"""select() on a file descriptor, waiting for nonblocking read()
296
This will raise a ConnectionTimeout exception if we do not get a
297
readable handle before timeout_seconds.
300
t_end = self._timer() + timeout_seconds
301
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
303
while not rs and not xs and self._timer() < t_end:
307
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
308
except (select.error, socket.error) as e:
309
err = getattr(e, 'errno', None)
310
if err is None and getattr(e, 'args', None) is not None:
311
# select.error doesn't have 'errno', it just has args[0]
313
if err in _bad_file_descriptor:
314
return # Not a socket indicates read() will fail
315
elif err == errno.EINTR:
316
# Interrupted, keep looping.
321
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
322
% (timeout_seconds,))
237
324
def _serve_one_request(self, protocol):
238
325
"""Read one request from input, process, send back a response.
261
348
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
263
def __init__(self, sock, backing_transport, root_client_path='/'):
350
def __init__(self, sock, backing_transport, root_client_path='/',
266
354
:param sock: the socket the server will read from. It will be put
267
355
into blocking mode.
269
357
SmartServerStreamMedium.__init__(
270
self, backing_transport, root_client_path=root_client_path)
358
self, backing_transport, root_client_path=root_client_path,
271
360
sock.setblocking(True)
272
361
self.socket = sock
362
# Get the getpeername now, as we might be closed later when we care.
364
self._client_info = sock.getpeername()
366
self._client_info = '<unknown>'
369
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
372
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
274
375
def _serve_one_request_unguarded(self, protocol):
275
376
while protocol.next_read_size():
285
386
self._push_back(protocol.unused_data)
388
def _disconnect_client(self):
389
"""Close the current connection. We stopped due to a timeout/etc."""
392
def _wait_for_bytes_with_timeout(self, timeout_seconds):
393
"""Wait for more bytes to be read, but timeout if none available.
395
This allows us to detect idle connections, and stop trying to read from
396
them, without setting the socket itself to non-blocking. This also
397
allows us to specify when we watch for idle timeouts.
399
:return: None, this will raise ConnectionTimeout if we time out before
402
return self._wait_on_descriptor(self.socket, timeout_seconds)
287
404
def _read_bytes(self, desired_count):
288
405
return osutils.read_bytes_from_socket(
289
406
self.socket, self._report_activity)
307
424
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
309
def __init__(self, in_file, out_file, backing_transport):
426
def __init__(self, in_file, out_file, backing_transport, timeout=None):
310
427
"""Construct new server.
312
429
:param in_file: Python file from which requests can be read.
313
430
:param out_file: Python file to write responses.
314
431
:param backing_transport: Transport for the directory served.
316
SmartServerStreamMedium.__init__(self, backing_transport)
433
SmartServerStreamMedium.__init__(self, backing_transport,
317
435
if sys.platform == 'win32':
318
436
# force binary mode for files
324
442
self._in = in_file
325
443
self._out = out_file
446
"""See SmartServerStreamMedium.serve"""
447
# This is the regular serve, except it adds signal trapping for soft
449
stop_gracefully = self._stop_gracefully
450
signals.register_on_hangup(id(self), stop_gracefully)
452
return super(SmartServerPipeStreamMedium, self).serve()
454
signals.unregister_on_hangup(id(self))
327
456
def _serve_one_request_unguarded(self, protocol):
329
458
# We need to be careful not to read past the end of the current
343
472
protocol.accept_bytes(bytes)
474
def _disconnect_client(self):
479
def _wait_for_bytes_with_timeout(self, timeout_seconds):
480
"""Wait for more bytes to be read, but timeout if none available.
482
This allows us to detect idle connections, and stop trying to read from
483
them, without setting the socket itself to non-blocking. This also
484
allows us to specify when we watch for idle timeouts.
486
:return: None, this will raise ConnectionTimeout if we time out before
489
if (getattr(self._in, 'fileno', None) is None
490
or sys.platform == 'win32'):
491
# You can't select() file descriptors on Windows.
493
return self._wait_on_descriptor(self._in, timeout_seconds)
345
495
def _read_bytes(self, desired_count):
346
496
return self._in.read(desired_count)