~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
# suite.  Those formats all date back to 0.7; so we should be able to remove
25
25
# these methods when we officially drop support for those formats.
26
26
 
 
27
import bisect
27
28
import errno
 
29
import itertools
28
30
import os
29
31
import random
30
32
import select
131
133
            pass
132
134
 
133
135
 
 
136
class _SFTPReadvHelper(object):
 
137
    """A class to help with managing the state of a readv request."""
 
138
 
 
139
    # See _get_requests for an explanation.
 
140
    _max_request_size = 32768
 
141
 
 
142
    def __init__(self, original_offsets, relpath):
 
143
        """Create a new readv helper.
 
144
 
 
145
        :param original_offsets: The original requests given by the caller of
 
146
            readv()
 
147
        :param relpath: The name of the file (if known)
 
148
        """
 
149
        self.original_offsets = list(original_offsets)
 
150
        self.relpath = relpath
 
151
 
 
152
    def _get_requests(self):
 
153
        """Break up the offsets into individual requests over sftp.
 
154
 
 
155
        The SFTP spec only requires implementers to support 32kB requests. We
 
156
        could try something larger (openssh supports 64kB), but then we have to
 
157
        handle requests that fail.
 
158
        So instead, we just break up our maximum chunks into 32kB chunks, and
 
159
        asyncronously requests them.
 
160
        Newer versions of paramiko would do the chunking for us, but we want to
 
161
        start processing results right away, so we do it ourselves.
 
162
        """
 
163
        # TODO: Because we issue async requests, we don't 'fudge' any extra
 
164
        #       data.  I'm not 100% sure that is the best choice.
 
165
 
 
166
        # The first thing we do, is to collapse the individual requests as much
 
167
        # as possible, so we don't issues requests <32kB
 
168
        sorted_offsets = sorted(self.original_offsets)
 
169
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
 
170
                                                        limit=0, fudge_factor=0))
 
171
        requests = []
 
172
        for c_offset in coalesced:
 
173
            start = c_offset.start
 
174
            size = c_offset.length
 
175
 
 
176
            # Break this up into 32kB requests
 
177
            while size > 0:
 
178
                next_size = min(size, self._max_request_size)
 
179
                requests.append((start, next_size))
 
180
                size -= next_size
 
181
                start += next_size
 
182
        mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
 
183
               self.relpath, len(sorted_offsets), len(coalesced),
 
184
               len(requests))
 
185
        return requests
 
186
 
 
187
    def request_and_yield_offsets(self, fp):
 
188
        """Request the data from the remote machine, yielding the results.
 
189
 
 
190
        :param fp: A Paramiko SFTPFile object that supports readv.
 
191
        :return: Yield the data requested by the original readv caller, one by
 
192
            one.
 
193
        """
 
194
        requests = self._get_requests()
 
195
        offset_iter = iter(self.original_offsets)
 
196
        cur_offset, cur_size = offset_iter.next()
 
197
        # paramiko .readv() yields strings that are in the order of the requests
 
198
        # So we track the current request to know where the next data is
 
199
        # being returned from.
 
200
        input_start = None
 
201
        last_end = None
 
202
        buffered_data = []
 
203
        buffered_len = 0
 
204
 
 
205
        # This is used to buffer chunks which we couldn't process yet
 
206
        # It is (start, end, data) tuples.
 
207
        data_chunks = []
 
208
        # Create an 'unlimited' data stream, so we stop based on requests,
 
209
        # rather than just because the data stream ended. This lets us detect
 
210
        # short readv.
 
211
        data_stream = itertools.chain(fp.readv(requests),
 
212
                                      itertools.repeat(None))
 
213
        for (start, length), data in itertools.izip(requests, data_stream):
 
214
            if data is None:
 
215
                if cur_coalesced is not None:
 
216
                    raise errors.ShortReadvError(self.relpath,
 
217
                        start, length, len(data))
 
218
            if len(data) != length:
 
219
                raise errors.ShortReadvError(self.relpath,
 
220
                    start, length, len(data))
 
221
            if last_end is None:
 
222
                # This is the first request, just buffer it
 
223
                buffered_data = [data]
 
224
                buffered_len = length
 
225
                input_start = start
 
226
            elif start == last_end:
 
227
                # The data we are reading fits neatly on the previous
 
228
                # buffer, so this is all part of a larger coalesced range.
 
229
                buffered_data.append(data)
 
230
                buffered_len += length
 
231
            else:
 
232
                # We have an 'interrupt' in the data stream. So we know we are
 
233
                # at a request boundary.
 
234
                if buffered_len > 0:
 
235
                    # We haven't consumed the buffer so far, so put it into
 
236
                    # data_chunks, and continue.
 
237
                    buffered = ''.join(buffered_data)
 
238
                    data_chunks.append((input_start, buffered))
 
239
                input_start = start
 
240
                buffered_data = [data]
 
241
                buffered_len = length
 
242
            last_end = start + length
 
243
            if input_start == cur_offset and cur_size <= buffered_len:
 
244
                # Simplify the next steps a bit by transforming buffered_data
 
245
                # into a single string. We also have the nice property that
 
246
                # when there is only one string ''.join([x]) == x, so there is
 
247
                # no data copying.
 
248
                buffered = ''.join(buffered_data)
 
249
                # Clean out buffered data so that we keep memory
 
250
                # consumption low
 
251
                del buffered_data[:]
 
252
                buffered_offset = 0
 
253
                # TODO: We *could* also consider the case where cur_offset is in
 
254
                #       in the buffered range, even though it doesn't *start*
 
255
                #       the buffered range. But for packs we pretty much always
 
256
                #       read in order, so you won't get any extra data in the
 
257
                #       middle.
 
258
                while (input_start == cur_offset
 
259
                       and (buffered_offset + cur_size) <= buffered_len):
 
260
                    # We've buffered enough data to process this request, spit it
 
261
                    # out
 
262
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
 
263
                    # move the direct pointer into our buffered data
 
264
                    buffered_offset += cur_size
 
265
                    # Move the start-of-buffer pointer
 
266
                    input_start += cur_size
 
267
                    # Yield the requested data
 
268
                    yield cur_offset, cur_data
 
269
                    cur_offset, cur_size = offset_iter.next()
 
270
                # at this point, we've consumed as much of buffered as we can,
 
271
                # so break off the portion that we consumed
 
272
                if buffered_offset == len(buffered_data):
 
273
                    # No tail to leave behind
 
274
                    buffered_data = []
 
275
                    buffered_len = 0
 
276
                else:
 
277
                    buffered = buffered[buffered_offset:]
 
278
                    buffered_data = [buffered]
 
279
                    buffered_len = len(buffered)
 
280
        if buffered_len:
 
281
            buffered = ''.join(buffered_data)
 
282
            del buffered_data[:]
 
283
            data_chunks.append((input_start, buffered))
 
284
        if data_chunks:
 
285
            mutter('SFTP readv left with %d out-of-order bytes',
 
286
                   sum(map(lambda x: len(x[1]), data_chunks)))
 
287
            # We've processed all the readv data, at this point, anything we
 
288
            # couldn't process is in data_chunks. This doesn't happen often, so
 
289
            # this code path isn't optimized
 
290
            # We use an interesting process for data_chunks
 
291
            # Specifically if we have "bisect_left([(start, len, entries)],
 
292
            #                                       (qstart,)])
 
293
            # If start == qstart, then we get the specific node. Otherwise we
 
294
            # get the previous node
 
295
            while True:
 
296
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
 
297
                if data_chunks[idx][0] == cur_offset: # The data starts here
 
298
                    data = data_chunks[idx][1][:cur_size]
 
299
                elif idx > 0:
 
300
                    # The data is in a portion of a previous page
 
301
                    idx -= 1
 
302
                    sub_offset = cur_offset - data_chunks[idx][0]
 
303
                    data = data_chunks[idx][1]
 
304
                    data = data[sub_offset:sub_offset + cur_size]
 
305
                else:
 
306
                    # We are missing the page where the data should be found,
 
307
                    # something is wrong
 
308
                    data = ''
 
309
                if len(data) != cur_size:
 
310
                    raise AssertionError('We must have miscalulated.'
 
311
                        ' We expected %d bytes, but only found %d'
 
312
                        % (cur_size, len(data)))
 
313
                yield cur_offset, data
 
314
                cur_offset, cur_size = offset_iter.next()
 
315
 
 
316
 
134
317
class SFTPTransport(ConnectedTransport):
135
318
    """Transport implementation for SFTP access."""
136
319
 
265
448
        does not support ranges > 64K, so it caps the request size, and
266
449
        just reads until it gets all the stuff it wants
267
450
        """
268
 
        offsets = list(offsets)
269
 
        sorted_offsets = sorted(offsets)
270
 
 
271
 
        # The algorithm works as follows:
272
 
        # 1) Coalesce nearby reads into a single chunk
273
 
        #    This generates a list of combined regions, the total size
274
 
        #    and the size of the sub regions. This coalescing step is limited
275
 
        #    in the number of nearby chunks to combine, and is allowed to
276
 
        #    skip small breaks in the requests. Limiting it makes sure that
277
 
        #    we can start yielding some data earlier, and skipping means we
278
 
        #    make fewer requests. (Beneficial even when using async)
279
 
        # 2) Break up this combined regions into chunks that are smaller
280
 
        #    than 64KiB. Technically the limit is 65536, but we are a
281
 
        #    little bit conservative. This is because sftp has a maximum
282
 
        #    return chunk size of 64KiB (max size of an unsigned short)
283
 
        # 3) Issue a readv() to paramiko to create an async request for
284
 
        #    all of this data
285
 
        # 4) Read in the data as it comes back, until we've read one
286
 
        #    continuous section as determined in step 1
287
 
        # 5) Break up the full sections into hunks for the original requested
288
 
        #    offsets. And put them in a cache
289
 
        # 6) Check if the next request is in the cache, and if it is, remove
290
 
        #    it from the cache, and yield its data. Continue until no more
291
 
        #    entries are in the cache.
292
 
        # 7) loop back to step 4 until all data has been read
293
 
        #
294
 
        # TODO: jam 20060725 This could be optimized one step further, by
295
 
        #       attempting to yield whatever data we have read, even before
296
 
        #       the first coallesced section has been fully processed.
297
 
 
298
 
        # When coalescing for use with readv(), we don't really need to
299
 
        # use any fudge factor, because the requests are made asynchronously
300
 
        coalesced = list(self._coalesce_offsets(sorted_offsets,
301
 
                               limit=self._max_readv_combine,
302
 
                               fudge_factor=0,
303
 
                               ))
304
 
        requests = []
305
 
        for c_offset in coalesced:
306
 
            start = c_offset.start
307
 
            size = c_offset.length
308
 
 
309
 
            # We need to break this up into multiple requests
310
 
            while size > 0:
311
 
                next_size = min(size, self._max_request_size)
312
 
                requests.append((start, next_size))
313
 
                size -= next_size
314
 
                start += next_size
315
 
 
316
 
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
317
 
                len(offsets), len(coalesced), len(requests))
318
 
 
319
 
        # Queue the current read until we have read the full coalesced section
320
 
        cur_data = []
321
 
        cur_data_len = 0
322
 
        cur_coalesced_stack = iter(coalesced)
323
 
        cur_coalesced = cur_coalesced_stack.next()
324
 
 
325
 
        # Cache the results, but only until they have been fulfilled
326
 
        data_map = {}
327
 
        # turn the list of offsets into a stack
328
 
        offset_stack = iter(offsets)
329
 
        cur_offset_and_size = offset_stack.next()
330
 
 
331
 
        for data in fp.readv(requests):
332
 
            cur_data.append(data)
333
 
            cur_data_len += len(data)
334
 
 
335
 
            if cur_data_len < cur_coalesced.length:
336
 
                continue
337
 
            if cur_data_len != cur_coalesced.length:
338
 
                raise AssertionError(
339
 
                    "Somehow we read too much: %s != %s" 
340
 
                    % (cur_data_len, cur_coalesced.length))
341
 
            all_data = ''.join(cur_data)
342
 
            cur_data = []
343
 
            cur_data_len = 0
344
 
 
345
 
            for suboffset, subsize in cur_coalesced.ranges:
346
 
                key = (cur_coalesced.start+suboffset, subsize)
347
 
                data_map[key] = all_data[suboffset:suboffset+subsize]
348
 
 
349
 
            # Now that we've read some data, see if we can yield anything back
350
 
            while cur_offset_and_size in data_map:
351
 
                this_data = data_map.pop(cur_offset_and_size)
352
 
                yield cur_offset_and_size[0], this_data
353
 
                cur_offset_and_size = offset_stack.next()
354
 
 
355
 
            # We read a coalesced entry, so mark it as done
356
 
            cur_coalesced = None
357
 
            # Now that we've read all of the data for this coalesced section
358
 
            # on to the next
359
 
            cur_coalesced = cur_coalesced_stack.next()
360
 
 
361
 
        if cur_coalesced is not None:
362
 
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
363
 
                cur_coalesced.length, len(data))
 
451
        helper = _SFTPReadvHelper(offsets, relpath)
 
452
        return helper.request_and_yield_offsets(fp)
364
453
 
365
454
    def put_file(self, relpath, f, mode=None):
366
455
        """