~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Patch Queue Manager
  • Date: 2016-02-01 19:13:13 UTC
  • mfrom: (6614.2.2 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20160201191313-wdfvmfff1djde6oq
(vila) Release 2.7.0 (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
 
# Copyright (C) 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
3
2
#
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
13
12
#
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
16
 
18
17
"""Implementation of Transport over SFTP, using paramiko."""
19
18
 
 
19
from __future__ import absolute_import
 
20
 
20
21
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
21
22
# then raise TransportNotPossible, which will break remote access to any
22
23
# formats which rely on OS-level locks.  That should be fine as those formats
24
25
# suite.  Those formats all date back to 0.7; so we should be able to remove
25
26
# these methods when we officially drop support for those formats.
26
27
 
 
28
import bisect
27
29
import errno
 
30
import itertools
28
31
import os
29
32
import random
30
 
import select
31
 
import socket
32
33
import stat
33
34
import sys
34
35
import time
35
 
import urllib
36
 
import urlparse
37
 
import weakref
 
36
import warnings
38
37
 
39
38
from bzrlib import (
 
39
    config,
 
40
    debug,
40
41
    errors,
41
42
    urlutils,
42
43
    )
43
44
from bzrlib.errors import (FileExists,
44
 
                           NoSuchFile, PathNotChild,
 
45
                           NoSuchFile,
45
46
                           TransportError,
46
47
                           LockError,
47
48
                           PathError,
48
49
                           ParamikoNotPresent,
49
50
                           )
50
 
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
 
51
from bzrlib.osutils import fancy_rename
51
52
from bzrlib.trace import mutter, warning
52
53
from bzrlib.transport import (
53
 
    register_urlparse_netloc_protocol,
54
 
    Server,
55
 
    split_url,
 
54
    FileFileStream,
 
55
    _file_streams,
56
56
    ssh,
57
 
    Transport,
 
57
    ConnectedTransport,
58
58
    )
59
59
 
 
60
# Disable one particular warning that comes from paramiko in Python2.5; if
 
61
# this is emitted at the wrong time it tends to cause spurious test failures
 
62
# or at least noise in the test case::
 
63
#
 
64
# [1770/7639 in 86s, 1 known failures, 50 skipped, 2 missing features]
 
65
# test_permissions.TestSftpPermissions.test_new_files
 
66
# /var/lib/python-support/python2.5/paramiko/message.py:226: DeprecationWarning: integer argument expected, got float
 
67
#  self.packet.write(struct.pack('>I', n))
 
68
warnings.filterwarnings('ignore',
 
69
        'integer argument expected, got float',
 
70
        category=DeprecationWarning,
 
71
        module='paramiko.message')
 
72
 
60
73
try:
61
74
    import paramiko
62
75
except ImportError, e:
64
77
else:
65
78
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
66
79
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
67
 
                               CMD_HANDLE, CMD_OPEN)
 
80
                               SFTP_OK, CMD_HANDLE, CMD_OPEN)
68
81
    from paramiko.sftp_attr import SFTPAttributes
69
82
    from paramiko.sftp_file import SFTPFile
70
83
 
71
84
 
72
 
register_urlparse_netloc_protocol('sftp')
73
 
 
74
 
 
75
 
# This is a weakref dictionary, so that we can reuse connections
76
 
# that are still active. Long term, it might be nice to have some
77
 
# sort of expiration policy, such as disconnect if inactive for
78
 
# X seconds. But that requires a lot more fanciness.
79
 
_connected_hosts = weakref.WeakValueDictionary()
80
 
 
81
 
 
82
85
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
83
86
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
84
87
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
85
88
 
86
89
 
87
 
def clear_connection_cache():
88
 
    """Remove all hosts from the SFTP connection cache.
89
 
 
90
 
    Primarily useful for test cases wanting to force garbage collection.
91
 
    """
92
 
    _connected_hosts.clear()
93
 
 
94
 
 
95
90
class SFTPLock(object):
96
91
    """This fakes a lock in a remote location.
97
 
    
 
92
 
98
93
    A present lock is indicated just by the existence of a file.  This
99
 
    doesn't work well on all transports and they are only used in 
 
94
    doesn't work well on all transports and they are only used in
100
95
    deprecated storage formats.
101
96
    """
102
 
    
 
97
 
103
98
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
104
99
 
105
100
    def __init__(self, path, transport):
106
 
        assert isinstance(transport, SFTPTransport)
107
 
 
108
101
        self.lock_file = None
109
102
        self.path = path
110
103
        self.lock_path = path + '.write-lock'
116
109
        except FileExists:
117
110
            raise LockError('File %r already locked' % (self.path,))
118
111
 
119
 
    def __del__(self):
120
 
        """Should this warn, or actually try to cleanup?"""
121
 
        if self.lock_file:
122
 
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
123
 
            self.unlock()
124
 
 
125
112
    def unlock(self):
126
113
        if not self.lock_file:
127
114
            return
134
121
            pass
135
122
 
136
123
 
137
 
class SFTPUrlHandling(Transport):
138
 
    """Mix-in that does common handling of SSH/SFTP URLs."""
139
 
 
140
 
    def __init__(self, base):
141
 
        self._parse_url(base)
142
 
        base = self._unparse_url(self._path)
143
 
        if base[-1] != '/':
144
 
            base += '/'
145
 
        super(SFTPUrlHandling, self).__init__(base)
146
 
 
147
 
    def _parse_url(self, url):
148
 
        (self._scheme,
149
 
         self._username, self._password,
150
 
         self._host, self._port, self._path) = self._split_url(url)
151
 
 
152
 
    def _unparse_url(self, path):
153
 
        """Return a URL for a path relative to this transport.
154
 
        """
155
 
        path = urllib.quote(path)
156
 
        # handle homedir paths
157
 
        if not path.startswith('/'):
158
 
            path = "/~/" + path
159
 
        netloc = urllib.quote(self._host)
160
 
        if self._username is not None:
161
 
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
162
 
        if self._port is not None:
163
 
            netloc = '%s:%d' % (netloc, self._port)
164
 
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
165
 
 
166
 
    def _split_url(self, url):
167
 
        (scheme, username, password, host, port, path) = split_url(url)
168
 
        ## assert scheme == 'sftp'
169
 
 
170
 
        # the initial slash should be removed from the path, and treated
171
 
        # as a homedir relative path (the path begins with a double slash
172
 
        # if it is absolute).
173
 
        # see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
174
 
        # RBC 20060118 we are not using this as its too user hostile. instead
175
 
        # we are following lftp and using /~/foo to mean '~/foo'.
176
 
        # handle homedir paths
177
 
        if path.startswith('/~/'):
178
 
            path = path[3:]
179
 
        elif path == '/~':
180
 
            path = ''
181
 
        return (scheme, username, password, host, port, path)
182
 
 
183
 
    def abspath(self, relpath):
184
 
        """Return the full url to the given relative path.
185
 
        
186
 
        @param relpath: the relative path or path components
187
 
        @type relpath: str or list
188
 
        """
189
 
        return self._unparse_url(self._remote_path(relpath))
190
 
    
191
 
    def _remote_path(self, relpath):
192
 
        """Return the path to be passed along the sftp protocol for relpath.
193
 
        
194
 
        :param relpath: is a urlencoded string.
195
 
        """
196
 
        return self._combine_paths(self._path, relpath)
197
 
 
198
 
 
199
 
class SFTPTransport(SFTPUrlHandling):
 
124
class _SFTPReadvHelper(object):
 
125
    """A class to help with managing the state of a readv request."""
 
126
 
 
127
    # See _get_requests for an explanation.
 
128
    _max_request_size = 32768
 
129
 
 
130
    def __init__(self, original_offsets, relpath, _report_activity):
 
131
        """Create a new readv helper.
 
132
 
 
133
        :param original_offsets: The original requests given by the caller of
 
134
            readv()
 
135
        :param relpath: The name of the file (if known)
 
136
        :param _report_activity: A Transport._report_activity bound method,
 
137
            to be called as data arrives.
 
138
        """
 
139
        self.original_offsets = list(original_offsets)
 
140
        self.relpath = relpath
 
141
        self._report_activity = _report_activity
 
142
 
 
143
    def _get_requests(self):
 
144
        """Break up the offsets into individual requests over sftp.
 
145
 
 
146
        The SFTP spec only requires implementers to support 32kB requests. We
 
147
        could try something larger (openssh supports 64kB), but then we have to
 
148
        handle requests that fail.
 
149
        So instead, we just break up our maximum chunks into 32kB chunks, and
 
150
        asyncronously requests them.
 
151
        Newer versions of paramiko would do the chunking for us, but we want to
 
152
        start processing results right away, so we do it ourselves.
 
153
        """
 
154
        # TODO: Because we issue async requests, we don't 'fudge' any extra
 
155
        #       data.  I'm not 100% sure that is the best choice.
 
156
 
 
157
        # The first thing we do, is to collapse the individual requests as much
 
158
        # as possible, so we don't issues requests <32kB
 
159
        sorted_offsets = sorted(self.original_offsets)
 
160
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
 
161
                                                        limit=0, fudge_factor=0))
 
162
        requests = []
 
163
        for c_offset in coalesced:
 
164
            start = c_offset.start
 
165
            size = c_offset.length
 
166
 
 
167
            # Break this up into 32kB requests
 
168
            while size > 0:
 
169
                next_size = min(size, self._max_request_size)
 
170
                requests.append((start, next_size))
 
171
                size -= next_size
 
172
                start += next_size
 
173
        if 'sftp' in debug.debug_flags:
 
174
            mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
 
175
                self.relpath, len(sorted_offsets), len(coalesced),
 
176
                len(requests))
 
177
        return requests
 
178
 
 
179
    def request_and_yield_offsets(self, fp):
 
180
        """Request the data from the remote machine, yielding the results.
 
181
 
 
182
        :param fp: A Paramiko SFTPFile object that supports readv.
 
183
        :return: Yield the data requested by the original readv caller, one by
 
184
            one.
 
185
        """
 
186
        requests = self._get_requests()
 
187
        offset_iter = iter(self.original_offsets)
 
188
        cur_offset, cur_size = offset_iter.next()
 
189
        # paramiko .readv() yields strings that are in the order of the requests
 
190
        # So we track the current request to know where the next data is
 
191
        # being returned from.
 
192
        input_start = None
 
193
        last_end = None
 
194
        buffered_data = []
 
195
        buffered_len = 0
 
196
 
 
197
        # This is used to buffer chunks which we couldn't process yet
 
198
        # It is (start, end, data) tuples.
 
199
        data_chunks = []
 
200
        # Create an 'unlimited' data stream, so we stop based on requests,
 
201
        # rather than just because the data stream ended. This lets us detect
 
202
        # short readv.
 
203
        data_stream = itertools.chain(fp.readv(requests),
 
204
                                      itertools.repeat(None))
 
205
        for (start, length), data in itertools.izip(requests, data_stream):
 
206
            if data is None:
 
207
                if cur_coalesced is not None:
 
208
                    raise errors.ShortReadvError(self.relpath,
 
209
                        start, length, len(data))
 
210
            if len(data) != length:
 
211
                raise errors.ShortReadvError(self.relpath,
 
212
                    start, length, len(data))
 
213
            self._report_activity(length, 'read')
 
214
            if last_end is None:
 
215
                # This is the first request, just buffer it
 
216
                buffered_data = [data]
 
217
                buffered_len = length
 
218
                input_start = start
 
219
            elif start == last_end:
 
220
                # The data we are reading fits neatly on the previous
 
221
                # buffer, so this is all part of a larger coalesced range.
 
222
                buffered_data.append(data)
 
223
                buffered_len += length
 
224
            else:
 
225
                # We have an 'interrupt' in the data stream. So we know we are
 
226
                # at a request boundary.
 
227
                if buffered_len > 0:
 
228
                    # We haven't consumed the buffer so far, so put it into
 
229
                    # data_chunks, and continue.
 
230
                    buffered = ''.join(buffered_data)
 
231
                    data_chunks.append((input_start, buffered))
 
232
                input_start = start
 
233
                buffered_data = [data]
 
234
                buffered_len = length
 
235
            last_end = start + length
 
236
            if input_start == cur_offset and cur_size <= buffered_len:
 
237
                # Simplify the next steps a bit by transforming buffered_data
 
238
                # into a single string. We also have the nice property that
 
239
                # when there is only one string ''.join([x]) == x, so there is
 
240
                # no data copying.
 
241
                buffered = ''.join(buffered_data)
 
242
                # Clean out buffered data so that we keep memory
 
243
                # consumption low
 
244
                del buffered_data[:]
 
245
                buffered_offset = 0
 
246
                # TODO: We *could* also consider the case where cur_offset is in
 
247
                #       in the buffered range, even though it doesn't *start*
 
248
                #       the buffered range. But for packs we pretty much always
 
249
                #       read in order, so you won't get any extra data in the
 
250
                #       middle.
 
251
                while (input_start == cur_offset
 
252
                       and (buffered_offset + cur_size) <= buffered_len):
 
253
                    # We've buffered enough data to process this request, spit it
 
254
                    # out
 
255
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
 
256
                    # move the direct pointer into our buffered data
 
257
                    buffered_offset += cur_size
 
258
                    # Move the start-of-buffer pointer
 
259
                    input_start += cur_size
 
260
                    # Yield the requested data
 
261
                    yield cur_offset, cur_data
 
262
                    cur_offset, cur_size = offset_iter.next()
 
263
                # at this point, we've consumed as much of buffered as we can,
 
264
                # so break off the portion that we consumed
 
265
                if buffered_offset == len(buffered_data):
 
266
                    # No tail to leave behind
 
267
                    buffered_data = []
 
268
                    buffered_len = 0
 
269
                else:
 
270
                    buffered = buffered[buffered_offset:]
 
271
                    buffered_data = [buffered]
 
272
                    buffered_len = len(buffered)
 
273
        # now that the data stream is done, close the handle
 
274
        fp.close()
 
275
        if buffered_len:
 
276
            buffered = ''.join(buffered_data)
 
277
            del buffered_data[:]
 
278
            data_chunks.append((input_start, buffered))
 
279
        if data_chunks:
 
280
            if 'sftp' in debug.debug_flags:
 
281
                mutter('SFTP readv left with %d out-of-order bytes',
 
282
                    sum(map(lambda x: len(x[1]), data_chunks)))
 
283
            # We've processed all the readv data, at this point, anything we
 
284
            # couldn't process is in data_chunks. This doesn't happen often, so
 
285
            # this code path isn't optimized
 
286
            # We use an interesting process for data_chunks
 
287
            # Specifically if we have "bisect_left([(start, len, entries)],
 
288
            #                                       (qstart,)])
 
289
            # If start == qstart, then we get the specific node. Otherwise we
 
290
            # get the previous node
 
291
            while True:
 
292
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
 
293
                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
 
294
                    # The data starts here
 
295
                    data = data_chunks[idx][1][:cur_size]
 
296
                elif idx > 0:
 
297
                    # The data is in a portion of a previous page
 
298
                    idx -= 1
 
299
                    sub_offset = cur_offset - data_chunks[idx][0]
 
300
                    data = data_chunks[idx][1]
 
301
                    data = data[sub_offset:sub_offset + cur_size]
 
302
                else:
 
303
                    # We are missing the page where the data should be found,
 
304
                    # something is wrong
 
305
                    data = ''
 
306
                if len(data) != cur_size:
 
307
                    raise AssertionError('We must have miscalulated.'
 
308
                        ' We expected %d bytes, but only found %d'
 
309
                        % (cur_size, len(data)))
 
310
                yield cur_offset, data
 
311
                cur_offset, cur_size = offset_iter.next()
 
312
 
 
313
 
 
314
class SFTPTransport(ConnectedTransport):
200
315
    """Transport implementation for SFTP access."""
201
316
 
202
317
    _do_prefetch = _default_do_prefetch
217
332
    # up the request itself, rather than us having to worry about it
218
333
    _max_request_size = 32768
219
334
 
220
 
    def __init__(self, base, clone_from=None):
221
 
        super(SFTPTransport, self).__init__(base)
222
 
        if clone_from is None:
223
 
            self._sftp_connect()
224
 
        else:
225
 
            # use the same ssh connection, etc
226
 
            self._sftp = clone_from._sftp
227
 
        # super saves 'self.base'
228
 
    
229
 
    def should_cache(self):
230
 
        """
231
 
        Return True if the data pulled across should be cached locally.
232
 
        """
233
 
        return True
234
 
 
235
 
    def clone(self, offset=None):
236
 
        """
237
 
        Return a new SFTPTransport with root at self.base + offset.
238
 
        We share the same SFTP session between such transports, because it's
239
 
        fairly expensive to set them up.
240
 
        """
241
 
        if offset is None:
242
 
            return SFTPTransport(self.base, self)
243
 
        else:
244
 
            return SFTPTransport(self.abspath(offset), self)
245
 
 
246
335
    def _remote_path(self, relpath):
247
336
        """Return the path to be passed along the sftp protocol for relpath.
248
 
        
249
 
        relpath is a urlencoded string.
250
 
 
251
 
        :return: a path prefixed with / for regular abspath-based urls, or a
252
 
            path that does not begin with / for urls which begin with /~/.
253
 
        """
254
 
        # how does this work? 
255
 
        # it processes relpath with respect to 
256
 
        # our state:
257
 
        # firstly we create a path to evaluate: 
258
 
        # if relpath is an abspath or homedir path, its the entire thing
259
 
        # otherwise we join our base with relpath
260
 
        # then we eliminate all empty segments (double //'s) outside the first
261
 
        # two elements of the list. This avoids problems with trailing 
262
 
        # slashes, or other abnormalities.
263
 
        # finally we evaluate the entire path in a single pass
264
 
        # '.'s are stripped,
265
 
        # '..' result in popping the left most already 
266
 
        # processed path (which can never be empty because of the check for
267
 
        # abspath and homedir meaning that its not, or that we've used our
268
 
        # path. If the pop would pop the root, we ignore it.
269
 
 
270
 
        # Specific case examinations:
271
 
        # remove the special casefor ~: if the current root is ~/ popping of it
272
 
        # = / thus our seed for a ~ based path is ['', '~']
273
 
        # and if we end up with [''] then we had basically ('', '..') (which is
274
 
        # '/..' so we append '' if the length is one, and assert that the first
275
 
        # element is still ''. Lastly, if we end with ['', '~'] as a prefix for
276
 
        # the output, we've got a homedir path, so we strip that prefix before
277
 
        # '/' joining the resulting list.
278
 
        #
279
 
        # case one: '/' -> ['', ''] cannot shrink
280
 
        # case two: '/' + '../foo' -> ['', 'foo'] (take '', '', '..', 'foo')
281
 
        #           and pop the second '' for the '..', append 'foo'
282
 
        # case three: '/~/' -> ['', '~', ''] 
283
 
        # case four: '/~/' + '../foo' -> ['', '~', '', '..', 'foo'],
284
 
        #           and we want to get '/foo' - the empty path in the middle
285
 
        #           needs to be stripped, then normal path manipulation will 
286
 
        #           work.
287
 
        # case five: '/..' ['', '..'], we want ['', '']
288
 
        #            stripping '' outside the first two is ok
289
 
        #            ignore .. if its too high up
290
 
        #
291
 
        # lastly this code is possibly reusable by FTP, but not reusable by
292
 
        # local paths: ~ is resolvable correctly, nor by HTTP or the smart
293
 
        # server: ~ is resolved remotely.
294
 
        # 
295
 
        # however, a version of this that acts on self.base is possible to be
296
 
        # written which manipulates the URL in canonical form, and would be
297
 
        # reusable for all transports, if a flag for allowing ~/ at all was
298
 
        # provided.
299
 
        assert isinstance(relpath, basestring)
300
 
        relpath = urlutils.unescape(relpath)
301
 
 
302
 
        # case 1)
303
 
        if relpath.startswith('/'):
304
 
            # abspath - normal split is fine.
305
 
            current_path = relpath.split('/')
306
 
        elif relpath.startswith('~/'):
307
 
            # root is homedir based: normal split and prefix '' to remote the
308
 
            # special case
309
 
            current_path = [''].extend(relpath.split('/'))
 
337
 
 
338
        :param relpath: is a urlencoded string.
 
339
        """
 
340
        remote_path = self._parsed_url.clone(relpath).path
 
341
        # the initial slash should be removed from the path, and treated as a
 
342
        # homedir relative path (the path begins with a double slash if it is
 
343
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
 
344
        # RBC 20060118 we are not using this as its too user hostile. instead
 
345
        # we are following lftp and using /~/foo to mean '~/foo'
 
346
        # vila--20070602 and leave absolute paths begin with a single slash.
 
347
        if remote_path.startswith('/~/'):
 
348
            remote_path = remote_path[3:]
 
349
        elif remote_path == '/~':
 
350
            remote_path = ''
 
351
        return remote_path
 
352
 
 
353
    def _create_connection(self, credentials=None):
 
354
        """Create a new connection with the provided credentials.
 
355
 
 
356
        :param credentials: The credentials needed to establish the connection.
 
357
 
 
358
        :return: The created connection and its associated credentials.
 
359
 
 
360
        The credentials are only the password as it may have been entered
 
361
        interactively by the user and may be different from the one provided
 
362
        in base url at transport creation time.
 
363
        """
 
364
        if credentials is None:
 
365
            password = self._parsed_url.password
310
366
        else:
311
 
            # root is from the current directory:
312
 
            if self._path.startswith('/'):
313
 
                # abspath, take the regular split
314
 
                current_path = []
315
 
            else:
316
 
                # homedir based, add the '', '~' not present in self._path
317
 
                current_path = ['', '~']
318
 
            # add our current dir
319
 
            current_path.extend(self._path.split('/'))
320
 
            # add the users relpath
321
 
            current_path.extend(relpath.split('/'))
322
 
        # strip '' segments that are not in the first one - the leading /.
323
 
        to_process = current_path[:1]
324
 
        for segment in current_path[1:]:
325
 
            if segment != '':
326
 
                to_process.append(segment)
327
 
 
328
 
        # process '.' and '..' segments into output_path.
329
 
        output_path = []
330
 
        for segment in to_process:
331
 
            if segment == '..':
332
 
                # directory pop. Remove a directory 
333
 
                # as long as we are not at the root
334
 
                if len(output_path) > 1:
335
 
                    output_path.pop()
336
 
                # else: pass
337
 
                # cannot pop beyond the root, so do nothing
338
 
            elif segment == '.':
339
 
                continue # strip the '.' from the output.
340
 
            else:
341
 
                # this will append '' to output_path for the root elements,
342
 
                # which is appropriate: its why we strip '' in the first pass.
343
 
                output_path.append(segment)
344
 
 
345
 
        # check output special cases:
346
 
        if output_path == ['']:
347
 
            # [''] -> ['', '']
348
 
            output_path = ['', '']
349
 
        elif output_path[:2] == ['', '~']:
350
 
            # ['', '~', ...] -> ...
351
 
            output_path = output_path[2:]
352
 
        path = '/'.join(output_path)
353
 
        return path
354
 
 
355
 
    def relpath(self, abspath):
356
 
        scheme, username, password, host, port, path = self._split_url(abspath)
357
 
        error = []
358
 
        if (username != self._username):
359
 
            error.append('username mismatch')
360
 
        if (host != self._host):
361
 
            error.append('host mismatch')
362
 
        if (port != self._port):
363
 
            error.append('port mismatch')
364
 
        if (not path.startswith(self._path)):
365
 
            error.append('path mismatch')
366
 
        if error:
367
 
            extra = ': ' + ', '.join(error)
368
 
            raise PathNotChild(abspath, self.base, extra=extra)
369
 
        pl = len(self._path)
370
 
        return path[pl:].strip('/')
 
367
            password = credentials
 
368
 
 
369
        vendor = ssh._get_ssh_vendor()
 
370
        user = self._parsed_url.user
 
371
        if user is None:
 
372
            auth = config.AuthenticationConfig()
 
373
            user = auth.get_user('ssh', self._parsed_url.host,
 
374
                self._parsed_url.port)
 
375
        connection = vendor.connect_sftp(self._parsed_url.user, password,
 
376
            self._parsed_url.host, self._parsed_url.port)
 
377
        return connection, (user, password)
 
378
 
 
379
    def disconnect(self):
 
380
        connection = self._get_connection()
 
381
        if connection is not None:
 
382
            connection.close()
 
383
 
 
384
    def _get_sftp(self):
 
385
        """Ensures that a connection is established"""
 
386
        connection = self._get_connection()
 
387
        if connection is None:
 
388
            # First connection ever
 
389
            connection, credentials = self._create_connection()
 
390
            self._set_connection(connection, credentials)
 
391
        return connection
371
392
 
372
393
    def has(self, relpath):
373
394
        """
374
395
        Does the target location exist?
375
396
        """
376
397
        try:
377
 
            self._sftp.stat(self._remote_path(relpath))
 
398
            self._get_sftp().stat(self._remote_path(relpath))
 
399
            # stat result is about 20 bytes, let's say
 
400
            self._report_activity(20, 'read')
378
401
            return True
379
402
        except IOError:
380
403
            return False
381
404
 
382
405
    def get(self, relpath):
383
 
        """
384
 
        Get the file at the given relative path.
 
406
        """Get the file at the given relative path.
385
407
 
386
408
        :param relpath: The relative path to the file
387
409
        """
388
410
        try:
389
411
            path = self._remote_path(relpath)
390
 
            f = self._sftp.file(path, mode='rb')
 
412
            f = self._get_sftp().file(path, mode='rb')
391
413
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
392
414
                f.prefetch()
393
415
            return f
394
416
        except (IOError, paramiko.SSHException), e:
395
 
            self._translate_io_exception(e, path, ': error retrieving')
396
 
 
397
 
    def readv(self, relpath, offsets):
 
417
            self._translate_io_exception(e, path, ': error retrieving',
 
418
                failure_exc=errors.ReadError)
 
419
 
 
420
    def get_bytes(self, relpath):
 
421
        # reimplement this here so that we can report how many bytes came back
 
422
        f = self.get(relpath)
 
423
        try:
 
424
            bytes = f.read()
 
425
            self._report_activity(len(bytes), 'read')
 
426
            return bytes
 
427
        finally:
 
428
            f.close()
 
429
 
 
430
    def _readv(self, relpath, offsets):
398
431
        """See Transport.readv()"""
399
432
        # We overload the default readv() because we want to use a file
400
433
        # that does not have prefetch enabled.
404
437
 
405
438
        try:
406
439
            path = self._remote_path(relpath)
407
 
            fp = self._sftp.file(path, mode='rb')
 
440
            fp = self._get_sftp().file(path, mode='rb')
408
441
            readv = getattr(fp, 'readv', None)
409
442
            if readv:
410
443
                return self._sftp_readv(fp, offsets, relpath)
411
 
            mutter('seek and read %s offsets', len(offsets))
 
444
            if 'sftp' in debug.debug_flags:
 
445
                mutter('seek and read %s offsets', len(offsets))
412
446
            return self._seek_and_read(fp, offsets, relpath)
413
447
        except (IOError, paramiko.SSHException), e:
414
448
            self._translate_io_exception(e, path, ': error retrieving')
415
449
 
416
 
    def _sftp_readv(self, fp, offsets, relpath='<unknown>'):
 
450
    def recommended_page_size(self):
 
451
        """See Transport.recommended_page_size().
 
452
 
 
453
        For SFTP we suggest a large page size to reduce the overhead
 
454
        introduced by latency.
 
455
        """
 
456
        return 64 * 1024
 
457
 
 
458
    def _sftp_readv(self, fp, offsets, relpath):
417
459
        """Use the readv() member of fp to do async readv.
418
460
 
419
 
        And then read them using paramiko.readv(). paramiko.readv()
 
461
        Then read them using paramiko.readv(). paramiko.readv()
420
462
        does not support ranges > 64K, so it caps the request size, and
421
 
        just reads until it gets all the stuff it wants
 
463
        just reads until it gets all the stuff it wants.
422
464
        """
423
 
        offsets = list(offsets)
424
 
        sorted_offsets = sorted(offsets)
425
 
 
426
 
        # The algorithm works as follows:
427
 
        # 1) Coalesce nearby reads into a single chunk
428
 
        #    This generates a list of combined regions, the total size
429
 
        #    and the size of the sub regions. This coalescing step is limited
430
 
        #    in the number of nearby chunks to combine, and is allowed to
431
 
        #    skip small breaks in the requests. Limiting it makes sure that
432
 
        #    we can start yielding some data earlier, and skipping means we
433
 
        #    make fewer requests. (Beneficial even when using async)
434
 
        # 2) Break up this combined regions into chunks that are smaller
435
 
        #    than 64KiB. Technically the limit is 65536, but we are a
436
 
        #    little bit conservative. This is because sftp has a maximum
437
 
        #    return chunk size of 64KiB (max size of an unsigned short)
438
 
        # 3) Issue a readv() to paramiko to create an async request for
439
 
        #    all of this data
440
 
        # 4) Read in the data as it comes back, until we've read one
441
 
        #    continuous section as determined in step 1
442
 
        # 5) Break up the full sections into hunks for the original requested
443
 
        #    offsets. And put them in a cache
444
 
        # 6) Check if the next request is in the cache, and if it is, remove
445
 
        #    it from the cache, and yield its data. Continue until no more
446
 
        #    entries are in the cache.
447
 
        # 7) loop back to step 4 until all data has been read
448
 
        #
449
 
        # TODO: jam 20060725 This could be optimized one step further, by
450
 
        #       attempting to yield whatever data we have read, even before
451
 
        #       the first coallesced section has been fully processed.
452
 
 
453
 
        # When coalescing for use with readv(), we don't really need to
454
 
        # use any fudge factor, because the requests are made asynchronously
455
 
        coalesced = list(self._coalesce_offsets(sorted_offsets,
456
 
                               limit=self._max_readv_combine,
457
 
                               fudge_factor=0,
458
 
                               ))
459
 
        requests = []
460
 
        for c_offset in coalesced:
461
 
            start = c_offset.start
462
 
            size = c_offset.length
463
 
 
464
 
            # We need to break this up into multiple requests
465
 
            while size > 0:
466
 
                next_size = min(size, self._max_request_size)
467
 
                requests.append((start, next_size))
468
 
                size -= next_size
469
 
                start += next_size
470
 
 
471
 
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
472
 
                len(offsets), len(coalesced), len(requests))
473
 
 
474
 
        # Queue the current read until we have read the full coalesced section
475
 
        cur_data = []
476
 
        cur_data_len = 0
477
 
        cur_coalesced_stack = iter(coalesced)
478
 
        cur_coalesced = cur_coalesced_stack.next()
479
 
 
480
 
        # Cache the results, but only until they have been fulfilled
481
 
        data_map = {}
482
 
        # turn the list of offsets into a stack
483
 
        offset_stack = iter(offsets)
484
 
        cur_offset_and_size = offset_stack.next()
485
 
 
486
 
        for data in fp.readv(requests):
487
 
            cur_data += data
488
 
            cur_data_len += len(data)
489
 
 
490
 
            if cur_data_len < cur_coalesced.length:
491
 
                continue
492
 
            assert cur_data_len == cur_coalesced.length, \
493
 
                "Somehow we read too much: %s != %s" % (cur_data_len,
494
 
                                                        cur_coalesced.length)
495
 
            all_data = ''.join(cur_data)
496
 
            cur_data = []
497
 
            cur_data_len = 0
498
 
 
499
 
            for suboffset, subsize in cur_coalesced.ranges:
500
 
                key = (cur_coalesced.start+suboffset, subsize)
501
 
                data_map[key] = all_data[suboffset:suboffset+subsize]
502
 
 
503
 
            # Now that we've read some data, see if we can yield anything back
504
 
            while cur_offset_and_size in data_map:
505
 
                this_data = data_map.pop(cur_offset_and_size)
506
 
                yield cur_offset_and_size[0], this_data
507
 
                cur_offset_and_size = offset_stack.next()
508
 
 
509
 
            # We read a coalesced entry, so mark it as done
510
 
            cur_coalesced = None
511
 
            # Now that we've read all of the data for this coalesced section
512
 
            # on to the next
513
 
            cur_coalesced = cur_coalesced_stack.next()
514
 
 
515
 
        if cur_coalesced is not None:
516
 
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
517
 
                cur_coalesced.length, len(data))
 
465
        helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
 
466
        return helper.request_and_yield_offsets(fp)
518
467
 
519
468
    def put_file(self, relpath, f, mode=None):
520
469
        """
525
474
        :param mode: The final mode for the file
526
475
        """
527
476
        final_path = self._remote_path(relpath)
528
 
        self._put(final_path, f, mode=mode)
 
477
        return self._put(final_path, f, mode=mode)
529
478
 
530
479
    def _put(self, abspath, f, mode=None):
531
480
        """Helper function so both put() and copy_abspaths can reuse the code"""
536
485
        try:
537
486
            try:
538
487
                fout.set_pipelined(True)
539
 
                self._pump(f, fout)
 
488
                length = self._pump(f, fout)
540
489
            except (IOError, paramiko.SSHException), e:
541
490
                self._translate_io_exception(e, tmp_abspath)
542
491
            # XXX: This doesn't truly help like we would like it to.
545
494
            #      sticky bit. So it is probably best to stop chmodding, and
546
495
            #      just tell users that they need to set the umask correctly.
547
496
            #      The attr.st_mode = mode, in _sftp_open_exclusive
548
 
            #      will handle when the user wants the final mode to be more 
549
 
            #      restrictive. And then we avoid a round trip. Unless 
 
497
            #      will handle when the user wants the final mode to be more
 
498
            #      restrictive. And then we avoid a round trip. Unless
550
499
            #      paramiko decides to expose an async chmod()
551
500
 
552
501
            # This is designed to chmod() right before we close.
553
 
            # Because we set_pipelined() earlier, theoretically we might 
 
502
            # Because we set_pipelined() earlier, theoretically we might
554
503
            # avoid the round trip for fout.close()
555
504
            if mode is not None:
556
 
                self._sftp.chmod(tmp_abspath, mode)
 
505
                self._get_sftp().chmod(tmp_abspath, mode)
557
506
            fout.close()
558
507
            closed = True
559
508
            self._rename_and_overwrite(tmp_abspath, abspath)
 
509
            return length
560
510
        except Exception, e:
561
511
            # If we fail, try to clean up the temporary file
562
512
            # before we throw the exception
568
518
            try:
569
519
                if not closed:
570
520
                    fout.close()
571
 
                self._sftp.remove(tmp_abspath)
 
521
                self._get_sftp().remove(tmp_abspath)
572
522
            except:
573
523
                # raise the saved except
574
524
                raise e
589
539
            fout = None
590
540
            try:
591
541
                try:
592
 
                    fout = self._sftp.file(abspath, mode='wb')
 
542
                    fout = self._get_sftp().file(abspath, mode='wb')
593
543
                    fout.set_pipelined(True)
594
544
                    writer(fout)
595
545
                except (paramiko.SSHException, IOError), e:
597
547
                                                 ': unable to open')
598
548
 
599
549
                # This is designed to chmod() right before we close.
600
 
                # Because we set_pipelined() earlier, theoretically we might 
 
550
                # Because we set_pipelined() earlier, theoretically we might
601
551
                # avoid the round trip for fout.close()
602
552
                if mode is not None:
603
 
                    self._sftp.chmod(abspath, mode)
 
553
                    self._get_sftp().chmod(abspath, mode)
604
554
            finally:
605
555
                if fout is not None:
606
556
                    fout.close()
643
593
                                    create_parent_dir=create_parent_dir,
644
594
                                    dir_mode=dir_mode)
645
595
 
646
 
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
596
    def put_bytes_non_atomic(self, relpath, raw_bytes, mode=None,
647
597
                             create_parent_dir=False,
648
598
                             dir_mode=None):
 
599
        if not isinstance(raw_bytes, str):
 
600
            raise TypeError(
 
601
                'raw_bytes must be a plain string, not %s' % type(raw_bytes))
 
602
 
649
603
        def writer(fout):
650
 
            fout.write(bytes)
 
604
            fout.write(raw_bytes)
651
605
        self._put_non_atomic_helper(relpath, writer, mode=mode,
652
606
                                    create_parent_dir=create_parent_dir,
653
607
                                    dir_mode=dir_mode)
654
608
 
655
609
    def iter_files_recursive(self):
656
610
        """Walk the relative paths of all files in this transport."""
 
611
        # progress is handled by list_dir
657
612
        queue = list(self.list_dir('.'))
658
613
        while queue:
659
614
            relpath = queue.pop(0)
670
625
        else:
671
626
            local_mode = mode
672
627
        try:
673
 
            self._sftp.mkdir(abspath, local_mode)
 
628
            self._report_activity(len(abspath), 'write')
 
629
            self._get_sftp().mkdir(abspath, local_mode)
 
630
            self._report_activity(1, 'read')
674
631
            if mode is not None:
675
 
                self._sftp.chmod(abspath, mode=mode)
 
632
                # chmod a dir through sftp will erase any sgid bit set
 
633
                # on the server side.  So, if the bit mode are already
 
634
                # set, avoid the chmod.  If the mode is not fine but
 
635
                # the sgid bit is set, report a warning to the user
 
636
                # with the umask fix.
 
637
                stat = self._get_sftp().lstat(abspath)
 
638
                mode = mode & 0777 # can't set special bits anyway
 
639
                if mode != stat.st_mode & 0777:
 
640
                    if stat.st_mode & 06000:
 
641
                        warning('About to chmod %s over sftp, which will result'
 
642
                                ' in its suid or sgid bits being cleared.  If'
 
643
                                ' you want to preserve those bits, change your '
 
644
                                ' environment on the server to use umask 0%03o.'
 
645
                                % (abspath, 0777 - mode))
 
646
                    self._get_sftp().chmod(abspath, mode=mode)
676
647
        except (paramiko.SSHException, IOError), e:
677
648
            self._translate_io_exception(e, abspath, ': unable to mkdir',
678
649
                failure_exc=FileExists)
681
652
        """Create a directory at the given path."""
682
653
        self._mkdir(self._remote_path(relpath), mode=mode)
683
654
 
684
 
    def _translate_io_exception(self, e, path, more_info='', 
 
655
    def open_write_stream(self, relpath, mode=None):
 
656
        """See Transport.open_write_stream."""
 
657
        # initialise the file to zero-length
 
658
        # this is three round trips, but we don't use this
 
659
        # api more than once per write_group at the moment so
 
660
        # it is a tolerable overhead. Better would be to truncate
 
661
        # the file after opening. RBC 20070805
 
662
        self.put_bytes_non_atomic(relpath, "", mode)
 
663
        abspath = self._remote_path(relpath)
 
664
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
 
665
        #       set the file mode at create time. If it does, use it.
 
666
        #       But for now, we just chmod later anyway.
 
667
        handle = None
 
668
        try:
 
669
            handle = self._get_sftp().file(abspath, mode='wb')
 
670
            handle.set_pipelined(True)
 
671
        except (paramiko.SSHException, IOError), e:
 
672
            self._translate_io_exception(e, abspath,
 
673
                                         ': unable to open')
 
674
        _file_streams[self.abspath(relpath)] = handle
 
675
        return FileFileStream(self, relpath, handle)
 
676
 
 
677
    def _translate_io_exception(self, e, path, more_info='',
685
678
                                failure_exc=PathError):
686
679
        """Translate a paramiko or IOError into a friendlier exception.
687
680
 
692
685
        :param failure_exc: Paramiko has the super fun ability to raise completely
693
686
                           opaque errors that just set "e.args = ('Failure',)" with
694
687
                           no more information.
695
 
                           If this parameter is set, it defines the exception 
 
688
                           If this parameter is set, it defines the exception
696
689
                           to raise in these cases.
697
690
        """
698
691
        # paramiko seems to generate detailless errors.
701
694
            if (e.args == ('No such file or directory',) or
702
695
                e.args == ('No such file',)):
703
696
                raise NoSuchFile(path, str(e) + more_info)
704
 
            if (e.args == ('mkdir failed',)):
 
697
            if (e.args == ('mkdir failed',) or
 
698
                e.args[0].startswith('syserr: File exists')):
705
699
                raise FileExists(path, str(e) + more_info)
706
700
            # strange but true, for the paramiko server.
707
701
            if (e.args == ('Failure',)):
708
702
                raise failure_exc(path, str(e) + more_info)
 
703
            # Can be something like args = ('Directory not empty:
 
704
            # '/srv/bazaar.launchpad.net/blah...: '
 
705
            # [Errno 39] Directory not empty',)
 
706
            if (e.args[0].startswith('Directory not empty: ')
 
707
                or getattr(e, 'errno', None) == errno.ENOTEMPTY):
 
708
                raise errors.DirectoryNotEmpty(path, str(e))
 
709
            if e.args == ('Operation unsupported',):
 
710
                raise errors.TransportNotPossible()
709
711
            mutter('Raising exception with args %s', e.args)
710
712
        if getattr(e, 'errno', None) is not None:
711
713
            mutter('Raising exception with errno %s', e.errno)
718
720
        """
719
721
        try:
720
722
            path = self._remote_path(relpath)
721
 
            fout = self._sftp.file(path, 'ab')
 
723
            fout = self._get_sftp().file(path, 'ab')
722
724
            if mode is not None:
723
 
                self._sftp.chmod(path, mode)
 
725
                self._get_sftp().chmod(path, mode)
724
726
            result = fout.tell()
725
727
            self._pump(f, fout)
726
728
            return result
730
732
    def rename(self, rel_from, rel_to):
731
733
        """Rename without special overwriting"""
732
734
        try:
733
 
            self._sftp.rename(self._remote_path(rel_from),
 
735
            self._get_sftp().rename(self._remote_path(rel_from),
734
736
                              self._remote_path(rel_to))
735
737
        except (IOError, paramiko.SSHException), e:
736
738
            self._translate_io_exception(e, rel_from,
738
740
 
739
741
    def _rename_and_overwrite(self, abs_from, abs_to):
740
742
        """Do a fancy rename on the remote server.
741
 
        
 
743
 
742
744
        Using the implementation provided by osutils.
743
745
        """
744
746
        try:
 
747
            sftp = self._get_sftp()
745
748
            fancy_rename(abs_from, abs_to,
746
 
                    rename_func=self._sftp.rename,
747
 
                    unlink_func=self._sftp.remove)
 
749
                         rename_func=sftp.rename,
 
750
                         unlink_func=sftp.remove)
748
751
        except (IOError, paramiko.SSHException), e:
749
 
            self._translate_io_exception(e, abs_from, ': unable to rename to %r' % (abs_to))
 
752
            self._translate_io_exception(e, abs_from,
 
753
                                         ': unable to rename to %r' % (abs_to))
750
754
 
751
755
    def move(self, rel_from, rel_to):
752
756
        """Move the item at rel_from to the location at rel_to"""
758
762
        """Delete the item at relpath"""
759
763
        path = self._remote_path(relpath)
760
764
        try:
761
 
            self._sftp.remove(path)
 
765
            self._get_sftp().remove(path)
762
766
        except (IOError, paramiko.SSHException), e:
763
767
            self._translate_io_exception(e, path, ': unable to delete')
764
 
            
 
768
 
 
769
    def external_url(self):
 
770
        """See bzrlib.transport.Transport.external_url."""
 
771
        # the external path for SFTP is the base
 
772
        return self.base
 
773
 
765
774
    def listable(self):
766
775
        """Return True if this store supports listing."""
767
776
        return True
776
785
        # -- David Allouche 2006-08-11
777
786
        path = self._remote_path(relpath)
778
787
        try:
779
 
            entries = self._sftp.listdir(path)
 
788
            entries = self._get_sftp().listdir(path)
 
789
            self._report_activity(sum(map(len, entries)), 'read')
780
790
        except (IOError, paramiko.SSHException), e:
781
791
            self._translate_io_exception(e, path, ': failed to list_dir')
782
792
        return [urlutils.escape(entry) for entry in entries]
785
795
        """See Transport.rmdir."""
786
796
        path = self._remote_path(relpath)
787
797
        try:
788
 
            return self._sftp.rmdir(path)
 
798
            return self._get_sftp().rmdir(path)
789
799
        except (IOError, paramiko.SSHException), e:
790
800
            self._translate_io_exception(e, path, ': failed to rmdir')
791
801
 
793
803
        """Return the stat information for a file."""
794
804
        path = self._remote_path(relpath)
795
805
        try:
796
 
            return self._sftp.stat(path)
 
806
            return self._get_sftp().lstat(path)
797
807
        except (IOError, paramiko.SSHException), e:
798
808
            self._translate_io_exception(e, path, ': unable to stat')
799
809
 
 
810
    def readlink(self, relpath):
 
811
        """See Transport.readlink."""
 
812
        path = self._remote_path(relpath)
 
813
        try:
 
814
            return self._get_sftp().readlink(path)
 
815
        except (IOError, paramiko.SSHException), e:
 
816
            self._translate_io_exception(e, path, ': unable to readlink')
 
817
 
 
818
    def symlink(self, source, link_name):
 
819
        """See Transport.symlink."""
 
820
        try:
 
821
            conn = self._get_sftp()
 
822
            sftp_retval = conn.symlink(source, link_name)
 
823
            if SFTP_OK != sftp_retval:
 
824
                raise TransportError(
 
825
                    '%r: unable to create symlink to %r' % (link_name, source),
 
826
                    sftp_retval
 
827
                )
 
828
        except (IOError, paramiko.SSHException), e:
 
829
            self._translate_io_exception(e, link_name,
 
830
                                         ': unable to create symlink to %r' % (source))
 
831
 
800
832
    def lock_read(self, relpath):
801
833
        """
802
834
        Lock the given file for shared (read) access.
823
855
        # that we have taken the lock.
824
856
        return SFTPLock(relpath, self)
825
857
 
826
 
    def _sftp_connect(self):
827
 
        """Connect to the remote sftp server.
828
 
        After this, self._sftp should have a valid connection (or
829
 
        we raise an TransportError 'could not connect').
830
 
 
831
 
        TODO: Raise a more reasonable ConnectionFailed exception
832
 
        """
833
 
        self._sftp = _sftp_connect(self._host, self._port, self._username,
834
 
                self._password)
835
 
 
836
858
    def _sftp_open_exclusive(self, abspath, mode=None):
837
859
        """Open a remote path exclusively.
838
860
 
849
871
        """
850
872
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
851
873
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
852
 
        #       However, there is no way to set the permission mode at open 
 
874
        #       However, there is no way to set the permission mode at open
853
875
        #       time using the sftp_client.file() functionality.
854
 
        path = self._sftp._adjust_cwd(abspath)
 
876
        path = self._get_sftp()._adjust_cwd(abspath)
855
877
        # mutter('sftp abspath %s => %s', abspath, path)
856
878
        attr = SFTPAttributes()
857
879
        if mode is not None:
858
880
            attr.st_mode = mode
859
 
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE 
 
881
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
860
882
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
861
883
        try:
862
 
            t, msg = self._sftp._request(CMD_OPEN, path, omode, attr)
 
884
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
863
885
            if t != CMD_HANDLE:
864
886
                raise TransportError('Expected an SFTP handle')
865
887
            handle = msg.get_string()
866
 
            return SFTPFile(self._sftp, handle, 'wb', -1)
 
888
            return SFTPFile(self._get_sftp(), handle, 'wb', -1)
867
889
        except (paramiko.SSHException, IOError), e:
868
890
            self._translate_io_exception(e, abspath, ': unable to open',
869
891
                failure_exc=FileExists)
875
897
        else:
876
898
            return True
877
899
 
878
 
# ------------- server test implementation --------------
879
 
import threading
880
 
 
881
 
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
882
 
 
883
 
STUB_SERVER_KEY = """
884
 
-----BEGIN RSA PRIVATE KEY-----
885
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
886
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
887
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
888
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
889
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
890
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
891
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
892
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
893
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
894
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
895
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
896
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
897
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
898
 
-----END RSA PRIVATE KEY-----
899
 
"""
900
 
 
901
 
 
902
 
class SocketListener(threading.Thread):
903
 
 
904
 
    def __init__(self, callback):
905
 
        threading.Thread.__init__(self)
906
 
        self._callback = callback
907
 
        self._socket = socket.socket()
908
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
909
 
        self._socket.bind(('localhost', 0))
910
 
        self._socket.listen(1)
911
 
        self.port = self._socket.getsockname()[1]
912
 
        self._stop_event = threading.Event()
913
 
 
914
 
    def stop(self):
915
 
        # called from outside this thread
916
 
        self._stop_event.set()
917
 
        # use a timeout here, because if the test fails, the server thread may
918
 
        # never notice the stop_event.
919
 
        self.join(5.0)
920
 
        self._socket.close()
921
 
 
922
 
    def run(self):
923
 
        while True:
924
 
            readable, writable_unused, exception_unused = \
925
 
                select.select([self._socket], [], [], 0.1)
926
 
            if self._stop_event.isSet():
927
 
                return
928
 
            if len(readable) == 0:
929
 
                continue
930
 
            try:
931
 
                s, addr_unused = self._socket.accept()
932
 
                # because the loopback socket is inline, and transports are
933
 
                # never explicitly closed, best to launch a new thread.
934
 
                threading.Thread(target=self._callback, args=(s,)).start()
935
 
            except socket.error, x:
936
 
                sys.excepthook(*sys.exc_info())
937
 
                warning('Socket error during accept() within unit test server'
938
 
                        ' thread: %r' % x)
939
 
            except Exception, x:
940
 
                # probably a failed test; unit test thread will log the
941
 
                # failure/error
942
 
                sys.excepthook(*sys.exc_info())
943
 
                warning('Exception from within unit test server thread: %r' % 
944
 
                        x)
945
 
 
946
 
 
947
 
class SocketDelay(object):
948
 
    """A socket decorator to make TCP appear slower.
949
 
 
950
 
    This changes recv, send, and sendall to add a fixed latency to each python
951
 
    call if a new roundtrip is detected. That is, when a recv is called and the
952
 
    flag new_roundtrip is set, latency is charged. Every send and send_all
953
 
    sets this flag.
954
 
 
955
 
    In addition every send, sendall and recv sleeps a bit per character send to
956
 
    simulate bandwidth.
957
 
 
958
 
    Not all methods are implemented, this is deliberate as this class is not a
959
 
    replacement for the builtin sockets layer. fileno is not implemented to
960
 
    prevent the proxy being bypassed. 
961
 
    """
962
 
 
963
 
    simulated_time = 0
964
 
    _proxied_arguments = dict.fromkeys([
965
 
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
966
 
        "setblocking", "setsockopt", "settimeout", "shutdown"])
967
 
 
968
 
    def __init__(self, sock, latency, bandwidth=1.0, 
969
 
                 really_sleep=True):
970
 
        """ 
971
 
        :param bandwith: simulated bandwith (MegaBit)
972
 
        :param really_sleep: If set to false, the SocketDelay will just
973
 
        increase a counter, instead of calling time.sleep. This is useful for
974
 
        unittesting the SocketDelay.
975
 
        """
976
 
        self.sock = sock
977
 
        self.latency = latency
978
 
        self.really_sleep = really_sleep
979
 
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
980
 
        self.new_roundtrip = False
981
 
 
982
 
    def sleep(self, s):
983
 
        if self.really_sleep:
984
 
            time.sleep(s)
985
 
        else:
986
 
            SocketDelay.simulated_time += s
987
 
 
988
 
    def __getattr__(self, attr):
989
 
        if attr in SocketDelay._proxied_arguments:
990
 
            return getattr(self.sock, attr)
991
 
        raise AttributeError("'SocketDelay' object has no attribute %r" %
992
 
                             attr)
993
 
 
994
 
    def dup(self):
995
 
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
996
 
                           self._sleep)
997
 
 
998
 
    def recv(self, *args):
999
 
        data = self.sock.recv(*args)
1000
 
        if data and self.new_roundtrip:
1001
 
            self.new_roundtrip = False
1002
 
            self.sleep(self.latency)
1003
 
        self.sleep(len(data) * self.time_per_byte)
1004
 
        return data
1005
 
 
1006
 
    def sendall(self, data, flags=0):
1007
 
        if not self.new_roundtrip:
1008
 
            self.new_roundtrip = True
1009
 
            self.sleep(self.latency)
1010
 
        self.sleep(len(data) * self.time_per_byte)
1011
 
        return self.sock.sendall(data, flags)
1012
 
 
1013
 
    def send(self, data, flags=0):
1014
 
        if not self.new_roundtrip:
1015
 
            self.new_roundtrip = True
1016
 
            self.sleep(self.latency)
1017
 
        bytes_sent = self.sock.send(data, flags)
1018
 
        self.sleep(bytes_sent * self.time_per_byte)
1019
 
        return bytes_sent
1020
 
 
1021
 
 
1022
 
class SFTPServer(Server):
1023
 
    """Common code for SFTP server facilities."""
1024
 
 
1025
 
    def __init__(self, server_interface=StubServer):
1026
 
        self._original_vendor = None
1027
 
        self._homedir = None
1028
 
        self._server_homedir = None
1029
 
        self._listener = None
1030
 
        self._root = None
1031
 
        self._vendor = ssh.ParamikoVendor()
1032
 
        self._server_interface = server_interface
1033
 
        # sftp server logs
1034
 
        self.logs = []
1035
 
        self.add_latency = 0
1036
 
 
1037
 
    def _get_sftp_url(self, path):
1038
 
        """Calculate an sftp url to this server for path."""
1039
 
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
1040
 
 
1041
 
    def log(self, message):
1042
 
        """StubServer uses this to log when a new server is created."""
1043
 
        self.logs.append(message)
1044
 
 
1045
 
    def _run_server_entry(self, sock):
1046
 
        """Entry point for all implementations of _run_server.
1047
 
        
1048
 
        If self.add_latency is > 0.000001 then sock is given a latency adding
1049
 
        decorator.
1050
 
        """
1051
 
        if self.add_latency > 0.000001:
1052
 
            sock = SocketDelay(sock, self.add_latency)
1053
 
        return self._run_server(sock)
1054
 
 
1055
 
    def _run_server(self, s):
1056
 
        ssh_server = paramiko.Transport(s)
1057
 
        key_file = pathjoin(self._homedir, 'test_rsa.key')
1058
 
        f = open(key_file, 'w')
1059
 
        f.write(STUB_SERVER_KEY)
1060
 
        f.close()
1061
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
1062
 
        ssh_server.add_server_key(host_key)
1063
 
        server = self._server_interface(self)
1064
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
1065
 
                                         StubSFTPServer, root=self._root,
1066
 
                                         home=self._server_homedir)
1067
 
        event = threading.Event()
1068
 
        ssh_server.start_server(event, server)
1069
 
        event.wait(5.0)
1070
 
    
1071
 
    def setUp(self):
1072
 
        self._original_vendor = ssh._ssh_vendor
1073
 
        ssh._ssh_vendor = self._vendor
1074
 
        if sys.platform == 'win32':
1075
 
            # Win32 needs to use the UNICODE api
1076
 
            self._homedir = getcwd()
1077
 
        else:
1078
 
            # But Linux SFTP servers should just deal in bytestreams
1079
 
            self._homedir = os.getcwd()
1080
 
        if self._server_homedir is None:
1081
 
            self._server_homedir = self._homedir
1082
 
        self._root = '/'
1083
 
        if sys.platform == 'win32':
1084
 
            self._root = ''
1085
 
        self._listener = SocketListener(self._run_server_entry)
1086
 
        self._listener.setDaemon(True)
1087
 
        self._listener.start()
1088
 
 
1089
 
    def tearDown(self):
1090
 
        """See bzrlib.transport.Server.tearDown."""
1091
 
        self._listener.stop()
1092
 
        ssh._ssh_vendor = self._original_vendor
1093
 
 
1094
 
    def get_bogus_url(self):
1095
 
        """See bzrlib.transport.Server.get_bogus_url."""
1096
 
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
1097
 
        # we bind a random socket, so that we get a guaranteed unused port
1098
 
        # we just never listen on that port
1099
 
        s = socket.socket()
1100
 
        s.bind(('localhost', 0))
1101
 
        return 'sftp://%s:%s/' % s.getsockname()
1102
 
 
1103
 
 
1104
 
class SFTPFullAbsoluteServer(SFTPServer):
1105
 
    """A test server for sftp transports, using absolute urls and ssh."""
1106
 
 
1107
 
    def get_url(self):
1108
 
        """See bzrlib.transport.Server.get_url."""
1109
 
        return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
1110
 
 
1111
 
 
1112
 
class SFTPServerWithoutSSH(SFTPServer):
1113
 
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
1114
 
 
1115
 
    def __init__(self):
1116
 
        super(SFTPServerWithoutSSH, self).__init__()
1117
 
        self._vendor = ssh.LoopbackVendor()
1118
 
 
1119
 
    def _run_server(self, sock):
1120
 
        # Re-import these as locals, so that they're still accessible during
1121
 
        # interpreter shutdown (when all module globals get set to None, leading
1122
 
        # to confusing errors like "'NoneType' object has no attribute 'error'".
1123
 
        class FakeChannel(object):
1124
 
            def get_transport(self):
1125
 
                return self
1126
 
            def get_log_channel(self):
1127
 
                return 'paramiko'
1128
 
            def get_name(self):
1129
 
                return '1'
1130
 
            def get_hexdump(self):
1131
 
                return False
1132
 
            def close(self):
1133
 
                pass
1134
 
 
1135
 
        server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
1136
 
                                     root=self._root, home=self._server_homedir)
1137
 
        try:
1138
 
            server.start_subsystem('sftp', None, sock)
1139
 
        except socket.error, e:
1140
 
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
1141
 
                # it's okay for the client to disconnect abruptly
1142
 
                # (bug in paramiko 1.6: it should absorb this exception)
1143
 
                pass
1144
 
            else:
1145
 
                raise
1146
 
        except Exception, e:
1147
 
            import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
1148
 
        server.finish_subsystem()
1149
 
 
1150
 
 
1151
 
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
1152
 
    """A test server for sftp transports, using absolute urls."""
1153
 
 
1154
 
    def get_url(self):
1155
 
        """See bzrlib.transport.Server.get_url."""
1156
 
        if sys.platform == 'win32':
1157
 
            return self._get_sftp_url(urlutils.escape(self._homedir))
1158
 
        else:
1159
 
            return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
1160
 
 
1161
 
 
1162
 
class SFTPHomeDirServer(SFTPServerWithoutSSH):
1163
 
    """A test server for sftp transports, using homedir relative urls."""
1164
 
 
1165
 
    def get_url(self):
1166
 
        """See bzrlib.transport.Server.get_url."""
1167
 
        return self._get_sftp_url("~/")
1168
 
 
1169
 
 
1170
 
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
1171
 
    """A test servere for sftp transports, using absolute urls to non-home."""
1172
 
 
1173
 
    def setUp(self):
1174
 
        self._server_homedir = '/dev/noone/runs/tests/here'
1175
 
        super(SFTPSiblingAbsoluteServer, self).setUp()
1176
 
 
1177
 
 
1178
 
def _sftp_connect(host, port, username, password):
1179
 
    """Connect to the remote sftp server.
1180
 
 
1181
 
    :raises: a TransportError 'could not connect'.
1182
 
 
1183
 
    :returns: an paramiko.sftp_client.SFTPClient
1184
 
 
1185
 
    TODO: Raise a more reasonable ConnectionFailed exception
1186
 
    """
1187
 
    idx = (host, port, username)
1188
 
    try:
1189
 
        return _connected_hosts[idx]
1190
 
    except KeyError:
1191
 
        pass
1192
 
    
1193
 
    sftp = _sftp_connect_uncached(host, port, username, password)
1194
 
    _connected_hosts[idx] = sftp
1195
 
    return sftp
1196
 
 
1197
 
def _sftp_connect_uncached(host, port, username, password):
1198
 
    vendor = ssh._get_ssh_vendor()
1199
 
    sftp = vendor.connect_sftp(username, password, host, port)
1200
 
    return sftp
1201
 
 
1202
900
 
1203
901
def get_test_permutations():
1204
902
    """Return the permutations to be used in testing."""
1205
 
    return [(SFTPTransport, SFTPAbsoluteServer),
1206
 
            (SFTPTransport, SFTPHomeDirServer),
1207
 
            (SFTPTransport, SFTPSiblingAbsoluteServer),
 
903
    from bzrlib.tests import stub_sftp
 
904
    return [(SFTPTransport, stub_sftp.SFTPAbsoluteServer),
 
905
            (SFTPTransport, stub_sftp.SFTPHomeDirServer),
 
906
            (SFTPTransport, stub_sftp.SFTPSiblingAbsoluteServer),
1208
907
            ]