137
class _SFTPReadvHelper(object):
138
"""A class to help with managing the state of a readv request."""
140
# See _get_requests for an explanation.
141
_max_request_size = 32768
143
def __init__(self, original_offsets, relpath, _report_activity):
144
"""Create a new readv helper.
146
:param original_offsets: The original requests given by the caller of
148
:param relpath: The name of the file (if known)
149
:param _report_activity: A Transport._report_activity bound method,
150
to be called as data arrives.
152
self.original_offsets = list(original_offsets)
153
self.relpath = relpath
154
self._report_activity = _report_activity
156
def _get_requests(self):
157
"""Break up the offsets into individual requests over sftp.
159
The SFTP spec only requires implementers to support 32kB requests. We
160
could try something larger (openssh supports 64kB), but then we have to
161
handle requests that fail.
162
So instead, we just break up our maximum chunks into 32kB chunks, and
163
asynchronously request them.
164
Newer versions of paramiko would do the chunking for us, but we want to
165
start processing results right away, so we do it ourselves.
167
# TODO: Because we issue async requests, we don't 'fudge' any extra
168
# data. I'm not 100% sure that is the best choice.
170
# The first thing we do, is to collapse the individual requests as much
171
# as possible, so we don't issue requests <32kB
172
sorted_offsets = sorted(self.original_offsets)
173
coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
174
limit=0, fudge_factor=0))
176
for c_offset in coalesced:
177
start = c_offset.start
178
size = c_offset.length
180
# Break this up into 32kB requests
182
next_size = min(size, self._max_request_size)
183
requests.append((start, next_size))
186
if 'sftp' in debug.debug_flags:
187
mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
188
self.relpath, len(sorted_offsets), len(coalesced),
192
def request_and_yield_offsets(self, fp):
193
"""Request the data from the remote machine, yielding the results.
195
:param fp: A Paramiko SFTPFile object that supports readv.
196
:return: Yield the data requested by the original readv caller, one by
199
requests = self._get_requests()
200
offset_iter = iter(self.original_offsets)
201
cur_offset, cur_size = offset_iter.next()
202
# paramiko .readv() yields strings that are in the order of the requests
203
# So we track the current request to know where the next data is
204
# being returned from.
210
# This is used to buffer chunks which we couldn't process yet
211
# It is (start, end, data) tuples.
213
# Create an 'unlimited' data stream, so we stop based on requests,
214
# rather than just because the data stream ended. This lets us detect
216
data_stream = itertools.chain(fp.readv(requests),
217
itertools.repeat(None))
218
for (start, length), data in itertools.izip(requests, data_stream):
220
if cur_coalesced is not None:
221
raise errors.ShortReadvError(self.relpath,
222
start, length, len(data))
223
if len(data) != length:
224
raise errors.ShortReadvError(self.relpath,
225
start, length, len(data))
226
self._report_activity(length, 'read')
228
# This is the first request, just buffer it
229
buffered_data = [data]
230
buffered_len = length
232
elif start == last_end:
233
# The data we are reading fits neatly on the previous
234
# buffer, so this is all part of a larger coalesced range.
235
buffered_data.append(data)
236
buffered_len += length
238
# We have an 'interrupt' in the data stream. So we know we are
239
# at a request boundary.
241
# We haven't consumed the buffer so far, so put it into
242
# data_chunks, and continue.
243
buffered = ''.join(buffered_data)
244
data_chunks.append((input_start, buffered))
246
buffered_data = [data]
247
buffered_len = length
248
last_end = start + length
249
if input_start == cur_offset and cur_size <= buffered_len:
250
# Simplify the next steps a bit by transforming buffered_data
251
# into a single string. We also have the nice property that
252
# when there is only one string ''.join([x]) == x, so there is
254
buffered = ''.join(buffered_data)
255
# Clean out buffered data so that we keep memory
259
# TODO: We *could* also consider the case where cur_offset is in
260
# the buffered range, even though it doesn't *start*
261
# the buffered range. But for packs we pretty much always
262
# read in order, so you won't get any extra data in the
264
while (input_start == cur_offset
265
and (buffered_offset + cur_size) <= buffered_len):
266
# We've buffered enough data to process this request, spit it
268
cur_data = buffered[buffered_offset:buffered_offset + cur_size]
269
# move the direct pointer into our buffered data
270
buffered_offset += cur_size
271
# Move the start-of-buffer pointer
272
input_start += cur_size
273
# Yield the requested data
274
yield cur_offset, cur_data
275
cur_offset, cur_size = offset_iter.next()
276
# at this point, we've consumed as much of buffered as we can,
277
# so break off the portion that we consumed
278
if buffered_offset == len(buffered_data):
279
# No tail to leave behind
283
buffered = buffered[buffered_offset:]
284
buffered_data = [buffered]
285
buffered_len = len(buffered)
287
buffered = ''.join(buffered_data)
289
data_chunks.append((input_start, buffered))
291
if 'sftp' in debug.debug_flags:
292
mutter('SFTP readv left with %d out-of-order bytes',
293
sum(map(lambda x: len(x[1]), data_chunks)))
294
# We've processed all the readv data, at this point, anything we
295
# couldn't process is in data_chunks. This doesn't happen often, so
296
# this code path isn't optimized
297
# We use an interesting process for data_chunks
298
# Specifically if we have "bisect_left([(start, len, entries)],
300
# If start == qstart, then we get the specific node. Otherwise we
301
# get the previous node
303
idx = bisect.bisect_left(data_chunks, (cur_offset,))
304
if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
305
# The data starts here
306
data = data_chunks[idx][1][:cur_size]
308
# The data is in a portion of a previous page
310
sub_offset = cur_offset - data_chunks[idx][0]
311
data = data_chunks[idx][1]
312
data = data[sub_offset:sub_offset + cur_size]
314
# We are missing the page where the data should be found,
317
if len(data) != cur_size:
318
raise AssertionError('We must have miscalulated.'
319
' We expected %d bytes, but only found %d'
320
% (cur_size, len(data)))
321
yield cur_offset, data
322
cur_offset, cur_size = offset_iter.next()
325
class SFTPTransport(ConnectedTransport):
138
class SFTPUrlHandling(Transport):
139
"""Mix-in that does common handling of SSH/SFTP URLs."""
141
def __init__(self, base):
142
self._parse_url(base)
143
base = self._unparse_url(self._path)
146
super(SFTPUrlHandling, self).__init__(base)
148
def _parse_url(self, url):
150
self._username, self._password,
151
self._host, self._port, self._path) = self._split_url(url)
153
def _unparse_url(self, path):
154
"""Return a URL for a path relative to this transport.
156
path = urllib.quote(path)
157
# handle homedir paths
158
if not path.startswith('/'):
160
netloc = urllib.quote(self._host)
161
if self._username is not None:
162
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
163
if self._port is not None:
164
netloc = '%s:%d' % (netloc, self._port)
165
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
167
def _split_url(self, url):
168
(scheme, username, password, host, port, path) = split_url(url)
169
## assert scheme == 'sftp'
171
# the initial slash should be removed from the path, and treated
172
# as a homedir relative path (the path begins with a double slash
173
# if it is absolute).
174
# see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
175
# RBC 20060118 we are not using this as it's too user hostile. Instead
176
# we are following lftp and using /~/foo to mean '~/foo'.
177
# handle homedir paths
178
if path.startswith('/~/'):
182
return (scheme, username, password, host, port, path)
184
def abspath(self, relpath):
185
"""Return the full url to the given relative path.
187
@param relpath: the relative path or path components
188
@type relpath: str or list
190
return self._unparse_url(self._remote_path(relpath))
192
def _remote_path(self, relpath):
193
"""Return the path to be passed along the sftp protocol for relpath.
195
:param relpath: is a urlencoded string.
197
return self._combine_paths(self._path, relpath)
200
class SFTPTransport(SFTPUrlHandling):
326
201
"""Transport implementation for SFTP access."""
328
203
_do_prefetch = _default_do_prefetch
343
218
# up the request itself, rather than us having to worry about it
344
219
_max_request_size = 32768
346
def __init__(self, base, _from_transport=None):
347
super(SFTPTransport, self).__init__(base,
348
_from_transport=_from_transport)
221
def __init__(self, base, clone_from=None):
222
super(SFTPTransport, self).__init__(base)
223
if clone_from is None:
226
# use the same ssh connection, etc
227
self._sftp = clone_from._sftp
228
# super saves 'self.base'
230
def should_cache(self):
232
Return True if the data pulled across should be cached locally.
236
def clone(self, offset=None):
238
Return a new SFTPTransport with root at self.base + offset.
239
We share the same SFTP session between such transports, because it's
240
fairly expensive to set them up.
243
return SFTPTransport(self.base, self)
245
return SFTPTransport(self.abspath(offset), self)
350
247
def _remote_path(self, relpath):
351
248
"""Return the path to be passed along the sftp protocol for relpath.
353
:param relpath: is a urlencoded string.
355
relative = urlutils.unescape(relpath).encode('utf-8')
356
remote_path = self._combine_paths(self._path, relative)
357
# the initial slash should be removed from the path, and treated as a
358
# homedir relative path (the path begins with a double slash if it is
359
# absolute). see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
360
# RBC 20060118 we are not using this as it's too user hostile. Instead
361
# we are following lftp and using /~/foo to mean '~/foo'
362
# vila--20070602 and leave absolute paths beginning with a single slash.
363
if remote_path.startswith('/~/'):
364
remote_path = remote_path[3:]
365
elif remote_path == '/~':
369
def _create_connection(self, credentials=None):
370
"""Create a new connection with the provided credentials.
372
:param credentials: The credentials needed to establish the connection.
374
:return: The created connection and its associated credentials.
376
The credentials are only the password as it may have been entered
377
interactively by the user and may be different from the one provided
378
in base url at transport creation time.
380
if credentials is None:
381
password = self._password
250
relpath is a urlencoded string.
252
:return: a path prefixed with / for regular abspath-based urls, or a
253
path that does not begin with / for urls which begin with /~/.
255
# how does this work?
256
# it processes relpath with respect to
258
# firstly we create a path to evaluate:
259
# if relpath is an abspath or homedir path, its the entire thing
260
# otherwise we join our base with relpath
261
# then we eliminate all empty segments (double //'s) outside the first
262
# two elements of the list. This avoids problems with trailing
263
# slashes, or other abnormalities.
264
# finally we evaluate the entire path in a single pass
266
# '..' result in popping the left most already
267
# processed path (which can never be empty because of the check for
268
# abspath and homedir meaning that its not, or that we've used our
269
# path. If the pop would pop the root, we ignore it.
271
# Specific case examinations:
272
# remove the special case for ~: if the current root is ~/ popping of it
273
# = / thus our seed for a ~ based path is ['', '~']
274
# and if we end up with [''] then we had basically ('', '..') (which is
275
# '/..' so we append '' if the length is one, and assert that the first
276
# element is still ''. Lastly, if we end with ['', '~'] as a prefix for
277
# the output, we've got a homedir path, so we strip that prefix before
278
# '/' joining the resulting list.
280
# case one: '/' -> ['', ''] cannot shrink
281
# case two: '/' + '../foo' -> ['', 'foo'] (take '', '', '..', 'foo')
282
# and pop the second '' for the '..', append 'foo'
283
# case three: '/~/' -> ['', '~', '']
284
# case four: '/~/' + '../foo' -> ['', '~', '', '..', 'foo'],
285
# and we want to get '/foo' - the empty path in the middle
286
# needs to be stripped, then normal path manipulation will
288
# case five: '/..' ['', '..'], we want ['', '']
289
# stripping '' outside the first two is ok
290
# ignore .. if its too high up
292
# lastly this code is possibly reusable by FTP, but not reusable by
293
# local paths: ~ is resolvable correctly, nor by HTTP or the smart
294
# server: ~ is resolved remotely.
296
# however, a version of this that acts on self.base is possible to be
297
# written which manipulates the URL in canonical form, and would be
298
# reusable for all transports, if a flag for allowing ~/ at all was
300
assert isinstance(relpath, basestring)
301
relpath = urlutils.unescape(relpath)
304
if relpath.startswith('/'):
305
# abspath - normal split is fine.
306
current_path = relpath.split('/')
307
elif relpath.startswith('~/'):
308
# root is homedir based: normal split and prefix '' to remote the
310
current_path = [''].extend(relpath.split('/'))
383
password = credentials
385
vendor = ssh._get_ssh_vendor()
388
auth = config.AuthenticationConfig()
389
user = auth.get_user('ssh', self._host, self._port)
390
connection = vendor.connect_sftp(self._user, password,
391
self._host, self._port)
392
return connection, (user, password)
395
"""Ensures that a connection is established"""
396
connection = self._get_connection()
397
if connection is None:
398
# First connection ever
399
connection, credentials = self._create_connection()
400
self._set_connection(connection, credentials)
312
# root is from the current directory:
313
if self._path.startswith('/'):
314
# abspath, take the regular split
317
# homedir based, add the '', '~' not present in self._path
318
current_path = ['', '~']
319
# add our current dir
320
current_path.extend(self._path.split('/'))
321
# add the users relpath
322
current_path.extend(relpath.split('/'))
323
# strip '' segments that are not in the first one - the leading /.
324
to_process = current_path[:1]
325
for segment in current_path[1:]:
327
to_process.append(segment)
329
# process '.' and '..' segments into output_path.
331
for segment in to_process:
333
# directory pop. Remove a directory
334
# as long as we are not at the root
335
if len(output_path) > 1:
338
# cannot pop beyond the root, so do nothing
340
continue # strip the '.' from the output.
342
# this will append '' to output_path for the root elements,
343
# which is appropriate: its why we strip '' in the first pass.
344
output_path.append(segment)
346
# check output special cases:
347
if output_path == ['']:
349
output_path = ['', '']
350
elif output_path[:2] == ['', '~']:
351
# ['', '~', ...] -> ...
352
output_path = output_path[2:]
353
path = '/'.join(output_path)
356
def relpath(self, abspath):
357
scheme, username, password, host, port, path = self._split_url(abspath)
359
if (username != self._username):
360
error.append('username mismatch')
361
if (host != self._host):
362
error.append('host mismatch')
363
if (port != self._port):
364
error.append('port mismatch')
365
if (not path.startswith(self._path)):
366
error.append('path mismatch')
368
extra = ': ' + ', '.join(error)
369
raise PathNotChild(abspath, self.base, extra=extra)
371
return path[pl:].strip('/')
403
373
def has(self, relpath):
405
375
Does the target location exist?
408
self._get_sftp().stat(self._remote_path(relpath))
409
# stat result is about 20 bytes, let's say
410
self._report_activity(20, 'read')
378
self._sftp.stat(self._remote_path(relpath))
415
383
def get(self, relpath):
416
"""Get the file at the given relative path.
385
Get the file at the given relative path.
418
387
:param relpath: The relative path to the file
421
# FIXME: by returning the file directly, we don't pass this
422
# through to report_activity. We could try wrapping the object
423
# before it's returned. For readv and get_bytes it's handled in
424
# the higher-level function.
426
390
path = self._remote_path(relpath)
427
f = self._get_sftp().file(path, mode='rb')
391
f = self._sftp.file(path, mode='rb')
428
392
if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
431
395
except (IOError, paramiko.SSHException), e:
432
self._translate_io_exception(e, path, ': error retrieving',
433
failure_exc=errors.ReadError)
435
def get_bytes(self, relpath):
436
# reimplement this here so that we can report how many bytes came back
437
f = self.get(relpath)
440
self._report_activity(len(bytes), 'read')
445
def _readv(self, relpath, offsets):
396
self._translate_io_exception(e, path, ': error retrieving')
398
def readv(self, relpath, offsets):
446
399
"""See Transport.readv()"""
447
400
# We overload the default readv() because we want to use a file
448
401
# that does not have prefetch enabled.
454
407
path = self._remote_path(relpath)
455
fp = self._get_sftp().file(path, mode='rb')
408
fp = self._sftp.file(path, mode='rb')
456
409
readv = getattr(fp, 'readv', None)
458
411
return self._sftp_readv(fp, offsets, relpath)
459
if 'sftp' in debug.debug_flags:
460
mutter('seek and read %s offsets', len(offsets))
412
mutter('seek and read %s offsets', len(offsets))
461
413
return self._seek_and_read(fp, offsets, relpath)
462
414
except (IOError, paramiko.SSHException), e:
463
415
self._translate_io_exception(e, path, ': error retrieving')
465
def recommended_page_size(self):
466
"""See Transport.recommended_page_size().
468
For SFTP we suggest a large page size to reduce the overhead
469
introduced by latency.
473
def _sftp_readv(self, fp, offsets, relpath):
417
def _sftp_readv(self, fp, offsets, relpath='<unknown>'):
474
418
"""Use the readv() member of fp to do async readv.
476
Then read them using paramiko.readv(). paramiko.readv()
420
And then read them using paramiko.readv(). paramiko.readv()
477
421
does not support ranges > 64K, so it caps the request size, and
478
just reads until it gets all the stuff it wants.
422
just reads until it gets all the stuff it wants
480
helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
481
return helper.request_and_yield_offsets(fp)
424
offsets = list(offsets)
425
sorted_offsets = sorted(offsets)
427
# The algorithm works as follows:
428
# 1) Coalesce nearby reads into a single chunk
429
# This generates a list of combined regions, the total size
430
# and the size of the sub regions. This coalescing step is limited
431
# in the number of nearby chunks to combine, and is allowed to
432
# skip small breaks in the requests. Limiting it makes sure that
433
# we can start yielding some data earlier, and skipping means we
434
# make fewer requests. (Beneficial even when using async)
435
# 2) Break up these combined regions into chunks that are smaller
436
# than 64KiB. Technically the limit is 65536, but we are a
437
# little bit conservative. This is because sftp has a maximum
438
# return chunk size of 64KiB (max size of an unsigned short)
439
# 3) Issue a readv() to paramiko to create an async request for
441
# 4) Read in the data as it comes back, until we've read one
442
# continuous section as determined in step 1
443
# 5) Break up the full sections into hunks for the original requested
444
# offsets. And put them in a cache
445
# 6) Check if the next request is in the cache, and if it is, remove
446
# it from the cache, and yield its data. Continue until no more
447
# entries are in the cache.
448
# 7) loop back to step 4 until all data has been read
450
# TODO: jam 20060725 This could be optimized one step further, by
451
# attempting to yield whatever data we have read, even before
452
# the first coalesced section has been fully processed.
454
# When coalescing for use with readv(), we don't really need to
455
# use any fudge factor, because the requests are made asynchronously
456
coalesced = list(self._coalesce_offsets(sorted_offsets,
457
limit=self._max_readv_combine,
461
for c_offset in coalesced:
462
start = c_offset.start
463
size = c_offset.length
465
# We need to break this up into multiple requests
467
next_size = min(size, self._max_request_size)
468
requests.append((start, next_size))
472
mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
473
len(offsets), len(coalesced), len(requests))
475
# Queue the current read until we have read the full coalesced section
478
cur_coalesced_stack = iter(coalesced)
479
cur_coalesced = cur_coalesced_stack.next()
481
# Cache the results, but only until they have been fulfilled
483
# turn the list of offsets into a stack
484
offset_stack = iter(offsets)
485
cur_offset_and_size = offset_stack.next()
487
for data in fp.readv(requests):
489
cur_data_len += len(data)
491
if cur_data_len < cur_coalesced.length:
493
assert cur_data_len == cur_coalesced.length, \
494
"Somehow we read too much: %s != %s" % (cur_data_len,
495
cur_coalesced.length)
496
all_data = ''.join(cur_data)
500
for suboffset, subsize in cur_coalesced.ranges:
501
key = (cur_coalesced.start+suboffset, subsize)
502
data_map[key] = all_data[suboffset:suboffset+subsize]
504
# Now that we've read some data, see if we can yield anything back
505
while cur_offset_and_size in data_map:
506
this_data = data_map.pop(cur_offset_and_size)
507
yield cur_offset_and_size[0], this_data
508
cur_offset_and_size = offset_stack.next()
510
# We read a coalesced entry, so mark it as done
512
# Now that we've read all of the data for this coalesced section
514
cur_coalesced = cur_coalesced_stack.next()
516
if cur_coalesced is not None:
517
raise errors.ShortReadvError(relpath, cur_coalesced.start,
518
cur_coalesced.length, len(data))
483
520
def put_file(self, relpath, f, mode=None):