1
# Copyright (C) 2006-2011 Canonical Ltd
1
# Copyright (C) 2006-2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
41
38
from bzrlib import (
48
from bzrlib.i18n import gettext
49
from bzrlib.smart import client, protocol, request, signals, vfs
46
from bzrlib.smart import client, protocol, request, vfs
50
47
from bzrlib.transport import ssh
52
49
from bzrlib import osutils
179
176
ui.ui_factory.report_transport_activity(self, bytes, direction)
182
_bad_file_descriptor = (errno.EBADF,)
183
if sys.platform == 'win32':
184
# Given on Windows if you pass a closed socket to select.select. Probably
185
# also given if you pass a file handle to select.
187
_bad_file_descriptor += (WSAENOTSOCK,)
190
179
class SmartServerStreamMedium(SmartMedium):
191
180
"""Handles smart commands coming over a stream.
205
194
the stream. See also the _push_back method.
210
def __init__(self, backing_transport, root_client_path='/', timeout=None):
197
def __init__(self, backing_transport, root_client_path='/'):
211
198
"""Construct new server.
213
200
:param backing_transport: Transport for the directory served.
216
203
self.backing_transport = backing_transport
217
204
self.root_client_path = root_client_path
218
205
self.finished = False
220
raise AssertionError('You must supply a timeout.')
221
self._client_timeout = timeout
222
self._client_poll_timeout = min(timeout / 10.0, 1.0)
223
206
SmartMedium.__init__(self)
231
214
while not self.finished:
232
215
server_protocol = self._build_protocol()
233
216
self._serve_one_request(server_protocol)
234
except errors.ConnectionTimeout, e:
235
trace.note('%s' % (e,))
236
trace.log_exception_quietly()
237
self._disconnect_client()
238
# We reported it, no reason to make a big fuss.
240
217
except Exception, e:
241
218
stderr.write("%s terminating on exception %s\n" % (self, e))
243
self._disconnect_client()
245
def _stop_gracefully(self):
246
"""When we finish this message, stop looking for more."""
247
trace.mutter('Stopping %s' % (self,))
250
def _disconnect_client(self):
251
"""Close the current connection. We stopped due to a timeout/etc."""
252
# The default implementation is a no-op, because that is all we used to
253
# do when disconnecting from a client. I suppose we never had the
254
# *server* initiate a disconnect, before
256
def _wait_for_bytes_with_timeout(self, timeout_seconds):
257
"""Wait for more bytes to be read, but timeout if none available.
259
This allows us to detect idle connections, and stop trying to read from
260
them, without setting the socket itself to non-blocking. This also
261
allows us to specify when we watch for idle timeouts.
263
:return: Did we timeout? (True if we timed out, False if there is data
266
raise NotImplementedError(self._wait_for_bytes_with_timeout)
268
221
def _build_protocol(self):
269
222
"""Identifies the version of the incoming request, and returns an
275
228
:returns: a SmartServerRequestProtocol.
277
self._wait_for_bytes_with_timeout(self._client_timeout)
279
# We're stopping, so don't try to do any more work
281
230
bytes = self._get_line()
282
231
protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
283
232
protocol = protocol_factory(
285
234
protocol.accept_bytes(unused_bytes)
288
def _wait_on_descriptor(self, fd, timeout_seconds):
289
"""select() on a file descriptor, waiting for nonblocking read()
291
This will raise a ConnectionTimeout exception if we do not get a
292
readable handle before timeout_seconds.
295
t_end = self._timer() + timeout_seconds
296
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
298
while not rs and not xs and self._timer() < t_end:
302
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
303
except (select.error, socket.error) as e:
304
err = getattr(e, 'errno', None)
305
if err is None and getattr(e, 'args', None) is not None:
306
# select.error doesn't have 'errno', it just has args[0]
308
if err in _bad_file_descriptor:
309
return # Not a socket indicates read() will fail
310
elif err == errno.EINTR:
311
# Interrupted, keep looping.
316
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
317
% (timeout_seconds,))
319
237
def _serve_one_request(self, protocol):
320
238
"""Read one request from input, process, send back a response.
322
240
:param protocol: a SmartServerRequestProtocol.
327
243
self._serve_one_request_unguarded(protocol)
328
244
except KeyboardInterrupt:
345
261
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
347
def __init__(self, sock, backing_transport, root_client_path='/',
263
def __init__(self, sock, backing_transport, root_client_path='/'):
351
266
:param sock: the socket the server will read from. It will be put
352
267
into blocking mode.
354
269
SmartServerStreamMedium.__init__(
355
self, backing_transport, root_client_path=root_client_path,
270
self, backing_transport, root_client_path=root_client_path)
357
271
sock.setblocking(True)
358
272
self.socket = sock
359
# Get the getpeername now, as we might be closed later when we care.
361
self._client_info = sock.getpeername()
363
self._client_info = '<unknown>'
366
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
369
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
372
274
def _serve_one_request_unguarded(self, protocol):
373
275
while protocol.next_read_size():
383
285
self._push_back(protocol.unused_data)
385
def _disconnect_client(self):
386
"""Close the current connection. We stopped due to a timeout/etc."""
389
def _wait_for_bytes_with_timeout(self, timeout_seconds):
390
"""Wait for more bytes to be read, but timeout if none available.
392
This allows us to detect idle connections, and stop trying to read from
393
them, without setting the socket itself to non-blocking. This also
394
allows us to specify when we watch for idle timeouts.
396
:return: None, this will raise ConnectionTimeout if we time out before
399
return self._wait_on_descriptor(self.socket, timeout_seconds)
401
287
def _read_bytes(self, desired_count):
402
288
return osutils.read_bytes_from_socket(
403
289
self.socket, self._report_activity)
421
307
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
423
def __init__(self, in_file, out_file, backing_transport, timeout=None):
309
def __init__(self, in_file, out_file, backing_transport):
424
310
"""Construct new server.
426
312
:param in_file: Python file from which requests can be read.
427
313
:param out_file: Python file to write responses.
428
314
:param backing_transport: Transport for the directory served.
430
SmartServerStreamMedium.__init__(self, backing_transport,
316
SmartServerStreamMedium.__init__(self, backing_transport)
432
317
if sys.platform == 'win32':
433
318
# force binary mode for files
439
324
self._in = in_file
440
325
self._out = out_file
443
"""See SmartServerStreamMedium.serve"""
444
# This is the regular serve, except it adds signal trapping for soft
446
stop_gracefully = self._stop_gracefully
447
signals.register_on_hangup(id(self), stop_gracefully)
449
return super(SmartServerPipeStreamMedium, self).serve()
451
signals.unregister_on_hangup(id(self))
453
327
def _serve_one_request_unguarded(self, protocol):
455
329
# We need to be careful not to read past the end of the current
469
343
protocol.accept_bytes(bytes)
471
def _disconnect_client(self):
476
def _wait_for_bytes_with_timeout(self, timeout_seconds):
477
"""Wait for more bytes to be read, but timeout if none available.
479
This allows us to detect idle connections, and stop trying to read from
480
them, without setting the socket itself to non-blocking. This also
481
allows us to specify when we watch for idle timeouts.
483
:return: None, this will raise ConnectionTimeout if we time out before
486
if (getattr(self._in, 'fileno', None) is None
487
or sys.platform == 'win32'):
488
# You can't select() file descriptors on Windows.
490
return self._wait_on_descriptor(self._in, timeout_seconds)
492
345
def _read_bytes(self, desired_count):
493
346
return self._in.read(desired_count)
638
491
return self._medium._get_line()
641
class _VfsRefuser(object):
642
"""An object that refuses all VFS requests.
647
client._SmartClient.hooks.install_named_hook(
648
'call', self.check_vfs, 'vfs refuser')
650
def check_vfs(self, params):
652
request_method = request.request_handlers.get(params.method)
654
# A method we don't know about doesn't count as a VFS method.
656
if issubclass(request_method, vfs.VfsRequest):
657
raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
660
494
class _DebugCounter(object):
661
495
"""An object that counts the HPSS calls made to each client medium.
709
543
value['count'] = 0
710
544
value['vfs_count'] = 0
712
trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
713
count, vfs_count, medium_repr))
546
trace.note('HPSS calls: %d (%d vfs) %s',
547
count, vfs_count, medium_repr)
715
549
def flush_all(self):
716
550
for ref in list(self.counts.keys()):
719
553
_debug_counter = None
723
556
class SmartClientMedium(SmartMedium):
740
573
if _debug_counter is None:
741
574
_debug_counter = _DebugCounter()
742
575
_debug_counter.track(self)
743
if 'hpss_client_no_vfs' in debug.debug_flags:
745
if _vfs_refuser is None:
746
_vfs_refuser = _VfsRefuser()
748
577
def _is_remote_before(self, version_tuple):
749
578
"""Is it possible the remote side supports RPCs for a given version?