45
from bzrlib.smart import client, protocol, request, vfs
48
from bzrlib.i18n import gettext
49
from bzrlib.smart import client, protocol, request, signals, vfs
46
50
from bzrlib.transport import ssh
48
52
from bzrlib import osutils
175
179
ui.ui_factory.report_transport_activity(self, bytes, direction)
182
_bad_file_descriptor = (errno.EBADF,)
183
if sys.platform == 'win32':
184
# Given on Windows if you pass a closed socket to select.select. Probably
185
# also given if you pass a file handle to select.
187
_bad_file_descriptor += (WSAENOTSOCK,)
178
190
class SmartServerStreamMedium(SmartMedium):
179
191
"""Handles smart commands coming over a stream.
193
205
the stream. See also the _push_back method.
196
def __init__(self, backing_transport, root_client_path='/'):
210
def __init__(self, backing_transport, root_client_path='/', timeout=None):
197
211
"""Construct new server.
199
213
:param backing_transport: Transport for the directory served.
202
216
self.backing_transport = backing_transport
203
217
self.root_client_path = root_client_path
204
218
self.finished = False
220
raise AssertionError('You must supply a timeout.')
221
self._client_timeout = timeout
222
self._client_poll_timeout = min(timeout / 10.0, 1.0)
205
223
SmartMedium.__init__(self)
213
231
while not self.finished:
214
232
server_protocol = self._build_protocol()
215
233
self._serve_one_request(server_protocol)
234
except errors.ConnectionTimeout, e:
235
trace.note('%s' % (e,))
236
trace.log_exception_quietly()
237
self._disconnect_client()
238
# We reported it, no reason to make a big fuss.
216
240
except Exception, e:
217
241
stderr.write("%s terminating on exception %s\n" % (self, e))
243
self._disconnect_client()
245
def _stop_gracefully(self):
246
"""When we finish this message, stop looking for more."""
247
trace.mutter('Stopping %s' % (self,))
250
def _disconnect_client(self):
251
"""Close the current connection. We stopped due to a timeout/etc."""
252
# The default implementation is a no-op, because that is all we used to
253
# do when disconnecting from a client. I suppose we never had the
254
# *server* initiate a disconnect, before
256
def _wait_for_bytes_with_timeout(self, timeout_seconds):
257
"""Wait for more bytes to be read, but timeout if none available.
259
This allows us to detect idle connections, and stop trying to read from
260
them, without setting the socket itself to non-blocking. This also
261
allows us to specify when we watch for idle timeouts.
263
:return: Did we timeout? (True if we timed out, False if there is data
266
raise NotImplementedError(self._wait_for_bytes_with_timeout)
220
268
def _build_protocol(self):
221
269
"""Identifies the version of the incoming request, and returns an
227
275
:returns: a SmartServerRequestProtocol.
277
self._wait_for_bytes_with_timeout(self._client_timeout)
279
# We're stopping, so don't try to do any more work
229
281
bytes = self._get_line()
230
282
protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
231
283
protocol = protocol_factory(
233
285
protocol.accept_bytes(unused_bytes)
288
def _wait_on_descriptor(self, fd, timeout_seconds):
289
"""select() on a file descriptor, waiting for nonblocking read()
291
This will raise a ConnectionTimeout exception if we do not get a
292
readable handle before timeout_seconds.
295
t_end = self._timer() + timeout_seconds
296
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
298
while not rs and not xs and self._timer() < t_end:
302
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
303
except (select.error, socket.error) as e:
304
err = getattr(e, 'errno', None)
305
if err is None and getattr(e, 'args', None) is not None:
306
# select.error doesn't have 'errno', it just has args[0]
308
if err in _bad_file_descriptor:
309
return # Not a socket indicates read() will fail
310
elif err == errno.EINTR:
311
# Interrupted, keep looping.
316
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
317
% (timeout_seconds,))
236
319
def _serve_one_request(self, protocol):
237
320
"""Read one request from input, process, send back a response.
239
322
:param protocol: a SmartServerRequestProtocol.
242
327
self._serve_one_request_unguarded(protocol)
243
328
except KeyboardInterrupt:
260
345
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
262
def __init__(self, sock, backing_transport, root_client_path='/'):
347
def __init__(self, sock, backing_transport, root_client_path='/',
265
351
:param sock: the socket the server will read from. It will be put
266
352
into blocking mode.
268
354
SmartServerStreamMedium.__init__(
269
self, backing_transport, root_client_path=root_client_path)
355
self, backing_transport, root_client_path=root_client_path,
270
357
sock.setblocking(True)
271
358
self.socket = sock
359
# Get the getpeername now, as we might be closed later when we care.
361
self._client_info = sock.getpeername()
363
self._client_info = '<unknown>'
366
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
369
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
273
372
def _serve_one_request_unguarded(self, protocol):
274
373
while protocol.next_read_size():
284
383
self._push_back(protocol.unused_data)
385
def _disconnect_client(self):
386
"""Close the current connection. We stopped due to a timeout/etc."""
389
def _wait_for_bytes_with_timeout(self, timeout_seconds):
390
"""Wait for more bytes to be read, but timeout if none available.
392
This allows us to detect idle connections, and stop trying to read from
393
them, without setting the socket itself to non-blocking. This also
394
allows us to specify when we watch for idle timeouts.
396
:return: None, this will raise ConnectionTimeout if we time out before
399
return self._wait_on_descriptor(self.socket, timeout_seconds)
286
401
def _read_bytes(self, desired_count):
287
402
return osutils.read_bytes_from_socket(
288
403
self.socket, self._report_activity)
306
421
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
308
def __init__(self, in_file, out_file, backing_transport):
423
def __init__(self, in_file, out_file, backing_transport, timeout=None):
309
424
"""Construct new server.
311
426
:param in_file: Python file from which requests can be read.
312
427
:param out_file: Python file to write responses.
313
428
:param backing_transport: Transport for the directory served.
315
SmartServerStreamMedium.__init__(self, backing_transport)
430
SmartServerStreamMedium.__init__(self, backing_transport,
316
432
if sys.platform == 'win32':
317
433
# force binary mode for files
323
439
self._in = in_file
324
440
self._out = out_file
443
"""See SmartServerStreamMedium.serve"""
444
# This is the regular serve, except it adds signal trapping for soft
446
stop_gracefully = self._stop_gracefully
447
signals.register_on_hangup(id(self), stop_gracefully)
449
return super(SmartServerPipeStreamMedium, self).serve()
451
signals.unregister_on_hangup(id(self))
326
453
def _serve_one_request_unguarded(self, protocol):
328
455
# We need to be careful not to read past the end of the current
342
469
protocol.accept_bytes(bytes)
471
def _disconnect_client(self):
476
def _wait_for_bytes_with_timeout(self, timeout_seconds):
477
"""Wait for more bytes to be read, but timeout if none available.
479
This allows us to detect idle connections, and stop trying to read from
480
them, without setting the socket itself to non-blocking. This also
481
allows us to specify when we watch for idle timeouts.
483
:return: None, this will raise ConnectionTimeout if we time out before
486
if (getattr(self._in, 'fileno', None) is None
487
or sys.platform == 'win32'):
488
# You can't select() file descriptors on Windows.
490
return self._wait_on_descriptor(self._in, timeout_seconds)
344
492
def _read_bytes(self, desired_count):
345
493
return self._in.read(desired_count)
561
709
value['count'] = 0
562
710
value['vfs_count'] = 0
564
trace.note('HPSS calls: %d (%d vfs) %s',
565
count, vfs_count, medium_repr)
712
trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
713
count, vfs_count, medium_repr))
567
715
def flush_all(self):
568
716
for ref in list(self.counts.keys()):