1
# Copyright (C) 2006-2010 Canonical Ltd
1
# Copyright (C) 2006-2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
38
41
from bzrlib import (
46
from bzrlib.smart import client, protocol, request, vfs
48
from bzrlib.i18n import gettext
49
from bzrlib.smart import client, protocol, request, signals, vfs
47
50
from bzrlib.transport import ssh
49
52
from bzrlib import osutils
176
179
ui.ui_factory.report_transport_activity(self, bytes, direction)
182
_bad_file_descriptor = (errno.EBADF,)
183
if sys.platform == 'win32':
184
# Given on Windows if you pass a closed socket to select.select. Probably
185
# also given if you pass a file handle to select.
187
_bad_file_descriptor += (WSAENOTSOCK,)
179
190
class SmartServerStreamMedium(SmartMedium):
180
191
"""Handles smart commands coming over a stream.
194
205
the stream. See also the _push_back method.
197
def __init__(self, backing_transport, root_client_path='/'):
210
def __init__(self, backing_transport, root_client_path='/', timeout=None):
198
211
"""Construct new server.
200
213
:param backing_transport: Transport for the directory served.
203
216
self.backing_transport = backing_transport
204
217
self.root_client_path = root_client_path
205
218
self.finished = False
220
raise AssertionError('You must supply a timeout.')
221
self._client_timeout = timeout
222
self._client_poll_timeout = min(timeout / 10.0, 1.0)
206
223
SmartMedium.__init__(self)
214
231
while not self.finished:
215
232
server_protocol = self._build_protocol()
216
233
self._serve_one_request(server_protocol)
234
except errors.ConnectionTimeout, e:
235
trace.note('%s' % (e,))
236
trace.log_exception_quietly()
237
self._disconnect_client()
238
# We reported it, no reason to make a big fuss.
217
240
except Exception, e:
218
241
stderr.write("%s terminating on exception %s\n" % (self, e))
243
self._disconnect_client()
245
def _stop_gracefully(self):
246
"""When we finish this message, stop looking for more."""
247
trace.mutter('Stopping %s' % (self,))
250
def _disconnect_client(self):
251
"""Close the current connection. We stopped due to a timeout/etc."""
252
# The default implementation is a no-op, because that is all we used to
253
# do when disconnecting from a client. I suppose we never had the
254
# *server* initiate a disconnect, before
256
def _wait_for_bytes_with_timeout(self, timeout_seconds):
257
"""Wait for more bytes to be read, but timeout if none available.
259
This allows us to detect idle connections, and stop trying to read from
260
them, without setting the socket itself to non-blocking. This also
261
allows us to specify when we watch for idle timeouts.
263
:return: Did we timeout? (True if we timed out, False if there is data
266
raise NotImplementedError(self._wait_for_bytes_with_timeout)
221
268
def _build_protocol(self):
222
269
"""Identifies the version of the incoming request, and returns an
228
275
:returns: a SmartServerRequestProtocol.
277
self._wait_for_bytes_with_timeout(self._client_timeout)
279
# We're stopping, so don't try to do any more work
230
281
bytes = self._get_line()
231
282
protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
232
283
protocol = protocol_factory(
234
285
protocol.accept_bytes(unused_bytes)
288
def _wait_on_descriptor(self, fd, timeout_seconds):
289
"""select() on a file descriptor, waiting for nonblocking read()
291
This will raise a ConnectionTimeout exception if we do not get a
292
readable handle before timeout_seconds.
295
t_end = self._timer() + timeout_seconds
296
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
298
while not rs and not xs and self._timer() < t_end:
302
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
303
except (select.error, socket.error) as e:
304
err = getattr(e, 'errno', None)
305
if err is None and getattr(e, 'args', None) is not None:
306
# select.error doesn't have 'errno', it just has args[0]
308
if err in _bad_file_descriptor:
309
return # Not a socket indicates read() will fail
310
elif err == errno.EINTR:
311
# Interrupted, keep looping.
316
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
317
% (timeout_seconds,))
237
319
def _serve_one_request(self, protocol):
238
320
"""Read one request from input, process, send back a response.
240
322
:param protocol: a SmartServerRequestProtocol.
243
327
self._serve_one_request_unguarded(protocol)
244
328
except KeyboardInterrupt:
261
345
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
263
def __init__(self, sock, backing_transport, root_client_path='/'):
347
def __init__(self, sock, backing_transport, root_client_path='/',
266
351
:param sock: the socket the server will read from. It will be put
267
352
into blocking mode.
269
354
SmartServerStreamMedium.__init__(
270
self, backing_transport, root_client_path=root_client_path)
355
self, backing_transport, root_client_path=root_client_path,
271
357
sock.setblocking(True)
272
358
self.socket = sock
359
# Get the getpeername now, as we might be closed later when we care.
361
self._client_info = sock.getpeername()
363
self._client_info = '<unknown>'
366
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
369
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
274
372
def _serve_one_request_unguarded(self, protocol):
275
373
while protocol.next_read_size():
285
383
self._push_back(protocol.unused_data)
385
def _disconnect_client(self):
386
"""Close the current connection. We stopped due to a timeout/etc."""
389
def _wait_for_bytes_with_timeout(self, timeout_seconds):
390
"""Wait for more bytes to be read, but timeout if none available.
392
This allows us to detect idle connections, and stop trying to read from
393
them, without setting the socket itself to non-blocking. This also
394
allows us to specify when we watch for idle timeouts.
396
:return: None, this will raise ConnectionTimeout if we time out before
399
return self._wait_on_descriptor(self.socket, timeout_seconds)
287
401
def _read_bytes(self, desired_count):
288
402
return osutils.read_bytes_from_socket(
289
403
self.socket, self._report_activity)
307
421
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
309
def __init__(self, in_file, out_file, backing_transport):
423
def __init__(self, in_file, out_file, backing_transport, timeout=None):
310
424
"""Construct new server.
312
426
:param in_file: Python file from which requests can be read.
313
427
:param out_file: Python file to write responses.
314
428
:param backing_transport: Transport for the directory served.
316
SmartServerStreamMedium.__init__(self, backing_transport)
430
SmartServerStreamMedium.__init__(self, backing_transport,
317
432
if sys.platform == 'win32':
318
433
# force binary mode for files
324
439
self._in = in_file
325
440
self._out = out_file
443
"""See SmartServerStreamMedium.serve"""
444
# This is the regular serve, except it adds signal trapping for soft
446
stop_gracefully = self._stop_gracefully
447
signals.register_on_hangup(id(self), stop_gracefully)
449
return super(SmartServerPipeStreamMedium, self).serve()
451
signals.unregister_on_hangup(id(self))
327
453
def _serve_one_request_unguarded(self, protocol):
329
455
# We need to be careful not to read past the end of the current
343
469
protocol.accept_bytes(bytes)
471
def _disconnect_client(self):
476
def _wait_for_bytes_with_timeout(self, timeout_seconds):
477
"""Wait for more bytes to be read, but timeout if none available.
479
This allows us to detect idle connections, and stop trying to read from
480
them, without setting the socket itself to non-blocking. This also
481
allows us to specify when we watch for idle timeouts.
483
:return: None, this will raise ConnectionTimeout if we time out before
486
if (getattr(self._in, 'fileno', None) is None
487
or sys.platform == 'win32'):
488
# You can't select() file descriptors on Windows.
490
return self._wait_on_descriptor(self._in, timeout_seconds)
345
492
def _read_bytes(self, desired_count):
346
493
return self._in.read(desired_count)
491
638
return self._medium._get_line()
641
class _VfsRefuser(object):
642
"""An object that refuses all VFS requests.
647
client._SmartClient.hooks.install_named_hook(
648
'call', self.check_vfs, 'vfs refuser')
650
def check_vfs(self, params):
652
request_method = request.request_handlers.get(params.method)
654
# A method we don't know about doesn't count as a VFS method.
656
if issubclass(request_method, vfs.VfsRequest):
657
raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
494
660
class _DebugCounter(object):
495
661
"""An object that counts the HPSS calls made to each client medium.
543
709
value['count'] = 0
544
710
value['vfs_count'] = 0
546
trace.note('HPSS calls: %d (%d vfs) %s',
547
count, vfs_count, medium_repr)
712
trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
713
count, vfs_count, medium_repr))
549
715
def flush_all(self):
550
716
for ref in list(self.counts.keys()):
553
719
_debug_counter = None
556
723
class SmartClientMedium(SmartMedium):
573
740
if _debug_counter is None:
574
741
_debug_counter = _DebugCounter()
575
742
_debug_counter.track(self)
743
if 'hpss_client_no_vfs' in debug.debug_flags:
745
if _vfs_refuser is None:
746
_vfs_refuser = _VfsRefuser()
577
748
def _is_remote_before(self, version_tuple):
578
749
"""Is it possible the remote side supports RPCs for a given version?
711
882
return SmartClientStreamMediumRequest(self)
885
"""We have been disconnected, reset current state.
887
This resets things like _current_request and connected state.
890
self._current_request = None
714
893
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
715
894
"""A client medium using simple pipes.
725
904
def _accept_bytes(self, bytes):
726
905
"""See SmartClientStreamMedium.accept_bytes."""
727
self._writeable_pipe.write(bytes)
907
self._writeable_pipe.write(bytes)
909
if e.errno in (errno.EINVAL, errno.EPIPE):
910
raise errors.ConnectionReset(
911
"Error trying to write to subprocess:\n%s" % (e,))
728
913
self._report_activity(len(bytes), 'write')
730
915
def _flush(self):
731
916
"""See SmartClientStreamMedium._flush()."""
917
# Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
918
# However, testing shows that even when the child process is
919
# gone, this doesn't error.
732
920
self._writeable_pipe.flush()
734
922
def _read_bytes(self, count):
754
942
class SmartSSHClientMedium(SmartClientStreamMedium):
755
943
"""A client medium using SSH.
757
It delegates IO to a SmartClientSocketMedium or
945
It delegates IO to a SmartSimplePipesClientMedium or
758
946
SmartClientAlreadyConnectedSocketMedium (depending on platform).
784
972
maybe_port = ':%s' % self._ssh_params.port
785
return "%s(%s://%s@%s%s/)" % (
973
if self._ssh_params.username is None:
976
maybe_user = '%s@' % self._ssh_params.username
977
return "%s(%s://%s%s%s/)" % (
786
978
self.__class__.__name__,
788
self._ssh_params.username,
789
981
self._ssh_params.host,