48
from bzrlib.i18n import gettext
49
from bzrlib.smart import client, protocol, request, signals, vfs
45
from bzrlib.smart import client, protocol, request, vfs
50
46
from bzrlib.transport import ssh
52
48
from bzrlib import osutils
179
175
ui.ui_factory.report_transport_activity(self, bytes, direction)
182
_bad_file_descriptor = (errno.EBADF,)
183
if sys.platform == 'win32':
184
# Given on Windows if you pass a closed socket to select.select. Probably
185
# also given if you pass a file handle to select.
187
_bad_file_descriptor += (WSAENOTSOCK,)
190
178
class SmartServerStreamMedium(SmartMedium):
191
179
"""Handles smart commands coming over a stream.
205
193
the stream. See also the _push_back method.
210
def __init__(self, backing_transport, root_client_path='/', timeout=None):
196
def __init__(self, backing_transport, root_client_path='/'):
211
197
"""Construct new server.
213
199
:param backing_transport: Transport for the directory served.
216
202
self.backing_transport = backing_transport
217
203
self.root_client_path = root_client_path
218
204
self.finished = False
220
raise AssertionError('You must supply a timeout.')
221
self._client_timeout = timeout
222
self._client_poll_timeout = min(timeout / 10.0, 1.0)
223
205
SmartMedium.__init__(self)
231
213
while not self.finished:
232
214
server_protocol = self._build_protocol()
233
215
self._serve_one_request(server_protocol)
234
except errors.ConnectionTimeout, e:
235
trace.note('%s' % (e,))
236
trace.log_exception_quietly()
237
self._disconnect_client()
238
# We reported it, no reason to make a big fuss.
240
216
except Exception, e:
241
217
stderr.write("%s terminating on exception %s\n" % (self, e))
243
self._disconnect_client()
245
def _stop_gracefully(self):
246
"""When we finish this message, stop looking for more."""
247
trace.mutter('Stopping %s' % (self,))
250
def _disconnect_client(self):
251
"""Close the current connection. We stopped due to a timeout/etc."""
252
# The default implementation is a no-op, because that is all we used to
253
# do when disconnecting from a client. I suppose we never had the
254
# *server* initiate a disconnect, before
256
def _wait_for_bytes_with_timeout(self, timeout_seconds):
257
"""Wait for more bytes to be read, but timeout if none available.
259
This allows us to detect idle connections, and stop trying to read from
260
them, without setting the socket itself to non-blocking. This also
261
allows us to specify when we watch for idle timeouts.
263
:return: Did we timeout? (True if we timed out, False if there is data
266
raise NotImplementedError(self._wait_for_bytes_with_timeout)
268
220
def _build_protocol(self):
269
221
"""Identifies the version of the incoming request, and returns an
275
227
:returns: a SmartServerRequestProtocol.
277
self._wait_for_bytes_with_timeout(self._client_timeout)
279
# We're stopping, so don't try to do any more work
281
229
bytes = self._get_line()
282
230
protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
283
231
protocol = protocol_factory(
285
233
protocol.accept_bytes(unused_bytes)
288
def _wait_on_descriptor(self, fd, timeout_seconds):
289
"""select() on a file descriptor, waiting for nonblocking read()
291
This will raise a ConnectionTimeout exception if we do not get a
292
readable handle before timeout_seconds.
295
t_end = self._timer() + timeout_seconds
296
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
298
while not rs and not xs and self._timer() < t_end:
302
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
303
except (select.error, socket.error) as e:
304
err = getattr(e, 'errno', None)
305
if err is None and getattr(e, 'args', None) is not None:
306
# select.error doesn't have 'errno', it just has args[0]
308
if err in _bad_file_descriptor:
309
return # Not a socket indicates read() will fail
310
elif err == errno.EINTR:
311
# Interrupted, keep looping.
316
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
317
% (timeout_seconds,))
319
236
def _serve_one_request(self, protocol):
320
237
"""Read one request from input, process, send back a response.
322
239
:param protocol: a SmartServerRequestProtocol.
327
242
self._serve_one_request_unguarded(protocol)
328
243
except KeyboardInterrupt:
345
260
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
347
def __init__(self, sock, backing_transport, root_client_path='/',
262
def __init__(self, sock, backing_transport, root_client_path='/'):
351
265
:param sock: the socket the server will read from. It will be put
352
266
into blocking mode.
354
268
SmartServerStreamMedium.__init__(
355
self, backing_transport, root_client_path=root_client_path,
269
self, backing_transport, root_client_path=root_client_path)
357
270
sock.setblocking(True)
358
271
self.socket = sock
359
# Get the getpeername now, as we might be closed later when we care.
361
self._client_info = sock.getpeername()
363
self._client_info = '<unknown>'
366
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
369
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
372
273
def _serve_one_request_unguarded(self, protocol):
373
274
while protocol.next_read_size():
383
284
self._push_back(protocol.unused_data)
385
def _disconnect_client(self):
386
"""Close the current connection. We stopped due to a timeout/etc."""
389
def _wait_for_bytes_with_timeout(self, timeout_seconds):
390
"""Wait for more bytes to be read, but timeout if none available.
392
This allows us to detect idle connections, and stop trying to read from
393
them, without setting the socket itself to non-blocking. This also
394
allows us to specify when we watch for idle timeouts.
396
:return: None, this will raise ConnectionTimeout if we time out before
399
return self._wait_on_descriptor(self.socket, timeout_seconds)
401
286
def _read_bytes(self, desired_count):
402
287
return osutils.read_bytes_from_socket(
403
288
self.socket, self._report_activity)
421
306
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
423
def __init__(self, in_file, out_file, backing_transport, timeout=None):
308
def __init__(self, in_file, out_file, backing_transport):
424
309
"""Construct new server.
426
311
:param in_file: Python file from which requests can be read.
427
312
:param out_file: Python file to write responses.
428
313
:param backing_transport: Transport for the directory served.
430
SmartServerStreamMedium.__init__(self, backing_transport,
315
SmartServerStreamMedium.__init__(self, backing_transport)
432
316
if sys.platform == 'win32':
433
317
# force binary mode for files
439
323
self._in = in_file
440
324
self._out = out_file
443
"""See SmartServerStreamMedium.serve"""
444
# This is the regular serve, except it adds signal trapping for soft
446
stop_gracefully = self._stop_gracefully
447
signals.register_on_hangup(id(self), stop_gracefully)
449
return super(SmartServerPipeStreamMedium, self).serve()
451
signals.unregister_on_hangup(id(self))
453
326
def _serve_one_request_unguarded(self, protocol):
455
328
# We need to be careful not to read past the end of the current
469
342
protocol.accept_bytes(bytes)
471
def _disconnect_client(self):
476
def _wait_for_bytes_with_timeout(self, timeout_seconds):
477
"""Wait for more bytes to be read, but timeout if none available.
479
This allows us to detect idle connections, and stop trying to read from
480
them, without setting the socket itself to non-blocking. This also
481
allows us to specify when we watch for idle timeouts.
483
:return: None, this will raise ConnectionTimeout if we time out before
486
if (getattr(self._in, 'fileno', None) is None
487
or sys.platform == 'win32'):
488
# You can't select() file descriptors on Windows.
490
return self._wait_on_descriptor(self._in, timeout_seconds)
492
344
def _read_bytes(self, desired_count):
493
345
return self._in.read(desired_count)
638
490
return self._medium._get_line()
641
class _VfsRefuser(object):
642
"""An object that refuses all VFS requests.
647
client._SmartClient.hooks.install_named_hook(
648
'call', self.check_vfs, 'vfs refuser')
650
def check_vfs(self, params):
652
request_method = request.request_handlers.get(params.method)
654
# A method we don't know about doesn't count as a VFS method.
656
if issubclass(request_method, vfs.VfsRequest):
657
raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
660
493
class _DebugCounter(object):
661
494
"""An object that counts the HPSS calls made to each client medium.
709
542
value['count'] = 0
710
543
value['vfs_count'] = 0
712
trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
713
count, vfs_count, medium_repr))
545
trace.note('HPSS calls: %d (%d vfs) %s',
546
count, vfs_count, medium_repr)
715
548
def flush_all(self):
716
549
for ref in list(self.counts.keys()):
719
552
_debug_counter = None
723
555
class SmartClientMedium(SmartMedium):
740
572
if _debug_counter is None:
741
573
_debug_counter = _DebugCounter()
742
574
_debug_counter.track(self)
743
if 'hpss_client_no_vfs' in debug.debug_flags:
745
if _vfs_refuser is None:
746
_vfs_refuser = _VfsRefuser()
748
576
def _is_remote_before(self, version_tuple):
749
577
"""Is it possible the remote side supports RPCs for a given version?