~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-06-05 04:05:05 UTC
  • mfrom: (3473.1.1 ianc-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20080605040505-i9kqxg2fps2qjdi0
Add the 'alias' command (Tim Penhey)

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
# suite.  Those formats all date back to 0.7; so we should be able to remove
25
25
# these methods when we officially drop support for those formats.
26
26
 
27
 
import bisect
28
27
import errno
29
 
import itertools
30
28
import os
31
29
import random
32
30
import select
39
37
import warnings
40
38
 
41
39
from bzrlib import (
42
 
    config,
43
40
    errors,
44
41
    urlutils,
45
42
    )
134
131
            pass
135
132
 
136
133
 
137
 
class _SFTPReadvHelper(object):
138
 
    """A class to help with managing the state of a readv request."""
139
 
 
140
 
    # See _get_requests for an explanation.
141
 
    _max_request_size = 32768
142
 
 
143
 
    def __init__(self, original_offsets, relpath):
144
 
        """Create a new readv helper.
145
 
 
146
 
        :param original_offsets: The original requests given by the caller of
147
 
            readv()
148
 
        :param relpath: The name of the file (if known)
149
 
        """
150
 
        self.original_offsets = list(original_offsets)
151
 
        self.relpath = relpath
152
 
 
153
 
    def _get_requests(self):
154
 
        """Break up the offsets into individual requests over sftp.
155
 
 
156
 
        The SFTP spec only requires implementers to support 32kB requests. We
157
 
        could try something larger (openssh supports 64kB), but then we have to
158
 
        handle requests that fail.
159
 
        So instead, we just break up our maximum chunks into 32kB chunks, and
160
 
        asyncronously requests them.
161
 
        Newer versions of paramiko would do the chunking for us, but we want to
162
 
        start processing results right away, so we do it ourselves.
163
 
        """
164
 
        # TODO: Because we issue async requests, we don't 'fudge' any extra
165
 
        #       data.  I'm not 100% sure that is the best choice.
166
 
 
167
 
        # The first thing we do, is to collapse the individual requests as much
168
 
        # as possible, so we don't issues requests <32kB
169
 
        sorted_offsets = sorted(self.original_offsets)
170
 
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
171
 
                                                        limit=0, fudge_factor=0))
172
 
        requests = []
173
 
        for c_offset in coalesced:
174
 
            start = c_offset.start
175
 
            size = c_offset.length
176
 
 
177
 
            # Break this up into 32kB requests
178
 
            while size > 0:
179
 
                next_size = min(size, self._max_request_size)
180
 
                requests.append((start, next_size))
181
 
                size -= next_size
182
 
                start += next_size
183
 
        mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
184
 
               self.relpath, len(sorted_offsets), len(coalesced),
185
 
               len(requests))
186
 
        return requests
187
 
 
188
 
    def request_and_yield_offsets(self, fp):
189
 
        """Request the data from the remote machine, yielding the results.
190
 
 
191
 
        :param fp: A Paramiko SFTPFile object that supports readv.
192
 
        :return: Yield the data requested by the original readv caller, one by
193
 
            one.
194
 
        """
195
 
        requests = self._get_requests()
196
 
        offset_iter = iter(self.original_offsets)
197
 
        cur_offset, cur_size = offset_iter.next()
198
 
        # paramiko .readv() yields strings that are in the order of the requests
199
 
        # So we track the current request to know where the next data is
200
 
        # being returned from.
201
 
        input_start = None
202
 
        last_end = None
203
 
        buffered_data = []
204
 
        buffered_len = 0
205
 
 
206
 
        # This is used to buffer chunks which we couldn't process yet
207
 
        # It is (start, end, data) tuples.
208
 
        data_chunks = []
209
 
        # Create an 'unlimited' data stream, so we stop based on requests,
210
 
        # rather than just because the data stream ended. This lets us detect
211
 
        # short readv.
212
 
        data_stream = itertools.chain(fp.readv(requests),
213
 
                                      itertools.repeat(None))
214
 
        for (start, length), data in itertools.izip(requests, data_stream):
215
 
            if data is None:
216
 
                if cur_coalesced is not None:
217
 
                    raise errors.ShortReadvError(self.relpath,
218
 
                        start, length, len(data))
219
 
            if len(data) != length:
220
 
                raise errors.ShortReadvError(self.relpath,
221
 
                    start, length, len(data))
222
 
            if last_end is None:
223
 
                # This is the first request, just buffer it
224
 
                buffered_data = [data]
225
 
                buffered_len = length
226
 
                input_start = start
227
 
            elif start == last_end:
228
 
                # The data we are reading fits neatly on the previous
229
 
                # buffer, so this is all part of a larger coalesced range.
230
 
                buffered_data.append(data)
231
 
                buffered_len += length
232
 
            else:
233
 
                # We have an 'interrupt' in the data stream. So we know we are
234
 
                # at a request boundary.
235
 
                if buffered_len > 0:
236
 
                    # We haven't consumed the buffer so far, so put it into
237
 
                    # data_chunks, and continue.
238
 
                    buffered = ''.join(buffered_data)
239
 
                    data_chunks.append((input_start, buffered))
240
 
                input_start = start
241
 
                buffered_data = [data]
242
 
                buffered_len = length
243
 
            last_end = start + length
244
 
            if input_start == cur_offset and cur_size <= buffered_len:
245
 
                # Simplify the next steps a bit by transforming buffered_data
246
 
                # into a single string. We also have the nice property that
247
 
                # when there is only one string ''.join([x]) == x, so there is
248
 
                # no data copying.
249
 
                buffered = ''.join(buffered_data)
250
 
                # Clean out buffered data so that we keep memory
251
 
                # consumption low
252
 
                del buffered_data[:]
253
 
                buffered_offset = 0
254
 
                # TODO: We *could* also consider the case where cur_offset is in
255
 
                #       in the buffered range, even though it doesn't *start*
256
 
                #       the buffered range. But for packs we pretty much always
257
 
                #       read in order, so you won't get any extra data in the
258
 
                #       middle.
259
 
                while (input_start == cur_offset
260
 
                       and (buffered_offset + cur_size) <= buffered_len):
261
 
                    # We've buffered enough data to process this request, spit it
262
 
                    # out
263
 
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
264
 
                    # move the direct pointer into our buffered data
265
 
                    buffered_offset += cur_size
266
 
                    # Move the start-of-buffer pointer
267
 
                    input_start += cur_size
268
 
                    # Yield the requested data
269
 
                    yield cur_offset, cur_data
270
 
                    cur_offset, cur_size = offset_iter.next()
271
 
                # at this point, we've consumed as much of buffered as we can,
272
 
                # so break off the portion that we consumed
273
 
                if buffered_offset == len(buffered_data):
274
 
                    # No tail to leave behind
275
 
                    buffered_data = []
276
 
                    buffered_len = 0
277
 
                else:
278
 
                    buffered = buffered[buffered_offset:]
279
 
                    buffered_data = [buffered]
280
 
                    buffered_len = len(buffered)
281
 
        if buffered_len:
282
 
            buffered = ''.join(buffered_data)
283
 
            del buffered_data[:]
284
 
            data_chunks.append((input_start, buffered))
285
 
        if data_chunks:
286
 
            mutter('SFTP readv left with %d out-of-order bytes',
287
 
                   sum(map(lambda x: len(x[1]), data_chunks)))
288
 
            # We've processed all the readv data, at this point, anything we
289
 
            # couldn't process is in data_chunks. This doesn't happen often, so
290
 
            # this code path isn't optimized
291
 
            # We use an interesting process for data_chunks
292
 
            # Specifically if we have "bisect_left([(start, len, entries)],
293
 
            #                                       (qstart,)])
294
 
            # If start == qstart, then we get the specific node. Otherwise we
295
 
            # get the previous node
296
 
            while True:
297
 
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
298
 
                if data_chunks[idx][0] == cur_offset: # The data starts here
299
 
                    data = data_chunks[idx][1][:cur_size]
300
 
                elif idx > 0:
301
 
                    # The data is in a portion of a previous page
302
 
                    idx -= 1
303
 
                    sub_offset = cur_offset - data_chunks[idx][0]
304
 
                    data = data_chunks[idx][1]
305
 
                    data = data[sub_offset:sub_offset + cur_size]
306
 
                else:
307
 
                    # We are missing the page where the data should be found,
308
 
                    # something is wrong
309
 
                    data = ''
310
 
                if len(data) != cur_size:
311
 
                    raise AssertionError('We must have miscalulated.'
312
 
                        ' We expected %d bytes, but only found %d'
313
 
                        % (cur_size, len(data)))
314
 
                yield cur_offset, data
315
 
                cur_offset, cur_size = offset_iter.next()
316
 
 
317
 
 
318
134
class SFTPTransport(ConnectedTransport):
319
135
    """Transport implementation for SFTP access."""
320
136
 
376
192
            password = credentials
377
193
 
378
194
        vendor = ssh._get_ssh_vendor()
379
 
        user = self._user
380
 
        if user is None:
381
 
            auth = config.AuthenticationConfig()
382
 
            user = auth.get_user('ssh', self._host, self._port)
383
195
        connection = vendor.connect_sftp(self._user, password,
384
196
                                         self._host, self._port)
385
 
        return connection, (user, password)
 
197
        return connection, password
386
198
 
387
199
    def _get_sftp(self):
388
200
        """Ensures that a connection is established"""
453
265
        does not support ranges > 64K, so it caps the request size, and
454
266
        just reads until it gets all the stuff it wants
455
267
        """
456
 
        helper = _SFTPReadvHelper(offsets, relpath)
457
 
        return helper.request_and_yield_offsets(fp)
 
268
        offsets = list(offsets)
 
269
        sorted_offsets = sorted(offsets)
 
270
 
 
271
        # The algorithm works as follows:
 
272
        # 1) Coalesce nearby reads into a single chunk
 
273
        #    This generates a list of combined regions, the total size
 
274
        #    and the size of the sub regions. This coalescing step is limited
 
275
        #    in the number of nearby chunks to combine, and is allowed to
 
276
        #    skip small breaks in the requests. Limiting it makes sure that
 
277
        #    we can start yielding some data earlier, and skipping means we
 
278
        #    make fewer requests. (Beneficial even when using async)
 
279
        # 2) Break up this combined regions into chunks that are smaller
 
280
        #    than 64KiB. Technically the limit is 65536, but we are a
 
281
        #    little bit conservative. This is because sftp has a maximum
 
282
        #    return chunk size of 64KiB (max size of an unsigned short)
 
283
        # 3) Issue a readv() to paramiko to create an async request for
 
284
        #    all of this data
 
285
        # 4) Read in the data as it comes back, until we've read one
 
286
        #    continuous section as determined in step 1
 
287
        # 5) Break up the full sections into hunks for the original requested
 
288
        #    offsets. And put them in a cache
 
289
        # 6) Check if the next request is in the cache, and if it is, remove
 
290
        #    it from the cache, and yield its data. Continue until no more
 
291
        #    entries are in the cache.
 
292
        # 7) loop back to step 4 until all data has been read
 
293
        #
 
294
        # TODO: jam 20060725 This could be optimized one step further, by
 
295
        #       attempting to yield whatever data we have read, even before
 
296
        #       the first coallesced section has been fully processed.
 
297
 
 
298
        # When coalescing for use with readv(), we don't really need to
 
299
        # use any fudge factor, because the requests are made asynchronously
 
300
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
301
                               limit=self._max_readv_combine,
 
302
                               fudge_factor=0,
 
303
                               ))
 
304
        requests = []
 
305
        for c_offset in coalesced:
 
306
            start = c_offset.start
 
307
            size = c_offset.length
 
308
 
 
309
            # We need to break this up into multiple requests
 
310
            while size > 0:
 
311
                next_size = min(size, self._max_request_size)
 
312
                requests.append((start, next_size))
 
313
                size -= next_size
 
314
                start += next_size
 
315
 
 
316
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
 
317
                len(offsets), len(coalesced), len(requests))
 
318
 
 
319
        # Queue the current read until we have read the full coalesced section
 
320
        cur_data = []
 
321
        cur_data_len = 0
 
322
        cur_coalesced_stack = iter(coalesced)
 
323
        cur_coalesced = cur_coalesced_stack.next()
 
324
 
 
325
        # Cache the results, but only until they have been fulfilled
 
326
        data_map = {}
 
327
        # turn the list of offsets into a stack
 
328
        offset_stack = iter(offsets)
 
329
        cur_offset_and_size = offset_stack.next()
 
330
 
 
331
        for data in fp.readv(requests):
 
332
            cur_data += data
 
333
            cur_data_len += len(data)
 
334
 
 
335
            if cur_data_len < cur_coalesced.length:
 
336
                continue
 
337
            if cur_data_len != cur_coalesced.length:
 
338
                raise AssertionError(
 
339
                    "Somehow we read too much: %s != %s" 
 
340
                    % (cur_data_len, cur_coalesced.length))
 
341
            all_data = ''.join(cur_data)
 
342
            cur_data = []
 
343
            cur_data_len = 0
 
344
 
 
345
            for suboffset, subsize in cur_coalesced.ranges:
 
346
                key = (cur_coalesced.start+suboffset, subsize)
 
347
                data_map[key] = all_data[suboffset:suboffset+subsize]
 
348
 
 
349
            # Now that we've read some data, see if we can yield anything back
 
350
            while cur_offset_and_size in data_map:
 
351
                this_data = data_map.pop(cur_offset_and_size)
 
352
                yield cur_offset_and_size[0], this_data
 
353
                cur_offset_and_size = offset_stack.next()
 
354
 
 
355
            # We read a coalesced entry, so mark it as done
 
356
            cur_coalesced = None
 
357
            # Now that we've read all of the data for this coalesced section
 
358
            # on to the next
 
359
            cur_coalesced = cur_coalesced_stack.next()
 
360
 
 
361
        if cur_coalesced is not None:
 
362
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
 
363
                cur_coalesced.length, len(data))
458
364
 
459
365
    def put_file(self, relpath, f, mode=None):
460
366
        """
613
519
        try:
614
520
            self._get_sftp().mkdir(abspath, local_mode)
615
521
            if mode is not None:
616
 
                # chmod a dir through sftp will erase any sgid bit set
617
 
                # on the server side.  So, if the bit mode are already
618
 
                # set, avoid the chmod.  If the mode is not fine but
619
 
                # the sgid bit is set, report a warning to the user
620
 
                # with the umask fix.
621
 
                stat = self._get_sftp().lstat(abspath)
622
 
                mode = mode & 0777 # can't set special bits anyway
623
 
                if mode != stat.st_mode & 0777:
624
 
                    if stat.st_mode & 06000:
625
 
                        warning('About to chmod %s over sftp, which will result'
626
 
                                ' in its suid or sgid bits being cleared.  If'
627
 
                                ' you want to preserve those bits, change your '
628
 
                                ' environment on the server to use umask 0%03o.'
629
 
                                % (abspath, 0777 - mode))
630
 
                    self._get_sftp().chmod(abspath, mode=mode)
 
522
                self._get_sftp().chmod(abspath, mode=mode)
631
523
        except (paramiko.SSHException, IOError), e:
632
524
            self._translate_io_exception(e, abspath, ': unable to mkdir',
633
525
                failure_exc=FileExists)
678
570
            if (e.args == ('No such file or directory',) or
679
571
                e.args == ('No such file',)):
680
572
                raise NoSuchFile(path, str(e) + more_info)
681
 
            if (e.args == ('mkdir failed',) or
682
 
                e.args[0].startswith('syserr: File exists')):
 
573
            if (e.args == ('mkdir failed',)):
683
574
                raise FileExists(path, str(e) + more_info)
684
575
            # strange but true, for the paramiko server.
685
576
            if (e.args == ('Failure',)):