from paramiko.sftp_file import SFTPFile
from paramiko.sftp_client import SFTPClient

register_urlparse_netloc_protocol('sftp')


def _ignore_sigint():
    # TODO: This should possibly ignore SIGHUP as well, but bzr currently
    # doesn't handle it itself.
    # <https://launchpad.net/products/bzr/+bug/41433/+index>
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def os_specific_subprocess_params():
    """Get O/S specific subprocess parameters."""
    if sys.platform == 'win32':
        # setting the process group and closing fds is not supported on
        # win32
        return {}
    else:
        # We close fds other than the pipes as the child process does not need
        # them to be open.
        #
        # We also set the child process to ignore SIGINT. Normally the signal
        # would be sent to every process in the foreground process group, but
        # this causes it to be seen only by bzr and not by ssh. Python will
        # generate a KeyboardInterrupt in bzr, and we will then have a chance
        # to release locks or do other cleanup over ssh before the connection
        # goes away.
        # <https://launchpad.net/products/bzr/+bug/5987>
        #
        # Running it in a separate process group is not good because then it
        # can't get non-echoed input of a password or passphrase.
        # <https://launchpad.net/products/bzr/+bug/40508>
        return {'preexec_fn': _ignore_sigint,
                'close_fds': True,
                }
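
# A minimal usage sketch (hypothetical caller; in this module the returned
# dict is consumed when spawning the external ssh subprocess):
#
#   import subprocess
#   proc = subprocess.Popen(['ssh', '-V'],
#                           stdout=subprocess.PIPE, stderr=subprocess.PIPE,
#                           **os_specific_subprocess_params())
#
# On win32 this adds no extra parameters; elsewhere it closes inherited fds
# and makes the child ignore SIGINT, so bzr sees the KeyboardInterrupt first.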

_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))

# Paramiko 1.5 tries to open a socket.AF_UNIX in order to connect
# to ssh-agent. That attribute doesn't exist on win32 (it does in cygwin)
# so we get an AttributeError exception. So we will not try to
# connect to an agent if we are on win32 and using Paramiko older than 1.6
_use_ssh_agent = (sys.platform != 'win32' or _paramiko_version >= (1, 6, 0))
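
# The version gates above rely on lexicographic tuple comparison, e.g.
# (hypothetical versions):
#   (1, 5, 5) >= (1, 5, 5)  ->  True   (prefetch enabled)
#   (1, 4, 9) >= (1, 5, 5)  ->  False  (prefetch disabled)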


_ssh_vendor = None
def _get_ssh_vendor():
    """Find out what version of SSH is on the system."""


class SFTPTransport(Transport):
    """Transport implementation for SFTP access."""

    _do_prefetch = _default_do_prefetch
    # TODO: jam 20060717 Conceivably these could be configurable, either
    #       by auto-tuning at run-time, or by a configuration (per host??)
    #       but the performance curve is pretty flat, so just going with
    #       reasonable defaults.
    _max_readv_combine = 200
    # Having to round trip to the server means waiting for a response,
    # so it is better to download extra bytes.
    # 8KiB had good performance for both local and remote network operations
    _bytes_to_read_before_seek = 8192

    # The sftp spec says that implementations SHOULD allow reads
    # to be at least 32K. paramiko.readv() does an async request
    # for the chunks. So we need to keep it within a single request
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
    # up the request itself, rather than us having to worry about it
    _max_request_size = 32768
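
    # Worked example (hypothetical numbers): a coalesced region of 100000
    # bytes is issued as requests of 32768, 32768, 32768 and 1696 bytes,
    # so no single request exceeds the 32KiB cap described above.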

    def __init__(self, base, clone_from=None):
        assert base.startswith('sftp://')
        self._parse_url(base)
        base = self._unparse_url()
        if base[-1] != '/':
            base = base + '/'
        super(SFTPTransport, self).__init__(base)
        if clone_from is None:
            self._sftp_connect()

    def get(self, relpath):
        """Get the file at the given relative path."""
        try:
            path = self._remote_path(relpath)
            f = self._sftp.file(path, mode='rb')
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
                f.prefetch()
            return f
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': error retrieving')

    def readv(self, relpath, offsets):
        """See Transport.readv()"""
        # We overload the default readv() because we want to use a file
        # that does not have prefetch enabled.
        # Also, if we have a new paramiko, it implements an async readv()
        try:
            path = self._remote_path(relpath)
            fp = self._sftp.file(path, mode='rb')
            readv = getattr(fp, 'readv', None)
            if readv:
                return self._sftp_readv(fp, offsets)
            mutter('seek and read %s offsets', len(offsets))
            return self._seek_and_read(fp, offsets)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, path, ': error retrieving')
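
    # Usage sketch (hypothetical transport and values): readv yields
    # (offset, data) tuples for the requested byte ranges, e.g.:
    #
    #   t = get_transport('sftp://user@host/repo')
    #   for offset, data in t.readv('foo.knit', [(0, 100), (4096, 200)]):
    #       ...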

    def _sftp_readv(self, fp, offsets):
        """Use the readv() member of fp to do async readv.

        And then read them using paramiko.readv(). paramiko.readv()
        does not support ranges > 64K, so it caps the request size, and
        just reads until it gets all the stuff it wants.
        """
        offsets = list(offsets)
        sorted_offsets = sorted(offsets)

        # The algorithm works as follows:
        # 1) Coalesce nearby reads into a single chunk
        #    This generates a list of combined regions, the total size
        #    and the size of the sub regions. This coalescing step is limited
        #    in the number of nearby chunks to combine, and is allowed to
        #    skip small breaks in the requests. Limiting it makes sure that
        #    we can start yielding some data earlier, and skipping means we
        #    make fewer requests. (Beneficial even when using async)
        # 2) Break up these combined regions into chunks that are smaller
        #    than 64KiB. Technically the limit is 65536, but we are a
        #    little bit conservative. This is because sftp has a maximum
        #    return chunk size of 64KiB (max size of an unsigned short)
        # 3) Issue a readv() to paramiko to create an async request for
        #    these chunks
        # 4) Read in the data as it comes back, until we've read one
        #    continuous section as determined in step 1
        # 5) Break up the full sections into hunks for the original requested
        #    offsets. And put them in a cache
        # 6) Check if the next request is in the cache, and if it is, remove
        #    it from the cache, and yield its data. Continue until no more
        #    entries are in the cache.
        # 7) loop back to step 4 until all data has been read
        #
        # TODO: jam 20060725 This could be optimized one step further, by
        #       attempting to yield whatever data we have read, even before
        #       the first coalesced section has been fully processed.
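        #
        # Worked example (hypothetical offsets): [(0, 10), (10, 10), (20, 10)]
        # coalesce in step 1 into one region (start=0, length=30) whose
        # sub-ranges [(0, 10), (10, 10), (20, 10)] let step 5 slice the
        # combined data back into the originally requested hunks.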

        # When coalescing for use with readv(), we don't really need to
        # use any fudge factor, because the requests are made asynchronously
        coalesced = list(self._coalesce_offsets(sorted_offsets,
                               limit=self._max_readv_combine,
                               fudge_factor=0,
                               ))
        requests = []
        for c_offset in coalesced:
            start = c_offset.start
            size = c_offset.length

            # We need to break this up into multiple requests
            while size > 0:
                next_size = min(size, self._max_request_size)
                requests.append((start, next_size))
                size -= next_size
                start += next_size

        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
               len(offsets), len(coalesced), len(requests))

        # Queue the current read until we have read the full coalesced section
        cur_data = []
        cur_data_len = 0
        cur_coalesced_stack = iter(coalesced)
        cur_coalesced = cur_coalesced_stack.next()

        # Cache the results, but only until they have been fulfilled
        data_map = {}
        # turn the list of offsets into a stack
        offset_stack = iter(offsets)
        cur_offset_and_size = offset_stack.next()

        for data in fp.readv(requests):
            cur_data += [data]
            cur_data_len += len(data)

            if cur_data_len < cur_coalesced.length:
                continue
            assert cur_data_len == cur_coalesced.length, \
                "Somehow we read too much: %s != %s" % (cur_data_len,
                                                        cur_coalesced.length)
            all_data = ''.join(cur_data)
            cur_data = []
            cur_data_len = 0

            for suboffset, subsize in cur_coalesced.ranges:
                key = (cur_coalesced.start+suboffset, subsize)
                data_map[key] = all_data[suboffset:suboffset+subsize]

            # Now that we've read some data, see if we can yield anything back
            while cur_offset_and_size in data_map:
                this_data = data_map.pop(cur_offset_and_size)
                yield cur_offset_and_size[0], this_data
                cur_offset_and_size = offset_stack.next()

            # Now that we've read all of the data for this coalesced section
            # on to the next
            cur_coalesced = cur_coalesced_stack.next()

    def get_partial(self, relpath, start, length=None):
        """Get just part of a file.

        :param relpath: Path to the file, relative to base
        :param start: The starting position to read from
        :param length: The length to read. A length of None indicates
                       read to the end of the file.
        :return: A file-like object containing at least the specified bytes.
                 Some implementations may return objects which can be read
                 past this length, but this is not guaranteed.
        """
        # TODO: implement get_partial_multi to help with knit support
        f = self.get(relpath)
        f.seek(start)
        if self._do_prefetch and hasattr(f, 'prefetch'):
            f.prefetch()
        return f

    def put(self, relpath, f, mode=None):
        """Copy the file-like or string object into the location."""

    def append(self, relpath, f, mode=None):
        """Append the text in the file-like object to the supplied location."""
        try:
            path = self._remote_path(relpath)
            fout = self._sftp.file(path, 'ab')
            if mode is not None:
                self._sftp.chmod(path, mode)
            self._pump(f, fout)
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, relpath, ': unable to append')

    def rename(self, rel_from, rel_to):
        """Rename without special overwriting."""
        try:
            self._sftp.rename(self._remote_path(rel_from),
                              self._remote_path(rel_to))
        except (IOError, paramiko.SSHException), e:
            self._translate_io_exception(e, rel_from,
                ': unable to rename to %r' % (rel_to,))

    def _rename_and_overwrite(self, abs_from, abs_to):
        """Do a fancy rename on the remote server.

        Using the implementation provided by osutils.
        """

    def _unparse_url(self, path=None):
        if path is None:
            path = self._path
        path = urllib.quote(path)
        netloc = urllib.quote(self._host)
        if self._username is not None:
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
        if self._port is not None:
            netloc = '%s:%d' % (netloc, self._port)
        return urlparse.urlunparse(('sftp', netloc, path, '', '', ''))

    def _split_url(self, url):
        (scheme, username, password, host, port, path) = split_url(url)
        assert scheme == 'sftp'

        # the initial slash should be removed from the path, and treated
        # as a homedir relative path (the path begins with a double slash
        # if it is absolute)

    def _sftp_connect(self):
        vendor = _get_ssh_vendor()
        if vendor == 'loopback':
            sock = socket.socket()
            try:
                sock.connect((self._host, self._port))
            except socket.error, e:
                raise ConnectionError('Unable to connect to SSH host %s:%s: %s'
                                      % (self._host, self._port, e))
            self._sftp = SFTPClient(LoopbackSFTP(sock))
        elif vendor != 'none':
            try:
                sock = SFTPSubprocess(self._host, vendor, self._port,
                                      self._username)
                self._sftp = SFTPClient(sock)
            except (EOFError, paramiko.SSHException), e:
                raise ConnectionError('Unable to connect to SSH host %s:%s: %s'
                                      % (self._host, self._port, e))
            except (OSError, IOError), e:
                # If the machine is fast enough, ssh can actually exit
                # before we try and send it the sftp request, which
                # raises a Broken Pipe
                if e.errno not in (errno.EPIPE,):
                    raise
                raise ConnectionError('Unable to connect to SSH host %s:%s: %s'
                                      % (self._host, self._port, e))
        else:
            self._paramiko_connect()


class SocketListener(threading.Thread):

    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback
        self._socket = socket.socket()
        self._socket.bind(('localhost', 0))
        self._socket.listen(1)
        self.port = self._socket.getsockname()[1]
        self._stop_event = threading.Event()

    def stop(self):
        # called from outside this thread
        self._stop_event.set()
        # use a timeout here, because if the test fails, the server thread may
        # never notice the stop_event.
        self.join(5.0)
        self._socket.close()

    def run(self):
        while True:
            readable, writable_unused, exception_unused = \
                select.select([self._socket], [], [], 0.1)
            if self._stop_event.isSet():
                return
            if len(readable) == 0:
                continue
            try:
                s, addr_unused = self._socket.accept()
                # because the loopback socket is inline, and transports are
                # never explicitly closed, best to launch a new thread.
                threading.Thread(target=self._callback, args=(s,)).start()
            except socket.error, x:
                sys.excepthook(*sys.exc_info())
                warning('Socket error during accept() within unit test server'
                        ' thread: %r' % x)
            except Exception, x:
                # probably a failed test; unit test thread will log the
                # failure/error
                sys.excepthook(*sys.exc_info())
                warning('Exception from within unit test server thread: %r' %
                        x)


class SocketDelay(object):
    """A socket decorator to make TCP appear slower.

    This changes recv, send, and sendall to add a fixed latency to each python
    call if a new roundtrip is detected. That is, when a recv is called and the
    flag new_roundtrip is set, latency is charged. Every send, sendall and recv
    sets this flag.

    In addition every send, sendall and recv sleeps a bit per character sent to
    simulate bandwidth.

    Not all methods are implemented, this is deliberate as this class is not a
    replacement for the builtin sockets layer. fileno is not implemented to
    prevent the proxy being bypassed.
    """
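
    # Usage sketch (hypothetical socket and values): wrap a real socket so
    # that each new roundtrip costs 100ms on top of ~1 Mbit/s throughput:
    #
    #   slow = SocketDelay(real_socket, latency=0.1, bandwidth=1.0)
    #   slow.sendall('hello')    # new roundtrip: latency + per-byte time
    #   reply = slow.recv(4096)  # flag still set: latency + per-byte time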
    simulated_time = 0
    _proxied_arguments = dict.fromkeys([
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
        "setblocking", "setsockopt", "settimeout", "shutdown"])

    def __init__(self, sock, latency, bandwidth=1.0,
                 really_sleep=True):
        """
        :param bandwidth: simulated bandwidth (MegaBit)
        :param really_sleep: If set to false, the SocketDelay will just
            increase a counter, instead of calling time.sleep. This is useful
            for unittesting the SocketDelay.
        """
        self.sock = sock
        self.latency = latency
        self.really_sleep = really_sleep
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
        self.new_roundtrip = False
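        # Worked example (hypothetical value): bandwidth=1.0 Mbit/s gives
        # 1.0 / 8.0 * 1024 * 1024 = 131072 bytes/s, so time_per_byte is
        # 1 / 131072, roughly 7.6 microseconds per byte.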

    def sleep(self, s):
        if self.really_sleep:
            time.sleep(s)
        else:
            SocketDelay.simulated_time += s

    def __getattr__(self, attr):
        if attr in SocketDelay._proxied_arguments:
            return getattr(self.sock, attr)
        raise AttributeError("'SocketDelay' object has no attribute %r" %
                             attr)

    def dup(self):
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
                           self.really_sleep)

    def recv(self, *args):
        data = self.sock.recv(*args)
        if data and self.new_roundtrip:
            self.new_roundtrip = False
            self.sleep(self.latency)
        self.sleep(len(data) * self.time_per_byte)
        return data

    def sendall(self, data, flags=0):
        if not self.new_roundtrip:
            self.new_roundtrip = True
            self.sleep(self.latency)
        self.sleep(len(data) * self.time_per_byte)
        return self.sock.sendall(data, flags)

    def send(self, data, flags=0):
        if not self.new_roundtrip:
            self.new_roundtrip = True
            self.sleep(self.latency)
        bytes_sent = self.sock.send(data, flags)
        self.sleep(bytes_sent * self.time_per_byte)
        return bytes_sent


class SFTPServer(Server):