~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-05-04 12:10:51 UTC
  • mfrom: (5819.1.4 777007-developer-doc)
  • Revision ID: pqm@pqm.ubuntu.com-20110504121051-aovlsmqiivjmc4fc
(jelmer) Small fixes to developer documentation. (Jonathan Riddell)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
2
 
# Copyright (C) 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
3
2
#
4
3
# This program is free software; you can redistribute it and/or modify
5
4
# it under the terms of the GNU General Public License as published by
13
12
#
14
13
# You should have received a copy of the GNU General Public License
15
14
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
16
 
18
17
"""Implementation of Transport over SFTP, using paramiko."""
19
18
 
24
23
# suite.  Those formats all date back to 0.7; so we should be able to remove
25
24
# these methods when we officially drop support for those formats.
26
25
 
 
26
import bisect
27
27
import errno
 
28
import itertools
28
29
import os
29
30
import random
30
 
import select
31
 
import socket
32
31
import stat
33
32
import sys
34
33
import time
35
34
import urllib
36
35
import urlparse
37
 
import weakref
 
36
import warnings
38
37
 
39
38
from bzrlib import (
 
39
    config,
 
40
    debug,
40
41
    errors,
41
42
    urlutils,
42
43
    )
48
49
                           ParamikoNotPresent,
49
50
                           )
50
51
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
 
52
from bzrlib.symbol_versioning import (
 
53
        deprecated_function,
 
54
        )
51
55
from bzrlib.trace import mutter, warning
52
56
from bzrlib.transport import (
53
 
    register_urlparse_netloc_protocol,
 
57
    FileFileStream,
 
58
    _file_streams,
 
59
    local,
54
60
    Server,
55
 
    split_url,
56
61
    ssh,
57
 
    Transport,
 
62
    ConnectedTransport,
58
63
    )
59
64
 
 
65
# Disable one particular warning that comes from paramiko in Python2.5; if
 
66
# this is emitted at the wrong time it tends to cause spurious test failures
 
67
# or at least noise in the test case::
 
68
#
 
69
# [1770/7639 in 86s, 1 known failures, 50 skipped, 2 missing features]
 
70
# test_permissions.TestSftpPermissions.test_new_files
 
71
# /var/lib/python-support/python2.5/paramiko/message.py:226: DeprecationWarning: integer argument expected, got float
 
72
#  self.packet.write(struct.pack('>I', n))
 
73
warnings.filterwarnings('ignore',
 
74
        'integer argument expected, got float',
 
75
        category=DeprecationWarning,
 
76
        module='paramiko.message')
 
77
 
60
78
try:
61
79
    import paramiko
62
80
except ImportError, e:
64
82
else:
65
83
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
66
84
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
67
 
                               CMD_HANDLE, CMD_OPEN)
 
85
                               SFTP_OK, CMD_HANDLE, CMD_OPEN)
68
86
    from paramiko.sftp_attr import SFTPAttributes
69
87
    from paramiko.sftp_file import SFTPFile
70
88
 
71
89
 
72
 
register_urlparse_netloc_protocol('sftp')
73
 
 
74
 
 
75
 
# This is a weakref dictionary, so that we can reuse connections
76
 
# that are still active. Long term, it might be nice to have some
77
 
# sort of expiration policy, such as disconnect if inactive for
78
 
# X seconds. But that requires a lot more fanciness.
79
 
_connected_hosts = weakref.WeakValueDictionary()
80
 
 
81
 
 
82
90
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
83
91
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
84
92
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
85
93
 
86
94
 
87
 
def clear_connection_cache():
88
 
    """Remove all hosts from the SFTP connection cache.
89
 
 
90
 
    Primarily useful for test cases wanting to force garbage collection.
91
 
    """
92
 
    _connected_hosts.clear()
93
 
 
94
 
 
95
95
class SFTPLock(object):
96
96
    """This fakes a lock in a remote location.
97
 
    
 
97
 
98
98
    A present lock is indicated just by the existence of a file.  This
99
 
    doesn't work well on all transports and they are only used in 
 
99
    doesn't work well on all transports and they are only used in
100
100
    deprecated storage formats.
101
101
    """
102
 
    
 
102
 
103
103
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
104
104
 
105
105
    def __init__(self, path, transport):
106
 
        assert isinstance(transport, SFTPTransport)
107
 
 
108
106
        self.lock_file = None
109
107
        self.path = path
110
108
        self.lock_path = path + '.write-lock'
134
132
            pass
135
133
 
136
134
 
137
 
class SFTPUrlHandling(Transport):
138
 
    """Mix-in that does common handling of SSH/SFTP URLs."""
139
 
 
140
 
    def __init__(self, base):
141
 
        self._parse_url(base)
142
 
        base = self._unparse_url(self._path)
143
 
        if base[-1] != '/':
144
 
            base += '/'
145
 
        super(SFTPUrlHandling, self).__init__(base)
146
 
 
147
 
    def _parse_url(self, url):
148
 
        (self._scheme,
149
 
         self._username, self._password,
150
 
         self._host, self._port, self._path) = self._split_url(url)
151
 
 
152
 
    def _unparse_url(self, path):
153
 
        """Return a URL for a path relative to this transport.
154
 
        """
155
 
        path = urllib.quote(path)
156
 
        # handle homedir paths
157
 
        if not path.startswith('/'):
158
 
            path = "/~/" + path
159
 
        netloc = urllib.quote(self._host)
160
 
        if self._username is not None:
161
 
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
162
 
        if self._port is not None:
163
 
            netloc = '%s:%d' % (netloc, self._port)
164
 
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
165
 
 
166
 
    def _split_url(self, url):
167
 
        (scheme, username, password, host, port, path) = split_url(url)
168
 
        ## assert scheme == 'sftp'
169
 
 
170
 
        # the initial slash should be removed from the path, and treated
171
 
        # as a homedir relative path (the path begins with a double slash
172
 
        # if it is absolute).
173
 
        # see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
174
 
        # RBC 20060118 we are not using this as its too user hostile. instead
175
 
        # we are following lftp and using /~/foo to mean '~/foo'.
176
 
        # handle homedir paths
177
 
        if path.startswith('/~/'):
178
 
            path = path[3:]
179
 
        elif path == '/~':
180
 
            path = ''
181
 
        return (scheme, username, password, host, port, path)
182
 
 
183
 
    def abspath(self, relpath):
184
 
        """Return the full url to the given relative path.
185
 
        
186
 
        @param relpath: the relative path or path components
187
 
        @type relpath: str or list
188
 
        """
189
 
        return self._unparse_url(self._remote_path(relpath))
190
 
    
191
 
    def _remote_path(self, relpath):
192
 
        """Return the path to be passed along the sftp protocol for relpath.
193
 
        
194
 
        :param relpath: is a urlencoded string.
195
 
        """
196
 
        return self._combine_paths(self._path, relpath)
197
 
 
198
 
 
199
 
class SFTPTransport(SFTPUrlHandling):
 
135
class _SFTPReadvHelper(object):
 
136
    """A class to help with managing the state of a readv request."""
 
137
 
 
138
    # See _get_requests for an explanation.
 
139
    _max_request_size = 32768
 
140
 
 
141
    def __init__(self, original_offsets, relpath, _report_activity):
 
142
        """Create a new readv helper.
 
143
 
 
144
        :param original_offsets: The original requests given by the caller of
 
145
            readv()
 
146
        :param relpath: The name of the file (if known)
 
147
        :param _report_activity: A Transport._report_activity bound method,
 
148
            to be called as data arrives.
 
149
        """
 
150
        self.original_offsets = list(original_offsets)
 
151
        self.relpath = relpath
 
152
        self._report_activity = _report_activity
 
153
 
 
154
    def _get_requests(self):
 
155
        """Break up the offsets into individual requests over sftp.
 
156
 
 
157
        The SFTP spec only requires implementers to support 32kB requests. We
 
158
        could try something larger (openssh supports 64kB), but then we have to
 
159
        handle requests that fail.
 
160
        So instead, we just break up our maximum chunks into 32kB chunks, and
 
161
        asyncronously requests them.
 
162
        Newer versions of paramiko would do the chunking for us, but we want to
 
163
        start processing results right away, so we do it ourselves.
 
164
        """
 
165
        # TODO: Because we issue async requests, we don't 'fudge' any extra
 
166
        #       data.  I'm not 100% sure that is the best choice.
 
167
 
 
168
        # The first thing we do, is to collapse the individual requests as much
 
169
        # as possible, so we don't issues requests <32kB
 
170
        sorted_offsets = sorted(self.original_offsets)
 
171
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
 
172
                                                        limit=0, fudge_factor=0))
 
173
        requests = []
 
174
        for c_offset in coalesced:
 
175
            start = c_offset.start
 
176
            size = c_offset.length
 
177
 
 
178
            # Break this up into 32kB requests
 
179
            while size > 0:
 
180
                next_size = min(size, self._max_request_size)
 
181
                requests.append((start, next_size))
 
182
                size -= next_size
 
183
                start += next_size
 
184
        if 'sftp' in debug.debug_flags:
 
185
            mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
 
186
                self.relpath, len(sorted_offsets), len(coalesced),
 
187
                len(requests))
 
188
        return requests
 
189
 
 
190
    def request_and_yield_offsets(self, fp):
 
191
        """Request the data from the remote machine, yielding the results.
 
192
 
 
193
        :param fp: A Paramiko SFTPFile object that supports readv.
 
194
        :return: Yield the data requested by the original readv caller, one by
 
195
            one.
 
196
        """
 
197
        requests = self._get_requests()
 
198
        offset_iter = iter(self.original_offsets)
 
199
        cur_offset, cur_size = offset_iter.next()
 
200
        # paramiko .readv() yields strings that are in the order of the requests
 
201
        # So we track the current request to know where the next data is
 
202
        # being returned from.
 
203
        input_start = None
 
204
        last_end = None
 
205
        buffered_data = []
 
206
        buffered_len = 0
 
207
 
 
208
        # This is used to buffer chunks which we couldn't process yet
 
209
        # It is (start, end, data) tuples.
 
210
        data_chunks = []
 
211
        # Create an 'unlimited' data stream, so we stop based on requests,
 
212
        # rather than just because the data stream ended. This lets us detect
 
213
        # short readv.
 
214
        data_stream = itertools.chain(fp.readv(requests),
 
215
                                      itertools.repeat(None))
 
216
        for (start, length), data in itertools.izip(requests, data_stream):
 
217
            if data is None:
 
218
                if cur_coalesced is not None:
 
219
                    raise errors.ShortReadvError(self.relpath,
 
220
                        start, length, len(data))
 
221
            if len(data) != length:
 
222
                raise errors.ShortReadvError(self.relpath,
 
223
                    start, length, len(data))
 
224
            self._report_activity(length, 'read')
 
225
            if last_end is None:
 
226
                # This is the first request, just buffer it
 
227
                buffered_data = [data]
 
228
                buffered_len = length
 
229
                input_start = start
 
230
            elif start == last_end:
 
231
                # The data we are reading fits neatly on the previous
 
232
                # buffer, so this is all part of a larger coalesced range.
 
233
                buffered_data.append(data)
 
234
                buffered_len += length
 
235
            else:
 
236
                # We have an 'interrupt' in the data stream. So we know we are
 
237
                # at a request boundary.
 
238
                if buffered_len > 0:
 
239
                    # We haven't consumed the buffer so far, so put it into
 
240
                    # data_chunks, and continue.
 
241
                    buffered = ''.join(buffered_data)
 
242
                    data_chunks.append((input_start, buffered))
 
243
                input_start = start
 
244
                buffered_data = [data]
 
245
                buffered_len = length
 
246
            last_end = start + length
 
247
            if input_start == cur_offset and cur_size <= buffered_len:
 
248
                # Simplify the next steps a bit by transforming buffered_data
 
249
                # into a single string. We also have the nice property that
 
250
                # when there is only one string ''.join([x]) == x, so there is
 
251
                # no data copying.
 
252
                buffered = ''.join(buffered_data)
 
253
                # Clean out buffered data so that we keep memory
 
254
                # consumption low
 
255
                del buffered_data[:]
 
256
                buffered_offset = 0
 
257
                # TODO: We *could* also consider the case where cur_offset is in
 
258
                #       in the buffered range, even though it doesn't *start*
 
259
                #       the buffered range. But for packs we pretty much always
 
260
                #       read in order, so you won't get any extra data in the
 
261
                #       middle.
 
262
                while (input_start == cur_offset
 
263
                       and (buffered_offset + cur_size) <= buffered_len):
 
264
                    # We've buffered enough data to process this request, spit it
 
265
                    # out
 
266
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
 
267
                    # move the direct pointer into our buffered data
 
268
                    buffered_offset += cur_size
 
269
                    # Move the start-of-buffer pointer
 
270
                    input_start += cur_size
 
271
                    # Yield the requested data
 
272
                    yield cur_offset, cur_data
 
273
                    cur_offset, cur_size = offset_iter.next()
 
274
                # at this point, we've consumed as much of buffered as we can,
 
275
                # so break off the portion that we consumed
 
276
                if buffered_offset == len(buffered_data):
 
277
                    # No tail to leave behind
 
278
                    buffered_data = []
 
279
                    buffered_len = 0
 
280
                else:
 
281
                    buffered = buffered[buffered_offset:]
 
282
                    buffered_data = [buffered]
 
283
                    buffered_len = len(buffered)
 
284
        if buffered_len:
 
285
            buffered = ''.join(buffered_data)
 
286
            del buffered_data[:]
 
287
            data_chunks.append((input_start, buffered))
 
288
        if data_chunks:
 
289
            if 'sftp' in debug.debug_flags:
 
290
                mutter('SFTP readv left with %d out-of-order bytes',
 
291
                    sum(map(lambda x: len(x[1]), data_chunks)))
 
292
            # We've processed all the readv data, at this point, anything we
 
293
            # couldn't process is in data_chunks. This doesn't happen often, so
 
294
            # this code path isn't optimized
 
295
            # We use an interesting process for data_chunks
 
296
            # Specifically if we have "bisect_left([(start, len, entries)],
 
297
            #                                       (qstart,)])
 
298
            # If start == qstart, then we get the specific node. Otherwise we
 
299
            # get the previous node
 
300
            while True:
 
301
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
 
302
                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
 
303
                    # The data starts here
 
304
                    data = data_chunks[idx][1][:cur_size]
 
305
                elif idx > 0:
 
306
                    # The data is in a portion of a previous page
 
307
                    idx -= 1
 
308
                    sub_offset = cur_offset - data_chunks[idx][0]
 
309
                    data = data_chunks[idx][1]
 
310
                    data = data[sub_offset:sub_offset + cur_size]
 
311
                else:
 
312
                    # We are missing the page where the data should be found,
 
313
                    # something is wrong
 
314
                    data = ''
 
315
                if len(data) != cur_size:
 
316
                    raise AssertionError('We must have miscalulated.'
 
317
                        ' We expected %d bytes, but only found %d'
 
318
                        % (cur_size, len(data)))
 
319
                yield cur_offset, data
 
320
                cur_offset, cur_size = offset_iter.next()
 
321
 
 
322
 
 
323
class SFTPTransport(ConnectedTransport):
200
324
    """Transport implementation for SFTP access."""
201
325
 
202
326
    _do_prefetch = _default_do_prefetch
217
341
    # up the request itself, rather than us having to worry about it
218
342
    _max_request_size = 32768
219
343
 
220
 
    def __init__(self, base, clone_from=None):
221
 
        super(SFTPTransport, self).__init__(base)
222
 
        if clone_from is None:
223
 
            self._sftp_connect()
224
 
        else:
225
 
            # use the same ssh connection, etc
226
 
            self._sftp = clone_from._sftp
227
 
        # super saves 'self.base'
228
 
    
229
 
    def should_cache(self):
230
 
        """
231
 
        Return True if the data pulled across should be cached locally.
232
 
        """
233
 
        return True
234
 
 
235
 
    def clone(self, offset=None):
236
 
        """
237
 
        Return a new SFTPTransport with root at self.base + offset.
238
 
        We share the same SFTP session between such transports, because it's
239
 
        fairly expensive to set them up.
240
 
        """
241
 
        if offset is None:
242
 
            return SFTPTransport(self.base, self)
243
 
        else:
244
 
            return SFTPTransport(self.abspath(offset), self)
 
344
    def __init__(self, base, _from_transport=None):
 
345
        super(SFTPTransport, self).__init__(base,
 
346
                                            _from_transport=_from_transport)
245
347
 
246
348
    def _remote_path(self, relpath):
247
349
        """Return the path to be passed along the sftp protocol for relpath.
248
 
        
249
 
        relpath is a urlencoded string.
250
 
 
251
 
        :return: a path prefixed with / for regular abspath-based urls, or a
252
 
            path that does not begin with / for urls which begin with /~/.
253
 
        """
254
 
        # how does this work? 
255
 
        # it processes relpath with respect to 
256
 
        # our state:
257
 
        # firstly we create a path to evaluate: 
258
 
        # if relpath is an abspath or homedir path, its the entire thing
259
 
        # otherwise we join our base with relpath
260
 
        # then we eliminate all empty segments (double //'s) outside the first
261
 
        # two elements of the list. This avoids problems with trailing 
262
 
        # slashes, or other abnormalities.
263
 
        # finally we evaluate the entire path in a single pass
264
 
        # '.'s are stripped,
265
 
        # '..' result in popping the left most already 
266
 
        # processed path (which can never be empty because of the check for
267
 
        # abspath and homedir meaning that its not, or that we've used our
268
 
        # path. If the pop would pop the root, we ignore it.
269
 
 
270
 
        # Specific case examinations:
271
 
        # remove the special casefor ~: if the current root is ~/ popping of it
272
 
        # = / thus our seed for a ~ based path is ['', '~']
273
 
        # and if we end up with [''] then we had basically ('', '..') (which is
274
 
        # '/..' so we append '' if the length is one, and assert that the first
275
 
        # element is still ''. Lastly, if we end with ['', '~'] as a prefix for
276
 
        # the output, we've got a homedir path, so we strip that prefix before
277
 
        # '/' joining the resulting list.
278
 
        #
279
 
        # case one: '/' -> ['', ''] cannot shrink
280
 
        # case two: '/' + '../foo' -> ['', 'foo'] (take '', '', '..', 'foo')
281
 
        #           and pop the second '' for the '..', append 'foo'
282
 
        # case three: '/~/' -> ['', '~', ''] 
283
 
        # case four: '/~/' + '../foo' -> ['', '~', '', '..', 'foo'],
284
 
        #           and we want to get '/foo' - the empty path in the middle
285
 
        #           needs to be stripped, then normal path manipulation will 
286
 
        #           work.
287
 
        # case five: '/..' ['', '..'], we want ['', '']
288
 
        #            stripping '' outside the first two is ok
289
 
        #            ignore .. if its too high up
290
 
        #
291
 
        # lastly this code is possibly reusable by FTP, but not reusable by
292
 
        # local paths: ~ is resolvable correctly, nor by HTTP or the smart
293
 
        # server: ~ is resolved remotely.
294
 
        # 
295
 
        # however, a version of this that acts on self.base is possible to be
296
 
        # written which manipulates the URL in canonical form, and would be
297
 
        # reusable for all transports, if a flag for allowing ~/ at all was
298
 
        # provided.
299
 
        assert isinstance(relpath, basestring)
300
 
        relpath = urlutils.unescape(relpath)
301
 
 
302
 
        # case 1)
303
 
        if relpath.startswith('/'):
304
 
            # abspath - normal split is fine.
305
 
            current_path = relpath.split('/')
306
 
        elif relpath.startswith('~/'):
307
 
            # root is homedir based: normal split and prefix '' to remote the
308
 
            # special case
309
 
            current_path = [''].extend(relpath.split('/'))
 
350
 
 
351
        :param relpath: is a urlencoded string.
 
352
        """
 
353
        relative = urlutils.unescape(relpath).encode('utf-8')
 
354
        remote_path = self._combine_paths(self._path, relative)
 
355
        # the initial slash should be removed from the path, and treated as a
 
356
        # homedir relative path (the path begins with a double slash if it is
 
357
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
 
358
        # RBC 20060118 we are not using this as its too user hostile. instead
 
359
        # we are following lftp and using /~/foo to mean '~/foo'
 
360
        # vila--20070602 and leave absolute paths begin with a single slash.
 
361
        if remote_path.startswith('/~/'):
 
362
            remote_path = remote_path[3:]
 
363
        elif remote_path == '/~':
 
364
            remote_path = ''
 
365
        return remote_path
 
366
 
 
367
    def _create_connection(self, credentials=None):
 
368
        """Create a new connection with the provided credentials.
 
369
 
 
370
        :param credentials: The credentials needed to establish the connection.
 
371
 
 
372
        :return: The created connection and its associated credentials.
 
373
 
 
374
        The credentials are only the password as it may have been entered
 
375
        interactively by the user and may be different from the one provided
 
376
        in base url at transport creation time.
 
377
        """
 
378
        if credentials is None:
 
379
            password = self._password
310
380
        else:
311
 
            # root is from the current directory:
312
 
            if self._path.startswith('/'):
313
 
                # abspath, take the regular split
314
 
                current_path = []
315
 
            else:
316
 
                # homedir based, add the '', '~' not present in self._path
317
 
                current_path = ['', '~']
318
 
            # add our current dir
319
 
            current_path.extend(self._path.split('/'))
320
 
            # add the users relpath
321
 
            current_path.extend(relpath.split('/'))
322
 
        # strip '' segments that are not in the first one - the leading /.
323
 
        to_process = current_path[:1]
324
 
        for segment in current_path[1:]:
325
 
            if segment != '':
326
 
                to_process.append(segment)
327
 
 
328
 
        # process '.' and '..' segments into output_path.
329
 
        output_path = []
330
 
        for segment in to_process:
331
 
            if segment == '..':
332
 
                # directory pop. Remove a directory 
333
 
                # as long as we are not at the root
334
 
                if len(output_path) > 1:
335
 
                    output_path.pop()
336
 
                # else: pass
337
 
                # cannot pop beyond the root, so do nothing
338
 
            elif segment == '.':
339
 
                continue # strip the '.' from the output.
340
 
            else:
341
 
                # this will append '' to output_path for the root elements,
342
 
                # which is appropriate: its why we strip '' in the first pass.
343
 
                output_path.append(segment)
344
 
 
345
 
        # check output special cases:
346
 
        if output_path == ['']:
347
 
            # [''] -> ['', '']
348
 
            output_path = ['', '']
349
 
        elif output_path[:2] == ['', '~']:
350
 
            # ['', '~', ...] -> ...
351
 
            output_path = output_path[2:]
352
 
        path = '/'.join(output_path)
353
 
        return path
354
 
 
355
 
    def relpath(self, abspath):
356
 
        scheme, username, password, host, port, path = self._split_url(abspath)
357
 
        error = []
358
 
        if (username != self._username):
359
 
            error.append('username mismatch')
360
 
        if (host != self._host):
361
 
            error.append('host mismatch')
362
 
        if (port != self._port):
363
 
            error.append('port mismatch')
364
 
        if (not path.startswith(self._path)):
365
 
            error.append('path mismatch')
366
 
        if error:
367
 
            extra = ': ' + ', '.join(error)
368
 
            raise PathNotChild(abspath, self.base, extra=extra)
369
 
        pl = len(self._path)
370
 
        return path[pl:].strip('/')
 
381
            password = credentials
 
382
 
 
383
        vendor = ssh._get_ssh_vendor()
 
384
        user = self._user
 
385
        if user is None:
 
386
            auth = config.AuthenticationConfig()
 
387
            user = auth.get_user('ssh', self._host, self._port)
 
388
        connection = vendor.connect_sftp(self._user, password,
 
389
                                         self._host, self._port)
 
390
        return connection, (user, password)
 
391
 
 
392
    def disconnect(self):
 
393
        connection = self._get_connection()
 
394
        if connection is not None:
 
395
            connection.close()
 
396
 
 
397
    def _get_sftp(self):
 
398
        """Ensures that a connection is established"""
 
399
        connection = self._get_connection()
 
400
        if connection is None:
 
401
            # First connection ever
 
402
            connection, credentials = self._create_connection()
 
403
            self._set_connection(connection, credentials)
 
404
        return connection
371
405
 
372
406
    def has(self, relpath):
373
407
        """
374
408
        Does the target location exist?
375
409
        """
376
410
        try:
377
 
            self._sftp.stat(self._remote_path(relpath))
 
411
            self._get_sftp().stat(self._remote_path(relpath))
 
412
            # stat result is about 20 bytes, let's say
 
413
            self._report_activity(20, 'read')
378
414
            return True
379
415
        except IOError:
380
416
            return False
381
417
 
382
418
    def get(self, relpath):
383
 
        """
384
 
        Get the file at the given relative path.
 
419
        """Get the file at the given relative path.
385
420
 
386
421
        :param relpath: The relative path to the file
387
422
        """
388
423
        try:
 
424
            # FIXME: by returning the file directly, we don't pass this
 
425
            # through to report_activity.  We could try wrapping the object
 
426
            # before it's returned.  For readv and get_bytes it's handled in
 
427
            # the higher-level function.
 
428
            # -- mbp 20090126
389
429
            path = self._remote_path(relpath)
390
 
            f = self._sftp.file(path, mode='rb')
 
430
            f = self._get_sftp().file(path, mode='rb')
391
431
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
392
432
                f.prefetch()
393
433
            return f
394
434
        except (IOError, paramiko.SSHException), e:
395
 
            self._translate_io_exception(e, path, ': error retrieving')
396
 
 
397
 
    def readv(self, relpath, offsets):
 
435
            self._translate_io_exception(e, path, ': error retrieving',
 
436
                failure_exc=errors.ReadError)
 
437
 
 
438
    def get_bytes(self, relpath):
 
439
        # reimplement this here so that we can report how many bytes came back
 
440
        f = self.get(relpath)
 
441
        try:
 
442
            bytes = f.read()
 
443
            self._report_activity(len(bytes), 'read')
 
444
            return bytes
 
445
        finally:
 
446
            f.close()
 
447
 
 
448
    def _readv(self, relpath, offsets):
398
449
        """See Transport.readv()"""
399
450
        # We overload the default readv() because we want to use a file
400
451
        # that does not have prefetch enabled.
404
455
 
405
456
        try:
406
457
            path = self._remote_path(relpath)
407
 
            fp = self._sftp.file(path, mode='rb')
 
458
            fp = self._get_sftp().file(path, mode='rb')
408
459
            readv = getattr(fp, 'readv', None)
409
460
            if readv:
410
461
                return self._sftp_readv(fp, offsets, relpath)
411
 
            mutter('seek and read %s offsets', len(offsets))
 
462
            if 'sftp' in debug.debug_flags:
 
463
                mutter('seek and read %s offsets', len(offsets))
412
464
            return self._seek_and_read(fp, offsets, relpath)
413
465
        except (IOError, paramiko.SSHException), e:
414
466
            self._translate_io_exception(e, path, ': error retrieving')
415
467
 
416
 
    def _sftp_readv(self, fp, offsets, relpath='<unknown>'):
 
468
    def recommended_page_size(self):
 
469
        """See Transport.recommended_page_size().
 
470
 
 
471
        For SFTP we suggest a large page size to reduce the overhead
 
472
        introduced by latency.
 
473
        """
 
474
        return 64 * 1024
 
475
 
 
476
    def _sftp_readv(self, fp, offsets, relpath):
417
477
        """Use the readv() member of fp to do async readv.
418
478
 
419
 
        And then read them using paramiko.readv(). paramiko.readv()
 
479
        Then read them using paramiko.readv(). paramiko.readv()
420
480
        does not support ranges > 64K, so it caps the request size, and
421
 
        just reads until it gets all the stuff it wants
 
481
        just reads until it gets all the stuff it wants.
422
482
        """
423
 
        offsets = list(offsets)
424
 
        sorted_offsets = sorted(offsets)
425
 
 
426
 
        # The algorithm works as follows:
427
 
        # 1) Coalesce nearby reads into a single chunk
428
 
        #    This generates a list of combined regions, the total size
429
 
        #    and the size of the sub regions. This coalescing step is limited
430
 
        #    in the number of nearby chunks to combine, and is allowed to
431
 
        #    skip small breaks in the requests. Limiting it makes sure that
432
 
        #    we can start yielding some data earlier, and skipping means we
433
 
        #    make fewer requests. (Beneficial even when using async)
434
 
        # 2) Break up this combined regions into chunks that are smaller
435
 
        #    than 64KiB. Technically the limit is 65536, but we are a
436
 
        #    little bit conservative. This is because sftp has a maximum
437
 
        #    return chunk size of 64KiB (max size of an unsigned short)
438
 
        # 3) Issue a readv() to paramiko to create an async request for
439
 
        #    all of this data
440
 
        # 4) Read in the data as it comes back, until we've read one
441
 
        #    continuous section as determined in step 1
442
 
        # 5) Break up the full sections into hunks for the original requested
443
 
        #    offsets. And put them in a cache
444
 
        # 6) Check if the next request is in the cache, and if it is, remove
445
 
        #    it from the cache, and yield its data. Continue until no more
446
 
        #    entries are in the cache.
447
 
        # 7) loop back to step 4 until all data has been read
448
 
        #
449
 
        # TODO: jam 20060725 This could be optimized one step further, by
450
 
        #       attempting to yield whatever data we have read, even before
451
 
        #       the first coallesced section has been fully processed.
452
 
 
453
 
        # When coalescing for use with readv(), we don't really need to
454
 
        # use any fudge factor, because the requests are made asynchronously
455
 
        coalesced = list(self._coalesce_offsets(sorted_offsets,
456
 
                               limit=self._max_readv_combine,
457
 
                               fudge_factor=0,
458
 
                               ))
459
 
        requests = []
460
 
        for c_offset in coalesced:
461
 
            start = c_offset.start
462
 
            size = c_offset.length
463
 
 
464
 
            # We need to break this up into multiple requests
465
 
            while size > 0:
466
 
                next_size = min(size, self._max_request_size)
467
 
                requests.append((start, next_size))
468
 
                size -= next_size
469
 
                start += next_size
470
 
 
471
 
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
472
 
                len(offsets), len(coalesced), len(requests))
473
 
 
474
 
        # Queue the current read until we have read the full coalesced section
475
 
        cur_data = []
476
 
        cur_data_len = 0
477
 
        cur_coalesced_stack = iter(coalesced)
478
 
        cur_coalesced = cur_coalesced_stack.next()
479
 
 
480
 
        # Cache the results, but only until they have been fulfilled
481
 
        data_map = {}
482
 
        # turn the list of offsets into a stack
483
 
        offset_stack = iter(offsets)
484
 
        cur_offset_and_size = offset_stack.next()
485
 
 
486
 
        for data in fp.readv(requests):
487
 
            cur_data += data
488
 
            cur_data_len += len(data)
489
 
 
490
 
            if cur_data_len < cur_coalesced.length:
491
 
                continue
492
 
            assert cur_data_len == cur_coalesced.length, \
493
 
                "Somehow we read too much: %s != %s" % (cur_data_len,
494
 
                                                        cur_coalesced.length)
495
 
            all_data = ''.join(cur_data)
496
 
            cur_data = []
497
 
            cur_data_len = 0
498
 
 
499
 
            for suboffset, subsize in cur_coalesced.ranges:
500
 
                key = (cur_coalesced.start+suboffset, subsize)
501
 
                data_map[key] = all_data[suboffset:suboffset+subsize]
502
 
 
503
 
            # Now that we've read some data, see if we can yield anything back
504
 
            while cur_offset_and_size in data_map:
505
 
                this_data = data_map.pop(cur_offset_and_size)
506
 
                yield cur_offset_and_size[0], this_data
507
 
                cur_offset_and_size = offset_stack.next()
508
 
 
509
 
            # We read a coalesced entry, so mark it as done
510
 
            cur_coalesced = None
511
 
            # Now that we've read all of the data for this coalesced section
512
 
            # on to the next
513
 
            cur_coalesced = cur_coalesced_stack.next()
514
 
 
515
 
        if cur_coalesced is not None:
516
 
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
517
 
                cur_coalesced.length, len(data))
 
483
        helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
 
484
        return helper.request_and_yield_offsets(fp)
518
485
 
519
486
    def put_file(self, relpath, f, mode=None):
520
487
        """
525
492
        :param mode: The final mode for the file
526
493
        """
527
494
        final_path = self._remote_path(relpath)
528
 
        self._put(final_path, f, mode=mode)
 
495
        return self._put(final_path, f, mode=mode)
529
496
 
530
497
    def _put(self, abspath, f, mode=None):
531
498
        """Helper function so both put() and copy_abspaths can reuse the code"""
536
503
        try:
537
504
            try:
538
505
                fout.set_pipelined(True)
539
 
                self._pump(f, fout)
 
506
                length = self._pump(f, fout)
540
507
            except (IOError, paramiko.SSHException), e:
541
508
                self._translate_io_exception(e, tmp_abspath)
542
509
            # XXX: This doesn't truly help like we would like it to.
545
512
            #      sticky bit. So it is probably best to stop chmodding, and
546
513
            #      just tell users that they need to set the umask correctly.
547
514
            #      The attr.st_mode = mode, in _sftp_open_exclusive
548
 
            #      will handle when the user wants the final mode to be more 
549
 
            #      restrictive. And then we avoid a round trip. Unless 
 
515
            #      will handle when the user wants the final mode to be more
 
516
            #      restrictive. And then we avoid a round trip. Unless
550
517
            #      paramiko decides to expose an async chmod()
551
518
 
552
519
            # This is designed to chmod() right before we close.
553
 
            # Because we set_pipelined() earlier, theoretically we might 
 
520
            # Because we set_pipelined() earlier, theoretically we might
554
521
            # avoid the round trip for fout.close()
555
522
            if mode is not None:
556
 
                self._sftp.chmod(tmp_abspath, mode)
 
523
                self._get_sftp().chmod(tmp_abspath, mode)
557
524
            fout.close()
558
525
            closed = True
559
526
            self._rename_and_overwrite(tmp_abspath, abspath)
 
527
            return length
560
528
        except Exception, e:
561
529
            # If we fail, try to clean up the temporary file
562
530
            # before we throw the exception
568
536
            try:
569
537
                if not closed:
570
538
                    fout.close()
571
 
                self._sftp.remove(tmp_abspath)
 
539
                self._get_sftp().remove(tmp_abspath)
572
540
            except:
573
541
                # raise the saved except
574
542
                raise e
589
557
            fout = None
590
558
            try:
591
559
                try:
592
 
                    fout = self._sftp.file(abspath, mode='wb')
 
560
                    fout = self._get_sftp().file(abspath, mode='wb')
593
561
                    fout.set_pipelined(True)
594
562
                    writer(fout)
595
563
                except (paramiko.SSHException, IOError), e:
597
565
                                                 ': unable to open')
598
566
 
599
567
                # This is designed to chmod() right before we close.
600
 
                # Because we set_pipelined() earlier, theoretically we might 
 
568
                # Because we set_pipelined() earlier, theoretically we might
601
569
                # avoid the round trip for fout.close()
602
570
                if mode is not None:
603
 
                    self._sftp.chmod(abspath, mode)
 
571
                    self._get_sftp().chmod(abspath, mode)
604
572
            finally:
605
573
                if fout is not None:
606
574
                    fout.close()
654
622
 
655
623
    def iter_files_recursive(self):
656
624
        """Walk the relative paths of all files in this transport."""
 
625
        # progress is handled by list_dir
657
626
        queue = list(self.list_dir('.'))
658
627
        while queue:
659
628
            relpath = queue.pop(0)
670
639
        else:
671
640
            local_mode = mode
672
641
        try:
673
 
            self._sftp.mkdir(abspath, local_mode)
 
642
            self._report_activity(len(abspath), 'write')
 
643
            self._get_sftp().mkdir(abspath, local_mode)
 
644
            self._report_activity(1, 'read')
674
645
            if mode is not None:
675
 
                self._sftp.chmod(abspath, mode=mode)
 
646
                # chmod a dir through sftp will erase any sgid bit set
 
647
                # on the server side.  So, if the bit mode are already
 
648
                # set, avoid the chmod.  If the mode is not fine but
 
649
                # the sgid bit is set, report a warning to the user
 
650
                # with the umask fix.
 
651
                stat = self._get_sftp().lstat(abspath)
 
652
                mode = mode & 0777 # can't set special bits anyway
 
653
                if mode != stat.st_mode & 0777:
 
654
                    if stat.st_mode & 06000:
 
655
                        warning('About to chmod %s over sftp, which will result'
 
656
                                ' in its suid or sgid bits being cleared.  If'
 
657
                                ' you want to preserve those bits, change your '
 
658
                                ' environment on the server to use umask 0%03o.'
 
659
                                % (abspath, 0777 - mode))
 
660
                    self._get_sftp().chmod(abspath, mode=mode)
676
661
        except (paramiko.SSHException, IOError), e:
677
662
            self._translate_io_exception(e, abspath, ': unable to mkdir',
678
663
                failure_exc=FileExists)
681
666
        """Create a directory at the given path."""
682
667
        self._mkdir(self._remote_path(relpath), mode=mode)
683
668
 
684
 
    def _translate_io_exception(self, e, path, more_info='', 
 
669
    def open_write_stream(self, relpath, mode=None):
 
670
        """See Transport.open_write_stream."""
 
671
        # initialise the file to zero-length
 
672
        # this is three round trips, but we don't use this
 
673
        # api more than once per write_group at the moment so
 
674
        # it is a tolerable overhead. Better would be to truncate
 
675
        # the file after opening. RBC 20070805
 
676
        self.put_bytes_non_atomic(relpath, "", mode)
 
677
        abspath = self._remote_path(relpath)
 
678
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
 
679
        #       set the file mode at create time. If it does, use it.
 
680
        #       But for now, we just chmod later anyway.
 
681
        handle = None
 
682
        try:
 
683
            handle = self._get_sftp().file(abspath, mode='wb')
 
684
            handle.set_pipelined(True)
 
685
        except (paramiko.SSHException, IOError), e:
 
686
            self._translate_io_exception(e, abspath,
 
687
                                         ': unable to open')
 
688
        _file_streams[self.abspath(relpath)] = handle
 
689
        return FileFileStream(self, relpath, handle)
 
690
 
 
691
    def _translate_io_exception(self, e, path, more_info='',
685
692
                                failure_exc=PathError):
686
693
        """Translate a paramiko or IOError into a friendlier exception.
687
694
 
692
699
        :param failure_exc: Paramiko has the super fun ability to raise completely
693
700
                           opaque errors that just set "e.args = ('Failure',)" with
694
701
                           no more information.
695
 
                           If this parameter is set, it defines the exception 
 
702
                           If this parameter is set, it defines the exception
696
703
                           to raise in these cases.
697
704
        """
698
705
        # paramiko seems to generate detailless errors.
701
708
            if (e.args == ('No such file or directory',) or
702
709
                e.args == ('No such file',)):
703
710
                raise NoSuchFile(path, str(e) + more_info)
704
 
            if (e.args == ('mkdir failed',)):
 
711
            if (e.args == ('mkdir failed',) or
 
712
                e.args[0].startswith('syserr: File exists')):
705
713
                raise FileExists(path, str(e) + more_info)
706
714
            # strange but true, for the paramiko server.
707
715
            if (e.args == ('Failure',)):
708
716
                raise failure_exc(path, str(e) + more_info)
 
717
            # Can be something like args = ('Directory not empty:
 
718
            # '/srv/bazaar.launchpad.net/blah...: '
 
719
            # [Errno 39] Directory not empty',)
 
720
            if (e.args[0].startswith('Directory not empty: ')
 
721
                or getattr(e, 'errno', None) == errno.ENOTEMPTY):
 
722
                raise errors.DirectoryNotEmpty(path, str(e))
 
723
            if e.args == ('Operation unsupported',):
 
724
                raise errors.TransportNotPossible()
709
725
            mutter('Raising exception with args %s', e.args)
710
726
        if getattr(e, 'errno', None) is not None:
711
727
            mutter('Raising exception with errno %s', e.errno)
718
734
        """
719
735
        try:
720
736
            path = self._remote_path(relpath)
721
 
            fout = self._sftp.file(path, 'ab')
 
737
            fout = self._get_sftp().file(path, 'ab')
722
738
            if mode is not None:
723
 
                self._sftp.chmod(path, mode)
 
739
                self._get_sftp().chmod(path, mode)
724
740
            result = fout.tell()
725
741
            self._pump(f, fout)
726
742
            return result
730
746
    def rename(self, rel_from, rel_to):
731
747
        """Rename without special overwriting"""
732
748
        try:
733
 
            self._sftp.rename(self._remote_path(rel_from),
 
749
            self._get_sftp().rename(self._remote_path(rel_from),
734
750
                              self._remote_path(rel_to))
735
751
        except (IOError, paramiko.SSHException), e:
736
752
            self._translate_io_exception(e, rel_from,
738
754
 
739
755
    def _rename_and_overwrite(self, abs_from, abs_to):
740
756
        """Do a fancy rename on the remote server.
741
 
        
 
757
 
742
758
        Using the implementation provided by osutils.
743
759
        """
744
760
        try:
 
761
            sftp = self._get_sftp()
745
762
            fancy_rename(abs_from, abs_to,
746
 
                    rename_func=self._sftp.rename,
747
 
                    unlink_func=self._sftp.remove)
 
763
                         rename_func=sftp.rename,
 
764
                         unlink_func=sftp.remove)
748
765
        except (IOError, paramiko.SSHException), e:
749
 
            self._translate_io_exception(e, abs_from, ': unable to rename to %r' % (abs_to))
 
766
            self._translate_io_exception(e, abs_from,
 
767
                                         ': unable to rename to %r' % (abs_to))
750
768
 
751
769
    def move(self, rel_from, rel_to):
752
770
        """Move the item at rel_from to the location at rel_to"""
758
776
        """Delete the item at relpath"""
759
777
        path = self._remote_path(relpath)
760
778
        try:
761
 
            self._sftp.remove(path)
 
779
            self._get_sftp().remove(path)
762
780
        except (IOError, paramiko.SSHException), e:
763
781
            self._translate_io_exception(e, path, ': unable to delete')
764
 
            
 
782
 
 
783
    def external_url(self):
 
784
        """See bzrlib.transport.Transport.external_url."""
 
785
        # the external path for SFTP is the base
 
786
        return self.base
 
787
 
765
788
    def listable(self):
766
789
        """Return True if this store supports listing."""
767
790
        return True
776
799
        # -- David Allouche 2006-08-11
777
800
        path = self._remote_path(relpath)
778
801
        try:
779
 
            entries = self._sftp.listdir(path)
 
802
            entries = self._get_sftp().listdir(path)
 
803
            self._report_activity(sum(map(len, entries)), 'read')
780
804
        except (IOError, paramiko.SSHException), e:
781
805
            self._translate_io_exception(e, path, ': failed to list_dir')
782
806
        return [urlutils.escape(entry) for entry in entries]
785
809
        """See Transport.rmdir."""
786
810
        path = self._remote_path(relpath)
787
811
        try:
788
 
            return self._sftp.rmdir(path)
 
812
            return self._get_sftp().rmdir(path)
789
813
        except (IOError, paramiko.SSHException), e:
790
814
            self._translate_io_exception(e, path, ': failed to rmdir')
791
815
 
793
817
        """Return the stat information for a file."""
794
818
        path = self._remote_path(relpath)
795
819
        try:
796
 
            return self._sftp.stat(path)
 
820
            return self._get_sftp().lstat(path)
797
821
        except (IOError, paramiko.SSHException), e:
798
822
            self._translate_io_exception(e, path, ': unable to stat')
799
823
 
 
824
    def readlink(self, relpath):
 
825
        """See Transport.readlink."""
 
826
        path = self._remote_path(relpath)
 
827
        try:
 
828
            return self._get_sftp().readlink(path)
 
829
        except (IOError, paramiko.SSHException), e:
 
830
            self._translate_io_exception(e, path, ': unable to readlink')
 
831
 
 
832
    def symlink(self, source, link_name):
 
833
        """See Transport.symlink."""
 
834
        try:
 
835
            conn = self._get_sftp()
 
836
            sftp_retval = conn.symlink(source, link_name)
 
837
            if SFTP_OK != sftp_retval:
 
838
                raise TransportError(
 
839
                    '%r: unable to create symlink to %r' % (link_name, source),
 
840
                    sftp_retval
 
841
                )
 
842
        except (IOError, paramiko.SSHException), e:
 
843
            self._translate_io_exception(e, link_name,
 
844
                                         ': unable to create symlink to %r' % (source))
 
845
 
800
846
    def lock_read(self, relpath):
801
847
        """
802
848
        Lock the given file for shared (read) access.
823
869
        # that we have taken the lock.
824
870
        return SFTPLock(relpath, self)
825
871
 
826
 
    def _sftp_connect(self):
827
 
        """Connect to the remote sftp server.
828
 
        After this, self._sftp should have a valid connection (or
829
 
        we raise an TransportError 'could not connect').
830
 
 
831
 
        TODO: Raise a more reasonable ConnectionFailed exception
832
 
        """
833
 
        self._sftp = _sftp_connect(self._host, self._port, self._username,
834
 
                self._password)
835
 
 
836
872
    def _sftp_open_exclusive(self, abspath, mode=None):
837
873
        """Open a remote path exclusively.
838
874
 
849
885
        """
850
886
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
851
887
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
852
 
        #       However, there is no way to set the permission mode at open 
 
888
        #       However, there is no way to set the permission mode at open
853
889
        #       time using the sftp_client.file() functionality.
854
 
        path = self._sftp._adjust_cwd(abspath)
 
890
        path = self._get_sftp()._adjust_cwd(abspath)
855
891
        # mutter('sftp abspath %s => %s', abspath, path)
856
892
        attr = SFTPAttributes()
857
893
        if mode is not None:
858
894
            attr.st_mode = mode
859
 
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE 
 
895
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
860
896
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
861
897
        try:
862
 
            t, msg = self._sftp._request(CMD_OPEN, path, omode, attr)
 
898
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
863
899
            if t != CMD_HANDLE:
864
900
                raise TransportError('Expected an SFTP handle')
865
901
            handle = msg.get_string()
866
 
            return SFTPFile(self._sftp, handle, 'wb', -1)
 
902
            return SFTPFile(self._get_sftp(), handle, 'wb', -1)
867
903
        except (paramiko.SSHException, IOError), e:
868
904
            self._translate_io_exception(e, abspath, ': unable to open',
869
905
                failure_exc=FileExists)
875
911
        else:
876
912
            return True
877
913
 
878
 
# ------------- server test implementation --------------
879
 
import threading
880
 
 
881
 
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
882
 
 
883
 
STUB_SERVER_KEY = """
884
 
-----BEGIN RSA PRIVATE KEY-----
885
 
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
886
 
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
887
 
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
888
 
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
889
 
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
890
 
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
891
 
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
892
 
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
893
 
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
894
 
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
895
 
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
896
 
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
897
 
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
898
 
-----END RSA PRIVATE KEY-----
899
 
"""
900
 
 
901
 
 
902
 
class SocketListener(threading.Thread):
903
 
 
904
 
    def __init__(self, callback):
905
 
        threading.Thread.__init__(self)
906
 
        self._callback = callback
907
 
        self._socket = socket.socket()
908
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
909
 
        self._socket.bind(('localhost', 0))
910
 
        self._socket.listen(1)
911
 
        self.port = self._socket.getsockname()[1]
912
 
        self._stop_event = threading.Event()
913
 
 
914
 
    def stop(self):
915
 
        # called from outside this thread
916
 
        self._stop_event.set()
917
 
        # use a timeout here, because if the test fails, the server thread may
918
 
        # never notice the stop_event.
919
 
        self.join(5.0)
920
 
        self._socket.close()
921
 
 
922
 
    def run(self):
923
 
        while True:
924
 
            readable, writable_unused, exception_unused = \
925
 
                select.select([self._socket], [], [], 0.1)
926
 
            if self._stop_event.isSet():
927
 
                return
928
 
            if len(readable) == 0:
929
 
                continue
930
 
            try:
931
 
                s, addr_unused = self._socket.accept()
932
 
                # because the loopback socket is inline, and transports are
933
 
                # never explicitly closed, best to launch a new thread.
934
 
                threading.Thread(target=self._callback, args=(s,)).start()
935
 
            except socket.error, x:
936
 
                sys.excepthook(*sys.exc_info())
937
 
                warning('Socket error during accept() within unit test server'
938
 
                        ' thread: %r' % x)
939
 
            except Exception, x:
940
 
                # probably a failed test; unit test thread will log the
941
 
                # failure/error
942
 
                sys.excepthook(*sys.exc_info())
943
 
                warning('Exception from within unit test server thread: %r' % 
944
 
                        x)
945
 
 
946
 
 
947
 
class SocketDelay(object):
948
 
    """A socket decorator to make TCP appear slower.
949
 
 
950
 
    This changes recv, send, and sendall to add a fixed latency to each python
951
 
    call if a new roundtrip is detected. That is, when a recv is called and the
952
 
    flag new_roundtrip is set, latency is charged. Every send and send_all
953
 
    sets this flag.
954
 
 
955
 
    In addition every send, sendall and recv sleeps a bit per character send to
956
 
    simulate bandwidth.
957
 
 
958
 
    Not all methods are implemented, this is deliberate as this class is not a
959
 
    replacement for the builtin sockets layer. fileno is not implemented to
960
 
    prevent the proxy being bypassed. 
961
 
    """
962
 
 
963
 
    simulated_time = 0
964
 
    _proxied_arguments = dict.fromkeys([
965
 
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
966
 
        "setblocking", "setsockopt", "settimeout", "shutdown"])
967
 
 
968
 
    def __init__(self, sock, latency, bandwidth=1.0, 
969
 
                 really_sleep=True):
970
 
        """ 
971
 
        :param bandwith: simulated bandwith (MegaBit)
972
 
        :param really_sleep: If set to false, the SocketDelay will just
973
 
        increase a counter, instead of calling time.sleep. This is useful for
974
 
        unittesting the SocketDelay.
975
 
        """
976
 
        self.sock = sock
977
 
        self.latency = latency
978
 
        self.really_sleep = really_sleep
979
 
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
980
 
        self.new_roundtrip = False
981
 
 
982
 
    def sleep(self, s):
983
 
        if self.really_sleep:
984
 
            time.sleep(s)
985
 
        else:
986
 
            SocketDelay.simulated_time += s
987
 
 
988
 
    def __getattr__(self, attr):
989
 
        if attr in SocketDelay._proxied_arguments:
990
 
            return getattr(self.sock, attr)
991
 
        raise AttributeError("'SocketDelay' object has no attribute %r" %
992
 
                             attr)
993
 
 
994
 
    def dup(self):
995
 
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
996
 
                           self._sleep)
997
 
 
998
 
    def recv(self, *args):
999
 
        data = self.sock.recv(*args)
1000
 
        if data and self.new_roundtrip:
1001
 
            self.new_roundtrip = False
1002
 
            self.sleep(self.latency)
1003
 
        self.sleep(len(data) * self.time_per_byte)
1004
 
        return data
1005
 
 
1006
 
    def sendall(self, data, flags=0):
1007
 
        if not self.new_roundtrip:
1008
 
            self.new_roundtrip = True
1009
 
            self.sleep(self.latency)
1010
 
        self.sleep(len(data) * self.time_per_byte)
1011
 
        return self.sock.sendall(data, flags)
1012
 
 
1013
 
    def send(self, data, flags=0):
1014
 
        if not self.new_roundtrip:
1015
 
            self.new_roundtrip = True
1016
 
            self.sleep(self.latency)
1017
 
        bytes_sent = self.sock.send(data, flags)
1018
 
        self.sleep(bytes_sent * self.time_per_byte)
1019
 
        return bytes_sent
1020
 
 
1021
 
 
1022
 
class SFTPServer(Server):
1023
 
    """Common code for SFTP server facilities."""
1024
 
 
1025
 
    def __init__(self, server_interface=StubServer):
1026
 
        self._original_vendor = None
1027
 
        self._homedir = None
1028
 
        self._server_homedir = None
1029
 
        self._listener = None
1030
 
        self._root = None
1031
 
        self._vendor = ssh.ParamikoVendor()
1032
 
        self._server_interface = server_interface
1033
 
        # sftp server logs
1034
 
        self.logs = []
1035
 
        self.add_latency = 0
1036
 
 
1037
 
    def _get_sftp_url(self, path):
1038
 
        """Calculate an sftp url to this server for path."""
1039
 
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
1040
 
 
1041
 
    def log(self, message):
1042
 
        """StubServer uses this to log when a new server is created."""
1043
 
        self.logs.append(message)
1044
 
 
1045
 
    def _run_server_entry(self, sock):
1046
 
        """Entry point for all implementations of _run_server.
1047
 
        
1048
 
        If self.add_latency is > 0.000001 then sock is given a latency adding
1049
 
        decorator.
1050
 
        """
1051
 
        if self.add_latency > 0.000001:
1052
 
            sock = SocketDelay(sock, self.add_latency)
1053
 
        return self._run_server(sock)
1054
 
 
1055
 
    def _run_server(self, s):
1056
 
        ssh_server = paramiko.Transport(s)
1057
 
        key_file = pathjoin(self._homedir, 'test_rsa.key')
1058
 
        f = open(key_file, 'w')
1059
 
        f.write(STUB_SERVER_KEY)
1060
 
        f.close()
1061
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
1062
 
        ssh_server.add_server_key(host_key)
1063
 
        server = self._server_interface(self)
1064
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
1065
 
                                         StubSFTPServer, root=self._root,
1066
 
                                         home=self._server_homedir)
1067
 
        event = threading.Event()
1068
 
        ssh_server.start_server(event, server)
1069
 
        event.wait(5.0)
1070
 
    
1071
 
    def setUp(self):
1072
 
        self._original_vendor = ssh._ssh_vendor
1073
 
        ssh._ssh_vendor = self._vendor
1074
 
        if sys.platform == 'win32':
1075
 
            # Win32 needs to use the UNICODE api
1076
 
            self._homedir = getcwd()
1077
 
        else:
1078
 
            # But Linux SFTP servers should just deal in bytestreams
1079
 
            self._homedir = os.getcwd()
1080
 
        if self._server_homedir is None:
1081
 
            self._server_homedir = self._homedir
1082
 
        self._root = '/'
1083
 
        if sys.platform == 'win32':
1084
 
            self._root = ''
1085
 
        self._listener = SocketListener(self._run_server_entry)
1086
 
        self._listener.setDaemon(True)
1087
 
        self._listener.start()
1088
 
 
1089
 
    def tearDown(self):
1090
 
        """See bzrlib.transport.Server.tearDown."""
1091
 
        self._listener.stop()
1092
 
        ssh._ssh_vendor = self._original_vendor
1093
 
 
1094
 
    def get_bogus_url(self):
1095
 
        """See bzrlib.transport.Server.get_bogus_url."""
1096
 
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
1097
 
        # we bind a random socket, so that we get a guaranteed unused port
1098
 
        # we just never listen on that port
1099
 
        s = socket.socket()
1100
 
        s.bind(('localhost', 0))
1101
 
        return 'sftp://%s:%s/' % s.getsockname()
1102
 
 
1103
 
 
1104
 
class SFTPFullAbsoluteServer(SFTPServer):
1105
 
    """A test server for sftp transports, using absolute urls and ssh."""
1106
 
 
1107
 
    def get_url(self):
1108
 
        """See bzrlib.transport.Server.get_url."""
1109
 
        return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
1110
 
 
1111
 
 
1112
 
class SFTPServerWithoutSSH(SFTPServer):
1113
 
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
1114
 
 
1115
 
    def __init__(self):
1116
 
        super(SFTPServerWithoutSSH, self).__init__()
1117
 
        self._vendor = ssh.LoopbackVendor()
1118
 
 
1119
 
    def _run_server(self, sock):
1120
 
        # Re-import these as locals, so that they're still accessible during
1121
 
        # interpreter shutdown (when all module globals get set to None, leading
1122
 
        # to confusing errors like "'NoneType' object has no attribute 'error'".
1123
 
        class FakeChannel(object):
1124
 
            def get_transport(self):
1125
 
                return self
1126
 
            def get_log_channel(self):
1127
 
                return 'paramiko'
1128
 
            def get_name(self):
1129
 
                return '1'
1130
 
            def get_hexdump(self):
1131
 
                return False
1132
 
            def close(self):
1133
 
                pass
1134
 
 
1135
 
        server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
1136
 
                                     root=self._root, home=self._server_homedir)
1137
 
        try:
1138
 
            server.start_subsystem('sftp', None, sock)
1139
 
        except socket.error, e:
1140
 
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
1141
 
                # it's okay for the client to disconnect abruptly
1142
 
                # (bug in paramiko 1.6: it should absorb this exception)
1143
 
                pass
1144
 
            else:
1145
 
                raise
1146
 
        except Exception, e:
1147
 
            import sys; sys.stderr.write('\nEXCEPTION %r\n\n' % e.__class__)
1148
 
        server.finish_subsystem()
1149
 
 
1150
 
 
1151
 
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
1152
 
    """A test server for sftp transports, using absolute urls."""
1153
 
 
1154
 
    def get_url(self):
1155
 
        """See bzrlib.transport.Server.get_url."""
1156
 
        if sys.platform == 'win32':
1157
 
            return self._get_sftp_url(urlutils.escape(self._homedir))
1158
 
        else:
1159
 
            return self._get_sftp_url(urlutils.escape(self._homedir[1:]))
1160
 
 
1161
 
 
1162
 
class SFTPHomeDirServer(SFTPServerWithoutSSH):
1163
 
    """A test server for sftp transports, using homedir relative urls."""
1164
 
 
1165
 
    def get_url(self):
1166
 
        """See bzrlib.transport.Server.get_url."""
1167
 
        return self._get_sftp_url("~/")
1168
 
 
1169
 
 
1170
 
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
1171
 
    """A test servere for sftp transports, using absolute urls to non-home."""
1172
 
 
1173
 
    def setUp(self):
1174
 
        self._server_homedir = '/dev/noone/runs/tests/here'
1175
 
        super(SFTPSiblingAbsoluteServer, self).setUp()
1176
 
 
1177
 
 
1178
 
def _sftp_connect(host, port, username, password):
1179
 
    """Connect to the remote sftp server.
1180
 
 
1181
 
    :raises: a TransportError 'could not connect'.
1182
 
 
1183
 
    :returns: an paramiko.sftp_client.SFTPClient
1184
 
 
1185
 
    TODO: Raise a more reasonable ConnectionFailed exception
1186
 
    """
1187
 
    idx = (host, port, username)
1188
 
    try:
1189
 
        return _connected_hosts[idx]
1190
 
    except KeyError:
1191
 
        pass
1192
 
    
1193
 
    sftp = _sftp_connect_uncached(host, port, username, password)
1194
 
    _connected_hosts[idx] = sftp
1195
 
    return sftp
1196
 
 
1197
 
def _sftp_connect_uncached(host, port, username, password):
1198
 
    vendor = ssh._get_ssh_vendor()
1199
 
    sftp = vendor.connect_sftp(username, password, host, port)
1200
 
    return sftp
1201
 
 
1202
914
 
1203
915
def get_test_permutations():
1204
916
    """Return the permutations to be used in testing."""
1205
 
    return [(SFTPTransport, SFTPAbsoluteServer),
1206
 
            (SFTPTransport, SFTPHomeDirServer),
1207
 
            (SFTPTransport, SFTPSiblingAbsoluteServer),
 
917
    from bzrlib.tests import stub_sftp
 
918
    return [(SFTPTransport, stub_sftp.SFTPAbsoluteServer),
 
919
            (SFTPTransport, stub_sftp.SFTPHomeDirServer),
 
920
            (SFTPTransport, stub_sftp.SFTPSiblingAbsoluteServer),
1208
921
            ]