~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: Martin Packman
  • Date: 2011-11-24 17:01:07 UTC
  • mto: This revision was merged to the branch mainline in revision 6304.
  • Revision ID: martin.packman@canonical.com-20111124170107-b3yd5vkzdglmnjk7
Allow a bracketed suffix in option help test

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Implementation of Transport over SFTP, using paramiko."""
 
18
 
 
19
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
 
20
# then raise TransportNotPossible, which will break remote access to any
 
21
# formats which rely on OS-level locks.  That should be fine as those formats
 
22
# are pretty old, but these combinations may have to be removed from the test
 
23
# suite.  Those formats all date back to 0.7; so we should be able to remove
 
24
# these methods when we officially drop support for those formats.
 
25
 
 
26
import bisect
 
27
import errno
 
28
import itertools
 
29
import os
 
30
import random
 
31
import stat
 
32
import sys
 
33
import time
 
34
import warnings
 
35
 
 
36
from bzrlib import (
 
37
    config,
 
38
    debug,
 
39
    errors,
 
40
    urlutils,
 
41
    )
 
42
from bzrlib.errors import (FileExists,
 
43
                           NoSuchFile,
 
44
                           TransportError,
 
45
                           LockError,
 
46
                           PathError,
 
47
                           ParamikoNotPresent,
 
48
                           )
 
49
from bzrlib.osutils import fancy_rename
 
50
from bzrlib.trace import mutter, warning
 
51
from bzrlib.transport import (
 
52
    FileFileStream,
 
53
    _file_streams,
 
54
    ssh,
 
55
    ConnectedTransport,
 
56
    )
 
57
 
 
58
# Disable one particular warning that comes from paramiko in Python2.5; if
 
59
# this is emitted at the wrong time it tends to cause spurious test failures
 
60
# or at least noise in the test case::
 
61
#
 
62
# [1770/7639 in 86s, 1 known failures, 50 skipped, 2 missing features]
 
63
# test_permissions.TestSftpPermissions.test_new_files
 
64
# /var/lib/python-support/python2.5/paramiko/message.py:226: DeprecationWarning: integer argument expected, got float
 
65
#  self.packet.write(struct.pack('>I', n))
 
66
warnings.filterwarnings('ignore',
 
67
        'integer argument expected, got float',
 
68
        category=DeprecationWarning,
 
69
        module='paramiko.message')
 
70
 
 
71
try:
 
72
    import paramiko
 
73
except ImportError, e:
 
74
    raise ParamikoNotPresent(e)
 
75
else:
 
76
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
 
77
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
 
78
                               SFTP_OK, CMD_HANDLE, CMD_OPEN)
 
79
    from paramiko.sftp_attr import SFTPAttributes
 
80
    from paramiko.sftp_file import SFTPFile
 
81
 
 
82
 
 
83
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
 
84
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
 
85
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
 
86
 
 
87
 
 
88
class SFTPLock(object):
 
89
    """This fakes a lock in a remote location.
 
90
 
 
91
    A present lock is indicated just by the existence of a file.  This
 
92
    doesn't work well on all transports and they are only used in
 
93
    deprecated storage formats.
 
94
    """
 
95
 
 
96
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
 
97
 
 
98
    def __init__(self, path, transport):
 
99
        self.lock_file = None
 
100
        self.path = path
 
101
        self.lock_path = path + '.write-lock'
 
102
        self.transport = transport
 
103
        try:
 
104
            # RBC 20060103 FIXME should we be using private methods here ?
 
105
            abspath = transport._remote_path(self.lock_path)
 
106
            self.lock_file = transport._sftp_open_exclusive(abspath)
 
107
        except FileExists:
 
108
            raise LockError('File %r already locked' % (self.path,))
 
109
 
 
110
    def unlock(self):
 
111
        if not self.lock_file:
 
112
            return
 
113
        self.lock_file.close()
 
114
        self.lock_file = None
 
115
        try:
 
116
            self.transport.delete(self.lock_path)
 
117
        except (NoSuchFile,):
 
118
            # What specific errors should we catch here?
 
119
            pass
 
120
 
 
121
 
 
122
class _SFTPReadvHelper(object):
 
123
    """A class to help with managing the state of a readv request."""
 
124
 
 
125
    # See _get_requests for an explanation.
 
126
    _max_request_size = 32768
 
127
 
 
128
    def __init__(self, original_offsets, relpath, _report_activity):
 
129
        """Create a new readv helper.
 
130
 
 
131
        :param original_offsets: The original requests given by the caller of
 
132
            readv()
 
133
        :param relpath: The name of the file (if known)
 
134
        :param _report_activity: A Transport._report_activity bound method,
 
135
            to be called as data arrives.
 
136
        """
 
137
        self.original_offsets = list(original_offsets)
 
138
        self.relpath = relpath
 
139
        self._report_activity = _report_activity
 
140
 
 
141
    def _get_requests(self):
 
142
        """Break up the offsets into individual requests over sftp.
 
143
 
 
144
        The SFTP spec only requires implementers to support 32kB requests. We
 
145
        could try something larger (openssh supports 64kB), but then we have to
 
146
        handle requests that fail.
 
147
        So instead, we just break up our maximum chunks into 32kB chunks, and
 
148
        asyncronously requests them.
 
149
        Newer versions of paramiko would do the chunking for us, but we want to
 
150
        start processing results right away, so we do it ourselves.
 
151
        """
 
152
        # TODO: Because we issue async requests, we don't 'fudge' any extra
 
153
        #       data.  I'm not 100% sure that is the best choice.
 
154
 
 
155
        # The first thing we do, is to collapse the individual requests as much
 
156
        # as possible, so we don't issues requests <32kB
 
157
        sorted_offsets = sorted(self.original_offsets)
 
158
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
 
159
                                                        limit=0, fudge_factor=0))
 
160
        requests = []
 
161
        for c_offset in coalesced:
 
162
            start = c_offset.start
 
163
            size = c_offset.length
 
164
 
 
165
            # Break this up into 32kB requests
 
166
            while size > 0:
 
167
                next_size = min(size, self._max_request_size)
 
168
                requests.append((start, next_size))
 
169
                size -= next_size
 
170
                start += next_size
 
171
        if 'sftp' in debug.debug_flags:
 
172
            mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
 
173
                self.relpath, len(sorted_offsets), len(coalesced),
 
174
                len(requests))
 
175
        return requests
 
176
 
 
177
    def request_and_yield_offsets(self, fp):
 
178
        """Request the data from the remote machine, yielding the results.
 
179
 
 
180
        :param fp: A Paramiko SFTPFile object that supports readv.
 
181
        :return: Yield the data requested by the original readv caller, one by
 
182
            one.
 
183
        """
 
184
        requests = self._get_requests()
 
185
        offset_iter = iter(self.original_offsets)
 
186
        cur_offset, cur_size = offset_iter.next()
 
187
        # paramiko .readv() yields strings that are in the order of the requests
 
188
        # So we track the current request to know where the next data is
 
189
        # being returned from.
 
190
        input_start = None
 
191
        last_end = None
 
192
        buffered_data = []
 
193
        buffered_len = 0
 
194
 
 
195
        # This is used to buffer chunks which we couldn't process yet
 
196
        # It is (start, end, data) tuples.
 
197
        data_chunks = []
 
198
        # Create an 'unlimited' data stream, so we stop based on requests,
 
199
        # rather than just because the data stream ended. This lets us detect
 
200
        # short readv.
 
201
        data_stream = itertools.chain(fp.readv(requests),
 
202
                                      itertools.repeat(None))
 
203
        for (start, length), data in itertools.izip(requests, data_stream):
 
204
            if data is None:
 
205
                if cur_coalesced is not None:
 
206
                    raise errors.ShortReadvError(self.relpath,
 
207
                        start, length, len(data))
 
208
            if len(data) != length:
 
209
                raise errors.ShortReadvError(self.relpath,
 
210
                    start, length, len(data))
 
211
            self._report_activity(length, 'read')
 
212
            if last_end is None:
 
213
                # This is the first request, just buffer it
 
214
                buffered_data = [data]
 
215
                buffered_len = length
 
216
                input_start = start
 
217
            elif start == last_end:
 
218
                # The data we are reading fits neatly on the previous
 
219
                # buffer, so this is all part of a larger coalesced range.
 
220
                buffered_data.append(data)
 
221
                buffered_len += length
 
222
            else:
 
223
                # We have an 'interrupt' in the data stream. So we know we are
 
224
                # at a request boundary.
 
225
                if buffered_len > 0:
 
226
                    # We haven't consumed the buffer so far, so put it into
 
227
                    # data_chunks, and continue.
 
228
                    buffered = ''.join(buffered_data)
 
229
                    data_chunks.append((input_start, buffered))
 
230
                input_start = start
 
231
                buffered_data = [data]
 
232
                buffered_len = length
 
233
            last_end = start + length
 
234
            if input_start == cur_offset and cur_size <= buffered_len:
 
235
                # Simplify the next steps a bit by transforming buffered_data
 
236
                # into a single string. We also have the nice property that
 
237
                # when there is only one string ''.join([x]) == x, so there is
 
238
                # no data copying.
 
239
                buffered = ''.join(buffered_data)
 
240
                # Clean out buffered data so that we keep memory
 
241
                # consumption low
 
242
                del buffered_data[:]
 
243
                buffered_offset = 0
 
244
                # TODO: We *could* also consider the case where cur_offset is in
 
245
                #       in the buffered range, even though it doesn't *start*
 
246
                #       the buffered range. But for packs we pretty much always
 
247
                #       read in order, so you won't get any extra data in the
 
248
                #       middle.
 
249
                while (input_start == cur_offset
 
250
                       and (buffered_offset + cur_size) <= buffered_len):
 
251
                    # We've buffered enough data to process this request, spit it
 
252
                    # out
 
253
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
 
254
                    # move the direct pointer into our buffered data
 
255
                    buffered_offset += cur_size
 
256
                    # Move the start-of-buffer pointer
 
257
                    input_start += cur_size
 
258
                    # Yield the requested data
 
259
                    yield cur_offset, cur_data
 
260
                    cur_offset, cur_size = offset_iter.next()
 
261
                # at this point, we've consumed as much of buffered as we can,
 
262
                # so break off the portion that we consumed
 
263
                if buffered_offset == len(buffered_data):
 
264
                    # No tail to leave behind
 
265
                    buffered_data = []
 
266
                    buffered_len = 0
 
267
                else:
 
268
                    buffered = buffered[buffered_offset:]
 
269
                    buffered_data = [buffered]
 
270
                    buffered_len = len(buffered)
 
271
        # now that the data stream is done, close the handle
 
272
        fp.close()
 
273
        if buffered_len:
 
274
            buffered = ''.join(buffered_data)
 
275
            del buffered_data[:]
 
276
            data_chunks.append((input_start, buffered))
 
277
        if data_chunks:
 
278
            if 'sftp' in debug.debug_flags:
 
279
                mutter('SFTP readv left with %d out-of-order bytes',
 
280
                    sum(map(lambda x: len(x[1]), data_chunks)))
 
281
            # We've processed all the readv data, at this point, anything we
 
282
            # couldn't process is in data_chunks. This doesn't happen often, so
 
283
            # this code path isn't optimized
 
284
            # We use an interesting process for data_chunks
 
285
            # Specifically if we have "bisect_left([(start, len, entries)],
 
286
            #                                       (qstart,)])
 
287
            # If start == qstart, then we get the specific node. Otherwise we
 
288
            # get the previous node
 
289
            while True:
 
290
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
 
291
                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
 
292
                    # The data starts here
 
293
                    data = data_chunks[idx][1][:cur_size]
 
294
                elif idx > 0:
 
295
                    # The data is in a portion of a previous page
 
296
                    idx -= 1
 
297
                    sub_offset = cur_offset - data_chunks[idx][0]
 
298
                    data = data_chunks[idx][1]
 
299
                    data = data[sub_offset:sub_offset + cur_size]
 
300
                else:
 
301
                    # We are missing the page where the data should be found,
 
302
                    # something is wrong
 
303
                    data = ''
 
304
                if len(data) != cur_size:
 
305
                    raise AssertionError('We must have miscalulated.'
 
306
                        ' We expected %d bytes, but only found %d'
 
307
                        % (cur_size, len(data)))
 
308
                yield cur_offset, data
 
309
                cur_offset, cur_size = offset_iter.next()
 
310
 
 
311
 
 
312
class SFTPTransport(ConnectedTransport):
 
313
    """Transport implementation for SFTP access."""
 
314
 
 
315
    _do_prefetch = _default_do_prefetch
 
316
    # TODO: jam 20060717 Conceivably these could be configurable, either
 
317
    #       by auto-tuning at run-time, or by a configuration (per host??)
 
318
    #       but the performance curve is pretty flat, so just going with
 
319
    #       reasonable defaults.
 
320
    _max_readv_combine = 200
 
321
    # Having to round trip to the server means waiting for a response,
 
322
    # so it is better to download extra bytes.
 
323
    # 8KiB had good performance for both local and remote network operations
 
324
    _bytes_to_read_before_seek = 8192
 
325
 
 
326
    # The sftp spec says that implementations SHOULD allow reads
 
327
    # to be at least 32K. paramiko.readv() does an async request
 
328
    # for the chunks. So we need to keep it within a single request
 
329
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
 
330
    # up the request itself, rather than us having to worry about it
 
331
    _max_request_size = 32768
 
332
 
 
333
    def _remote_path(self, relpath):
 
334
        """Return the path to be passed along the sftp protocol for relpath.
 
335
 
 
336
        :param relpath: is a urlencoded string.
 
337
        """
 
338
        remote_path = self._parsed_url.clone(relpath).path
 
339
        # the initial slash should be removed from the path, and treated as a
 
340
        # homedir relative path (the path begins with a double slash if it is
 
341
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
 
342
        # RBC 20060118 we are not using this as its too user hostile. instead
 
343
        # we are following lftp and using /~/foo to mean '~/foo'
 
344
        # vila--20070602 and leave absolute paths begin with a single slash.
 
345
        if remote_path.startswith('/~/'):
 
346
            remote_path = remote_path[3:]
 
347
        elif remote_path == '/~':
 
348
            remote_path = ''
 
349
        return remote_path
 
350
 
 
351
    def _create_connection(self, credentials=None):
 
352
        """Create a new connection with the provided credentials.
 
353
 
 
354
        :param credentials: The credentials needed to establish the connection.
 
355
 
 
356
        :return: The created connection and its associated credentials.
 
357
 
 
358
        The credentials are only the password as it may have been entered
 
359
        interactively by the user and may be different from the one provided
 
360
        in base url at transport creation time.
 
361
        """
 
362
        if credentials is None:
 
363
            password = self._parsed_url.password
 
364
        else:
 
365
            password = credentials
 
366
 
 
367
        vendor = ssh._get_ssh_vendor()
 
368
        user = self._parsed_url.user
 
369
        if user is None:
 
370
            auth = config.AuthenticationConfig()
 
371
            user = auth.get_user('ssh', self._parsed_url.host,
 
372
                self._parsed_url.port)
 
373
        connection = vendor.connect_sftp(self._parsed_url.user, password,
 
374
            self._parsed_url.host, self._parsed_url.port)
 
375
        return connection, (user, password)
 
376
 
 
377
    def disconnect(self):
 
378
        connection = self._get_connection()
 
379
        if connection is not None:
 
380
            connection.close()
 
381
 
 
382
    def _get_sftp(self):
 
383
        """Ensures that a connection is established"""
 
384
        connection = self._get_connection()
 
385
        if connection is None:
 
386
            # First connection ever
 
387
            connection, credentials = self._create_connection()
 
388
            self._set_connection(connection, credentials)
 
389
        return connection
 
390
 
 
391
    def has(self, relpath):
 
392
        """
 
393
        Does the target location exist?
 
394
        """
 
395
        try:
 
396
            self._get_sftp().stat(self._remote_path(relpath))
 
397
            # stat result is about 20 bytes, let's say
 
398
            self._report_activity(20, 'read')
 
399
            return True
 
400
        except IOError:
 
401
            return False
 
402
 
 
403
    def get(self, relpath):
 
404
        """Get the file at the given relative path.
 
405
 
 
406
        :param relpath: The relative path to the file
 
407
        """
 
408
        try:
 
409
            path = self._remote_path(relpath)
 
410
            f = self._get_sftp().file(path, mode='rb')
 
411
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
 
412
                f.prefetch()
 
413
            return f
 
414
        except (IOError, paramiko.SSHException), e:
 
415
            self._translate_io_exception(e, path, ': error retrieving',
 
416
                failure_exc=errors.ReadError)
 
417
 
 
418
    def get_bytes(self, relpath):
 
419
        # reimplement this here so that we can report how many bytes came back
 
420
        f = self.get(relpath)
 
421
        try:
 
422
            bytes = f.read()
 
423
            self._report_activity(len(bytes), 'read')
 
424
            return bytes
 
425
        finally:
 
426
            f.close()
 
427
 
 
428
    def _readv(self, relpath, offsets):
 
429
        """See Transport.readv()"""
 
430
        # We overload the default readv() because we want to use a file
 
431
        # that does not have prefetch enabled.
 
432
        # Also, if we have a new paramiko, it implements an async readv()
 
433
        if not offsets:
 
434
            return
 
435
 
 
436
        try:
 
437
            path = self._remote_path(relpath)
 
438
            fp = self._get_sftp().file(path, mode='rb')
 
439
            readv = getattr(fp, 'readv', None)
 
440
            if readv:
 
441
                return self._sftp_readv(fp, offsets, relpath)
 
442
            if 'sftp' in debug.debug_flags:
 
443
                mutter('seek and read %s offsets', len(offsets))
 
444
            return self._seek_and_read(fp, offsets, relpath)
 
445
        except (IOError, paramiko.SSHException), e:
 
446
            self._translate_io_exception(e, path, ': error retrieving')
 
447
 
 
448
    def recommended_page_size(self):
 
449
        """See Transport.recommended_page_size().
 
450
 
 
451
        For SFTP we suggest a large page size to reduce the overhead
 
452
        introduced by latency.
 
453
        """
 
454
        return 64 * 1024
 
455
 
 
456
    def _sftp_readv(self, fp, offsets, relpath):
 
457
        """Use the readv() member of fp to do async readv.
 
458
 
 
459
        Then read them using paramiko.readv(). paramiko.readv()
 
460
        does not support ranges > 64K, so it caps the request size, and
 
461
        just reads until it gets all the stuff it wants.
 
462
        """
 
463
        helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
 
464
        return helper.request_and_yield_offsets(fp)
 
465
 
 
466
    def put_file(self, relpath, f, mode=None):
 
467
        """
 
468
        Copy the file-like object into the location.
 
469
 
 
470
        :param relpath: Location to put the contents, relative to base.
 
471
        :param f:       File-like object.
 
472
        :param mode: The final mode for the file
 
473
        """
 
474
        final_path = self._remote_path(relpath)
 
475
        return self._put(final_path, f, mode=mode)
 
476
 
 
477
    def _put(self, abspath, f, mode=None):
 
478
        """Helper function so both put() and copy_abspaths can reuse the code"""
 
479
        tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
 
480
                        os.getpid(), random.randint(0,0x7FFFFFFF))
 
481
        fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
 
482
        closed = False
 
483
        try:
 
484
            try:
 
485
                fout.set_pipelined(True)
 
486
                length = self._pump(f, fout)
 
487
            except (IOError, paramiko.SSHException), e:
 
488
                self._translate_io_exception(e, tmp_abspath)
 
489
            # XXX: This doesn't truly help like we would like it to.
 
490
            #      The problem is that openssh strips sticky bits. So while we
 
491
            #      can properly set group write permission, we lose the group
 
492
            #      sticky bit. So it is probably best to stop chmodding, and
 
493
            #      just tell users that they need to set the umask correctly.
 
494
            #      The attr.st_mode = mode, in _sftp_open_exclusive
 
495
            #      will handle when the user wants the final mode to be more
 
496
            #      restrictive. And then we avoid a round trip. Unless
 
497
            #      paramiko decides to expose an async chmod()
 
498
 
 
499
            # This is designed to chmod() right before we close.
 
500
            # Because we set_pipelined() earlier, theoretically we might
 
501
            # avoid the round trip for fout.close()
 
502
            if mode is not None:
 
503
                self._get_sftp().chmod(tmp_abspath, mode)
 
504
            fout.close()
 
505
            closed = True
 
506
            self._rename_and_overwrite(tmp_abspath, abspath)
 
507
            return length
 
508
        except Exception, e:
 
509
            # If we fail, try to clean up the temporary file
 
510
            # before we throw the exception
 
511
            # but don't let another exception mess things up
 
512
            # Write out the traceback, because otherwise
 
513
            # the catch and throw destroys it
 
514
            import traceback
 
515
            mutter(traceback.format_exc())
 
516
            try:
 
517
                if not closed:
 
518
                    fout.close()
 
519
                self._get_sftp().remove(tmp_abspath)
 
520
            except:
 
521
                # raise the saved except
 
522
                raise e
 
523
            # raise the original with its traceback if we can.
 
524
            raise
 
525
 
 
526
    def _put_non_atomic_helper(self, relpath, writer, mode=None,
 
527
                               create_parent_dir=False,
 
528
                               dir_mode=None):
 
529
        abspath = self._remote_path(relpath)
 
530
 
 
531
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
 
532
        #       set the file mode at create time. If it does, use it.
 
533
        #       But for now, we just chmod later anyway.
 
534
 
 
535
        def _open_and_write_file():
 
536
            """Try to open the target file, raise error on failure"""
 
537
            fout = None
 
538
            try:
 
539
                try:
 
540
                    fout = self._get_sftp().file(abspath, mode='wb')
 
541
                    fout.set_pipelined(True)
 
542
                    writer(fout)
 
543
                except (paramiko.SSHException, IOError), e:
 
544
                    self._translate_io_exception(e, abspath,
 
545
                                                 ': unable to open')
 
546
 
 
547
                # This is designed to chmod() right before we close.
 
548
                # Because we set_pipelined() earlier, theoretically we might
 
549
                # avoid the round trip for fout.close()
 
550
                if mode is not None:
 
551
                    self._get_sftp().chmod(abspath, mode)
 
552
            finally:
 
553
                if fout is not None:
 
554
                    fout.close()
 
555
 
 
556
        if not create_parent_dir:
 
557
            _open_and_write_file()
 
558
            return
 
559
 
 
560
        # Try error handling to create the parent directory if we need to
 
561
        try:
 
562
            _open_and_write_file()
 
563
        except NoSuchFile:
 
564
            # Try to create the parent directory, and then go back to
 
565
            # writing the file
 
566
            parent_dir = os.path.dirname(abspath)
 
567
            self._mkdir(parent_dir, dir_mode)
 
568
            _open_and_write_file()
 
569
 
 
570
    def put_file_non_atomic(self, relpath, f, mode=None,
 
571
                            create_parent_dir=False,
 
572
                            dir_mode=None):
 
573
        """Copy the file-like object into the target location.
 
574
 
 
575
        This function is not strictly safe to use. It is only meant to
 
576
        be used when you already know that the target does not exist.
 
577
        It is not safe, because it will open and truncate the remote
 
578
        file. So there may be a time when the file has invalid contents.
 
579
 
 
580
        :param relpath: The remote location to put the contents.
 
581
        :param f:       File-like object.
 
582
        :param mode:    Possible access permissions for new file.
 
583
                        None means do not set remote permissions.
 
584
        :param create_parent_dir: If we cannot create the target file because
 
585
                        the parent directory does not exist, go ahead and
 
586
                        create it, and then try again.
 
587
        """
 
588
        def writer(fout):
 
589
            self._pump(f, fout)
 
590
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
591
                                    create_parent_dir=create_parent_dir,
 
592
                                    dir_mode=dir_mode)
 
593
 
 
594
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
595
                             create_parent_dir=False,
 
596
                             dir_mode=None):
 
597
        def writer(fout):
 
598
            fout.write(bytes)
 
599
        self._put_non_atomic_helper(relpath, writer, mode=mode,
 
600
                                    create_parent_dir=create_parent_dir,
 
601
                                    dir_mode=dir_mode)
 
602
 
 
603
    def iter_files_recursive(self):
 
604
        """Walk the relative paths of all files in this transport."""
 
605
        # progress is handled by list_dir
 
606
        queue = list(self.list_dir('.'))
 
607
        while queue:
 
608
            relpath = queue.pop(0)
 
609
            st = self.stat(relpath)
 
610
            if stat.S_ISDIR(st.st_mode):
 
611
                for i, basename in enumerate(self.list_dir(relpath)):
 
612
                    queue.insert(i, relpath+'/'+basename)
 
613
            else:
 
614
                yield relpath
 
615
 
 
616
    def _mkdir(self, abspath, mode=None):
 
617
        if mode is None:
 
618
            local_mode = 0777
 
619
        else:
 
620
            local_mode = mode
 
621
        try:
 
622
            self._report_activity(len(abspath), 'write')
 
623
            self._get_sftp().mkdir(abspath, local_mode)
 
624
            self._report_activity(1, 'read')
 
625
            if mode is not None:
 
626
                # chmod a dir through sftp will erase any sgid bit set
 
627
                # on the server side.  So, if the bit mode are already
 
628
                # set, avoid the chmod.  If the mode is not fine but
 
629
                # the sgid bit is set, report a warning to the user
 
630
                # with the umask fix.
 
631
                stat = self._get_sftp().lstat(abspath)
 
632
                mode = mode & 0777 # can't set special bits anyway
 
633
                if mode != stat.st_mode & 0777:
 
634
                    if stat.st_mode & 06000:
 
635
                        warning('About to chmod %s over sftp, which will result'
 
636
                                ' in its suid or sgid bits being cleared.  If'
 
637
                                ' you want to preserve those bits, change your '
 
638
                                ' environment on the server to use umask 0%03o.'
 
639
                                % (abspath, 0777 - mode))
 
640
                    self._get_sftp().chmod(abspath, mode=mode)
 
641
        except (paramiko.SSHException, IOError), e:
 
642
            self._translate_io_exception(e, abspath, ': unable to mkdir',
 
643
                failure_exc=FileExists)
 
644
 
 
645
    def mkdir(self, relpath, mode=None):
 
646
        """Create a directory at the given path."""
 
647
        self._mkdir(self._remote_path(relpath), mode=mode)
 
648
 
 
649
    def open_write_stream(self, relpath, mode=None):
 
650
        """See Transport.open_write_stream."""
 
651
        # initialise the file to zero-length
 
652
        # this is three round trips, but we don't use this
 
653
        # api more than once per write_group at the moment so
 
654
        # it is a tolerable overhead. Better would be to truncate
 
655
        # the file after opening. RBC 20070805
 
656
        self.put_bytes_non_atomic(relpath, "", mode)
 
657
        abspath = self._remote_path(relpath)
 
658
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
 
659
        #       set the file mode at create time. If it does, use it.
 
660
        #       But for now, we just chmod later anyway.
 
661
        handle = None
 
662
        try:
 
663
            handle = self._get_sftp().file(abspath, mode='wb')
 
664
            handle.set_pipelined(True)
 
665
        except (paramiko.SSHException, IOError), e:
 
666
            self._translate_io_exception(e, abspath,
 
667
                                         ': unable to open')
 
668
        _file_streams[self.abspath(relpath)] = handle
 
669
        return FileFileStream(self, relpath, handle)
 
670
 
 
671
    def _translate_io_exception(self, e, path, more_info='',
 
672
                                failure_exc=PathError):
 
673
        """Translate a paramiko or IOError into a friendlier exception.
 
674
 
 
675
        :param e: The original exception
 
676
        :param path: The path in question when the error is raised
 
677
        :param more_info: Extra information that can be included,
 
678
                          such as what was going on
 
679
        :param failure_exc: Paramiko has the super fun ability to raise completely
 
680
                           opaque errors that just set "e.args = ('Failure',)" with
 
681
                           no more information.
 
682
                           If this parameter is set, it defines the exception
 
683
                           to raise in these cases.
 
684
        """
 
685
        # paramiko seems to generate detailless errors.
 
686
        self._translate_error(e, path, raise_generic=False)
 
687
        if getattr(e, 'args', None) is not None:
 
688
            if (e.args == ('No such file or directory',) or
 
689
                e.args == ('No such file',)):
 
690
                raise NoSuchFile(path, str(e) + more_info)
 
691
            if (e.args == ('mkdir failed',) or
 
692
                e.args[0].startswith('syserr: File exists')):
 
693
                raise FileExists(path, str(e) + more_info)
 
694
            # strange but true, for the paramiko server.
 
695
            if (e.args == ('Failure',)):
 
696
                raise failure_exc(path, str(e) + more_info)
 
697
            # Can be something like args = ('Directory not empty:
 
698
            # '/srv/bazaar.launchpad.net/blah...: '
 
699
            # [Errno 39] Directory not empty',)
 
700
            if (e.args[0].startswith('Directory not empty: ')
 
701
                or getattr(e, 'errno', None) == errno.ENOTEMPTY):
 
702
                raise errors.DirectoryNotEmpty(path, str(e))
 
703
            if e.args == ('Operation unsupported',):
 
704
                raise errors.TransportNotPossible()
 
705
            mutter('Raising exception with args %s', e.args)
 
706
        if getattr(e, 'errno', None) is not None:
 
707
            mutter('Raising exception with errno %s', e.errno)
 
708
        raise e
 
709
 
 
710
    def append_file(self, relpath, f, mode=None):
 
711
        """
 
712
        Append the text in the file-like object into the final
 
713
        location.
 
714
        """
 
715
        try:
 
716
            path = self._remote_path(relpath)
 
717
            fout = self._get_sftp().file(path, 'ab')
 
718
            if mode is not None:
 
719
                self._get_sftp().chmod(path, mode)
 
720
            result = fout.tell()
 
721
            self._pump(f, fout)
 
722
            return result
 
723
        except (IOError, paramiko.SSHException), e:
 
724
            self._translate_io_exception(e, relpath, ': unable to append')
 
725
 
 
726
    def rename(self, rel_from, rel_to):
 
727
        """Rename without special overwriting"""
 
728
        try:
 
729
            self._get_sftp().rename(self._remote_path(rel_from),
 
730
                              self._remote_path(rel_to))
 
731
        except (IOError, paramiko.SSHException), e:
 
732
            self._translate_io_exception(e, rel_from,
 
733
                    ': unable to rename to %r' % (rel_to))
 
734
 
 
735
    def _rename_and_overwrite(self, abs_from, abs_to):
 
736
        """Do a fancy rename on the remote server.
 
737
 
 
738
        Using the implementation provided by osutils.
 
739
        """
 
740
        try:
 
741
            sftp = self._get_sftp()
 
742
            fancy_rename(abs_from, abs_to,
 
743
                         rename_func=sftp.rename,
 
744
                         unlink_func=sftp.remove)
 
745
        except (IOError, paramiko.SSHException), e:
 
746
            self._translate_io_exception(e, abs_from,
 
747
                                         ': unable to rename to %r' % (abs_to))
 
748
 
 
749
    def move(self, rel_from, rel_to):
 
750
        """Move the item at rel_from to the location at rel_to"""
 
751
        path_from = self._remote_path(rel_from)
 
752
        path_to = self._remote_path(rel_to)
 
753
        self._rename_and_overwrite(path_from, path_to)
 
754
 
 
755
    def delete(self, relpath):
 
756
        """Delete the item at relpath"""
 
757
        path = self._remote_path(relpath)
 
758
        try:
 
759
            self._get_sftp().remove(path)
 
760
        except (IOError, paramiko.SSHException), e:
 
761
            self._translate_io_exception(e, path, ': unable to delete')
 
762
 
 
763
    def external_url(self):
 
764
        """See bzrlib.transport.Transport.external_url."""
 
765
        # the external path for SFTP is the base
 
766
        return self.base
 
767
 
 
768
    def listable(self):
 
769
        """Return True if this store supports listing."""
 
770
        return True
 
771
 
 
772
    def list_dir(self, relpath):
 
773
        """
 
774
        Return a list of all files at the given location.
 
775
        """
 
776
        # does anything actually use this?
 
777
        # -- Unknown
 
778
        # This is at least used by copy_tree for remote upgrades.
 
779
        # -- David Allouche 2006-08-11
 
780
        path = self._remote_path(relpath)
 
781
        try:
 
782
            entries = self._get_sftp().listdir(path)
 
783
            self._report_activity(sum(map(len, entries)), 'read')
 
784
        except (IOError, paramiko.SSHException), e:
 
785
            self._translate_io_exception(e, path, ': failed to list_dir')
 
786
        return [urlutils.escape(entry) for entry in entries]
 
787
 
 
788
    def rmdir(self, relpath):
 
789
        """See Transport.rmdir."""
 
790
        path = self._remote_path(relpath)
 
791
        try:
 
792
            return self._get_sftp().rmdir(path)
 
793
        except (IOError, paramiko.SSHException), e:
 
794
            self._translate_io_exception(e, path, ': failed to rmdir')
 
795
 
 
796
    def stat(self, relpath):
 
797
        """Return the stat information for a file."""
 
798
        path = self._remote_path(relpath)
 
799
        try:
 
800
            return self._get_sftp().lstat(path)
 
801
        except (IOError, paramiko.SSHException), e:
 
802
            self._translate_io_exception(e, path, ': unable to stat')
 
803
 
 
804
    def readlink(self, relpath):
 
805
        """See Transport.readlink."""
 
806
        path = self._remote_path(relpath)
 
807
        try:
 
808
            return self._get_sftp().readlink(path)
 
809
        except (IOError, paramiko.SSHException), e:
 
810
            self._translate_io_exception(e, path, ': unable to readlink')
 
811
 
 
812
    def symlink(self, source, link_name):
 
813
        """See Transport.symlink."""
 
814
        try:
 
815
            conn = self._get_sftp()
 
816
            sftp_retval = conn.symlink(source, link_name)
 
817
            if SFTP_OK != sftp_retval:
 
818
                raise TransportError(
 
819
                    '%r: unable to create symlink to %r' % (link_name, source),
 
820
                    sftp_retval
 
821
                )
 
822
        except (IOError, paramiko.SSHException), e:
 
823
            self._translate_io_exception(e, link_name,
 
824
                                         ': unable to create symlink to %r' % (source))
 
825
 
 
826
    def lock_read(self, relpath):
 
827
        """
 
828
        Lock the given file for shared (read) access.
 
829
        :return: A lock object, which has an unlock() member function
 
830
        """
 
831
        # FIXME: there should be something clever i can do here...
 
832
        class BogusLock(object):
 
833
            def __init__(self, path):
 
834
                self.path = path
 
835
            def unlock(self):
 
836
                pass
 
837
        return BogusLock(relpath)
 
838
 
 
839
    def lock_write(self, relpath):
 
840
        """
 
841
        Lock the given file for exclusive (write) access.
 
842
        WARNING: many transports do not support this, so trying avoid using it
 
843
 
 
844
        :return: A lock object, which has an unlock() member function
 
845
        """
 
846
        # This is a little bit bogus, but basically, we create a file
 
847
        # which should not already exist, and if it does, we assume
 
848
        # that there is a lock, and if it doesn't, the we assume
 
849
        # that we have taken the lock.
 
850
        return SFTPLock(relpath, self)
 
851
 
 
852
    def _sftp_open_exclusive(self, abspath, mode=None):
 
853
        """Open a remote path exclusively.
 
854
 
 
855
        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
 
856
        the file already exists. However it does not expose this
 
857
        at the higher level of SFTPClient.open(), so we have to
 
858
        sneak away with it.
 
859
 
 
860
        WARNING: This breaks the SFTPClient abstraction, so it
 
861
        could easily break against an updated version of paramiko.
 
862
 
 
863
        :param abspath: The remote absolute path where the file should be opened
 
864
        :param mode: The mode permissions bits for the new file
 
865
        """
 
866
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
 
867
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
 
868
        #       However, there is no way to set the permission mode at open
 
869
        #       time using the sftp_client.file() functionality.
 
870
        path = self._get_sftp()._adjust_cwd(abspath)
 
871
        # mutter('sftp abspath %s => %s', abspath, path)
 
872
        attr = SFTPAttributes()
 
873
        if mode is not None:
 
874
            attr.st_mode = mode
 
875
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
 
876
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
 
877
        try:
 
878
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
 
879
            if t != CMD_HANDLE:
 
880
                raise TransportError('Expected an SFTP handle')
 
881
            handle = msg.get_string()
 
882
            return SFTPFile(self._get_sftp(), handle, 'wb', -1)
 
883
        except (paramiko.SSHException, IOError), e:
 
884
            self._translate_io_exception(e, abspath, ': unable to open',
 
885
                failure_exc=FileExists)
 
886
 
 
887
    def _can_roundtrip_unix_modebits(self):
 
888
        if sys.platform == 'win32':
 
889
            # anyone else?
 
890
            return False
 
891
        else:
 
892
            return True
 
893
 
 
894
 
 
895
def get_test_permutations():
 
896
    """Return the permutations to be used in testing."""
 
897
    from bzrlib.tests import stub_sftp
 
898
    return [(SFTPTransport, stub_sftp.SFTPAbsoluteServer),
 
899
            (SFTPTransport, stub_sftp.SFTPHomeDirServer),
 
900
            (SFTPTransport, stub_sftp.SFTPSiblingAbsoluteServer),
 
901
            ]