~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: John Arbash Meinel
  • Date: 2008-10-30 00:55:00 UTC
  • mto: (3815.2.5 prepare-1.9)
  • mto: This revision was merged to the branch mainline in revision 3811.
  • Revision ID: john@arbash-meinel.com-20081030005500-r5cej1cxflqhs3io
Switch so that we are using a simple timestamp as the first action.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
# suite.  Those formats all date back to 0.7; so we should be able to remove
25
25
# these methods when we officially drop support for those formats.
26
26
 
 
27
import bisect
27
28
import errno
 
29
import itertools
28
30
import os
29
31
import random
30
32
import select
37
39
import warnings
38
40
 
39
41
from bzrlib import (
 
42
    config,
40
43
    errors,
41
44
    urlutils,
42
45
    )
131
134
            pass
132
135
 
133
136
 
 
137
class _SFTPReadvHelper(object):
 
138
    """A class to help with managing the state of a readv request."""
 
139
 
 
140
    # See _get_requests for an explanation.
 
141
    _max_request_size = 32768
 
142
 
 
143
    def __init__(self, original_offsets, relpath):
 
144
        """Create a new readv helper.
 
145
 
 
146
        :param original_offsets: The original requests given by the caller of
 
147
            readv()
 
148
        :param relpath: The name of the file (if known)
 
149
        """
 
150
        self.original_offsets = list(original_offsets)
 
151
        self.relpath = relpath
 
152
 
 
153
    def _get_requests(self):
 
154
        """Break up the offsets into individual requests over sftp.
 
155
 
 
156
        The SFTP spec only requires implementers to support 32kB requests. We
 
157
        could try something larger (openssh supports 64kB), but then we have to
 
158
        handle requests that fail.
 
159
        So instead, we just break up our maximum chunks into 32kB chunks, and
 
160
        asyncronously requests them.
 
161
        Newer versions of paramiko would do the chunking for us, but we want to
 
162
        start processing results right away, so we do it ourselves.
 
163
        """
 
164
        # TODO: Because we issue async requests, we don't 'fudge' any extra
 
165
        #       data.  I'm not 100% sure that is the best choice.
 
166
 
 
167
        # The first thing we do, is to collapse the individual requests as much
 
168
        # as possible, so we don't issues requests <32kB
 
169
        sorted_offsets = sorted(self.original_offsets)
 
170
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
 
171
                                                        limit=0, fudge_factor=0))
 
172
        requests = []
 
173
        for c_offset in coalesced:
 
174
            start = c_offset.start
 
175
            size = c_offset.length
 
176
 
 
177
            # Break this up into 32kB requests
 
178
            while size > 0:
 
179
                next_size = min(size, self._max_request_size)
 
180
                requests.append((start, next_size))
 
181
                size -= next_size
 
182
                start += next_size
 
183
        mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
 
184
               self.relpath, len(sorted_offsets), len(coalesced),
 
185
               len(requests))
 
186
        return requests
 
187
 
 
188
    def request_and_yield_offsets(self, fp):
 
189
        """Request the data from the remote machine, yielding the results.
 
190
 
 
191
        :param fp: A Paramiko SFTPFile object that supports readv.
 
192
        :return: Yield the data requested by the original readv caller, one by
 
193
            one.
 
194
        """
 
195
        requests = self._get_requests()
 
196
        offset_iter = iter(self.original_offsets)
 
197
        cur_offset, cur_size = offset_iter.next()
 
198
        # paramiko .readv() yields strings that are in the order of the requests
 
199
        # So we track the current request to know where the next data is
 
200
        # being returned from.
 
201
        input_start = None
 
202
        last_end = None
 
203
        buffered_data = []
 
204
        buffered_len = 0
 
205
 
 
206
        # This is used to buffer chunks which we couldn't process yet
 
207
        # It is (start, end, data) tuples.
 
208
        data_chunks = []
 
209
        # Create an 'unlimited' data stream, so we stop based on requests,
 
210
        # rather than just because the data stream ended. This lets us detect
 
211
        # short readv.
 
212
        data_stream = itertools.chain(fp.readv(requests),
 
213
                                      itertools.repeat(None))
 
214
        for (start, length), data in itertools.izip(requests, data_stream):
 
215
            if data is None:
 
216
                if cur_coalesced is not None:
 
217
                    raise errors.ShortReadvError(self.relpath,
 
218
                        start, length, len(data))
 
219
            if len(data) != length:
 
220
                raise errors.ShortReadvError(self.relpath,
 
221
                    start, length, len(data))
 
222
            if last_end is None:
 
223
                # This is the first request, just buffer it
 
224
                buffered_data = [data]
 
225
                buffered_len = length
 
226
                input_start = start
 
227
            elif start == last_end:
 
228
                # The data we are reading fits neatly on the previous
 
229
                # buffer, so this is all part of a larger coalesced range.
 
230
                buffered_data.append(data)
 
231
                buffered_len += length
 
232
            else:
 
233
                # We have an 'interrupt' in the data stream. So we know we are
 
234
                # at a request boundary.
 
235
                if buffered_len > 0:
 
236
                    # We haven't consumed the buffer so far, so put it into
 
237
                    # data_chunks, and continue.
 
238
                    buffered = ''.join(buffered_data)
 
239
                    data_chunks.append((input_start, buffered))
 
240
                input_start = start
 
241
                buffered_data = [data]
 
242
                buffered_len = length
 
243
            last_end = start + length
 
244
            if input_start == cur_offset and cur_size <= buffered_len:
 
245
                # Simplify the next steps a bit by transforming buffered_data
 
246
                # into a single string. We also have the nice property that
 
247
                # when there is only one string ''.join([x]) == x, so there is
 
248
                # no data copying.
 
249
                buffered = ''.join(buffered_data)
 
250
                # Clean out buffered data so that we keep memory
 
251
                # consumption low
 
252
                del buffered_data[:]
 
253
                buffered_offset = 0
 
254
                # TODO: We *could* also consider the case where cur_offset is in
 
255
                #       in the buffered range, even though it doesn't *start*
 
256
                #       the buffered range. But for packs we pretty much always
 
257
                #       read in order, so you won't get any extra data in the
 
258
                #       middle.
 
259
                while (input_start == cur_offset
 
260
                       and (buffered_offset + cur_size) <= buffered_len):
 
261
                    # We've buffered enough data to process this request, spit it
 
262
                    # out
 
263
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
 
264
                    # move the direct pointer into our buffered data
 
265
                    buffered_offset += cur_size
 
266
                    # Move the start-of-buffer pointer
 
267
                    input_start += cur_size
 
268
                    # Yield the requested data
 
269
                    yield cur_offset, cur_data
 
270
                    cur_offset, cur_size = offset_iter.next()
 
271
                # at this point, we've consumed as much of buffered as we can,
 
272
                # so break off the portion that we consumed
 
273
                if buffered_offset == len(buffered_data):
 
274
                    # No tail to leave behind
 
275
                    buffered_data = []
 
276
                    buffered_len = 0
 
277
                else:
 
278
                    buffered = buffered[buffered_offset:]
 
279
                    buffered_data = [buffered]
 
280
                    buffered_len = len(buffered)
 
281
        if buffered_len:
 
282
            buffered = ''.join(buffered_data)
 
283
            del buffered_data[:]
 
284
            data_chunks.append((input_start, buffered))
 
285
        if data_chunks:
 
286
            mutter('SFTP readv left with %d out-of-order bytes',
 
287
                   sum(map(lambda x: len(x[1]), data_chunks)))
 
288
            # We've processed all the readv data, at this point, anything we
 
289
            # couldn't process is in data_chunks. This doesn't happen often, so
 
290
            # this code path isn't optimized
 
291
            # We use an interesting process for data_chunks
 
292
            # Specifically if we have "bisect_left([(start, len, entries)],
 
293
            #                                       (qstart,)])
 
294
            # If start == qstart, then we get the specific node. Otherwise we
 
295
            # get the previous node
 
296
            while True:
 
297
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
 
298
                if data_chunks[idx][0] == cur_offset: # The data starts here
 
299
                    data = data_chunks[idx][1][:cur_size]
 
300
                elif idx > 0:
 
301
                    # The data is in a portion of a previous page
 
302
                    idx -= 1
 
303
                    sub_offset = cur_offset - data_chunks[idx][0]
 
304
                    data = data_chunks[idx][1]
 
305
                    data = data[sub_offset:sub_offset + cur_size]
 
306
                else:
 
307
                    # We are missing the page where the data should be found,
 
308
                    # something is wrong
 
309
                    data = ''
 
310
                if len(data) != cur_size:
 
311
                    raise AssertionError('We must have miscalulated.'
 
312
                        ' We expected %d bytes, but only found %d'
 
313
                        % (cur_size, len(data)))
 
314
                yield cur_offset, data
 
315
                cur_offset, cur_size = offset_iter.next()
 
316
 
 
317
 
134
318
class SFTPTransport(ConnectedTransport):
135
319
    """Transport implementation for SFTP access."""
136
320
 
192
376
            password = credentials
193
377
 
194
378
        vendor = ssh._get_ssh_vendor()
 
379
        user = self._user
 
380
        if user is None:
 
381
            auth = config.AuthenticationConfig()
 
382
            user = auth.get_user('ssh', self._host, self._port)
195
383
        connection = vendor.connect_sftp(self._user, password,
196
384
                                         self._host, self._port)
197
 
        return connection, password
 
385
        return connection, (user, password)
198
386
 
199
387
    def _get_sftp(self):
200
388
        """Ensures that a connection is established"""
265
453
        does not support ranges > 64K, so it caps the request size, and
266
454
        just reads until it gets all the stuff it wants
267
455
        """
268
 
        offsets = list(offsets)
269
 
        sorted_offsets = sorted(offsets)
270
 
 
271
 
        # The algorithm works as follows:
272
 
        # 1) Coalesce nearby reads into a single chunk
273
 
        #    This generates a list of combined regions, the total size
274
 
        #    and the size of the sub regions. This coalescing step is limited
275
 
        #    in the number of nearby chunks to combine, and is allowed to
276
 
        #    skip small breaks in the requests. Limiting it makes sure that
277
 
        #    we can start yielding some data earlier, and skipping means we
278
 
        #    make fewer requests. (Beneficial even when using async)
279
 
        # 2) Break up this combined regions into chunks that are smaller
280
 
        #    than 64KiB. Technically the limit is 65536, but we are a
281
 
        #    little bit conservative. This is because sftp has a maximum
282
 
        #    return chunk size of 64KiB (max size of an unsigned short)
283
 
        # 3) Issue a readv() to paramiko to create an async request for
284
 
        #    all of this data
285
 
        # 4) Read in the data as it comes back, until we've read one
286
 
        #    continuous section as determined in step 1
287
 
        # 5) Break up the full sections into hunks for the original requested
288
 
        #    offsets. And put them in a cache
289
 
        # 6) Check if the next request is in the cache, and if it is, remove
290
 
        #    it from the cache, and yield its data. Continue until no more
291
 
        #    entries are in the cache.
292
 
        # 7) loop back to step 4 until all data has been read
293
 
        #
294
 
        # TODO: jam 20060725 This could be optimized one step further, by
295
 
        #       attempting to yield whatever data we have read, even before
296
 
        #       the first coallesced section has been fully processed.
297
 
 
298
 
        # When coalescing for use with readv(), we don't really need to
299
 
        # use any fudge factor, because the requests are made asynchronously
300
 
        coalesced = list(self._coalesce_offsets(sorted_offsets,
301
 
                               limit=self._max_readv_combine,
302
 
                               fudge_factor=0,
303
 
                               ))
304
 
        requests = []
305
 
        for c_offset in coalesced:
306
 
            start = c_offset.start
307
 
            size = c_offset.length
308
 
 
309
 
            # We need to break this up into multiple requests
310
 
            while size > 0:
311
 
                next_size = min(size, self._max_request_size)
312
 
                requests.append((start, next_size))
313
 
                size -= next_size
314
 
                start += next_size
315
 
 
316
 
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
317
 
                len(offsets), len(coalesced), len(requests))
318
 
 
319
 
        # Queue the current read until we have read the full coalesced section
320
 
        cur_data = []
321
 
        cur_data_len = 0
322
 
        cur_coalesced_stack = iter(coalesced)
323
 
        cur_coalesced = cur_coalesced_stack.next()
324
 
 
325
 
        # Cache the results, but only until they have been fulfilled
326
 
        data_map = {}
327
 
        # turn the list of offsets into a stack
328
 
        offset_stack = iter(offsets)
329
 
        cur_offset_and_size = offset_stack.next()
330
 
 
331
 
        for data in fp.readv(requests):
332
 
            cur_data += data
333
 
            cur_data_len += len(data)
334
 
 
335
 
            if cur_data_len < cur_coalesced.length:
336
 
                continue
337
 
            if cur_data_len != cur_coalesced.length:
338
 
                raise AssertionError(
339
 
                    "Somehow we read too much: %s != %s" 
340
 
                    % (cur_data_len, cur_coalesced.length))
341
 
            all_data = ''.join(cur_data)
342
 
            cur_data = []
343
 
            cur_data_len = 0
344
 
 
345
 
            for suboffset, subsize in cur_coalesced.ranges:
346
 
                key = (cur_coalesced.start+suboffset, subsize)
347
 
                data_map[key] = all_data[suboffset:suboffset+subsize]
348
 
 
349
 
            # Now that we've read some data, see if we can yield anything back
350
 
            while cur_offset_and_size in data_map:
351
 
                this_data = data_map.pop(cur_offset_and_size)
352
 
                yield cur_offset_and_size[0], this_data
353
 
                cur_offset_and_size = offset_stack.next()
354
 
 
355
 
            # We read a coalesced entry, so mark it as done
356
 
            cur_coalesced = None
357
 
            # Now that we've read all of the data for this coalesced section
358
 
            # on to the next
359
 
            cur_coalesced = cur_coalesced_stack.next()
360
 
 
361
 
        if cur_coalesced is not None:
362
 
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
363
 
                cur_coalesced.length, len(data))
 
456
        helper = _SFTPReadvHelper(offsets, relpath)
 
457
        return helper.request_and_yield_offsets(fp)
364
458
 
365
459
    def put_file(self, relpath, f, mode=None):
366
460
        """
519
613
        try:
520
614
            self._get_sftp().mkdir(abspath, local_mode)
521
615
            if mode is not None:
522
 
                self._get_sftp().chmod(abspath, mode=mode)
 
616
                # chmod a dir through sftp will erase any sgid bit set
 
617
                # on the server side.  So, if the bit mode are already
 
618
                # set, avoid the chmod.  If the mode is not fine but
 
619
                # the sgid bit is set, report a warning to the user
 
620
                # with the umask fix.
 
621
                stat = self._get_sftp().lstat(abspath)
 
622
                mode = mode & 0777 # can't set special bits anyway
 
623
                if mode != stat.st_mode & 0777:
 
624
                    if stat.st_mode & 06000:
 
625
                        warning('About to chmod %s over sftp, which will result'
 
626
                                ' in its suid or sgid bits being cleared.  If'
 
627
                                ' you want to preserve those bits, change your '
 
628
                                ' environment on the server to use umask 0%03o.'
 
629
                                % (abspath, 0777 - mode))
 
630
                    self._get_sftp().chmod(abspath, mode=mode)
523
631
        except (paramiko.SSHException, IOError), e:
524
632
            self._translate_io_exception(e, abspath, ': unable to mkdir',
525
633
                failure_exc=FileExists)
570
678
            if (e.args == ('No such file or directory',) or
571
679
                e.args == ('No such file',)):
572
680
                raise NoSuchFile(path, str(e) + more_info)
573
 
            if (e.args == ('mkdir failed',)):
 
681
            if (e.args == ('mkdir failed',) or
 
682
                e.args[0].startswith('syserr: File exists')):
574
683
                raise FileExists(path, str(e) + more_info)
575
684
            # strange but true, for the paramiko server.
576
685
            if (e.args == ('Failure',)):