class _SFTPReadvHelper(object):
    """A class to help with managing the state of a readv request."""

    # See _get_requests for an explanation.
    _max_request_size = 32768

    def __init__(self, original_offsets, relpath):
        """Create a new readv helper.

        :param original_offsets: The original requests given by the caller of
            readv()
        :param relpath: The name of the file (if known)
        """
        self.original_offsets = list(original_offsets)
        self.relpath = relpath
    def _get_requests(self):
        """Break up the offsets into individual requests over sftp.

        The SFTP spec only requires implementers to support 32kB requests. We
        could try something larger (openssh supports 64kB), but then we have
        to handle requests that fail.
        So instead, we just break up our maximum chunks into 32kB chunks, and
        asynchronously request them.
        Newer versions of paramiko would do the chunking for us, but we want
        to start processing results right away, so we do it ourselves.
        """
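        # As an illustration (hypothetical offsets, not from the original
        # code): the reads (0, 1000), (1000, 2000) and (3000, 60000) are
        # contiguous, so they coalesce into a single range of 63000 bytes
        # starting at 0, which is then split into the requests (0, 32768)
        # and (32768, 30232), i.e. consecutive slices of at most
        # _max_request_size bytes.
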
        # TODO: Because we issue async requests, we don't 'fudge' any extra
        #       data. I'm not 100% sure that is the best choice.

        # The first thing we do is to collapse the individual requests as much
        # as possible, so we don't issue requests <32kB
        sorted_offsets = sorted(self.original_offsets)
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
                                                              limit=0, fudge_factor=0))
        requests = []
        for c_offset in coalesced:
            start = c_offset.start
            size = c_offset.length

            # Break this up into 32kB requests
            while size > 0:
                next_size = min(size, self._max_request_size)
                requests.append((start, next_size))
                size -= next_size
                start += next_size
        mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
               self.relpath, len(sorted_offsets), len(coalesced),
               len(requests))
        return requests
    def request_and_yield_offsets(self, fp):
        """Request the data from the remote machine, yielding the results.

        :param fp: A Paramiko SFTPFile object that supports readv.
        :return: Yield the data requested by the original readv caller, one by
            one.
        """
        requests = self._get_requests()
        offset_iter = iter(self.original_offsets)
        cur_offset, cur_size = offset_iter.next()
        # paramiko .readv() yields strings that are in the order of the
        # requests, so we track the current request to know where the next
        # data is being returned from.
        input_start = None
        last_end = None
        buffered_data = []
        buffered_len = 0

        # This is used to buffer chunks which we couldn't process yet.
        # It is a list of (start, data) tuples.
        data_chunks = []
        # Create an 'unlimited' data stream, so we stop based on requests,
        # rather than just because the data stream ended. This lets us detect
        # short reads.
        data_stream = itertools.chain(fp.readv(requests),
                                      itertools.repeat(None))
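        # For example (hypothetical values): with requests = [(0, 10), (10, 10)]
        # and a server that only returns one chunk, izip() pairs the second
        # request with the None padding from itertools.repeat(None), so the
        # short read is detected below rather than the loop silently ending
        # early.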
        for (start, length), data in itertools.izip(requests, data_stream):
            if data is None:
                # The data stream ended before every request was satisfied.
                raise errors.ShortReadvError(self.relpath,
                    start, length, 0)
            if len(data) != length:
                raise errors.ShortReadvError(self.relpath,
                    start, length, len(data))
            if last_end is None:
                # This is the first request, just buffer it
                buffered_data = [data]
                buffered_len = length
                input_start = start
            elif start == last_end:
                # The data we are reading fits neatly onto the end of the
                # previous buffer, so this is all part of a larger coalesced
                # range.
                buffered_data.append(data)
                buffered_len += length
            else:
                # We have an 'interrupt' in the data stream. So we know we are
                # at a request boundary.
                if buffered_len > 0:
                    # We haven't consumed the buffer so far, so put it into
                    # data_chunks, and continue.
                    buffered = ''.join(buffered_data)
                    data_chunks.append((input_start, buffered))
                input_start = start
                buffered_data = [data]
                buffered_len = length
            last_end = start + length
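            # For instance (hypothetical numbers): a 40000 byte coalesced range
            # arrives as the requests (0, 32768) and (32768, 7232); the second
            # starts exactly at last_end, so it is appended to buffered_data,
            # whereas a request starting at, say, 50000 would not match
            # last_end and would begin a new buffered chunk instead.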
            if input_start == cur_offset and cur_size <= buffered_len:
                # Simplify the next steps a bit by transforming buffered_data
                # into a single string. We also have the nice property that
                # when there is only one string ''.join([x]) == x, so there is
                # no data copying.
                buffered = ''.join(buffered_data)
                # Clean out buffered data so that we keep memory
                # consumption low
                del buffered_data[:]
                buffered_offset = 0
                # TODO: We *could* also consider the case where cur_offset is
                #       in the buffered range, even though it doesn't *start*
                #       the buffered range. But for packs we pretty much always
                #       read in order, so you won't get any extra data in the
                #       middle.
                while (input_start == cur_offset
                       and (buffered_offset + cur_size) <= buffered_len):
                    # We've buffered enough data to process this request,
                    # spit it out
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
                    # move the direct pointer into our buffered data
                    buffered_offset += cur_size
                    # Move the start-of-buffer pointer
                    input_start += cur_size
                    # Yield the requested data
                    yield cur_offset, cur_data
                    cur_offset, cur_size = offset_iter.next()
                # At this point, we've consumed as much of buffered as we can,
                # so break off the portion that we consumed
                if buffered_offset == len(buffered):
                    # No tail to leave behind
                    buffered_data = []
                    buffered_len = 0
                else:
                    buffered = buffered[buffered_offset:]
                    buffered_data = [buffered]
                    buffered_len = len(buffered)
        if buffered_len:
            buffered = ''.join(buffered_data)
            data_chunks.append((input_start, buffered))
        if data_chunks:
            mutter('SFTP readv left with %d out-of-order bytes',
                   sum(map(lambda x: len(x[1]), data_chunks)))
            # We've processed all the readv data, at this point, anything we
            # couldn't process is in data_chunks. This doesn't happen often,
            # so this code path isn't optimized.
            # We use an interesting process for data_chunks.
            # Specifically, if we have "bisect_left([(start, data)], (qstart,))":
            # if start == qstart, then we get the index of that specific node.
            # Otherwise we get the index just past the previous node.
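            # A small illustration (hypothetical chunks, not from the original
            # code): with data_chunks = [(0, 'abcd'), (100, 'wxyz')],
            # bisect.bisect_left(data_chunks, (100,)) == 1, the chunk starting
            # at offset 100, while bisect.bisect_left(data_chunks, (101,)) == 2,
            # so idx - 1 points at the chunk that covers offset 101.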
            while True:
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
                if (idx < len(data_chunks)
                    and data_chunks[idx][0] == cur_offset):
                    # The data starts here
                    data = data_chunks[idx][1][:cur_size]
                elif idx > 0:
                    # The data is in a portion of a previous page
                    idx -= 1
                    sub_offset = cur_offset - data_chunks[idx][0]
                    data = data_chunks[idx][1]
                    data = data[sub_offset:sub_offset + cur_size]
                else:
                    # We are missing the page where the data should be found,
                    # which means something went wrong while buffering it.
                    raise AssertionError('No buffered data for offset %d'
                                         % (cur_offset,))
                if len(data) != cur_size:
                    raise AssertionError('We must have miscalculated.'
                        ' We expected %d bytes, but only found %d'
                        % (cur_size, len(data)))
                yield cur_offset, data
                cur_offset, cur_size = offset_iter.next()

class SFTPTransport(ConnectedTransport):
    """Transport implementation for SFTP access."""

    def _sftp_readv(self, fp, offsets, relpath):
        """Use the readv() member of fp to do async readv.

        Then read them using paramiko.readv(). paramiko.readv() does not
        support ranges > 64K, so it caps the request size, and just reads
        until it gets all the stuff it wants.
        """
        offsets = list(offsets)
        sorted_offsets = sorted(offsets)

        # The algorithm works as follows:
        # 1) Coalesce nearby reads into a single chunk
        #    This generates a list of combined regions, the total size
        #    and the size of the sub regions. This coalescing step is limited
        #    in the number of nearby chunks to combine, and is allowed to
        #    skip small breaks in the requests. Limiting it makes sure that
        #    we can start yielding some data earlier, and skipping means we
        #    make fewer requests. (Beneficial even when using async)
        # 2) Break up these combined regions into chunks that are smaller
        #    than 64KiB. Technically the limit is 65536, but we are a
        #    little bit conservative. This is because sftp has a maximum
        #    return chunk size of 64KiB (max size of an unsigned short)
        # 3) Issue a readv() to paramiko to create an async request for
        #    all of this data
        # 4) Read in the data as it comes back, until we've read one
        #    continuous section as determined in step 1
        # 5) Break up the full sections into hunks for the original requested
        #    offsets. And put them in a cache
        # 6) Check if the next request is in the cache, and if it is, remove
        #    it from the cache, and yield its data. Continue until no more
        #    entries are in the cache.
        # 7) Loop back to step 4 until all data has been read
        #    (steps 5 and 6 are illustrated just below)
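        # A sketch of steps 5 and 6 with hypothetical numbers: if a coalesced
        # region starting at offset 4000 has the sub-ranges (0, 100) and
        # (100, 50), the cache is keyed as data_map[(4000, 100)] and
        # data_map[(4100, 50)], i.e. (absolute start, length). Each key
        # matches one of the original (offset, size) requests, so results can
        # be popped and yielded in the caller's original order as soon as they
        # arrive.
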
        # TODO: jam 20060725 This could be optimized one step further, by
        #       attempting to yield whatever data we have read, even before
        #       the first coalesced section has been fully processed.

        # When coalescing for use with readv(), we don't really need to
        # use any fudge factor, because the requests are made asynchronously
        coalesced = list(self._coalesce_offsets(sorted_offsets,
                                                limit=self._max_readv_combine,
                                                fudge_factor=0))
        requests = []
        for c_offset in coalesced:
            start = c_offset.start
            size = c_offset.length

            # We need to break this up into multiple requests
            while size > 0:
                next_size = min(size, self._max_request_size)
                requests.append((start, next_size))
                size -= next_size
                start += next_size

        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
               len(offsets), len(coalesced), len(requests))
        # Queue the current read until we have read the full coalesced section
        cur_data = []
        cur_data_len = 0
        cur_coalesced_stack = iter(coalesced)
        cur_coalesced = cur_coalesced_stack.next()

        # Cache the results, but only until they have been fulfilled
        data_map = {}
        # Turn the list of offsets into a stack
        offset_stack = iter(offsets)
        cur_offset_and_size = offset_stack.next()
        for data in fp.readv(requests):
            cur_data.append(data)
            cur_data_len += len(data)

            if cur_data_len < cur_coalesced.length:
                continue
            if cur_data_len != cur_coalesced.length:
                raise AssertionError(
                    "Somehow we read too much: %s != %s"
                    % (cur_data_len, cur_coalesced.length))
            all_data = ''.join(cur_data)
            cur_data = []
            cur_data_len = 0

            for suboffset, subsize in cur_coalesced.ranges:
                key = (cur_coalesced.start+suboffset, subsize)
                data_map[key] = all_data[suboffset:suboffset+subsize]
            # Now that we've read some data, see if we can yield anything back
            while cur_offset_and_size in data_map:
                this_data = data_map.pop(cur_offset_and_size)
                yield cur_offset_and_size[0], this_data
                cur_offset_and_size = offset_stack.next()

            # We read a coalesced entry, so mark it as done
            cur_coalesced = None
            # Now that we've read all of the data for this coalesced section,
            # move on to the next one
            cur_coalesced = cur_coalesced_stack.next()
        if cur_coalesced is not None:
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
                                         cur_coalesced.length, len(data))

        helper = _SFTPReadvHelper(offsets, relpath)
        return helper.request_and_yield_offsets(fp)

    def put_file(self, relpath, f, mode=None):