48
from bzrlib.i18n import gettext
49
from bzrlib.smart import client, protocol, request, signals, vfs
45
from bzrlib.smart import client, protocol, request, vfs
50
46
from bzrlib.transport import ssh
52
48
from bzrlib import osutils
179
175
ui.ui_factory.report_transport_activity(self, bytes, direction)
182
_bad_file_descriptor = (errno.EBADF,)
183
if sys.platform == 'win32':
184
# Given on Windows if you pass a closed socket to select.select. Probably
185
# also given if you pass a file handle to select.
187
_bad_file_descriptor += (WSAENOTSOCK,)
190
178
class SmartServerStreamMedium(SmartMedium):
191
179
"""Handles smart commands coming over a stream.
205
193
the stream. See also the _push_back method.
210
def __init__(self, backing_transport, root_client_path='/', timeout=None):
196
def __init__(self, backing_transport, root_client_path='/'):
211
197
"""Construct new server.
213
199
:param backing_transport: Transport for the directory served.
216
202
self.backing_transport = backing_transport
217
203
self.root_client_path = root_client_path
218
204
self.finished = False
220
raise AssertionError('You must supply a timeout.')
221
self._client_timeout = timeout
222
self._client_poll_timeout = min(timeout / 10.0, 1.0)
223
205
SmartMedium.__init__(self)
231
213
while not self.finished:
232
214
server_protocol = self._build_protocol()
233
# TODO: This seems inelegant:
234
if server_protocol is None:
235
# We could 'continue' only to notice that self.finished is
238
215
self._serve_one_request(server_protocol)
239
except errors.ConnectionTimeout, e:
240
trace.note('%s' % (e,))
241
trace.log_exception_quietly()
242
self._disconnect_client()
243
# We reported it, no reason to make a big fuss.
245
216
except Exception, e:
246
217
stderr.write("%s terminating on exception %s\n" % (self, e))
248
self._disconnect_client()
250
def _stop_gracefully(self):
251
"""When we finish this message, stop looking for more."""
252
trace.mutter('Stopping %s' % (self,))
255
def _disconnect_client(self):
256
"""Close the current connection. We stopped due to a timeout/etc."""
257
# The default implementation is a no-op, because that is all we used to
258
# do when disconnecting from a client. I suppose we never had the
259
# *server* initiate a disconnect, before
261
def _wait_for_bytes_with_timeout(self, timeout_seconds):
262
"""Wait for more bytes to be read, but timeout if none available.
264
This allows us to detect idle connections, and stop trying to read from
265
them, without setting the socket itself to non-blocking. This also
266
allows us to specify when we watch for idle timeouts.
268
:return: Did we timeout? (True if we timed out, False if there is data
271
raise NotImplementedError(self._wait_for_bytes_with_timeout)
273
220
def _build_protocol(self):
274
221
"""Identifies the version of the incoming request, and returns an
280
227
:returns: a SmartServerRequestProtocol.
282
self._wait_for_bytes_with_timeout(self._client_timeout)
284
# We're stopping, so don't try to do any more work
286
229
bytes = self._get_line()
287
230
protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
288
231
protocol = protocol_factory(
290
233
protocol.accept_bytes(unused_bytes)
293
def _wait_on_descriptor(self, fd, timeout_seconds):
294
"""select() on a file descriptor, waiting for nonblocking read()
296
This will raise a ConnectionTimeout exception if we do not get a
297
readable handle before timeout_seconds.
300
t_end = self._timer() + timeout_seconds
301
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
303
while not rs and not xs and self._timer() < t_end:
307
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
308
except (select.error, socket.error) as e:
309
err = getattr(e, 'errno', None)
310
if err is None and getattr(e, 'args', None) is not None:
311
# select.error doesn't have 'errno', it just has args[0]
313
if err in _bad_file_descriptor:
314
return # Not a socket indicates read() will fail
315
elif err == errno.EINTR:
316
# Interrupted, keep looping.
321
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
322
% (timeout_seconds,))
324
236
def _serve_one_request(self, protocol):
325
237
"""Read one request from input, process, send back a response.
348
260
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
350
def __init__(self, sock, backing_transport, root_client_path='/',
262
def __init__(self, sock, backing_transport, root_client_path='/'):
354
265
:param sock: the socket the server will read from. It will be put
355
266
into blocking mode.
357
268
SmartServerStreamMedium.__init__(
358
self, backing_transport, root_client_path=root_client_path,
269
self, backing_transport, root_client_path=root_client_path)
360
270
sock.setblocking(True)
361
271
self.socket = sock
362
# Get the getpeername now, as we might be closed later when we care.
364
self._client_info = sock.getpeername()
366
self._client_info = '<unknown>'
369
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
372
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
375
273
def _serve_one_request_unguarded(self, protocol):
376
274
while protocol.next_read_size():
386
284
self._push_back(protocol.unused_data)
388
def _disconnect_client(self):
389
"""Close the current connection. We stopped due to a timeout/etc."""
392
def _wait_for_bytes_with_timeout(self, timeout_seconds):
393
"""Wait for more bytes to be read, but timeout if none available.
395
This allows us to detect idle connections, and stop trying to read from
396
them, without setting the socket itself to non-blocking. This also
397
allows us to specify when we watch for idle timeouts.
399
:return: None, this will raise ConnectionTimeout if we time out before
402
return self._wait_on_descriptor(self.socket, timeout_seconds)
404
286
def _read_bytes(self, desired_count):
405
287
return osutils.read_bytes_from_socket(
406
288
self.socket, self._report_activity)
424
306
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
426
def __init__(self, in_file, out_file, backing_transport, timeout=None):
308
def __init__(self, in_file, out_file, backing_transport):
427
309
"""Construct new server.
429
311
:param in_file: Python file from which requests can be read.
430
312
:param out_file: Python file to write responses.
431
313
:param backing_transport: Transport for the directory served.
433
SmartServerStreamMedium.__init__(self, backing_transport,
315
SmartServerStreamMedium.__init__(self, backing_transport)
435
316
if sys.platform == 'win32':
436
317
# force binary mode for files
442
323
self._in = in_file
443
324
self._out = out_file
446
"""See SmartServerStreamMedium.serve"""
447
# This is the regular serve, except it adds signal trapping for soft
449
stop_gracefully = self._stop_gracefully
450
signals.register_on_hangup(id(self), stop_gracefully)
452
return super(SmartServerPipeStreamMedium, self).serve()
454
signals.unregister_on_hangup(id(self))
456
326
def _serve_one_request_unguarded(self, protocol):
458
328
# We need to be careful not to read past the end of the current
472
342
protocol.accept_bytes(bytes)
474
def _disconnect_client(self):
479
def _wait_for_bytes_with_timeout(self, timeout_seconds):
480
"""Wait for more bytes to be read, but timeout if none available.
482
This allows us to detect idle connections, and stop trying to read from
483
them, without setting the socket itself to non-blocking. This also
484
allows us to specify when we watch for idle timeouts.
486
:return: None, this will raise ConnectionTimeout if we time out before
489
if (getattr(self._in, 'fileno', None) is None
490
or sys.platform == 'win32'):
491
# You can't select() file descriptors on Windows.
493
return self._wait_on_descriptor(self._in, timeout_seconds)
495
344
def _read_bytes(self, desired_count):
496
345
return self._in.read(desired_count)
641
490
return self._medium._get_line()
644
class _VfsRefuser(object):
645
"""An object that refuses all VFS requests.
650
client._SmartClient.hooks.install_named_hook(
651
'call', self.check_vfs, 'vfs refuser')
653
def check_vfs(self, params):
655
request_method = request.request_handlers.get(params.method)
657
# A method we don't know about doesn't count as a VFS method.
659
if issubclass(request_method, vfs.VfsRequest):
660
raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
663
493
class _DebugCounter(object):
664
494
"""An object that counts the HPSS calls made to each client medium.
712
542
value['count'] = 0
713
543
value['vfs_count'] = 0
715
trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
716
count, vfs_count, medium_repr))
545
trace.note('HPSS calls: %d (%d vfs) %s',
546
count, vfs_count, medium_repr)
718
548
def flush_all(self):
719
549
for ref in list(self.counts.keys()):
722
552
_debug_counter = None
726
555
class SmartClientMedium(SmartMedium):
743
572
if _debug_counter is None:
744
573
_debug_counter = _DebugCounter()
745
574
_debug_counter.track(self)
746
if 'hpss_client_no_vfs' in debug.debug_flags:
748
if _vfs_refuser is None:
749
_vfs_refuser = _VfsRefuser()
751
576
def _is_remote_before(self, version_tuple):
752
577
"""Is it possible the remote side supports RPCs for a given version?