1
# Copyright (C) 2006-2011 Canonical Ltd
1
# Copyright (C) 2006-2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
bzrlib/transport/smart/__init__.py.
27
from __future__ import absolute_import
35
32
from bzrlib.lazy_import import lazy_import
36
33
lazy_import(globals(), """
42
38
from bzrlib import (
50
from bzrlib.i18n import gettext
51
from bzrlib.smart import client, protocol, request, signals, vfs
46
from bzrlib.smart import client, protocol, request, vfs
52
47
from bzrlib.transport import ssh
54
49
from bzrlib import osutils
181
176
ui.ui_factory.report_transport_activity(self, bytes, direction)
184
_bad_file_descriptor = (errno.EBADF,)
185
if sys.platform == 'win32':
186
# Given on Windows if you pass a closed socket to select.select. Probably
187
# also given if you pass a file handle to select.
189
_bad_file_descriptor += (WSAENOTSOCK,)
192
179
class SmartServerStreamMedium(SmartMedium):
193
180
"""Handles smart commands coming over a stream.
207
194
the stream. See also the _push_back method.
212
def __init__(self, backing_transport, root_client_path='/', timeout=None):
197
def __init__(self, backing_transport, root_client_path='/'):
213
198
"""Construct new server.
215
200
:param backing_transport: Transport for the directory served.
218
203
self.backing_transport = backing_transport
219
204
self.root_client_path = root_client_path
220
205
self.finished = False
222
raise AssertionError('You must supply a timeout.')
223
self._client_timeout = timeout
224
self._client_poll_timeout = min(timeout / 10.0, 1.0)
225
206
SmartMedium.__init__(self)
233
214
while not self.finished:
234
215
server_protocol = self._build_protocol()
235
216
self._serve_one_request(server_protocol)
236
except errors.ConnectionTimeout, e:
237
trace.note('%s' % (e,))
238
trace.log_exception_quietly()
239
self._disconnect_client()
240
# We reported it, no reason to make a big fuss.
242
217
except Exception, e:
243
218
stderr.write("%s terminating on exception %s\n" % (self, e))
245
self._disconnect_client()
247
def _stop_gracefully(self):
248
"""When we finish this message, stop looking for more."""
249
trace.mutter('Stopping %s' % (self,))
252
def _disconnect_client(self):
253
"""Close the current connection. We stopped due to a timeout/etc."""
254
# The default implementation is a no-op, because that is all we used to
255
# do when disconnecting from a client. I suppose we never had the
256
# *server* initiate a disconnect, before
258
def _wait_for_bytes_with_timeout(self, timeout_seconds):
259
"""Wait for more bytes to be read, but timeout if none available.
261
This allows us to detect idle connections, and stop trying to read from
262
them, without setting the socket itself to non-blocking. This also
263
allows us to specify when we watch for idle timeouts.
265
:return: Did we timeout? (True if we timed out, False if there is data
268
raise NotImplementedError(self._wait_for_bytes_with_timeout)
270
221
def _build_protocol(self):
271
222
"""Identifies the version of the incoming request, and returns an
277
228
:returns: a SmartServerRequestProtocol.
279
self._wait_for_bytes_with_timeout(self._client_timeout)
281
# We're stopping, so don't try to do any more work
283
230
bytes = self._get_line()
284
231
protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
285
232
protocol = protocol_factory(
287
234
protocol.accept_bytes(unused_bytes)
290
def _wait_on_descriptor(self, fd, timeout_seconds):
291
"""select() on a file descriptor, waiting for nonblocking read()
293
This will raise a ConnectionTimeout exception if we do not get a
294
readable handle before timeout_seconds.
297
t_end = self._timer() + timeout_seconds
298
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
300
while not rs and not xs and self._timer() < t_end:
304
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
305
except (select.error, socket.error) as e:
306
err = getattr(e, 'errno', None)
307
if err is None and getattr(e, 'args', None) is not None:
308
# select.error doesn't have 'errno', it just has args[0]
310
if err in _bad_file_descriptor:
311
return # Not a socket indicates read() will fail
312
elif err == errno.EINTR:
313
# Interrupted, keep looping.
318
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
319
% (timeout_seconds,))
321
237
def _serve_one_request(self, protocol):
322
238
"""Read one request from input, process, send back a response.
324
240
:param protocol: a SmartServerRequestProtocol.
329
243
self._serve_one_request_unguarded(protocol)
330
244
except KeyboardInterrupt:
347
261
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
349
def __init__(self, sock, backing_transport, root_client_path='/',
263
def __init__(self, sock, backing_transport, root_client_path='/'):
353
266
:param sock: the socket the server will read from. It will be put
354
267
into blocking mode.
356
269
SmartServerStreamMedium.__init__(
357
self, backing_transport, root_client_path=root_client_path,
270
self, backing_transport, root_client_path=root_client_path)
359
271
sock.setblocking(True)
360
272
self.socket = sock
361
# Get the getpeername now, as we might be closed later when we care.
363
self._client_info = sock.getpeername()
365
self._client_info = '<unknown>'
368
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
371
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
374
274
def _serve_one_request_unguarded(self, protocol):
375
275
while protocol.next_read_size():
385
285
self._push_back(protocol.unused_data)
387
def _disconnect_client(self):
388
"""Close the current connection. We stopped due to a timeout/etc."""
391
def _wait_for_bytes_with_timeout(self, timeout_seconds):
392
"""Wait for more bytes to be read, but timeout if none available.
394
This allows us to detect idle connections, and stop trying to read from
395
them, without setting the socket itself to non-blocking. This also
396
allows us to specify when we watch for idle timeouts.
398
:return: None, this will raise ConnectionTimeout if we time out before
401
return self._wait_on_descriptor(self.socket, timeout_seconds)
403
287
def _read_bytes(self, desired_count):
404
288
return osutils.read_bytes_from_socket(
405
289
self.socket, self._report_activity)
423
307
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
425
def __init__(self, in_file, out_file, backing_transport, timeout=None):
309
def __init__(self, in_file, out_file, backing_transport):
426
310
"""Construct new server.
428
312
:param in_file: Python file from which requests can be read.
429
313
:param out_file: Python file to write responses.
430
314
:param backing_transport: Transport for the directory served.
432
SmartServerStreamMedium.__init__(self, backing_transport,
316
SmartServerStreamMedium.__init__(self, backing_transport)
434
317
if sys.platform == 'win32':
435
318
# force binary mode for files
441
324
self._in = in_file
442
325
self._out = out_file
445
"""See SmartServerStreamMedium.serve"""
446
# This is the regular serve, except it adds signal trapping for soft
448
stop_gracefully = self._stop_gracefully
449
signals.register_on_hangup(id(self), stop_gracefully)
451
return super(SmartServerPipeStreamMedium, self).serve()
453
signals.unregister_on_hangup(id(self))
455
327
def _serve_one_request_unguarded(self, protocol):
457
329
# We need to be careful not to read past the end of the current
471
343
protocol.accept_bytes(bytes)
473
def _disconnect_client(self):
478
def _wait_for_bytes_with_timeout(self, timeout_seconds):
479
"""Wait for more bytes to be read, but timeout if none available.
481
This allows us to detect idle connections, and stop trying to read from
482
them, without setting the socket itself to non-blocking. This also
483
allows us to specify when we watch for idle timeouts.
485
:return: None, this will raise ConnectionTimeout if we time out before
488
if (getattr(self._in, 'fileno', None) is None
489
or sys.platform == 'win32'):
490
# You can't select() file descriptors on Windows.
492
return self._wait_on_descriptor(self._in, timeout_seconds)
494
345
def _read_bytes(self, desired_count):
495
346
return self._in.read(desired_count)
640
491
return self._medium._get_line()
643
class _VfsRefuser(object):
644
"""An object that refuses all VFS requests.
649
client._SmartClient.hooks.install_named_hook(
650
'call', self.check_vfs, 'vfs refuser')
652
def check_vfs(self, params):
654
request_method = request.request_handlers.get(params.method)
656
# A method we don't know about doesn't count as a VFS method.
658
if issubclass(request_method, vfs.VfsRequest):
659
raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
662
494
class _DebugCounter(object):
663
495
"""An object that counts the HPSS calls made to each client medium.
711
543
value['count'] = 0
712
544
value['vfs_count'] = 0
714
trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
715
count, vfs_count, medium_repr))
546
trace.note('HPSS calls: %d (%d vfs) %s',
547
count, vfs_count, medium_repr)
717
549
def flush_all(self):
718
550
for ref in list(self.counts.keys()):
721
553
_debug_counter = None
725
556
class SmartClientMedium(SmartMedium):
742
573
if _debug_counter is None:
743
574
_debug_counter = _DebugCounter()
744
575
_debug_counter.track(self)
745
if 'hpss_client_no_vfs' in debug.debug_flags:
747
if _vfs_refuser is None:
748
_vfs_refuser = _VfsRefuser()
750
577
def _is_remote_before(self, version_tuple):
751
578
"""Is it possible the remote side supports RPCs for a given version?
843
670
medium_base = urlutils.join(self.base, '/')
844
671
rel_url = urlutils.relative_url(medium_base, transport.base)
845
return urlutils.unquote(rel_url)
672
return urllib.unquote(rel_url)
848
675
class SmartClientStreamMedium(SmartClientMedium):
884
711
return SmartClientStreamMediumRequest(self)
887
"""We have been disconnected, reset current state.
889
This resets things like _current_request and connected state.
892
self._current_request = None
895
714
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
896
715
"""A client medium using simple pipes.
906
725
def _accept_bytes(self, bytes):
907
726
"""See SmartClientStreamMedium.accept_bytes."""
909
self._writeable_pipe.write(bytes)
911
if e.errno in (errno.EINVAL, errno.EPIPE):
912
raise errors.ConnectionReset(
913
"Error trying to write to subprocess", e)
727
self._writeable_pipe.write(bytes)
915
728
self._report_activity(len(bytes), 'write')
917
730
def _flush(self):
918
731
"""See SmartClientStreamMedium._flush()."""
919
# Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
920
# However, testing shows that even when the child process is
921
# gone, this doesn't error.
922
732
self._writeable_pipe.flush()
924
734
def _read_bytes(self, count):
944
754
class SmartSSHClientMedium(SmartClientStreamMedium):
945
755
"""A client medium using SSH.
947
It delegates IO to a SmartSimplePipesClientMedium or
757
It delegates IO to a SmartClientSocketMedium or
948
758
SmartClientAlreadyConnectedSocketMedium (depending on platform).
974
784
maybe_port = ':%s' % self._ssh_params.port
975
if self._ssh_params.username is None:
978
maybe_user = '%s@' % self._ssh_params.username
979
return "%s(%s://%s%s%s/)" % (
785
return "%s(%s://%s@%s%s/)" % (
980
786
self.__class__.__name__,
788
self._ssh_params.username,
983
789
self._ssh_params.host,
1022
828
raise AssertionError(
1023
829
"Unexpected io_kind %r from %r"
1024
830
% (io_kind, self._ssh_connection))
1025
for hook in transport.Transport.hooks["post_connect"]:
1028
832
def _flush(self):
1029
833
"""See SmartClientStreamMedium._flush()."""
1132
936
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1133
937
(self._host, port, err_msg))
1134
938
self._connected = True
1135
for hook in transport.Transport.hooks["post_connect"]:
1139
941
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):