1
# Copyright (C) 2006-2010 Canonical Ltd
1
# Copyright (C) 2006-2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
bzrlib/transport/smart/__init__.py.
27
from __future__ import absolute_import
32
35
from bzrlib.lazy_import import lazy_import
33
36
lazy_import(globals(), """
38
42
from bzrlib import (
46
from bzrlib.smart import client, protocol, request, vfs
50
from bzrlib.i18n import gettext
51
from bzrlib.smart import client, protocol, request, signals, vfs
47
52
from bzrlib.transport import ssh
49
54
from bzrlib import osutils
176
181
ui.ui_factory.report_transport_activity(self, bytes, direction)
184
_bad_file_descriptor = (errno.EBADF,)
185
if sys.platform == 'win32':
186
# Given on Windows if you pass a closed socket to select.select. Probably
187
# also given if you pass a file handle to select.
189
_bad_file_descriptor += (WSAENOTSOCK,)
179
192
class SmartServerStreamMedium(SmartMedium):
180
193
"""Handles smart commands coming over a stream.
194
207
the stream. See also the _push_back method.
197
def __init__(self, backing_transport, root_client_path='/'):
212
def __init__(self, backing_transport, root_client_path='/', timeout=None):
198
213
"""Construct new server.
200
215
:param backing_transport: Transport for the directory served.
203
218
self.backing_transport = backing_transport
204
219
self.root_client_path = root_client_path
205
220
self.finished = False
222
raise AssertionError('You must supply a timeout.')
223
self._client_timeout = timeout
224
self._client_poll_timeout = min(timeout / 10.0, 1.0)
206
225
SmartMedium.__init__(self)
214
233
while not self.finished:
215
234
server_protocol = self._build_protocol()
216
235
self._serve_one_request(server_protocol)
236
except errors.ConnectionTimeout, e:
237
trace.note('%s' % (e,))
238
trace.log_exception_quietly()
239
self._disconnect_client()
240
# We reported it, no reason to make a big fuss.
217
242
except Exception, e:
218
243
stderr.write("%s terminating on exception %s\n" % (self, e))
245
self._disconnect_client()
247
def _stop_gracefully(self):
248
"""When we finish this message, stop looking for more."""
249
trace.mutter('Stopping %s' % (self,))
252
def _disconnect_client(self):
253
"""Close the current connection. We stopped due to a timeout/etc."""
254
# The default implementation is a no-op, because that is all we used to
255
# do when disconnecting from a client. I suppose we never had the
256
# *server* initiate a disconnect, before
258
def _wait_for_bytes_with_timeout(self, timeout_seconds):
259
"""Wait for more bytes to be read, but timeout if none available.
261
This allows us to detect idle connections, and stop trying to read from
262
them, without setting the socket itself to non-blocking. This also
263
allows us to specify when we watch for idle timeouts.
265
:return: Did we timeout? (True if we timed out, False if there is data
268
raise NotImplementedError(self._wait_for_bytes_with_timeout)
221
270
def _build_protocol(self):
222
271
"""Identifies the version of the incoming request, and returns an
228
277
:returns: a SmartServerRequestProtocol.
279
self._wait_for_bytes_with_timeout(self._client_timeout)
281
# We're stopping, so don't try to do any more work
230
283
bytes = self._get_line()
231
284
protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
232
285
protocol = protocol_factory(
234
287
protocol.accept_bytes(unused_bytes)
290
def _wait_on_descriptor(self, fd, timeout_seconds):
291
"""select() on a file descriptor, waiting for nonblocking read()
293
This will raise a ConnectionTimeout exception if we do not get a
294
readable handle before timeout_seconds.
297
t_end = self._timer() + timeout_seconds
298
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
300
while not rs and not xs and self._timer() < t_end:
304
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
305
except (select.error, socket.error) as e:
306
err = getattr(e, 'errno', None)
307
if err is None and getattr(e, 'args', None) is not None:
308
# select.error doesn't have 'errno', it just has args[0]
310
if err in _bad_file_descriptor:
311
return # Not a socket indicates read() will fail
312
elif err == errno.EINTR:
313
# Interrupted, keep looping.
318
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
319
% (timeout_seconds,))
237
321
def _serve_one_request(self, protocol):
238
322
"""Read one request from input, process, send back a response.
240
324
:param protocol: a SmartServerRequestProtocol.
243
329
self._serve_one_request_unguarded(protocol)
244
330
except KeyboardInterrupt:
261
347
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
263
def __init__(self, sock, backing_transport, root_client_path='/'):
349
def __init__(self, sock, backing_transport, root_client_path='/',
266
353
:param sock: the socket the server will read from. It will be put
267
354
into blocking mode.
269
356
SmartServerStreamMedium.__init__(
270
self, backing_transport, root_client_path=root_client_path)
357
self, backing_transport, root_client_path=root_client_path,
271
359
sock.setblocking(True)
272
360
self.socket = sock
361
# Get the getpeername now, as we might be closed later when we care.
363
self._client_info = sock.getpeername()
365
self._client_info = '<unknown>'
368
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
371
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
274
374
def _serve_one_request_unguarded(self, protocol):
275
375
while protocol.next_read_size():
285
385
self._push_back(protocol.unused_data)
387
def _disconnect_client(self):
388
"""Close the current connection. We stopped due to a timeout/etc."""
391
def _wait_for_bytes_with_timeout(self, timeout_seconds):
392
"""Wait for more bytes to be read, but timeout if none available.
394
This allows us to detect idle connections, and stop trying to read from
395
them, without setting the socket itself to non-blocking. This also
396
allows us to specify when we watch for idle timeouts.
398
:return: None, this will raise ConnectionTimeout if we time out before
401
return self._wait_on_descriptor(self.socket, timeout_seconds)
287
403
def _read_bytes(self, desired_count):
288
404
return osutils.read_bytes_from_socket(
289
405
self.socket, self._report_activity)
307
423
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
309
def __init__(self, in_file, out_file, backing_transport):
425
def __init__(self, in_file, out_file, backing_transport, timeout=None):
310
426
"""Construct new server.
312
428
:param in_file: Python file from which requests can be read.
313
429
:param out_file: Python file to write responses.
314
430
:param backing_transport: Transport for the directory served.
316
SmartServerStreamMedium.__init__(self, backing_transport)
432
SmartServerStreamMedium.__init__(self, backing_transport,
317
434
if sys.platform == 'win32':
318
435
# force binary mode for files
324
441
self._in = in_file
325
442
self._out = out_file
445
"""See SmartServerStreamMedium.serve"""
446
# This is the regular serve, except it adds signal trapping for soft
448
stop_gracefully = self._stop_gracefully
449
signals.register_on_hangup(id(self), stop_gracefully)
451
return super(SmartServerPipeStreamMedium, self).serve()
453
signals.unregister_on_hangup(id(self))
327
455
def _serve_one_request_unguarded(self, protocol):
329
457
# We need to be careful not to read past the end of the current
343
471
protocol.accept_bytes(bytes)
473
def _disconnect_client(self):
478
def _wait_for_bytes_with_timeout(self, timeout_seconds):
479
"""Wait for more bytes to be read, but timeout if none available.
481
This allows us to detect idle connections, and stop trying to read from
482
them, without setting the socket itself to non-blocking. This also
483
allows us to specify when we watch for idle timeouts.
485
:return: None, this will raise ConnectionTimeout if we time out before
488
if (getattr(self._in, 'fileno', None) is None
489
or sys.platform == 'win32'):
490
# You can't select() file descriptors on Windows.
492
return self._wait_on_descriptor(self._in, timeout_seconds)
345
494
def _read_bytes(self, desired_count):
346
495
return self._in.read(desired_count)
491
640
return self._medium._get_line()
643
class _VfsRefuser(object):
644
"""An object that refuses all VFS requests.
649
client._SmartClient.hooks.install_named_hook(
650
'call', self.check_vfs, 'vfs refuser')
652
def check_vfs(self, params):
654
request_method = request.request_handlers.get(params.method)
656
# A method we don't know about doesn't count as a VFS method.
658
if issubclass(request_method, vfs.VfsRequest):
659
raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
494
662
class _DebugCounter(object):
495
663
"""An object that counts the HPSS calls made to each client medium.
543
711
value['count'] = 0
544
712
value['vfs_count'] = 0
546
trace.note('HPSS calls: %d (%d vfs) %s',
547
count, vfs_count, medium_repr)
714
trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
715
count, vfs_count, medium_repr))
549
717
def flush_all(self):
550
718
for ref in list(self.counts.keys()):
553
721
_debug_counter = None
556
725
class SmartClientMedium(SmartMedium):
573
742
if _debug_counter is None:
574
743
_debug_counter = _DebugCounter()
575
744
_debug_counter.track(self)
745
if 'hpss_client_no_vfs' in debug.debug_flags:
747
if _vfs_refuser is None:
748
_vfs_refuser = _VfsRefuser()
577
750
def _is_remote_before(self, version_tuple):
578
751
"""Is it possible the remote side supports RPCs for a given version?
670
843
medium_base = urlutils.join(self.base, '/')
671
844
rel_url = urlutils.relative_url(medium_base, transport.base)
672
return urllib.unquote(rel_url)
845
return urlutils.unquote(rel_url)
675
848
class SmartClientStreamMedium(SmartClientMedium):
711
884
return SmartClientStreamMediumRequest(self)
887
"""We have been disconnected, reset current state.
889
This resets things like _current_request and connected state.
892
self._current_request = None
714
895
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
715
896
"""A client medium using simple pipes.
725
906
def _accept_bytes(self, bytes):
726
907
"""See SmartClientStreamMedium.accept_bytes."""
727
self._writeable_pipe.write(bytes)
909
self._writeable_pipe.write(bytes)
911
if e.errno in (errno.EINVAL, errno.EPIPE):
912
raise errors.ConnectionReset(
913
"Error trying to write to subprocess", e)
728
915
self._report_activity(len(bytes), 'write')
730
917
def _flush(self):
731
918
"""See SmartClientStreamMedium._flush()."""
919
# Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
920
# However, testing shows that even when the child process is
921
# gone, this doesn't error.
732
922
self._writeable_pipe.flush()
734
924
def _read_bytes(self, count):
754
944
class SmartSSHClientMedium(SmartClientStreamMedium):
755
945
"""A client medium using SSH.
757
It delegates IO to a SmartClientSocketMedium or
947
It delegates IO to a SmartSimplePipesClientMedium or
758
948
SmartClientAlreadyConnectedSocketMedium (depending on platform).
784
974
maybe_port = ':%s' % self._ssh_params.port
785
return "%s(%s://%s@%s%s/)" % (
975
if self._ssh_params.username is None:
978
maybe_user = '%s@' % self._ssh_params.username
979
return "%s(%s://%s%s%s/)" % (
786
980
self.__class__.__name__,
788
self._ssh_params.username,
789
983
self._ssh_params.host,
828
1022
raise AssertionError(
829
1023
"Unexpected io_kind %r from %r"
830
1024
% (io_kind, self._ssh_connection))
1025
for hook in transport.Transport.hooks["post_connect"]:
832
1028
def _flush(self):
833
1029
"""See SmartClientStreamMedium._flush()."""
936
1132
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
937
1133
(self._host, port, err_msg))
938
1134
self._connected = True
1135
for hook in transport.Transport.hooks["post_connect"]:
941
1139
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):