~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Martin Pool
  • Date: 2007-04-04 06:17:31 UTC
  • mto: This revision was merged to the branch mainline in revision 2397.
  • Revision ID: mbp@sourcefrog.net-20070404061731-tt2xrzllqhbodn83
Contents of TODO file moved into bug tracker

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
 
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
3
#
3
4
# This program is free software; you can redistribute it and/or modify
4
5
# it under the terms of the GNU General Public License as published by
12
13
#
13
14
# You should have received a copy of the GNU General Public License
14
15
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17
 
17
18
"""Implementation of Transport over SFTP, using paramiko."""
18
19
 
23
24
# suite.  Those formats all date back to 0.7; so we should be able to remove
24
25
# these methods when we officially drop support for those formats.
25
26
 
26
 
import bisect
27
27
import errno
28
 
import itertools
29
28
import os
30
29
import random
31
30
import select
35
34
import time
36
35
import urllib
37
36
import urlparse
38
 
import warnings
 
37
import weakref
39
38
 
40
39
from bzrlib import (
41
 
    config,
42
 
    debug,
43
40
    errors,
44
41
    urlutils,
45
42
    )
51
48
                           ParamikoNotPresent,
52
49
                           )
53
50
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
54
 
from bzrlib.symbol_versioning import (
55
 
        deprecated_function,
56
 
        )
57
51
from bzrlib.trace import mutter, warning
58
52
from bzrlib.transport import (
59
 
    FileFileStream,
60
 
    _file_streams,
61
 
    local,
 
53
    register_urlparse_netloc_protocol,
62
54
    Server,
 
55
    split_url,
63
56
    ssh,
64
 
    ConnectedTransport,
 
57
    Transport,
65
58
    )
66
 
 
67
 
# Disable one particular warning that comes from paramiko in Python2.5; if
68
 
# this is emitted at the wrong time it tends to cause spurious test failures
69
 
# or at least noise in the test case::
70
 
#
71
 
# [1770/7639 in 86s, 1 known failures, 50 skipped, 2 missing features]
72
 
# test_permissions.TestSftpPermissions.test_new_files
73
 
# /var/lib/python-support/python2.5/paramiko/message.py:226: DeprecationWarning: integer argument expected, got float
74
 
#  self.packet.write(struct.pack('>I', n))
75
 
warnings.filterwarnings('ignore',
76
 
        'integer argument expected, got float',
77
 
        category=DeprecationWarning,
78
 
        module='paramiko.message')
 
59
from bzrlib.transport.local import LocalURLServer
79
60
 
80
61
try:
81
62
    import paramiko
89
70
    from paramiko.sftp_file import SFTPFile
90
71
 
91
72
 
 
73
register_urlparse_netloc_protocol('sftp')
 
74
 
 
75
 
 
76
# This is a weakref dictionary, so that we can reuse connections
 
77
# that are still active. Long term, it might be nice to have some
 
78
# sort of expiration policy, such as disconnect if inactive for
 
79
# X seconds. But that requires a lot more fanciness.
 
80
_connected_hosts = weakref.WeakValueDictionary()
 
81
 
 
82
 
92
83
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
93
84
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
94
85
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
95
86
 
96
87
 
 
88
def clear_connection_cache():
 
89
    """Remove all hosts from the SFTP connection cache.
 
90
 
 
91
    Primarily useful for test cases wanting to force garbage collection.
 
92
    """
 
93
    _connected_hosts.clear()
 
94
 
 
95
 
97
96
class SFTPLock(object):
98
97
    """This fakes a lock in a remote location.
99
 
 
 
98
    
100
99
    A present lock is indicated just by the existence of a file.  This
101
 
    doesn't work well on all transports and they are only used in
 
100
    doesn't work well on all transports and they are only used in 
102
101
    deprecated storage formats.
103
102
    """
104
 
 
 
103
    
105
104
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
106
105
 
107
106
    def __init__(self, path, transport):
 
107
        assert isinstance(transport, SFTPTransport)
 
108
 
108
109
        self.lock_file = None
109
110
        self.path = path
110
111
        self.lock_path = path + '.write-lock'
134
135
            pass
135
136
 
136
137
 
137
 
class _SFTPReadvHelper(object):
138
 
    """A class to help with managing the state of a readv request."""
139
 
 
140
 
    # See _get_requests for an explanation.
141
 
    _max_request_size = 32768
142
 
 
143
 
    def __init__(self, original_offsets, relpath, _report_activity):
144
 
        """Create a new readv helper.
145
 
 
146
 
        :param original_offsets: The original requests given by the caller of
147
 
            readv()
148
 
        :param relpath: The name of the file (if known)
149
 
        :param _report_activity: A Transport._report_activity bound method,
150
 
            to be called as data arrives.
151
 
        """
152
 
        self.original_offsets = list(original_offsets)
153
 
        self.relpath = relpath
154
 
        self._report_activity = _report_activity
155
 
 
156
 
    def _get_requests(self):
157
 
        """Break up the offsets into individual requests over sftp.
158
 
 
159
 
        The SFTP spec only requires implementers to support 32kB requests. We
160
 
        could try something larger (openssh supports 64kB), but then we have to
161
 
        handle requests that fail.
162
 
        So instead, we just break up our maximum chunks into 32kB chunks, and
163
 
        asyncronously requests them.
164
 
        Newer versions of paramiko would do the chunking for us, but we want to
165
 
        start processing results right away, so we do it ourselves.
166
 
        """
167
 
        # TODO: Because we issue async requests, we don't 'fudge' any extra
168
 
        #       data.  I'm not 100% sure that is the best choice.
169
 
 
170
 
        # The first thing we do, is to collapse the individual requests as much
171
 
        # as possible, so we don't issues requests <32kB
172
 
        sorted_offsets = sorted(self.original_offsets)
173
 
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
174
 
                                                        limit=0, fudge_factor=0))
175
 
        requests = []
176
 
        for c_offset in coalesced:
177
 
            start = c_offset.start
178
 
            size = c_offset.length
179
 
 
180
 
            # Break this up into 32kB requests
181
 
            while size > 0:
182
 
                next_size = min(size, self._max_request_size)
183
 
                requests.append((start, next_size))
184
 
                size -= next_size
185
 
                start += next_size
186
 
        if 'sftp' in debug.debug_flags:
187
 
            mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
188
 
                self.relpath, len(sorted_offsets), len(coalesced),
189
 
                len(requests))
190
 
        return requests
191
 
 
192
 
    def request_and_yield_offsets(self, fp):
193
 
        """Request the data from the remote machine, yielding the results.
194
 
 
195
 
        :param fp: A Paramiko SFTPFile object that supports readv.
196
 
        :return: Yield the data requested by the original readv caller, one by
197
 
            one.
198
 
        """
199
 
        requests = self._get_requests()
200
 
        offset_iter = iter(self.original_offsets)
201
 
        cur_offset, cur_size = offset_iter.next()
202
 
        # paramiko .readv() yields strings that are in the order of the requests
203
 
        # So we track the current request to know where the next data is
204
 
        # being returned from.
205
 
        input_start = None
206
 
        last_end = None
207
 
        buffered_data = []
208
 
        buffered_len = 0
209
 
 
210
 
        # This is used to buffer chunks which we couldn't process yet
211
 
        # It is (start, end, data) tuples.
212
 
        data_chunks = []
213
 
        # Create an 'unlimited' data stream, so we stop based on requests,
214
 
        # rather than just because the data stream ended. This lets us detect
215
 
        # short readv.
216
 
        data_stream = itertools.chain(fp.readv(requests),
217
 
                                      itertools.repeat(None))
218
 
        for (start, length), data in itertools.izip(requests, data_stream):
219
 
            if data is None:
220
 
                if cur_coalesced is not None:
221
 
                    raise errors.ShortReadvError(self.relpath,
222
 
                        start, length, len(data))
223
 
            if len(data) != length:
224
 
                raise errors.ShortReadvError(self.relpath,
225
 
                    start, length, len(data))
226
 
            self._report_activity(length, 'read')
227
 
            if last_end is None:
228
 
                # This is the first request, just buffer it
229
 
                buffered_data = [data]
230
 
                buffered_len = length
231
 
                input_start = start
232
 
            elif start == last_end:
233
 
                # The data we are reading fits neatly on the previous
234
 
                # buffer, so this is all part of a larger coalesced range.
235
 
                buffered_data.append(data)
236
 
                buffered_len += length
237
 
            else:
238
 
                # We have an 'interrupt' in the data stream. So we know we are
239
 
                # at a request boundary.
240
 
                if buffered_len > 0:
241
 
                    # We haven't consumed the buffer so far, so put it into
242
 
                    # data_chunks, and continue.
243
 
                    buffered = ''.join(buffered_data)
244
 
                    data_chunks.append((input_start, buffered))
245
 
                input_start = start
246
 
                buffered_data = [data]
247
 
                buffered_len = length
248
 
            last_end = start + length
249
 
            if input_start == cur_offset and cur_size <= buffered_len:
250
 
                # Simplify the next steps a bit by transforming buffered_data
251
 
                # into a single string. We also have the nice property that
252
 
                # when there is only one string ''.join([x]) == x, so there is
253
 
                # no data copying.
254
 
                buffered = ''.join(buffered_data)
255
 
                # Clean out buffered data so that we keep memory
256
 
                # consumption low
257
 
                del buffered_data[:]
258
 
                buffered_offset = 0
259
 
                # TODO: We *could* also consider the case where cur_offset is in
260
 
                #       in the buffered range, even though it doesn't *start*
261
 
                #       the buffered range. But for packs we pretty much always
262
 
                #       read in order, so you won't get any extra data in the
263
 
                #       middle.
264
 
                while (input_start == cur_offset
265
 
                       and (buffered_offset + cur_size) <= buffered_len):
266
 
                    # We've buffered enough data to process this request, spit it
267
 
                    # out
268
 
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
269
 
                    # move the direct pointer into our buffered data
270
 
                    buffered_offset += cur_size
271
 
                    # Move the start-of-buffer pointer
272
 
                    input_start += cur_size
273
 
                    # Yield the requested data
274
 
                    yield cur_offset, cur_data
275
 
                    cur_offset, cur_size = offset_iter.next()
276
 
                # at this point, we've consumed as much of buffered as we can,
277
 
                # so break off the portion that we consumed
278
 
                if buffered_offset == len(buffered_data):
279
 
                    # No tail to leave behind
280
 
                    buffered_data = []
281
 
                    buffered_len = 0
282
 
                else:
283
 
                    buffered = buffered[buffered_offset:]
284
 
                    buffered_data = [buffered]
285
 
                    buffered_len = len(buffered)
286
 
        if buffered_len:
287
 
            buffered = ''.join(buffered_data)
288
 
            del buffered_data[:]
289
 
            data_chunks.append((input_start, buffered))
290
 
        if data_chunks:
291
 
            if 'sftp' in debug.debug_flags:
292
 
                mutter('SFTP readv left with %d out-of-order bytes',
293
 
                    sum(map(lambda x: len(x[1]), data_chunks)))
294
 
            # We've processed all the readv data, at this point, anything we
295
 
            # couldn't process is in data_chunks. This doesn't happen often, so
296
 
            # this code path isn't optimized
297
 
            # We use an interesting process for data_chunks
298
 
            # Specifically if we have "bisect_left([(start, len, entries)],
299
 
            #                                       (qstart,)])
300
 
            # If start == qstart, then we get the specific node. Otherwise we
301
 
            # get the previous node
302
 
            while True:
303
 
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
304
 
                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
305
 
                    # The data starts here
306
 
                    data = data_chunks[idx][1][:cur_size]
307
 
                elif idx > 0:
308
 
                    # The data is in a portion of a previous page
309
 
                    idx -= 1
310
 
                    sub_offset = cur_offset - data_chunks[idx][0]
311
 
                    data = data_chunks[idx][1]
312
 
                    data = data[sub_offset:sub_offset + cur_size]
313
 
                else:
314
 
                    # We are missing the page where the data should be found,
315
 
                    # something is wrong
316
 
                    data = ''
317
 
                if len(data) != cur_size:
318
 
                    raise AssertionError('We must have miscalulated.'
319
 
                        ' We expected %d bytes, but only found %d'
320
 
                        % (cur_size, len(data)))
321
 
                yield cur_offset, data
322
 
                cur_offset, cur_size = offset_iter.next()
323
 
 
324
 
 
325
 
class SFTPTransport(ConnectedTransport):
 
138
class SFTPUrlHandling(Transport):
 
139
    """Mix-in that does common handling of SSH/SFTP URLs."""
 
140
 
 
141
    def __init__(self, base):
 
142
        self._parse_url(base)
 
143
        base = self._unparse_url(self._path)
 
144
        if base[-1] != '/':
 
145
            base += '/'
 
146
        super(SFTPUrlHandling, self).__init__(base)
 
147
 
 
148
    def _parse_url(self, url):
 
149
        (self._scheme,
 
150
         self._username, self._password,
 
151
         self._host, self._port, self._path) = self._split_url(url)
 
152
 
 
153
    def _unparse_url(self, path):
 
154
        """Return a URL for a path relative to this transport.
 
155
        """
 
156
        path = urllib.quote(path)
 
157
        # handle homedir paths
 
158
        if not path.startswith('/'):
 
159
            path = "/~/" + path
 
160
        netloc = urllib.quote(self._host)
 
161
        if self._username is not None:
 
162
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
163
        if self._port is not None:
 
164
            netloc = '%s:%d' % (netloc, self._port)
 
165
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
 
166
 
 
167
    def _split_url(self, url):
 
168
        (scheme, username, password, host, port, path) = split_url(url)
 
169
        ## assert scheme == 'sftp'
 
170
 
 
171
        # the initial slash should be removed from the path, and treated
 
172
        # as a homedir relative path (the path begins with a double slash
 
173
        # if it is absolute).
 
174
        # see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
 
175
        # RBC 20060118 we are not using this as its too user hostile. instead
 
176
        # we are following lftp and using /~/foo to mean '~/foo'.
 
177
        # handle homedir paths
 
178
        if path.startswith('/~/'):
 
179
            path = path[3:]
 
180
        elif path == '/~':
 
181
            path = ''
 
182
        return (scheme, username, password, host, port, path)
 
183
 
 
184
    def abspath(self, relpath):
 
185
        """Return the full url to the given relative path.
 
186
        
 
187
        @param relpath: the relative path or path components
 
188
        @type relpath: str or list
 
189
        """
 
190
        return self._unparse_url(self._remote_path(relpath))
 
191
    
 
192
    def _remote_path(self, relpath):
 
193
        """Return the path to be passed along the sftp protocol for relpath.
 
194
        
 
195
        :param relpath: is a urlencoded string.
 
196
        """
 
197
        return self._combine_paths(self._path, relpath)
 
198
 
 
199
 
 
200
class SFTPTransport(SFTPUrlHandling):
326
201
    """Transport implementation for SFTP access."""
327
202
 
328
203
    _do_prefetch = _default_do_prefetch
343
218
    # up the request itself, rather than us having to worry about it
344
219
    _max_request_size = 32768
345
220
 
346
 
    def __init__(self, base, _from_transport=None):
347
 
        super(SFTPTransport, self).__init__(base,
348
 
                                            _from_transport=_from_transport)
 
221
    def __init__(self, base, clone_from=None):
 
222
        super(SFTPTransport, self).__init__(base)
 
223
        if clone_from is None:
 
224
            self._sftp_connect()
 
225
        else:
 
226
            # use the same ssh connection, etc
 
227
            self._sftp = clone_from._sftp
 
228
        # super saves 'self.base'
 
229
    
 
230
    def should_cache(self):
 
231
        """
 
232
        Return True if the data pulled across should be cached locally.
 
233
        """
 
234
        return True
 
235
 
 
236
    def clone(self, offset=None):
 
237
        """
 
238
        Return a new SFTPTransport with root at self.base + offset.
 
239
        We share the same SFTP session between such transports, because it's
 
240
        fairly expensive to set them up.
 
241
        """
 
242
        if offset is None:
 
243
            return SFTPTransport(self.base, self)
 
244
        else:
 
245
            return SFTPTransport(self.abspath(offset), self)
349
246
 
350
247
    def _remote_path(self, relpath):
351
248
        """Return the path to be passed along the sftp protocol for relpath.
352
 
 
353
 
        :param relpath: is a urlencoded string.
354
 
        """
355
 
        relative = urlutils.unescape(relpath).encode('utf-8')
356
 
        remote_path = self._combine_paths(self._path, relative)
357
 
        # the initial slash should be removed from the path, and treated as a
358
 
        # homedir relative path (the path begins with a double slash if it is
359
 
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
360
 
        # RBC 20060118 we are not using this as its too user hostile. instead
361
 
        # we are following lftp and using /~/foo to mean '~/foo'
362
 
        # vila--20070602 and leave absolute paths begin with a single slash.
363
 
        if remote_path.startswith('/~/'):
364
 
            remote_path = remote_path[3:]
365
 
        elif remote_path == '/~':
366
 
            remote_path = ''
367
 
        return remote_path
368
 
 
369
 
    def _create_connection(self, credentials=None):
370
 
        """Create a new connection with the provided credentials.
371
 
 
372
 
        :param credentials: The credentials needed to establish the connection.
373
 
 
374
 
        :return: The created connection and its associated credentials.
375
 
 
376
 
        The credentials are only the password as it may have been entered
377
 
        interactively by the user and may be different from the one provided
378
 
        in base url at transport creation time.
379
 
        """
380
 
        if credentials is None:
381
 
            password = self._password
 
249
        
 
250
        relpath is a urlencoded string.
 
251
 
 
252
        :return: a path prefixed with / for regular abspath-based urls, or a
 
253
            path that does not begin with / for urls which begin with /~/.
 
254
        """
 
255
        # how does this work? 
 
256
        # it processes relpath with respect to 
 
257
        # our state:
 
258
        # firstly we create a path to evaluate: 
 
259
        # if relpath is an abspath or homedir path, its the entire thing
 
260
        # otherwise we join our base with relpath
 
261
        # then we eliminate all empty segments (double //'s) outside the first
 
262
        # two elements of the list. This avoids problems with trailing 
 
263
        # slashes, or other abnormalities.
 
264
        # finally we evaluate the entire path in a single pass
 
265
        # '.'s are stripped,
 
266
        # '..' result in popping the left most already 
 
267
        # processed path (which can never be empty because of the check for
 
268
        # abspath and homedir meaning that its not, or that we've used our
 
269
        # path. If the pop would pop the root, we ignore it.
 
270
 
 
271
        # Specific case examinations:
 
272
        # remove the special casefor ~: if the current root is ~/ popping of it
 
273
        # = / thus our seed for a ~ based path is ['', '~']
 
274
        # and if we end up with [''] then we had basically ('', '..') (which is
 
275
        # '/..' so we append '' if the length is one, and assert that the first
 
276
        # element is still ''. Lastly, if we end with ['', '~'] as a prefix for
 
277
        # the output, we've got a homedir path, so we strip that prefix before
 
278
        # '/' joining the resulting list.
 
279
        #
 
280
        # case one: '/' -> ['', ''] cannot shrink
 
281
        # case two: '/' + '../foo' -> ['', 'foo'] (take '', '', '..', 'foo')
 
282
        #           and pop the second '' for the '..', append 'foo'
 
283
        # case three: '/~/' -> ['', '~', ''] 
 
284
        # case four: '/~/' + '../foo' -> ['', '~', '', '..', 'foo'],
 
285
        #           and we want to get '/foo' - the empty path in the middle
 
286
        #           needs to be stripped, then normal path manipulation will 
 
287
        #           work.
 
288
        # case five: '/..' ['', '..'], we want ['', '']
 
289
        #            stripping '' outside the first two is ok
 
290
        #            ignore .. if its too high up
 
291
        #
 
292
        # lastly this code is possibly reusable by FTP, but not reusable by
 
293
        # local paths: ~ is resolvable correctly, nor by HTTP or the smart
 
294
        # server: ~ is resolved remotely.
 
295
        # 
 
296
        # however, a version of this that acts on self.base is possible to be
 
297
        # written which manipulates the URL in canonical form, and would be
 
298
        # reusable for all transports, if a flag for allowing ~/ at all was
 
299
        # provided.
 
300
        assert isinstance(relpath, basestring)
 
301
        relpath = urlutils.unescape(relpath)
 
302
 
 
303
        # case 1)
 
304
        if relpath.startswith('/'):
 
305
            # abspath - normal split is fine.
 
306
            current_path = relpath.split('/')
 
307
        elif relpath.startswith('~/'):
 
308
            # root is homedir based: normal split and prefix '' to remote the
 
309
            # special case
 
310
            current_path = [''].extend(relpath.split('/'))
382
311
        else:
383
 
            password = credentials
384
 
 
385
 
        vendor = ssh._get_ssh_vendor()
386
 
        user = self._user
387
 
        if user is None:
388
 
            auth = config.AuthenticationConfig()
389
 
            user = auth.get_user('ssh', self._host, self._port)
390
 
        connection = vendor.connect_sftp(self._user, password,
391
 
                                         self._host, self._port)
392
 
        return connection, (user, password)
393
 
 
394
 
    def _get_sftp(self):
395
 
        """Ensures that a connection is established"""
396
 
        connection = self._get_connection()
397
 
        if connection is None:
398
 
            # First connection ever
399
 
            connection, credentials = self._create_connection()
400
 
            self._set_connection(connection, credentials)
401
 
        return connection
 
312
            # root is from the current directory:
 
313
            if self._path.startswith('/'):
 
314
                # abspath, take the regular split
 
315
                current_path = []
 
316
            else:
 
317
                # homedir based, add the '', '~' not present in self._path
 
318
                current_path = ['', '~']
 
319
            # add our current dir
 
320
            current_path.extend(self._path.split('/'))
 
321
            # add the users relpath
 
322
            current_path.extend(relpath.split('/'))
 
323
        # strip '' segments that are not in the first one - the leading /.
 
324
        to_process = current_path[:1]
 
325
        for segment in current_path[1:]:
 
326
            if segment != '':
 
327
                to_process.append(segment)
 
328
 
 
329
        # process '.' and '..' segments into output_path.
 
330
        output_path = []
 
331
        for segment in to_process:
 
332
            if segment == '..':
 
333
                # directory pop. Remove a directory 
 
334
                # as long as we are not at the root
 
335
                if len(output_path) > 1:
 
336
                    output_path.pop()
 
337
                # else: pass
 
338
                # cannot pop beyond the root, so do nothing
 
339
            elif segment == '.':
 
340
                continue # strip the '.' from the output.
 
341
            else:
 
342
                # this will append '' to output_path for the root elements,
 
343
                # which is appropriate: its why we strip '' in the first pass.
 
344
                output_path.append(segment)
 
345
 
 
346
        # check output special cases:
 
347
        if output_path == ['']:
 
348
            # [''] -> ['', '']
 
349
            output_path = ['', '']
 
350
        elif output_path[:2] == ['', '~']:
 
351
            # ['', '~', ...] -> ...
 
352
            output_path = output_path[2:]
 
353
        path = '/'.join(output_path)
 
354
        return path
 
355
 
 
356
    def relpath(self, abspath):
 
357
        scheme, username, password, host, port, path = self._split_url(abspath)
 
358
        error = []
 
359
        if (username != self._username):
 
360
            error.append('username mismatch')
 
361
        if (host != self._host):
 
362
            error.append('host mismatch')
 
363
        if (port != self._port):
 
364
            error.append('port mismatch')
 
365
        if (not path.startswith(self._path)):
 
366
            error.append('path mismatch')
 
367
        if error:
 
368
            extra = ': ' + ', '.join(error)
 
369
            raise PathNotChild(abspath, self.base, extra=extra)
 
370
        pl = len(self._path)
 
371
        return path[pl:].strip('/')
402
372
 
403
373
    def has(self, relpath):
404
374
        """
405
375
        Does the target location exist?
406
376
        """
407
377
        try:
408
 
            self._get_sftp().stat(self._remote_path(relpath))
409
 
            # stat result is about 20 bytes, let's say
410
 
            self._report_activity(20, 'read')
 
378
            self._sftp.stat(self._remote_path(relpath))
411
379
            return True
412
380
        except IOError:
413
381
            return False
414
382
 
415
383
    def get(self, relpath):
416
 
        """Get the file at the given relative path.
 
384
        """
 
385
        Get the file at the given relative path.
417
386
 
418
387
        :param relpath: The relative path to the file
419
388
        """
420
389
        try:
421
 
            # FIXME: by returning the file directly, we don't pass this
422
 
            # through to report_activity.  We could try wrapping the object
423
 
            # before it's returned.  For readv and get_bytes it's handled in
424
 
            # the higher-level function.
425
 
            # -- mbp 20090126
426
390
            path = self._remote_path(relpath)
427
 
            f = self._get_sftp().file(path, mode='rb')
 
391
            f = self._sftp.file(path, mode='rb')
428
392
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
429
393
                f.prefetch()
430
394
            return f
431
395
        except (IOError, paramiko.SSHException), e:
432
 
            self._translate_io_exception(e, path, ': error retrieving',
433
 
                failure_exc=errors.ReadError)
434
 
 
435
 
    def get_bytes(self, relpath):
436
 
        # reimplement this here so that we can report how many bytes came back
437
 
        f = self.get(relpath)
438
 
        try:
439
 
            bytes = f.read()
440
 
            self._report_activity(len(bytes), 'read')
441
 
            return bytes
442
 
        finally:
443
 
            f.close()
444
 
 
445
 
    def _readv(self, relpath, offsets):
 
396
            self._translate_io_exception(e, path, ': error retrieving')
 
397
 
 
398
    def readv(self, relpath, offsets):
446
399
        """See Transport.readv()"""
447
400
        # We overload the default readv() because we want to use a file
448
401
        # that does not have prefetch enabled.
452
405
 
453
406
        try:
454
407
            path = self._remote_path(relpath)
455
 
            fp = self._get_sftp().file(path, mode='rb')
 
408
            fp = self._sftp.file(path, mode='rb')
456
409
            readv = getattr(fp, 'readv', None)
457
410
            if readv:
458
411
                return self._sftp_readv(fp, offsets, relpath)
459
 
            if 'sftp' in debug.debug_flags:
460
 
                mutter('seek and read %s offsets', len(offsets))
 
412
            mutter('seek and read %s offsets', len(offsets))
461
413
            return self._seek_and_read(fp, offsets, relpath)
462
414
        except (IOError, paramiko.SSHException), e:
463
415
            self._translate_io_exception(e, path, ': error retrieving')
464
416
 
465
 
    def recommended_page_size(self):
466
 
        """See Transport.recommended_page_size().
467
 
 
468
 
        For SFTP we suggest a large page size to reduce the overhead
469
 
        introduced by latency.
470
 
        """
471
 
        return 64 * 1024
472
 
 
473
 
    def _sftp_readv(self, fp, offsets, relpath):
 
417
    def _sftp_readv(self, fp, offsets, relpath='<unknown>'):
474
418
        """Use the readv() member of fp to do async readv.
475
419
 
476
 
        Then read them using paramiko.readv(). paramiko.readv()
 
420
        And then read them using paramiko.readv(). paramiko.readv()
477
421
        does not support ranges > 64K, so it caps the request size, and
478
 
        just reads until it gets all the stuff it wants.
 
422
        just reads until it gets all the stuff it wants
479
423
        """
480
 
        helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
481
 
        return helper.request_and_yield_offsets(fp)
 
424
        offsets = list(offsets)
 
425
        sorted_offsets = sorted(offsets)
 
426
 
 
427
        # The algorithm works as follows:
 
428
        # 1) Coalesce nearby reads into a single chunk
 
429
        #    This generates a list of combined regions, the total size
 
430
        #    and the size of the sub regions. This coalescing step is limited
 
431
        #    in the number of nearby chunks to combine, and is allowed to
 
432
        #    skip small breaks in the requests. Limiting it makes sure that
 
433
        #    we can start yielding some data earlier, and skipping means we
 
434
        #    make fewer requests. (Beneficial even when using async)
 
435
        # 2) Break up this combined regions into chunks that are smaller
 
436
        #    than 64KiB. Technically the limit is 65536, but we are a
 
437
        #    little bit conservative. This is because sftp has a maximum
 
438
        #    return chunk size of 64KiB (max size of an unsigned short)
 
439
        # 3) Issue a readv() to paramiko to create an async request for
 
440
        #    all of this data
 
441
        # 4) Read in the data as it comes back, until we've read one
 
442
        #    continuous section as determined in step 1
 
443
        # 5) Break up the full sections into hunks for the original requested
 
444
        #    offsets. And put them in a cache
 
445
        # 6) Check if the next request is in the cache, and if it is, remove
 
446
        #    it from the cache, and yield its data. Continue until no more
 
447
        #    entries are in the cache.
 
448
        # 7) loop back to step 4 until all data has been read
 
449
        #
 
450
        # TODO: jam 20060725 This could be optimized one step further, by
 
451
        #       attempting to yield whatever data we have read, even before
 
452
        #       the first coallesced section has been fully processed.
 
453
 
 
454
        # When coalescing for use with readv(), we don't really need to
 
455
        # use any fudge factor, because the requests are made asynchronously
 
456
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
457
                               limit=self._max_readv_combine,
 
458
                               fudge_factor=0,
 
459
                               ))
 
460
        requests = []
 
461
        for c_offset in coalesced:
 
462
            start = c_offset.start
 
463
            size = c_offset.length
 
464
 
 
465
            # We need to break this up into multiple requests
 
466
            while size > 0:
 
467
                next_size = min(size, self._max_request_size)
 
468
                requests.append((start, next_size))
 
469
                size -= next_size
 
470
                start += next_size
 
471
 
 
472
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
 
473
                len(offsets), len(coalesced), len(requests))
 
474
 
 
475
        # Queue the current read until we have read the full coalesced section
 
476
        cur_data = []
 
477
        cur_data_len = 0
 
478
        cur_coalesced_stack = iter(coalesced)
 
479
        cur_coalesced = cur_coalesced_stack.next()
 
480
 
 
481
        # Cache the results, but only until they have been fulfilled
 
482
        data_map = {}
 
483
        # turn the list of offsets into a stack
 
484
        offset_stack = iter(offsets)
 
485
        cur_offset_and_size = offset_stack.next()
 
486
 
 
487
        for data in fp.readv(requests):
 
488
            cur_data += data
 
489
            cur_data_len += len(data)
 
490
 
 
491
            if cur_data_len < cur_coalesced.length:
 
492
                continue
 
493
            assert cur_data_len == cur_coalesced.length, \
 
494
                "Somehow we read too much: %s != %s" % (cur_data_len,
 
495
                                                        cur_coalesced.length)
 
496
            all_data = ''.join(cur_data)
 
497
            cur_data = []
 
498
            cur_data_len = 0
 
499
 
 
500
            for suboffset, subsize in cur_coalesced.ranges:
 
501
                key = (cur_coalesced.start+suboffset, subsize)
 
502
                data_map[key] = all_data[suboffset:suboffset+subsize]
 
503
 
 
504
            # Now that we've read some data, see if we can yield anything back
 
505
            while cur_offset_and_size in data_map:
 
506
                this_data = data_map.pop(cur_offset_and_size)
 
507
                yield cur_offset_and_size[0], this_data
 
508
                cur_offset_and_size = offset_stack.next()
 
509
 
 
510
            # We read a coalesced entry, so mark it as done
 
511
            cur_coalesced = None
 
512
            # Now that we've read all of the data for this coalesced section
 
513
            # on to the next
 
514
            cur_coalesced = cur_coalesced_stack.next()
 
515
 
 
516
        if cur_coalesced is not None:
 
517
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
 
518
                cur_coalesced.length, len(data))
482
519
 
483
520
    def put_file(self, relpath, f, mode=None):
484
521
        """
489
526
        :param mode: The final mode for the file
490
527
        """
491
528
        final_path = self._remote_path(relpath)
492
 
        return self._put(final_path, f, mode=mode)
 
529
        self._put(final_path, f, mode=mode)
493
530
 
494
531
    def _put(self, abspath, f, mode=None):
495
532
        """Helper function so both put() and copy_abspaths can reuse the code"""
500
537
        try:
501
538
            try:
502
539
                fout.set_pipelined(True)
503
 
                length = self._pump(f, fout)
 
540
                self._pump(f, fout)
504
541
            except (IOError, paramiko.SSHException), e:
505
542
                self._translate_io_exception(e, tmp_abspath)
506
543
            # XXX: This doesn't truly help like we would like it to.
509
546
            #      sticky bit. So it is probably best to stop chmodding, and
510
547
            #      just tell users that they need to set the umask correctly.
511
548
            #      The attr.st_mode = mode, in _sftp_open_exclusive
512
 
            #      will handle when the user wants the final mode to be more
513
 
            #      restrictive. And then we avoid a round trip. Unless
 
549
            #      will handle when the user wants the final mode to be more 
 
550
            #      restrictive. And then we avoid a round trip. Unless 
514
551
            #      paramiko decides to expose an async chmod()
515
552
 
516
553
            # This is designed to chmod() right before we close.
517
 
            # Because we set_pipelined() earlier, theoretically we might
 
554
            # Because we set_pipelined() earlier, theoretically we might 
518
555
            # avoid the round trip for fout.close()
519
556
            if mode is not None:
520
 
                self._get_sftp().chmod(tmp_abspath, mode)
 
557
                self._sftp.chmod(tmp_abspath, mode)
521
558
            fout.close()
522
559
            closed = True
523
560
            self._rename_and_overwrite(tmp_abspath, abspath)
524
 
            return length
525
561
        except Exception, e:
526
562
            # If we fail, try to clean up the temporary file
527
563
            # before we throw the exception
533
569
            try:
534
570
                if not closed:
535
571
                    fout.close()
536
 
                self._get_sftp().remove(tmp_abspath)
 
572
                self._sftp.remove(tmp_abspath)
537
573
            except:
538
574
                # raise the saved except
539
575
                raise e
554
590
            fout = None
555
591
            try:
556
592
                try:
557
 
                    fout = self._get_sftp().file(abspath, mode='wb')
 
593
                    fout = self._sftp.file(abspath, mode='wb')
558
594
                    fout.set_pipelined(True)
559
595
                    writer(fout)
560
596
                except (paramiko.SSHException, IOError), e:
562
598
                                                 ': unable to open')
563
599
 
564
600
                # This is designed to chmod() right before we close.
565
 
                # Because we set_pipelined() earlier, theoretically we might
 
601
                # Because we set_pipelined() earlier, theoretically we might 
566
602
                # avoid the round trip for fout.close()
567
603
                if mode is not None:
568
 
                    self._get_sftp().chmod(abspath, mode)
 
604
                    self._sftp.chmod(abspath, mode)
569
605
            finally:
570
606
                if fout is not None:
571
607
                    fout.close()
619
655
 
620
656
    def iter_files_recursive(self):
621
657
        """Walk the relative paths of all files in this transport."""
622
 
        # progress is handled by list_dir
623
658
        queue = list(self.list_dir('.'))
624
659
        while queue:
625
660
            relpath = queue.pop(0)
636
671
        else:
637
672
            local_mode = mode
638
673
        try:
639
 
            self._report_activity(len(abspath), 'write')
640
 
            self._get_sftp().mkdir(abspath, local_mode)
641
 
            self._report_activity(1, 'read')
 
674
            self._sftp.mkdir(abspath, local_mode)
642
675
            if mode is not None:
643
 
                # chmod a dir through sftp will erase any sgid bit set
644
 
                # on the server side.  So, if the bit mode are already
645
 
                # set, avoid the chmod.  If the mode is not fine but
646
 
                # the sgid bit is set, report a warning to the user
647
 
                # with the umask fix.
648
 
                stat = self._get_sftp().lstat(abspath)
649
 
                mode = mode & 0777 # can't set special bits anyway
650
 
                if mode != stat.st_mode & 0777:
651
 
                    if stat.st_mode & 06000:
652
 
                        warning('About to chmod %s over sftp, which will result'
653
 
                                ' in its suid or sgid bits being cleared.  If'
654
 
                                ' you want to preserve those bits, change your '
655
 
                                ' environment on the server to use umask 0%03o.'
656
 
                                % (abspath, 0777 - mode))
657
 
                    self._get_sftp().chmod(abspath, mode=mode)
 
676
                self._sftp.chmod(abspath, mode=mode)
658
677
        except (paramiko.SSHException, IOError), e:
659
678
            self._translate_io_exception(e, abspath, ': unable to mkdir',
660
679
                failure_exc=FileExists)
663
682
        """Create a directory at the given path."""
664
683
        self._mkdir(self._remote_path(relpath), mode=mode)
665
684
 
666
 
    def open_write_stream(self, relpath, mode=None):
667
 
        """See Transport.open_write_stream."""
668
 
        # initialise the file to zero-length
669
 
        # this is three round trips, but we don't use this
670
 
        # api more than once per write_group at the moment so
671
 
        # it is a tolerable overhead. Better would be to truncate
672
 
        # the file after opening. RBC 20070805
673
 
        self.put_bytes_non_atomic(relpath, "", mode)
674
 
        abspath = self._remote_path(relpath)
675
 
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
676
 
        #       set the file mode at create time. If it does, use it.
677
 
        #       But for now, we just chmod later anyway.
678
 
        handle = None
679
 
        try:
680
 
            handle = self._get_sftp().file(abspath, mode='wb')
681
 
            handle.set_pipelined(True)
682
 
        except (paramiko.SSHException, IOError), e:
683
 
            self._translate_io_exception(e, abspath,
684
 
                                         ': unable to open')
685
 
        _file_streams[self.abspath(relpath)] = handle
686
 
        return FileFileStream(self, relpath, handle)
687
 
 
688
 
    def _translate_io_exception(self, e, path, more_info='',
 
685
    def _translate_io_exception(self, e, path, more_info='', 
689
686
                                failure_exc=PathError):
690
687
        """Translate a paramiko or IOError into a friendlier exception.
691
688
 
696
693
        :param failure_exc: Paramiko has the super fun ability to raise completely
697
694
                           opaque errors that just set "e.args = ('Failure',)" with
698
695
                           no more information.
699
 
                           If this parameter is set, it defines the exception
 
696
                           If this parameter is set, it defines the exception 
700
697
                           to raise in these cases.
701
698
        """
702
699
        # paramiko seems to generate detailless errors.
705
702
            if (e.args == ('No such file or directory',) or
706
703
                e.args == ('No such file',)):
707
704
                raise NoSuchFile(path, str(e) + more_info)
708
 
            if (e.args == ('mkdir failed',) or
709
 
                e.args[0].startswith('syserr: File exists')):
 
705
            if (e.args == ('mkdir failed',)):
710
706
                raise FileExists(path, str(e) + more_info)
711
707
            # strange but true, for the paramiko server.
712
708
            if (e.args == ('Failure',)):
723
719
        """
724
720
        try:
725
721
            path = self._remote_path(relpath)
726
 
            fout = self._get_sftp().file(path, 'ab')
 
722
            fout = self._sftp.file(path, 'ab')
727
723
            if mode is not None:
728
 
                self._get_sftp().chmod(path, mode)
 
724
                self._sftp.chmod(path, mode)
729
725
            result = fout.tell()
730
726
            self._pump(f, fout)
731
727
            return result
735
731
    def rename(self, rel_from, rel_to):
736
732
        """Rename without special overwriting"""
737
733
        try:
738
 
            self._get_sftp().rename(self._remote_path(rel_from),
 
734
            self._sftp.rename(self._remote_path(rel_from),
739
735
                              self._remote_path(rel_to))
740
736
        except (IOError, paramiko.SSHException), e:
741
737
            self._translate_io_exception(e, rel_from,
743
739
 
744
740
    def _rename_and_overwrite(self, abs_from, abs_to):
745
741
        """Do a fancy rename on the remote server.
746
 
 
 
742
        
747
743
        Using the implementation provided by osutils.
748
744
        """
749
745
        try:
750
 
            sftp = self._get_sftp()
751
746
            fancy_rename(abs_from, abs_to,
752
 
                         rename_func=sftp.rename,
753
 
                         unlink_func=sftp.remove)
 
747
                    rename_func=self._sftp.rename,
 
748
                    unlink_func=self._sftp.remove)
754
749
        except (IOError, paramiko.SSHException), e:
755
 
            self._translate_io_exception(e, abs_from,
756
 
                                         ': unable to rename to %r' % (abs_to))
 
750
            self._translate_io_exception(e, abs_from, ': unable to rename to %r' % (abs_to))
757
751
 
758
752
    def move(self, rel_from, rel_to):
759
753
        """Move the item at rel_from to the location at rel_to"""
765
759
        """Delete the item at relpath"""
766
760
        path = self._remote_path(relpath)
767
761
        try:
768
 
            self._get_sftp().remove(path)
 
762
            self._sftp.remove(path)
769
763
        except (IOError, paramiko.SSHException), e:
770
764
            self._translate_io_exception(e, path, ': unable to delete')
771
 
 
772
 
    def external_url(self):
773
 
        """See bzrlib.transport.Transport.external_url."""
774
 
        # the external path for SFTP is the base
775
 
        return self.base
776
 
 
 
765
            
777
766
    def listable(self):
778
767
        """Return True if this store supports listing."""
779
768
        return True
788
777
        # -- David Allouche 2006-08-11
789
778
        path = self._remote_path(relpath)
790
779
        try:
791
 
            entries = self._get_sftp().listdir(path)
792
 
            self._report_activity(sum(map(len, entries)), 'read')
 
780
            entries = self._sftp.listdir(path)
793
781
        except (IOError, paramiko.SSHException), e:
794
782
            self._translate_io_exception(e, path, ': failed to list_dir')
795
783
        return [urlutils.escape(entry) for entry in entries]
798
786
        """See Transport.rmdir."""
799
787
        path = self._remote_path(relpath)
800
788
        try:
801
 
            return self._get_sftp().rmdir(path)
 
789
            return self._sftp.rmdir(path)
802
790
        except (IOError, paramiko.SSHException), e:
803
791
            self._translate_io_exception(e, path, ': failed to rmdir')
804
792
 
806
794
        """Return the stat information for a file."""
807
795
        path = self._remote_path(relpath)
808
796
        try:
809
 
            return self._get_sftp().stat(path)
 
797
            return self._sftp.stat(path)
810
798
        except (IOError, paramiko.SSHException), e:
811
799
            self._translate_io_exception(e, path, ': unable to stat')
812
800
 
836
824
        # that we have taken the lock.
837
825
        return SFTPLock(relpath, self)
838
826
 
 
827
    def _sftp_connect(self):
 
828
        """Connect to the remote sftp server.
 
829
        After this, self._sftp should have a valid connection (or
 
830
        we raise an TransportError 'could not connect').
 
831
 
 
832
        TODO: Raise a more reasonable ConnectionFailed exception
 
833
        """
 
834
        self._sftp = _sftp_connect(self._host, self._port, self._username,
 
835
                self._password)
 
836
 
839
837
    def _sftp_open_exclusive(self, abspath, mode=None):
840
838
        """Open a remote path exclusively.
841
839
 
852
850
        """
853
851
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
854
852
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
855
 
        #       However, there is no way to set the permission mode at open
 
853
        #       However, there is no way to set the permission mode at open 
856
854
        #       time using the sftp_client.file() functionality.
857
 
        path = self._get_sftp()._adjust_cwd(abspath)
 
855
        path = self._sftp._adjust_cwd(abspath)
858
856
        # mutter('sftp abspath %s => %s', abspath, path)
859
857
        attr = SFTPAttributes()
860
858
        if mode is not None:
861
859
            attr.st_mode = mode
862
 
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
 
860
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE 
863
861
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
864
862
        try:
865
 
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
 
863
            t, msg = self._sftp._request(CMD_OPEN, path, omode, attr)
866
864
            if t != CMD_HANDLE:
867
865
                raise TransportError('Expected an SFTP handle')
868
866
            handle = msg.get_string()
869
 
            return SFTPFile(self._get_sftp(), handle, 'wb', -1)
 
867
            return SFTPFile(self._sftp, handle, 'wb', -1)
870
868
        except (paramiko.SSHException, IOError), e:
871
869
            self._translate_io_exception(e, abspath, ': unable to open',
872
870
                failure_exc=FileExists)
943
941
                # probably a failed test; unit test thread will log the
944
942
                # failure/error
945
943
                sys.excepthook(*sys.exc_info())
946
 
                warning('Exception from within unit test server thread: %r' %
 
944
                warning('Exception from within unit test server thread: %r' % 
947
945
                        x)
948
946
 
949
947
 
960
958
 
961
959
    Not all methods are implemented, this is deliberate as this class is not a
962
960
    replacement for the builtin sockets layer. fileno is not implemented to
963
 
    prevent the proxy being bypassed.
 
961
    prevent the proxy being bypassed. 
964
962
    """
965
963
 
966
964
    simulated_time = 0
968
966
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
969
967
        "setblocking", "setsockopt", "settimeout", "shutdown"])
970
968
 
971
 
    def __init__(self, sock, latency, bandwidth=1.0,
 
969
    def __init__(self, sock, latency, bandwidth=1.0, 
972
970
                 really_sleep=True):
973
 
        """
 
971
        """ 
974
972
        :param bandwith: simulated bandwith (MegaBit)
975
973
        :param really_sleep: If set to false, the SocketDelay will just
976
974
        increase a counter, instead of calling time.sleep. This is useful for
979
977
        self.sock = sock
980
978
        self.latency = latency
981
979
        self.really_sleep = really_sleep
982
 
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024)
 
980
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
983
981
        self.new_roundtrip = False
984
982
 
985
983
    def sleep(self, s):
1047
1045
 
1048
1046
    def _run_server_entry(self, sock):
1049
1047
        """Entry point for all implementations of _run_server.
1050
 
 
 
1048
        
1051
1049
        If self.add_latency is > 0.000001 then sock is given a latency adding
1052
1050
        decorator.
1053
1051
        """
1070
1068
        event = threading.Event()
1071
1069
        ssh_server.start_server(event, server)
1072
1070
        event.wait(5.0)
1073
 
 
1074
 
    def setUp(self, backing_server=None):
1075
 
        # XXX: TODO: make sftpserver back onto backing_server rather than local
1076
 
        # disk.
1077
 
        if not (backing_server is None or
1078
 
                isinstance(backing_server, local.LocalURLServer)):
1079
 
            raise AssertionError(
1080
 
                "backing_server should not be %r, because this can only serve the "
1081
 
                "local current working directory." % (backing_server,))
 
1071
    
 
1072
    def setUp(self, vfs_server=None):
 
1073
        # XXX: TODO: make sftpserver back onto vfs_server rather than local disk.
 
1074
        assert vfs_server is None or isinstance(vfs_server, LocalURLServer), \
 
1075
            "SFTPServer currently assumes local transport, got %s" % vfs_server
1082
1076
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
1083
1077
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
1084
1078
        if sys.platform == 'win32':
1146
1140
            def close(self):
1147
1141
                pass
1148
1142
 
1149
 
        server = paramiko.SFTPServer(
1150
 
            FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
1151
 
            root=self._root, home=self._server_homedir)
 
1143
        server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
 
1144
                                     root=self._root, home=self._server_homedir)
1152
1145
        try:
1153
 
            server.start_subsystem(
1154
 
                'sftp', None, ssh.SocketAsChannelAdapter(sock))
 
1146
            server.start_subsystem('sftp', None, sock)
1155
1147
        except socket.error, e:
1156
1148
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
1157
1149
                # it's okay for the client to disconnect abruptly
1160
1152
            else:
1161
1153
                raise
1162
1154
        except Exception, e:
1163
 
            # This typically seems to happen during interpreter shutdown, so
1164
 
            # most of the useful ways to report this error are won't work.
1165
 
            # Writing the exception type, and then the text of the exception,
1166
 
            # seems to be the best we can do.
1167
 
            import sys
1168
 
            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
1169
 
            sys.stderr.write('%s\n\n' % (e,))
 
1155
            import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
1170
1156
        server.finish_subsystem()
1171
1157
 
1172
1158
 
1191
1177
 
1192
1178
 
1193
1179
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
1194
 
    """A test server for sftp transports where only absolute paths will work.
1195
 
 
1196
 
    It does this by serving from a deeply-nested directory that doesn't exist.
1197
 
    """
1198
 
 
1199
 
    def setUp(self, backing_server=None):
 
1180
    """A test servere for sftp transports, using absolute urls to non-home."""
 
1181
 
 
1182
    def setUp(self):
1200
1183
        self._server_homedir = '/dev/noone/runs/tests/here'
1201
 
        super(SFTPSiblingAbsoluteServer, self).setUp(backing_server)
 
1184
        super(SFTPSiblingAbsoluteServer, self).setUp()
 
1185
 
 
1186
 
 
1187
def _sftp_connect(host, port, username, password):
 
1188
    """Connect to the remote sftp server.
 
1189
 
 
1190
    :raises: a TransportError 'could not connect'.
 
1191
 
 
1192
    :returns: an paramiko.sftp_client.SFTPClient
 
1193
 
 
1194
    TODO: Raise a more reasonable ConnectionFailed exception
 
1195
    """
 
1196
    idx = (host, port, username)
 
1197
    try:
 
1198
        return _connected_hosts[idx]
 
1199
    except KeyError:
 
1200
        pass
 
1201
    
 
1202
    sftp = _sftp_connect_uncached(host, port, username, password)
 
1203
    _connected_hosts[idx] = sftp
 
1204
    return sftp
 
1205
 
 
1206
def _sftp_connect_uncached(host, port, username, password):
 
1207
    vendor = ssh._get_ssh_vendor()
 
1208
    sftp = vendor.connect_sftp(username, password, host, port)
 
1209
    return sftp
1202
1210
 
1203
1211
 
1204
1212
def get_test_permutations():