~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/sftp.py

  • Committer: mbp at sourcefrog
  • Date: 2005-03-09 07:14:22 UTC
  • Revision ID: mbp@sourcefrog.net-20050309071421-eb5d3514d415bc4c
write inventory to temporary file and atomically replace

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Implementation of Transport over SFTP, using paramiko."""
18
 
 
19
 
from __future__ import absolute_import
20
 
 
21
 
# TODO: Remove the transport-based lock_read and lock_write methods.  They'll
22
 
# then raise TransportNotPossible, which will break remote access to any
23
 
# formats which rely on OS-level locks.  That should be fine as those formats
24
 
# are pretty old, but these combinations may have to be removed from the test
25
 
# suite.  Those formats all date back to 0.7; so we should be able to remove
26
 
# these methods when we officially drop support for those formats.
27
 
 
28
 
import bisect
29
 
import errno
30
 
import itertools
31
 
import os
32
 
import random
33
 
import stat
34
 
import sys
35
 
import time
36
 
import warnings
37
 
 
38
 
from bzrlib import (
39
 
    config,
40
 
    debug,
41
 
    errors,
42
 
    urlutils,
43
 
    )
44
 
from bzrlib.errors import (FileExists,
45
 
                           NoSuchFile,
46
 
                           TransportError,
47
 
                           LockError,
48
 
                           PathError,
49
 
                           ParamikoNotPresent,
50
 
                           )
51
 
from bzrlib.osutils import fancy_rename
52
 
from bzrlib.trace import mutter, warning
53
 
from bzrlib.transport import (
54
 
    FileFileStream,
55
 
    _file_streams,
56
 
    ssh,
57
 
    ConnectedTransport,
58
 
    )
59
 
 
60
 
# Disable one particular warning that comes from paramiko in Python2.5; if
61
 
# this is emitted at the wrong time it tends to cause spurious test failures
62
 
# or at least noise in the test case::
63
 
#
64
 
# [1770/7639 in 86s, 1 known failures, 50 skipped, 2 missing features]
65
 
# test_permissions.TestSftpPermissions.test_new_files
66
 
# /var/lib/python-support/python2.5/paramiko/message.py:226: DeprecationWarning: integer argument expected, got float
67
 
#  self.packet.write(struct.pack('>I', n))
68
 
warnings.filterwarnings('ignore',
69
 
        'integer argument expected, got float',
70
 
        category=DeprecationWarning,
71
 
        module='paramiko.message')
72
 
 
73
 
try:
74
 
    import paramiko
75
 
except ImportError, e:
76
 
    raise ParamikoNotPresent(e)
77
 
else:
78
 
    from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
79
 
                               SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
80
 
                               SFTP_OK, CMD_HANDLE, CMD_OPEN)
81
 
    from paramiko.sftp_attr import SFTPAttributes
82
 
    from paramiko.sftp_file import SFTPFile
83
 
 
84
 
 
85
 
_paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
86
 
# don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
87
 
_default_do_prefetch = (_paramiko_version >= (1, 5, 5))
88
 
 
89
 
 
90
 
class SFTPLock(object):
91
 
    """This fakes a lock in a remote location.
92
 
 
93
 
    A present lock is indicated just by the existence of a file.  This
94
 
    doesn't work well on all transports and they are only used in
95
 
    deprecated storage formats.
96
 
    """
97
 
 
98
 
    __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
99
 
 
100
 
    def __init__(self, path, transport):
101
 
        self.lock_file = None
102
 
        self.path = path
103
 
        self.lock_path = path + '.write-lock'
104
 
        self.transport = transport
105
 
        try:
106
 
            # RBC 20060103 FIXME should we be using private methods here ?
107
 
            abspath = transport._remote_path(self.lock_path)
108
 
            self.lock_file = transport._sftp_open_exclusive(abspath)
109
 
        except FileExists:
110
 
            raise LockError('File %r already locked' % (self.path,))
111
 
 
112
 
    def unlock(self):
113
 
        if not self.lock_file:
114
 
            return
115
 
        self.lock_file.close()
116
 
        self.lock_file = None
117
 
        try:
118
 
            self.transport.delete(self.lock_path)
119
 
        except (NoSuchFile,):
120
 
            # What specific errors should we catch here?
121
 
            pass
122
 
 
123
 
 
124
 
class _SFTPReadvHelper(object):
125
 
    """A class to help with managing the state of a readv request."""
126
 
 
127
 
    # See _get_requests for an explanation.
128
 
    _max_request_size = 32768
129
 
 
130
 
    def __init__(self, original_offsets, relpath, _report_activity):
131
 
        """Create a new readv helper.
132
 
 
133
 
        :param original_offsets: The original requests given by the caller of
134
 
            readv()
135
 
        :param relpath: The name of the file (if known)
136
 
        :param _report_activity: A Transport._report_activity bound method,
137
 
            to be called as data arrives.
138
 
        """
139
 
        self.original_offsets = list(original_offsets)
140
 
        self.relpath = relpath
141
 
        self._report_activity = _report_activity
142
 
 
143
 
    def _get_requests(self):
144
 
        """Break up the offsets into individual requests over sftp.
145
 
 
146
 
        The SFTP spec only requires implementers to support 32kB requests. We
147
 
        could try something larger (openssh supports 64kB), but then we have to
148
 
        handle requests that fail.
149
 
        So instead, we just break up our maximum chunks into 32kB chunks, and
150
 
        asyncronously requests them.
151
 
        Newer versions of paramiko would do the chunking for us, but we want to
152
 
        start processing results right away, so we do it ourselves.
153
 
        """
154
 
        # TODO: Because we issue async requests, we don't 'fudge' any extra
155
 
        #       data.  I'm not 100% sure that is the best choice.
156
 
 
157
 
        # The first thing we do, is to collapse the individual requests as much
158
 
        # as possible, so we don't issues requests <32kB
159
 
        sorted_offsets = sorted(self.original_offsets)
160
 
        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
161
 
                                                        limit=0, fudge_factor=0))
162
 
        requests = []
163
 
        for c_offset in coalesced:
164
 
            start = c_offset.start
165
 
            size = c_offset.length
166
 
 
167
 
            # Break this up into 32kB requests
168
 
            while size > 0:
169
 
                next_size = min(size, self._max_request_size)
170
 
                requests.append((start, next_size))
171
 
                size -= next_size
172
 
                start += next_size
173
 
        if 'sftp' in debug.debug_flags:
174
 
            mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
175
 
                self.relpath, len(sorted_offsets), len(coalesced),
176
 
                len(requests))
177
 
        return requests
178
 
 
179
 
    def request_and_yield_offsets(self, fp):
180
 
        """Request the data from the remote machine, yielding the results.
181
 
 
182
 
        :param fp: A Paramiko SFTPFile object that supports readv.
183
 
        :return: Yield the data requested by the original readv caller, one by
184
 
            one.
185
 
        """
186
 
        requests = self._get_requests()
187
 
        offset_iter = iter(self.original_offsets)
188
 
        cur_offset, cur_size = offset_iter.next()
189
 
        # paramiko .readv() yields strings that are in the order of the requests
190
 
        # So we track the current request to know where the next data is
191
 
        # being returned from.
192
 
        input_start = None
193
 
        last_end = None
194
 
        buffered_data = []
195
 
        buffered_len = 0
196
 
 
197
 
        # This is used to buffer chunks which we couldn't process yet
198
 
        # It is (start, end, data) tuples.
199
 
        data_chunks = []
200
 
        # Create an 'unlimited' data stream, so we stop based on requests,
201
 
        # rather than just because the data stream ended. This lets us detect
202
 
        # short readv.
203
 
        data_stream = itertools.chain(fp.readv(requests),
204
 
                                      itertools.repeat(None))
205
 
        for (start, length), data in itertools.izip(requests, data_stream):
206
 
            if data is None:
207
 
                if cur_coalesced is not None:
208
 
                    raise errors.ShortReadvError(self.relpath,
209
 
                        start, length, len(data))
210
 
            if len(data) != length:
211
 
                raise errors.ShortReadvError(self.relpath,
212
 
                    start, length, len(data))
213
 
            self._report_activity(length, 'read')
214
 
            if last_end is None:
215
 
                # This is the first request, just buffer it
216
 
                buffered_data = [data]
217
 
                buffered_len = length
218
 
                input_start = start
219
 
            elif start == last_end:
220
 
                # The data we are reading fits neatly on the previous
221
 
                # buffer, so this is all part of a larger coalesced range.
222
 
                buffered_data.append(data)
223
 
                buffered_len += length
224
 
            else:
225
 
                # We have an 'interrupt' in the data stream. So we know we are
226
 
                # at a request boundary.
227
 
                if buffered_len > 0:
228
 
                    # We haven't consumed the buffer so far, so put it into
229
 
                    # data_chunks, and continue.
230
 
                    buffered = ''.join(buffered_data)
231
 
                    data_chunks.append((input_start, buffered))
232
 
                input_start = start
233
 
                buffered_data = [data]
234
 
                buffered_len = length
235
 
            last_end = start + length
236
 
            if input_start == cur_offset and cur_size <= buffered_len:
237
 
                # Simplify the next steps a bit by transforming buffered_data
238
 
                # into a single string. We also have the nice property that
239
 
                # when there is only one string ''.join([x]) == x, so there is
240
 
                # no data copying.
241
 
                buffered = ''.join(buffered_data)
242
 
                # Clean out buffered data so that we keep memory
243
 
                # consumption low
244
 
                del buffered_data[:]
245
 
                buffered_offset = 0
246
 
                # TODO: We *could* also consider the case where cur_offset is in
247
 
                #       in the buffered range, even though it doesn't *start*
248
 
                #       the buffered range. But for packs we pretty much always
249
 
                #       read in order, so you won't get any extra data in the
250
 
                #       middle.
251
 
                while (input_start == cur_offset
252
 
                       and (buffered_offset + cur_size) <= buffered_len):
253
 
                    # We've buffered enough data to process this request, spit it
254
 
                    # out
255
 
                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
256
 
                    # move the direct pointer into our buffered data
257
 
                    buffered_offset += cur_size
258
 
                    # Move the start-of-buffer pointer
259
 
                    input_start += cur_size
260
 
                    # Yield the requested data
261
 
                    yield cur_offset, cur_data
262
 
                    cur_offset, cur_size = offset_iter.next()
263
 
                # at this point, we've consumed as much of buffered as we can,
264
 
                # so break off the portion that we consumed
265
 
                if buffered_offset == len(buffered_data):
266
 
                    # No tail to leave behind
267
 
                    buffered_data = []
268
 
                    buffered_len = 0
269
 
                else:
270
 
                    buffered = buffered[buffered_offset:]
271
 
                    buffered_data = [buffered]
272
 
                    buffered_len = len(buffered)
273
 
        # now that the data stream is done, close the handle
274
 
        fp.close()
275
 
        if buffered_len:
276
 
            buffered = ''.join(buffered_data)
277
 
            del buffered_data[:]
278
 
            data_chunks.append((input_start, buffered))
279
 
        if data_chunks:
280
 
            if 'sftp' in debug.debug_flags:
281
 
                mutter('SFTP readv left with %d out-of-order bytes',
282
 
                    sum(map(lambda x: len(x[1]), data_chunks)))
283
 
            # We've processed all the readv data, at this point, anything we
284
 
            # couldn't process is in data_chunks. This doesn't happen often, so
285
 
            # this code path isn't optimized
286
 
            # We use an interesting process for data_chunks
287
 
            # Specifically if we have "bisect_left([(start, len, entries)],
288
 
            #                                       (qstart,)])
289
 
            # If start == qstart, then we get the specific node. Otherwise we
290
 
            # get the previous node
291
 
            while True:
292
 
                idx = bisect.bisect_left(data_chunks, (cur_offset,))
293
 
                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
294
 
                    # The data starts here
295
 
                    data = data_chunks[idx][1][:cur_size]
296
 
                elif idx > 0:
297
 
                    # The data is in a portion of a previous page
298
 
                    idx -= 1
299
 
                    sub_offset = cur_offset - data_chunks[idx][0]
300
 
                    data = data_chunks[idx][1]
301
 
                    data = data[sub_offset:sub_offset + cur_size]
302
 
                else:
303
 
                    # We are missing the page where the data should be found,
304
 
                    # something is wrong
305
 
                    data = ''
306
 
                if len(data) != cur_size:
307
 
                    raise AssertionError('We must have miscalulated.'
308
 
                        ' We expected %d bytes, but only found %d'
309
 
                        % (cur_size, len(data)))
310
 
                yield cur_offset, data
311
 
                cur_offset, cur_size = offset_iter.next()
312
 
 
313
 
 
314
 
class SFTPTransport(ConnectedTransport):
315
 
    """Transport implementation for SFTP access."""
316
 
 
317
 
    _do_prefetch = _default_do_prefetch
318
 
    # TODO: jam 20060717 Conceivably these could be configurable, either
319
 
    #       by auto-tuning at run-time, or by a configuration (per host??)
320
 
    #       but the performance curve is pretty flat, so just going with
321
 
    #       reasonable defaults.
322
 
    _max_readv_combine = 200
323
 
    # Having to round trip to the server means waiting for a response,
324
 
    # so it is better to download extra bytes.
325
 
    # 8KiB had good performance for both local and remote network operations
326
 
    _bytes_to_read_before_seek = 8192
327
 
 
328
 
    # The sftp spec says that implementations SHOULD allow reads
329
 
    # to be at least 32K. paramiko.readv() does an async request
330
 
    # for the chunks. So we need to keep it within a single request
331
 
    # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
332
 
    # up the request itself, rather than us having to worry about it
333
 
    _max_request_size = 32768
334
 
 
335
 
    def _remote_path(self, relpath):
336
 
        """Return the path to be passed along the sftp protocol for relpath.
337
 
 
338
 
        :param relpath: is a urlencoded string.
339
 
        """
340
 
        remote_path = self._parsed_url.clone(relpath).path
341
 
        # the initial slash should be removed from the path, and treated as a
342
 
        # homedir relative path (the path begins with a double slash if it is
343
 
        # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
344
 
        # RBC 20060118 we are not using this as its too user hostile. instead
345
 
        # we are following lftp and using /~/foo to mean '~/foo'
346
 
        # vila--20070602 and leave absolute paths begin with a single slash.
347
 
        if remote_path.startswith('/~/'):
348
 
            remote_path = remote_path[3:]
349
 
        elif remote_path == '/~':
350
 
            remote_path = ''
351
 
        return remote_path
352
 
 
353
 
    def _create_connection(self, credentials=None):
354
 
        """Create a new connection with the provided credentials.
355
 
 
356
 
        :param credentials: The credentials needed to establish the connection.
357
 
 
358
 
        :return: The created connection and its associated credentials.
359
 
 
360
 
        The credentials are only the password as it may have been entered
361
 
        interactively by the user and may be different from the one provided
362
 
        in base url at transport creation time.
363
 
        """
364
 
        if credentials is None:
365
 
            password = self._parsed_url.password
366
 
        else:
367
 
            password = credentials
368
 
 
369
 
        vendor = ssh._get_ssh_vendor()
370
 
        user = self._parsed_url.user
371
 
        if user is None:
372
 
            auth = config.AuthenticationConfig()
373
 
            user = auth.get_user('ssh', self._parsed_url.host,
374
 
                self._parsed_url.port)
375
 
        connection = vendor.connect_sftp(self._parsed_url.user, password,
376
 
            self._parsed_url.host, self._parsed_url.port)
377
 
        return connection, (user, password)
378
 
 
379
 
    def disconnect(self):
380
 
        connection = self._get_connection()
381
 
        if connection is not None:
382
 
            connection.close()
383
 
 
384
 
    def _get_sftp(self):
385
 
        """Ensures that a connection is established"""
386
 
        connection = self._get_connection()
387
 
        if connection is None:
388
 
            # First connection ever
389
 
            connection, credentials = self._create_connection()
390
 
            self._set_connection(connection, credentials)
391
 
        return connection
392
 
 
393
 
    def has(self, relpath):
394
 
        """
395
 
        Does the target location exist?
396
 
        """
397
 
        try:
398
 
            self._get_sftp().stat(self._remote_path(relpath))
399
 
            # stat result is about 20 bytes, let's say
400
 
            self._report_activity(20, 'read')
401
 
            return True
402
 
        except IOError:
403
 
            return False
404
 
 
405
 
    def get(self, relpath):
406
 
        """Get the file at the given relative path.
407
 
 
408
 
        :param relpath: The relative path to the file
409
 
        """
410
 
        try:
411
 
            path = self._remote_path(relpath)
412
 
            f = self._get_sftp().file(path, mode='rb')
413
 
            if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
414
 
                f.prefetch()
415
 
            return f
416
 
        except (IOError, paramiko.SSHException), e:
417
 
            self._translate_io_exception(e, path, ': error retrieving',
418
 
                failure_exc=errors.ReadError)
419
 
 
420
 
    def get_bytes(self, relpath):
421
 
        # reimplement this here so that we can report how many bytes came back
422
 
        f = self.get(relpath)
423
 
        try:
424
 
            bytes = f.read()
425
 
            self._report_activity(len(bytes), 'read')
426
 
            return bytes
427
 
        finally:
428
 
            f.close()
429
 
 
430
 
    def _readv(self, relpath, offsets):
431
 
        """See Transport.readv()"""
432
 
        # We overload the default readv() because we want to use a file
433
 
        # that does not have prefetch enabled.
434
 
        # Also, if we have a new paramiko, it implements an async readv()
435
 
        if not offsets:
436
 
            return
437
 
 
438
 
        try:
439
 
            path = self._remote_path(relpath)
440
 
            fp = self._get_sftp().file(path, mode='rb')
441
 
            readv = getattr(fp, 'readv', None)
442
 
            if readv:
443
 
                return self._sftp_readv(fp, offsets, relpath)
444
 
            if 'sftp' in debug.debug_flags:
445
 
                mutter('seek and read %s offsets', len(offsets))
446
 
            return self._seek_and_read(fp, offsets, relpath)
447
 
        except (IOError, paramiko.SSHException), e:
448
 
            self._translate_io_exception(e, path, ': error retrieving')
449
 
 
450
 
    def recommended_page_size(self):
451
 
        """See Transport.recommended_page_size().
452
 
 
453
 
        For SFTP we suggest a large page size to reduce the overhead
454
 
        introduced by latency.
455
 
        """
456
 
        return 64 * 1024
457
 
 
458
 
    def _sftp_readv(self, fp, offsets, relpath):
459
 
        """Use the readv() member of fp to do async readv.
460
 
 
461
 
        Then read them using paramiko.readv(). paramiko.readv()
462
 
        does not support ranges > 64K, so it caps the request size, and
463
 
        just reads until it gets all the stuff it wants.
464
 
        """
465
 
        helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
466
 
        return helper.request_and_yield_offsets(fp)
467
 
 
468
 
    def put_file(self, relpath, f, mode=None):
469
 
        """
470
 
        Copy the file-like object into the location.
471
 
 
472
 
        :param relpath: Location to put the contents, relative to base.
473
 
        :param f:       File-like object.
474
 
        :param mode: The final mode for the file
475
 
        """
476
 
        final_path = self._remote_path(relpath)
477
 
        return self._put(final_path, f, mode=mode)
478
 
 
479
 
    def _put(self, abspath, f, mode=None):
480
 
        """Helper function so both put() and copy_abspaths can reuse the code"""
481
 
        tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
482
 
                        os.getpid(), random.randint(0,0x7FFFFFFF))
483
 
        fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
484
 
        closed = False
485
 
        try:
486
 
            try:
487
 
                fout.set_pipelined(True)
488
 
                length = self._pump(f, fout)
489
 
            except (IOError, paramiko.SSHException), e:
490
 
                self._translate_io_exception(e, tmp_abspath)
491
 
            # XXX: This doesn't truly help like we would like it to.
492
 
            #      The problem is that openssh strips sticky bits. So while we
493
 
            #      can properly set group write permission, we lose the group
494
 
            #      sticky bit. So it is probably best to stop chmodding, and
495
 
            #      just tell users that they need to set the umask correctly.
496
 
            #      The attr.st_mode = mode, in _sftp_open_exclusive
497
 
            #      will handle when the user wants the final mode to be more
498
 
            #      restrictive. And then we avoid a round trip. Unless
499
 
            #      paramiko decides to expose an async chmod()
500
 
 
501
 
            # This is designed to chmod() right before we close.
502
 
            # Because we set_pipelined() earlier, theoretically we might
503
 
            # avoid the round trip for fout.close()
504
 
            if mode is not None:
505
 
                self._get_sftp().chmod(tmp_abspath, mode)
506
 
            fout.close()
507
 
            closed = True
508
 
            self._rename_and_overwrite(tmp_abspath, abspath)
509
 
            return length
510
 
        except Exception, e:
511
 
            # If we fail, try to clean up the temporary file
512
 
            # before we throw the exception
513
 
            # but don't let another exception mess things up
514
 
            # Write out the traceback, because otherwise
515
 
            # the catch and throw destroys it
516
 
            import traceback
517
 
            mutter(traceback.format_exc())
518
 
            try:
519
 
                if not closed:
520
 
                    fout.close()
521
 
                self._get_sftp().remove(tmp_abspath)
522
 
            except:
523
 
                # raise the saved except
524
 
                raise e
525
 
            # raise the original with its traceback if we can.
526
 
            raise
527
 
 
528
 
    def _put_non_atomic_helper(self, relpath, writer, mode=None,
529
 
                               create_parent_dir=False,
530
 
                               dir_mode=None):
531
 
        abspath = self._remote_path(relpath)
532
 
 
533
 
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
534
 
        #       set the file mode at create time. If it does, use it.
535
 
        #       But for now, we just chmod later anyway.
536
 
 
537
 
        def _open_and_write_file():
538
 
            """Try to open the target file, raise error on failure"""
539
 
            fout = None
540
 
            try:
541
 
                try:
542
 
                    fout = self._get_sftp().file(abspath, mode='wb')
543
 
                    fout.set_pipelined(True)
544
 
                    writer(fout)
545
 
                except (paramiko.SSHException, IOError), e:
546
 
                    self._translate_io_exception(e, abspath,
547
 
                                                 ': unable to open')
548
 
 
549
 
                # This is designed to chmod() right before we close.
550
 
                # Because we set_pipelined() earlier, theoretically we might
551
 
                # avoid the round trip for fout.close()
552
 
                if mode is not None:
553
 
                    self._get_sftp().chmod(abspath, mode)
554
 
            finally:
555
 
                if fout is not None:
556
 
                    fout.close()
557
 
 
558
 
        if not create_parent_dir:
559
 
            _open_and_write_file()
560
 
            return
561
 
 
562
 
        # Try error handling to create the parent directory if we need to
563
 
        try:
564
 
            _open_and_write_file()
565
 
        except NoSuchFile:
566
 
            # Try to create the parent directory, and then go back to
567
 
            # writing the file
568
 
            parent_dir = os.path.dirname(abspath)
569
 
            self._mkdir(parent_dir, dir_mode)
570
 
            _open_and_write_file()
571
 
 
572
 
    def put_file_non_atomic(self, relpath, f, mode=None,
573
 
                            create_parent_dir=False,
574
 
                            dir_mode=None):
575
 
        """Copy the file-like object into the target location.
576
 
 
577
 
        This function is not strictly safe to use. It is only meant to
578
 
        be used when you already know that the target does not exist.
579
 
        It is not safe, because it will open and truncate the remote
580
 
        file. So there may be a time when the file has invalid contents.
581
 
 
582
 
        :param relpath: The remote location to put the contents.
583
 
        :param f:       File-like object.
584
 
        :param mode:    Possible access permissions for new file.
585
 
                        None means do not set remote permissions.
586
 
        :param create_parent_dir: If we cannot create the target file because
587
 
                        the parent directory does not exist, go ahead and
588
 
                        create it, and then try again.
589
 
        """
590
 
        def writer(fout):
591
 
            self._pump(f, fout)
592
 
        self._put_non_atomic_helper(relpath, writer, mode=mode,
593
 
                                    create_parent_dir=create_parent_dir,
594
 
                                    dir_mode=dir_mode)
595
 
 
596
 
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
597
 
                             create_parent_dir=False,
598
 
                             dir_mode=None):
599
 
        def writer(fout):
600
 
            fout.write(bytes)
601
 
        self._put_non_atomic_helper(relpath, writer, mode=mode,
602
 
                                    create_parent_dir=create_parent_dir,
603
 
                                    dir_mode=dir_mode)
604
 
 
605
 
    def iter_files_recursive(self):
606
 
        """Walk the relative paths of all files in this transport."""
607
 
        # progress is handled by list_dir
608
 
        queue = list(self.list_dir('.'))
609
 
        while queue:
610
 
            relpath = queue.pop(0)
611
 
            st = self.stat(relpath)
612
 
            if stat.S_ISDIR(st.st_mode):
613
 
                for i, basename in enumerate(self.list_dir(relpath)):
614
 
                    queue.insert(i, relpath+'/'+basename)
615
 
            else:
616
 
                yield relpath
617
 
 
618
 
    def _mkdir(self, abspath, mode=None):
619
 
        if mode is None:
620
 
            local_mode = 0777
621
 
        else:
622
 
            local_mode = mode
623
 
        try:
624
 
            self._report_activity(len(abspath), 'write')
625
 
            self._get_sftp().mkdir(abspath, local_mode)
626
 
            self._report_activity(1, 'read')
627
 
            if mode is not None:
628
 
                # chmod a dir through sftp will erase any sgid bit set
629
 
                # on the server side.  So, if the bit mode are already
630
 
                # set, avoid the chmod.  If the mode is not fine but
631
 
                # the sgid bit is set, report a warning to the user
632
 
                # with the umask fix.
633
 
                stat = self._get_sftp().lstat(abspath)
634
 
                mode = mode & 0777 # can't set special bits anyway
635
 
                if mode != stat.st_mode & 0777:
636
 
                    if stat.st_mode & 06000:
637
 
                        warning('About to chmod %s over sftp, which will result'
638
 
                                ' in its suid or sgid bits being cleared.  If'
639
 
                                ' you want to preserve those bits, change your '
640
 
                                ' environment on the server to use umask 0%03o.'
641
 
                                % (abspath, 0777 - mode))
642
 
                    self._get_sftp().chmod(abspath, mode=mode)
643
 
        except (paramiko.SSHException, IOError), e:
644
 
            self._translate_io_exception(e, abspath, ': unable to mkdir',
645
 
                failure_exc=FileExists)
646
 
 
647
 
    def mkdir(self, relpath, mode=None):
648
 
        """Create a directory at the given path."""
649
 
        self._mkdir(self._remote_path(relpath), mode=mode)
650
 
 
651
 
    def open_write_stream(self, relpath, mode=None):
652
 
        """See Transport.open_write_stream."""
653
 
        # initialise the file to zero-length
654
 
        # this is three round trips, but we don't use this
655
 
        # api more than once per write_group at the moment so
656
 
        # it is a tolerable overhead. Better would be to truncate
657
 
        # the file after opening. RBC 20070805
658
 
        self.put_bytes_non_atomic(relpath, "", mode)
659
 
        abspath = self._remote_path(relpath)
660
 
        # TODO: jam 20060816 paramiko doesn't publicly expose a way to
661
 
        #       set the file mode at create time. If it does, use it.
662
 
        #       But for now, we just chmod later anyway.
663
 
        handle = None
664
 
        try:
665
 
            handle = self._get_sftp().file(abspath, mode='wb')
666
 
            handle.set_pipelined(True)
667
 
        except (paramiko.SSHException, IOError), e:
668
 
            self._translate_io_exception(e, abspath,
669
 
                                         ': unable to open')
670
 
        _file_streams[self.abspath(relpath)] = handle
671
 
        return FileFileStream(self, relpath, handle)
672
 
 
673
 
    def _translate_io_exception(self, e, path, more_info='',
674
 
                                failure_exc=PathError):
675
 
        """Translate a paramiko or IOError into a friendlier exception.
676
 
 
677
 
        :param e: The original exception
678
 
        :param path: The path in question when the error is raised
679
 
        :param more_info: Extra information that can be included,
680
 
                          such as what was going on
681
 
        :param failure_exc: Paramiko has the super fun ability to raise completely
682
 
                           opaque errors that just set "e.args = ('Failure',)" with
683
 
                           no more information.
684
 
                           If this parameter is set, it defines the exception
685
 
                           to raise in these cases.
686
 
        """
687
 
        # paramiko seems to generate detailless errors.
688
 
        self._translate_error(e, path, raise_generic=False)
689
 
        if getattr(e, 'args', None) is not None:
690
 
            if (e.args == ('No such file or directory',) or
691
 
                e.args == ('No such file',)):
692
 
                raise NoSuchFile(path, str(e) + more_info)
693
 
            if (e.args == ('mkdir failed',) or
694
 
                e.args[0].startswith('syserr: File exists')):
695
 
                raise FileExists(path, str(e) + more_info)
696
 
            # strange but true, for the paramiko server.
697
 
            if (e.args == ('Failure',)):
698
 
                raise failure_exc(path, str(e) + more_info)
699
 
            # Can be something like args = ('Directory not empty:
700
 
            # '/srv/bazaar.launchpad.net/blah...: '
701
 
            # [Errno 39] Directory not empty',)
702
 
            if (e.args[0].startswith('Directory not empty: ')
703
 
                or getattr(e, 'errno', None) == errno.ENOTEMPTY):
704
 
                raise errors.DirectoryNotEmpty(path, str(e))
705
 
            if e.args == ('Operation unsupported',):
706
 
                raise errors.TransportNotPossible()
707
 
            mutter('Raising exception with args %s', e.args)
708
 
        if getattr(e, 'errno', None) is not None:
709
 
            mutter('Raising exception with errno %s', e.errno)
710
 
        raise e
711
 
 
712
 
    def append_file(self, relpath, f, mode=None):
713
 
        """
714
 
        Append the text in the file-like object into the final
715
 
        location.
716
 
        """
717
 
        try:
718
 
            path = self._remote_path(relpath)
719
 
            fout = self._get_sftp().file(path, 'ab')
720
 
            if mode is not None:
721
 
                self._get_sftp().chmod(path, mode)
722
 
            result = fout.tell()
723
 
            self._pump(f, fout)
724
 
            return result
725
 
        except (IOError, paramiko.SSHException), e:
726
 
            self._translate_io_exception(e, relpath, ': unable to append')
727
 
 
728
 
    def rename(self, rel_from, rel_to):
729
 
        """Rename without special overwriting"""
730
 
        try:
731
 
            self._get_sftp().rename(self._remote_path(rel_from),
732
 
                              self._remote_path(rel_to))
733
 
        except (IOError, paramiko.SSHException), e:
734
 
            self._translate_io_exception(e, rel_from,
735
 
                    ': unable to rename to %r' % (rel_to))
736
 
 
737
 
    def _rename_and_overwrite(self, abs_from, abs_to):
738
 
        """Do a fancy rename on the remote server.
739
 
 
740
 
        Using the implementation provided by osutils.
741
 
        """
742
 
        try:
743
 
            sftp = self._get_sftp()
744
 
            fancy_rename(abs_from, abs_to,
745
 
                         rename_func=sftp.rename,
746
 
                         unlink_func=sftp.remove)
747
 
        except (IOError, paramiko.SSHException), e:
748
 
            self._translate_io_exception(e, abs_from,
749
 
                                         ': unable to rename to %r' % (abs_to))
750
 
 
751
 
    def move(self, rel_from, rel_to):
752
 
        """Move the item at rel_from to the location at rel_to"""
753
 
        path_from = self._remote_path(rel_from)
754
 
        path_to = self._remote_path(rel_to)
755
 
        self._rename_and_overwrite(path_from, path_to)
756
 
 
757
 
    def delete(self, relpath):
758
 
        """Delete the item at relpath"""
759
 
        path = self._remote_path(relpath)
760
 
        try:
761
 
            self._get_sftp().remove(path)
762
 
        except (IOError, paramiko.SSHException), e:
763
 
            self._translate_io_exception(e, path, ': unable to delete')
764
 
 
765
 
    def external_url(self):
766
 
        """See bzrlib.transport.Transport.external_url."""
767
 
        # the external path for SFTP is the base
768
 
        return self.base
769
 
 
770
 
    def listable(self):
771
 
        """Return True if this store supports listing."""
772
 
        return True
773
 
 
774
 
    def list_dir(self, relpath):
775
 
        """
776
 
        Return a list of all files at the given location.
777
 
        """
778
 
        # does anything actually use this?
779
 
        # -- Unknown
780
 
        # This is at least used by copy_tree for remote upgrades.
781
 
        # -- David Allouche 2006-08-11
782
 
        path = self._remote_path(relpath)
783
 
        try:
784
 
            entries = self._get_sftp().listdir(path)
785
 
            self._report_activity(sum(map(len, entries)), 'read')
786
 
        except (IOError, paramiko.SSHException), e:
787
 
            self._translate_io_exception(e, path, ': failed to list_dir')
788
 
        return [urlutils.escape(entry) for entry in entries]
789
 
 
790
 
    def rmdir(self, relpath):
791
 
        """See Transport.rmdir."""
792
 
        path = self._remote_path(relpath)
793
 
        try:
794
 
            return self._get_sftp().rmdir(path)
795
 
        except (IOError, paramiko.SSHException), e:
796
 
            self._translate_io_exception(e, path, ': failed to rmdir')
797
 
 
798
 
    def stat(self, relpath):
799
 
        """Return the stat information for a file."""
800
 
        path = self._remote_path(relpath)
801
 
        try:
802
 
            return self._get_sftp().lstat(path)
803
 
        except (IOError, paramiko.SSHException), e:
804
 
            self._translate_io_exception(e, path, ': unable to stat')
805
 
 
806
 
    def readlink(self, relpath):
807
 
        """See Transport.readlink."""
808
 
        path = self._remote_path(relpath)
809
 
        try:
810
 
            return self._get_sftp().readlink(path)
811
 
        except (IOError, paramiko.SSHException), e:
812
 
            self._translate_io_exception(e, path, ': unable to readlink')
813
 
 
814
 
    def symlink(self, source, link_name):
815
 
        """See Transport.symlink."""
816
 
        try:
817
 
            conn = self._get_sftp()
818
 
            sftp_retval = conn.symlink(source, link_name)
819
 
            if SFTP_OK != sftp_retval:
820
 
                raise TransportError(
821
 
                    '%r: unable to create symlink to %r' % (link_name, source),
822
 
                    sftp_retval
823
 
                )
824
 
        except (IOError, paramiko.SSHException), e:
825
 
            self._translate_io_exception(e, link_name,
826
 
                                         ': unable to create symlink to %r' % (source))
827
 
 
828
 
    def lock_read(self, relpath):
829
 
        """
830
 
        Lock the given file for shared (read) access.
831
 
        :return: A lock object, which has an unlock() member function
832
 
        """
833
 
        # FIXME: there should be something clever i can do here...
834
 
        class BogusLock(object):
835
 
            def __init__(self, path):
836
 
                self.path = path
837
 
            def unlock(self):
838
 
                pass
839
 
        return BogusLock(relpath)
840
 
 
841
 
    def lock_write(self, relpath):
842
 
        """
843
 
        Lock the given file for exclusive (write) access.
844
 
        WARNING: many transports do not support this, so trying avoid using it
845
 
 
846
 
        :return: A lock object, which has an unlock() member function
847
 
        """
848
 
        # This is a little bit bogus, but basically, we create a file
849
 
        # which should not already exist, and if it does, we assume
850
 
        # that there is a lock, and if it doesn't, the we assume
851
 
        # that we have taken the lock.
852
 
        return SFTPLock(relpath, self)
853
 
 
854
 
    def _sftp_open_exclusive(self, abspath, mode=None):
855
 
        """Open a remote path exclusively.
856
 
 
857
 
        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
858
 
        the file already exists. However it does not expose this
859
 
        at the higher level of SFTPClient.open(), so we have to
860
 
        sneak away with it.
861
 
 
862
 
        WARNING: This breaks the SFTPClient abstraction, so it
863
 
        could easily break against an updated version of paramiko.
864
 
 
865
 
        :param abspath: The remote absolute path where the file should be opened
866
 
        :param mode: The mode permissions bits for the new file
867
 
        """
868
 
        # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
869
 
        #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
870
 
        #       However, there is no way to set the permission mode at open
871
 
        #       time using the sftp_client.file() functionality.
872
 
        path = self._get_sftp()._adjust_cwd(abspath)
873
 
        # mutter('sftp abspath %s => %s', abspath, path)
874
 
        attr = SFTPAttributes()
875
 
        if mode is not None:
876
 
            attr.st_mode = mode
877
 
        omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
878
 
                | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
879
 
        try:
880
 
            t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
881
 
            if t != CMD_HANDLE:
882
 
                raise TransportError('Expected an SFTP handle')
883
 
            handle = msg.get_string()
884
 
            return SFTPFile(self._get_sftp(), handle, 'wb', -1)
885
 
        except (paramiko.SSHException, IOError), e:
886
 
            self._translate_io_exception(e, abspath, ': unable to open',
887
 
                failure_exc=FileExists)
888
 
 
889
 
    def _can_roundtrip_unix_modebits(self):
890
 
        if sys.platform == 'win32':
891
 
            # anyone else?
892
 
            return False
893
 
        else:
894
 
            return True
895
 
 
896
 
 
897
 
def get_test_permutations():
898
 
    """Return the permutations to be used in testing."""
899
 
    from bzrlib.tests import stub_sftp
900
 
    return [(SFTPTransport, stub_sftp.SFTPAbsoluteServer),
901
 
            (SFTPTransport, stub_sftp.SFTPHomeDirServer),
902
 
            (SFTPTransport, stub_sftp.SFTPSiblingAbsoluteServer),
903
 
            ]