313
314
class SFTPTransport (Transport):
315
Transport implementation for SFTP access.
315
"""Transport implementation for SFTP access"""
317
317
_do_prefetch = _default_do_prefetch
318
# TODO: jam 20060717 Conceivably these could be configurable, either
319
# by auto-tuning at run-time, or by a configuration (per host??)
320
# but the performance curve is pretty flat, so just going with
321
# reasonable defaults.
322
_max_readv_combine = 200
323
# Having to round trip to the server means waiting for a response,
324
# so it is better to download extra bytes.
325
# 8KiB had good performance for both local and remote network operations
326
_bytes_to_read_before_seek = 8192
328
# The sftp spec says that implementations SHOULD allow reads
329
# to be at least 32K. paramiko.readv() does an async request
330
# for the chunks. So we need to keep it within a single request
331
# size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
332
# up the request itself, rather than us having to worry about it
333
_max_request_size = 32768
319
335
def __init__(self, base, clone_from=None):
320
336
assert base.startswith('sftp://')
426
442
except (IOError, paramiko.SSHException), e:
427
443
self._translate_io_exception(e, path, ': error retrieving')
429
def get_partial(self, relpath, start, length=None):
431
Get just part of a file.
433
:param relpath: Path to the file, relative to base
434
:param start: The starting position to read from
435
:param length: The length to read. A length of None indicates
436
read to the end of the file.
437
:return: A file-like object containing at least the specified bytes.
438
Some implementations may return objects which can be read
439
past this length, but this is not guaranteed.
441
# TODO: implement get_partial_multi to help with knit support
442
f = self.get(relpath)
444
if self._do_prefetch and hasattr(f, 'prefetch'):
445
def readv(self, relpath, offsets):
446
"""See Transport.readv()"""
447
# We overload the default readv() because we want to use a file
448
# that does not have prefetch enabled.
449
# Also, if we have a new paramiko, it implements an async readv()
454
path = self._remote_path(relpath)
455
fp = self._sftp.file(path, mode='rb')
456
readv = getattr(fp, 'readv', None)
458
return self._sftp_readv(fp, offsets)
459
mutter('seek and read %s offsets', len(offsets))
460
return self._seek_and_read(fp, offsets)
461
except (IOError, paramiko.SSHException), e:
462
self._translate_io_exception(e, path, ': error retrieving')
464
def _sftp_readv(self, fp, offsets):
465
"""Use the readv() member of fp to do async readv.
467
And then read them using paramiko.readv(). paramiko.readv()
468
does not support ranges > 64K, so it caps the request size, and
469
just reads until it gets all the stuff it wants
471
offsets = list(offsets)
472
sorted_offsets = sorted(offsets)
474
# The algorithm works as follows:
475
# 1) Coalesce nearby reads into a single chunk
476
# This generates a list of combined regions, the total size
477
# and the size of the sub regions. This coalescing step is limited
478
# in the number of nearby chunks to combine, and is allowed to
479
# skip small breaks in the requests. Limiting it makes sure that
480
# we can start yielding some data earlier, and skipping means we
481
# make fewer requests. (Beneficial even when using async)
482
# 2) Break up this combined regions into chunks that are smaller
483
# than 64KiB. Technically the limit is 65536, but we are a
484
# little bit conservative. This is because sftp has a maximum
485
# return chunk size of 64KiB (max size of an unsigned short)
486
# 3) Issue a readv() to paramiko to create an async request for
488
# 4) Read in the data as it comes back, until we've read one
489
# continuous section as determined in step 1
490
# 5) Break up the full sections into hunks for the original requested
491
# offsets. And put them in a cache
492
# 6) Check if the next request is in the cache, and if it is, remove
493
# it from the cache, and yield its data. Continue until no more
494
# entries are in the cache.
495
# 7) loop back to step 4 until all data has been read
497
# TODO: jam 20060725 This could be optimized one step further, by
498
# attempting to yield whatever data we have read, even before
499
# the first coallesced section has been fully processed.
501
# When coalescing for use with readv(), we don't really need to
502
# use any fudge factor, because the requests are made asynchronously
503
coalesced = list(self._coalesce_offsets(sorted_offsets,
504
limit=self._max_readv_combine,
508
for c_offset in coalesced:
509
start = c_offset.start
510
size = c_offset.length
512
# We need to break this up into multiple requests
514
next_size = min(size, self._max_request_size)
515
requests.append((start, next_size))
519
mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
520
len(offsets), len(coalesced), len(requests))
522
# Queue the current read until we have read the full coalesced section
525
cur_coalesced_stack = iter(coalesced)
526
cur_coalesced = cur_coalesced_stack.next()
528
# Cache the results, but only until they have been fulfilled
530
# turn the list of offsets into a stack
531
offset_stack = iter(offsets)
532
cur_offset_and_size = offset_stack.next()
534
for data in fp.readv(requests):
536
cur_data_len += len(data)
538
if cur_data_len < cur_coalesced.length:
540
assert cur_data_len == cur_coalesced.length, \
541
"Somehow we read too much: %s != %s" % (cur_data_len,
542
cur_coalesced.length)
543
all_data = ''.join(cur_data)
547
for suboffset, subsize in cur_coalesced.ranges:
548
key = (cur_coalesced.start+suboffset, subsize)
549
data_map[key] = all_data[suboffset:suboffset+subsize]
551
# Now that we've read some data, see if we can yield anything back
552
while cur_offset_and_size in data_map:
553
this_data = data_map.pop(cur_offset_and_size)
554
yield cur_offset_and_size[0], this_data
555
cur_offset_and_size = offset_stack.next()
557
# Now that we've read all of the data for this coalesced section
559
cur_coalesced = cur_coalesced_stack.next()
448
561
def put(self, relpath, f, mode=None):