~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Martin Pool
  • Date: 2010-09-15 10:40:51 UTC
  • mfrom: (5425 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5426.
  • Revision ID: mbp@sourcefrog.net-20100915104051-pkjmuc2bc27buysp
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Implementation of Transport over SFTP, using paramiko."""
 
18
 
 
19
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
 
20
# then raise TransportNotPossible, which will break remote access to any
 
21
# formats which rely on OS-level locks.  That should be fine as those formats
 
22
# are pretty old, but these combinations may have to be removed from the test
 
23
# suite.  Those formats all date back to 0.7; so we should be able to remove
 
24
# these methods when we officially drop support for those formats.
 
25
 
 
26
import bisect
 
27
import errno
 
28
import itertools
 
29
import os
 
30
import random
 
31
import stat
 
32
import sys
 
33
import time
 
34
import urllib
 
35
import urlparse
 
36
import warnings
 
37
 
 
38
from bzrlib import (
 
39
    config,
 
40
    debug,
 
41
    errors,
 
42
    urlutils,
 
43
    )
 
44
from bzrlib.errors import (FileExists,
 
45
                           NoSuchFile, PathNotChild,
 
46
                           TransportError,
 
47
                           LockError,
 
48
                           PathError,
 
49
                           ParamikoNotPresent,
 
50
                           )
 
51
from bzrlib.osutils import pathjoin, fancy_rename, getcwd
 
52
from bzrlib.symbol_versioning import (
 
53
        deprecated_function,
 
54
        )
 
55
from bzrlib.trace import mutter, warning
 
56
from bzrlib.transport import (
 
57
    FileFileStream,
 
58
    _file_streams,
 
59
    local,
 
60
    Server,
 
61
    ssh,
 
62
    ConnectedTransport,
 
63
    )
 
64
 
 
65
# Disable one particular warning that comes from paramiko in Python2.5; if
 
66
# this is emitted at the wrong time it tends to cause spurious test failures
 
67
# or at least noise in the test case::
 
68
#
 
69
# [1770/7639 in 86s, 1 known failures, 50 skipped, 2 missing features]
 
70
# test_permissions.TestSftpPermissions.test_new_files
 
71
# /var/lib/python-support/python2.5/paramiko/message.py:226: DeprecationWarning: integer argument expected, got float
 
72
#  self.packet.write(struct.pack('>I', n))
 
73
warnings.filterwarnings('ignore',
 
74
        'integer argument expected, got float',
 
75
        category=DeprecationWarning,
 
76
        module='paramiko.message')
 
77
 
 
78
try:
 
79
    import paramiko
 
80
except ImportError, e:
 
81
    raise ParamikoNotPresent(e)
 
82
else:
 
83
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
 
84
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
 
85
                               SFTP_OK, CMD_HANDLE, CMD_OPEN)
 
86
    from paramiko.sftp_attr import SFTPAttributes
 
87
    from paramiko.sftp_file import SFTPFile
 
88
 
 
89
 
 
90
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
 
91
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
 
92
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
 
93
 
 
94
 
 
95
class SFTPLock(object):
 
96
    """This fakes a lock in a remote location.
 
97
 
 
98
    A present lock is indicated just by the existence of a file.  This
 
99
    doesn't work well on all transports and they are only used in
 
100
    deprecated storage formats.
 
101
    """
 
102
 
 
103
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
 
104
 
 
105
    def __init__(self, path, transport):
 
106
        self.lock_file = None
 
107
        self.path = path
 
108
        self.lock_path = path + '.write-lock'
 
109
        self.transport = transport
 
110
        try:
 
111
            # RBC 20060103 FIXME should we be using private methods here ?
 
112
            abspath = transport._remote_path(self.lock_path)
 
113
            self.lock_file = transport._sftp_open_exclusive(abspath)
 
114
        except FileExists:
 
115
            raise LockError('File %r already locked' % (self.path,))
 
116
 
 
117
    def __del__(self):
 
118
        """Should this warn, or actually try to cleanup?"""
 
119
        if self.lock_file:
 
120
            warning("SFTPLock %r not explicitly unlocked" % (self.path,))
 
121
            self.unlock()
 
122
 
 
123
    def unlock(self):
 
124
        if not self.lock_file:
 
125
            return
 
126
        self.lock_file.close()
 
127
        self.lock_file = None
 
128
        try:
 
129
            self.transport.delete(self.lock_path)
 
130
        except (NoSuchFile,):
 
131
            # What specific errors should we catch here?
 
132
            pass
 
133
 
 
134
 
 
135
class _SFTPReadvHelper(object):
 
136
    """A class to help with managing the state of a readv request."""
 
137
 
 
138
    # See _get_requests for an explanation.
 
139
    _max_request_size = 32768
 
140
 
 
141
    def __init__(self, original_offsets, relpath, _report_activity):
 
142
        """Create a new readv helper.
 
143
 
 
144
        :param original_offsets: The original requests given by the caller of
 
145
            readv()
 
146
        :param relpath: The name of the file (if known)
 
147
        :param _report_activity: A Transport._report_activity bound method,
 
148
            to be called as data arrives.
 
149
        """
 
150
        self.original_offsets = list(original_offsets)
 
151
        self.relpath = relpath
 
152
        self._report_activity = _report_activity
 
153
 
 
154
    def _get_requests(self):
 
155
        """Break up the offsets into individual requests over sftp.
 
156
 
 
157
        The SFTP spec only requires implementers to support 32kB requests. We
 
158
        could try something larger (openssh supports 64kB), but then we have to
 
159
        handle requests that fail.
 
160
        So instead, we just break up our maximum chunks into 32kB chunks, and
 
161
        asyncronously requests them.
 
162
        Newer versions of paramiko would do the chunking for us, but we want to
 
163
        start processing results right away, so we do it ourselves.
 
164
        """
 
165
        # TODO: Because we issue async requests, we don't 'fudge' any extra
 
166
        #       data.  I'm not 100% sure that is the best choice.
 
167
 
 
168
        # The first thing we do, is to collapse the individual requests as much
 
169
        # as possible, so we don't issues requests <32kB
 
170
        sorted_offsets = sorted(self.original_offsets)
 
171
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
 
172
                                                        limit=0, fudge_factor=0))
 
173
        requests = []
 
174
        for c_offset in coalesced:
 
175
            start = c_offset.start
 
176
            size = c_offset.length
 
177
 
 
178
            # Break this up into 32kB requests
 
179
            while size > 0:
 
180
                next_size = min(size, self._max_request_size)
 
181
                requests.append((start, next_size))
 
182
                size -= next_size
 
183
                start += next_size
 
184
        if 'sftp' in debug.debug_flags:
 
185
            mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
 
186
                self.relpath, len(sorted_offsets), len(coalesced),
 
187
                len(requests))
 
188
        return requests
 
189
 
 
190
    def request_and_yield_offsets(self, fp):
 
191
        """Request the data from the remote machine, yielding the results.
 
192
 
 
193
        :param fp: A Paramiko SFTPFile object that supports readv.
 
194
        :return: Yield the data requested by the original readv caller, one by
 
195
            one.
 
196
        """
 
197
        requests = self._get_requests()
 
198
        offset_iter = iter(self.original_offsets)
 
199
        cur_offset, cur_size = offset_iter.next()
 
200
        # paramiko .readv() yields strings that are in the order of the requests
 
201
        # So we track the current request to know where the next data is
 
202
        # being returned from.
 
203
        input_start = None
 
204
        last_end = None
 
205
        buffered_data = []
 
206
        buffered_len = 0
 
207
 
 
208
        # This is used to buffer chunks which we couldn't process yet
 
209
        # It is (start, end, data) tuples.
 
210
        data_chunks = []
 
211
        # Create an 'unlimited' data stream, so we stop based on requests,
 
212
        # rather than just because the data stream ended. This lets us detect
 
213
        # short readv.
 
214
        data_stream = itertools.chain(fp.readv(requests),
 
215
                                      itertools.repeat(None))
 
216
        for (start, length), data in itertools.izip(requests, data_stream):
 
217
            if data is None:
 
218
                if cur_coalesced is not None:
 
219
                    raise errors.ShortReadvError(self.relpath,
 
220
                        start, length, len(data))
 
221
            if len(data) != length:
 
222
                raise errors.ShortReadvError(self.relpath,
 
223
                    start, length, len(data))
 
224
            self._report_activity(length, 'read')
 
225
            if last_end is None:
 
226
                # This is the first request, just buffer it
 
227
                buffered_data = [data]
 
228
                buffered_len = length
 
229
                input_start = start
 
230
            elif start == last_end:
 
231
                # The data we are reading fits neatly on the previous
 
232
                # buffer, so this is all part of a larger coalesced range.
 
233
                buffered_data.append(data)
 
234
                buffered_len += length
 
235
            else:
 
236
                # We have an 'interrupt' in the data stream. So we know we are
 
237
                # at a request boundary.
 
238
                if buffered_len > 0:
 
239
                    # We haven't consumed the buffer so far, so put it into
 
240
                    # data_chunks, and continue.
 
241
                    buffered = ''.join(buffered_data)
 
242
                    data_chunks.append((input_start, buffered))
 
243
                input_start = start
 
244
                buffered_data = [data]
 
245
                buffered_len = length
 
246
            last_end = start + length
 
247
            if input_start == cur_offset and cur_size <= buffered_len:
 
248
                # Simplify the next steps a bit by transforming buffered_data
 
249
                # into a single string. We also have the nice property that
 
250
                # when there is only one string ''.join([x]) == x, so there is
 
251
                # no data copying.
 
252
                buffered = ''.join(buffered_data)
 
253
                # Clean out buffered data so that we keep memory
 
254
                # consumption low
 
255
                del buffered_data[:]
 
256
                buffered_offset = 0
 
257
                # TODO: We *could* also consider the case where cur_offset is in
 
258
                #       in the buffered range, even though it doesn't *start*
 
259
                #       the buffered range. But for packs we pretty much always
 
260
                #       read in order, so you won't get any extra data in the
 
261
                #       middle.
 
262
                while (input_start == cur_offset
 
263
                       and (buffered_offset + cur_size) <= buffered_len):
 
264
                    # We've buffered enough data to process this request, spit it
 
265
                    # out
 
266
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
 
267
                    # move the direct pointer into our buffered data
 
268
                    buffered_offset += cur_size
 
269
                    # Move the start-of-buffer pointer
 
270
                    input_start += cur_size
 
271
                    # Yield the requested data
 
272
                    yield cur_offset, cur_data
 
273
                    cur_offset, cur_size = offset_iter.next()
 
274
                # at this point, we've consumed as much of buffered as we can,
 
275
                # so break off the portion that we consumed
 
276
                if buffered_offset == len(buffered_data):
 
277
                    # No tail to leave behind
 
278
                    buffered_data = []
 
279
                    buffered_len = 0
 
280
                else:
 
281
                    buffered = buffered[buffered_offset:]
 
282
                    buffered_data = [buffered]
 
283
                    buffered_len = len(buffered)
 
284
        if buffered_len:
 
285
            buffered = ''.join(buffered_data)
 
286
            del buffered_data[:]
 
287
            data_chunks.append((input_start, buffered))
 
288
        if data_chunks:
 
289
            if 'sftp' in debug.debug_flags:
 
290
                mutter('SFTP readv left with %d out-of-order bytes',
 
291
                    sum(map(lambda x: len(x[1]), data_chunks)))
 
292
            # We've processed all the readv data, at this point, anything we
 
293
            # couldn't process is in data_chunks. This doesn't happen often, so
 
294
            # this code path isn't optimized
 
295
            # We use an interesting process for data_chunks
 
296
            # Specifically if we have "bisect_left([(start, len, entries)],
 
297
            #                                       (qstart,)])
 
298
            # If start == qstart, then we get the specific node. Otherwise we
 
299
            # get the previous node
 
300
            while True:
 
301
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
 
302
                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
 
303
                    # The data starts here
 
304
                    data = data_chunks[idx][1][:cur_size]
 
305
                elif idx > 0:
 
306
                    # The data is in a portion of a previous page
 
307
                    idx -= 1
 
308
                    sub_offset = cur_offset - data_chunks[idx][0]
 
309
                    data = data_chunks[idx][1]
 
310
                    data = data[sub_offset:sub_offset + cur_size]
 
311
                else:
 
312
                    # We are missing the page where the data should be found,
 
313
                    # something is wrong
 
314
                    data = ''
 
315
                if len(data) != cur_size:
 
316
                    raise AssertionError('We must have miscalulated.'
 
317
                        ' We expected %d bytes, but only found %d'
 
318
                        % (cur_size, len(data)))
 
319
                yield cur_offset, data
 
320
                cur_offset, cur_size = offset_iter.next()
 
321
 
 
322
 
 
323
class SFTPTransport(ConnectedTransport):
 
324
    """Transport implementation for SFTP access."""
 
325
 
 
326
    _do_prefetch = _default_do_prefetch
 
327
    # TODO: jam 20060717 Conceivably these could be configurable, either
 
328
    #       by auto-tuning at run-time, or by a configuration (per host??)
 
329
    #       but the performance curve is pretty flat, so just going with
 
330
    #       reasonable defaults.
 
331
    _max_readv_combine = 200
 
332
    # Having to round trip to the server means waiting for a response,
 
333
    # so it is better to download extra bytes.
 
334
    # 8KiB had good performance for both local and remote network operations
 
335
    _bytes_to_read_before_seek = 8192
 
336
 
 
337
    # The sftp spec says that implementations SHOULD allow reads
 
338
    # to be at least 32K. paramiko.readv() does an async request
 
339
    # for the chunks. So we need to keep it within a single request
 
340
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
 
341
    # up the request itself, rather than us having to worry about it
 
342
    _max_request_size = 32768
 
343
 
 
344
    def __init__(self, base, _from_transport=None):
 
345
        super(SFTPTransport, self).__init__(base,
 
346
                                            _from_transport=_from_transport)
 
347
 
 
348
    def _remote_path(self, relpath):
 
349
        """Return the path to be passed along the sftp protocol for relpath.
 
350
 
 
351
        :param relpath: is a urlencoded string.
 
352
        """
 
353
        relative = urlutils.unescape(relpath).encode('utf-8')
 
354
        remote_path = self._combine_paths(self._path, relative)
 
355
        # the initial slash should be removed from the path, and treated as a
 
356
        # homedir relative path (the path begins with a double slash if it is
 
357
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
 
358
        # RBC 20060118 we are not using this as its too user hostile. instead
 
359
        # we are following lftp and using /~/foo to mean '~/foo'
 
360
        # vila--20070602 and leave absolute paths begin with a single slash.
 
361
        if remote_path.startswith('/~/'):
 
362
            remote_path = remote_path[3:]
 
363
        elif remote_path == '/~':
 
364
            remote_path = ''
 
365
        return remote_path
 
366
 
 
367
    def _create_connection(self, credentials=None):
 
368
        """Create a new connection with the provided credentials.
 
369
 
 
370
        :param credentials: The credentials needed to establish the connection.
 
371
 
 
372
        :return: The created connection and its associated credentials.
 
373
 
 
374
        The credentials are only the password as it may have been entered
 
375
        interactively by the user and may be different from the one provided
 
376
        in base url at transport creation time.
 
377
        """
 
378
        if credentials is None:
 
379
            password = self._password
 
380
        else:
 
381
            password = credentials
 
382
 
 
383
        vendor = ssh._get_ssh_vendor()
 
384
        user = self._user
 
385
        if user is None:
 
386
            auth = config.AuthenticationConfig()
 
387
            user = auth.get_user('ssh', self._host, self._port)
 
388
        connection = vendor.connect_sftp(self._user, password,
 
389
                                         self._host, self._port)
 
390
        return connection, (user, password)
 
391
 
 
392
    def disconnect(self):
 
393
        connection = self._get_connection()
 
394
        if connection is not None:
 
395
            connection.close()
 
396
 
 
397
    def _get_sftp(self):
 
398
        """Ensures that a connection is established"""
 
399
        connection = self._get_connection()
 
400
        if connection is None:
 
401
            # First connection ever
 
402
            connection, credentials = self._create_connection()
 
403
            self._set_connection(connection, credentials)
 
404
        return connection
 
405
 
 
406
    def has(self, relpath):
 
407
        """
 
408
        Does the target location exist?
 
409
        """
 
410
        try:
 
411
            self._get_sftp().stat(self._remote_path(relpath))
 
412
            # stat result is about 20 bytes, let's say
 
413
            self._report_activity(20, 'read')
 
414
            return True
 
415
        except IOError:
 
416
            return False
 
417
 
 
418
    def get(self, relpath):
 
419
        """Get the file at the given relative path.
 
420
 
 
421
        :param relpath: The relative path to the file
 
422
        """
 
423
        try:
 
424
            # FIXME: by returning the file directly, we don't pass this
 
425
            # through to report_activity.  We could try wrapping the object
 
426
            # before it's returned.  For readv and get_bytes it's handled in
 
427
            # the higher-level function.
 
428
            # -- mbp 20090126
 
429
            path = self._remote_path(relpath)
 
430
            f = self._get_sftp().file(path, mode='rb')
 
431
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
 
432
                f.prefetch()
 
433
            return f
 
434
        except (IOError, paramiko.SSHException), e:
 
435
            self._translate_io_exception(e, path, ': error retrieving',
 
436
                failure_exc=errors.ReadError)
 
437
 
 
438
    def get_bytes(self, relpath):
 
439
        # reimplement this here so that we can report how many bytes came back
 
440
        f = self.get(relpath)
 
441
        try:
 
442
            bytes = f.read()
 
443
            self._report_activity(len(bytes), 'read')
 
444
            return bytes
 
445
        finally:
 
446
            f.close()
 
447
 
 
448
    def _readv(self, relpath, offsets):
 
449
        """See Transport.readv()"""
 
450
        # We overload the default readv() because we want to use a file
 
451
        # that does not have prefetch enabled.
 
452
        # Also, if we have a new paramiko, it implements an async readv()
 
453
        if not offsets:
 
454
            return
 
455
 
 
456
        try:
 
457
            path = self._remote_path(relpath)
 
458
            fp = self._get_sftp().file(path, mode='rb')
 
459
            readv = getattr(fp, 'readv', None)
 
460
            if readv:
 
461
                return self._sftp_readv(fp, offsets, relpath)
 
462
            if 'sftp' in debug.debug_flags:
 
463
                mutter('seek and read %s offsets', len(offsets))
 
464
            return self._seek_and_read(fp, offsets, relpath)
 
465
        except (IOError, paramiko.SSHException), e:
 
466
            self._translate_io_exception(e, path, ': error retrieving')
 
467
 
 
468
    def recommended_page_size(self):
 
469
        """See Transport.recommended_page_size().
 
470
 
 
471
        For SFTP we suggest a large page size to reduce the overhead
 
472
        introduced by latency.
 
473
        """
 
474
        return 64 * 1024
 
475
 
 
476
    def _sftp_readv(self, fp, offsets, relpath):
 
477
        """Use the readv() member of fp to do async readv.
 
478
 
 
479
        Then read them using paramiko.readv(). paramiko.readv()
 
480
        does not support ranges > 64K, so it caps the request size, and
 
481
        just reads until it gets all the stuff it wants.
 
482
        """
 
483
        helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
 
484
        return helper.request_and_yield_offsets(fp)
 
485
 
 
486
    def put_file(self, relpath, f, mode=None):
 
487
        """
 
488
        Copy the file-like object into the location.
 
489
 
 
490
        :param relpath: Location to put the contents, relative to base.
 
491
        :param f:       File-like object.
 
492
        :param mode: The final mode for the file
 
493
        """
 
494
        final_path = self._remote_path(relpath)
 
495
        return self._put(final_path, f, mode=mode)
 
496
 
 
497
    def _put(self, abspath, f, mode=None):
 
498
        """Helper function so both put() and copy_abspaths can reuse the code"""
 
499
        tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
 
500
                        os.getpid(), random.randint(0,0x7FFFFFFF))
 
501
        fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
 
502
        closed = False
 
503
        try:
 
504
            try:
 
505
                fout.set_pipelined(True)
 
506
                length = self._pump(f, fout)
 
507
            except (IOError, paramiko.SSHException), e:
 
508
                self._translate_io_exception(e, tmp_abspath)
 
509
            # XXX: This doesn't truly help like we would like it to.
 
510
            #      The problem is that openssh strips sticky bits. So while we
 
511
            #      can properly set group write permission, we lose the group
 
512
            #      sticky bit. So it is probably best to stop chmodding, and
 
513
            #      just tell users that they need to set the umask correctly.
 
514
            #      The attr.st_mode = mode, in _sftp_open_exclusive
 
515
            #      will handle when the user wants the final mode to be more
 
516
            #      restrictive. And then we avoid a round trip. Unless
 
517
            #      paramiko decides to expose an async chmod()
 
518
 
 
519
            # This is designed to chmod() right before we close.
 
520
            # Because we set_pipelined() earlier, theoretically we might
 
521
            # avoid the round trip for fout.close()
 
522
            if mode is not None:
 
523
                self._get_sftp().chmod(tmp_abspath, mode)
 
524
            fout.close()
 
525
            closed = True
 
526
            self._rename_and_overwrite(tmp_abspath, abspath)
 
527
            return length
 
528
        except Exception, e:
 
529
            # If we fail, try to clean up the temporary file
 
530
            # before we throw the exception
 
531
            # but don't let another exception mess things up
 
532
            # Write out the traceback, because otherwise
 
533
            # the catch and throw destroys it
 
534
            import traceback
 
535
            mutter(traceback.format_exc())
 
536
            try:
 
537
                if not closed:
 
538
                    fout.close()
 
539
                self._get_sftp().remove(tmp_abspath)
 
540
            except:
 
541
                # raise the saved except
 
542
                raise e
 
543
            # raise the original with its traceback if we can.
 
544
            raise
 
545
 
 
546
    def _put_non_atomic_helper(self, relpath, writer, mode=None,
 
547
                               create_parent_dir=False,
 
548
                               dir_mode=None):
 
549
        abspath = self._remote_path(relpath)
 
550
 
 
551
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
 
552
        #       set the file mode at create time. If it does, use it.
 
553
        #       But for now, we just chmod later anyway.
 
554
 
 
555
        def _open_and_write_file():
 
556
            """Try to open the target file, raise error on failure"""
 
557
            fout = None
 
558
            try:
 
559
                try:
 
560
                    fout = self._get_sftp().file(abspath, mode='wb')
 
561
                    fout.set_pipelined(True)
 
562
                    writer(fout)
 
563
                except (paramiko.SSHException, IOError), e:
 
564
                    self._translate_io_exception(e, abspath,
 
565
                                                 ': unable to open')
 
566
 
 
567
                # This is designed to chmod() right before we close.
 
568
                # Because we set_pipelined() earlier, theoretically we might
 
569
                # avoid the round trip for fout.close()
 
570
                if mode is not None:
 
571
                    self._get_sftp().chmod(abspath, mode)
 
572
            finally:
 
573
                if fout is not None:
 
574
                    fout.close()
 
575
 
 
576
        if not create_parent_dir:
 
577
            _open_and_write_file()
 
578
            return
 
579
 
 
580
        # Try error handling to create the parent directory if we need to
 
581
        try:
 
582
            _open_and_write_file()
 
583
        except NoSuchFile:
 
584
            # Try to create the parent directory, and then go back to
 
585
            # writing the file
 
586
            parent_dir = os.path.dirname(abspath)
 
587
            self._mkdir(parent_dir, dir_mode)
 
588
            _open_and_write_file()
 
589
 
 
590
    def put_file_non_atomic(self, relpath, f, mode=None,
 
591
                            create_parent_dir=False,
 
592
                            dir_mode=None):
 
593
        """Copy the file-like object into the target location.
 
594
 
 
595
        This function is not strictly safe to use. It is only meant to
 
596
        be used when you already know that the target does not exist.
 
597
        It is not safe, because it will open and truncate the remote
 
598
        file. So there may be a time when the file has invalid contents.
 
599
 
 
600
        :param relpath: The remote location to put the contents.
 
601
        :param f:       File-like object.
 
602
        :param mode:    Possible access permissions for new file.
 
603
                        None means do not set remote permissions.
 
604
        :param create_parent_dir: If we cannot create the target file because
 
605
                        the parent directory does not exist, go ahead and
 
606
                        create it, and then try again.
 
607
        """
 
608
        def writer(fout):
 
609
            self._pump(f, fout)
 
610
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
611
                                    create_parent_dir=create_parent_dir,
 
612
                                    dir_mode=dir_mode)
 
613
 
 
614
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
615
                             create_parent_dir=False,
 
616
                             dir_mode=None):
 
617
        def writer(fout):
 
618
            fout.write(bytes)
 
619
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
620
                                    create_parent_dir=create_parent_dir,
 
621
                                    dir_mode=dir_mode)
 
622
 
 
623
    def iter_files_recursive(self):
 
624
        """Walk the relative paths of all files in this transport."""
 
625
        # progress is handled by list_dir
 
626
        queue = list(self.list_dir('.'))
 
627
        while queue:
 
628
            relpath = queue.pop(0)
 
629
            st = self.stat(relpath)
 
630
            if stat.S_ISDIR(st.st_mode):
 
631
                for i, basename in enumerate(self.list_dir(relpath)):
 
632
                    queue.insert(i, relpath+'/'+basename)
 
633
            else:
 
634
                yield relpath
 
635
 
 
636
    def _mkdir(self, abspath, mode=None):
 
637
        if mode is None:
 
638
            local_mode = 0777
 
639
        else:
 
640
            local_mode = mode
 
641
        try:
 
642
            self._report_activity(len(abspath), 'write')
 
643
            self._get_sftp().mkdir(abspath, local_mode)
 
644
            self._report_activity(1, 'read')
 
645
            if mode is not None:
 
646
                # chmod a dir through sftp will erase any sgid bit set
 
647
                # on the server side.  So, if the bit mode are already
 
648
                # set, avoid the chmod.  If the mode is not fine but
 
649
                # the sgid bit is set, report a warning to the user
 
650
                # with the umask fix.
 
651
                stat = self._get_sftp().lstat(abspath)
 
652
                mode = mode & 0777 # can't set special bits anyway
 
653
                if mode != stat.st_mode & 0777:
 
654
                    if stat.st_mode & 06000:
 
655
                        warning('About to chmod %s over sftp, which will result'
 
656
                                ' in its suid or sgid bits being cleared.  If'
 
657
                                ' you want to preserve those bits, change your '
 
658
                                ' environment on the server to use umask 0%03o.'
 
659
                                % (abspath, 0777 - mode))
 
660
                    self._get_sftp().chmod(abspath, mode=mode)
 
661
        except (paramiko.SSHException, IOError), e:
 
662
            self._translate_io_exception(e, abspath, ': unable to mkdir',
 
663
                failure_exc=FileExists)
 
664
 
 
665
    def mkdir(self, relpath, mode=None):
 
666
        """Create a directory at the given path."""
 
667
        self._mkdir(self._remote_path(relpath), mode=mode)
 
668
 
 
669
    def open_write_stream(self, relpath, mode=None):
 
670
        """See Transport.open_write_stream."""
 
671
        # initialise the file to zero-length
 
672
        # this is three round trips, but we don't use this
 
673
        # api more than once per write_group at the moment so
 
674
        # it is a tolerable overhead. Better would be to truncate
 
675
        # the file after opening. RBC 20070805
 
676
        self.put_bytes_non_atomic(relpath, "", mode)
 
677
        abspath = self._remote_path(relpath)
 
678
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
 
679
        #       set the file mode at create time. If it does, use it.
 
680
        #       But for now, we just chmod later anyway.
 
681
        handle = None
 
682
        try:
 
683
            handle = self._get_sftp().file(abspath, mode='wb')
 
684
            handle.set_pipelined(True)
 
685
        except (paramiko.SSHException, IOError), e:
 
686
            self._translate_io_exception(e, abspath,
 
687
                                         ': unable to open')
 
688
        _file_streams[self.abspath(relpath)] = handle
 
689
        return FileFileStream(self, relpath, handle)
 
690
 
 
691
    def _translate_io_exception(self, e, path, more_info='',
 
692
                                failure_exc=PathError):
 
693
        """Translate a paramiko or IOError into a friendlier exception.
 
694
 
 
695
        :param e: The original exception
 
696
        :param path: The path in question when the error is raised
 
697
        :param more_info: Extra information that can be included,
 
698
                          such as what was going on
 
699
        :param failure_exc: Paramiko has the super fun ability to raise completely
 
700
                           opaque errors that just set "e.args = ('Failure',)" with
 
701
                           no more information.
 
702
                           If this parameter is set, it defines the exception
 
703
                           to raise in these cases.
 
704
        """
 
705
        # paramiko seems to generate detailless errors.
 
706
        self._translate_error(e, path, raise_generic=False)
 
707
        if getattr(e, 'args', None) is not None:
 
708
            if (e.args == ('No such file or directory',) or
 
709
                e.args == ('No such file',)):
 
710
                raise NoSuchFile(path, str(e) + more_info)
 
711
            if (e.args == ('mkdir failed',) or
 
712
                e.args[0].startswith('syserr: File exists')):
 
713
                raise FileExists(path, str(e) + more_info)
 
714
            # strange but true, for the paramiko server.
 
715
            if (e.args == ('Failure',)):
 
716
                raise failure_exc(path, str(e) + more_info)
 
717
            # Can be something like args = ('Directory not empty:
 
718
            # '/srv/bazaar.launchpad.net/blah...: '
 
719
            # [Errno 39] Directory not empty',)
 
720
            if (e.args[0].startswith('Directory not empty: ')
 
721
                or getattr(e, 'errno', None) == errno.ENOTEMPTY):
 
722
                raise errors.DirectoryNotEmpty(path, str(e))
 
723
            if e.args == ('Operation unsupported',):
 
724
                raise errors.TransportNotPossible()
 
725
            mutter('Raising exception with args %s', e.args)
 
726
        if getattr(e, 'errno', None) is not None:
 
727
            mutter('Raising exception with errno %s', e.errno)
 
728
        raise e
 
729
 
 
730
    def append_file(self, relpath, f, mode=None):
 
731
        """
 
732
        Append the text in the file-like object into the final
 
733
        location.
 
734
        """
 
735
        try:
 
736
            path = self._remote_path(relpath)
 
737
            fout = self._get_sftp().file(path, 'ab')
 
738
            if mode is not None:
 
739
                self._get_sftp().chmod(path, mode)
 
740
            result = fout.tell()
 
741
            self._pump(f, fout)
 
742
            return result
 
743
        except (IOError, paramiko.SSHException), e:
 
744
            self._translate_io_exception(e, relpath, ': unable to append')
 
745
 
 
746
    def rename(self, rel_from, rel_to):
 
747
        """Rename without special overwriting"""
 
748
        try:
 
749
            self._get_sftp().rename(self._remote_path(rel_from),
 
750
                              self._remote_path(rel_to))
 
751
        except (IOError, paramiko.SSHException), e:
 
752
            self._translate_io_exception(e, rel_from,
 
753
                    ': unable to rename to %r' % (rel_to))
 
754
 
 
755
    def _rename_and_overwrite(self, abs_from, abs_to):
 
756
        """Do a fancy rename on the remote server.
 
757
 
 
758
        Using the implementation provided by osutils.
 
759
        """
 
760
        try:
 
761
            sftp = self._get_sftp()
 
762
            fancy_rename(abs_from, abs_to,
 
763
                         rename_func=sftp.rename,
 
764
                         unlink_func=sftp.remove)
 
765
        except (IOError, paramiko.SSHException), e:
 
766
            self._translate_io_exception(e, abs_from,
 
767
                                         ': unable to rename to %r' % (abs_to))
 
768
 
 
769
    def move(self, rel_from, rel_to):
 
770
        """Move the item at rel_from to the location at rel_to"""
 
771
        path_from = self._remote_path(rel_from)
 
772
        path_to = self._remote_path(rel_to)
 
773
        self._rename_and_overwrite(path_from, path_to)
 
774
 
 
775
    def delete(self, relpath):
 
776
        """Delete the item at relpath"""
 
777
        path = self._remote_path(relpath)
 
778
        try:
 
779
            self._get_sftp().remove(path)
 
780
        except (IOError, paramiko.SSHException), e:
 
781
            self._translate_io_exception(e, path, ': unable to delete')
 
782
 
 
783
    def external_url(self):
 
784
        """See bzrlib.transport.Transport.external_url."""
 
785
        # the external path for SFTP is the base
 
786
        return self.base
 
787
 
 
788
    def listable(self):
 
789
        """Return True if this store supports listing."""
 
790
        return True
 
791
 
 
792
    def list_dir(self, relpath):
 
793
        """
 
794
        Return a list of all files at the given location.
 
795
        """
 
796
        # does anything actually use this?
 
797
        # -- Unknown
 
798
        # This is at least used by copy_tree for remote upgrades.
 
799
        # -- David Allouche 2006-08-11
 
800
        path = self._remote_path(relpath)
 
801
        try:
 
802
            entries = self._get_sftp().listdir(path)
 
803
            self._report_activity(sum(map(len, entries)), 'read')
 
804
        except (IOError, paramiko.SSHException), e:
 
805
            self._translate_io_exception(e, path, ': failed to list_dir')
 
806
        return [urlutils.escape(entry) for entry in entries]
 
807
 
 
808
    def rmdir(self, relpath):
 
809
        """See Transport.rmdir."""
 
810
        path = self._remote_path(relpath)
 
811
        try:
 
812
            return self._get_sftp().rmdir(path)
 
813
        except (IOError, paramiko.SSHException), e:
 
814
            self._translate_io_exception(e, path, ': failed to rmdir')
 
815
 
 
816
    def stat(self, relpath):
 
817
        """Return the stat information for a file."""
 
818
        path = self._remote_path(relpath)
 
819
        try:
 
820
            return self._get_sftp().lstat(path)
 
821
        except (IOError, paramiko.SSHException), e:
 
822
            self._translate_io_exception(e, path, ': unable to stat')
 
823
 
 
824
    def readlink(self, relpath):
 
825
        """See Transport.readlink."""
 
826
        path = self._remote_path(relpath)
 
827
        try:
 
828
            return self._get_sftp().readlink(path)
 
829
        except (IOError, paramiko.SSHException), e:
 
830
            self._translate_io_exception(e, path, ': unable to readlink')
 
831
 
 
832
    def symlink(self, source, link_name):
 
833
        """See Transport.symlink."""
 
834
        try:
 
835
            conn = self._get_sftp()
 
836
            sftp_retval = conn.symlink(source, link_name)
 
837
            if SFTP_OK != sftp_retval:
 
838
                raise TransportError(
 
839
                    '%r: unable to create symlink to %r' % (link_name, source),
 
840
                    sftp_retval
 
841
                )
 
842
        except (IOError, paramiko.SSHException), e:
 
843
            self._translate_io_exception(e, link_name,
 
844
                                         ': unable to create symlink to %r' % (source))
 
845
 
 
846
    def lock_read(self, relpath):
 
847
        """
 
848
        Lock the given file for shared (read) access.
 
849
        :return: A lock object, which has an unlock() member function
 
850
        """
 
851
        # FIXME: there should be something clever i can do here...
 
852
        class BogusLock(object):
 
853
            def __init__(self, path):
 
854
                self.path = path
 
855
            def unlock(self):
 
856
                pass
 
857
        return BogusLock(relpath)
 
858
 
 
859
    def lock_write(self, relpath):
 
860
        """
 
861
        Lock the given file for exclusive (write) access.
 
862
        WARNING: many transports do not support this, so trying avoid using it
 
863
 
 
864
        :return: A lock object, which has an unlock() member function
 
865
        """
 
866
        # This is a little bit bogus, but basically, we create a file
 
867
        # which should not already exist, and if it does, we assume
 
868
        # that there is a lock, and if it doesn't, the we assume
 
869
        # that we have taken the lock.
 
870
        return SFTPLock(relpath, self)
 
871
 
 
872
    def _sftp_open_exclusive(self, abspath, mode=None):
 
873
        """Open a remote path exclusively.
 
874
 
 
875
        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
 
876
        the file already exists. However it does not expose this
 
877
        at the higher level of SFTPClient.open(), so we have to
 
878
        sneak away with it.
 
879
 
 
880
        WARNING: This breaks the SFTPClient abstraction, so it
 
881
        could easily break against an updated version of paramiko.
 
882
 
 
883
        :param abspath: The remote absolute path where the file should be opened
 
884
        :param mode: The mode permissions bits for the new file
 
885
        """
 
886
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
 
887
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
 
888
        #       However, there is no way to set the permission mode at open
 
889
        #       time using the sftp_client.file() functionality.
 
890
        path = self._get_sftp()._adjust_cwd(abspath)
 
891
        # mutter('sftp abspath %s => %s', abspath, path)
 
892
        attr = SFTPAttributes()
 
893
        if mode is not None:
 
894
            attr.st_mode = mode
 
895
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
 
896
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
 
897
        try:
 
898
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
 
899
            if t != CMD_HANDLE:
 
900
                raise TransportError('Expected an SFTP handle')
 
901
            handle = msg.get_string()
 
902
            return SFTPFile(self._get_sftp(), handle, 'wb', -1)
 
903
        except (paramiko.SSHException, IOError), e:
 
904
            self._translate_io_exception(e, abspath, ': unable to open',
 
905
                failure_exc=FileExists)
 
906
 
 
907
    def _can_roundtrip_unix_modebits(self):
 
908
        if sys.platform == 'win32':
 
909
            # anyone else?
 
910
            return False
 
911
        else:
 
912
            return True
 
913
 
 
914
 
 
915
def get_test_permutations():
 
916
    """Return the permutations to be used in testing."""
 
917
    from bzrlib.tests import stub_sftp
 
918
    return [(SFTPTransport, stub_sftp.SFTPAbsoluteServer),
 
919
            (SFTPTransport, stub_sftp.SFTPHomeDirServer),
 
920
            (SFTPTransport, stub_sftp.SFTPSiblingAbsoluteServer),
 
921
            ]