1
# Copyright (C) 2006-2011 Canonical Ltd
1
# Copyright (C) 2006-2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
41
38
from bzrlib import (
48
from bzrlib.i18n import gettext
49
from bzrlib.smart import client, protocol, request, signals, vfs
46
from bzrlib.smart import client, protocol, request, vfs
50
47
from bzrlib.transport import ssh
52
49
from bzrlib import osutils
179
176
ui.ui_factory.report_transport_activity(self, bytes, direction)
182
_bad_file_descriptor = (errno.EBADF,)
183
if sys.platform == 'win32':
184
# Given on Windows if you pass a closed socket to select.select. Probably
185
# also given if you pass a file handle to select.
187
_bad_file_descriptor += (WSAENOTSOCK,)
190
179
class SmartServerStreamMedium(SmartMedium):
191
180
"""Handles smart commands coming over a stream.
205
194
the stream. See also the _push_back method.
210
def __init__(self, backing_transport, root_client_path='/', timeout=None):
197
def __init__(self, backing_transport, root_client_path='/'):
211
198
"""Construct new server.
213
200
:param backing_transport: Transport for the directory served.
216
203
self.backing_transport = backing_transport
217
204
self.root_client_path = root_client_path
218
205
self.finished = False
220
raise AssertionError('You must supply a timeout.')
221
self._client_timeout = timeout
222
self._client_poll_timeout = min(timeout / 10.0, 1.0)
223
206
SmartMedium.__init__(self)
231
214
while not self.finished:
232
215
server_protocol = self._build_protocol()
233
# TODO: This seems inelegant:
234
if server_protocol is None:
235
# We could 'continue' only to notice that self.finished is
238
216
self._serve_one_request(server_protocol)
239
except errors.ConnectionTimeout, e:
240
trace.note('%s' % (e,))
241
trace.log_exception_quietly()
242
self._disconnect_client()
243
# We reported it, no reason to make a big fuss.
245
217
except Exception, e:
246
218
stderr.write("%s terminating on exception %s\n" % (self, e))
248
self._disconnect_client()
250
def _stop_gracefully(self):
251
"""When we finish this message, stop looking for more."""
252
trace.mutter('Stopping %s' % (self,))
255
def _disconnect_client(self):
256
"""Close the current connection. We stopped due to a timeout/etc."""
257
# The default implementation is a no-op, because that is all we used to
258
# do when disconnecting from a client. I suppose we never had the
259
# *server* initiate a disconnect, before
261
def _wait_for_bytes_with_timeout(self, timeout_seconds):
262
"""Wait for more bytes to be read, but timeout if none available.
264
This allows us to detect idle connections, and stop trying to read from
265
them, without setting the socket itself to non-blocking. This also
266
allows us to specify when we watch for idle timeouts.
268
:return: Did we timeout? (True if we timed out, False if there is data
271
raise NotImplementedError(self._wait_for_bytes_with_timeout)
273
221
def _build_protocol(self):
274
222
"""Identifies the version of the incoming request, and returns an
280
228
:returns: a SmartServerRequestProtocol.
282
self._wait_for_bytes_with_timeout(self._client_timeout)
284
# We're stopping, so don't try to do any more work
286
230
bytes = self._get_line()
287
231
protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
288
232
protocol = protocol_factory(
290
234
protocol.accept_bytes(unused_bytes)
293
def _wait_on_descriptor(self, fd, timeout_seconds):
294
"""select() on a file descriptor, waiting for nonblocking read()
296
This will raise a ConnectionTimeout exception if we do not get a
297
readable handle before timeout_seconds.
300
t_end = self._timer() + timeout_seconds
301
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
303
while not rs and not xs and self._timer() < t_end:
307
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
308
except (select.error, socket.error) as e:
309
err = getattr(e, 'errno', None)
310
if err is None and getattr(e, 'args', None) is not None:
311
# select.error doesn't have 'errno', it just has args[0]
313
if err in _bad_file_descriptor:
314
return # Not a socket indicates read() will fail
315
elif err == errno.EINTR:
316
# Interrupted, keep looping.
321
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
322
% (timeout_seconds,))
324
237
def _serve_one_request(self, protocol):
325
238
"""Read one request from input, process, send back a response.
348
261
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
350
def __init__(self, sock, backing_transport, root_client_path='/',
263
def __init__(self, sock, backing_transport, root_client_path='/'):
354
266
:param sock: the socket the server will read from. It will be put
355
267
into blocking mode.
357
269
SmartServerStreamMedium.__init__(
358
self, backing_transport, root_client_path=root_client_path,
270
self, backing_transport, root_client_path=root_client_path)
360
271
sock.setblocking(True)
361
272
self.socket = sock
362
# Get the getpeername now, as we might be closed later when we care.
364
self._client_info = sock.getpeername()
366
self._client_info = '<unknown>'
369
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
372
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
375
274
def _serve_one_request_unguarded(self, protocol):
376
275
while protocol.next_read_size():
386
285
self._push_back(protocol.unused_data)
388
def _disconnect_client(self):
389
"""Close the current connection. We stopped due to a timeout/etc."""
392
def _wait_for_bytes_with_timeout(self, timeout_seconds):
393
"""Wait for more bytes to be read, but timeout if none available.
395
This allows us to detect idle connections, and stop trying to read from
396
them, without setting the socket itself to non-blocking. This also
397
allows us to specify when we watch for idle timeouts.
399
:return: None, this will raise ConnectionTimeout if we time out before
402
return self._wait_on_descriptor(self.socket, timeout_seconds)
404
287
def _read_bytes(self, desired_count):
405
288
return osutils.read_bytes_from_socket(
406
289
self.socket, self._report_activity)
424
307
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
426
def __init__(self, in_file, out_file, backing_transport, timeout=None):
309
def __init__(self, in_file, out_file, backing_transport):
427
310
"""Construct new server.
429
312
:param in_file: Python file from which requests can be read.
430
313
:param out_file: Python file to write responses.
431
314
:param backing_transport: Transport for the directory served.
433
SmartServerStreamMedium.__init__(self, backing_transport,
316
SmartServerStreamMedium.__init__(self, backing_transport)
435
317
if sys.platform == 'win32':
436
318
# force binary mode for files
442
324
self._in = in_file
443
325
self._out = out_file
446
"""See SmartServerStreamMedium.serve"""
447
# This is the regular serve, except it adds signal trapping for soft
449
stop_gracefully = self._stop_gracefully
450
signals.register_on_hangup(id(self), stop_gracefully)
452
return super(SmartServerPipeStreamMedium, self).serve()
454
signals.unregister_on_hangup(id(self))
456
327
def _serve_one_request_unguarded(self, protocol):
458
329
# We need to be careful not to read past the end of the current
472
343
protocol.accept_bytes(bytes)
474
def _disconnect_client(self):
479
def _wait_for_bytes_with_timeout(self, timeout_seconds):
480
"""Wait for more bytes to be read, but timeout if none available.
482
This allows us to detect idle connections, and stop trying to read from
483
them, without setting the socket itself to non-blocking. This also
484
allows us to specify when we watch for idle timeouts.
486
:return: None, this will raise ConnectionTimeout if we time out before
489
if (getattr(self._in, 'fileno', None) is None
490
or sys.platform == 'win32'):
491
# You can't select() file descriptors on Windows.
493
return self._wait_on_descriptor(self._in, timeout_seconds)
495
345
def _read_bytes(self, desired_count):
496
346
return self._in.read(desired_count)
641
491
return self._medium._get_line()
644
class _VfsRefuser(object):
645
"""An object that refuses all VFS requests.
650
client._SmartClient.hooks.install_named_hook(
651
'call', self.check_vfs, 'vfs refuser')
653
def check_vfs(self, params):
655
request_method = request.request_handlers.get(params.method)
657
# A method we don't know about doesn't count as a VFS method.
659
if issubclass(request_method, vfs.VfsRequest):
660
raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
663
494
class _DebugCounter(object):
664
495
"""An object that counts the HPSS calls made to each client medium.
712
543
value['count'] = 0
713
544
value['vfs_count'] = 0
715
trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
716
count, vfs_count, medium_repr))
546
trace.note('HPSS calls: %d (%d vfs) %s',
547
count, vfs_count, medium_repr)
718
549
def flush_all(self):
719
550
for ref in list(self.counts.keys()):
722
553
_debug_counter = None
726
556
class SmartClientMedium(SmartMedium):
743
573
if _debug_counter is None:
744
574
_debug_counter = _DebugCounter()
745
575
_debug_counter.track(self)
746
if 'hpss_client_no_vfs' in debug.debug_flags:
748
if _vfs_refuser is None:
749
_vfs_refuser = _VfsRefuser()
751
577
def _is_remote_before(self, version_tuple):
752
578
"""Is it possible the remote side supports RPCs for a given version?