~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Martin Pool
  • Date: 2007-08-21 05:29:59 UTC
  • mto: This revision was merged to the branch mainline in revision 2779.
  • Revision ID: mbp@sourcefrog.net-20070821052959-5odvyjziwyuaeo3d
Move per-inventory tests from test_inv to tests.inventory_implementations

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>
 
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
 
 
18
"""Implementation of Transport over SFTP, using paramiko."""
 
19
 
 
20
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
 
21
# then raise TransportNotPossible, which will break remote access to any
 
22
# formats which rely on OS-level locks.  That should be fine as those formats
 
23
# are pretty old, but these combinations may have to be removed from the test
 
24
# suite.  Those formats all date back to 0.7; so we should be able to remove
 
25
# these methods when we officially drop support for those formats.
 
26
 
 
27
import errno
 
28
import os
 
29
import random
 
30
import select
 
31
import socket
 
32
import stat
 
33
import sys
 
34
import time
 
35
import urllib
 
36
import urlparse
 
37
import warnings
 
38
 
 
39
from bzrlib import (
 
40
    errors,
 
41
    urlutils,
 
42
    )
 
43
from bzrlib.errors import (FileExists,
 
44
                           NoSuchFile, PathNotChild,
 
45
                           TransportError,
 
46
                           LockError,
 
47
                           PathError,
 
48
                           ParamikoNotPresent,
 
49
                           )
 
50
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
 
51
from bzrlib.symbol_versioning import (
 
52
        deprecated_function,
 
53
        zero_ninety,
 
54
        )
 
55
from bzrlib.trace import mutter, warning
 
56
from bzrlib.transport import (
 
57
    local,
 
58
    register_urlparse_netloc_protocol,
 
59
    Server,
 
60
    ssh,
 
61
    ConnectedTransport,
 
62
    )
 
63
 
 
64
# Disable one particular warning that comes from paramiko in Python2.5; if
 
65
# this is emitted at the wrong time it tends to cause spurious test failures
 
66
# or at least noise in the test case::
 
67
#
 
68
# [1770/7639 in 86s, 1 known failures, 50 skipped, 2 missing features]
 
69
# test_permissions.TestSftpPermissions.test_new_files
 
70
# /var/lib/python-support/python2.5/paramiko/message.py:226: DeprecationWarning: integer argument expected, got float
 
71
#  self.packet.write(struct.pack('>I', n))
 
72
warnings.filterwarnings('ignore',
 
73
        'integer argument expected, got float',
 
74
        category=DeprecationWarning,
 
75
        module='paramiko.message')
 
76
 
 
77
try:
 
78
    import paramiko
 
79
except ImportError, e:
 
80
    raise ParamikoNotPresent(e)
 
81
else:
 
82
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
 
83
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
 
84
                               CMD_HANDLE, CMD_OPEN)
 
85
    from paramiko.sftp_attr import SFTPAttributes
 
86
    from paramiko.sftp_file import SFTPFile
 
87
 
 
88
 
 
89
register_urlparse_netloc_protocol('sftp')
 
90
 
 
91
 
 
92
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
 
93
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
 
94
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
 
95
 
 
96
 
 
97
@deprecated_function(zero_ninety)
 
98
def clear_connection_cache():
 
99
    """Remove all hosts from the SFTP connection cache.
 
100
 
 
101
    Primarily useful for test cases wanting to force garbage collection.
 
102
    We don't have a global connection cache anymore.
 
103
    """
 
104
 
 
105
class SFTPLock(object):
 
106
    """This fakes a lock in a remote location.
 
107
    
 
108
    A present lock is indicated just by the existence of a file.  This
 
109
    doesn't work well on all transports and they are only used in 
 
110
    deprecated storage formats.
 
111
    """
 
112
    
 
113
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
 
114
 
 
115
    def __init__(self, path, transport):
 
116
        assert isinstance(transport, SFTPTransport)
 
117
 
 
118
        self.lock_file = None
 
119
        self.path = path
 
120
        self.lock_path = path + '.write-lock'
 
121
        self.transport = transport
 
122
        try:
 
123
            # RBC 20060103 FIXME should we be using private methods here ?
 
124
            abspath = transport._remote_path(self.lock_path)
 
125
            self.lock_file = transport._sftp_open_exclusive(abspath)
 
126
        except FileExists:
 
127
            raise LockError('File %r already locked' % (self.path,))
 
128
 
 
129
    def __del__(self):
 
130
        """Should this warn, or actually try to cleanup?"""
 
131
        if self.lock_file:
 
132
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
 
133
            self.unlock()
 
134
 
 
135
    def unlock(self):
 
136
        if not self.lock_file:
 
137
            return
 
138
        self.lock_file.close()
 
139
        self.lock_file = None
 
140
        try:
 
141
            self.transport.delete(self.lock_path)
 
142
        except (NoSuchFile,):
 
143
            # What specific errors should we catch here?
 
144
            pass
 
145
 
 
146
 
 
147
class SFTPTransport(ConnectedTransport):
 
148
    """Transport implementation for SFTP access."""
 
149
 
 
150
    _do_prefetch = _default_do_prefetch
 
151
    # TODO: jam 20060717 Conceivably these could be configurable, either
 
152
    #       by auto-tuning at run-time, or by a configuration (per host??)
 
153
    #       but the performance curve is pretty flat, so just going with
 
154
    #       reasonable defaults.
 
155
    _max_readv_combine = 200
 
156
    # Having to round trip to the server means waiting for a response,
 
157
    # so it is better to download extra bytes.
 
158
    # 8KiB had good performance for both local and remote network operations
 
159
    _bytes_to_read_before_seek = 8192
 
160
 
 
161
    # The sftp spec says that implementations SHOULD allow reads
 
162
    # to be at least 32K. paramiko.readv() does an async request
 
163
    # for the chunks. So we need to keep it within a single request
 
164
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
 
165
    # up the request itself, rather than us having to worry about it
 
166
    _max_request_size = 32768
 
167
 
 
168
    def __init__(self, base, _from_transport=None):
 
169
        assert base.startswith('sftp://')
 
170
        super(SFTPTransport, self).__init__(base,
 
171
                                            _from_transport=_from_transport)
 
172
 
 
173
    def _remote_path(self, relpath):
 
174
        """Return the path to be passed along the sftp protocol for relpath.
 
175
        
 
176
        :param relpath: is a urlencoded string.
 
177
        """
 
178
        relative = urlutils.unescape(relpath).encode('utf-8')
 
179
        remote_path = self._combine_paths(self._path, relative)
 
180
        # the initial slash should be removed from the path, and treated as a
 
181
        # homedir relative path (the path begins with a double slash if it is
 
182
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
 
183
        # RBC 20060118 we are not using this as its too user hostile. instead
 
184
        # we are following lftp and using /~/foo to mean '~/foo'
 
185
        # vila--20070602 and leave absolute paths begin with a single slash.
 
186
        if remote_path.startswith('/~/'):
 
187
            remote_path = remote_path[3:]
 
188
        elif remote_path == '/~':
 
189
            remote_path = ''
 
190
        return remote_path
 
191
 
 
192
    def _create_connection(self, credentials=None):
 
193
        """Create a new connection with the provided credentials.
 
194
 
 
195
        :param credentials: The credentials needed to establish the connection.
 
196
 
 
197
        :return: The created connection and its associated credentials.
 
198
 
 
199
        The credentials are only the password as it may have been entered
 
200
        interactively by the user and may be different from the one provided
 
201
        in base url at transport creation time.
 
202
        """
 
203
        if credentials is None:
 
204
            password = self._password
 
205
        else:
 
206
            password = credentials
 
207
 
 
208
        vendor = ssh._get_ssh_vendor()
 
209
        connection = vendor.connect_sftp(self._user, password,
 
210
                                         self._host, self._port)
 
211
        return connection, password
 
212
 
 
213
    def _get_sftp(self):
 
214
        """Ensures that a connection is established"""
 
215
        connection = self._get_connection()
 
216
        if connection is None:
 
217
            # First connection ever
 
218
            connection, credentials = self._create_connection()
 
219
            self._set_connection(connection, credentials)
 
220
        return connection
 
221
 
 
222
    def has(self, relpath):
 
223
        """
 
224
        Does the target location exist?
 
225
        """
 
226
        try:
 
227
            self._get_sftp().stat(self._remote_path(relpath))
 
228
            return True
 
229
        except IOError:
 
230
            return False
 
231
 
 
232
    def get(self, relpath):
 
233
        """
 
234
        Get the file at the given relative path.
 
235
 
 
236
        :param relpath: The relative path to the file
 
237
        """
 
238
        try:
 
239
            path = self._remote_path(relpath)
 
240
            f = self._get_sftp().file(path, mode='rb')
 
241
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
 
242
                f.prefetch()
 
243
            return f
 
244
        except (IOError, paramiko.SSHException), e:
 
245
            self._translate_io_exception(e, path, ': error retrieving',
 
246
                failure_exc=errors.ReadError)
 
247
 
 
248
    def readv(self, relpath, offsets):
 
249
        """See Transport.readv()"""
 
250
        # We overload the default readv() because we want to use a file
 
251
        # that does not have prefetch enabled.
 
252
        # Also, if we have a new paramiko, it implements an async readv()
 
253
        if not offsets:
 
254
            return
 
255
 
 
256
        try:
 
257
            path = self._remote_path(relpath)
 
258
            fp = self._get_sftp().file(path, mode='rb')
 
259
            readv = getattr(fp, 'readv', None)
 
260
            if readv:
 
261
                return self._sftp_readv(fp, offsets, relpath)
 
262
            mutter('seek and read %s offsets', len(offsets))
 
263
            return self._seek_and_read(fp, offsets, relpath)
 
264
        except (IOError, paramiko.SSHException), e:
 
265
            self._translate_io_exception(e, path, ': error retrieving')
 
266
 
 
267
    def _sftp_readv(self, fp, offsets, relpath='<unknown>'):
 
268
        """Use the readv() member of fp to do async readv.
 
269
 
 
270
        And then read them using paramiko.readv(). paramiko.readv()
 
271
        does not support ranges > 64K, so it caps the request size, and
 
272
        just reads until it gets all the stuff it wants
 
273
        """
 
274
        offsets = list(offsets)
 
275
        sorted_offsets = sorted(offsets)
 
276
 
 
277
        # The algorithm works as follows:
 
278
        # 1) Coalesce nearby reads into a single chunk
 
279
        #    This generates a list of combined regions, the total size
 
280
        #    and the size of the sub regions. This coalescing step is limited
 
281
        #    in the number of nearby chunks to combine, and is allowed to
 
282
        #    skip small breaks in the requests. Limiting it makes sure that
 
283
        #    we can start yielding some data earlier, and skipping means we
 
284
        #    make fewer requests. (Beneficial even when using async)
 
285
        # 2) Break up this combined regions into chunks that are smaller
 
286
        #    than 64KiB. Technically the limit is 65536, but we are a
 
287
        #    little bit conservative. This is because sftp has a maximum
 
288
        #    return chunk size of 64KiB (max size of an unsigned short)
 
289
        # 3) Issue a readv() to paramiko to create an async request for
 
290
        #    all of this data
 
291
        # 4) Read in the data as it comes back, until we've read one
 
292
        #    continuous section as determined in step 1
 
293
        # 5) Break up the full sections into hunks for the original requested
 
294
        #    offsets. And put them in a cache
 
295
        # 6) Check if the next request is in the cache, and if it is, remove
 
296
        #    it from the cache, and yield its data. Continue until no more
 
297
        #    entries are in the cache.
 
298
        # 7) loop back to step 4 until all data has been read
 
299
        #
 
300
        # TODO: jam 20060725 This could be optimized one step further, by
 
301
        #       attempting to yield whatever data we have read, even before
 
302
        #       the first coallesced section has been fully processed.
 
303
 
 
304
        # When coalescing for use with readv(), we don't really need to
 
305
        # use any fudge factor, because the requests are made asynchronously
 
306
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
307
                               limit=self._max_readv_combine,
 
308
                               fudge_factor=0,
 
309
                               ))
 
310
        requests = []
 
311
        for c_offset in coalesced:
 
312
            start = c_offset.start
 
313
            size = c_offset.length
 
314
 
 
315
            # We need to break this up into multiple requests
 
316
            while size > 0:
 
317
                next_size = min(size, self._max_request_size)
 
318
                requests.append((start, next_size))
 
319
                size -= next_size
 
320
                start += next_size
 
321
 
 
322
        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
 
323
                len(offsets), len(coalesced), len(requests))
 
324
 
 
325
        # Queue the current read until we have read the full coalesced section
 
326
        cur_data = []
 
327
        cur_data_len = 0
 
328
        cur_coalesced_stack = iter(coalesced)
 
329
        cur_coalesced = cur_coalesced_stack.next()
 
330
 
 
331
        # Cache the results, but only until they have been fulfilled
 
332
        data_map = {}
 
333
        # turn the list of offsets into a stack
 
334
        offset_stack = iter(offsets)
 
335
        cur_offset_and_size = offset_stack.next()
 
336
 
 
337
        for data in fp.readv(requests):
 
338
            cur_data += data
 
339
            cur_data_len += len(data)
 
340
 
 
341
            if cur_data_len < cur_coalesced.length:
 
342
                continue
 
343
            assert cur_data_len == cur_coalesced.length, \
 
344
                "Somehow we read too much: %s != %s" % (cur_data_len,
 
345
                                                        cur_coalesced.length)
 
346
            all_data = ''.join(cur_data)
 
347
            cur_data = []
 
348
            cur_data_len = 0
 
349
 
 
350
            for suboffset, subsize in cur_coalesced.ranges:
 
351
                key = (cur_coalesced.start+suboffset, subsize)
 
352
                data_map[key] = all_data[suboffset:suboffset+subsize]
 
353
 
 
354
            # Now that we've read some data, see if we can yield anything back
 
355
            while cur_offset_and_size in data_map:
 
356
                this_data = data_map.pop(cur_offset_and_size)
 
357
                yield cur_offset_and_size[0], this_data
 
358
                cur_offset_and_size = offset_stack.next()
 
359
 
 
360
            # We read a coalesced entry, so mark it as done
 
361
            cur_coalesced = None
 
362
            # Now that we've read all of the data for this coalesced section
 
363
            # on to the next
 
364
            cur_coalesced = cur_coalesced_stack.next()
 
365
 
 
366
        if cur_coalesced is not None:
 
367
            raise errors.ShortReadvError(relpath, cur_coalesced.start,
 
368
                cur_coalesced.length, len(data))
 
369
 
 
370
    def put_file(self, relpath, f, mode=None):
 
371
        """
 
372
        Copy the file-like object into the location.
 
373
 
 
374
        :param relpath: Location to put the contents, relative to base.
 
375
        :param f:       File-like object.
 
376
        :param mode: The final mode for the file
 
377
        """
 
378
        final_path = self._remote_path(relpath)
 
379
        self._put(final_path, f, mode=mode)
 
380
 
 
381
    def _put(self, abspath, f, mode=None):
 
382
        """Helper function so both put() and copy_abspaths can reuse the code"""
 
383
        tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
 
384
                        os.getpid(), random.randint(0,0x7FFFFFFF))
 
385
        fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
 
386
        closed = False
 
387
        try:
 
388
            try:
 
389
                fout.set_pipelined(True)
 
390
                self._pump(f, fout)
 
391
            except (IOError, paramiko.SSHException), e:
 
392
                self._translate_io_exception(e, tmp_abspath)
 
393
            # XXX: This doesn't truly help like we would like it to.
 
394
            #      The problem is that openssh strips sticky bits. So while we
 
395
            #      can properly set group write permission, we lose the group
 
396
            #      sticky bit. So it is probably best to stop chmodding, and
 
397
            #      just tell users that they need to set the umask correctly.
 
398
            #      The attr.st_mode = mode, in _sftp_open_exclusive
 
399
            #      will handle when the user wants the final mode to be more 
 
400
            #      restrictive. And then we avoid a round trip. Unless 
 
401
            #      paramiko decides to expose an async chmod()
 
402
 
 
403
            # This is designed to chmod() right before we close.
 
404
            # Because we set_pipelined() earlier, theoretically we might 
 
405
            # avoid the round trip for fout.close()
 
406
            if mode is not None:
 
407
                self._get_sftp().chmod(tmp_abspath, mode)
 
408
            fout.close()
 
409
            closed = True
 
410
            self._rename_and_overwrite(tmp_abspath, abspath)
 
411
        except Exception, e:
 
412
            # If we fail, try to clean up the temporary file
 
413
            # before we throw the exception
 
414
            # but don't let another exception mess things up
 
415
            # Write out the traceback, because otherwise
 
416
            # the catch and throw destroys it
 
417
            import traceback
 
418
            mutter(traceback.format_exc())
 
419
            try:
 
420
                if not closed:
 
421
                    fout.close()
 
422
                self._get_sftp().remove(tmp_abspath)
 
423
            except:
 
424
                # raise the saved except
 
425
                raise e
 
426
            # raise the original with its traceback if we can.
 
427
            raise
 
428
 
 
429
    def _put_non_atomic_helper(self, relpath, writer, mode=None,
 
430
                               create_parent_dir=False,
 
431
                               dir_mode=None):
 
432
        abspath = self._remote_path(relpath)
 
433
 
 
434
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
 
435
        #       set the file mode at create time. If it does, use it.
 
436
        #       But for now, we just chmod later anyway.
 
437
 
 
438
        def _open_and_write_file():
 
439
            """Try to open the target file, raise error on failure"""
 
440
            fout = None
 
441
            try:
 
442
                try:
 
443
                    fout = self._get_sftp().file(abspath, mode='wb')
 
444
                    fout.set_pipelined(True)
 
445
                    writer(fout)
 
446
                except (paramiko.SSHException, IOError), e:
 
447
                    self._translate_io_exception(e, abspath,
 
448
                                                 ': unable to open')
 
449
 
 
450
                # This is designed to chmod() right before we close.
 
451
                # Because we set_pipelined() earlier, theoretically we might 
 
452
                # avoid the round trip for fout.close()
 
453
                if mode is not None:
 
454
                    self._get_sftp().chmod(abspath, mode)
 
455
            finally:
 
456
                if fout is not None:
 
457
                    fout.close()
 
458
 
 
459
        if not create_parent_dir:
 
460
            _open_and_write_file()
 
461
            return
 
462
 
 
463
        # Try error handling to create the parent directory if we need to
 
464
        try:
 
465
            _open_and_write_file()
 
466
        except NoSuchFile:
 
467
            # Try to create the parent directory, and then go back to
 
468
            # writing the file
 
469
            parent_dir = os.path.dirname(abspath)
 
470
            self._mkdir(parent_dir, dir_mode)
 
471
            _open_and_write_file()
 
472
 
 
473
    def put_file_non_atomic(self, relpath, f, mode=None,
 
474
                            create_parent_dir=False,
 
475
                            dir_mode=None):
 
476
        """Copy the file-like object into the target location.
 
477
 
 
478
        This function is not strictly safe to use. It is only meant to
 
479
        be used when you already know that the target does not exist.
 
480
        It is not safe, because it will open and truncate the remote
 
481
        file. So there may be a time when the file has invalid contents.
 
482
 
 
483
        :param relpath: The remote location to put the contents.
 
484
        :param f:       File-like object.
 
485
        :param mode:    Possible access permissions for new file.
 
486
                        None means do not set remote permissions.
 
487
        :param create_parent_dir: If we cannot create the target file because
 
488
                        the parent directory does not exist, go ahead and
 
489
                        create it, and then try again.
 
490
        """
 
491
        def writer(fout):
 
492
            self._pump(f, fout)
 
493
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
494
                                    create_parent_dir=create_parent_dir,
 
495
                                    dir_mode=dir_mode)
 
496
 
 
497
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
498
                             create_parent_dir=False,
 
499
                             dir_mode=None):
 
500
        def writer(fout):
 
501
            fout.write(bytes)
 
502
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
503
                                    create_parent_dir=create_parent_dir,
 
504
                                    dir_mode=dir_mode)
 
505
 
 
506
    def iter_files_recursive(self):
 
507
        """Walk the relative paths of all files in this transport."""
 
508
        queue = list(self.list_dir('.'))
 
509
        while queue:
 
510
            relpath = queue.pop(0)
 
511
            st = self.stat(relpath)
 
512
            if stat.S_ISDIR(st.st_mode):
 
513
                for i, basename in enumerate(self.list_dir(relpath)):
 
514
                    queue.insert(i, relpath+'/'+basename)
 
515
            else:
 
516
                yield relpath
 
517
 
 
518
    def _mkdir(self, abspath, mode=None):
 
519
        if mode is None:
 
520
            local_mode = 0777
 
521
        else:
 
522
            local_mode = mode
 
523
        try:
 
524
            self._get_sftp().mkdir(abspath, local_mode)
 
525
            if mode is not None:
 
526
                self._get_sftp().chmod(abspath, mode=mode)
 
527
        except (paramiko.SSHException, IOError), e:
 
528
            self._translate_io_exception(e, abspath, ': unable to mkdir',
 
529
                failure_exc=FileExists)
 
530
 
 
531
    def mkdir(self, relpath, mode=None):
 
532
        """Create a directory at the given path."""
 
533
        self._mkdir(self._remote_path(relpath), mode=mode)
 
534
 
 
535
    def _translate_io_exception(self, e, path, more_info='',
 
536
                                failure_exc=PathError):
 
537
        """Translate a paramiko or IOError into a friendlier exception.
 
538
 
 
539
        :param e: The original exception
 
540
        :param path: The path in question when the error is raised
 
541
        :param more_info: Extra information that can be included,
 
542
                          such as what was going on
 
543
        :param failure_exc: Paramiko has the super fun ability to raise completely
 
544
                           opaque errors that just set "e.args = ('Failure',)" with
 
545
                           no more information.
 
546
                           If this parameter is set, it defines the exception 
 
547
                           to raise in these cases.
 
548
        """
 
549
        # paramiko seems to generate detailless errors.
 
550
        self._translate_error(e, path, raise_generic=False)
 
551
        if getattr(e, 'args', None) is not None:
 
552
            if (e.args == ('No such file or directory',) or
 
553
                e.args == ('No such file',)):
 
554
                raise NoSuchFile(path, str(e) + more_info)
 
555
            if (e.args == ('mkdir failed',)):
 
556
                raise FileExists(path, str(e) + more_info)
 
557
            # strange but true, for the paramiko server.
 
558
            if (e.args == ('Failure',)):
 
559
                raise failure_exc(path, str(e) + more_info)
 
560
            mutter('Raising exception with args %s', e.args)
 
561
        if getattr(e, 'errno', None) is not None:
 
562
            mutter('Raising exception with errno %s', e.errno)
 
563
        raise e
 
564
 
 
565
    def append_file(self, relpath, f, mode=None):
 
566
        """
 
567
        Append the text in the file-like object into the final
 
568
        location.
 
569
        """
 
570
        try:
 
571
            path = self._remote_path(relpath)
 
572
            fout = self._get_sftp().file(path, 'ab')
 
573
            if mode is not None:
 
574
                self._get_sftp().chmod(path, mode)
 
575
            result = fout.tell()
 
576
            self._pump(f, fout)
 
577
            return result
 
578
        except (IOError, paramiko.SSHException), e:
 
579
            self._translate_io_exception(e, relpath, ': unable to append')
 
580
 
 
581
    def rename(self, rel_from, rel_to):
 
582
        """Rename without special overwriting"""
 
583
        try:
 
584
            self._get_sftp().rename(self._remote_path(rel_from),
 
585
                              self._remote_path(rel_to))
 
586
        except (IOError, paramiko.SSHException), e:
 
587
            self._translate_io_exception(e, rel_from,
 
588
                    ': unable to rename to %r' % (rel_to))
 
589
 
 
590
    def _rename_and_overwrite(self, abs_from, abs_to):
 
591
        """Do a fancy rename on the remote server.
 
592
        
 
593
        Using the implementation provided by osutils.
 
594
        """
 
595
        try:
 
596
            sftp = self._get_sftp()
 
597
            fancy_rename(abs_from, abs_to,
 
598
                         rename_func=sftp.rename,
 
599
                         unlink_func=sftp.remove)
 
600
        except (IOError, paramiko.SSHException), e:
 
601
            self._translate_io_exception(e, abs_from,
 
602
                                         ': unable to rename to %r' % (abs_to))
 
603
 
 
604
    def move(self, rel_from, rel_to):
 
605
        """Move the item at rel_from to the location at rel_to"""
 
606
        path_from = self._remote_path(rel_from)
 
607
        path_to = self._remote_path(rel_to)
 
608
        self._rename_and_overwrite(path_from, path_to)
 
609
 
 
610
    def delete(self, relpath):
 
611
        """Delete the item at relpath"""
 
612
        path = self._remote_path(relpath)
 
613
        try:
 
614
            self._get_sftp().remove(path)
 
615
        except (IOError, paramiko.SSHException), e:
 
616
            self._translate_io_exception(e, path, ': unable to delete')
 
617
            
 
618
    def external_url(self):
 
619
        """See bzrlib.transport.Transport.external_url."""
 
620
        # the external path for SFTP is the base
 
621
        return self.base
 
622
 
 
623
    def listable(self):
 
624
        """Return True if this store supports listing."""
 
625
        return True
 
626
 
 
627
    def list_dir(self, relpath):
 
628
        """
 
629
        Return a list of all files at the given location.
 
630
        """
 
631
        # does anything actually use this?
 
632
        # -- Unknown
 
633
        # This is at least used by copy_tree for remote upgrades.
 
634
        # -- David Allouche 2006-08-11
 
635
        path = self._remote_path(relpath)
 
636
        try:
 
637
            entries = self._get_sftp().listdir(path)
 
638
        except (IOError, paramiko.SSHException), e:
 
639
            self._translate_io_exception(e, path, ': failed to list_dir')
 
640
        return [urlutils.escape(entry) for entry in entries]
 
641
 
 
642
    def rmdir(self, relpath):
 
643
        """See Transport.rmdir."""
 
644
        path = self._remote_path(relpath)
 
645
        try:
 
646
            return self._get_sftp().rmdir(path)
 
647
        except (IOError, paramiko.SSHException), e:
 
648
            self._translate_io_exception(e, path, ': failed to rmdir')
 
649
 
 
650
    def stat(self, relpath):
 
651
        """Return the stat information for a file."""
 
652
        path = self._remote_path(relpath)
 
653
        try:
 
654
            return self._get_sftp().stat(path)
 
655
        except (IOError, paramiko.SSHException), e:
 
656
            self._translate_io_exception(e, path, ': unable to stat')
 
657
 
 
658
    def lock_read(self, relpath):
 
659
        """
 
660
        Lock the given file for shared (read) access.
 
661
        :return: A lock object, which has an unlock() member function
 
662
        """
 
663
        # FIXME: there should be something clever i can do here...
 
664
        class BogusLock(object):
 
665
            def __init__(self, path):
 
666
                self.path = path
 
667
            def unlock(self):
 
668
                pass
 
669
        return BogusLock(relpath)
 
670
 
 
671
    def lock_write(self, relpath):
 
672
        """
 
673
        Lock the given file for exclusive (write) access.
 
674
        WARNING: many transports do not support this, so trying avoid using it
 
675
 
 
676
        :return: A lock object, which has an unlock() member function
 
677
        """
 
678
        # This is a little bit bogus, but basically, we create a file
 
679
        # which should not already exist, and if it does, we assume
 
680
        # that there is a lock, and if it doesn't, the we assume
 
681
        # that we have taken the lock.
 
682
        return SFTPLock(relpath, self)
 
683
 
 
684
    def _sftp_open_exclusive(self, abspath, mode=None):
 
685
        """Open a remote path exclusively.
 
686
 
 
687
        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
 
688
        the file already exists. However it does not expose this
 
689
        at the higher level of SFTPClient.open(), so we have to
 
690
        sneak away with it.
 
691
 
 
692
        WARNING: This breaks the SFTPClient abstraction, so it
 
693
        could easily break against an updated version of paramiko.
 
694
 
 
695
        :param abspath: The remote absolute path where the file should be opened
 
696
        :param mode: The mode permissions bits for the new file
 
697
        """
 
698
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
 
699
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
 
700
        #       However, there is no way to set the permission mode at open 
 
701
        #       time using the sftp_client.file() functionality.
 
702
        path = self._get_sftp()._adjust_cwd(abspath)
 
703
        # mutter('sftp abspath %s => %s', abspath, path)
 
704
        attr = SFTPAttributes()
 
705
        if mode is not None:
 
706
            attr.st_mode = mode
 
707
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE 
 
708
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
 
709
        try:
 
710
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
 
711
            if t != CMD_HANDLE:
 
712
                raise TransportError('Expected an SFTP handle')
 
713
            handle = msg.get_string()
 
714
            return SFTPFile(self._get_sftp(), handle, 'wb', -1)
 
715
        except (paramiko.SSHException, IOError), e:
 
716
            self._translate_io_exception(e, abspath, ': unable to open',
 
717
                failure_exc=FileExists)
 
718
 
 
719
    def _can_roundtrip_unix_modebits(self):
 
720
        if sys.platform == 'win32':
 
721
            # anyone else?
 
722
            return False
 
723
        else:
 
724
            return True
 
725
 
 
726
# ------------- server test implementation --------------
 
727
import threading
 
728
 
 
729
from bzrlib.tests.stub_sftp import StubServer, StubSFTPServer
 
730
 
 
731
STUB_SERVER_KEY = """
 
732
-----BEGIN RSA PRIVATE KEY-----
 
733
MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
 
734
oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
 
735
d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
 
736
gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
 
737
EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
 
738
soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
 
739
tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
 
740
avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
 
741
4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
 
742
H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
 
743
qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
 
744
HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
 
745
nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
 
746
-----END RSA PRIVATE KEY-----
 
747
"""
 
748
 
 
749
 
 
750
class SocketListener(threading.Thread):
 
751
 
 
752
    def __init__(self, callback):
 
753
        threading.Thread.__init__(self)
 
754
        self._callback = callback
 
755
        self._socket = socket.socket()
 
756
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
757
        self._socket.bind(('localhost', 0))
 
758
        self._socket.listen(1)
 
759
        self.port = self._socket.getsockname()[1]
 
760
        self._stop_event = threading.Event()
 
761
 
 
762
    def stop(self):
 
763
        # called from outside this thread
 
764
        self._stop_event.set()
 
765
        # use a timeout here, because if the test fails, the server thread may
 
766
        # never notice the stop_event.
 
767
        self.join(5.0)
 
768
        self._socket.close()
 
769
 
 
770
    def run(self):
 
771
        while True:
 
772
            readable, writable_unused, exception_unused = \
 
773
                select.select([self._socket], [], [], 0.1)
 
774
            if self._stop_event.isSet():
 
775
                return
 
776
            if len(readable) == 0:
 
777
                continue
 
778
            try:
 
779
                s, addr_unused = self._socket.accept()
 
780
                # because the loopback socket is inline, and transports are
 
781
                # never explicitly closed, best to launch a new thread.
 
782
                threading.Thread(target=self._callback, args=(s,)).start()
 
783
            except socket.error, x:
 
784
                sys.excepthook(*sys.exc_info())
 
785
                warning('Socket error during accept() within unit test server'
 
786
                        ' thread: %r' % x)
 
787
            except Exception, x:
 
788
                # probably a failed test; unit test thread will log the
 
789
                # failure/error
 
790
                sys.excepthook(*sys.exc_info())
 
791
                warning('Exception from within unit test server thread: %r' % 
 
792
                        x)
 
793
 
 
794
 
 
795
class SocketDelay(object):
 
796
    """A socket decorator to make TCP appear slower.
 
797
 
 
798
    This changes recv, send, and sendall to add a fixed latency to each python
 
799
    call if a new roundtrip is detected. That is, when a recv is called and the
 
800
    flag new_roundtrip is set, latency is charged. Every send and send_all
 
801
    sets this flag.
 
802
 
 
803
    In addition every send, sendall and recv sleeps a bit per character send to
 
804
    simulate bandwidth.
 
805
 
 
806
    Not all methods are implemented, this is deliberate as this class is not a
 
807
    replacement for the builtin sockets layer. fileno is not implemented to
 
808
    prevent the proxy being bypassed. 
 
809
    """
 
810
 
 
811
    simulated_time = 0
 
812
    _proxied_arguments = dict.fromkeys([
 
813
        "close", "getpeername", "getsockname", "getsockopt", "gettimeout",
 
814
        "setblocking", "setsockopt", "settimeout", "shutdown"])
 
815
 
 
816
    def __init__(self, sock, latency, bandwidth=1.0, 
 
817
                 really_sleep=True):
 
818
        """ 
 
819
        :param bandwith: simulated bandwith (MegaBit)
 
820
        :param really_sleep: If set to false, the SocketDelay will just
 
821
        increase a counter, instead of calling time.sleep. This is useful for
 
822
        unittesting the SocketDelay.
 
823
        """
 
824
        self.sock = sock
 
825
        self.latency = latency
 
826
        self.really_sleep = really_sleep
 
827
        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
 
828
        self.new_roundtrip = False
 
829
 
 
830
    def sleep(self, s):
 
831
        if self.really_sleep:
 
832
            time.sleep(s)
 
833
        else:
 
834
            SocketDelay.simulated_time += s
 
835
 
 
836
    def __getattr__(self, attr):
 
837
        if attr in SocketDelay._proxied_arguments:
 
838
            return getattr(self.sock, attr)
 
839
        raise AttributeError("'SocketDelay' object has no attribute %r" %
 
840
                             attr)
 
841
 
 
842
    def dup(self):
 
843
        return SocketDelay(self.sock.dup(), self.latency, self.time_per_byte,
 
844
                           self._sleep)
 
845
 
 
846
    def recv(self, *args):
 
847
        data = self.sock.recv(*args)
 
848
        if data and self.new_roundtrip:
 
849
            self.new_roundtrip = False
 
850
            self.sleep(self.latency)
 
851
        self.sleep(len(data) * self.time_per_byte)
 
852
        return data
 
853
 
 
854
    def sendall(self, data, flags=0):
 
855
        if not self.new_roundtrip:
 
856
            self.new_roundtrip = True
 
857
            self.sleep(self.latency)
 
858
        self.sleep(len(data) * self.time_per_byte)
 
859
        return self.sock.sendall(data, flags)
 
860
 
 
861
    def send(self, data, flags=0):
 
862
        if not self.new_roundtrip:
 
863
            self.new_roundtrip = True
 
864
            self.sleep(self.latency)
 
865
        bytes_sent = self.sock.send(data, flags)
 
866
        self.sleep(bytes_sent * self.time_per_byte)
 
867
        return bytes_sent
 
868
 
 
869
 
 
870
class SFTPServer(Server):
 
871
    """Common code for SFTP server facilities."""
 
872
 
 
873
    def __init__(self, server_interface=StubServer):
 
874
        self._original_vendor = None
 
875
        self._homedir = None
 
876
        self._server_homedir = None
 
877
        self._listener = None
 
878
        self._root = None
 
879
        self._vendor = ssh.ParamikoVendor()
 
880
        self._server_interface = server_interface
 
881
        # sftp server logs
 
882
        self.logs = []
 
883
        self.add_latency = 0
 
884
 
 
885
    def _get_sftp_url(self, path):
 
886
        """Calculate an sftp url to this server for path."""
 
887
        return 'sftp://foo:bar@localhost:%d/%s' % (self._listener.port, path)
 
888
 
 
889
    def log(self, message):
 
890
        """StubServer uses this to log when a new server is created."""
 
891
        self.logs.append(message)
 
892
 
 
893
    def _run_server_entry(self, sock):
 
894
        """Entry point for all implementations of _run_server.
 
895
        
 
896
        If self.add_latency is > 0.000001 then sock is given a latency adding
 
897
        decorator.
 
898
        """
 
899
        if self.add_latency > 0.000001:
 
900
            sock = SocketDelay(sock, self.add_latency)
 
901
        return self._run_server(sock)
 
902
 
 
903
    def _run_server(self, s):
 
904
        ssh_server = paramiko.Transport(s)
 
905
        key_file = pathjoin(self._homedir, 'test_rsa.key')
 
906
        f = open(key_file, 'w')
 
907
        f.write(STUB_SERVER_KEY)
 
908
        f.close()
 
909
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
910
        ssh_server.add_server_key(host_key)
 
911
        server = self._server_interface(self)
 
912
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
 
913
                                         StubSFTPServer, root=self._root,
 
914
                                         home=self._server_homedir)
 
915
        event = threading.Event()
 
916
        ssh_server.start_server(event, server)
 
917
        event.wait(5.0)
 
918
    
 
919
    def setUp(self, backing_server=None):
 
920
        # XXX: TODO: make sftpserver back onto backing_server rather than local
 
921
        # disk.
 
922
        assert (backing_server is None or
 
923
                isinstance(backing_server, local.LocalURLServer)), (
 
924
            "backing_server should not be %r, because this can only serve the "
 
925
            "local current working directory." % (backing_server,))
 
926
        self._original_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
 
927
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._vendor
 
928
        if sys.platform == 'win32':
 
929
            # Win32 needs to use the UNICODE api
 
930
            self._homedir = getcwd()
 
931
        else:
 
932
            # But Linux SFTP servers should just deal in bytestreams
 
933
            self._homedir = os.getcwd()
 
934
        if self._server_homedir is None:
 
935
            self._server_homedir = self._homedir
 
936
        self._root = '/'
 
937
        if sys.platform == 'win32':
 
938
            self._root = ''
 
939
        self._listener = SocketListener(self._run_server_entry)
 
940
        self._listener.setDaemon(True)
 
941
        self._listener.start()
 
942
 
 
943
    def tearDown(self):
 
944
        """See bzrlib.transport.Server.tearDown."""
 
945
        self._listener.stop()
 
946
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
 
947
 
 
948
    def get_bogus_url(self):
 
949
        """See bzrlib.transport.Server.get_bogus_url."""
 
950
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
 
951
        # we bind a random socket, so that we get a guaranteed unused port
 
952
        # we just never listen on that port
 
953
        s = socket.socket()
 
954
        s.bind(('localhost', 0))
 
955
        return 'sftp://%s:%s/' % s.getsockname()
 
956
 
 
957
 
 
958
class SFTPFullAbsoluteServer(SFTPServer):
 
959
    """A test server for sftp transports, using absolute urls and ssh."""
 
960
 
 
961
    def get_url(self):
 
962
        """See bzrlib.transport.Server.get_url."""
 
963
        homedir = self._homedir
 
964
        if sys.platform != 'win32':
 
965
            # Remove the initial '/' on all platforms but win32
 
966
            homedir = homedir[1:]
 
967
        return self._get_sftp_url(urlutils.escape(homedir))
 
968
 
 
969
 
 
970
class SFTPServerWithoutSSH(SFTPServer):
 
971
    """An SFTP server that uses a simple TCP socket pair rather than SSH."""
 
972
 
 
973
    def __init__(self):
 
974
        super(SFTPServerWithoutSSH, self).__init__()
 
975
        self._vendor = ssh.LoopbackVendor()
 
976
 
 
977
    def _run_server(self, sock):
 
978
        # Re-import these as locals, so that they're still accessible during
 
979
        # interpreter shutdown (when all module globals get set to None, leading
 
980
        # to confusing errors like "'NoneType' object has no attribute 'error'".
 
981
        class FakeChannel(object):
 
982
            def get_transport(self):
 
983
                return self
 
984
            def get_log_channel(self):
 
985
                return 'paramiko'
 
986
            def get_name(self):
 
987
                return '1'
 
988
            def get_hexdump(self):
 
989
                return False
 
990
            def close(self):
 
991
                pass
 
992
 
 
993
        server = paramiko.SFTPServer(FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
 
994
                                     root=self._root, home=self._server_homedir)
 
995
        try:
 
996
            server.start_subsystem('sftp', None, sock)
 
997
        except socket.error, e:
 
998
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
 
999
                # it's okay for the client to disconnect abruptly
 
1000
                # (bug in paramiko 1.6: it should absorb this exception)
 
1001
                pass
 
1002
            else:
 
1003
                raise
 
1004
        except Exception, e:
 
1005
            # This typically seems to happen during interpreter shutdown, so
 
1006
            # most of the useful ways to report this error are won't work.
 
1007
            # Writing the exception type, and then the text of the exception,
 
1008
            # seems to be the best we can do.
 
1009
            import sys
 
1010
            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
 
1011
            sys.stderr.write('%s\n\n' % (e,))
 
1012
        server.finish_subsystem()
 
1013
 
 
1014
 
 
1015
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
 
1016
    """A test server for sftp transports, using absolute urls."""
 
1017
 
 
1018
    def get_url(self):
 
1019
        """See bzrlib.transport.Server.get_url."""
 
1020
        homedir = self._homedir
 
1021
        if sys.platform != 'win32':
 
1022
            # Remove the initial '/' on all platforms but win32
 
1023
            homedir = homedir[1:]
 
1024
        return self._get_sftp_url(urlutils.escape(homedir))
 
1025
 
 
1026
 
 
1027
class SFTPHomeDirServer(SFTPServerWithoutSSH):
 
1028
    """A test server for sftp transports, using homedir relative urls."""
 
1029
 
 
1030
    def get_url(self):
 
1031
        """See bzrlib.transport.Server.get_url."""
 
1032
        return self._get_sftp_url("~/")
 
1033
 
 
1034
 
 
1035
class SFTPSiblingAbsoluteServer(SFTPAbsoluteServer):
 
1036
    """A test server for sftp transports where only absolute paths will work.
 
1037
 
 
1038
    It does this by serving from a deeply-nested directory that doesn't exist.
 
1039
    """
 
1040
 
 
1041
    def setUp(self, backing_server=None):
 
1042
        self._server_homedir = '/dev/noone/runs/tests/here'
 
1043
        super(SFTPSiblingAbsoluteServer, self).setUp(backing_server)
 
1044
 
 
1045
 
 
1046
def get_test_permutations():
 
1047
    """Return the permutations to be used in testing."""
 
1048
    return [(SFTPTransport, SFTPAbsoluteServer),
 
1049
            (SFTPTransport, SFTPHomeDirServer),
 
1050
            (SFTPTransport, SFTPSiblingAbsoluteServer),
 
1051
            ]