class _SFTPReadvHelper(object):
    """A class to help with managing the state of a readv request."""

    # See _get_requests for an explanation.
    _max_request_size = 32768

    def __init__(self, original_offsets, relpath):
        """Create a new readv helper.

        :param original_offsets: The original requests given by the caller of
            readv()
        :param relpath: The name of the file (if known)
        """
        self.original_offsets = list(original_offsets)
        self.relpath = relpath

    def _get_requests(self):
        """Break up the offsets into individual requests over sftp.

        The SFTP spec only requires implementers to support 32kB requests. We
        could try something larger (openssh supports 64kB), but then we have to
        handle requests that fail.
        So instead, we just break up our maximum chunks into 32kB chunks, and
        asynchronously request them.
        Newer versions of paramiko would do the chunking for us, but we want to
        start processing results right away, so we do it ourselves.
        """
        # TODO: Because we issue async requests, we don't 'fudge' any extra
        #       data. I'm not 100% sure that is the best choice.

        # The first thing we do is to collapse the individual requests as much
        # as possible, so we don't issue requests <32kB
        sorted_offsets = sorted(self.original_offsets)
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
                                                              limit=0, fudge_factor=0))
        requests = []
        for c_offset in coalesced:
            start = c_offset.start
            size = c_offset.length

            # Break this up into 32kB requests
            while size > 0:
                next_size = min(size, self._max_request_size)
                requests.append((start, next_size))
                size -= next_size
                start += next_size
        mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
               self.relpath, len(sorted_offsets), len(coalesced),
               len(requests))
        return requests
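
    # Illustrative example (not part of the original source): with
    # _max_request_size = 32768, a single coalesced range of start=0,
    # length=70000 is split by _get_requests above into
    # [(0, 32768), (32768, 32768), (65536, 4464)].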

    def request_and_yield_offsets(self, fp):
        """Request the data from the remote machine, yielding the results.

        :param fp: A Paramiko SFTPFile object that supports readv.
        :return: Yield the data requested by the original readv caller, one by
            one.
        """
        requests = self._get_requests()
        offset_iter = iter(self.original_offsets)
        cur_offset, cur_size = offset_iter.next()
        # paramiko .readv() yields strings that are in the order of the
        # requests. So we track the current request to know where the next
        # data is being returned from.
        input_start = None
        last_end = None
        buffered_data = []
        buffered_len = 0

        # This is used to buffer chunks which we couldn't process yet
        # It is (start, data) tuples.
        data_chunks = []
        # Create an 'unlimited' data stream, so we stop based on requests,
        # rather than just because the data stream ended. This lets us detect
        # short readvs.
        data_stream = itertools.chain(fp.readv(requests),
                                      itertools.repeat(None))
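        # Illustrative note (not part of the original source): if requests has
        # three entries but fp.readv() only yields two strings, izip pairs the
        # third request with the padded None, which is reported below as a
        # short read rather than silently ending the loop.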
        for (start, length), data in itertools.izip(requests, data_stream):
            if data is None:
                # The data stream ran out before all requests were satisfied
                raise errors.ShortReadvError(self.relpath,
                    start, length, 0)
            if len(data) != length:
                raise errors.ShortReadvError(self.relpath,
                    start, length, len(data))
            if last_end is None:
                # This is the first request, just buffer it
                buffered_data = [data]
                buffered_len = length
                input_start = start
            elif start == last_end:
                # The data we are reading fits neatly on the previous
                # buffer, so this is all part of a larger coalesced range.
                buffered_data.append(data)
                buffered_len += length
            else:
                # We have an 'interrupt' in the data stream. So we know we are
                # at a request boundary.
                if buffered_len > 0:
                    # We haven't consumed the buffer so far, so put it into
                    # data_chunks, and continue.
                    buffered = ''.join(buffered_data)
                    data_chunks.append((input_start, buffered))
                input_start = start
                buffered_data = [data]
                buffered_len = length
            last_end = start + length
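            # Illustrative example (not part of the original source): for
            # requests [(0, 32768), (32768, 1024), (50000, 100)], the first
            # two chunks are contiguous and simply extend buffered_data, while
            # the jump to 50000 pushes any unconsumed buffered range onto
            # data_chunks and starts a new buffer with input_start = 50000.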
            if input_start == cur_offset and cur_size <= buffered_len:
                # Simplify the next steps a bit by transforming buffered_data
                # into a single string. We also have the nice property that
                # when there is only one string ''.join([x]) == x, so there is
                # no data copying.
                buffered = ''.join(buffered_data)
                # Clean out buffered data so that we keep memory
                # consumption low
                del buffered_data[:]
                buffered_offset = 0
                # TODO: We *could* also consider the case where cur_offset is
                #       in the buffered range, even though it doesn't *start*
                #       the buffered range. But for packs we pretty much always
                #       read in order, so you won't get any extra data in the
                #       middle.
                while (input_start == cur_offset
                       and (buffered_offset + cur_size) <= buffered_len):
                    # We've buffered enough data to process this request,
                    # spit it out
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
                    # move the direct pointer into our buffered data
                    buffered_offset += cur_size
                    # Move the start-of-buffer pointer
                    input_start += cur_size
                    # Yield the requested data
                    yield cur_offset, cur_data
                    cur_offset, cur_size = offset_iter.next()
                # at this point, we've consumed as much of buffered as we can,
                # so break off the portion that we consumed
                if buffered_offset == len(buffered):
                    # No tail to leave behind
                    buffered_data = []
                    buffered_len = 0
                else:
                    buffered = buffered[buffered_offset:]
                    buffered_data = [buffered]
                    buffered_len = len(buffered)
        if buffered_len:
            buffered = ''.join(buffered_data)
            del buffered_data[:]
            data_chunks.append((input_start, buffered))
        if data_chunks:
            mutter('SFTP readv left with %d out-of-order bytes',
                   sum(map(lambda x: len(x[1]), data_chunks)))
            # We've processed all the readv data, at this point, anything we
            # couldn't process is in data_chunks. This doesn't happen often,
            # so this code path isn't optimized
            # We use an interesting process for data_chunks
            # Specifically if we have "bisect_left([(start, len, entries)],
            #                                       (qstart,)])"
            # If start == qstart, then we get the specific node. Otherwise we
            # get the previous node
            while True:
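                # Illustrative example (not part of the original source): with
                # data_chunks = [(0, 'aaaa'), (100, 'bbbb')], a cur_offset of
                # 100 gives idx = 1, an exact match on the start, while a
                # cur_offset of 102 gives idx = 2, so we step back to the
                # previous entry and slice into the chunk that begins at 100.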
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
                    # The data starts here
                    data = data_chunks[idx][1][:cur_size]
                elif idx > 0:
                    # The data is in a portion of a previous page
                    idx -= 1
                    sub_offset = cur_offset - data_chunks[idx][0]
                    data = data_chunks[idx][1]
                    data = data[sub_offset:sub_offset + cur_size]
                else:
                    # We are missing the page where the data should be found
                    raise errors.ShortReadvError(self.relpath,
                                                 cur_offset, cur_size, 0)
                if len(data) != cur_size:
                    raise AssertionError('We must have miscalculated.'
                        ' We expected %d bytes, but only found %d'
                        % (cur_size, len(data)))
                yield cur_offset, data
                cur_offset, cur_size = offset_iter.next()


class SFTPTransport(ConnectedTransport):
    """Transport implementation for SFTP access."""
        does not support ranges > 64K, so it caps the request size, and
        just reads until it gets all the stuff it wants.
        """
        offsets = list(offsets)
        sorted_offsets = sorted(offsets)

        # The algorithm works as follows:
        # 1) Coalesce nearby reads into a single chunk
        #    This generates a list of combined regions, the total size
        #    and the size of the sub regions. This coalescing step is limited
        #    in the number of nearby chunks to combine, and is allowed to
        #    skip small breaks in the requests. Limiting it makes sure that
        #    we can start yielding some data earlier, and skipping means we
        #    make fewer requests. (Beneficial even when using async; see the
        #    example below)
        # 2) Break up these combined regions into chunks that are smaller
        #    than 64KiB. Technically the limit is 65536, but we are a
        #    little bit conservative. This is because sftp has a maximum
        #    return chunk size of 64KiB (max size of an unsigned short)
        # 3) Issue a readv() to paramiko to create an async request for
        #    all of this data
        # 4) Read in the data as it comes back, until we've read one
        #    continuous section as determined in step 1
        # 5) Break up the full sections into hunks for the original requested
        #    offsets. And put them in a cache
        # 6) Check if the next request is in the cache, and if it is, remove
        #    it from the cache, and yield its data. Continue until no more
        #    entries are in the cache.
        # 7) loop back to step 4 until all data has been read
        #
        # TODO: jam 20060725 This could be optimized one step further, by
        #       attempting to yield whatever data we have read, even before
        #       the first coalesced section has been fully processed.
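
        # Illustrative example of step 1 (not part of the original source):
        # with fudge_factor=0, offsets [(0, 100), (100, 200), (500, 50)]
        # coalesce into two regions: one with start=0, length=300,
        # ranges=[(0, 100), (100, 200)], and one with start=500, length=50,
        # ranges=[(0, 50)]. Only directly adjacent reads are combined.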

        # When coalescing for use with readv(), we don't really need to
        # use any fudge factor, because the requests are made asynchronously
        coalesced = list(self._coalesce_offsets(sorted_offsets,
                               limit=self._max_readv_combine,
                               fudge_factor=0,
                               ))
        requests = []
        for c_offset in coalesced:
            start = c_offset.start
            size = c_offset.length

            # We need to break this up into multiple requests
            while size > 0:
                next_size = min(size, self._max_request_size)
                requests.append((start, next_size))
                size -= next_size
                start += next_size

        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
               len(offsets), len(coalesced), len(requests))

        # Queue the current read until we have read the full coalesced section
        cur_data = []
        cur_data_len = 0
        cur_coalesced_stack = iter(coalesced)
        cur_coalesced = cur_coalesced_stack.next()

        # Cache the results, but only until they have been fulfilled
        data_map = {}
        # turn the list of offsets into a stack
        offset_stack = iter(offsets)
        cur_offset_and_size = offset_stack.next()

        for data in fp.readv(requests):
            cur_data.append(data)
            cur_data_len += len(data)

            if cur_data_len < cur_coalesced.length:
                continue
            if cur_data_len != cur_coalesced.length:
                raise AssertionError(
                    "Somehow we read too much: %s != %s"
                    % (cur_data_len, cur_coalesced.length))
            all_data = ''.join(cur_data)
            cur_data = []
            cur_data_len = 0

            for suboffset, subsize in cur_coalesced.ranges:
                key = (cur_coalesced.start+suboffset, subsize)
                data_map[key] = all_data[suboffset:suboffset+subsize]

            # Now that we've read some data, see if we can yield anything back
            while cur_offset_and_size in data_map:
                this_data = data_map.pop(cur_offset_and_size)
                yield cur_offset_and_size[0], this_data
                cur_offset_and_size = offset_stack.next()
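            # Illustrative example (not part of the original source): a
            # coalesced region with start=1000 and ranges=[(0, 100), (100, 50)]
            # produces data_map keys (1000, 100) and (1100, 50), which are
            # exactly the (offset, size) pairs the caller asked for, so they
            # can be popped and yielded in the caller's original order.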

            # We read a coalesced entry, so mark it as done
            cur_coalesced = None
            # Now that we've read all of the data for this coalesced section
            # on to the next
            cur_coalesced = cur_coalesced_stack.next()

        if cur_coalesced is not None:
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
                                         cur_coalesced.length, len(data))

        helper = _SFTPReadvHelper(offsets, relpath)
        return helper.request_and_yield_offsets(fp)

    def put_file(self, relpath, f, mode=None):